CSV Historian
The CSV Historian Agent is an example historian agent that writes device data to the CSV file specified in the configuration file.
Explanation of CSV Historian
The Utils module of the VOLTTRON platform includes functions for setting up global logging for the platform:
utils.setup_logging()
_log = logging.getLogger(__name__)
The historian function is called by utils.vip_main when the agent is started (see below). utils.vip_main expects a callable object that returns an instance of an Agent. This method of dealing with a configuration file and instantiating an Agent is common practice.
def historian(config_path, **kwargs):
    """
    This method is called by the main method to parse
    the passed config file or configuration dictionary object, validate the
    configuration entries, and create an instance of the CSVHistorian class
    :param config_path: could be a path to a configuration file or can be a dictionary object
    :param kwargs: additional keyword arguments if any
    :return: an instance of :py:class:`CSVHistorian`
    """
    if isinstance(config_path, dict):
        config_dict = config_path
    else:
        config_dict = utils.load_config(config_path)
    output_path = config_dict.get("output", "historian_output.csv")
    return CSVHistorian(output_path=output_path, **kwargs)
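The dictionary-or-path handling above can be tried without a running platform. The following is a minimal sketch, assuming a JSON configuration file; load_json_config and resolve_output_path are hypothetical stand-ins for utils.load_config and the body of historian, and are not part of VOLTTRON.

```python
import json
import os
import tempfile


def load_json_config(path):
    """Hypothetical stand-in for utils.load_config: parse a JSON file."""
    with open(path) as f:
        return json.load(f)


def resolve_output_path(config_path):
    """Mirror the historian() logic: accept a dict or a path to a config file."""
    if isinstance(config_path, dict):
        config_dict = config_path
    else:
        config_dict = load_json_config(config_path)
    # Fall back to the same default file name used by historian().
    return config_dict.get("output", "historian_output.csv")


# A dictionary is used directly.
from_dict = resolve_output_path({"output": "my_data.csv"})

# A path is loaded first; an empty config falls back to the default name.
with tempfile.TemporaryDirectory() as tmp:
    cfg = os.path.join(tmp, "config.json")
    with open(cfg, "w") as f:
        json.dump({}, f)
    from_file = resolve_output_path(cfg)
```

Accepting either form keeps the factory usable both from a config file on disk and from tests that pass a dictionary directly.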
All historians must inherit from BaseHistorian. The BaseHistorian class handles the capturing and caching of all device, logging, analysis, and record data published to the message bus.
class CSVHistorian(BaseHistorian):
The Base Historian creates a separate thread to handle publishing data to the data store. In this thread the Base Historian calls two methods on the created historian: historian_setup and publish_to_historian.
The Base Historian creates the new thread in its __init__ method. This means that any instance variables must be assigned in __init__ before calling the Base Historian’s __init__ method.
def __init__(self, output_path="", **kwargs):
    self.output_path = output_path
    self.csv_dict = None
    self.csv_file = None
    self.default_dir = "./data"
    super(CSVHistorian, self).__init__(**kwargs)
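The ordering requirement described above can be illustrated without VOLTTRON. This is a minimal sketch, assuming a base class that starts its worker thread inside __init__ as the text describes; SketchBase and GoodHistorian are hypothetical names and do not reproduce the real BaseHistorian.

```python
import threading


class SketchBase:
    """Hypothetical stand-in for a base class that starts a worker thread in __init__."""

    def __init__(self):
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        # Runs on the worker thread and reads state owned by the subclass.
        self.setup()

    def setup(self):
        raise NotImplementedError


class GoodHistorian(SketchBase):
    def __init__(self, output_path):
        # Assign instance variables first; the worker thread may run immediately.
        self.output_path = output_path
        self.seen_path = None
        super().__init__()

    def setup(self):
        self.seen_path = self.output_path


agent = GoodHistorian("historian_output.csv")
agent._thread.join()  # wait for the worker so the result can be inspected
```

If the assignments came after the call to super().__init__(), setup could run before output_path exists and raise an AttributeError on the worker thread.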
historian_setup is called shortly after the new thread starts. This is where a historian sets up a connection for the first time. In our example we create the DictWriter object that we will use to create and add lines to the CSV file.
We keep a reference to the file object so that we may flush its contents to disk after writing the header and after we have written new data to the file.
The CSV file we create will have 4 columns: timestamp, source, topic, and value.
def historian_setup(self):
    # if the current file doesn't exist, or the path provided doesn't include a directory, use the default dir
    # in <agent dir>/data
    if not (os.path.isfile(self.output_path) or os.path.dirname(self.output_path)):
        if not os.path.isdir(self.default_dir):
            os.mkdir(self.default_dir)
        self.output_path = os.path.join(self.default_dir, self.output_path)

    self.csv_file = open(self.output_path, "w")
    self.csv_dict = csv.DictWriter(self.csv_file, fieldnames=["timestamp", "source", "topic", "value"])
    self.csv_dict.writeheader()
    self.csv_file.flush()
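The path rule in historian_setup can be checked in isolation. The sketch below is a standalone approximation, assuming a temporary directory in place of the agent directory; resolve_csv_path is a hypothetical helper that copies the condition from the method above.

```python
import csv
import os
import tempfile


def resolve_csv_path(output_path, default_dir):
    """Apply the same rule as historian_setup and return the path to open."""
    # Use default_dir only when the file does not already exist
    # and the configured path has no directory component.
    if not (os.path.isfile(output_path) or os.path.dirname(output_path)):
        if not os.path.isdir(default_dir):
            os.mkdir(default_dir)
        output_path = os.path.join(default_dir, output_path)
    return output_path


with tempfile.TemporaryDirectory() as tmp:
    default_dir = os.path.join(tmp, "data")

    # Bare file name: placed under the default directory.
    bare = resolve_csv_path("sketch_historian_output.csv", default_dir)
    bare_is_in_default = os.path.dirname(bare) == default_dir

    # Path with a directory component: used unchanged.
    explicit_in = os.path.join(tmp, "explicit.csv")
    explicit = resolve_csv_path(explicit_in, default_dir)

    # Open the file and write the header row, then flush it to disk.
    with open(bare, "w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["timestamp", "source", "topic", "value"])
        writer.writeheader()
        csv_file.flush()

    with open(bare, newline="") as f:
        header = f.read()
```

The sketch opens the file with newline="", which the Python csv documentation recommends for files passed to a writer; the agent code above does not pass it.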
publish_to_historian is called when data is ready to be published. It is passed a list of dictionaries. Each dictionary contains a record of a single value that was published to the message bus.
The dictionary takes the form:
{
    '_id': 1,
    'timestamp': timestamp1.replace(tzinfo=pytz.UTC),  # Timestamp in UTC
    'source': 'scrape',  # Source of the data point
    'topic': "pnnl/isb1/hvac1/thermostat",  # Topic that was published to, without the prefix
    'value': 73.0,  # Value that was published
    'meta': {"units": "F", "tz": "UTC", "type": "float"}  # Metadata published with the topic
}
Once the data is written to the historian we call self.report_all_handled() to inform the BaseHistorian that all data we received was successfully published and can be removed from the cache. Then we can flush the file to ensure that the data is written to disk.
def publish_to_historian(self, to_publish_list):
    for record in to_publish_list:
        row = dict()
        row["timestamp"] = record["timestamp"]
        row["source"] = record["source"]
        row["topic"] = record["topic"]
        row["value"] = record["value"]
        self.csv_dict.writerow(row)

    self.report_all_handled()
    self.csv_file.flush()
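To see what a record looks like once it reaches the file, the row-writing loop can be run against an in-memory buffer. This is a minimal sketch, assuming io.StringIO in place of the CSV file and a fixed example timestamp; write_records is a hypothetical helper, and the call to report_all_handled is omitted because it belongs to the BaseHistorian.

```python
import csv
import io
from datetime import datetime, timezone

FIELDNAMES = ["timestamp", "source", "topic", "value"]


def write_records(writer, to_publish_list):
    """Write one CSV row per record, keeping only the four CSV columns."""
    for record in to_publish_list:
        # Extra keys such as '_id' and 'meta' are deliberately left out.
        row = {name: record[name] for name in FIELDNAMES}
        writer.writerow(row)


# In-memory buffer standing in for the CSV file (an assumption for this sketch).
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=FIELDNAMES)
writer.writeheader()

records = [
    {
        "_id": 1,
        "timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc),
        "source": "scrape",
        "topic": "pnnl/isb1/hvac1/thermostat",
        "value": 73.0,
        "meta": {"units": "F", "tz": "UTC", "type": "float"},
    }
]
write_records(writer, records)
lines = buffer.getvalue().splitlines()
```

Copying only the four named fields matters because csv.DictWriter raises a ValueError by default when a row contains keys that are not in fieldnames.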
This agent does not support the Historian Query interface.
Agent Testing
The CSV Historian can be tested by running the included launch_my_historian.sh script.
Agent Installation
This Agent may be installed on the platform using the standard method.