Source code for volttron.platform.vip.externalrpcservice

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}



import os
import zmq
import logging
from volttron.platform import jsonapi
from zmq import SNDMORE, EHOSTUNREACH, ZMQError, EAGAIN, NOBLOCK
from volttron.utils.frame_serialization import serialize_frames

_log = logging.getLogger(__name__)

# Optimization: the frames describing routing errors are created once, at
# import time. Each errno maps to a pair of zmq frames:
# (ASCII-encoded errno, ASCII-encoded os.strerror() text).
_ROUTE_ERRORS = dict(
    (
        code,
        (
            zmq.Frame(str(code).encode('ascii')),
            zmq.Frame(os.strerror(code).encode('ascii')),
        ),
    )
    for code in (zmq.EHOSTUNREACH, zmq.EAGAIN)
)


class ExternalRPCService(object):
    """
    Class to manage routing of RPC calls between external platforms and
    internal agents (peers).

    :param socket: socket used to deliver frames to local peers
        (``send_multipart`` is called on it)
    :param routing_service: object providing ``my_instance_name()`` and
        ``send_external(platform, frames)`` for reaching other platforms
    """

    def __init__(self, socket, routing_service, *args, **kwargs):
        self._ext_router = routing_service
        self._vip_sock = socket

    def handle_subsystem(self, frames):
        """
        EXT_RPC subsystem handler on the server end.

        Only the first eight frames are interpreted:
        ``[sender, recipient, proto, usr_id, msg_id, subsystem, op, msg]``.
        Any additional frames are ignored.

        :param frames: list of frames
        :type frames: list
        :return: ``False`` if fewer than eight frames were supplied;
            otherwise the (falsy) result of the dispatched helper, or
            ``None`` when nothing was dispatched. A response frame list is
            built only if a helper returns a truthy result.
        """
        response = []
        result = None

        try:
            # Unpacking a slice raises ValueError (never IndexError) when the
            # element count does not match, so both are treated as "invalid".
            (sender, recipient, proto, usr_id,
             msg_id, subsystem, op, _msg) = frames[:8]
        except (IndexError, ValueError):
            return False

        if subsystem == 'external_rpc':
            # If operation is to send to external platform
            if op == 'send_platform':
                result = self._send_to_platform(frames)
            # If operation is to send to internal peer, use the internal
            # router socket to send the frames
            elif op == 'send_peer':
                result = self._send_to_peer(frames)

        if not result:
            # Propagate the helper's falsy value (False or None) unchanged.
            response = result
        else:
            # Form response frame
            response = [sender, recipient, proto, usr_id, msg_id, subsystem]
            response.append('request_response')
            response.append(result)
        return response

    def _send_to_platform(self, frames):
        """
        Send frames to external platform.

        The message payload (eighth frame) is updated in place with
        ``from_platform`` and ``from_peer`` before it is serialized and
        handed to the routing service.

        :param frames: frames following VIP format
        :return: ``False`` after the message was handed to the routing
            service, ``None`` if the message was invalid (an error is logged)
        """
        try:
            # Extract the frames and reorganize to add external platform and
            # peer information
            (sender, _recipient, proto, usr_id,
             msg_id, subsystem, _op, msg_data) = frames[:8]
        except ValueError:
            # Fewer than eight frames were supplied.
            _log.error("Invalid EXT RPC message")
            return None

        try:
            to_platform = msg_data['to_platform']
            msg_data['from_platform'] = self._ext_router.my_instance_name()
            msg_data['from_peer'] = sender
            msg = jsonapi.dumps(msg_data)
            # The receiving platform must deliver the message to its peer.
            outgoing = ['', proto, usr_id, msg_id, subsystem, 'send_peer', msg]
            # Use external socket to send the message
            self._ext_router.send_external(to_platform, outgoing)
            return False
        except KeyError as exc:
            _log.error("Missing instance name in external RPC message: {}".format(exc))
        except IndexError:
            _log.error("Invalid EXT RPC message")
        return None

    def _send_to_peer(self, frames):
        """
        Send the external RPC message to local peer.

        The first frame of ``frames`` is overwritten in place with the name
        of the destination peer taken from the payload's ``to_peer`` entry.

        :param frames: Frames following VIP format
        :return: ``False`` after a send was attempted, ``None`` if the
            message was invalid (an error is logged)
        """
        try:
            # The payload is the eighth frame; a shorter list raises
            # IndexError and is reported as an invalid message.
            msg_data = frames[7]
            frames[0] = msg_data['to_peer']
            # NOTE(review): the list of unreachable peers returned by
            # _send_internal is not used here - confirm that is intended.
            self._send_internal(frames)
            return False
        except KeyError as exc:
            _log.error("Missing agent name in external RPC message: {}".format(exc))
        except IndexError:
            _log.error("Invalid EXT RPC message")
        return None

    def _send_internal(self, frames):
        """
        Send message to internal/local peer.

        :param frames: frames
        :return: list containing the peer if it was reported unreachable
            (``EHOSTUNREACH``), otherwise an empty list
        """
        drop = []
        peer = frames[0]
        # Expecting outgoing frames:
        #   [RECIPIENT, SENDER, PROTO, USER_ID, MSG_ID, SUBSYS, ...]
        frames = serialize_frames(frames)
        try:
            # Try sending the message to its recipient
            self._vip_sock.send_multipart(frames, flags=NOBLOCK, copy=False)
        except ZMQError as exc:
            if exc.errno == EHOSTUNREACH:
                _log.debug("Host unreachable {}".format(peer))
                drop.append(peer)
            elif exc.errno == EAGAIN:
                _log.debug("EAGAIN error {}".format(peer))
            else:
                # Still best-effort (no re-raise), but do not hide the error.
                _log.warning("Error sending to {}: {}".format(peer, exc))
        return drop