from __future__ import print_function, absolute_import

import logging

from zmq import green as zmq
from import ZMQError, ENOTSOCK

from volttron.platform import jsonapi
from volttron.utils.frame_serialization import deserialize_frames, serialize_frames
from .agent import Agent, Core
from volttron.platform import is_rabbitmq_available

if is_rabbitmq_available():
    import pika

_log = logging.getLogger(__name__)

[docs]class ZMQProxyRouter(Agent): """ Proxy ZMQ based router agent is implemented for backward compatibility with ZeroMQ based message bus. In a single instance setup, either ZeroMQ or RabbitMQ based message bus will be running and all the agents will be using the same message bus. But in multi-platform setup, some instances maybe running with RabbitMQ message bus and others with ZeroMQ message bus. The Proxy router agent is implemented to manage the routing between local and external instances in such cases. Please note, if all instances in multi-platform setup are RabbitMQ based, then RabbitMQ federation/shovel need to be used. """ def __init__(self, address, identity, zmq_router, *args, **kwargs): super(ZMQProxyRouter, self).__init__(identity, address, **kwargs) self.zmq_router = zmq_router self._routing_key = self.core.instance_name + '.' + 'proxy' rmq_user = self.core.instance_name + '.' + identity self._outbound_response_queue = "{user}.zmq.outbound.response".format(user=rmq_user) self._outbound_request_queue = "{user}.zmq.outbound.request".format(user=rmq_user) self._rpc_handler_queue = "{user}.zmq.outbound.subsystem".format(user=rmq_user) self._vip_loop_running = False self._zmq_peers = set()
[docs] @Core.receiver('onstart') def startup(self, sender, **kwargs): """ On startup, it does the following: - Start ZMQ Router loop. - Establish RMQ queue bindings to handle routing of messages between internal and external agents. :param sender: :param kwargs: :return: """ if not self._vip_loop_running: self.core.spawn(self.vip_loop) connection = self.core.connection channel = # ---------------------------------------------------------------------------------- # Create a queue to receive messages from local platform # (for example, response for RPC request etc) result = channel.queue_declare(queue=self._rpc_handler_queue, durable=False, exclusive=True, auto_delete=True, callback=None) channel.queue_bind(, queue=self._rpc_handler_queue, routing_key=self.core.instance_name + '.proxy.router.zmq.outbound.subsystem', callback=None) channel.basic_consume(self.rpc_message_handler, queue=self._rpc_handler_queue, no_ack=True) # -------------------------------------------------------------------------------------- # Create a queue to receive messages from local platform # (for example, response for RPC request etc) result = channel.queue_declare(queue=self._outbound_response_queue, durable=False, exclusive=True, auto_delete=True, callback=None) channel.queue_bind(, queue=self._outbound_response_queue, routing_key=self.core.instance_name + '.proxy.router.subsystems', callback=None) channel.basic_consume(self.outbound_response_handler, queue=self._outbound_response_queue, no_ack=True) # Create a queue to receive messages from local platform. # For example, external platform pubsub/RPC subscribe/unsubscribe # requests from internal agents channel.queue_declare(queue=self._outbound_request_queue, durable=False, exclusive=True, auto_delete=True, callback=None) # Binding for external platform pubsub message requests channel.queue_bind(, queue=self._outbound_request_queue, routing_key=self.core.instance_name + '.proxy.router.pubsub', callback=None) # Binding for external platform RPC message requests channel.queue_bind(, queue=self._outbound_request_queue, routing_key=self.core.instance_name + '.proxy.router.external_rpc', callback=None) channel.basic_consume(self.outbound_request_handler, queue=self._outbound_request_queue, no_ack=True)
[docs] @Core.receiver('onstop') def on_stop(self, sender, **kwargs): """ Stop the ZMQ router :param sender: :param kwargs: :return: """ _log.debug("********************************************************************") _log.debug("Stopping ZMQ Router") _log.debug("********************************************************************") self.zmq_router.stop()
[docs] def outbound_response_handler(self, ch, method, props, body): """ Message received from internal agent to send to remote agent in ZMQ VIP message format. :param ch: channel :param method: contains routing key :param props: message properties like VIP header information :param body: message :return: """ # Strip sender's identity from binding key routing_key = str(method.routing_key) platform, to_identity = routing_key.split(".", 1) platform, from_identity = props.app_id.split(".", 1) userid = props.headers.get('user', '') # Reformat message into ZMQ VIP format frames = [to_identity, from_identity, 'VIP1', userid, props.message_id, props.type] try: args = jsonapi.loads(body) try: # This is necessary because jsonrpc request/response is inside a list which the # ZMQ agent subsystem does not like args = jsonapi.loads(args[0]) frames.append(jsonapi.dumps(args)) except ValueError as e: if isinstance(args, list): for m in args: frames.append(m) else: frames.append(jsonapi.dumps(args)) except TypeError as e: _log.error("Invalid json format {}".format(e)) return _log.debug("Proxy ZMQ Router Outbound handler {0}, {1}".format(to_identity, args)) try: self.zmq_router.socket.send_multipart(frames, copy=True) except ZMQError as ex: _log.error("ZMQ Error {}".format(ex))
[docs] def rpc_message_handler(self, ch, method, props, body): """ :param ch: :param method: :param props: :param body: :return: """ zmq_frames = [] frames = serialize_frames(jsonapi.loads(body)) try: self.zmq_router.socket.send_multipart(frames, copy=False) except ZMQError as ex: _log.error("ZMQ Error {}".format(ex))
[docs] def outbound_request_handler(self, ch, method, props, body): """ Handler for receiving external platform PubSub/RPC requests from internal agents. It then calls external PubSub/RPC router handler to forward the request to external platform. :param ch: channel :param method: contains the routing key :param props: message properties :param body: message body :return: """ _log.debug("Proxy ZMQ Router {}".format(body)) frames = jsonapi.loads(body.decode('utf-8')) if len(frames) > 6: if frames[5] == 'pubsub': # Forward the message to pubsub component of the router to take action self.zmq_router.pubsub.handle_subsystem(frames) else: # Forward the message to external RPC handler component of the router to take action self.zmq_router.ext_rpc.handle_subsystem(frames)
[docs] def publish_callback(self, peer, sender, bus, topic, headers, message): """ Callback method registered with local message bus to receive PubSub messages subscribed by external platform agents. PubSub component of router will route the message to appropriate external platform subscribers. :return: """ json_msg = jsonapi.dumps(dict(bus=bus, headers=headers, message=message)) # Reformat the message into ZMQ VIP message frames frames = [sender, '', 'VIP', '', '', 'pubsub', zmq.Frame('publish'), zmq.Frame(str(topic)), zmq.Frame(str(json_msg))] self.zmq_router.pubsub.handle_subsystem(frames, '')
[docs] def vip_loop(self): """ Infinite VIP loop to receive and send messages over ZMQ message bus. :return: """ connection = self.core.connection # Set up ZMQ router socket connection self.zmq_router.start() # Register proxy agent handle self.zmq_router.pubsub.add_rabbitmq_agent(self) self._vip_loop_running = True while True: try: frames = self.zmq_router.socket.recv_multipart(copy=False) frames = deserialize_frames(frames) sender, recipient, proto, auth_token, msg_id, subsystem = frames[:6] sender = sender recipient = recipient subsystem = subsystem if subsystem == 'hello':, 'zmq') self._zmq_peers.add(sender) elif subsystem == 'agentstop':, 'zmq') self._zmq_peers.remove(sender) if not recipient or recipient in self._zmq_peers: # Handle router specific messages or route to ZMQ peer self.zmq_router.route(frames) else: # Route to RabbitMQ agent self._route_to_agent(frames) except ZMQError as exc: # _log.error("Error while receiving message: {}".format(exc)) if exc.errno == ENOTSOCK: break self._vip_loop_running = False
def _route_to_agent(self, frames): """ Send the message to local agent using internal RabbitMQ message bus :param frames: ZMQ message frames :return: """ sender, recipient, proto, auth_token, msg_id, subsystem = frames[:6] args = [arg for arg in frames[6:]] # for f in frames: # _log.debug("Frames:; {}".format(f)) connection = self.core.connection app_id = "{instance}.{identity}".format(instance=self.core.instance_name, identity=sender) # Change queue binding for the Response message # After sending the message (request) on behalf of ZMQ client, the response has to # routed back to the caller. Queue binding is modified for that purpose. # outbound_response_handler() gets called (based on the binding) to reformat response # message and send over zmq bus, queue=self._outbound_response_queue, routing_key=app_id, callback=None) # Set the destination routing key to destination agent destination_routing_key = "{0}.{1}".format(self.core.instance_name, recipient) # Fit VIP frames into the PIKA properties dictionary # VIP format - [SENDER, RECIPIENT, PROTO, USER_ID, MSG_ID, SUBSYS, ARGS...] dct = { 'user_id': self.core.instance_name + '.' + self.core.identity, 'app_id': app_id, # Routing key of SOURCE AGENT 'headers': dict(sender=sender, # SENDER recipient=destination_routing_key, # RECEIVER proto='VIP', # PROTO user=auth_token, # USER_ID ), 'message_id': msg_id, # MSG_ID 'type': subsystem, # SUBSYS 'content_type': 'application/json' } properties = pika.BasicProperties(**dct) # _log.debug("PROXY PUBLISHING TO CHANNEL {0}, {1}, {2}".format(destination_routing_key, app_id, properties)), destination_routing_key, jsonapi.dumps(args, ensure_ascii=False), properties)