# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import heapq
import inspect
import logging
import os
import platform as python_platform
import signal
import threading
import time
import urllib.parse
import uuid
import warnings
import weakref
from contextlib import contextmanager
from errno import ENOENT
from urllib.parse import urlsplit, parse_qs, urlunsplit
import gevent.event
from gevent.queue import Queue
from zmq import green as zmq
from zmq.green import ZMQError, EAGAIN, ENOTSOCK
from zmq.utils.monitor import recv_monitor_message
from volttron.platform import get_address
from volttron.platform import is_rabbitmq_available
from volttron.platform.agent import utils
from volttron.platform.agent.utils import load_platform_config, get_platform_instance_name
from volttron.platform.keystore import KeyStore, KnownHostsStore
from volttron.utils.rmq_mgmt import RabbitMQMgmt
from .decorators import annotate, annotations, dualmethod
from .dispatch import Signal
from .errors import VIPError
from .. import router
from ..rmq_connection import RMQConnection
from ..socket import Message
from ..zmq_connection import ZMQConnection
from .... import platform
if is_rabbitmq_available():
import pika
__all__ = ['BasicCore', 'Core', 'RMQCore', 'ZMQCore', 'killing']
_log = logging.getLogger(__name__)
class Periodic(object): # pylint: disable=invalid-name
''' Decorator to set a method up as a periodic callback.
The decorated method will be called with the given arguments every
period seconds while the agent is executing its run loop.
'''
def __init__(self, period, args=None, kwargs=None, wait=0):
'''Store period (seconds) and arguments to call method with.'''
assert period > 0
self.period = period
self.args = args or ()
self.kwargs = kwargs or {}
self.timeout = wait
def __call__(self, method):
'''Attach this object instance to the given method.'''
annotate(method, list, 'core.periodics', self)
return method
def _loop(self, method):
# pylint: disable=missing-docstring
# Use monotonic clock provided on hu's loop instance.
now = gevent.get_hub().loop.now
period = self.period
deadline = now()
if self.timeout != 0:
timeout = self.timeout or period
deadline += timeout
gevent.sleep(timeout)
while True:
try:
method(*self.args, **self.kwargs)
except (Exception, gevent.Timeout):
_log.exception('unhandled exception in periodic callback')
deadline += period
timeout = deadline - now()
if timeout > 0:
gevent.sleep(timeout)
else:
# Prevent catching up.
deadline -= timeout
def get(self, method):
'''Return a Greenlet for the given method.'''
return gevent.Greenlet(self._loop, method)
class ScheduledEvent(object):
'''Class returned from Core.schedule.'''
def __init__(self, function, args=None, kwargs=None):
self.function = function
self.args = args or []
self.kwargs = kwargs or {}
self.canceled = False
self.finished = False
def cancel(self):
'''Mark the timer as canceled to avoid a callback.'''
self.canceled = True
def __call__(self):
if not self.canceled:
self.function(*self.args, **self.kwargs)
self.finished = True
def findsignal(obj, owner, name):
parts = name.split('.')
if len(parts) == 1:
signal = getattr(obj, name)
else:
signal = owner
for part in parts:
signal = getattr(signal, part)
assert isinstance(signal, Signal), 'bad signal name %r' % (name,)
return signal
[docs]class BasicCore(object):
delay_onstart_signal = False
delay_running_event_set = False
def __init__(self, owner):
self.greenlet = None
self.spawned_greenlets = weakref.WeakSet()
self._async = None
self._async_calls = []
self._stop_event = None
self._schedule_event = None
self._schedule = []
self.onsetup = Signal()
self.onstart = Signal()
self.onstop = Signal()
self.onfinish = Signal()
self.oninterrupt = None
self.tie_breaker = 0
# SIGINT does not work in Windows.
# If using the standalone agent on a windows machine,
# this section will be skipped
if python_platform.system() != 'Windows':
prev_int_signal = gevent.signal.getsignal(signal.SIGINT)
# To avoid a child agent handler overwriting the parent agent handler
if prev_int_signal in [None, signal.SIG_IGN, signal.SIG_DFL]:
self.oninterrupt = gevent.signal.signal(signal.SIGINT,
self._on_sigint_handler)
self._owner = owner
[docs] def setup(self):
# Split out setup from __init__ to give oportunity to add
# subsystems with signals
try:
owner = self._owner
except AttributeError:
return
del self._owner
periodics = []
def setup(member): # pylint: disable=redefined-outer-name
periodics.extend(
periodic.get(member) for periodic in annotations(
member, list, 'core.periodics'))
for deadline, args, kwargs in annotations(member, list, 'core.schedule'):
self.schedule(deadline, member, *args, **kwargs)
for name in annotations(member, set, 'core.signals'):
findsignal(self, owner, name).connect(member, owner)
inspect.getmembers(owner, setup)
def start_periodics(sender, **kwargs): # pylint: disable=unused-argument
for periodic in periodics:
sender.spawned_greenlets.add(periodic)
periodic.start()
del periodics[:]
self.onstart.connect(start_periodics)
[docs] def loop(self, running_event):
# pre-setup
yield
# pre-start
yield
# pre-stop
yield
# pre-finish
yield
[docs] def link_receiver(self, receiver, sender, **kwargs):
greenlet = gevent.spawn(receiver, sender, **kwargs)
self.spawned_greenlets.add(greenlet)
return greenlet
[docs] def run(self, running_event=None): # pylint: disable=method-hidden
'''Entry point for running agent.'''
self._schedule_event = gevent.event.Event()
self.setup()
self.greenlet = current = gevent.getcurrent()
def kill_leftover_greenlets():
for glt in self.spawned_greenlets:
glt.kill()
self.greenlet.link(lambda _: kill_leftover_greenlets())
def handle_async_():
'''Execute pending calls.'''
calls = self._async_calls
while calls:
func, args, kwargs = calls.pop()
greenlet = gevent.spawn(func, *args, **kwargs)
self.spawned_greenlets.add(greenlet)
def schedule_loop():
heap = self._schedule
event = self._schedule_event
cur = gevent.getcurrent()
now = time.time()
while True:
if heap:
deadline = heap[0][0]
timeout = min(5.0, max(0.0, deadline - now))
else:
timeout = None
if event.wait(timeout):
event.clear()
now = time.time()
while heap and now >= heap[0][0]:
_, _, callback = heapq.heappop(heap)
greenlet = gevent.spawn(callback)
cur.link(lambda glt: greenlet.kill())
self._stop_event = stop = gevent.event.Event()
self._async = gevent.get_hub().loop.async_()
self._async.start(handle_async_)
current.link(lambda glt: self._async.stop())
looper = self.loop(running_event)
next(looper)
self.onsetup.send(self)
loop = next(looper)
if loop:
self.spawned_greenlets.add(loop)
scheduler = gevent.Greenlet(schedule_loop)
if loop:
loop.link(lambda glt: scheduler.kill())
self.onstart.connect(lambda *_, **__: scheduler.start())
if not self.delay_onstart_signal:
self.onstart.sendby(self.link_receiver, self)
if not self.delay_running_event_set:
if running_event is not None:
running_event.set()
try:
if loop and loop in gevent.wait([loop, stop], count=1):
raise RuntimeError('VIP loop ended prematurely')
stop.wait()
except (gevent.GreenletExit, KeyboardInterrupt):
pass
scheduler.kill()
next(looper)
receivers = self.onstop.sendby(self.link_receiver, self)
gevent.wait(receivers)
next(looper)
self.onfinish.send(self)
[docs] def stop(self, timeout=None):
def halt():
self._stop_event.set()
self.greenlet.join(timeout)
return self.greenlet.ready()
if gevent.get_hub() is self._stop_event.hub:
return halt()
return self.send_async(halt).get()
def _on_sigint_handler(self, signo, *_):
'''
Event handler to set onstop event when the agent needs to stop
:param signo:
:param _:
:return:
'''
_log.debug("SIG interrupt received. Calling stop")
if signo == signal.SIGINT:
self._stop_event.set()
# self.stop()
[docs] def send(self, func, *args, **kwargs):
self._async_calls.append((func, args, kwargs))
self._async.send()
[docs] def send_async(self, func, *args, **kwargs):
result = gevent.event.AsyncResult()
async_ = gevent.hub.get_hub().loop.async_()
results = [None, None]
def receiver():
async_.stop()
exc, value = results
if exc is None:
result.set(value)
else:
result.set_exception(exc)
async_.start(receiver)
def worker():
try:
results[:] = [None, func(*args, **kwargs)]
except Exception as exc: # pylint: disable=broad-except
results[:] = [exc, None]
async_.send()
self.send(worker)
return result
[docs] def spawn(self, func, *args, **kwargs):
assert self.greenlet is not None
greenlet = gevent.spawn(func, *args, **kwargs)
self.spawned_greenlets.add(greenlet)
return greenlet
[docs] def spawn_later(self, seconds, func, *args, **kwargs):
assert self.greenlet is not None
greenlet = gevent.spawn_later(seconds, func, *args, **kwargs)
self.spawned_greenlets.add(greenlet)
return greenlet
[docs] def spawn_in_thread(self, func, *args, **kwargs):
result = gevent.event.AsyncResult()
def wrapper():
try:
self.send(result.set, func(*args, **kwargs))
except Exception as exc: # pylint: disable=broad-except
self.send(result.set_exception, exc)
result.thread = thread = threading.Thread(target=wrapper)
thread.daemon = True
thread.start()
return result
@dualmethod
def periodic(self, period, func, args=None, kwargs=None, wait=0):
warnings.warn(
'Use of the periodic() method is deprecated in favor of the '
'schedule() method with the periodic() generator. This '
'method will be removed in a future version.',
DeprecationWarning)
greenlet = Periodic(period, args, kwargs, wait).get(func)
self.spawned_greenlets.add(greenlet)
greenlet.start()
return greenlet
[docs] @periodic.classmethod
def periodic(cls, period, args=None, kwargs=None, wait=0): # pylint: disable=no-self-argument
warnings.warn(
'Use of the periodic() decorator is deprecated in favor of '
'the schedule() decorator with the periodic() generator. '
'This decorator will be removed in a future version.',
DeprecationWarning)
return Periodic(period, args, kwargs, wait)
[docs] @classmethod
def receiver(cls, signal):
def decorate(method):
annotate(method, set, 'core.signals', signal)
return method
return decorate
@dualmethod
def schedule(self, deadline, func, *args, **kwargs):
event = ScheduledEvent(func, args, kwargs)
try:
it = iter(deadline)
except TypeError:
self._schedule_callback(deadline, event)
else:
self._schedule_iter(it, event)
return event
[docs] def get_tie_breaker(self):
self.tie_breaker += 1
return self.tie_breaker
def _schedule_callback(self, deadline, callback):
deadline = utils.get_utc_seconds_from_epoch(deadline)
heapq.heappush(self._schedule, (deadline, self.get_tie_breaker(), callback))
if self._schedule_event:
self._schedule_event.set()
def _schedule_iter(self, it, event):
def wrapper():
if event.canceled:
event.finished = True
return
try:
deadline = next(it)
except StopIteration:
event.function(*event.args, **event.kwargs)
event.finished = True
else:
self._schedule_callback(deadline, wrapper)
event.function(*event.args, **event.kwargs)
try:
deadline = next(it)
except StopIteration:
event.finished = True
else:
self._schedule_callback(deadline, wrapper)
[docs] @schedule.classmethod
def schedule(cls, deadline, *args, **kwargs): # pylint: disable=no-self-argument
if hasattr(deadline, 'timetuple'):
# deadline = time.mktime(deadline.timetuple())
deadline = utils.get_utc_seconds_from_epoch(deadline)
def decorate(method):
annotate(method, list, 'core.schedule', (deadline, args, kwargs))
return method
return decorate
[docs]class Core(BasicCore):
# We want to delay the calling of "onstart" methods until we have
# confirmation from the server that we have a connection. We will fire
# the event when we hear the response to the hello message.
delay_onstart_signal = True
# Agents started before the router can set this variable
# to false to keep from blocking. AuthService does this.
delay_running_event_set = True
def __init__(self, owner, address=None, identity=None, context=None,
publickey=None, secretkey=None, serverkey=None,
volttron_home=os.path.abspath(platform.get_home()),
agent_uuid=None, reconnect_interval=None,
version='0.1', instance_name=None, messagebus=None):
self.volttron_home = volttron_home
# These signals need to exist before calling super().__init__()
self.onviperror = Signal()
self.onsockevent = Signal()
self.onconnected = Signal()
self.ondisconnected = Signal()
self.configuration = Signal()
super(Core, self).__init__(owner)
self.address = address if address is not None else get_address()
self.identity = str(identity) if identity is not None else str(uuid.uuid4())
self.agent_uuid = agent_uuid
self.publickey = publickey
self.secretkey = secretkey
self.serverkey = serverkey
self.reconnect_interval = reconnect_interval
self._reconnect_attempt = 0
self.instance_name = instance_name
self.messagebus = messagebus
self.subsystems = {'error': self.handle_error}
self.__connected = False
self._version = version
self.socket = None
self.connection = None
_log.debug('address: %s', address)
_log.debug('identity: %s', self.identity)
_log.debug('agent_uuid: %s', agent_uuid)
_log.debug('serverkey: %s', serverkey)
[docs] def version(self):
return self._version
[docs] def get_connected(self):
return self.__connected
[docs] def set_connected(self, value):
self.__connected = value
connected = property(fget=lambda self: self.get_connected(),
fset=lambda self, v: self.set_connected(v)
)
[docs] def stop(self, timeout=None, platform_shutdown=False):
# Send message to router that this agent is stopping
if self.__connected and not platform_shutdown:
frames = [self.identity]
self.connection.send_vip('', 'agentstop', args=frames, copy=False)
super(Core, self).stop(timeout=timeout)
# This function moved directly from the zmqcore agent. it is included here because
# when we are attempting to connect to a zmq bus from a rmq bus this will be used
# to create the public and secret key for that connection or use it if it was already
# created.
def _get_keys_from_keystore(self):
'''Returns agent's public and secret key from keystore'''
if self.agent_uuid:
# this is an installed agent, put keystore in its dist-info
current_directory = os.path.abspath(os.curdir)
keystore_dir = os.path.join(current_directory,
"{}.dist-info".format(os.path.basename(current_directory)))
elif self.identity is None:
raise ValueError("Agent's VIP identity is not set")
else:
if not self.volttron_home:
raise ValueError('VOLTTRON_HOME must be specified.')
keystore_dir = os.path.join(
self.volttron_home, 'keystores',
self.identity)
keystore_path = os.path.join(keystore_dir, 'keystore.json')
keystore = KeyStore(keystore_path)
return keystore.public, keystore.secret
[docs] def register(self, name, handler, error_handler=None):
self.subsystems[name] = handler
if error_handler:
name_bytes = name
def onerror(sender, error, **kwargs):
if error.subsystem == name_bytes:
error_handler(sender, error=error, **kwargs)
self.onviperror.connect(onerror)
[docs] def handle_error(self, message):
if len(message.args) < 4:
_log.debug('unhandled VIP error %s', message)
elif self.onviperror:
args = message.args
error = VIPError.from_errno(*args)
self.onviperror.send(self, error=error, message=message)
[docs] def create_event_handlers(self, state, hello_response_event, running_event):
def connection_failed_check():
# If we don't have a verified connection after 10.0 seconds
# shut down.
if hello_response_event.wait(10.0):
return
_log.error("No response to hello message after 10 seconds.")
_log.error("Type of message bus used {}".format(self.messagebus))
_log.error("A common reason for this is a conflicting VIP IDENTITY.")
_log.error("Another common reason is not having an auth entry on"
"the target instance.")
_log.error("Shutting down agent.")
_log.error("Possible conflicting identity is: {}".format(
self.identity
))
self.stop(timeout=10.0)
def hello():
# Send hello message to VIP router to confirm connection with
# platform
state.ident = ident = 'connect.hello.%d' % state.count
state.count += 1
self.spawn(connection_failed_check)
message = Message(peer='', subsystem='hello',
id=ident, args=['hello'])
self.connection.send_vip_object(message)
def hello_response(sender, version='',
router='', identity=''):
_log.info("Connected to platform: "
"router: {} version: {} identity: {}".format(
router, version, identity))
_log.debug("Running onstart methods.")
hello_response_event.set()
self.onstart.sendby(self.link_receiver, self)
self.configuration.sendby(self.link_receiver, self)
if running_event is not None:
running_event.set()
return connection_failed_check, hello, hello_response
[docs]class ZMQCore(Core):
"""
Concrete Core class for ZeroMQ message bus
"""
def __init__(self, owner, address=None, identity=None, context=None,
publickey=None, secretkey=None, serverkey=None,
volttron_home=os.path.abspath(platform.get_home()),
agent_uuid=None, reconnect_interval=None,
version='0.1', enable_fncs=False,
instance_name=None, messagebus='zmq'):
super(ZMQCore, self).__init__(owner, address=address, identity=identity,
context=context, publickey=publickey, secretkey=secretkey,
serverkey=serverkey, volttron_home=volttron_home,
agent_uuid=agent_uuid, reconnect_interval=reconnect_interval,
version=version,
instance_name=instance_name, messagebus=messagebus)
self.context = context or zmq.Context.instance()
self._fncs_enabled = enable_fncs
self.messagebus = messagebus
self._set_keys()
_log.debug("AGENT RUNNING on ZMQ Core {}".format(self.identity))
self.socket = None
[docs] def get_connected(self):
return super(ZMQCore, self).get_connected()
[docs] def set_connected(self, value):
super(ZMQCore, self).set_connected(value)
connected = property(get_connected, set_connected)
def _set_keys(self):
"""Implements logic for setting encryption keys and putting
those keys in the parameters of the VIP address
"""
self._set_server_key()
self._set_public_and_secret_keys()
if self.publickey and self.secretkey and self.serverkey:
self._add_keys_to_addr()
def _add_keys_to_addr(self):
'''Adds public, secret, and server keys to query in VIP address if
they are not already present'''
def add_param(query_str, key, value):
query_dict = parse_qs(query_str)
if not value or key in query_dict:
return ''
# urlparse automatically adds '?', but we need to add the '&'s
return '{}{}={}'.format('&' if query_str else '', key, value)
url = list(urlsplit(self.address))
if url[0] in ['tcp', 'ipc']:
url[3] += add_param(url[3], 'publickey', self.publickey)
url[3] += add_param(url[3], 'secretkey', self.secretkey)
url[3] += add_param(url[3], 'serverkey', self.serverkey)
self.address = str(urlunsplit(url))
def _set_public_and_secret_keys(self):
if self.publickey is None or self.secretkey is None:
self.publickey, self.secretkey, _ = self._get_keys_from_addr()
if self.publickey is None or self.secretkey is None:
self.publickey, self.secretkey = self._get_keys_from_keystore()
def _set_server_key(self):
if self.serverkey is None:
self.serverkey = self._get_keys_from_addr()[2]
known_serverkey = self._get_serverkey_from_known_hosts()
if (self.serverkey is not None and known_serverkey is not None
and self.serverkey != known_serverkey):
raise Exception("Provided server key ({}) for {} does "
"not match known serverkey ({}).".format(
self.serverkey, self.address, known_serverkey))
# Until we have containers for agents we should not require all
# platforms that connect to be in the known host file.
# See issue https://github.com/VOLTTRON/volttron/issues/1117
if known_serverkey is not None:
self.serverkey = known_serverkey
def _get_serverkey_from_known_hosts(self):
known_hosts_file = os.path.join(self.volttron_home, 'known_hosts')
known_hosts = KnownHostsStore(known_hosts_file)
return known_hosts.serverkey(self.address)
def _get_keys_from_addr(self):
url = list(urlsplit(self.address))
query = parse_qs(url[3])
publickey = query.get('publickey', [None])[0]
secretkey = query.get('secretkey', [None])[0]
serverkey = query.get('serverkey', [None])[0]
return publickey, secretkey, serverkey
[docs] def loop(self, running_event):
# pre-setup
# self.context.set(zmq.MAX_SOCKETS, 30690)
self.connection = ZMQConnection(self.address,
self.identity,
self.instance_name,
context=self.context)
self.connection.open_connection(zmq.DEALER)
flags = dict(hwm=6000, reconnect_interval=self.reconnect_interval)
self.connection.set_properties(flags)
self.socket = self.connection.socket
yield
# pre-start
state = type('HelloState', (), {'count': 0, 'ident': None})
hello_response_event = gevent.event.Event()
connection_failed_check, hello, hello_response = \
self.create_event_handlers(state, hello_response_event, running_event)
def close_socket(sender):
gevent.sleep(2)
try:
if self.socket is not None:
self.socket.monitor(None, 0)
self.socket.close(1)
finally:
self.socket = None
def monitor():
# Call socket.monitor() directly rather than use
# get_monitor_socket() so we can use green sockets with
# regular contexts (get_monitor_socket() uses
# self.context.socket()).
addr = 'inproc://monitor.v-%d' % (id(self.socket),)
sock = None
if self.socket is not None:
try:
self.socket.monitor(addr)
sock = zmq.Socket(self.context, zmq.PAIR)
sock.connect(addr)
while True:
try:
message = recv_monitor_message(sock)
self.onsockevent.send(self, **message)
event = message['event']
if event & zmq.EVENT_CONNECTED:
hello()
elif event & zmq.EVENT_DISCONNECTED:
self.connected = False
elif event & zmq.EVENT_CONNECT_RETRIED:
self._reconnect_attempt += 1
if self._reconnect_attempt == 50:
self.connected = False
sock.disable_monitor()
self.stop()
self.ondisconnected.send(self)
elif event & zmq.EVENT_MONITOR_STOPPED:
break
except ZMQError as exc:
if exc.errno == ENOTSOCK:
break
except ZMQError as exc:
raise
# if exc.errno == EADDRINUSE:
# pass
finally:
try:
url = list(urllib.parse.urlsplit(self.address))
if url[0] in ['tcp'] and sock is not None:
sock.close()
if self.socket is not None:
self.socket.monitor(None, 0)
except Exception as exc:
_log.debug("Error in closing the socket: {}".format(exc))
self.onconnected.connect(hello_response)
self.ondisconnected.connect(close_socket)
if self.address[:4] in ['tcp:', 'ipc:']:
self.spawn(monitor).join(0)
self.connection.connect()
if self.address.startswith('inproc:'):
hello()
def vip_loop():
sock = self.socket
while True:
try:
# Message at this point in time will be a
# volttron.platform.vip.socket.Message object that has attributes
# for all of the vip elements. Note these are no longer bytes.
# see https://github.com/volttron/volttron/issues/2123
message = sock.recv_vip_object(copy=False)
except ZMQError as exc:
if exc.errno == EAGAIN:
continue
elif exc.errno == ENOTSOCK:
self.socket = None
break
else:
raise
subsystem = message.subsystem
# _log.debug("Received new message {0}, {1}, {2}, {3}".format(
# subsystem, message.id, len(message.args), message.args[0]))
# Handle hellos sent by CONNECTED event
if (str(subsystem) == 'hello' and
message.id == state.ident and
len(message.args) > 3 and
message.args[0] == 'welcome'):
version, server, identity = message.args[1:4]
self.connected = True
self.onconnected.send(self, version=version,
router=server, identity=identity)
continue
try:
handle = self.subsystems[subsystem]
except KeyError:
_log.error('peer %r requested unknown subsystem %r',
message.peer, subsystem)
message.user = ''
message.args = list(router._INVALID_SUBSYSTEM)
message.args.append(message.subsystem)
message.subsystem = 'error'
sock.send_vip_object(message, copy=False)
else:
handle(message)
yield gevent.spawn(vip_loop)
# pre-stop
yield
# pre-finish
try:
self.connection.disconnect()
self.socket.monitor(None, 0)
self.connection.close_connection(1)
except AttributeError:
pass
except ZMQError as exc:
if exc.errno != ENOENT:
_log.exception('disconnect error')
finally:
self.socket = None
yield
[docs]@contextmanager
def killing(greenlet, *args, **kwargs):
'''Context manager to automatically kill spawned greenlets.
Allows one to kill greenlets that would continue after a timeout:
with killing(agent.vip.pubsub.subscribe(
'peer', 'topic', callback)) as subscribe:
subscribe.get(timeout=10)
'''
try:
yield greenlet
finally:
greenlet.kill(*args, **kwargs)
[docs]class RMQCore(Core):
"""
Concrete Core class for RabbitMQ message bus
"""
def __init__(self, owner, address=None, identity=None, context=None,
publickey=None, secretkey=None, serverkey=None,
volttron_home=os.path.abspath(platform.get_home()),
agent_uuid=None, reconnect_interval=None,
version='0.1', instance_name=None, messagebus='rmq',
volttron_central_address=None,
volttron_central_instance_name=None):
super(RMQCore, self).__init__(owner, address=address, identity=identity,
context=context, publickey=publickey, secretkey=secretkey,
serverkey=serverkey, volttron_home=volttron_home,
agent_uuid=agent_uuid, reconnect_interval=reconnect_interval,
version=version, instance_name=instance_name, messagebus=messagebus)
self.volttron_central_address = volttron_central_address
# TODO Look at this and see if we really need this here.
# if instance_name is specified as a parameter in this calls it will be because it is
# a remote connection. So we load it from the platform configuration file
if not instance_name:
config_opts = load_platform_config()
self.instance_name = config_opts.get('instance-name')
else:
self.instance_name = instance_name
assert self.instance_name, "Instance name must have been set in the platform config file."
assert not volttron_central_instance_name, "Please report this as volttron_central_instance_name shouldn't be passed."
# self._event_queue = gevent.queue.Queue
self._event_queue = Queue()
self.rmq_user = '.'.join([self.instance_name, self.identity])
_log.debug("AGENT RUNNING on RMQ Core {}".format(self.rmq_user))
self.messagebus = messagebus
self.rmq_mgmt = RabbitMQMgmt()
self.rmq_address = address
# added so that it is available to auth subsytem when connecting
# to remote instance
if self.publickey is None or self.secretkey is None:
self.publickey, self.secretkey = self._get_keys_from_keystore()
def _get_keys_from_addr(self):
return None, None, None
[docs] def get_connected(self):
return super(RMQCore, self).get_connected()
[docs] def set_connected(self, value):
super(RMQCore, self).set_connected(value)
connected = property(get_connected, set_connected)
def _build_connection_parameters(self):
param = None
if self.identity is None:
raise ValueError("Agent's VIP identity is not set")
else:
try:
if self.instance_name == get_platform_instance_name():
param = self.rmq_mgmt.build_agent_connection(self.identity,
self.instance_name)
else:
param = self.rmq_mgmt.build_remote_connection_param(self.rmq_user,
self.rmq_address,
True)
except AttributeError:
_log.error("RabbitMQ broker may not be running. Restart the broker first")
param = None
return param
[docs] def loop(self, running_event):
if not isinstance(self.rmq_address, pika.ConnectionParameters):
self.rmq_address = self._build_connection_parameters()
# pre-setup
self.connection = RMQConnection(self.rmq_address,
self.identity,
self.instance_name,
reconnect_delay=self.rmq_mgmt.rmq_config.reconnect_delay(),
vc_url=self.volttron_central_address)
yield
# pre-start
flags = dict(durable=False, exclusive=True, auto_delete=True)
if self.connection:
self.connection.set_properties(flags)
# Register callback handler for VIP messages
self.connection.register(self.vip_message_handler)
state = type('HelloState', (), {'count': 0, 'ident': None})
hello_response_event = gevent.event.Event()
connection_failed_check, hello, hello_response = \
self.create_event_handlers(state, hello_response_event, running_event)
def connection_error():
self.connected = False
self.stop()
self.ondisconnected.send(self)
def connect_callback():
router_connected = False
try:
bindings = self.rmq_mgmt.get_bindings('volttron')
except AttributeError:
bindings = None
router_user = router_key = "{inst}.{ident}".format(inst=self.instance_name,
ident='router')
if bindings:
for binding in bindings:
if binding['destination'] == router_user and \
binding['routing_key'] == router_key:
router_connected = True
break
# Connection retry attempt issue #1702.
# If the agent detects that RabbitMQ broker is reconnected before the router, wait
# for the router to connect before sending hello()
if router_connected:
hello()
else:
_log.debug("Router not bound to RabbitMQ yet, waiting for 2 seconds before sending hello {}".
format(self.identity))
self.spawn_later(2, hello)
# Connect to RMQ broker. Register a callback to get notified when
# connection is confirmed
if self.rmq_address:
self.connection.connect(connect_callback, connection_error)
self.onconnected.connect(hello_response)
self.ondisconnected.connect(self.connection.close_connection)
def vip_loop():
if self.rmq_address:
wait_period = 1 # 1 second
while True:
message = None
try:
message = self._event_queue.get(wait_period)
except gevent.Timeout:
pass
except Exception as exc:
_log.error(exc.args)
raise
if message:
subsystem = message.subsystem
if subsystem == 'hello':
if (subsystem == 'hello' and
message.id == state.ident and
len(message.args) > 3 and
message.args[0] == 'welcome'):
version, server, identity = message.args[1:4]
self.connected = True
self.onconnected.send(self, version=version,
router=server,
identity=identity)
continue
try:
handle = self.subsystems[subsystem]
except KeyError:
_log.error('peer %r requested unknown subsystem %r',
message.peer, subsystem)
message.user = ''
message.args = list(router._INVALID_SUBSYSTEM)
message.args.append(message.subsystem)
message.subsystem = 'error'
self.connection.send_vip_object(message)
else:
handle(message)
yield gevent.spawn(vip_loop)
# pre-stop
yield
# pre-finish
if self.rmq_address:
self.connection.close_connection()
yield
[docs] def vip_message_handler(self, message):
# _log.debug("RMQ VIP Core {}".format(message))
self._event_queue.put(message)