volttron.platform.dbutils package¶
Submodules¶
volttron.platform.dbutils.basedb module¶
-
exception
volttron.platform.dbutils.basedb.
ConnectionError
[source]¶ Bases:
exceptions.Exception
Custom class for connection errors
-
class
volttron.platform.dbutils.basedb.
DbDriver
(dbapimodule, **kwargs)[source]¶ Bases:
object
Parent class used by
sqlhistorian.historian.SQLHistorian
to do the database operations. This class is inherited byvolttron.platform.dbutils.mysqlfuncts.MySqlFuncts
volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts
-
collect_aggregate
(topic_ids, agg_type, start=None, end=None)[source]¶ Collect the aggregate data by querying the historian’s data store
Parameters: - topic_ids – list of topic ids for which aggregation should be performed.
- agg_type – type of aggregation
- start_time – start time for query (inclusive)
- end_time – end time for query (exclusive)
Returns: a tuple of (aggregated value, count of records over which this aggregation was computed)
-
create_aggregate_store
(agg_type, period)[source]¶ Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>
Parameters: - agg_type – The type of aggregation. (avg, sum etc.)
- agg_time_period – The time period of aggregation
:return - True if successful, False otherwise
-
execute_many
(stmt, args, commit=False)[source]¶ Execute a sql statement with multiple args
Parameters: - stmt – the statement to execute
- args – optional arguments
- commit – True if transaction should be committed. Defaults to
False :return: count of the number of affected rows
-
execute_stmt
(stmt, args=None, commit=False)[source]¶ Execute a sql statement
Parameters: - stmt – the statement to execute
- args – optional arguments
- commit – True if transaction should be committed. Defaults to
False :return: count of the number of affected rows
-
get_agg_topic_map
()[source]¶ Get a map of aggregate_topics to aggregate_topic_id
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}
-
get_agg_topics
()[source]¶ Get the list of aggregate topics available
Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
-
get_aggregation_list
()[source]¶ Return list of aggregation supported by the specific data store
Returns: list of aggregations
-
get_topic_map
()[source]¶ Returns details of topics in database
Returns: two dictionaries. - First one maps topic_name.lower() to topic id and
- Second one maps topic_name.lower() to topic name
-
insert_agg_meta
(topic_id, metadata)[source]¶ Inserts metadata for aggregate topic
Parameters: - topic_id – aggregate topic id for which metadata is inserted
- metadata – metadata
Returns: True if execution completes. Raises exception if connection to
database fails
-
insert_agg_topic
(topic, agg_type, agg_time_period)[source]¶ Insert a new aggregate topic
Parameters: - topic – topic name to insert
- agg_type – type of aggregation
- agg_time_period – time period of aggregation
Returns: id of the topic inserted if insert was successful. Raises exception if unable to connect to database
-
insert_aggregate
(agg_topic_id, agg_type, period, ts, data, topic_ids)[source]¶ Insert aggregate data collected for a specific time period into database. Data is inserted into <agg_type>_<period> table
Parameters: - agg_topic_id – topic id
- agg_type – type of aggregation
- period – time period of aggregation
- ts – end time of aggregation period (not inclusive)
- data – computed aggregate
- topic_ids – topic ids or topic ids for which aggregate was computed
Returns: True if execution was successful, raises exception
in case of connection failures
-
insert_aggregate_stmt
(table_name)[source]¶ The sql statement to insert collected aggregate for a given time period into database
Parameters: table_name – name of the table into which the aggregate data needs to be inserted Returns: sql insert/replace statement to insert aggregate data for a specific time slice Return type: str
-
insert_data
(ts, topic_id, data)[source]¶ Inserts data for topic
Parameters: - ts – timestamp
- topic_id – topic id for which data is inserted
- metadata – data values
Returns: True if execution completes. raises Exception if unable to
connect to database
-
insert_meta
(topic_id, metadata)[source]¶ Inserts metadata for topic
Parameters: - topic_id – topic id for which metadata is inserted
- metadata – metadata
Returns: True if execution completes. Raises exception if unable to
connect to database
-
insert_topic
(topic)[source]¶ Insert a new topic
Parameters: topic – topic to insert Returns: id of the topic inserted if insert was successful. Raises exception if unable to connect to database
-
manage_db_size
(history_limit_timestamp, storage_limit_gb)[source]¶ Optional function to manage database size.
Parameters: - history_limit_timestamp – remove all data older than this timestamp
- storage_limit_gb – remove oldest data until database is smaller than this value.
-
query
(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]¶ Queries the raw historian data or aggregate data and returns the results of the query
Parameters: - topic_ids – list of topic ids to query for.
- id_name_map – dictionary that maps topic id to topic name
- start (datetime) – Start of query timestamp as a datetime.
- end (datetime) – End of query timestamp as a datetime.
- agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
- agg_period – If this is a query for aggregate data, the time period of aggregation
- skip (int) – Skip this number of results.
- count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
- order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns: result of the query in the format:
{ topic_name:[(timestamp1, value1), (timestamp2:,value2), ...], topic_name:[(timestamp1, value1), (timestamp2:,value2), ...], ...}
-
query_topics_by_pattern
(topic_pattern)[source]¶ Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:
-
read_tablenames_from_db
(meta_table_name)[source]¶ Reads names of the tables used by this historian to store data, topics, metadata, aggregate topics and aggregate metadata
Parameters: meta_table_name – The volttron metadata table in which table definitions are stored Returns: table names { 'data_table': name of table that store data, 'topics_table':name of table that store list of topics, 'meta_table':name of table that store metadata, 'agg_topics_table':name of table that stores aggregate topics, 'agg_meta_table':name of table that store aggregate metadata }
-
replace_agg_meta_stmt
()[source]¶ Returns: query string to insert metadata for an aggregate topic into database
-
select
(query, args=None, fetch_all=True)[source]¶ Execute a select statement
Parameters: - query – select statement
- args – arguments for the where clause
- fetch_all – Set to True if function should return retrieve all
the records from cursors and return it. Set to False to return cursor. :return: resultant rows if fetch_all is True else returns the cursor It is up to calling method to close the cursor
-
update_agg_topic
(agg_id, agg_topic_name)[source]¶ Update a aggregate topic name
Parameters: - agg_id – topic id for which update is done
- agg_topic_name – new aggregate topic name
Returns: True if execution is complete. Raises exception if unable to
connect to database
volttron.platform.dbutils.crateutils module¶
volttron.platform.dbutils.influxdbutils module¶
volttron.platform.dbutils.mongoutils module¶
-
volttron.platform.dbutils.mongoutils.
get_agg_topics
(client, agg_topics_collection, agg_meta_collection)[source]¶
volttron.platform.dbutils.mysqlfuncts module¶
-
class
volttron.platform.dbutils.mysqlfuncts.
MySqlFuncts
(connect_params, table_names)[source]¶ Bases:
volttron.platform.dbutils.basedb.DbDriver
-
collect_aggregate
(topic_ids, agg_type, start=None, end=None)[source]¶ Collect the aggregate data by querying the historian’s data store
Parameters: - topic_ids – list of topic ids for which aggregation should be performed.
- agg_type – type of aggregation
- start_time – start time for query (inclusive)
- end_time – end time for query (exclusive)
Returns: a tuple of (aggregated value, count of records over which this aggregation was computed)
-
create_aggregate_store
(agg_type, agg_time_period)[source]¶ Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>
Parameters: - agg_type – The type of aggregation. (avg, sum etc.)
- agg_time_period – The time period of aggregation
:return - True if successful, False otherwise
-
get_agg_topic_map
()[source]¶ Get a map of aggregate_topics to aggregate_topic_id
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}
-
get_agg_topics
()[source]¶ Get the list of aggregate topics available
Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
-
get_aggregation_list
()[source]¶ Return list of aggregation supported by the specific data store
Returns: list of aggregations
-
get_topic_map
()[source]¶ Returns details of topics in database
Returns: two dictionaries. - First one maps topic_name.lower() to topic id and
- Second one maps topic_name.lower() to topic name
-
insert_aggregate_stmt
(table_name)[source]¶ The sql statement to insert collected aggregate for a given time period into database
Parameters: table_name – name of the table into which the aggregate data needs to be inserted Returns: sql insert/replace statement to insert aggregate data for a specific time slice Return type: str
-
query
(topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order='FIRST_TO_LAST')[source]¶ Queries the raw historian data or aggregate data and returns the results of the query
Parameters: - topic_ids – list of topic ids to query for.
- id_name_map – dictionary that maps topic id to topic name
- start (datetime) – Start of query timestamp as a datetime.
- end (datetime) – End of query timestamp as a datetime.
- agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
- agg_period – If this is a query for aggregate data, the time period of aggregation
- skip (int) – Skip this number of results.
- count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
- order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns: result of the query in the format:
{ topic_name:[(timestamp1, value1), (timestamp2:,value2), ...], topic_name:[(timestamp1, value1), (timestamp2:,value2), ...], ...}
-
query_topics_by_pattern
(topic_pattern)[source]¶ Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:
-
volttron.platform.dbutils.sqlitefuncts module¶
-
class
volttron.platform.dbutils.sqlitefuncts.
SqlLiteFuncts
(connect_params, table_names)[source]¶ Bases:
volttron.platform.dbutils.basedb.DbDriver
-
collect_aggregate
(topic_ids, agg_type, start=None, end=None)[source]¶ This function should return the results of a aggregation query @param topic_ids: list of single topics @param agg_type: type of aggregation @param start: start time @param end: end time @return: aggregate value, count of number of records over which aggregation was computed
-
create_aggregate_store
(agg_type, period)[source]¶ Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>
Parameters: - agg_type – The type of aggregation. (avg, sum etc.)
- agg_time_period – The time period of aggregation
:return - True if successful, False otherwise
-
get_agg_topic_map
()[source]¶ Get a map of aggregate_topics to aggregate_topic_id
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period):agg_topic_id}
-
get_agg_topics
()[source]¶ Get the list of aggregate topics available
Returns: list of tuples containing (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
-
get_aggregation_list
()[source]¶ Return list of aggregation supported by the specific data store
Returns: list of aggregations
-
static
get_tagging_query_from_ast
(topic_tags_table, tup, tag_refs)[source]¶ Get a query condition syntax tree and generate sqlite query to query topic names by tags. It calls the get_compound_query to parse the abstract syntax tree tuples and then fixes the precedence
Example: # User input query string :
campus.geoPostalCode=”20500” and equip and boiler and “equip_tag 7” > 4
# Example output sqlite query
- SELECT topic_prefix from test_topic_tags WHERE tag=”campusRef”
- and value IN(
- SELECT topic_prefix from test_topic_tags WHERE tag=”campus” and value=1 INTERSECT SELECT topic_prefix from test_topic_tags WHERE tag=”geoPostalCode” and value=”20500”
)
INTERSECT SELECT topic_prefix from test_tags WHERE tag=”equip” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag=”boiler” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag = “equip_tag 7” and value > 4
Parameters: - topic_tags_table – table to query
- tup – parsed query string (abstract syntax tree)
- tag_refs – dictionary of ref tags and its parent tag
Returns: sqlite query
:rtype str
-
get_topic_map
()[source]¶ Returns details of topics in database
Returns: two dictionaries. - First one maps topic_name.lower() to topic id and
- Second one maps topic_name.lower() to topic name
-
insert_aggregate_stmt
(table_name)[source]¶ The sql statement to insert collected aggregate for a given time period into database
Parameters: table_name – name of the table into which the aggregate data needs to be inserted Returns: sql insert/replace statement to insert aggregate data for a specific time slice Return type: str
-
manage_db_size
(history_limit_timestamp, storage_limit_gb)[source]¶ Manage database size.
Parameters: - history_limit_timestamp – remove all data older than this timestamp
- storage_limit_gb – remove oldest data until database is smaller than this value.
-
query
(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]¶ This function should return the results of a query in the form:
{"values": [(timestamp1, value1), (timestamp2, value2), ...], "metadata": {"key1": value1, "key2": value2, ...}}
metadata is not required (The caller will normalize this to {} for you) @param topic_ids: topic_ids to query data for @param id_name_map: dictionary containing topic_id:topic_name @param start: @param end: @param agg_type: @param agg_period: @param skip: @param count: @param order:
-
query_topics_by_pattern
(topic_pattern)[source]¶ Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:
-