# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import logging
from volttron.platform.agent.known_identities import (
VOLTTRON_CENTRAL, CONTROL)
from volttron.platform.vip.agent import (Agent, RPC)
_log = logging.getLogger(__name__)
[docs]class VCConnection(Agent):
"""
This agent will connect to an instance with volttron.central agent connected
to it. The volttron.central agent will use this agent to communicate with
the platform.agent(vcp) running on the current instance of the platform.
"""
def __init__(self, **kwargs):
self._log = logging.getLogger(self.__class__.__name__)
super(VCConnection, self).__init__(**kwargs)
self._main_agent = None
self.server = None
self.peer = CONTROL
[docs] def set_main_agent(self, main_agent):
"""
The main agent is the VCP that is using this agent to connect to the
remote volttron instance.
:param main_agent: the agent that instantiated this one.
:type VolttronCentralPlatform:
"""
self._main_agent = main_agent
self.server = self._main_agent
[docs] def publish_to_vc(self, topic, message=None, headers={}):
"""
This method allows the main_agent to publish a message up to the
volttron.central instance.
:param topic:
:param message:
:param headers:
"""
self.vip.pubsub.publish('pubsub', topic, headers, message).get(timeout=5)
[docs] @RPC.export
def start_bacnet_scan(self, iam_topic, proxy_identity, low_device_id=None,
high_device_id=None, target_address=None,
scan_length=5):
"""
Starts a bacnet scan using the the named proxy_identity as the callee.
:param iam_topic:
:param proxy_identity:
:param low_device_id:
:param high_device_id:
:param target_address:
:param scan_length:
:return:
"""
self._main_agent.start_bacnet_scan(iam_vc_response_topic=iam_topic,
proxy_identity=proxy_identity,
low_device_id=low_device_id,
high_device_id=high_device_id,
target_address=target_address,
scan_length=scan_length)
[docs] @RPC.export
def get_instance_uuid(self):
"""
Retrieve the instance uuid for the vcp agent's instance.
:return:
"""
return self._main_agent.get_instance_uuid()
[docs] @RPC.export
def get_health(self):
"""
Retrieve the health of the vcp agent.
:return:
"""
return self._main_agent.vip.health.get_status()
[docs] @RPC.export
def start_agent(self, agent_uuid):
"""
Start an agent that is already present on the vcp instance.
:param agent_uuid:
:return:
"""
return self._main_agent.start_agent(agent_uuid)
[docs] @RPC.export
def stop_agent(self, agent_uuid):
"""
Stop an agent already running on the vcp instance.
:param agent_uuid:
:return:
"""
return self._main_agent.start_agent(agent_uuid)
[docs] @RPC.export
def restart(self, agent_uuid):
"""
Performs the stop and start operations on the vcp instance for an agent.
:param agent_uuid:
:return:
"""
stop_result = self.stop_agent(agent_uuid)
start_result = self.start_agent(agent_uuid)
return stop_result, start_result
[docs] @RPC.export
def agent_status(self, agent_uuid):
"""
Retrieves the status of a particular agent executing on the vcp
instance. The agent does not have to be executing in order to receive
it's status.
:param agent_uuid:
:return:
"""
return self._main_agent.agent_status(agent_uuid)
[docs] @RPC.export
def status_agents(self):
"""
Return all of the installed agents' statuses for the vcp instance.
:return:
"""
return self._main_agent.status_agents()
[docs] @RPC.export
def get_devices(self):
"""
Retrieves configuration entries from the config store that begin with
'devices'.
:return: dictionary of devices.
"""
self._log.debug("Getting devices in vcconnection.py")
return self._main_agent.get_devices()
[docs] @RPC.export
def publish_bacnet_props(self, proxy_identity, publish_topic, address,
device_id, filter=[]):
self._log.debug('Publishing bacnet props to topic: {}'.format(
publish_topic))
self._main_agent.publish_bacnet_props(
proxy_identity,
publish_topic,
address,
device_id,
filter=[])
[docs] @RPC.export
def store_agent_config(self, agent_identity, config_name, raw_contents,
config_type='raw'):
"""
Store an agent configuration on the volttron instance associated with
this agent.
:param agent_identity:
:param config_name:
:param raw_contents:
:param config_type:
:return: None
"""
return self._main_agent.store_agent_config(agent_identity, config_name,
raw_contents, config_type)
[docs] @RPC.export
def list_agent_configs(self, agent_identity):
"""
List the agent configuration files stored on the volttron instance
associated with this agent.
:param agent_identity: Agent identity to retrieve configuration from.
:return: A list of the configuration names.
"""
return self._main_agent.list_agent_configs(agent_identity)
[docs] @RPC.export
def get_agent_config(self, agent_identity, config_name, raw=True):
"""
Retrieve the configuration from the config store of the passed agent
identity.
:param agent_identity:
:param config_name:
:param raw:
:return: The stored configuration.
"""
return self._main_agent.get_agent_config(agent_identity, config_name,
raw)
[docs] @RPC.export
def delete_agent_config(self, agent_identity, config_name):
"""
Deletes the configuration from the config store of the passed agent
identity.
:param agent_identity:
:param config_name:
:return: The stored configuration.
"""
return self._main_agent.delete_agent_config(agent_identity, config_name)
[docs] @RPC.export
def subscribe_to_vcp(self, prefix, prefix_on_vc):
"""
Allows volttron.central to listen to the message bus on vcp instance.
:param prefix: The prefix to listen for.
:param prefix_on_vc:
The prefix to publish to on volttron central instance.
"""
self._log.info("VC subscribing to prefix: {}".format(prefix))
self._log.info("VCP will publish to {} on VC".format(prefix_on_vc))
def subscription_wrapper(peer, sender, bus, topic, headers,
message):
# We only publish up to vc for things that aren't forwarded.
if 'X-Forwarded' in headers:
return
self._log.debug("publishing to VC topic: {}".format(
prefix_on_vc + topic
))
# Prepend the specified prefix to the topic that was passed
# to the method
self.publish_to_vc(prefix_on_vc+topic, message, headers)
# Use the main agent to do the subscription on.
self._main_agent.vip.pubsub.subscribe('pubsub',
prefix,
subscription_wrapper)
[docs] @RPC.export
def call(self, platform_method, *args, **kwargs):
return self._main_agent.call(platform_method, *args, **kwargs)
[docs] def kill(self):
"""Dummy method to use install_agent_vctl"""
pass
[docs] def is_connected(self):
connected = self.vip.hello().get(timeout=5) is not None
self._log.debug("is_connected returning {}".format(connected))
return connected
[docs] def is_peer_connected(self, peer=VOLTTRON_CENTRAL):
connected = peer in self.vip.peerlist().get(timeout=5)
self._log.debug("is_connected returning {}".format(connected))
return connected
[docs] @RPC.export
def route_to_agent_method(self, id, agent_method, params):
"""
Calls a method on an installed agent running on the platform.
.. note::
This method only valid for installed agents not dynamic agents.
:param id:
:param agent_method:
:param params:
:return:
"""
self._log.debug("Routing method: {}".format(agent_method))
return self._main_agent.route_request(id, agent_method, params)
[docs] @RPC.export
def get_vip_addresses(self):
"""
Retrieves the vip addresses that were specified in the configuration
file or via command line.
:return:
"""
return self._main_agent.get_external_vip_addresses()
[docs] @RPC.export
def get_instance_name(self):
return self._main_agent.get_instance_name()
[docs] @RPC.export
def restart_agent(self, agent_uuid):
"""
Calls restart method on the vcp main agent instance.
.. note::
This method only valid for installed agents not dynamic agents.
:param agent_uuid:
:return:
"""
return self._main_agent.restart(agent_uuid)
[docs] @RPC.export
def list_agents(self):
"""
Calls list_agents method on the vcp main agent instance.
.. note::
This method only valid for installed agents not dynamic agents.
:return:
"""
return self._main_agent.list_agents()
[docs] @RPC.export
def install_agent(self, local_wheel_file):
"""
Installs
:param local_wheel_file:
:return:
"""
return self._main_agent.install_agent