Source code for topic_watcher.agent

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}

import logging
import os

import gevent

import sqlite3
import datetime

from zmq import ZMQError

from volttron.platform.agent.known_identities import PLATFORM_TOPIC_WATCHER
from volttron.platform.agent import utils
from volttron.platform.messaging.health import Status, STATUS_BAD, STATUS_GOOD
from volttron.platform.vip.agent import Agent, Core, RPC
from volttron.platform.vip.agent.utils import build_agent
from volttron.platform.agent.utils import get_aware_utc_now
from volttron.platform.scheduling import periodic

utils.setup_logging()
_log = logging.getLogger(__name__)

__version__ = '2.1'


[docs]class AlertAgent(Agent): def __init__(self, config_path, **kwargs): super(AlertAgent, self).__init__(**kwargs) self.config = utils.load_config(config_path) self.group_instances = {} self._connection = None self.publish_settings = self.config.get('publish-settings') self._remote_agent = None self._creating_agent = False self._resetting_remote_agent = False self.publish_remote = False self.publish_local = True if self.publish_settings: self.publish_local = self.publish_settings.get('publish-local', True) self.publish_remote = self.publish_settings.get('publish-remote', False) remote = self.publish_settings.get('remote') if self.publish_remote and not remote: raise ValueError("Configured publish-remote without remote section") self.remote_identity = remote.get('identity', None) self.remote_serverkey = remote.get('serverkey', None) self.remote_address = remote.get('vip-address', None) # The remote serverkey need not be specified if the serverkey is added # to the known hosts file. If it is not specified then the call to # build agent will fail. Note not sure what rabbit will do in this # case # # TODO: check rabbit. if self.publish_remote: assert self.remote_identity assert self.remote_address @property def remote_agent(self): if self._remote_agent is None: if not self._creating_agent: self._creating_agent = True try: # Single method to connect to remote instance in following combinations # zmq -> zmq # rmq -> rmq enabled with web # zmq -> zmq enabled with web # rmq -> zmq enabled with web value = self.core.connect_remote_platform(self.remote_address, serverkey=self.remote_serverkey) if isinstance(value, Agent): self._remote_agent = value self._remote_agent.vip.ping("").get(timeout=2) self.vip.health.set_status(STATUS_GOOD) else: _log.error("Exception creation remote agent") status_context = "Couldn't connect to remote platform at: {}".format( self.remote_address) _log.error(status_context) self._remote_agent = None except (gevent.Timeout, ZMQError): _log.error("Exception creation remote agent") status_context = "Couldn't connect to remote platform at: {}".format( self.remote_address) _log.error(status_context) self._remote_agent = None self.vip.health.set_status(STATUS_BAD, status_context) finally: self._creating_agent = False return self._remote_agent
[docs] def reset_remote_agent(self): if not self._resetting_remote_agent and not self._creating_agent: if self._remote_agent is not None: self._remote_agent.core.stop() self._remote_agent = None self._resetting_remote_agent = False
[docs] @Core.receiver('onstart') def onstart(self, sender, **kwargs): """ Setup database tables for persistent logs """ db_dir = os.getcwd() data_dir = "" if utils.is_secure_mode(): for d in os.listdir(os.path.basename(os.getcwd())): if d.endswith(".agent-data"): data_dir = d break if data_dir: db_dir = os.path.join(os.getcwd(), data_dir) self._connection = sqlite3.connect( os.path.join(db_dir, 'alert_log.sqlite'), detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) c = self._connection.cursor() c.execute("CREATE TABLE IF NOT EXISTS topic_log( " "topic TEXT, " "last_seen_before_timeout TIMESTAMP, " "first_seen_after_timeout TIMESTAMP," "PRIMARY KEY(topic, last_seen_before_timeout))") c.execute("CREATE INDEX IF NOT EXISTS topic_index ON " "topic_log (topic)") c.execute("CREATE INDEX IF NOT EXISTS down_time_index ON " "topic_log (last_seen_before_timeout)") c.execute("CREATE INDEX IF NOT EXISTS up_time_index ON " "topic_log (first_seen_after_timeout)") c.execute("CREATE TABLE IF NOT EXISTS agent_log (" "start_time TIMESTAMP, " "stop_time TIMESTAMP)") c.execute("CREATE INDEX IF NOT EXISTS stop_ts_index ON " "agent_log (stop_time)") c.execute("INSERT INTO agent_log(start_time) values(?)", (get_aware_utc_now(),)) c.close() self._connection.commit() for group_name, config in self.config.items(): if group_name != 'publish-settings': self.group_instances[group_name] = self.create_alert_group(group_name, config)
[docs] def create_alert_group(self, group_name, config): group = AlertGroup(group_name, config, self._connection, main_agent=self, publish_local=self.publish_local, publish_remote=self.publish_remote) return group
[docs] @Core.receiver('onstop') def onstop(self, sender, **kwargs): c = self._connection.cursor() c.execute("UPDATE agent_log set stop_time = ? " " WHERE start_time = (SELECT max(start_time) from agent_log)", (get_aware_utc_now(),)) c.close() gevent.sleep(0.1) self._connection.commit() self._connection.close()
[docs] @RPC.export def watch_topic(self, group, topic, timeout): """RPC method Listen for a topic to be published within a given number of seconds or send alerts. If the given group is new creates and starts an instance of AlertGroup agent for the new group. The alert group agent, onstart, will start watching for the given topics :pararm group: Group that should watch the topic. :type group: str :param topic: Topic expected to be published. :type topic: str :param timeout: Seconds before an alert is sent. :type timeout: int """ if self.group_instances.get(group) is None: self.group_instances[group] = self.create_alert_group(group, {topic: timeout}) else: self.group_instances[group].watch_topic(topic, timeout) self.group_instances[group].restart_timer()
[docs] @RPC.export def watch_device(self, group, topic, timeout, points): """RPC method Watch a device's ALL topic and expect points. If the given group is new creates and starts an instance of group agent for the new group. The group onstart will start watching for the given device points :pararm group: Group that should watch the device. :type group: str :param topic: Topic expected to be published. :type topic: str :param timeout: Seconds before an alert is sent. :type timeout: int :param points: Points to expect in the publish message. :type points: [str] """ if self.group_instances.get(group) is None: self.group_instances[group] = self.create_alert_group( group, {topic: {"seconds": timeout, "points": points}}) else: self.group_instances[group].watch_device(topic, timeout, points) self.group_instances[group].restart_timer()
[docs] @RPC.export def ignore_topic(self, group, topic): """RPC method Remove a topic from agent's watch list. Alerts will no longer be sent if a topic stops being published. :param group: Group that should ignore the topic. :type group: str :param topic: Topic to remove from the watch list. :type topic: str """ group = self.group_instances[group] group.ignore_topic(topic)
[docs] @Core.schedule(periodic(1)) def decrement_ttl(self): """Periodic call Used to maintain the time since each topic's last publish. Sends an alert if any topics are missing. """ # Loop through each alert group for name in self.group_instances: topics_timedout = set() alert_topics = set() # Loop through topics in alert group for topic in self.group_instances[name].wait_time.keys(): # Send an alert if a topic hasn't been self.group_instances[name].topic_ttl[topic] -= 1 if self.group_instances[name].topic_ttl[topic] <= 0: alert_topics.add(topic) self.group_instances[name].topic_ttl[topic] = self.group_instances[name].wait_time[topic] if topic not in self.group_instances[name].unseen_topics: topics_timedout.add(topic) self.group_instances[name].unseen_topics.add(topic) # Send an alert if a point hasn't been seen try: points = self.group_instances[name].point_ttl[topic].keys() for p in points: self.group_instances[name].point_ttl[topic][p] -= 1 if self.group_instances[name].point_ttl[topic][p] <= 0: self.group_instances[name].point_ttl[topic][p] = self.group_instances[name].wait_time[topic] alert_topics.add((topic, p)) if (topic, p) not in self.group_instances[name].unseen_topics: topics_timedout.add((topic, p)) self.group_instances[name].unseen_topics.add((topic, p)) except KeyError: pass if alert_topics: try: self.group_instances[name].send_alert(list(alert_topics)) except ZMQError: self.group_instances[name].main_agent.reset_remote_agent() if topics_timedout: self.group_instances[name].log_timeout(list(topics_timedout))
[docs]class AlertGroup(): def __init__(self, group_name, config, connection, main_agent, publish_local=True, publish_remote=False): self.group_name = group_name self.connection = connection self.config = config self.main_agent = main_agent self.wait_time = {} self.topic_ttl = {} self.point_ttl = {} self.unseen_topics = set() self.last_seen = {} self.publish_local = publish_local self.publish_remote = publish_remote self.parse_config()
[docs] def parse_config(self): _log.info("Listening for alert group {}".format(self.group_name)) config = self.config for topic in config.keys(): # Optional config option with a list of points that # might not be published. if type(config[topic]) is dict: point_config = config[topic] self.watch_device(topic, point_config["seconds"], point_config["points"]) # Default config option else: timeout = config[topic] self.watch_topic(topic, timeout)
[docs] def watch_topic(self, topic, timeout): """Listen for a topic to be published within a given number of seconds or send alerts. :param topic: Topic expected to be published. :type topic: str :param timeout: Seconds before an alert is sent. :type timeout: int """ self.wait_time[topic] = timeout self.topic_ttl[topic] = timeout self.main_agent.vip.pubsub.subscribe(peer='pubsub', prefix=topic, callback=self.reset_time)
[docs] def watch_device(self, topic, timeout, points): """Watch a device's ALL topic and expect points. This method calls the watch topic method so both methods don't need to be called. :param topic: Topic expected to be published. :type topic: str :param timeout: Seconds before an alert is sent. :type timeout: int :param points: Points to expect in the publish message. :type points: [str] """ self.point_ttl[topic] = {} for p in points: self.point_ttl[topic][p] = timeout self.watch_topic(topic, timeout)
[docs] def ignore_topic(self, topic): """Remove a topic from the group watchlist :param topic: Topic to remove from the watch list. :type topic: str """ _log.info("Removing topic {} from watchlist".format(topic)) self.main_agent.vip.pubsub.unsubscribe(peer='pubsub', prefix=topic, callback=self.reset_time) points = self.point_ttl.pop(topic, None) self.topic_ttl.pop(topic, None) self.wait_time.pop(topic, None) self.unseen_topics.remove(topic) for p in points: self.unseen_topics.remove((topic, p))
[docs] def restart_timer(self): """ Reset timer for all topics in this alert group. Should be called when a new topic is added to a currently active alert group """ for t in self.topic_ttl: self.topic_ttl[t] = self.wait_time[t] for topic in self.point_ttl: for point in self.point_ttl[topic]: self.point_ttl[topic][point] = self.wait_time[topic]
[docs] def reset_time(self, peer, sender, bus, topic, headers, message): """Callback for topic subscriptions Resets the timeout for topics and devices when publishes are received. """ up_time = get_aware_utc_now() # TODO: What is the use case for this IF STMT # topic should always be there?? Ask Craig if topic not in self.wait_time: found = False # if topic isn't in wait time we need to figure out the # prefix topic so that we can determine the wait time for x in self.wait_time: # TODO: order the wait_time topics so furthest down the tree wins. if topic.startswith(x): topic = x found = True break if not found: _log.debug("No configured topic prefix for topic {}".format( topic)) return log_topics = set() # Reset the standard topic timeout self.topic_ttl[topic] = self.wait_time[topic] self.last_seen[topic] = get_aware_utc_now() if topic in self.unseen_topics: self.unseen_topics.remove(topic) # log time we saw topic only if we had earlier recorded a timeout log_topics.add(topic) # Reset timeouts on volatile points if topic in self.point_ttl: received_points = message[0].keys() expected_points = self.point_ttl[topic].keys() for point in expected_points: if point in received_points: self.point_ttl[topic][point] = self.wait_time[topic] self.last_seen[(topic, point)] = get_aware_utc_now() if (topic, point) in self.unseen_topics: self.unseen_topics.remove((topic, point)) log_topics.add((topic, point)) if log_topics: self.log_time_up(up_time, log_topics)
[docs] def log_timeout(self, log_topics): """ logs into database the last time a topic was seen before a time out or current time if topic was never seen from the time of alert agent start. :param log_topics: The list of configured topics for which message was received. Entries in this list can either be topic string or a tuple containing an all topic and a point name. :type log_topics: list """ values = [] for topic in log_topics: values.append((self.get_topic_name(topic), self.last_seen.get(topic))) c = self.connection.cursor() c.executemany( "INSERT INTO topic_log (topic, last_seen_before_timeout) " "VALUES (?, ?)", values) c.close() self.connection.commit()
[docs] def log_time_up(self, up_time, log_topics): """ Log into topic_log table when the alert agent found publishes to a topic after the last time it timed out. :param up_time: Time when message was published to the topic. Note that this need not be the same as the timestamp in message header which gets recorded in the historian. For example, when older device scrapes are replayed. :param log_topics: The list of configured topics for which message was received. Entries in this list can either be topic string or a tuple containing an all topic and a point name. :type up_time: datetime :type log_topics: list """ c = self.connection.cursor() for topic in log_topics: c.execute("UPDATE topic_log " "SET first_seen_after_timeout = ? " "WHERE rowid = " " (SELECT max(rowid) FROM topic_log " " WHERE topic = ? )", (up_time, self.get_topic_name(topic))) c.close() self.connection.commit()
[docs] @staticmethod def get_topic_name(parts): """ Return the input parameter if input parameter is a string. If input parameter is a tuple, expects an all topic as the first list element and point name as the second element of the tuple. strips "all" from the end of topic name and add the point name to it to get point topic string :param parts: topic name or (all topic, point name) :type parts: str or list :return: topic string :rtype: str """ if isinstance(parts, str): return parts elif parts[0].endswith("/all"): return parts[0][:-3] + parts[1] else: raise ValueError("Invalid topic and point name:{} Only all " "topics can use multiple points in an " "alert group. For topics not ending in " "/all use standard topic configuration format in " "alert agent configuration".format(parts))
[docs] def send_alert(self, unseen_topics): """Send an alert for the group, summarizing missing topics. :param unseen_topics: List of topics that were expected but not received :type unseen_topics: list """ alert_key = "AlertAgent Timeout for group {}".format(self.group_name) _log.debug(f"unseen_topics {unseen_topics}") _log.debug(f"sorted : {sorted(unseen_topics, key = lambda x: x[0] if isinstance(x, tuple) else x)}") context = "Topic(s) not published within time limit: {}".format( sorted(unseen_topics, key = lambda x: x[0] if isinstance(x, tuple) else x)) status = Status.build(STATUS_BAD, context=context) if self.publish_remote: try: remote_agent = self.main_agent.remote_agent if not remote_agent: raise RuntimeError("Remote agent unavailable") else: remote_agent.vip.health.send_alert(alert_key, status) except gevent.Timeout: self.main_agent.vip.health.send_alert(alert_key, status) else: if self.publish_local: self.main_agent.vip.health.send_alert(alert_key, status) else: self.main_agent.vip.health.send_alert(alert_key, status)
[docs]def main(): utils.vip_main(AlertAgent, identity=PLATFORM_TOPIC_WATCHER, version=__version__)
if __name__ == '__main__': try: main() except KeyboardInterrupt: pass