Source code for volttron.platform.vip.agent.subsystems.volttronfncs

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}




from datetime import datetime
import logging
import re
import weakref
import warnings

from .base import SubsystemBase
import gevent

__all__ = ['FNCS']


_log = logging.getLogger(__name__)


# fncs is an optional dependency: record whether it could be imported so the
# subsystem can report a clear error later instead of failing at import time.
# NOTE(review): OSError is presumably raised when the native fncs shared
# library cannot be loaded -- confirm against the fncs python binding.
try:
    from fncs import fncs
except (ImportError, OSError):
    HAS_FNCS = False
else:
    HAS_FNCS = True

# noinspection PyMethodMayBeStatic
class FNCS(SubsystemBase):
    """
    The fncs subsystem allows an integration point between VOLTTRON and FNCS.

    Typical use is to call :meth:`initialize` once, then
    :meth:`start_simulation`, and to advance simulation time from the work
    callback by calling :meth:`next_timestep`.
    """

    # Seconds per supported time unit, keyed by the lower-cased first letter
    # of the unit (so "s", "sec", "S" and "Seconds" are all treated alike).
    _SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}

    # Optional per-topic settings that are carried from the caller's topic
    # mapping into the registered topics.
    _OPTIONAL_TOPIC_KEYS = ('volttron_topic', 'default', 'data_type', 'list')

    def __init__(self, owner, core, pubsub):
        """
        Create the subsystem with default connection settings.

        :param owner: Not used by this subsystem.
        :param core: Agent core; held as a weak reference.
        :param pubsub: Pubsub subsystem; held as a weak reference.
        """
        self.core = weakref.ref(core)
        self.pubsub = weakref.ref(pubsub)

        self._federate_name = self.core().identity
        self._broker = "tcp://localhost:5570"
        self._time_delta = "1s"
        self._poll_timeout = 60
        self._registered_fncs_topics = {}
        self._registered_fncs_topic_callbacks = {}
        self._current_step = 0
        self._current_simulation_time = None
        self._simulation_start_time = None
        self._simulation_delta = None
        self._simulation_length = None
        self._simulation_started = False
        self._simulation_complete = False
        self._work_callback = None
        self._current_values = {}
        self._stop_agent_when_sim_complete = False

    def initialize(self, sim_start_time, sim_length, topic_mapping, work_callback, federate_name=None,
                   broker_location="tcp://localhost:5570", time_delta="1s",
                   stop_agent_when_sim_complete=False):
        """
        Configure the agent to act as a federated connection to FNCS.

        :param sim_start_time: Wall clock time for the simulation start time
            as a ``datetime`` (this is not used at present time other than to
            be available).
        :param sim_length: Time for the simulation to run.  Should be
            formatted as <number><unit> i.e. 60s.
        :param topic_mapping: Maps fncs topics onto volttron topics.  Each
            value is a dict that must contain ``fncs_topic`` and may contain
            ``volttron_topic``, ``default``, ``data_type`` and ``list``.
        :param work_callback: Callable invoked without arguments on every
            pass of the simulation loop.
        :param federate_name: MUST be unique to the broker.  If None, then
            will be the identity of the current agent process.
        :param broker_location: tcp location of the fncs broker (defaults to
            tcp://localhost:5570).
        :param time_delta: Minimum timestep supported for the federate,
            formatted as <number><unit>.
        :param stop_agent_when_sim_complete: Should we stop the agent when
            the simulation is completed.
        :raises RuntimeError: if the fncs library is missing, fncs has
            already been initialized, or fncs fails to initialize.
        :raises ValueError: if a required argument is missing or invalid.
        :return:
        """
        self.__raise_if_not_installed()

        if fncs.is_initialized():
            raise RuntimeError("Invalid state, fncs has alreayd been initialized")

        if not topic_mapping:
            raise ValueError("Must supply a topic mapping with topics to map onto.")
        if not sim_start_time:
            raise ValueError("sim_start_time must be specified.")
        if not sim_length:
            raise ValueError("sim_length must be specified.")
        if not time_delta:
            raise ValueError("time_delta must be specified.")
        if not broker_location:
            raise ValueError("broker_location must be specified.")
        if not work_callback:
            raise ValueError("work_callback must be specified.")
        if not isinstance(sim_start_time, datetime):
            raise ValueError("sim_start_time must be a datetime object.")

        self._broker = broker_location
        self._time_delta = time_delta
        self._current_simulation_time = self._simulation_start_time = sim_start_time
        self._simulation_delta = self.parse_time(time_delta)
        self._simulation_length = self.parse_time(sim_length)

        # federate_name is optional: fall back to the agent identity rather
        # than rejecting the documented default of None.
        self._federate_name = federate_name or self.core().identity
        self._work_callback = work_callback

        for k, v in topic_mapping.items():
            if not v.get('fncs_topic'):
                raise ValueError("Invalid fncs_topic specified in key {}.".format(k))

            entry = dict(fncs_topic=v.get('fncs_topic'))
            # Carry the optional settings across so that federate
            # registration can emit them.
            for optional_key in self._OPTIONAL_TOPIC_KEYS:
                if optional_key in v:
                    entry[optional_key] = v[optional_key]

            self._registered_fncs_topics[k] = entry

        self.__register_federate()

        self._simulation_started = False
        self._simulation_complete = False
        self._stop_agent_when_sim_complete = stop_agent_when_sim_complete

    def __register_federate(self):
        """
        Build the fncs configuration text from the current settings and
        initialize fncs with it.

        :raises RuntimeError: if the fncs library is missing or fncs does not
            report itself initialized afterwards.
        """
        self.__raise_if_not_installed()

        cfg = "name = {0[name]}\ntime_delta = {0[time_delta]}\nbroker = {0[broker]}\n".format(
            dict(name=self._federate_name, broker=self._broker, time_delta=self._time_delta))

        if self._registered_fncs_topics:
            cfg += "values"
            for k, v in self._registered_fncs_topics.items():
                cfg += "\n\t{}\n\t\ttopic = {}\n".format(k, v['fncs_topic'])
                # Compare against None so that falsy defaults such as 0 are
                # still written out.
                if v.get("default") is not None:
                    cfg += "\t\tdefault = {}\n".format(v.get('default'))
                if v.get("data_type"):
                    cfg += "\t\ttype = {}\n".format(v.get('data_type'))
                if v.get("list"):
                    cfg += "\t\tlist = true\n"

        _log.debug(cfg)

        # NOTE(review): the configuration is ZPL-style text, which expresses
        # nesting with four spaces per level, so each tab is expanded to four
        # spaces -- confirm the indent width against the fncs configuration
        # documentation.
        cfg = cfg.replace("\t", "    ")
        fncs.initialize(cfg)
        _log.debug("After initialized!")

        if not fncs.is_initialized():
            raise RuntimeError("Intialization error for fncs.")

    def start_simulation(self):
        """
        Begin the main fncs loop in a new greenlet.

        :raises RuntimeError: if the fncs library is missing.
        :raises ValueError: if fncs has not been initialized.
        :return:
        """
        self.__raise_if_not_installed()

        if not fncs.is_initialized():
            raise ValueError("intialized must be called before starting simulation")

        gevent.spawn(self._fncs_loop)
        # Allow the spawned greenlet to run.
        gevent.sleep(0.1)

    @property
    def current_simulation_step(self):
        """
        returns the current fncs timestep.

        :raises RuntimeError: if the fncs library is missing.
        :return:
        """
        self.__raise_if_not_installed()
        return self._current_step

    def next_timestep(self):
        """
        Advances the fncs timestep to the next time delta.

        Requests the current step plus the configured time delta from fncs
        and stores the granted time as the new current step.

        :raises RuntimeError: if the fncs library is missing or fncs is no
            longer initialized after the request.
        :return:
        """
        self.__raise_if_not_installed()
        granted_time = fncs.time_request(self._current_step + self._simulation_delta)
        self._raise_if_error("fncs.time_request")
        self._current_step = granted_time
        _log.debug("Granted time is: {}".format(granted_time))

    def getvalues(self):
        """
        Return the dictionary of values collected on the latest loop pass.

        This is the live internal dictionary, not a copy; use
        :attr:`current_values` for a copy.
        """
        return self._current_values

    def parse_time(self, time_string):
        """
        Parses a <number><unit> i.e. 60s to a fncs timestep number.

        Only the first letter of the unit is examined, case-insensitively:
        s (seconds), m (minutes), h (hours) or d (days).  Any other unit
        triggers a warning and is treated as seconds.

        :param time_string: Text such as "60s", "5 m" or "2h".
        :raises RuntimeError: if no <number><unit> pair can be found.
        :return: The duration expressed in seconds as an int.
        """
        match = re.search(r'(\d+)(\s?)(\D+)', time_string)
        if match is None:
            raise RuntimeError(
                "Unable to parse run time argument. Please provide run time in the following format: "
                "#s, #m, #h, or #d.")

        amount = int(match.group(1))
        unit_letter = match.group(3)[0].lower()

        multiplier = self._SECONDS_PER_UNIT.get(unit_letter)
        if multiplier is None:
            warnings.warn("Unknown time unit supplied. Defaulting to seconds.")
            multiplier = 1

        return amount * multiplier

    def publish(self, topic, message):
        """
        publish a topic to the fncs bus.  The publish will only be sent if
        there is a federate subscribed to the topic that is being published.

        :param topic: fncs topic to publish to.
        :param message: Value to publish; converted with ``str`` first.
        :raises RuntimeError: if the fncs library is missing or fncs is no
            longer initialized after publishing.
        :return:
        """
        self.__raise_if_not_installed()
        _log.debug("Publishing to: {}".format(topic))
        fncs.publish(topic, str(message))
        self._raise_if_error("publishing topic: {} message: {}".format(topic, message))

    def publish_anon(self, topic, message):
        """
        publish an anonymous topic to the fncs bus.

        :param topic: fncs topic to publish to.
        :param message: Value to publish; passed to fncs unchanged.
        :raises RuntimeError: if the fncs library is missing or fncs is no
            longer initialized after publishing.
        :return:
        """
        self.__raise_if_not_installed()
        fncs.publish_anon(topic, message)
        self._raise_if_error("publishing anon topic: {} message: {}".format(topic, message))

    def reset(self):
        """
        Call ``fncs.die()``.

        :raises RuntimeError: if the fncs library is missing.
        """
        self.__raise_if_not_installed()
        fncs.die()

    def _fncs_loop(self):
        """
        Run the simulation until the current step reaches the simulation
        length.

        Each pass collects the values for the pending fncs events, invokes
        the work callback, and republishes any events that are then pending
        onto their mapped volttron topics.  The current step only advances
        through :meth:`next_timestep`.
        """
        _log.info("Starting fncs loop")
        self._simulation_started = True
        while self._current_step < self._simulation_length:
            # Block until the work is done here.
            subKeys = fncs.get_events()
            self._raise_if_error("After get_events")

            self._current_values.clear()
            for x in subKeys:
                fncs_topic = self._registered_fncs_topics[x].get('fncs_topic')
                self._current_values[fncs_topic] = fncs.get_value(x)

                if not fncs.is_initialized():
                    fncs.die()
                    raise RuntimeError("FNCS unexpected error after get_values")

            self._work_callback()

            # Forward whatever events are pending after the callback to the
            # volttron message bus, for topics that have a mapping.
            for x in fncs.get_events():
                subkeyvalue = fncs.get_value(x)
                volttron_topic = self._registered_fncs_topics[x].get('volttron_topic')
                if volttron_topic:
                    self.pubsub().publish('pubsub', topic=volttron_topic, message=subkeyvalue)

            # This allows other event loops to run
            gevent.sleep(0.000000001)

        self._simulation_complete = True
        fncs.finalize()
        if self._stop_agent_when_sim_complete:
            self.core().stop()

    def _raise_if_error(self, location):
        """
        Raise if fncs is no longer initialized.

        :param location: Text describing the operation just attempted; it is
            included in the error message.
        :raises RuntimeError: if fncs reports it is not initialized
            (``fncs.die()`` is called first).
        """
        if not fncs.is_initialized():
            fncs.die()
            raise RuntimeError("FNCS unexpected error: {}".format(location))

    @property
    def fncs_installed(self):
        """
        Allows caller to determine if the fncs module is available.
        """
        return HAS_FNCS

    def __raise_if_not_installed(self):
        """
        :raises RuntimeError: if the fncs python library could not be
            imported.
        """
        if not self.fncs_installed:
            raise RuntimeError("Missing fncs python library.")

    @property
    def fncs_version(self):
        """
        Version reported by ``fncs.get_version()``.

        :raises RuntimeError: if the fncs library is missing.
        """
        self.__raise_if_not_installed()
        return fncs.get_version()

    @property
    def simulation_running(self):
        """True once the loop has started and until it has completed."""
        return self._simulation_started and not self._simulation_complete

    @property
    def simulation_started(self):
        """True once the simulation loop has started."""
        return self._simulation_started

    @property
    def simulation_complete(self):
        """True once the simulation loop has run to completion."""
        return self._simulation_complete

    @property
    def current_values(self):
        """A copy of the values collected on the latest loop pass."""
        return self._current_values.copy()