volttron.platform.dbutils package

Submodules

volttron.platform.dbutils.basedb module

exception volttron.platform.dbutils.basedb.ConnectionError[source]

Bases: exceptions.Exception

Custom class for connection errors

class volttron.platform.dbutils.basedb.DbDriver(dbapimodule, **kwargs)[source]

Bases: object

Parent class used by sqlhistorian.historian.SQLHistorian to do the database operations. This class is inherited by

bulk_insert(**kwds)[source]

Function to meet bulk insert requirements. This function can be overridden by historian drivers to yield the required method for data insertion during bulk inserts in the respective historians. In this generic case it will yield the single insert method

Yields:insert method
close()[source]

Close connection to database :return:

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed.
  • agg_type – type of aggregation
  • start_time – start time for query (inclusive)
  • end_time – end time for query (exclusive)
Returns:

a tuple of (aggregated value, count of records over which this aggregation was computed)

commit()[source]

Commit a transaction

Returns:True if successful, False otherwise
create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>

Parameters:
  • agg_type – The type of aggregation. (avg, sum etc.)
  • agg_time_period – The time period of aggregation

:return - True if successful, False otherwise

cursor()[source]
execute_many(stmt, args, commit=False)[source]

Execute a sql statement with multiple args

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if transaction should be committed. Defaults to

False :return: count of the number of affected rows

execute_stmt(stmt, args=None, commit=False)[source]

Execute a sql statement

Parameters:
  • stmt – the statement to execute
  • args – optional arguments
  • commit – True if transaction should be committed. Defaults to

False :return: count of the number of affected rows

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
insert_agg_meta(topic_id, metadata)[source]

Inserts metadata for aggregate topic

Parameters:
  • topic_id – aggregate topic id for which metadata is inserted
  • metadata – metadata
Returns:

True if execution completes. Raises exception if connection to

database fails

insert_agg_topic(topic, agg_type, agg_time_period)[source]

Insert a new aggregate topic

Parameters:
  • topic – topic name to insert
  • agg_type – type of aggregation
  • agg_time_period – time period of aggregation
Returns:

id of the topic inserted if insert was successful. Raises exception if unable to connect to database

insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate(agg_topic_id, agg_type, period, ts, data, topic_ids)[source]

Insert aggregate data collected for a specific time period into database. Data is inserted into <agg_type>_<period> table

Parameters:
  • agg_topic_id – topic id
  • agg_type – type of aggregation
  • period – time period of aggregation
  • ts – end time of aggregation period (not inclusive)
  • data – computed aggregate
  • topic_ids – topic ids or topic ids for which aggregate was computed
Returns:

True if execution was successful, raises exception

in case of connection failures

insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data(ts, topic_id, data)[source]

Inserts data for topic

Parameters:
  • ts – timestamp
  • topic_id – topic id for which data is inserted
  • data – data value
Returns:

True if execution completes. raises Exception if unable to

connect to database

insert_data_query()[source]
Returns:query string to insert data into database
insert_meta(topic_id, metadata)[source]

Inserts metadata for topic

Parameters:
  • topic_id – topic id for which metadata is inserted
  • metadata – metadata
Returns:

True if execution completes. Raises exception if unable to

connect to database

insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic(topic)[source]

Insert a new topic

Parameters:topic – topic to insert
Returns:id of the topic inserted if insert was successful. Raises exception if unable to connect to database
insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Optional function to manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove oldest data until database is smaller than this value.
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query

Parameters:
  • topic_ids – list of topic ids to query for.
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – Start of query timestamp as a datetime.
  • end (datetime) – End of query timestamp as a datetime.
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

result of the query in the format:

{
topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
...}
query_topics_by_pattern(topic_pattern)[source]

Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:

read_tablenames_from_db(meta_table_name)[source]

Reads names of the tables used by this historian to store data, topics, metadata, aggregate topics and aggregate metadata

Parameters:meta_table_name – The volttron metadata table in which table definitions are stored
Returns:table names
{
 'data_table': name of table that store data,
 'topics_table':name of table that store list of topics,
 'meta_table':name of table that store metadata,
 'agg_topics_table':name of table that stores aggregate topics,
 'agg_meta_table':name of table that store aggregate metadata
 }
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into

database

rollback()[source]

Rollback a transaction

Returns:True if successful, False otherwise
select(query, args=None, fetch_all=True)[source]

Execute a select statement

Parameters:
  • query – select statement
  • args – arguments for the where clause
  • fetch_all – Set to True if function should return retrieve all

the records from cursors and return it. Set to False to return cursor. :return: resultant rows if fetch_all is True else returns the cursor It is up to calling method to close the cursor

setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic(agg_id, agg_topic_name)[source]

Update a aggregate topic name

Parameters:
  • agg_id – topic id for which update is done
  • agg_topic_name – new aggregate topic name
Returns:

True if execution is complete. Raises exception if unable to

connect to database

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic(topic, topic_id)[source]

Update a topic name

Parameters:
  • topic – new topic name
  • topic_id – topic id for which update is done
Returns:

True if execution is complete. Raises exception if unable to

connect to database

update_topic_query()[source]
Returns:query string to update a topic in database
volttron.platform.dbutils.basedb.closing(*args, **kwds)[source]

volttron.platform.dbutils.crateutils module

volttron.platform.dbutils.crateutils.create_schema(connection, schema='historian', table_names={}, num_replicas='0-1', num_shards=6, use_v2=True)[source]
volttron.platform.dbutils.crateutils.drop_schema(connection, truncate_tables, schema=None, truncate=True)[source]
volttron.platform.dbutils.crateutils.insert_data_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.insert_topic_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.select_all_topics_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.select_topics_metadata_query(schema, table_name)[source]
volttron.platform.dbutils.crateutils.update_topic_query(schema, table_name)[source]

volttron.platform.dbutils.influxdbutils module

volttron.platform.dbutils.mongoutils module

volttron.platform.dbutils.mongoutils.get_agg_topic_map(client, agg_topics_collection)[source]
volttron.platform.dbutils.mongoutils.get_agg_topics(client, agg_topics_collection, agg_meta_collection)[source]
volttron.platform.dbutils.mongoutils.get_mongo_client(connection_params, **kwargs)[source]
volttron.platform.dbutils.mongoutils.get_tagging_queries_from_ast(tup, tag_refs, sub_queries)[source]
volttron.platform.dbutils.mongoutils.get_topic_map(client, topics_collection)[source]

volttron.platform.dbutils.mysqlfuncts module

class volttron.platform.dbutils.mysqlfuncts.MySqlFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

Collect the aggregate data by querying the historian’s data store

Parameters:
  • topic_ids – list of topic ids for which aggregation should be performed.
  • agg_type – type of aggregation
  • start_time – start time for query (inclusive)
  • end_time – end time for query (exclusive)
Returns:

a tuple of (aggregated value, count of records over which this aggregation was computed)

create_aggregate_store(agg_type, agg_time_period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>

Parameters:
  • agg_type – The type of aggregation. (avg, sum etc.)
  • agg_time_period – The time period of aggregation

:return - True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
init_microsecond_support()[source]
insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
query(topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order='FIRST_TO_LAST')[source]

Queries the raw historian data or aggregate data and returns the results of the query

Parameters:
  • topic_ids – list of topic ids to query for.
  • id_name_map – dictionary that maps topic id to topic name
  • start (datetime) – Start of query timestamp as a datetime.
  • end (datetime) – End of query timestamp as a datetime.
  • agg_type – If this is a query for aggregate data, the type of aggregation ( for example, sum, avg)
  • agg_period – If this is a query for aggregate data, the time period of aggregation
  • skip (int) – Skip this number of results.
  • count (int) – Limit results to this value. When the query is for multiple topics, count applies to individual topics. For example, a query on 2 topics with count=5 will return 5 records for each topic
  • order (str) – How to order the results, either “FIRST_TO_LAST” or “LAST_TO_FIRST”
Returns:

result of the query in the format:

{
topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
topic_name:[(timestamp1, value1),
            (timestamp2:,value2),
            ...],
...}
query_topics_by_pattern(topic_pattern)[source]

Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:

record_table_definitions(tables_def, meta_table_name)[source]
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into

database

setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.postgresqlfuncts module

volttron.platform.dbutils.redshiftfuncts module

volttron.platform.dbutils.sqlitefuncts module

class volttron.platform.dbutils.sqlitefuncts.SqlLiteFuncts(connect_params, table_names)[source]

Bases: volttron.platform.dbutils.basedb.DbDriver

collect_aggregate(topic_ids, agg_type, start=None, end=None)[source]

This function should return the results of a aggregation query @param topic_ids: list of single topics @param agg_type: type of aggregation @param start: start time @param end: end time @return: aggregate value, count of number of records over which aggregation was computed

create_aggregate_store(agg_type, period)[source]

Create the data structure (table or collection) that is going to store the aggregate data for the give aggregation type and aggregation time period. Table name should be constructed as <agg_type>_<period>

Parameters:
  • agg_type – The type of aggregation. (avg, sum etc.)
  • agg_time_period – The time period of aggregation

:return - True if successful, False otherwise

get_agg_topic_map()[source]

Get a map of aggregate_topics to aggregate_topic_id

Returns:dict of format

{(agg_topic_name, agg_type, agg_time_period):agg_topic_id}

get_agg_topics()[source]

Get the list of aggregate topics available

Returns:list of tuples containing

(agg_topic_name, agg_type, agg_time_period, configured topics/topic name pattern)

get_aggregation_list()[source]

Return list of aggregation supported by the specific data store

Returns:list of aggregations
static get_tagging_query_from_ast(topic_tags_table, tup, tag_refs)[source]

Get a query condition syntax tree and generate sqlite query to query topic names by tags. It calls the get_compound_query to parse the abstract syntax tree tuples and then fixes the precedence

Example: # User input query string :

campus.geoPostalCode=”20500” and equip and boiler and “equip_tag 7” > 4

# Example output sqlite query

SELECT topic_prefix from test_topic_tags WHERE tag=”campusRef”
and value IN(
SELECT topic_prefix from test_topic_tags WHERE tag=”campus” and value=1 INTERSECT SELECT topic_prefix from test_topic_tags WHERE tag=”geoPostalCode” and value=”20500”

)

INTERSECT SELECT topic_prefix from test_tags WHERE tag=”equip” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag=”boiler” and value=1 INTERSECT SELECT topic_prefix from test_tags WHERE tag = “equip_tag 7” and value > 4

Parameters:
  • topic_tags_table – table to query
  • tup – parsed query string (abstract syntax tree)
  • tag_refs – dictionary of ref tags and its parent tag
Returns:

sqlite query

:rtype str

get_topic_map()[source]

Returns details of topics in database

Returns:two dictionaries.
  • First one maps topic_name.lower() to topic id and
  • Second one maps topic_name.lower() to topic name
insert_agg_topic_stmt()[source]
Returns:query string to insert an aggregate topic into database
insert_aggregate_stmt(table_name)[source]

The sql statement to insert collected aggregate for a given time period into database

Parameters:table_name – name of the table into which the aggregate data needs to be inserted
Returns:sql insert/replace statement to insert aggregate data for a specific time slice
Return type:str
insert_data_query()[source]
Returns:query string to insert data into database
insert_meta_query()[source]
Returns:query string to insert metadata for a topic into database
insert_topic_query()[source]
Returns:query string to insert a topic into database
manage_db_size(history_limit_timestamp, storage_limit_gb)[source]

Manage database size.

Parameters:
  • history_limit_timestamp – remove all data older than this timestamp
  • storage_limit_gb – remove oldest data until database is smaller than this value.
query(topic_ids, id_name_map, start=None, end=None, agg_type=None, agg_period=None, skip=0, count=None, order='FIRST_TO_LAST')[source]

This function should return the results of a query in the form:

{"values": [(timestamp1, value1), (timestamp2, value2), ...],
 "metadata": {"key1": value1, "key2": value2, ...}}

metadata is not required (The caller will normalize this to {} for you) @param topic_ids: topic_ids to query data for @param id_name_map: dictionary containing topic_id:topic_name @param start: @param end: @param agg_type: @param agg_period: @param skip: @param count: @param order:

query_topics_by_pattern(topic_pattern)[source]

Return a map of {topi_name.lower():topic_id} that matches the given pattern :param topic_pattern: pattern to match against topic_name :return:

record_table_definitions(table_defs, meta_table_name)[source]
regex_select(query, args, fetch_all=True, cache_size=None)[source]
static regexp(expr, item)[source]
replace_agg_meta_stmt()[source]
Returns:query string to insert metadata for an aggregate topic into

database

set_cache(cache_size)[source]
setup_aggregate_historian_tables(meta_table_name)[source]
setup_historian_tables()[source]

Create historian tables if necessary

update_agg_topic_stmt()[source]
Returns:query string to update an aggregate topic in database
update_topic_query()[source]
Returns:query string to update a topic in database

volttron.platform.dbutils.sqlutils module

volttron.platform.dbutils.sqlutils.get_dbfuncts_class(database_type)[source]