volttron.platform.dbutils package
Submodules
volttron.platform.dbutils.basedb module
exception volttron.platform.dbutils.basedb.ConnectionError
Bases: exceptions.Exception
Custom exception class for connection errors.
class volttron.platform.dbutils.basedb.DbDriver(dbapimodule, **kwargs)
Bases: object
Parent class used by sqlhistorian.historian.SQLHistorian to do the database operations. This class is inherited by volttron.platform.dbutils.mysqlfuncts.MySqlFuncts and volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts.
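A minimal sketch of this driver pattern, assuming a hypothetical `DemoDriver` class backed by an in-memory SQLite connection; the class name, table and column names are illustrative, not part of the VOLTTRON API:

```python
import sqlite3

class DemoDriver:
    """Sketch of a DbDriver-style base: subclasses supply the DB-API
    module and override statement builders for their SQL dialect."""
    def __init__(self, dbapimodule, **kwargs):
        self.module = dbapimodule
        self.connection = dbapimodule.connect(**kwargs)

    def insert_stmt(self):
        # Subclasses override this to return dialect-specific SQL.
        return "INSERT INTO data (ts, topic_id, value) VALUES (?, ?, ?)"

    def insert_data(self, ts, topic_id, value):
        cursor = self.connection.cursor()
        cursor.execute(self.insert_stmt(), (ts, topic_id, value))
        self.connection.commit()
        return True

driver = DemoDriver(sqlite3, database=":memory:")
driver.connection.execute(
    "CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
driver.insert_data("2024-01-01T00:00:00", 1, 42.0)
```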
bulk_insert(**kwds)
Function to meet bulk insert requirements. Historian drivers can override this function to yield the insert method their data store requires for bulk insertion. In this generic case it yields the single-record insert method.
Yields: insert method
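The generic behaviour described above (yielding the single insert method) can be sketched as a context manager; `bulk_insert_sketch` and `insert_one` are illustrative names, not the actual implementation:

```python
from contextlib import contextmanager

@contextmanager
def bulk_insert_sketch(insert_one):
    # Generic case: hand the caller the single-record insert method, so
    # drivers without a faster bulk path still work through one code path.
    yield insert_one

# Usage: route every record through whatever method the driver yielded.
rows = []
with bulk_insert_sketch(rows.append) as insert:
    for record in [("2024-01-01T00:00:00", 1, 10.0),
                   ("2024-01-01T01:00:00", 1, 11.0)]:
        insert(record)
```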
collect_aggregate(topic_ids, agg_type, start=None, end=None)
Collect the aggregate data by querying the historian's data store.
Parameters: - topic_ids – list of topic ids for which aggregation should be performed
- agg_type – type of aggregation
- start – start time for query (inclusive)
- end – end time for query (exclusive)
Returns: a tuple of (aggregated value, count of records over which the aggregation was computed)
create_aggregate_store(agg_type, period)
Create the data structure (table or collection) that stores the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.
Parameters: - agg_type – the type of aggregation (avg, sum, etc.)
- period – the time period of aggregation
Returns: True if successful, False otherwise
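For illustration, the `<agg_type>_<period>` naming rule and a matching SQLite table creation might look like this; the column names are assumptions, not the historian's actual schema:

```python
import sqlite3

def aggregate_table_name(agg_type, period):
    # Table name is constructed as <agg_type>_<period>, e.g. "avg_1h".
    return "{}_{}".format(agg_type, period)

conn = sqlite3.connect(":memory:")
table = aggregate_table_name("avg", "1h")
conn.execute("CREATE TABLE IF NOT EXISTS {} "
             "(ts TEXT NOT NULL, topic_id INTEGER NOT NULL, value REAL)"
             .format(table))
```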
execute_many(stmt, args, commit=False)
Execute an SQL statement with multiple sets of arguments.
Parameters: - stmt – the statement to execute
- args – sequence of argument tuples
- commit – True if the transaction should be committed. Defaults to False
Returns: count of the number of affected rows
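With a DB-API driver such as the standard-library `sqlite3`, the behaviour described above maps onto `executemany`; this is an illustrative sketch, not the actual `execute_many` implementation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")

stmt = "INSERT INTO data VALUES (?, ?, ?)"
args = [("t1", 1, 1.0), ("t2", 1, 2.0), ("t3", 2, 3.0)]

cursor = conn.cursor()
cursor.executemany(stmt, args)   # one statement, many argument tuples
conn.commit()                    # what commit=True would do
affected = cursor.rowcount       # count of affected rows
```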
execute_stmt(stmt, args=None, commit=False)
Execute an SQL statement.
Parameters: - stmt – the statement to execute
- args – optional arguments
- commit – True if the transaction should be committed. Defaults to False
Returns: count of the number of affected rows
get_agg_topic_map()
Get a map of aggregate topics to aggregate topic ids.
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period): agg_topic_id}
get_agg_topics()
Get the list of aggregate topics available.
Returns: list of tuples of the form (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()
Return the list of aggregations supported by the specific data store.
Returns: list of aggregations
get_topic_map()
Return details of topics in the database.
Returns: two dictionaries. The first maps topic_name.lower() to topic id and the second maps topic_name.lower() to topic name.
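Building the two returned dictionaries from `(topic_id, topic_name)` rows can be sketched as follows; the sample rows are made up:

```python
# Sample (topic_id, topic_name) rows, as a topics-table query might return them.
rows = [(1, "Building/OutsideAirTemperature"), (2, "Building/RelativeHumidity")]

# First dictionary: topic_name.lower() -> topic id.
id_map = {name.lower(): topic_id for topic_id, name in rows}
# Second dictionary: topic_name.lower() -> original topic name.
name_map = {name.lower(): name for _, name in rows}
```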
insert_agg_meta(topic_id, metadata)
Insert metadata for an aggregate topic.
Parameters: - topic_id – aggregate topic id for which metadata is inserted
- metadata – metadata
Returns: True if execution completes. Raises an exception if the connection to the database fails
insert_agg_topic(topic, agg_type, agg_time_period)
Insert a new aggregate topic.
Parameters: - topic – topic name to insert
- agg_type – type of aggregation
- agg_time_period – time period of aggregation
Returns: id of the inserted topic if the insert was successful. Raises an exception if unable to connect to the database
insert_aggregate(agg_topic_id, agg_type, period, ts, data, topic_ids)
Insert aggregate data collected for a specific time period into the database. Data is inserted into the <agg_type>_<period> table.
Parameters: - agg_topic_id – aggregate topic id
- agg_type – type of aggregation
- period – time period of aggregation
- ts – end time of the aggregation period (not inclusive)
- data – computed aggregate
- topic_ids – topic id or list of topic ids for which the aggregate was computed
Returns: True if execution was successful. Raises an exception in case of connection failures
insert_aggregate_stmt(table_name)
The SQL statement to insert a collected aggregate for a given time period into the database.
Parameters: table_name – name of the table into which the aggregate data needs to be inserted
Returns: SQL insert/replace statement to insert aggregate data for a specific time slice
Return type: str
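A sketch of such a statement builder for SQLite; the column names (`ts`, `agg_value`, `topics_list`) are assumptions for illustration, not the historian's actual schema:

```python
import sqlite3

def insert_aggregate_stmt_sketch(table_name):
    # INSERT OR REPLACE keeps a single row per time slice in SQLite;
    # other dialects would use their own upsert syntax.
    return ("INSERT OR REPLACE INTO {} (ts, agg_value, topics_list) "
            "VALUES (?, ?, ?)".format(table_name))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE avg_1h "
             "(ts TEXT PRIMARY KEY, agg_value REAL, topics_list TEXT)")
conn.execute(insert_aggregate_stmt_sketch("avg_1h"),
             ("2024-01-01T01:00:00", 21.5, "[1, 2]"))
```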
insert_data(ts, topic_id, data)
Insert data for a topic.
Parameters: - ts – timestamp
- topic_id – topic id for which data is inserted
- data – data value
Returns: True if execution completes. Raises an exception if unable to connect to the database
insert_meta(topic_id, metadata)
Insert metadata for a topic.
Parameters: - topic_id – topic id for which metadata is inserted
- metadata – metadata
Returns: True if execution completes. Raises an exception if unable to connect to the database
insert_topic(topic)
Insert a new topic.
Parameters: topic – topic to insert
Returns: id of the inserted topic if the insert was successful. Raises an exception if unable to connect to the database
manage_db_size(history_limit_timestamp, storage_limit_gb)
Optional function to manage database size.
Parameters: - history_limit_timestamp – remove all data older than this timestamp
- storage_limit_gb – remove the oldest data until the database is smaller than this value
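Time-based pruning, the first of the two behaviours, can be sketched against SQLite as follows; table and column names are illustrative. Size-based pruning (storage_limit_gb) would instead delete the oldest slices in a loop until the database file shrinks below the limit:

```python
import sqlite3

def prune_history(conn, history_limit_timestamp):
    # Remove all data older than the limit; ISO-8601 strings compare
    # chronologically, so a plain string comparison works here.
    conn.execute("DELETE FROM data WHERE ts < ?", (history_limit_timestamp,))
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
conn.executemany("INSERT INTO data VALUES (?, ?, ?)",
                 [("2023-01-01T00:00:00", 1, 1.0),
                  ("2024-01-01T00:00:00", 1, 2.0)])
prune_history(conn, "2023-06-01T00:00:00")
```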
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')
Query the raw historian data or aggregate data and return the results of the query.
Parameters: - topic_ids – list of topic ids to query for
- id_name_map – dictionary that maps topic id to topic name
- start (datetime) – start of the query timestamp as a datetime
- end (datetime) – end of the query timestamp as a datetime
- agg_type – if this is a query for aggregate data, the type of aggregation (for example, sum, avg)
- agg_period – if this is a query for aggregate data, the time period of aggregation
- skip (int) – skip this number of results
- count (int) – limit results to this value. When the query is for multiple topics, count applies to each individual topic. For example, a query on 2 topics with count=5 will return 5 records for each topic
- order (str) – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"
Returns: result of the query in the format:
{topic_name: [(timestamp1, value1), (timestamp2, value2), ...], topic_name: [(timestamp1, value1), (timestamp2, value2), ...], ...}
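Consuming a result in that format might look like this; the topic name and readings are made-up sample data:

```python
# Sample result in the documented {topic_name: [(timestamp, value), ...]} shape.
result = {
    "Building/OutsideAirTemperature": [
        ("2024-01-01T00:00:00", 10.2),
        ("2024-01-01T01:00:00", 10.8),
    ],
}

# With order="FIRST_TO_LAST", the last pair per topic is the newest reading.
latest = {topic: values[-1] for topic, values in result.items()}
```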
query_topics_by_pattern(topic_pattern)
Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.
Parameters: topic_pattern – pattern to match against topic_name
Returns: map of {topic_name.lower(): topic_id}
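The shape of the returned map can be illustrated with an in-Python regex filter; an actual driver would translate the pattern into its query language, and the topics dict here is sample data:

```python
import re

# Sample {topic_name.lower(): topic_id} store.
topics = {"building/temp": 1, "building/rh": 2, "campus/power": 3}

def query_topics_by_pattern_sketch(topic_pattern):
    # Keep every topic whose name matches the given pattern.
    regex = re.compile(topic_pattern)
    return {name: topic_id for name, topic_id in topics.items()
            if regex.search(name)}
```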
read_tablenames_from_db(meta_table_name)
Read the names of the tables used by this historian to store data, topics, metadata, aggregate topics and aggregate metadata.
Parameters: meta_table_name – the volttron metadata table in which table definitions are stored
Returns: table names in the format:
{'data_table': name of the table that stores data, 'topics_table': name of the table that stores the list of topics, 'meta_table': name of the table that stores metadata, 'agg_topics_table': name of the table that stores aggregate topics, 'agg_meta_table': name of the table that stores aggregate metadata}
replace_agg_meta_stmt()
Returns: query string to insert metadata for an aggregate topic into the database
select(query, args=None, fetch_all=True)
Execute a select statement.
Parameters: - query – select statement
- args – arguments for the where clause
- fetch_all – set to True if the function should retrieve all the records from the cursor and return them. Set to False to return the cursor
Returns: resultant rows if fetch_all is True, else the cursor. It is up to the calling method to close the cursor
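The fetch_all toggle can be sketched with the standard-library `sqlite3`; this is an illustrative stand-in, not the actual implementation:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, value REAL)")
conn.executemany("INSERT INTO data VALUES (?, ?)", [("t1", 1.0), ("t2", 2.0)])

def select_sketch(query, args=None, fetch_all=True):
    cursor = conn.cursor()
    cursor.execute(query, args or [])
    if fetch_all:
        rows = cursor.fetchall()
        cursor.close()   # rows are already materialized; safe to close
        return rows
    return cursor        # the caller is responsible for closing this cursor

rows = select_sketch("SELECT ts, value FROM data WHERE value > ?", (0.5,))
```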
update_agg_topic(agg_id, agg_topic_name)
Update an aggregate topic name.
Parameters: - agg_id – topic id for which the update is done
- agg_topic_name – new aggregate topic name
Returns: True if execution is complete. Raises an exception if unable to connect to the database
volttron.platform.dbutils.crateutils module
volttron.platform.dbutils.crateutils.create_schema(connection, schema='historian', table_names={}, num_replicas='0-1', num_shards=6, use_v2=True)
volttron.platform.dbutils.influxdbutils module
volttron.platform.dbutils.mongoutils module
volttron.platform.dbutils.mongoutils.get_agg_topics(client, agg_topics_collection, agg_meta_collection)
volttron.platform.dbutils.mysqlfuncts module
class volttron.platform.dbutils.mysqlfuncts.MySqlFuncts(connect_params, table_names)
Bases: volttron.platform.dbutils.basedb.DbDriver
collect_aggregate(topic_ids, agg_type, start=None, end=None)
Collect the aggregate data by querying the historian's data store.
Parameters: - topic_ids – list of topic ids for which aggregation should be performed
- agg_type – type of aggregation
- start – start time for query (inclusive)
- end – end time for query (exclusive)
Returns: a tuple of (aggregated value, count of records over which the aggregation was computed)
create_aggregate_store(agg_type, agg_time_period)
Create the data structure (table or collection) that stores the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.
Parameters: - agg_type – the type of aggregation (avg, sum, etc.)
- agg_time_period – the time period of aggregation
Returns: True if successful, False otherwise
get_agg_topic_map()
Get a map of aggregate topics to aggregate topic ids.
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period): agg_topic_id}
get_agg_topics()
Get the list of aggregate topics available.
Returns: list of tuples of the form (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()
Return the list of aggregations supported by the specific data store.
Returns: list of aggregations
get_topic_map()
Return details of topics in the database.
Returns: two dictionaries. The first maps topic_name.lower() to topic id and the second maps topic_name.lower() to topic name.
insert_aggregate_stmt(table_name)
The SQL statement to insert a collected aggregate for a given time period into the database.
Parameters: table_name – name of the table into which the aggregate data needs to be inserted
Returns: SQL insert/replace statement to insert aggregate data for a specific time slice
Return type: str
query(topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order='FIRST_TO_LAST')
Query the raw historian data or aggregate data and return the results of the query.
Parameters: - topic_ids – list of topic ids to query for
- id_name_map – dictionary that maps topic id to topic name
- start (datetime) – start of the query timestamp as a datetime
- end (datetime) – end of the query timestamp as a datetime
- agg_type – if this is a query for aggregate data, the type of aggregation (for example, sum, avg)
- agg_period – if this is a query for aggregate data, the time period of aggregation
- skip (int) – skip this number of results
- count (int) – limit results to this value. When the query is for multiple topics, count applies to each individual topic. For example, a query on 2 topics with count=5 will return 5 records for each topic
- order (str) – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"
Returns: result of the query in the format:
{topic_name: [(timestamp1, value1), (timestamp2, value2), ...], topic_name: [(timestamp1, value1), (timestamp2, value2), ...], ...}
query_topics_by_pattern(topic_pattern)
Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.
Parameters: topic_pattern – pattern to match against topic_name
Returns: map of {topic_name.lower(): topic_id}
volttron.platform.dbutils.postgresqlfuncts module
volttron.platform.dbutils.redshiftfuncts module
volttron.platform.dbutils.sqlitefuncts module
class volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts(connect_params, table_names)
Bases: volttron.platform.dbutils.basedb.DbDriver
collect_aggregate(topic_ids, agg_type, start=None, end=None)
Return the results of an aggregation query.
Parameters: - topic_ids – list of topic ids for which aggregation should be performed
- agg_type – type of aggregation
- start – start time
- end – end time
Returns: aggregate value and count of the number of records over which the aggregation was computed
create_aggregate_store(agg_type, period)
Create the data structure (table or collection) that stores the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.
Parameters: - agg_type – the type of aggregation (avg, sum, etc.)
- period – the time period of aggregation
Returns: True if successful, False otherwise
get_agg_topic_map()
Get a map of aggregate topics to aggregate topic ids.
Returns: dict of format {(agg_topic_name, agg_type, agg_time_period): agg_topic_id}
get_agg_topics()
Get the list of aggregate topics available.
Returns: list of tuples of the form (agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)
get_aggregation_list()
Return the list of aggregations supported by the specific data store.
Returns: list of aggregations
-
static
get_tagging_query_from_ast
(topic_tags_table, tup, tag_refs)[source]¶ Get a query condition syntax tree and generate sqlite query to query topic names by tags. It calls the get_compound_query to parse the abstract syntax tree tuples and then fixes the precedence
Example: # User input query string :
campus.geoPostalCode=”20500” and equip and boiler and “equip_tag 7” > 4
# Example output sqlite query
- SELECT topic_prefix from test_topic_tags WHERE tag=”campusRef”
- and value IN(
- SELECT topic_prefix from test_topic_tags WHERE tag=”campus” and value=1 INTERSECT SELECT topic_prefix from test_topic_tags WHERE tag=”geoPostalCode” and value=”20500”
)
INTERSECT SELECT topic_prefix from test_tags WHERE tag=”equip” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag=”boiler” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag = “equip_tag 7” and value > 4
Parameters: - topic_tags_table – table to query
- tup – parsed query string (abstract syntax tree)
- tag_refs – dictionary of ref tags and its parent tag
Returns: sqlite query
:rtype str
get_topic_map()
Return details of topics in the database.
Returns: two dictionaries. The first maps topic_name.lower() to topic id and the second maps topic_name.lower() to topic name.
insert_aggregate_stmt(table_name)
The SQL statement to insert a collected aggregate for a given time period into the database.
Parameters: table_name – name of the table into which the aggregate data needs to be inserted
Returns: SQL insert/replace statement to insert aggregate data for a specific time slice
Return type: str
manage_db_size(history_limit_timestamp, storage_limit_gb)
Manage database size.
Parameters: - history_limit_timestamp – remove all data older than this timestamp
- storage_limit_gb – remove the oldest data until the database is smaller than this value
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')
Return the results of a query in the form:
{"values": [(timestamp1, value1), (timestamp2, value2), ...], "metadata": {"key1": value1, "key2": value2, ...}}
metadata is not required (the caller will normalize this to {} for you).
Parameters: - topic_ids – topic ids to query data for
- id_name_map – dictionary containing topic_id:topic_name
- start – start of the query timestamp
- end – end of the query timestamp
- agg_type – if this is a query for aggregate data, the type of aggregation
- agg_period – if this is a query for aggregate data, the time period of aggregation
- skip – skip this number of results
- count – limit results to this value
- order – how to order the results, either "FIRST_TO_LAST" or "LAST_TO_FIRST"
query_topics_by_pattern(topic_pattern)
Return a map of {topic_name.lower(): topic_id} for topics that match the given pattern.
Parameters: topic_pattern – pattern to match against topic_name
Returns: map of {topic_name.lower(): topic_id}