Source code for volttron.platform.dbutils.mysqlfuncts

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
# Copyright 2017, Battelle Memorial Institute.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
# under Contract DE-AC05-76RL01830
# }}}
import ast
import logging
from collections import defaultdict

import pytz
import re
from basedb import DbDriver
from mysql.connector import Error as MysqlError
from mysql.connector import errorcode as mysql_errorcodes
from volttron.platform.agent import utils
from volttron.platform.agent import json as jsonapi

_log = logging.getLogger(__name__)

Implementation of Mysql database operation for
:py:class:`sqlhistorian.historian.SQLHistorian` and
For method details please refer to base class
[docs]class MySqlFuncts(DbDriver): def __init__(self, connect_params, table_names): # kwargs['dbapimodule'] = 'mysql.connector' self.MICROSECOND_SUPPORT = None self.data_table = None self.topics_table = None self.meta_table = None self.agg_topics_table = None self.agg_meta_table = None if table_names: self.data_table = table_names['data_table'] self.topics_table = table_names['topics_table'] self.meta_table = table_names['meta_table'] self.agg_topics_table = table_names.get('agg_topics_table', None) self.agg_meta_table = table_names.get('agg_meta_table', None) # This is needed when reusing the same connection. Else cursor returns # cached data even if we create a new cursor for each query and # close the cursor after fetching results connect_params['autocommit'] = True super(MySqlFuncts, self).__init__('mysql.connector', **connect_params)
[docs] def init_microsecond_support(self): rows ="SELECT version()", None) p = re.compile('(\d+)\D+(\d+)\D+(\d+)\D*') version_nums = p.match(rows[0][0]).groups() if int(version_nums[0]) < 5: self.MICROSECOND_SUPPORT = False elif int(version_nums[1]) < 6: self.MICROSECOND_SUPPORT = False elif int(version_nums[2]) < 4: self.MICROSECOND_SUPPORT = False else: self.MICROSECOND_SUPPORT = True
[docs] def setup_historian_tables(self): if self.MICROSECOND_SUPPORT is None: self.init_microsecond_support() rows ="show tables like %s", [self.data_table]) if rows: _log.debug("Found table {}. Historian table exists".format( self.data_table)) return try: if self.MICROSECOND_SUPPORT: self.execute_stmt( 'CREATE TABLE IF NOT EXISTS ' + self.data_table + ' (ts timestamp(6) NOT NULL,\ topic_id INTEGER NOT NULL, \ value_string TEXT NOT NULL, \ UNIQUE(topic_id, ts))') else: self.execute_stmt( 'CREATE TABLE IF NOT EXISTS ' + self.data_table + ' (ts timestamp NOT NULL,\ topic_id INTEGER NOT NULL, \ value_string TEXT NOT NULL, \ UNIQUE(topic_id, ts))') self.execute_stmt('''CREATE INDEX data_idx ON ''' + self.data_table + ''' (ts ASC)''') self.execute_stmt('''CREATE TABLE IF NOT EXISTS ''' + self.topics_table + ''' (topic_id INTEGER NOT NULL AUTO_INCREMENT, topic_name varchar(512) NOT NULL, PRIMARY KEY (topic_id), UNIQUE(topic_name))''') self.execute_stmt('''CREATE TABLE IF NOT EXISTS ''' + self.meta_table + '''(topic_id INTEGER NOT NULL, metadata TEXT NOT NULL, PRIMARY KEY(topic_id))''') self.commit() _log.debug("Created data topics and meta tables") except MysqlError as err: err_msg = "Error creating " \ "historian tables as the configured user. " \ "Please create the tables manually before " \ "restarting historian. Please refer to " \ "mysql-create*.sql files for create " \ "statements" if err.errno == mysql_errorcodes.ER_TABLEACCESS_DENIED_ERROR: err_msg = "Access denied : " + err_msg else: err_msg = err.msg + " : " + err_msg raise RuntimeError(err_msg)
[docs] def record_table_definitions(self, tables_def, meta_table_name): _log.debug( "In record_table_def {} {}".format(tables_def, meta_table_name)) self.execute_stmt( 'CREATE TABLE IF NOT EXISTS ' + meta_table_name + ' (table_id varchar(512) PRIMARY KEY, \ table_name varchar(512) NOT NULL, \ table_prefix varchar(512));') table_prefix = tables_def.get('table_prefix', "") insert_stmt = 'REPLACE INTO ' + meta_table_name + \ ' VALUES (%s, %s, %s)' self.execute_stmt(insert_stmt, ('data_table', tables_def['data_table'], table_prefix)) self.execute_stmt(insert_stmt, ('topics_table', tables_def['topics_table'], table_prefix)) self.execute_stmt( insert_stmt, ('meta_table', tables_def['meta_table'], table_prefix), commit=True)
[docs] def setup_aggregate_historian_tables(self, meta_table_name): _log.debug("CREATING AGG TABLES") table_names = self.read_tablenames_from_db(meta_table_name) self.data_table = table_names['data_table'] self.topics_table = table_names['topics_table'] _log.debug("In setup_aggregate_historian self.topics_table" " {}".format(self.topics_table)) self.meta_table = table_names['meta_table'] self.agg_topics_table = table_names.get('agg_topics_table', None) self.agg_meta_table = table_names.get('agg_meta_table', None) self.execute_stmt( 'CREATE TABLE IF NOT EXISTS ' + self.agg_topics_table + ' (agg_topic_id INTEGER NOT NULL AUTO_INCREMENT, \ agg_topic_name varchar(512) NOT NULL, \ agg_type varchar(512) NOT NULL, \ agg_time_period varchar(512) NOT NULL, \ PRIMARY KEY (agg_topic_id), \ UNIQUE(agg_topic_name, agg_type, agg_time_period));') self.execute_stmt( 'CREATE TABLE IF NOT EXISTS ' + self.agg_meta_table + '(agg_topic_id INTEGER NOT NULL, \ metadata TEXT NOT NULL, \ PRIMARY KEY(agg_topic_id));') self.commit() _log.debug("Created aggregate topics and meta tables")
[docs] def query(self, topic_ids, id_name_map, start=None, end=None, skip=0, agg_type=None, agg_period=None, count=None, order="FIRST_TO_LAST"): table_name = self.data_table if agg_type and agg_period: table_name = agg_type + "_" + agg_period query = '''SELECT topic_id, ts, value_string FROM ''' + table_name + ''' {where} {order_by} {limit} {offset}''' if self.MICROSECOND_SUPPORT is None: self.init_microsecond_support() where_clauses = ["WHERE topic_id = %s"] args = [topic_ids[0]] if start is not None: if start.tzinfo != pytz.UTC: start = start.astimezone(pytz.UTC) if not self.MICROSECOND_SUPPORT: start_str = start.isoformat() start = start_str[:start_str.rfind('.')] if end is not None: if end.tzinfo !=pytz.UTC: end = end.astimezone(pytz.UTC) if not self.MICROSECOND_SUPPORT: end_str = end.isoformat() end = end_str[:end_str.rfind('.')] if start and end and start == end: where_clauses.append("ts = %s") args.append(start) else: if start: where_clauses.append("ts >= %s") args.append(start) if end: where_clauses.append("ts < %s") args.append(end) where_statement = ' AND '.join(where_clauses) order_by = 'ORDER BY ts ASC' if order == 'LAST_TO_FIRST': order_by = ' ORDER BY topic_id DESC, ts DESC' # can't have an offset without a limit # -1 = no limit and allows the user to # provide just an offset if count is None: count = 100 limit_statement = 'LIMIT %s' args.append(int(count)) offset_statement = '' if skip > 0: offset_statement = 'OFFSET %s' args.append(skip) _log.debug("About to do real_query") values = defaultdict(list) for topic_id in topic_ids: args[0] = topic_id values[id_name_map[topic_id]] = [] real_query = query.format(where=where_statement, limit=limit_statement, offset=offset_statement, order_by=order_by) _log.debug("Real Query: " + real_query) _log.debug("args: " + str(args)) cursor =, args, fetch_all=False) if cursor: for _id, ts, value in cursor: values[id_name_map[topic_id]].append( (utils.format_timestamp(ts.replace(tzinfo=pytz.UTC)), jsonapi.loads(value))) if cursor is not None: cursor.close() return values
[docs] def insert_meta_query(self): return '''REPLACE INTO ''' + self.meta_table + ''' values(%s, %s)'''
[docs] def insert_data_query(self): return '''REPLACE INTO ''' + self.data_table + \ ''' values(%s, %s, %s)'''
[docs] def insert_topic_query(self): _log.debug("In insert_topic_query - self.topic_table " "{}".format(self.topics_table)) return '''INSERT INTO ''' + self.topics_table + ''' (topic_name) values (%s)'''
[docs] def update_topic_query(self): return '''UPDATE ''' + self.topics_table + ''' SET topic_name = %s WHERE topic_id = %s'''
[docs] def get_aggregation_list(self): return ['AVG', 'MIN', 'MAX', 'COUNT', 'SUM', 'BIT_AND', 'BIT_OR', 'BIT_XOR', 'GROUP_CONCAT', 'STD', 'STDDEV', 'STDDEV_POP', 'STDDEV_SAMP', 'VAR_POP', 'VAR_SAMP', 'VARIANCE']
[docs] def insert_agg_topic_stmt(self): _log.debug("Insert aggregate topics stmt inserts " "into {}".format(self.agg_topics_table)) return '''INSERT INTO ''' + self.agg_topics_table + ''' (agg_topic_name, agg_type, agg_time_period ) values (%s, %s, %s)'''
[docs] def update_agg_topic_stmt(self): return '''UPDATE ''' + self.agg_topics_table + ''' SET agg_topic_name = %s WHERE agg_topic_id = %s '''
[docs] def replace_agg_meta_stmt(self): return '''REPLACE INTO ''' + self.agg_meta_table + ''' values(%s, %s)'''
[docs] def get_topic_map(self): q = "SELECT topic_id, topic_name FROM " + self.topics_table + ";" rows =, None) _log.debug("loading topic map from db") id_map = dict() name_map = dict() for t, n in rows: id_map[n.lower()] = t name_map[n.lower()] = n _log.debug(id_map) _log.debug(name_map) return id_map, name_map
[docs] def get_agg_topics(self): _log.debug("in get_agg_topics") try: query = "SELECT agg_topic_name, agg_type, agg_time_period, " \ "metadata FROM " + self.agg_topics_table + " as t, " + \ self.agg_meta_table + " as m WHERE t.agg_topic_id = " \ "m.agg_topic_id " rows =, None) topics = [] for row in rows: meta = ast.literal_eval(row[3])['configured_topics'] topics.append((row[0], row[1], row[2], meta)) return topics except MysqlError as e: if e.errno == mysql_errorcodes.ER_NO_SUCH_TABLE: return [] else: raise
[docs] def get_agg_topic_map(self): _log.debug("in get_agg_topic_map") try: q = "SELECT agg_topic_id, agg_topic_name, agg_type, " \ "agg_time_period " \ "FROM " + self.agg_topics_table rows =, None) _log.debug("loading agg_topic map from db") id_map = dict() for row in rows: _log.debug("rows from aggregate_topics {}".format(row)) id_map[(row[1].lower(), row[2], row[3])] = row[0] return id_map except MysqlError as e: if e.errno == mysql_errorcodes.ER_NO_SUCH_TABLE: return {} else: raise
[docs] def query_topics_by_pattern(self, topic_pattern): q = "SELECT topic_id, topic_name FROM " + self.topics_table + \ " WHERE lower(topic_name) REGEXP lower('" + topic_pattern + "');" rows =, None) _log.debug("loading topic map from db") id_map = dict() for t, n in rows: id_map[n] = t _log.debug("topics that matched the pattern {} : {}".format( topic_pattern, id_map)) return id_map
[docs] def create_aggregate_store(self, agg_type, agg_time_period): table_name = agg_type + '''_''' + agg_time_period if self.MICROSECOND_SUPPORT is None: self.init_microsecond_support() stmt = "CREATE TABLE IF NOT EXISTS " + table_name + \ " (ts timestamp(6) NOT NULL, topic_id INTEGER NOT NULL, " \ "value_string TEXT NOT NULL, topics_list TEXT," \ " UNIQUE(topic_id, ts)," \ "INDEX (ts ASC))" if not self.MICROSECOND_SUPPORT: stmt = "CREATE TABLE IF NOT EXISTS " + table_name + \ " (ts timestamp NOT NULL, topic_id INTEGER NOT NULL, " \ "value_string TEXT NOT NULL, topics_list TEXT," \ " UNIQUE(topic_id, ts)," \ "INDEX (ts ASC))" return self.execute_stmt(stmt, commit=True)
[docs] def insert_aggregate_stmt(self, table_name): return '''REPLACE INTO ''' + table_name + \ ''' values(%s, %s, %s, %s)'''
[docs] def collect_aggregate(self, topic_ids, agg_type, start=None, end=None): if isinstance(agg_type, str): if agg_type.upper() not in ['AVG', 'MIN', 'MAX', 'COUNT', 'SUM']: raise ValueError( "Invalid aggregation type {}".format(agg_type)) query = '''SELECT ''' \ + agg_type + '''(value_string), count(value_string) FROM ''' \ + self.data_table + ''' {where}''' where_clauses = ["WHERE topic_id = %s"] args = [topic_ids[0]] if len(topic_ids) > 1: where_str = "WHERE topic_id IN (" for _ in topic_ids: where_str += "%s, " where_str = where_str[:-2] # strip last comma and space where_str += ") " where_clauses = [where_str] args = topic_ids[:] if start is not None: where_clauses.append("ts >= %s") if self.MICROSECOND_SUPPORT: args.append(start) else: start_str = start.isoformat() args.append(start_str[:start_str.rfind('.')]) if end is not None: where_clauses.append("ts < %s") if self.MICROSECOND_SUPPORT: args.append(end) else: end_str = end.isoformat() args.append(end_str[:end_str.rfind('.')]) where_statement = ' AND '.join(where_clauses) real_query = query.format(where=where_statement) _log.debug("Real Query: " + real_query) _log.debug("args: " + str(args)) rows =, args) if rows: return rows[0][0], rows[0][1] else: return 0, 0