volttron.platform.agent package

Submodules

volttron.platform.agent.bacnet_proxy_reader module

class volttron.platform.agent.bacnet_proxy_reader.BACnetReader(vip, bacnet_proxy_identity, iam_response_fn=None, config_response_fn=None, batch_size=20)[source]

Bases: object

The BACnetReader

get_iam(device_id, callback, address=None, timeout=10)[source]
read_device_description(address, device_id)[source]

Reads the device name from the specified address and device_id

Parameters:
  • address – Address of the bacnet device
  • device_id – The device id of the bacnet device.
Returns:

The device desciption or an empty string

read_device_name(address, device_id)[source]

Reads the device name from the specified address and device_id

Parameters:
  • address – Address of the bacnet device
  • device_id – The device id of the bacnet device.
Returns:

The device name or the string “MISSING DEVICE NAME”

read_device_properties(target_address, device_id, filter=None)[source]

Starts the processes of reading a device’s meta data.

The device will first be queried for all of it’s objects. For each of the returned indexes only the properties that have a presentValue as a property will be used. Processing of the objects will continue in batches until all of the device points have been received.

Data will ultimately be written through the self._emit_reresponses function. The self._response_function that was set in the constructor of the object will be used to return the data to the caller.

Parameters:
  • target_address – The address of the bacnet device
  • device_id – The device_id of the bacnet device
  • filter – A list of two-tuples with (bacnet_type, [index]) where the bacnet_type is one of the bacnet_type strings and the [index] is an array of indexes to return.
start_whois(low_device_id=None, high_device_id=None, target_address=None)[source]
stop_iam_responses()[source]

volttron.platform.agent.base module

VOLTTRON platform™ base agent and helper classes/functions.

volttron.platform.agent.base.periodic(period, *args, **kwargs)[source]

Decorator to set a method up as a periodic callback.

The decorated method will be called with the given arguments every period seconds while the agent is executing its run loop.

class volttron.platform.agent.base.BaseAgent(subscribe_address, **kwargs)[source]

Bases: volttron.platform.agent.base.AgentBase

Base class for creating VOLTTRON platform™ agents.

This class can be used as is, but it won’t do much. It will sit and do nothing but listen for messages and exit when the platform shutdown message is received. That is it.

LOOP_INTERVAL = 60
closed

Return whether the subscription channel is closed.

finish()[source]

Finish for the agent execution loop.

Extend this method with code that must run once after the main loop. Be sure to call the base class implementation from the overridden method.

handle_sub_message(block=False)[source]

Handle incoming messages on the subscription socket.

Receives a multipart message containing a topic, headers, and zero or more message parts. For each prefix (key) in subscriptions map matching the beginning of the topic, the associated callback will be called if either no test is associated with the callback or the test function returns a value evaluating to True.

See the class documentation for more information on the signature for test and callback functions.

loop()[source]

Main agent execution loop.

This method should rarely need to be overridden. Instead, override the step method to customize execution behavior. The default implementation loops until self.closed() returns True calling self.step() each iteration.

periodic_timer(period, function, *args, **kwargs)[source]

Create a periodic timer to call function every period seconds.

Like the timer method except that the timer is automatically rearmed after the function completes.

poll(timeout=None)[source]

Polls for events while handling timers.

poll() will wait up to timeout seconds for sockets or files registered with self.reactor to become ready. A timeout of None will cause poll to wait an infinite amount of time. While waiting for poll events, scheduled events will be handled, potentially causing the wait time to slip a bit.

run()[source]

Entry point for running agent.

Subclasses should not override this method. Instead, the setup, step, and finish methods should be overridden to customize behavior.

schedule(time, event)[source]

Schedule an event to run at the given wall time.

time must be a datetime object or a Unix time value as returned by time.time(). event must be a callable accepting a single argument, the time the event was scheduled to run, and must return a time to be scheduled next or None to not reschedule. sched.Event and sched.RecurringEvent are examples of this interface and may be used here. Generators send functions are also be good candidates for event functions.

setup()[source]

Setup for the agent execution loop.

Extend this method with code that must run once before the main loop. Be sure to call the base class implementation from the overridden method.

step(timeout=None)[source]

Performs a single step in the main agent loop.

Override this method to customize agent behavior. The default method blocks indefinitely until at least one socket in the reactor is ready and then run each associated callback. The method can be called from the overridden method in a subclass with the behavior customized by passing in different timeout. timeout is the maximum number of seconds (can be fractional) to wait or None to wait indefinitely. Returns the number of events fired or zero if a timeout occured.

subscribe(prefix, callback=None, test=None)[source]

Subscribe to topic and register callback.

Subscribes to topics beginning with prefix. If callback is supplied, it should be a function taking four arguments, callback(topic, headers, message, match), where topic is the full message topic, headers is a case-insensitive dictionary (mapping) of message headers, message is a possibly empty list of message parts, and match is the return value of the test function or None if test is None.

If test is given, it should be a function taking two arguments, test(topic, prefix), where topic is the complete topic of the incoming message and prefix is the string which caused the subscription match. The test function should return a true value if the callback should be called or a false value otherwise. The result of the test will be passed into the callback function where the results can be used.

Returns and ID number which can be used later to unsubscribe.

timer(interval, function, *args, **kwargs)[source]

Create a timer to call function after interval seconds.

interval is specified in seconds and can include fractional part. function is a function that takes the optional args and kwargs. Returns a timer object that can be used to modify the callback parameters or to cancel using the cancel() method.

unsubscribe(handler_id, prefix=None)[source]

Remove subscription handler by its ID.

Remove all handlers matching the given handler ID, which is the ID returned by the subscribe method. If all handlers for a topic prefix are removed, the topic is also unsubscribed.

unsubscribe_all(prefix)[source]

Remove all handlers for the given prefix and unsubscribe.

If prefix is None, unsubscribe from all topics and remove all handlers. Otherwise, unsubscribe from the given topic and remove all handlers for that topic prefix.

class volttron.platform.agent.base.PublishMixin(publish_address, **kwargs)[source]

Bases: volttron.platform.agent.base.AgentBase

Agent mix-in for publishing to the VOLTTRON publish socket.

Connects the agent to the publish channel and provides several publish methods.

Include before BaseAgent class in subclass list.

ping_back(callback, timeout=None, period=1)[source]
publish(topic, headers, *msg_parts, **kwargs)[source]

Publish a message to the publish channel. Adds volttron platform version compatibility information to header as variables min_compatible_version and max_compatible version

publish_ex(topic, headers, *msg_tuples, **kwargs)[source]

Publish messages given as (content-type, message) tuples. Adds volttron platform version compatibility information to header as variables min_compatible_version and max_compatible version

publish_json(topic, headers, *msg_parts, **kwargs)[source]

Publish JSON encoded message. Adds volttron platform version compatibility information to header as variables min_compatible_version and max_compatible version

volttron.platform.agent.base_aggregate_historian module

class volttron.platform.agent.base_aggregate_historian.AggregateHistorian(config_path, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

Base agent to aggregate data in historian based on a specific time period. Different subclasses of this agent is needed to interact with different type of historians. Subclasses should implement the following methods

collect_aggregate(topic_ids, agg_type, start_time, end_time)[source]

Collect the aggregate data by querying the historian’s data store

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed.
  • agg_type – type of aggregation
  • start_time – start time for query (inclusive)
  • end_time – end time for query (exclusive)
Returns:

a tuple of (aggregated value, count of record over which

this aggregation was computed)

collect_aggregate_data(collection_time, agg_time_period, use_calendar_periods, points)[source]

Method that does the collection and computation of aggregate data based on raw date in historian’s data table. This method is called for the first time when a agent is configured with a new configuration or when the config in config store is updated. After the collection of aggregate data, this methods schedules itself to be called after a specific period of time. The time interval is calculated by compute_next_collection_time() This method in turn calls the platform historian’s - :py:method:`get_topics_by_pattern()` <BaseHistorian.get_topics_by_pattern>

and the following methods implemented by child classes:

Parameters:
  • collection_time – time of aggregation collection
  • agg_time_period (param) – time agg_time_period for which data needs to be collected and aggregated
  • use_calendar_periods (param) – flag that indicates if time agg_time_period should be aligned to calendar times
  • points (param) – list of points for which aggregate data needs to be collected. Each element in the list is a dictionary containing topic_names/topic_name_pattern, aggregation_type(ex. sum, avg etc.), and min_count(minimum number of raw data to be present within the given time agg_time_period for the aggregate to be computed. If count is less than minimum no aggregate is computed for that agg_time_period)
static compute_aggregation_time_slice(collection_time, agg_period, use_calender_time_periods)[source]

Computes the start and end time for querying the historians data table for computing aggregates. Start and end time depend on whether the time periods should align to calendar time periods. For example a daily average could be computed for data collected between 12am to 11.59am of a specific date or data between (collection_time - 24 hours) and current_time. Setting use_calendar_time_periods to true results in former.

Parameters:
  • collection_time – Time of aggregation collection
  • agg_period – time period of the aggregation
  • use_calender_time_periods – boolean to indicate if the time period should align to the calendar time periods
Returns:

start and end time of aggregation. start time is inclusive

and end time is not.

static compute_next_collection_time(collection_time, agg_period, use_calendar_periods)[source]

compute the next collection time based on current time in utc and aggregation time period.

Parameters:
  • collection_time – time of aggregate collection
  • agg_period – period string from AggregateHistorian config
  • use_calendar_periods – boolean to say if aggregate period should be based on calendar periods. For example: Week = Sunday to Saturday, Hourly average would be 1AM= 2AM, 2AM-3AM etc.
Returns:

next collection time in utc

configure(config_name, action, config)[source]

Converts aggregation time period into seconds, validates configuration values and calls the collect aggregate method for the first time

Parameters:
  • config_name – name of the config entry in store. We only use one config store entry with the default name config
  • action – “NEW or “UPDATE” code treats both the same way
  • config – configuration as json object
get_agg_topic_map()[source]

Query the aggregate_topics table and create a map of (topic name, aggregation type, aggregation time period) to topic id. This should be done as part of init

Returns:Returns a list of topic_map containing
{(agg_topic_name.lower(), agg_type, agg_time_period) :id}
get_aggregation_list()[source]

Returns a list of supported aggregations

Returns:list of supported aggregations
get_supported_aggregations()[source]
get_topic_map()[source]

Query the topics table and create a map of topic name to topic id. This should be done as part of init

Returns:Returns a list of topic_map containing {topic_name.lower():id}
initialize_aggregate_store(aggregation_topic_name, agg_type, agg_time_period, topics_meta)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period

Parameters:
  • aggregation_topic_name – Unique topic name for this aggregation. If aggregation is done over multiple points it is a unique name given by user, else it is same as topic_name for which aggregation is done
  • agg_type – The type of aggregation. For example, avg, sum etc.
  • agg_time_period – The time period of aggregation
  • topics_meta – String that represents the list of topics across which this aggregation is computed. It could be topic name pattern or list of topics. This information should go into metadata table
Returns:

Return a aggregation_topic_id after inserting aggregation_topic_name into topics table

insert_aggregate(agg_topic_id, agg_type, agg_time_period, end_time, value, topic_ids)[source]

Insert aggregate data collected for a specific time period into database. Data is inserted into <agg_type>_<period> table

Parameters:
  • agg_topic_id – If len(topic_ids) is 1. This would be the same as the topic_ids[0]. Else this id corresponds to the unique topic name given by user for this aggregation across multiple points.
  • agg_type – type of aggregation
  • agg_time_period – The time period of aggregation
  • end_time – end time used for query records that got aggregated
  • topic_ids – topic ids for which aggregation was computed
  • value – aggregation result
is_supported_aggregation(agg_type)[source]

Checks if the given aggregation is supported by the historian’s data store

Parameters:agg_type – The type of aggregation to be computed
Returns:True is supported False otherwise
static normalize_aggregation_time_period(time_period)[source]

Validates and normalizes aggregation time period. For example, if aggregation time period is given as 48h it will get converted into 2d

Parameters:time_period – time period string to be validated and normalized
Returns:normalized time period
update_aggregate_metadata(agg_id, aggregation_topic_name, topic_meta)[source]

Update aggregation_topic_name and topic_meta data for the given agg_id.

Parameters:
  • agg_id – Aggregation topic id for which update should be done
  • aggregation_topic_name – New aggregation_topic_name
  • topic_meta – new topic metadata

volttron.platform.agent.base_historian module

Historian Development

Support for storing and retrieving historical device and analysis data published to the message bus is handled with Historian Agents. If a new type of data store or a new way of storing data is desired a new type of Historian Agent should created.

Historian Agents are implemented by subclassing BaseHistorian.

Agents that need short term storage of device data should subscribe to device data and use internal data structures for storage. Agents which need long term Historical data that predates the startup of the Agent should interact with a Historian Agent in order to obtain that data as needed.

While it is possible to create an Agent from scratch which handles gathering and storing device data it will miss out on the benefits of creating a proper Historian Agent that subclassing BaseHistorian. The BaseHistorian class provides the following features:

  • A separate thread for all communication with a data store removing the need to use or implement special libraries to work with gevent.
  • Automatically subscribe to and process device publishes.
  • Automatically backup data retrieved off the message bus to a disk cache. Cached data will only be removed once it is successfully published to a data store.
  • Existing Agents that publish analytical data for storage or query for historical data will be able to use the new Historian without any code changes.
  • Data can be graphed in VOLTTRON Central.

Creating a New Historian

To create a new Historian create a new Agent that subclasses BaseHistorian. BaseHistorian inherits from volttron.platform.vip.agent.Agent so including it in the class parents is not needed.

The new Agent must implement the following methods:

If this historian has a corresponding AggregateHistorian (see AggregateHistorian) implement the following method in addition to the above ones: - BaseQueryHistorianAgent.record_table_definitions() - BaseQueryHistorianAgent.query_aggregate_topics()

While not required this method may be overridden as needed: - BaseHistorianAgent.historian_setup()

Optionally a Historian Agent can inherit from BaseHistorianAgent instead of BaseHistorian if support for querying data is not needed for the data store. If this route is taken then VOLTTRON Central will not be able to graph data from the store. It is possible to run more than one Historian agent at a time to store data in more than one place. If needed one can be used to allow querying while another is used to put data in the desired store that does not allow querying.

Historian Execution Flow

At startup the BaseHistorian class starts a new thread to handle all data caching and publishing (the publishing thread). The main thread then subscribes to all Historian related topics on the message bus. Whenever subscribed data comes in it is published to a Queue to be be processed by the publishing thread as soon as possible.

At startup the publishing thread calls two methods:

Historian a chance to setup any connections in the thread. - BaseQueryHistorianAgent.record_table_definitions() to give the implemented Historian a chance to record the table/collection names into a meta table/collection with the named passed as parameter. The implemented historian is responsible for creating the meta table if it does not exist.

The process thread then enters the following logic loop:

Wait for data to appear in the Queue. Proceed if data appears or a
`retry_period` time elapses.
If new data appeared in Queue:
    Save new data to cache.
While data is in cache:
    Publish data to store by calling
        :py:meth:`BaseHistorianAgent.publish_to_historian`.
    If no data was published:
        Go back to start and check Queue for data.
    Remove published data from cache.
    If we have been publishing for `max_time_publishing`:
        Go back to start and check Queue for data.

The logic will also forgo waiting the retry_period for new data to appear when checking for new data if publishing has been successful and there is still data in the cache to be publish.

Storing Data

The BaseHistorian will call BaseHistorianAgent.publish_to_historian() as the time series data becomes available. Data is batched in a groups up to submit_size_limit.

After processing the list or individual items in the list BaseHistorianAgent.publish_to_historian() must call BaseHistorianAgent.report_handled() to report an individual point of data was published or BaseHistorianAgent.report_all_handled() to report that everything from the batch was successfully published. This tells the BaseHistorianAgent class what to remove from the cache and if any publishing was successful.

The to_publish_list argument of BaseHistorianAgent.publish_to_historian() is a list of records that takes the following form:

[
    {
        '_id': 1,
        'timestamp': timestamp1.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/thermostat",
        'value': 73.0,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    {
        '_id': 2,
        'timestamp': timestamp2.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/temperature",
        'value': 74.1,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    ...
]

As records are published to the data store BaseHistorianAgent.publish_to_historian() must call BaseHistorianAgent.report_handled() with the record or list of records that was published or BaseHistorianAgent.report_all_handled() if everything was published.

Querying Data

Other Notes

Implemented Historians must be tolerant to receiving the same data for submission twice. While very rare, it is possible for a Historian to be forcibly shutdown after data is published but before it is removed from the cache. When restarted the BaseHistorian will submit the same date over again.

class volttron.platform.agent.base_historian.AsyncBackupDatabase(*args, **kwargs)[source]

Bases: volttron.platform.agent.base_historian.BackupDatabase

Wrapper around BackupDatabase to allow it to run in the main Historian gevent loop. Wraps the more expensive methods in threadpool.apply calls.

backup_new_data(**kwargs)
Parameters:new_publish_list (iterable) – An iterable of records to cache to disk.
Returns:True if records the cache has reached a full state.
Return type:bool
get_outstanding_to_publish(**kwargs)

Retrieve up to size_limit records from the cache.

Parameters:size_limit (int) – Max number of records to retrieve.
Returns:List of records for publication.
Return type:list
remove_successfully_published(**kwargs)

Removes the reported successful publishes from the backup database. If None is found in successful_publishes we assume that everything was published.

Parameters:
  • successful_publishes (list) – List of records that was published.
  • submit_size (int) – Number of things requested from previous call to get_outstanding_to_publish()
class volttron.platform.agent.base_historian.BackupDatabase(owner, backup_storage_limit_gb, backup_storage_report, check_same_thread=True)[source]

A creates and manages backup cache for the BaseHistorianAgent class.

Historian implementors do not need to use this class. It is for internal use only.

backup_new_data(new_publish_list)[source]
Parameters:new_publish_list (iterable) – An iterable of records to cache to disk.
Returns:True if records the cache has reached a full state.
Return type:bool
close()[source]
get_backlog_count()[source]

Retrieve the current number of records in the cashe.

get_outstanding_to_publish(size_limit)[source]

Retrieve up to size_limit records from the cache.

Parameters:size_limit (int) – Max number of records to retrieve.
Returns:List of records for publication.
Return type:list
remove_successfully_published(successful_publishes, submit_size)[source]

Removes the reported successful publishes from the backup database. If None is found in successful_publishes we assume that everything was published.

Parameters:
  • successful_publishes (list) – List of records that was published.
  • submit_size (int) – Number of things requested from previous call to get_outstanding_to_publish()
class volttron.platform.agent.base_historian.BaseHistorian(**kwargs)[source]

Bases: volttron.platform.agent.base_historian.BaseHistorianAgent, volttron.platform.agent.base_historian.BaseQueryHistorianAgent

class volttron.platform.agent.base_historian.BaseHistorianAgent(retry_period=300.0, submit_size_limit=1000, max_time_publishing=30.0, backup_storage_limit_gb=None, backup_storage_report=0.9, topic_replace_list=[], gather_timing_data=False, readonly=False, process_loop_in_greenlet=False, capture_device_data=True, capture_log_data=True, capture_analysis_data=True, capture_record_data=True, message_publish_count=10000, history_limit_days=None, storage_limit_gb=None, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

This is the base agent for historian Agents.

It automatically subscribes to all device publish topics.

Event processing occurs in its own thread as to not block the main thread. Both the historian_setup and publish_to_historian happen in the same thread.

By default the base historian will listen to 4 separate root topics ( datalogger/, record/, analysis/, and device/. Messages published to datalogger will be assumed to be timepoint data that is composed of units and specific types with the assumption that they have the ability to be graphed easily. Messages published to devices are data that comes directly from drivers. Data sent to analysis/* topics is result of analysis done by applications. The format of data sent to analysis/* topics is similar to data sent to device/* topics. Messages that are published to record will be handled as string data and can be customized to the user specific situation. Refer to Historian-Topic-Syntax for data syntax

This base historian will cache all received messages to a local database before publishing it to the historian. This allows recovery for unexpected happenings before the successful writing of data to the historian.

configure(configuration)[source]

Optional, may be implemented by a concrete implementation to add support for the configuration store. Values should be stored in this function only.

The process thread is stopped before this is called if it is running. It is started afterwards.

historian_setup is called after this is called.

get_renamed_topic(input_topic)[source]

replace topic name based on configured topic replace list, is any :param input_topic: :return:

historian_setup()[source]

Optional setup routine, run in the processing thread before main processing loop starts. Gives the Historian a chance to setup connections in the publishing thread.

historian_teardown()[source]

Optional teardown routine, run in the processing thread if the main processing loop is stopped. This happened whenever a new configuration arrives from the config store.

insert(records)[source]

RPC method to allow remote inserts to the local cache

Parameters:records (list of dictionaries) – List of items to be added to the local event queue
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Called in the process thread after data is published. This can be overridden in historian implementations to apply the storage_limit_gb and history_limit_days settings to the storage medium.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove oldest data until database is smaller than this value.
parse_table_def(tables_def)[source]
publish_to_historian(to_publish_list)[source]

Main publishing method for historian Agents.

Parameters:to_publish_list (list) – List of records

to_publish_list takes the following form:

[
    {
        'timestamp': timestamp1.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/thermostat",
        'value': 73.0,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    {
        'timestamp': timestamp2.replace(tzinfo=pytz.UTC),
        'source': 'scrape',
        'topic': "pnnl/isb1/hvac1/temperature",
        'value': 74.1,
        'meta': {"units": "F", "tz": "UTC", "type": "float"}
    },
    ...
]

The contents of meta is not consistent. The keys in the meta data values can be different and can change along with the values of the meta data. It is safe to assume that the most recent value of the “meta” dictionary are the only values that are relevant. This is the way the cache treats meta data.

Once one or more records are published either BaseHistorianAgent.report_all_handled() or BaseHistorianAgent.report_handled() must be called to report records as being published.

record_table_definitions(meta_table_name)[source]

Record the table or collection names in which data, topics and metadata are stored into the metadata table. This is essentially information from information from configuration item ‘table_defs’. The metadata table contents will be used by the corresponding aggregate historian(if any)

Parameters:meta_table_name – table name into which the table names and

table name prefix for data, topics, and meta tables should be inserted

report_all_handled()[source]

Call this from BaseHistorianAgent.publish_to_historian() to report that all records passed to BaseHistorianAgent.publish_to_historian() have been successfully published and should be removed from the cache.

report_handled(record)[source]

Call this from BaseHistorianAgent.publish_to_historian() to report a record or list of records has been successfully published and should be removed from the cache.

Parameters:record (dict or list) – Record or list of records to remove from cache.
start_process_thread()[source]
stop_process_thread()[source]
stopping(sender, **kwargs)[source]

Release subscription to the message bus because we are no longer able to respond to messages now.

update_default_config(config)[source]

May be called by historians to add to the default configuration for its own use.

class volttron.platform.agent.base_historian.BaseQueryHistorianAgent(identity=None, address=None, context=None, publickey=None, secretkey=None, serverkey=None, heartbeat_autostart=False, heartbeat_period=60, volttron_home='/home/docs/.volttron', agent_uuid=None, enable_store=True, enable_web=False, enable_channel=False, reconnect_interval=None, version='0.1', enable_fncs=False)[source]

Bases: volttron.platform.vip.agent.Agent

This is the base agent for historian Agents that support querying of their data stores.

get_aggregate_topics()[source]

RPC call to get the list of aggregate topics

Returns:List of aggregate topics in the data store. Each list element contains (topic_name, aggregation_type, aggregation_time_period, metadata)
Return type:list
get_topic_list()[source]

RPC call to get a list of topics in data store

Returns:List of topics in the data store.
Return type:list
get_topics_by_pattern(topic_pattern)[source]

Find the list of topics and its id for a given topic_pattern

Returns:returns list of dictionary object {topic_name:id}
get_topics_metadata(topics)[source]

RPC call to get one or more topic’s metadata

Parameters:topics – single topic or list of topics for which metadata is requested
Returns:List of aggregate topics in the data store. Each list element contains (topic_name, aggregation_type, aggregation_time_period, metadata)
Return type:list
get_version()[source]

RPC call to get the version of the historian

Returns:version number of the historian used
Return type:string
query(topic=None, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

RPC call to query an Historian for time series data.

Parameters:
  • topic (str or list) – Topic or topics to query for.
  • start (str) – Start time of the query. Defaults to None which is the beginning of time.
  • end (str) – End time of the query. Defaults to None which is the end of time.
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value.
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
Returns:

Results of the query

Return type:

dict

Return values will have the following form:

{
    "values": [(<timestamp string1>: value1),
               (<timestamp string2>: value2),
                ...],
    "metadata": {"key1": value1,
                 "key2": value2,
                 ...}
}

The string arguments can be either the output from volttron.platform.agent.utils.format_timestamp() or the special string “now”.

Times relative to “now” may be specified with a relative time string using the Unix “at”-style specifications. For instance “now -1h” will specify one hour ago. “now -1d -1h -20m” would specify 25 hours and 20 minutes ago.

query_aggregate_topics()[source]

This function is called by BaseQueryHistorianAgent.get_aggregate_topics() to find out the available aggregates in the data store

Returns:List of tuples containing (topic_name, aggregation_type, aggregation_time_period, metadata)
Return type:list
query_historian(topic, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order=None)[source]

This function is called by BaseQueryHistorianAgent.query() to actually query the data store and must return the results of a query in the following format:

Single topic query:

{
"values": [(timestamp1, value1),
            (timestamp2:,value2),
            ...],
 "metadata": {"key1": value1,
              "key2": value2,
              ...}
}

Multiple topics query:

{
"values": {topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
           topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
            ...}
 "metadata": {} #empty metadata
}

Timestamps must be strings formatted by volttron.platform.agent.utils.format_timestamp().

“metadata” is not required. The caller will normalize this to {} for you if it is missing.

Parameters:
  • topic (str or list) – Topic or list of topics to query for.
  • start (datetime) – Start of query timestamp as a datetime.
  • end (datetime) – End of query timestamp as a datetime.
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

Results of the query

Return type:

dict

query_topic_list()[source]

This function is called by BaseQueryHistorianAgent.get_topic_list() to actually topic list from the data store.

Returns:List of topics in the data store.
Return type:list
query_topics_by_pattern(topic_pattern)[source]

Find the list of topics and its id for a given topic_pattern

Returns:returns list of dictionary object {topic_name:id}
query_topics_metadata(topics)[source]

This function is called by BaseQueryHistorianAgent.get_topics_metadata() to find out the metadata for the given topics

Parameters:topics (str or list) – single topic or list of topics
Returns:dictionary with the format
{topic_name: {metadata_key:metadata_value, ...},
topic_name: {metadata_key:metadata_value, ...} ...}
Return type:dict
version()[source]

Return the current version number of the historian :return: version number

volttron.platform.agent.base_historian.add_timing_data_to_header(headers, agent_id, phase)[source]
volttron.platform.agent.base_historian.dumps(data)[source]
volttron.platform.agent.base_historian.get_timeunit(t)[source]
volttron.platform.agent.base_historian.is_number(x)
volttron.platform.agent.base_historian.loads(data_string)[source]
volttron.platform.agent.base_historian.now(tzstr='UTC')[source]

Returns an aware datetime object with the current time in tzstr timezone

volttron.platform.agent.base_historian.p_abstime(t)[source]

abstime : NUMBER | QSTRING | NOW

volttron.platform.agent.base_historian.p_error(p)[source]
volttron.platform.agent.base_historian.p_query_pair(t)[source]

query : ‘(‘ timeref ‘,’ timeref ‘)’

volttron.platform.agent.base_historian.p_query_single(t)[source]

query : timeref

volttron.platform.agent.base_historian.p_reltime(t)[source]

reltime : NUMBER LVALUE | NUMBER LVALUE reltime

volttron.platform.agent.base_historian.p_timeref(t)[source]

timeref : abstime | abstime reltime

volttron.platform.agent.base_historian.parse_time(ts)[source]
volttron.platform.agent.base_historian.strptime_tz(str, format='%x %X', tzstr='Local')[source]

Returns an aware datetime object. tzstr is a timezone string such as ‘US/Pacific’ or ‘Local’ by default which uses the local timezone.

volttron.platform.agent.base_historian.t_LVALUE(t)[source]

[a-zA-Z~$_][a-zA-Z0-9/%_-]*

volttron.platform.agent.base_historian.t_NUMBER(t)[source]

([+-]?([0-9]*.)?[0-9]+)

volttron.platform.agent.base_historian.t_QSTRING(t)[source]

(“[^”\]*?(.[^”\]*?)*?”)|(‘[^’\]*?(.[^’\]*?)*?’)

volttron.platform.agent.base_historian.t_error(t)[source]
volttron.platform.agent.base_historian.t_newline(t)[source]

[nr]+

volttron.platform.agent.base_tagging module

Base class for tagging service implementation. Tagging Service provides api’s for users to associate haystack based tags and values to topic names and topic name prefixes.

Implementing classes should implement the following methods

On start calls the following methods

Querying for topics based on tags

Base tagging service provides a parser to parse query condition for querying topics based on tags. Please see documentation of BaseTaggingService.get_topics_by_tags() for syntax definition of query

class volttron.platform.agent.base_tagging.BaseTaggingService(historian_vip_identity=None, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

This is the base class for tagging service implementations. There can be different implementations based on backend/data store used to persist the tag details

add_tags(tags, update_version=False)[source]

Add tags to multiple topics. Calls method BaseTaggingService.insert_topic_tags(). Implementing methods could use BaseTaggingService.get_matching_topic_prefixes() to get the list of topic prefix or topic names for a given topic pattern.

Parameters:
  • tags (dict) – dictionary object or file containing the topic and the tag details. Dictionary object or the file content should be of the format <topic_name or prefix or topic_name pattern>: {<valid tag>:<value>, … }, … }
  • update_version (bool) – True/False. Defaults to False. If set to True and if any of the tags update an existing tag value the older value would be preserved as part of tag version history. Note: this feature is not implemented in the current version of sqlite and mongodb tagging service.
add_topic_tags(topic_prefix, tags, update_version=False)[source]

Add tags to specific topic name or topic name prefix. Calls the method BaseTaggingService.add_tags().

Note: Use of this api require’s a configured historian to be running. This can be configured using the optional historian_id configuration.If not configured, defaults to platform.historian. This api makes RPC calls to historian to get_topic_list api to get the list of topics. This is used to find topic/topic prefix matching any given input topic pattern or specific topic prefix.

Parameters:
  • topic_prefix (str) – topic name or topic name prefix
  • tags (dict) – dictionary of tag and value in the format {<valid tag>:value, <valid_tag>: value,… }
  • update_version (bool) – True/False. Default to False. If set to True and if any of the tags update an existing tag value the older value would be preserved as part of tag version history
get_categories(include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the available list tag categories. category can have multiple tags and tags could belong to multiple categories

Parameters:
  • include_description (bool) – indicate if result should include available description for categories returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

list of category names if include_description is False, list of (category name, description) if include_description is True

Return type:

list

get_matching_topic_prefixes(topic_pattern)[source]

Queries the configured/platform historian to get the list of topics that match the given topic pattern. So use of this api require’s the configured historian (or platform.historian if specific historian id is not specified) to be running. This api makes RPC calls to platform.historian’s BaseHistorian.get_topic_list() to get the list of topics. This is used to find topic/topic prefix matching any given input topic pattern.

Pattern matching done here is not true string pattern matching. Matches are applied to different topic_prefix. For example, ‘campus/building1/device*’ would match campus/building1/device1 and not campus/building1/device1/p1. Works only if separator is /. Else tags are always applied to full topic names

Parameters:topic_pattern (str) – pattern to match again
Returns:list of topic prefixes.
get_tags_by_category(category, include_kind=False, include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the list of tags for a given category name. category can have multiple tags and tags could belong to multiple categories

Parameters:
  • category (str) – name of the category for which associated tags should be returned
  • include_kind (bool) – indicate if result should include the kind/datatype for tags returned
  • include_description (bool) – indicate if result should include available description for tags returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

Will return one of the following

  • list of tag names
  • list of (tags, its data type/kind) if include_kind is True
  • list of (tags, description) if include_description is True
  • list of (tags, its data type/kind, description) if include_kind is True and include_description is true

Return type:

list

get_tags_by_topic(topic_prefix, include_kind=False, include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the list of tags for a given topic prefix or name.

Parameters:
  • topic_prefix (str) – topic_prefix for which associated tags should be returned
  • include_kind (bool) – indicate if result should include the kind/datatype for tags returned
  • include_description (bool) – indicate if result should include available description for tags returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

Will return one of the following

  • list of (tag name, value)
  • list of (tag name, value, data type/kind) if include_kind is True
  • list of (tag name, value, description) if include_description is True
  • list of (tags, value, data type/kind, description) if

include_kind is True and include_description is true

Return type:

list

get_topics_by_tags(and_condition=None, or_condition=None, condition=None, skip=0, count=None, order=None)[source]

Get list of topic names and topic name prefixes based on gives tags and values. This method parses the query condition creates an abstract syntax tree that represents the unambiguous query and calls method BaseTaggingService.query_topics_by_tags() of the implementing service to further process the ast and return list of topic prefixes

Parameters:
  • and_condition (dict or list) – dictionary of tag and its corresponding values that should be matched using equality operator or a list of tags that should exists/be true. Tag conditions are combined with AND condition. Only topics that match all the tags in the list would be returned
  • or_condition (dict or list) – dictionary of tag and its corresponding values that should be matched using equality operator or a list tags that should exist/be true. Tag conditions are combined with OR condition. Topics that match any of the tags in the list would be returned. If both and_condition and or_condition are provided then they are combined using AND operator.
  • condition (str) –

    conditional statement to be used for matching tags. If this parameter is provided the above two parameters are ignored. The value for this parameter should be an expression that contains one or more query conditions combined together with an “AND” or “OR”. Query conditions can be grouped together using parenthesis. Each condition in the expression should conform to one of the following format:

    1. <tag name/ parent.tag_name> <binary_operator> <value>
    2. <tag name/ parent.tag_name>
    3. <tag name/ parent.tag_name> LIKE <regular expression within single quotes
    4. parent tag used in query(using format parent.tag_name) should be of type/kind Ref. For example, campusRef.geoPostalCode = “99353”
    5. the word NOT can be prefixed before any of the above three to negate the condition.
    6. expressions can be grouped with parenthesis.
    Example
    condition="(tag1 = 1 or tag1 = 2) and (tag2 < '' and tag2 >
    '') and tag3 and  (tag4 LIKE '^a.*b$')"
    
    condition="NOT (tag5='US' OR tag5='UK') AND NOT tag3 AND
    NOT (tag4 LIKE 'a.*')"
    
    condition="campusRef.geoPostalCode='20500' and equip and boiler"
    
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

list of topics/topic_prefix that match the given query conditions

Return type:

list

insert_topic_tags(tags, update_version=False)[source]

Add tags to multiple topics.

Parameters:
  • tags (dict) –

    dictionary object or file containing the topic and the tag details. dictionary object or the file content should be of the format:

    <topic_name or prefix or topic_name pattern>: {<valid
    tag>:<value>, ... }, ... }
    
  • update_version (bool) – True/False. Default to False. If set to True and if any of the tags update an existing tag value the older value would be preserved as part of tag version history. Note: this feature is not implemented in the current version of sqlite and mongodb tagging service.
load_tag_refs()[source]

Called right after setup to load a dictionary of reference tags and its corresponding parent tag. Implementing methods should load self.tag_refs with tag and parent tag information

load_valid_tags()[source]

Called right after setup to load a dictionary of valid tags. It should load self.valid_tags with tag and type information

on_start(sender, **kwargs)[source]

Called on start of agent. Calls the methods

query_categories(include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the available list tag categories. category can have multiple tags and tags could belong to multiple categories

Parameters:
  • include_description (bool) – indicate if result should include available description for categories returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

list of category names if include_description is False, list of (category name, description) if include_description is True

Return type:

list

query_tags_by_category(category, include_kind=False, include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the list of tags for a given category name. category can have multiple tags and tags could belong to multiple categories

Parameters:
  • category (str) – name of the category for which associated tags should be returned
  • include_kind (bool) – indicate if result should include the kind/datatype for tags returned
  • include_description (bool) – indicate if result should include available description for tags returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

Will return one of the following

  • list of tag names
  • list of (tags, its data type/kind) if include_kind is True
  • list of (tags, description) if include_description is True
  • list of (tags, its data type/kind, description) if include_kind is True and include_description is true

Return type:

list

query_tags_by_topic(topic_prefix, include_kind=False, include_description=False, skip=0, count=None, order='FIRST_TO_LAST')[source]

Get the list of tags for a given topic prefix or name.

Parameters:
  • topic_prefix (str) – topic_prefix for which associated tags should be returned
  • include_kind (bool) – indicate if result should include the kind/datatype for tags returned
  • include_description (bool) – indicate if result should include available description for tags returned
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

Will return one of the following

  • list of (tag name, value)
  • list of (tag name, value, data type/kind) if include_kind is True
  • list of (tag name, value, description) if include_description is True
  • list of (tags, value, data type/kind, description) if include_kind is True and include_description is true

Return type:

list

query_topics_by_tags(ast, skip=0, count=None, order=None)[source]

Get list of topic names and topic name prefixes based on query condition. Query condition is passed as an abstract syntax tree.

Parameters:
  • ast (tuple) –

    Abstract syntax tree that represents conditional statement to be used for matching tags. The abstract syntax tree represents query condition that is created using the following specification

    Query condition is a boolean expression that contains one or more query conditions combined together with an “AND” or “OR”. Query conditions can be grouped together using parenthesis. Each condition in the expression should conform to one of the following format:

    1. <tag name/ parent.tag_name> <binary_operator> <value>
    2. <tag name/ parent.tag_name>
    3. <tag name/ parent.tag_name> LIKE <regular expression within single quotes
    4. the word NOT can be prefixed before any of the above three to negate the condition.
    5. expressions can be grouped with parenthesis. For example
      condition="(tag1 = 1 or tag1 = 2) and (tag2 < '' and tag2 >
      '') and tag3 and  (tag4 LIKE '^a.*b$')"
      condition="NOT (tag5='US' OR tag5='UK') AND NOT tag3 AND
      NOT (tag4 LIKE 'a.*')"
      condition="campusRef.geoPostalCode='20500' and equip and
      boiler"
      
  • skip (int) – number of tags to skip. usually used with order
  • count (int) – limit on the number of tags to return
  • order (str) – order of result - “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

list of topics/topic_prefix that match the given query conditions

Return type:

list

setup()[source]

Called on start of agent Method to establish database connection, do any initial bootstrap necessary. Example - load master list of tags, units, categories etc. into data store/memory

volttron.platform.agent.base_tagging.p_bool_expr_and(p)[source]

bool_expr : bool_expr AND bool_expr

volttron.platform.agent.base_tagging.p_bool_expr_eq(p)[source]

bool_expr : ID EQ expr

volttron.platform.agent.base_tagging.p_bool_expr_ge(p)[source]

bool_expr : ID GE expr

volttron.platform.agent.base_tagging.p_bool_expr_gt(p)[source]

bool_expr : ID GT expr

volttron.platform.agent.base_tagging.p_bool_expr_id(p)[source]

bool_expr : ID

volttron.platform.agent.base_tagging.p_bool_expr_le(p)[source]

bool_expr : ID LE expr

volttron.platform.agent.base_tagging.p_bool_expr_like1(p)[source]

bool_expr : ID LIKE SQUOTE_STRING

volttron.platform.agent.base_tagging.p_bool_expr_like2(p)[source]

bool_expr : ID LIKE DQUOTE_STRING

volttron.platform.agent.base_tagging.p_bool_expr_lt(p)[source]

bool_expr : ID LT expr

volttron.platform.agent.base_tagging.p_bool_expr_neq(p)[source]

bool_expr : ID NEQ expr

volttron.platform.agent.base_tagging.p_bool_expr_not(p)[source]

bool_expr : NOT bool_expr

volttron.platform.agent.base_tagging.p_bool_expr_or(p)[source]

bool_expr : bool_expr OR bool_expr

volttron.platform.agent.base_tagging.p_bool_expr_paren(p)[source]

bool_expr : LPAREN bool_expr RPAREN %prec PAREN

volttron.platform.agent.base_tagging.p_clause(p)[source]

clause : bool_expr

volttron.platform.agent.base_tagging.p_clause_error(p)[source]

clause : error

volttron.platform.agent.base_tagging.p_error(p)[source]
volttron.platform.agent.base_tagging.p_expr_div(p)[source]

expr : expr DIVIDE expr

volttron.platform.agent.base_tagging.p_expr_double_quote_string(p)[source]

expr : DQUOTE_STRING

volttron.platform.agent.base_tagging.p_expr_fp(p)[source]

expr : FPOINT

volttron.platform.agent.base_tagging.p_expr_minus(p)[source]

expr : expr MINUS expr

volttron.platform.agent.base_tagging.p_expr_mod(p)[source]

expr : expr MOD expr

volttron.platform.agent.base_tagging.p_expr_number(p)[source]

expr : NUMBER

volttron.platform.agent.base_tagging.p_expr_paren(p)[source]

expr : LPAREN expr RPAREN %prec PAREN

volttron.platform.agent.base_tagging.p_expr_plus(p)[source]

expr : expr PLUS expr

volttron.platform.agent.base_tagging.p_expr_single_quote_string(p)[source]

expr : SQUOTE_STRING

volttron.platform.agent.base_tagging.p_expr_times(p)[source]

expr : expr TIMES expr

volttron.platform.agent.base_tagging.p_expr_uminus(p)[source]

expr : MINUS expr %prec UMINUS

volttron.platform.agent.base_tagging.parse_query(query, tags, refs)[source]
volttron.platform.agent.base_tagging.pretty_print(tup)[source]
volttron.platform.agent.base_tagging.t_DQUOTE_STRING(t)[source]

“([^\n]|(.))*?”

volttron.platform.agent.base_tagging.t_FPOINT(t)[source]

[-+]?d+(.(d+)?([eE][-+]?d+)?|[eE][-+]?d+)

volttron.platform.agent.base_tagging.t_ID(t)[source]

[a-zA-Z_][a-zA-Z_0-9]*.*[a-zA-Z_][a-zA-Z_0-9]*

volttron.platform.agent.base_tagging.t_NUMBER(t)[source]

(-)?d+

volttron.platform.agent.base_tagging.t_SQUOTE_STRING(t)[source]

‘([^\n]|(.))*?’

volttron.platform.agent.base_tagging.t_error(t)[source]
volttron.platform.agent.base_tagging.t_newline(t)[source]

n+

volttron.platform.agent.cron module

cron-like schedule generator.

volttron.platform.agent.cron.schedule(cron_string, start=None, stop=None)[source]

Return a schedule generator from a cron-style string.

cron_string is a cron-style time expression consisting of five whitespace-separated fields explained in further detail below. start and stop are used to bound the schedule and can be None, datetime.datetime objects or numeric values, such as is returned by time.time(). stop may also be supplied as a datetime.timedelta object, in which case the end time is start + stop. If start is None, the current time is used. If stop is None, schedule will generate values infinitely. Each iteration yields a datetime.datetime object.

The following description of the cron fields is taken from the crontab(5) man page (with slight modifications).

The time and date fields are:

field allowed values —– ————– minute 0-59 hour 0-23 day of month 1-31 month 1-12 (or names, see below) day of week 0-7 (0 or 7 is Sunday, or use names)

A field may contain an asterisk (*), which always stands for “first-last”.

Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an ‘hours’ entry specifies execution at hours 8, 9, 10, and 11. If the range start or end value is left off, the first or last value will be used. For example, -8 for an ‘hours’ entry is equivalent to 0-8, 20- for a ‘days of month’ entry is equivalent to 20-31, and - for a ‘months’ entry is equivalent to 1-12.

Lists are allowed. A list is a set of numbers (or ranges) separated by commas. Examples: “1,2,5,9”, “0-4,8-12”.

Step values can be used in conjunction with ranges. Following a range with “/<number>” specifies skips of the number’s value through the range. For example, “0-23/2” can be used in the ‘hours’ field to specify every other hour. Step values are also permitted after an asterisk, “*/2” in the ‘hours’ field is equivalent to “0-23/2”.

Names can also be used for the ‘month’ and ‘day of week’ fields. Use at least the first three letters of the particular day or month (case does not matter).

Note: The day can be specified in the following two fields: ‘day of month’, and ‘day of week’. If both fields are restricted (i.e., do not contain the “*” character), then both are used to compute date/time values. For example, “30 4 1,15 * 5” is interpreted as “4:30 am on the 1st and 15th of each month, plus every Friday.”

volttron.platform.agent.driven module

VOLTTRON platform™ abstract agent for to drive VOLTTRON Nation apps.

class volttron.platform.agent.driven.AbstractDrivenAgent(out=None, **kwargs)[source]

Bases: object

classmethod output_format(input_object)[source]

The output object takes the resulting input object as a argument so that it may give correct topics to it’s outputs if needed.

output schema description

{TableName1: {name1:OutputDescriptor1, name2:OutputDescriptor2,…},….}

eg: {‘OAT’: {‘Timestamp’:OutputDescriptor(‘timestamp’, ‘foo/bar/timestamp’),’OAT’:OutputDescriptor(‘OutdoorAirTemperature’, ‘foo/bar/oat’)},
‘Sensor’: {‘SomeValue’:OutputDescriptor(‘integer’, ‘some_output/value’), ‘SomeOtherValue’:OutputDescriptor(‘boolean’, ‘some_output/value), ‘SomeString’:OutputDescriptor(‘string’, ‘some_output/string)}}

Should always call the parent class output_format and update the dictionary returned from the parent.

result = super().output_format(input_object) my_output = {…} result.update(my_output) return result

run(time, inputs)[source]

Do work for each batch of timestamped inputs time- current time inputs - dict of point name -> value

Must return a results object.

shutdown()[source]

Override this to add shutdown routines.

class volttron.platform.agent.driven.ConversionMapper(**kwargs)[source]

Bases: object

process_row(row_dict)[source]
setup_conversion_map(conversion_map_config, field_names)[source]
class volttron.platform.agent.driven.Results(terminate=False)[source]

Bases: object

command(point, value, device=None)[source]
insert_table_row(table, row)[source]
log(message, level=10)[source]
terminate(terminate)[source]

volttron.platform.agent.exit_codes module

volttron.platform.agent.green module

VOLTTRON platform™ greenlet coroutine helper classes/functions.

These utilities are meant to be used with the BaseAgent and greenlet to provide synchronization between light threads (coroutines).

exception volttron.platform.agent.green.Timeout[source]

Bases: exceptions.Exception

Raised in the greenlet when waiting on a channel times out.

class volttron.platform.agent.green.WaitQueue(create_timer)[source]

Bases: object

A holder for tasklets waiting on asynchronous data.

kill_all()[source]

Kill all the tasks in the queue.

notify(data, n=1)[source]

Notify n waiting tasks of the arrival of data.

notify_all(data)[source]

Notify all waiting tasks of the arrival of data.

wait(timeout=None)[source]

Wait for data to become available and return it

If timeout is None, wait indefinitely. Otherwise, timeout if the task hasn’t been notified within timeout seconds.

volttron.platform.agent.green.sleep(timeout, create_timer)[source]

Yield execution for timeout seconds.

volttron.platform.agent.json module

Wrapper around UJSON to protect against NaN values and also to increase the performance involved in serialization and deserialization of data.

volttron.platform.agent.json.dumps(data, **kwargs)[source]
volttron.platform.agent.json.loads(s, **kwargs)[source]

volttron.platform.agent.known_identities module

volttron.platform.agent.matching module

VOLTTRON platform™ topic matching for agent callbacks.

Declaratively attach topic prefix and additional tests for topic matching to agent methods allowing for automated callback registration and topic subscription.

Example:

class MyAgent(BaseAgent):
    @match_regex('topic1/(sub|next|part)/title[1-9]')
    def on_subtopic(topic, headers, message, match):
        # This is only executed if topic matches regex
        ...

    @match_glob('root/sub/*/leaf')
    def on_leafnode(topic, headers, message, match):
        # This is only executed if topic matches glob
        ...

    @match_exact('building/xyz/unit/condenser')
    @match_start('campus/PNNL')
    @match_end('unit/blower')
    def on_multimatch(topic, headers, message, match):
        # Multiple matchers can be attached to a method
        ...
volttron.platform.agent.matching.iter_match_tests(obj)[source]

Iterate match tests attached to the methods of an object.

Each iterated item is the 3-tuple (prefix, method, test) where prefix and test are the same as in match_test() and method is the method to which the test was attached (and is the expected callback).

volttron.platform.agent.matching.match_all(func)[source]

Wildcard matcher to register callback for every message.

volttron.platform.agent.matching.match_contains(substring, prefix='')[source]

Return a match decorator to match a component of a topic.

volttron.platform.agent.matching.match_end(suffix, prefix='')[source]

Return a match decorator to match the end of a topic.

volttron.platform.agent.matching.match_exact(topic)[source]

Return a match decorator to match a topic exactly.

volttron.platform.agent.matching.match_glob(pattern)[source]

Return a match decorator for the given glob pattern.

volttron.platform.agent.matching.match_headers(required_headers)[source]

Only call function if required headers match.

match_headers takes a single argument, required_headers, that is a dictionary containing the required headers and values that must match for the wrapped handler function to be called.

This decorator is not very useful on its own, because it doesn’t trigger any subscriptions, but can be useful to filter out messages that don’t contain the required headers and values.

volttron.platform.agent.matching.match_regex(pattern)[source]

Return a match decorator for the given regular expression.

volttron.platform.agent.matching.match_start(prefix)[source]

Return a match decorator to match the start of a topic.

volttron.platform.agent.matching.match_subtopic(prefix, subtopic, max_levels=None)[source]

Return a match decorator to match a subtopic.

volttron.platform.agent.matching.match_test(prefix, test=None)[source]

Decorate a callback method with subscription and test information.

Returns a decorator to attach (prefix, test) 2-tuples to methods which can be inspected to automatically subscribe to a topic prefix and provide a test for triggering a call back to the method.

prefix must match the start of a desired topic and test is either None or a function of the form test(topic, matched) where topic is the full topic to test against and matched should be the same as prefix. The test function must return a value that evaluates to True if the topic is a match or a value that evaluates to False otherwise. The test function is only called if topic.startswith(prefix) is True. If test is None, it is the same as if test = lambda topic, matched: True.

volttron.platform.agent.matching.test_contains(substring)[source]

Return a test function to match a topic containing substring.

volttron.platform.agent.matching.test_end(suffix)[source]

Return a test function to match the end of a topic.

volttron.platform.agent.matching.test_exact(topic, matched)[source]

Test if topic and match are exactly equal.

volttron.platform.agent.matching.test_glob(pattern)[source]

Return static prefix and regex test for glob pattern.

The pattern may include the following special wildcard patterns:

*      Matches zero or more characters.
**     Matches zero or more characters, including forward
       slashes (/).
?      Matches any single character
[...]  Matches any single characters between the brackets.  A
       range of adjacent characters may be matched using a
       hyphen (-) between the start and end character.  To
       include the hyphen as a search character, include it at
       the end of the pattern.  The range may be negated by
       immediately following the opening [ with a ^ or !.
volttron.platform.agent.matching.test_regex(pattern)[source]

Return the static prefix and a regex test function for pattern.

volttron.platform.agent.matching.test_subtopic(subtopic, max_levels=None)[source]

Return a test function to match a topic component after the prefix.

volttron.platform.agent.math_utils module

Dumping ground for VOLTTRON platform™ agent math helper functions.

Not meant to replace numpy in all cases. A basic set common math routines to remove the need for numpy in simple cases.

This module should NEVER import numpy as that would defeat the purpose.

volttron.platform.agent.math_utils.mean(data)[source]

Return the sample arithmetic mean of data.

volttron.platform.agent.math_utils.pstdev(data)[source]

Calculates the population standard deviation.

volttron.platform.agent.math_utils.stdev(data)[source]

Calculates the sample standard deviation.

volttron.platform.agent.multithreading module

VOLTTRON platform™ multi-threaded agent helper classes/functions.

These utilities are meant to be used with the BaseAgent and threading to provide synchronization between threads and the main agent loop.

exception volttron.platform.agent.multithreading.Timeout[source]

Bases: exceptions.Exception

Raised in the thread when waiting on a queue times out.

class volttron.platform.agent.multithreading.WaitQueue(lock=None)[source]

Bases: object

A holder for threads waiting on asynchronous data.

notify(data, n=1)[source]

Notify n waiting threads of the arrival of data.

notify_all(data)[source]

Notify all waiting threads of the arrival of data.

wait(timeout=None)[source]

Wait for data to become available and return it

If timeout is None, wait indefinitely. Otherwise, timeout if the thread hasn’t been notified within timeout seconds.

volttron.platform.agent.sched module

VOLTTRON platform™ agent event scheduling classes.

class volttron.platform.agent.sched.Event(function, args=None, kwargs=None)[source]

Bases: object

Base class for schedulable objects.

args
cancel()[source]

Mark the timer as canceled to avoid a callback.

canceled
finished
function
kwargs
class volttron.platform.agent.sched.EventWithTime(function, args=None, kwargs=None)[source]

Bases: volttron.platform.agent.sched.Event

Event that passes deadline to event handler.

class volttron.platform.agent.sched.Queue[source]

Bases: object

delay(time)[source]
execute(time)[source]
schedule(time, event)[source]
class volttron.platform.agent.sched.RecurringEvent(period, function, args=None, kwargs=None)[source]

Bases: volttron.platform.agent.sched.Event

period

volttron.platform.agent.utils module

VOLTTRON platform™ agent helper classes/functions.

volttron.platform.agent.utils.load_config(config_path)[source]

Load a JSON-encoded configuration file.

volttron.platform.agent.utils.run_agent(cls, subscribe_address=None, publish_address=None, config_path=None, **kwargs)[source]

Instantiate an agent and run it in the current thread.

Attempts to get keyword parameters from the environment if they are not set.

volttron.platform.agent.utils.start_agent_thread(cls, **kwargs)[source]

Instantiate an agent class and run it in a new daemon thread.

Returns the thread object.

volttron.platform.agent.utils.is_valid_identity(identity_to_check)[source]

Checks the passed identity to see if it contains invalid characters

A None value for identity_to_check will return False

@:param: string: The vip_identity to check for validity @:return: boolean: True if values are in the set of valid characters.