Source code for mongotagging.tagging
# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import collections
import csv
import logging
import sys
from collections import OrderedDict
import pymongo
import re
from pkg_resources import resource_string, resource_exists
from pymongo.errors import BulkWriteError
from volttron.platform.agent import utils
from volttron.platform.agent.base_tagging import BaseTaggingService
from volttron.platform.dbutils import mongoutils
from volttron.platform.messaging.health import (STATUS_BAD, Status)
from volttron.utils.docs import doc_inherit
__version__ = "1.1"
utils.setup_logging()
_log = logging.getLogger(__name__)
TAGGING_SERVICE_SETUP_FAILED = 'TAGGING_SERVICE_SETUP_FAILED'
[docs]def tagging_service(config_path, **kwargs):
"""
This method is called by the :py:func:`tagging.main` to
parse the passed config file or configuration dictionary object, validate
the configuration entries, and create an instance of MongodbTaggingService
:param config_path: could be a path to a configuration file or can be a
dictionary object
:param kwargs: additional keyword arguments if any
:return: an instance of :py:class:`tagging.MongodbTaggingService`
"""
if isinstance(config_path, dict):
config_dict = config_path
else:
config_dict = utils.load_config(config_path)
if not config_dict.get('connection') or \
not config_dict.get('connection').get('params') or \
not config_dict.get('connection').get('params').get('database'):
raise ValueError("Missing database connection parameters. Agent "
"configuration should contain database connection "
"parameters with the details about type of database"
"and name of database. Please refer to sample "
"configuration file in Agent's source directory.")
utils.update_kwargs_with_config(kwargs, config_dict)
return MongodbTaggingService(**kwargs)
[docs]class MongodbTaggingService(BaseTaggingService):
"""This is a tagging service agent that writes data to a Mongo database.
For instance with large amount of tags and frequent tag queries, a NOSQL
database such as Mongodb would provide better efficiency than SQLite.
"""
def __init__(self, connection, table_prefix=None, **kwargs):
"""Initialise the tagging service.
:param connection: dictionary object containing the database
connection details
:param table_prefix: optional prefix to be used for all tag tables
:param kwargs: additional keyword arguments. (optional identity and
topic_replace_list used by parent classes)
"""
super(MongodbTaggingService, self).__init__(**kwargs)
self.connection = connection
self._client = mongoutils.get_mongo_client(connection['params'])
self.tags_collection = "tags"
self.tag_refs_collection = "tag_refs"
#self.units_table = "units" #in version 2
self.categories_collection = "categories"
self.topic_tags_collection = "topic_tags"
if table_prefix:
self.tags_collection = table_prefix + "_" + \
self.tags_collection
self.tag_refs_collection = table_prefix + "_" + \
self.tag_refs_collection
self.categories_collection = table_prefix + "_" + \
self.categories_collection
self.topic_tags_collection = table_prefix + "_" + \
self.topic_tags_collection
[docs] @doc_inherit
def setup(self):
"""
Read resource files and load list of valid tags, categories,
tags grouped by categories, list of reference tags and its parent.
:return:
"""
_log.debug("Setup of mongodb tagging agent")
err_message = ""
collections = []
db = None
try:
db = self._client.get_default_database()
collections = db.collection_names(include_system_collections=False)
_log.debug(collections)
except Exception as e:
err_message = "Unable to query list of existing tables from the " \
"database. Exception in init of tagging service: {}. " \
"Stopping tagging service agent".format(e.args)
collection = ""
try:
collection = self.tags_collection
if self.tags_collection in collections:
_log.info("{} collection exists. Assuming initial values have "
"been loaded".format(collection))
else:
self._init_tags(db)
self._init_category_tags(db)
collection = self.tag_refs_collection
if self.tag_refs_collection in collections:
_log.info("{} collection exists. Assuming initial values have "
"been loaded".format(collection))
else:
self._init_tag_refs(db)
collection = self.categories_collection
if self.categories_collection in collections:
_log.info("{} collection exists. Assuming initial values "
"have been loaded".format(collection))
else:
self._init_categories(db)
except Exception as e:
err_message = "Initialization of " + collection + \
" collection failed with exception: {}" \
"Stopping tagging service agent. ".format(e.args)
if err_message:
_log.error(err_message)
self.vip.health.set_status(STATUS_BAD,
"Initialization of tagging service "
"failed")
status = Status.from_json(self.vip.health.get_status_json())
# status.context = status.context + \
# " Exception: {}".format(e.args) + \
# " Stopping tagging service agent"
# _log.debug("status:{}".format(status))
self.vip.health.send_alert(TAGGING_SERVICE_SETUP_FAILED, status)
self.core.stop()
[docs] @doc_inherit
def load_valid_tags(self):
# Now cache list of tags and kind/type for validation during
# insert
db = self._client.get_default_database()
cursor = db[self.tags_collection].find({}, projection=['_id', 'kind'])
for record in cursor:
self.valid_tags[record['_id']] = record['kind']
[docs] @doc_inherit
def load_tag_refs(self):
# Now cache ref tags and its parent
db = self._client.get_default_database()
cursor = db[self.tag_refs_collection].find({}, projection=['_id',
'parent'])
for record in cursor:
self.tag_refs[record['_id']] = record['parent']
_log.debug("After load tag_refs is {}".format(self.tag_refs))
def _init_tags(self, db):
file_path = self.resource_sub_dir+'/tags.csv'
_log.debug("Loading file :" + file_path)
with open(file_path, 'r') as content_file:
csv_str = content_file.read()
if csv_str:
# csv.DictReader uses first line in file for column headings
# by default
dr = csv.DictReader(csv_str.splitlines())
bulk_tags = db[self.tags_collection].initialize_ordered_bulk_op()
for i in dr:
bulk_tags.insert({"_id":i['name'],
"kind":i['kind'],
"description":i['description']})
bulk_tags.execute()
else:
raise ValueError(
"Unable to load list of reference tags and its parent. No "
"such file: {}".format(file_path))
def _init_tag_refs(self, db):
file_path = self.resource_sub_dir+'/tag_refs.csv'
_log.debug("Loading file :" + file_path)
with open(file_path, 'r') as content_file:
csv_str = content_file.read()
if csv_str:
# csv.DictReader uses first line in file for column headings
# by default
dr = csv.DictReader(csv_str.splitlines())
bulk_tags = db[
self.tag_refs_collection].initialize_ordered_bulk_op()
for i in dr:
bulk_tags.insert({"_id":i['tag'],
"parent":i['parent_tag']})
bulk_tags.execute()
else:
raise ValueError(
"Unable to load list of reference tags and its parent. No "
"such file: {}".format(file_path))
def _init_categories(self, db):
file_path = self.resource_sub_dir + '/categories.csv'
_log.debug("Loading file :" + file_path)
with open(file_path, 'r') as content_file:
csv_str = content_file.read()
if csv_str:
dr = csv.DictReader(csv_str.splitlines())
bulk = db[
self.categories_collection].initialize_ordered_bulk_op()
for i in dr:
bulk.insert({"_id": i['name'],
"description": i['description']})
bulk.execute()
else:
_log.warning("No categories to initialize. No such file " + file_path)
def _init_category_tags(self, db):
file_path = self.resource_sub_dir + '/category_tags.txt'
_log.debug("Loading file :" + file_path)
with open(file_path, 'r') as content_file:
txt_str = content_file.read()
bulk_tags = db[self.tags_collection].initialize_ordered_bulk_op()
if txt_str:
current_category = ""
tags = set()
mapping = collections.defaultdict(set)
for line in txt_str.splitlines():
if not line or line.startswith("##"):
continue
if line.startswith("#") and line.endswith("#"):
new_category = line.strip()[1:-1]
if len(tags) > 0:
for tag in tags:
mapping[tag].add(current_category)
current_category = new_category
tags = set()
else:
temp= line.split(":") # ignore description
tags.update(re.split(" +", temp[0]))
if len(tags)>0:
for tag in tags:
mapping[tag].add(current_category)
for tag in mapping.keys():
bulk_tags.find({"_id": tag}).update(
{'$set': {"categories": list(mapping[tag])}})
bulk_tags.execute()
db[self.tags_collection].create_index(
[('categories', pymongo.ASCENDING)], background=True)
else:
_log.warning("No category to tags mapping to initialize. No such file " + file_path)
[docs] @doc_inherit
def query_categories(self, include_description=False, skip=0, count=None,
order="FIRST_TO_LAST"):
db = self._client.get_default_database()
order_by = pymongo.ASCENDING
if order == 'LAST_TO_FIRST':
order_by = pymongo.DESCENDING
skip_count = 0
if skip > 0:
skip_count = skip
if count is None:
cursor = db[self.categories_collection].find(
projection=['_id', 'description'], skip=skip_count,
sort=[('_id', order_by)])
else:
cursor = db[self.categories_collection].find(
projection=['_id', 'description'], skip=skip_count,
limit=count, sort=[('_id', order_by)])
result_dict = list(cursor)
results = OrderedDict()
for r in result_dict:
results[r['_id']] = r.get('description', "")
if include_description:
return list(results.items())
else:
return list(results.keys())
[docs] @doc_inherit
def query_tags_by_category(self, category, include_kind=False,
include_description=False, skip=0, count=None,
order="FIRST_TO_LAST"):
db = self._client.get_default_database()
order_by = pymongo.ASCENDING
if order == 'LAST_TO_FIRST':
order_by = pymongo.DESCENDING
skip_count = 0
if skip > 0:
skip_count = skip
_log.debug("category: {}".format(category))
if count is None:
cursor = db[self.tags_collection].find(
{'categories': {'$in': [category]}},
projection=['_id', 'kind', 'description'], skip=skip_count,
sort=[('_id', order_by)])
else:
cursor = db[self.tags_collection].find(
{'categories': {'$in': [category]}},
projection=['_id', 'kind', 'description'], skip=skip_count,
limit=count, sort=[('_id', order_by)])
records = list(cursor)
results = []
for r in records:
results_element = [r['_id']]
if include_kind:
results_element.append(r['kind'])
if include_description:
results_element.append(r['description'])
if include_kind or include_description:
results.append(results_element)
else:
results.append(r['_id'])
return results
[docs] @doc_inherit
def insert_topic_tags(self, tags, update_version=False):
db = self._client.get_default_database()
bulk = db[self.topic_tags_collection].initialize_unordered_bulk_op()
result = dict()
result['info'] = dict()
result['error'] = dict()
execute = False
for topic_pattern, topic_tags in tags.items():
for tag_name, tag_value in topic_tags.items():
if tag_name not in self.valid_tags:
raise ValueError(
"Invalid tag name:{}".format(tag_name))
# TODO: Validate and convert values based on tag kind/type
# for example, for Marker tags set value as true even if
# value passed is None.
# tag_value = get_tag_value(tag_value,
# self.valid_tags[tag_name])
if tag_name == 'id' and tag_value is not None:
_log.warning("id tags are not explicitly stored. "
"topic prefix servers as unique identifier for"
"an entity. id value sent({}) will not be "
"stored".format(tag_value))
prefixes = self.get_matching_topic_prefixes(topic_pattern)
if not prefixes:
result['error'][topic_pattern] = "No matching topic found"
continue
result['info'][topic_pattern] = []
for prefix in prefixes:
temp = topic_tags.copy()
temp['_id'] = prefix
temp['id'] = prefix
execute = True
bulk.find({'_id': prefix}).upsert().update_one(
{'$set': temp})
result['info'][topic_pattern].append(prefix)
if len(result['info'][topic_pattern]) == 1 and \
topic_pattern == result['info'][topic_pattern][0]:
# means value sent was actually some pattern so add
# info to tell user the list of topic prefix that matched
# the pattern sent
_log.debug("topic passed is exact name. Not pattern. Removing"
" from result info: {}".format(topic_pattern))
result['info'].pop(topic_pattern)
if execute:
try:
bulk.execute()
except BulkWriteError as bwe:
errors = bwe.details['writeErrors']
_log.error("bwe error count {}".format(len(errors)))
for e in errors:
_log.error(e['op'])
result['error'][e['op']['q']['_id']] = e['errmsg']
return result
[docs] @doc_inherit
def query_tags_by_topic(self, topic_prefix, include_kind=False,
include_description=False, skip=0, count=None,
order="FIRST_TO_LAST"):
db = self._client.get_default_database()
_log.debug("topic_prefix: {}".format(topic_prefix))
cursor = db[self.topic_tags_collection].find({"_id": topic_prefix},
projection={'_id': False})
l = list(cursor)
if l and len(l) == 1:
d = l[0]
else:
_log.debug("tags for topic_prefix {} is {}".format(topic_prefix,
l))
return []
reverse = False
if order == 'LAST_TO_FIRST':
reverse = True
ordered_result_dict = OrderedDict(sorted(d.items(), reverse=reverse))
_log.debug("Ordered tags: {}".format(ordered_result_dict))
#Now get the kind and description for each of the tag in earlier
# result dict
skip_count = 0
if skip:
skip_count = skip
if count is None:
count = -1
meta = {}
if include_description or include_kind:
cursor = db[self.tags_collection].find(
{"_id":{"$in":list(d.keys())}})
records = list(cursor)
for r in records:
meta[r['_id']] = (r['kind'], r['description'])
results = []
counter = 0
for tag, value in ordered_result_dict.items():
counter = counter + 1
if counter <= skip:
continue
if count < 0 or counter <= (count + skip_count):
results_element = [tag, value]
if include_kind:
results_element.append(meta[tag][0])
if include_description:
results_element.append(meta[tag][1])
results.append(results_element)
elif counter > (count+skip_count):
break
return results
[docs] @doc_inherit
def query_topics_by_tags(self, ast, skip=0, count=None, order=None):
if count is None:
count = 100
skip_count = 0
if skip > 0:
skip_count = skip
order_by = 1
if order == 'LAST_TO_FIRST':
order_by = -1
sub_queries = list()
find_cond = mongoutils.get_tagging_queries_from_ast(ast,
self.tag_refs,
sub_queries)
_log.debug("main query condition: {}".format(find_cond))
_log.debug("sub queries: {}".format(sub_queries))
db = self._client.get_default_database()
if sub_queries:
i = 1
for sub_query in sub_queries:
cursor = db[self.topic_tags_collection].find(sub_query,
['_id'])
result = [(row['_id']) for row in cursor]
cursor.close()
_log.debug("Subquery result is: {}".format(result))
_log.debug("Calling find replace for "
"temp val {}".format("##VOLTTRON_Q" + str(i)))
self._find_replace(find_cond, "##VOLTTRON_Q" + str(i), result)
_log.debug("condition after replace : {}".format(find_cond))
i += 1
cursor = db[self.topic_tags_collection].find(find_cond, ['_id'])
cursor = cursor.skip(skip_count).limit(count)
cursor = cursor.sort([("_id", order_by)])
topic_prefix = [(row['_id']) for row in cursor]
cursor.close()
return topic_prefix
def _find_replace(self, obj, temp_value, new_value):
# Utility function used to replace sub query place holders with
# results of the sub query since mongo does not support nested queries
_log.debug("In find_replace. obj ={} temp_val={} "
"new_val={}".format(obj, temp_value, new_value))
if not isinstance(obj, dict):
return
for k, v in obj.items():
_log.debug("k={} v={}".format(k, v))
if isinstance(v, dict):
_log.debug("Calling with obj {}".format(v))
found = self._find_replace(v, temp_value, new_value)
if found:
return
elif isinstance(v, list):
# and/or/in condition
for list_item in v:
found = self._find_replace(list_item, temp_value,
new_value)
if found:
return
else:
_log.debug("In find_replace. k={} v={} temp_val={} "
"new_val={}".format(
k, v, temp_value, new_value))
if v == temp_value:
obj[k] = new_value
return True
[docs]def main(argv=sys.argv):
""" Main entry point for the agent.
:param argv:
:return:
"""
try:
utils.vip_main(tagging_service, version=__version__)
except Exception as e:
print(e)
_log.exception('unhandled exception')
if __name__ == '__main__':
# Entry point for script
try:
sys.exit(main())
except KeyboardInterrupt:
pass