volttron.platform.dbutils package

Submodules

volttron.platform.dbutils.basedb module

exception volttron.platform.dbutils.basedb.ConnectionError[source]

Bases: exceptions.Exception

Custom class for connection errors

class volttron.platform.dbutils.basedb.DbDriver(dbapimodule, **kwargs)[source]

Bases: object

Parent class used by sqlhistorian.historian.SQLHistorian to perform database operations. This class is inherited by the database-specific drivers, for example MySqlFuncts and SqlLiteFuncts.

close()[source]

Close the connection to the database.

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed.
  • agg_type – type of aggregation
  • start – start time for query (inclusive)
  • end – end time for query (exclusive)
Returns:

a tuple of (aggregated value, count of records over which this aggregation was computed)
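As a rough illustration of the (aggregated value, count) return value and of the inclusive start / exclusive end window, the sketch below runs an aggregate over an in-memory SQLite table. The table layout (data with ts, topic_id and value columns), the helper name collect_aggregate_sketch and the set of allowed aggregation names are assumptions made for this example; they are not taken from any VOLTTRON driver.

```python
import sqlite3

# Assumed whitelist; real drivers report theirs via get_aggregation_list().
ALLOWED_AGGREGATIONS = {"avg", "sum", "min", "max", "count"}


def collect_aggregate_sketch(conn, topic_ids, agg_type, start=None, end=None):
    """Return (aggregate value, record count) for the given topic ids."""
    if agg_type.lower() not in ALLOWED_AGGREGATIONS:
        raise ValueError("unsupported aggregation: %s" % agg_type)
    placeholders = ", ".join("?" for _ in topic_ids)
    sql = "SELECT {}(value), COUNT(value) FROM data WHERE topic_id IN ({})".format(
        agg_type, placeholders)
    args = list(topic_ids)
    if start is not None:
        sql += " AND ts >= ?"   # start is inclusive
        args.append(start)
    if end is not None:
        sql += " AND ts < ?"    # end is exclusive
        args.append(end)
    value, count = conn.execute(sql, args).fetchone()
    return value, count


# Hypothetical data table, timestamps stored as ISO 8601 strings.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
conn.executemany("INSERT INTO data VALUES (?, ?, ?)", [
    ("2020-01-01T00:00:00", 1, 10.0),
    ("2020-01-01T01:00:00", 1, 20.0),
    ("2020-01-01T02:00:00", 1, 30.0),
])
result = collect_aggregate_sketch(
    conn, [1], "avg", start="2020-01-01T00:00:00", end="2020-01-01T02:00:00")
```

With the sample rows, the record at 02:00 falls outside the window because end is exclusive, so the average is computed over two records.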

commit()[source]

Commit a transaction

Returns:True if successful, False otherwise
create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that will store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – The type of aggregation (avg, sum, etc.)
  • period – The time period of aggregation
Returns:True if successful, False otherwise
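The <agg_type>_<period> naming rule can be sketched as follows. The column layout, the identifier check and the helper names are assumptions for illustration only; a concrete driver defines its own table structure.

```python
import re
import sqlite3


def aggregate_table_name(agg_type, period):
    """Build the <agg_type>_<period> table name and reject unsafe characters."""
    name = "{}_{}".format(agg_type, period)
    if not re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*", name):
        raise ValueError("invalid aggregate table name: %r" % name)
    return name


def create_aggregate_store_sketch(conn, agg_type, period):
    """Create the aggregate table; return True on success, False otherwise."""
    table = aggregate_table_name(agg_type, period)
    try:
        # Hypothetical columns, chosen only for this example.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS {} ("
            "ts TEXT NOT NULL, topic_id INTEGER NOT NULL, value REAL, "
            "UNIQUE(topic_id, ts))".format(table))
        conn.commit()
        return True
    except sqlite3.Error:
        return False


conn = sqlite3.connect(":memory:")
created = create_aggregate_store_sketch(conn, "avg", "1h")
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'")]
```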

cursor()[source]
execute_many(stmt, args, commit=False)[source]

Execute a sql statement with multiple args

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if transaction should be committed. Defaults to False
Returns:count of the number of affected rows

execute_stmt(stmt, args=None, commit=False)[source]

Execute a sql statement

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if transaction should be committed. Defaults to False
Returns:count of the number of affected rows
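The contract shared by execute_stmt and execute_many (run a statement, optionally commit, return the affected row count) might look like the sketch below when written against the standard sqlite3 module. The function names ending in _sketch and the topics table are hypothetical; the actual DbDriver methods operate on the driver's own connection.

```python
import sqlite3
from contextlib import closing


def execute_stmt_sketch(conn, stmt, args=None, commit=False):
    """Execute one statement and return the number of affected rows."""
    with closing(conn.cursor()) as cursor:
        cursor.execute(stmt, args or ())
        if commit:
            conn.commit()
        return cursor.rowcount


def execute_many_sketch(conn, stmt, args, commit=False):
    """Execute one statement for each argument tuple in args."""
    with closing(conn.cursor()) as cursor:
        cursor.executemany(stmt, args)
        if commit:
            conn.commit()
        return cursor.rowcount


conn = sqlite3.connect(":memory:")
execute_stmt_sketch(
    conn, "CREATE TABLE topics (topic_id INTEGER PRIMARY KEY, topic_name TEXT)")
inserted = execute_many_sketch(
    conn, "INSERT INTO topics (topic_name) VALUES (?)",
    [("a",), ("b",), ("c",)], commit=True)
updated = execute_stmt_sketch(
    conn, "UPDATE topics SET topic_name = ? WHERE topic_id = ?",
    ("z", 1), commit=True)
```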

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}
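A minimal sketch of how such a map could be built and used, assuming the rows arrive as (agg_topic_id, agg_topic_name, agg_type, agg_time_period) tuples. That row order, the helper name and the sample topics are assumptions for this example.

```python
def build_agg_topic_map(rows):
    """Map (agg_topic_name, agg_type, agg_time_period) to agg_topic_id."""
    return {
        (name, agg_type, period): agg_topic_id
        for agg_topic_id, name, agg_type, period in rows
    }


# Hypothetical rows as they might come back from an aggregate topics table.
rows = [
    (1, "campus/building/temp", "avg", "1h"),
    (2, "campus/building/temp", "sum", "1d"),
]
agg_topic_map = build_agg_topic_map(rows)

# The tuple key lets a caller find the id for one specific aggregation.
agg_id = agg_topic_map.get(("campus/building/temp", "avg", "1h"))
```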

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
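The two dictionaries described above can be pictured with the following sketch. It assumes the topics arrive as (topic_id, topic_name) pairs; the helper name and the sample topics are hypothetical.

```python
def build_topic_maps(rows):
    """Return (id_map, name_map), both keyed by the lower-cased topic name."""
    id_map = {}
    name_map = {}
    for topic_id, topic_name in rows:
        key = topic_name.lower()
        id_map[key] = topic_id        # lower-cased name -> topic id
        name_map[key] = topic_name    # lower-cased name -> original name
    return id_map, name_map


rows = [(1, "Campus/Building1/Temp"), (2, "Campus/Building1/Power")]
id_map, name_map = build_topic_maps(rows)
```

Keying both maps by the lower-cased name allows case-insensitive lookups while still preserving the name as originally stored.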
insert_agg_meta(topic_id, metadata)[source]

Inserts metadata for aggregate topic

Parameters:
  • topic_id – aggregate topic id for which metadata is inserted
  • metadata – metadata
Returns:

True if execution completes. Raises exception if connection to database fails

insert_agg_topic(topic, agg_type, agg_time_period)[source]

Insert a new aggregate topic

Parameters:
  • topic – topic name to insert
  • agg_type – type of aggregation
  • agg_time_period – time period of aggregation
Returns:

id of the topic inserted if insert was successful. Raises exception if unable to connect to database

insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate(agg_topic_id, agg_type, period, ts, data, topic_ids)[source]

Insert aggregate data collected for a specific time period into database. Data is inserted into <agg_type>_<period> table

Parameters:
  • agg_topic_id – topic id
  • agg_type – type of aggregation
  • period – time period of aggregation
  • ts – end time of aggregation period (not inclusive)
  • data – computed aggregate
  • topic_ids – topic id or list of topic ids for which aggregate was computed
Returns:

True if execution was successful. Raises exception in case of connection failures

insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data(ts, topic_id, data)[source]

Inserts data for topic

Parameters:
  • ts – timestamp
  • topic_id – topic id for which data is inserted
  • data – data values
Returns:

True if execution completes. Raises exception if unable to connect to database

insert_data_query()[source]
Returns:query string to insert data into database
insert_meta(topic_id, metadata)[source]

Inserts metadata for topic

Parameters:
  • topic_id – topic id for which metadata is inserted
  • metadata – metadata
Returns:

True if execution completes. Raises exception if unable to connect to database

insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic(topic)[source]

Insert a new topic

Parameters:topic – topic to insert
Returns:id of the topic inserted if insert was successful. Raises exception if unable to connect to database
insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Optional function to manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove oldest data until database is smaller than this value.
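The history_limit_timestamp half of this contract could be sketched as below. The data table, its columns and the helper name are assumptions for illustration, and the storage_limit_gb behaviour is deliberately left out of the sketch.

```python
import sqlite3


def prune_history_sketch(conn, history_limit_timestamp):
    """Delete rows older than the limit and return how many were removed."""
    cursor = conn.execute(
        "DELETE FROM data WHERE ts < ?", (history_limit_timestamp,))
    conn.commit()
    return cursor.rowcount


# Hypothetical data table, timestamps stored as ISO 8601 strings.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (ts TEXT, topic_id INTEGER, value REAL)")
conn.executemany("INSERT INTO data VALUES (?, ?, ?)", [
    ("2019-12-31T23:00:00", 1, 1.0),
    ("2020-01-01T00:00:00", 1, 2.0),
    ("2020-01-01T01:00:00", 1, 3.0),
])
removed = prune_history_sketch(conn, "2020-01-01T00:00:00")
remaining = [row[0] for row in conn.execute("SELECT ts FROM data ORDER BY ts")]
```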
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query

Parameters:
  • topic_ids – list of topic ids to query for.
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – Start of query timestamp as a datetime.
  • end (datetime) – End of query timestamp as a datetime.
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

result of the query in the format:

{
topic_name:[(timestamp1, value1),
            (timestamp2, value2),
            ...],
topic_name:[(timestamp1, value1),
            (timestamp2, value2),
            ...],
...}
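To make the result format and the per-topic meaning of count concrete, the sketch below groups raw (topic_id, timestamp, value) rows into that shape. The helper name and row layout are assumptions, and applying skip per topic as well is a choice made for this example rather than documented behaviour.

```python
from collections import defaultdict


def group_results(rows, id_name_map, skip=0, count=None, order="FIRST_TO_LAST"):
    """Group rows by topic name, then apply skip/count to each topic."""
    reverse = order == "LAST_TO_FIRST"
    per_topic = defaultdict(list)
    # Sort by timestamp so each topic's list is in the requested order.
    for topic_id, ts, value in sorted(rows, key=lambda r: r[1], reverse=reverse):
        per_topic[id_name_map[topic_id]].append((ts, value))
    stop = None if count is None else skip + count
    return {name: values[skip:stop] for name, values in per_topic.items()}


rows = [
    (1, "2020-01-01T00:00:00", 1.0),
    (1, "2020-01-01T01:00:00", 2.0),
    (2, "2020-01-01T00:00:00", 10.0),
    (2, "2020-01-01T01:00:00", 20.0),
]
id_name_map = {1: "campus/temp", 2: "campus/power"}
result = group_results(rows, id_name_map, count=1, order="LAST_TO_FIRST")
```

With count=1 and two topics, the result holds one record per topic (the most recent one, because of LAST_TO_FIRST), not one record overall.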
query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for the topics that match the given pattern

Parameters:topic_pattern – pattern to match against topic_name

read_tablenames_from_db(meta_table_name)[source]

Reads names of the tables used by this historian to store data, topics, metadata, aggregate topics and aggregate metadata

Parameters:meta_table_name – The volttron metadata table in which table definitions are stored
Returns:table names
{
 'data_table': name of table that stores data,
 'topics_table': name of table that stores list of topics,
 'meta_table': name of table that stores metadata,
 'agg_topics_table': name of table that stores aggregate topics,
 'agg_meta_table': name of table that stores aggregate metadata
 }
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

rollback()[source]

Rollback a transaction

Returns:True if successful, False otherwise
select(query, args=None, fetch_all=True)[source]

Execute a select statement

Parameters:
  • query – select statement
  • args – arguments for the where clause
  • fetch_all – Set to True if the function should retrieve all the records from the cursor and return them. Set to False to return the cursor.
Returns:resultant rows if fetch_all is True, otherwise the cursor. It is up to the calling method to close the cursor.
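The difference between the two fetch_all modes, and the caller's duty to close a returned cursor, can be sketched with the standard sqlite3 module and contextlib.closing. The function name select_sketch and the topics table are hypothetical.

```python
import sqlite3
from contextlib import closing


def select_sketch(conn, query, args=None, fetch_all=True):
    """Return all rows when fetch_all is True, otherwise the open cursor."""
    cursor = conn.cursor()
    try:
        cursor.execute(query, args or ())
    except Exception:
        cursor.close()
        raise
    if not fetch_all:
        return cursor            # the caller is responsible for closing it
    with closing(cursor):
        return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE topics (topic_id INTEGER PRIMARY KEY, topic_name TEXT)")
conn.executemany("INSERT INTO topics VALUES (?, ?)",
                 [(1, "campus/temp"), (2, "campus/power")])

# fetch_all=True: rows are materialised and the cursor is closed internally.
rows = select_sketch(
    conn, "SELECT topic_name FROM topics WHERE topic_id = ?", (1,))

# fetch_all=False: iterate lazily, then close the cursor ourselves.
with closing(select_sketch(
        conn, "SELECT topic_name FROM topics ORDER BY topic_name",
        fetch_all=False)) as cursor:
    names = [row[0] for row in cursor]
```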

setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic(agg_id, agg_topic_name)[source]

Update an aggregate topic name

Parameters:
  • agg_id – topic id for which update is done
  • agg_topic_name – new aggregate topic name
Returns:

True if execution is complete. Raises exception if unable to connect to database

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic(topic, topic_id)[source]

Update a topic name

Parameters:
  • topic – new topic name
  • topic_id – topic id for which update is done
Returns:

True if execution is complete. Raises exception if unable to connect to database

update_topic_query()[source]
Returns:query string to update a topic in database
volttron.platform.dbutils.basedb.closing(*args, **kwds)[source]

volttron.platform.dbutils.crateutils module

volttron.platform.dbutils.crateutils.create_schema(connection, schema='historian', num_replicas='0-1', num_shards=6, use_v2=True)[source]
volttron.platform.dbutils.crateutils.drop_schema(connection, truncate=False, schema=None)[source]
volttron.platform.dbutils.crateutils.insert_data_query(schema)[source]
volttron.platform.dbutils.crateutils.insert_topic_query(schema)[source]
volttron.platform.dbutils.crateutils.select_all_topics_query(schema)[source]

volttron.platform.dbutils.influxdbutils module

volttron.platform.dbutils.mongoutils module

volttron.platform.dbutils.mongoutils.get_agg_topic_map(client, agg_topics_collection)[source]
volttron.platform.dbutils.mongoutils.get_agg_topics(client, agg_topics_collection, agg_meta_collection)[source]
volttron.platform.dbutils.mongoutils.get_mongo_client(connection_params, **kwargs)[source]
volttron.platform.dbutils.mongoutils.get_tagging_queries_from_ast(tup, tag_refs, sub_queries)[source]
volttron.platform.dbutils.mongoutils.get_topic_map(client, topics_collection)[source]

volttron.platform.dbutils.mysqlfuncts module

class volttron.platform.dbutils.mysqlfuncts.MySqlFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed.
  • agg_type – type of aggregation
  • start – start time for query (inclusive)
  • end – end time for query (exclusive)
Returns:

a tuple of (aggregated value, count of records over which this aggregation was computed)

create_aggregate_store(agg_type, agg_time_period)[source]

Create the data structure (table or collection) that will store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – The type of aggregation (avg, sum, etc.)
  • agg_time_period – The time period of aggregation
Returns:True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
init_microsecond_support()[source]
insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
query(topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query

Parameters:
  • topic_ids – list of topic ids to query for.
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – Start of query timestamp as a datetime.
  • end (datetime) – End of query timestamp as a datetime.
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

result of the query in the format:

{
topic_name:[(timestamp1, value1),
            (timestamp2, value2),
            ...],
topic_name:[(timestamp1, value1),
            (timestamp2, value2),
            ...],
...}
query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for the topics that match the given pattern

Parameters:topic_pattern – pattern to match against topic_name

record_table_definitions(tables_def, meta_table_name)[source]
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.sqlitefuncts module

class volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

This function should return the results of an aggregation query

Parameters:
  • topic_ids – list of single topics
  • agg_type – type of aggregation
  • start – start time
  • end – end time
Returns:aggregate value, count of number of records over which aggregation was computed

create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that will store the aggregate data for the given aggregation type and aggregation time period. The table name should be constructed as <agg_type>_<period>.

Parameters:
  • agg_type – The type of aggregation (avg, sum, etc.)
  • period – The time period of aggregation
Returns:True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
static get_tagging_query_from_ast(topic_tags_table, tup, tag_refs)[source]

Get a query condition syntax tree and generate a sqlite query to query topic names by tags. It calls get_compound_query to parse the abstract syntax tree tuples and then fixes the precedence.

Example:

# User input query string:
campus.geoPostalCode="20500" and equip and boiler and "equip_tag 7" > 4

# Example output sqlite query
SELECT topic_prefix from test_topic_tags WHERE tag="campusRef"
 and value IN(
  SELECT topic_prefix from test_topic_tags WHERE tag="campus" and value=1
  INTERSECT
  SELECT topic_prefix from test_topic_tags WHERE tag="geoPostalCode" and value="20500"
 )
INTERSECT
SELECT topic_prefix from test_tags WHERE tag="equip" and value=1
INTERSECT
SELECT topic_prefix from test_tags WHERE tag="boiler" and value=1
INTERSECT
SELECT topic_prefix from test_tags WHERE tag = "equip_tag 7" and value > 4

Parameters:
  • topic_tags_table – table to query
  • tup – parsed query string (abstract syntax tree)
  • tag_refs – dictionary of ref tags and its parent tag
Returns:sqlite query
Return type:str
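The INTERSECT pattern in the example output can be reproduced in a much simplified form. The sketch below handles only a flat list of conditions joined by "and"; it does not parse an abstract syntax tree and does not resolve parent references such as campusRef. The function name, the (tag, operator, value) condition format and the sample table are all assumptions for illustration.

```python
import sqlite3

ALLOWED_OPERATORS = {"=", "!=", ">", ">=", "<", "<="}


def intersect_query_sketch(table, conditions):
    """Build one SELECT per (tag, operator, value) and INTERSECT them."""
    parts, args = [], []
    for tag, operator, value in conditions:
        if operator not in ALLOWED_OPERATORS:
            raise ValueError("unsupported operator: %s" % operator)
        # The table name is assumed to be trusted; values are bound as parameters.
        parts.append(
            "SELECT topic_prefix FROM {} WHERE tag = ? AND value {} ?".format(
                table, operator))
        args.extend([tag, value])
    return " INTERSECT ".join(parts), args


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_topic_tags (topic_prefix TEXT, tag TEXT, value)")
conn.executemany("INSERT INTO test_topic_tags VALUES (?, ?, ?)", [
    ("campus/b1/boiler1", "equip", 1),
    ("campus/b1/boiler1", "boiler", 1),
    ("campus/b1/boiler1", "equip_tag 7", 5),
    ("campus/b1/ahu1", "equip", 1),
    ("campus/b1/ahu1", "equip_tag 7", 9),
])

sql, args = intersect_query_sketch("test_topic_tags", [
    ("equip", "=", 1),
    ("boiler", "=", 1),
    ("equip_tag 7", ">", 4),
])
matches = [row[0] for row in conn.execute(sql, args)]
```

Each SELECT yields the topic prefixes that satisfy one condition, and INTERSECT keeps only the prefixes present in every result set, which is how "and" is expressed in the example output above.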

get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove oldest data until database is smaller than this value.
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

This function should return the results of a query in the form:

{"values": [(timestamp1, value1), (timestamp2, value2), ...],
 "metadata": {"key1": value1, "key2": value2, ...}}

metadata is not required (the caller will normalize this to {} for you).

Parameters:
  • topic_ids – topic_ids to query data for
  • id_name_map – dictionary containing topic_id:topic_name
  • start, end, agg_type, agg_period, skip, count, order – not described here; see DbDriver.query

query_topics_by_pattern(topic_pattern)[source]

Return a map of {topic_name.lower(): topic_id} for the topics that match the given pattern

Parameters:topic_pattern – pattern to match against topic_name
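One way pattern matching of this kind can be done in SQLite is with a user-defined REGEXP function, which SQLite invokes as regexp(pattern, value) when it evaluates value REGEXP pattern. The sketch below is an assumption about the general approach, not the SqlLiteFuncts source: the topics table, the case-insensitive matching and the helper names are all hypothetical.

```python
import re
import sqlite3


def regexp(expr, item):
    """Return True when the pattern expr is found in item (case-insensitive)."""
    return re.search(expr, item, re.IGNORECASE) is not None


def query_topics_by_pattern_sketch(conn, topic_pattern):
    """Return {topic_name.lower(): topic_id} for topics matching the pattern."""
    rows = conn.execute(
        "SELECT topic_name, topic_id FROM topics WHERE topic_name REGEXP ?",
        (topic_pattern,))
    return {name.lower(): topic_id for name, topic_id in rows}


conn = sqlite3.connect(":memory:")
# Register the Python function so SQLite can evaluate the REGEXP operator.
conn.create_function("REGEXP", 2, regexp)
conn.execute("CREATE TABLE topics (topic_id INTEGER PRIMARY KEY, topic_name TEXT)")
conn.executemany("INSERT INTO topics VALUES (?, ?)", [
    (1, "Campus/Building1/Temp"),
    (2, "Campus/Building1/Power"),
    (3, "Campus/Building2/Temp"),
])
matches = query_topics_by_pattern_sketch(conn, "building1/.*")
```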

record_table_definitions(table_defs, meta_table_name)[source]
regex_select(query, args, fetch_all=True, cache_size=None)[source]
static regexp(expr, item)[source]
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into database

set_cache(cache_size)[source]
setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.sqlutils module

volttron.platform.dbutils.sqlutils.get_dbfuncts_class(database_type)[source]