# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import os
import logging
import zmq
from zmq import Frame, NOBLOCK, ZMQError, EINVAL, EHOSTUNREACH
from volttron.utils.frame_serialization import serialize_frames
__all__ = ['BaseRouter', 'OUTGOING', 'INCOMING', 'UNROUTABLE', 'ERROR']
OUTGOING = 0
INCOMING = 1
UNROUTABLE = 2
ERROR = 3
_log = logging.getLogger(__name__)
# Optimizing by pre-creating frames
_ROUTE_ERRORS = {
errnum: (zmq.Frame(str(errnum).encode('ascii')),
zmq.Frame(os.strerror(errnum).encode('ascii')))
for errnum in [zmq.EHOSTUNREACH, zmq.EAGAIN]
}
_INVALID_SUBSYSTEM = (
zmq.Frame(str(zmq.EPROTONOSUPPORT).encode('ascii')),
zmq.Frame(os.strerror(zmq.EPROTONOSUPPORT).encode('ascii'))
)
[docs]class BaseRouter(object):
'''Abstract base class of VIP router implementation.
Router implementers should inherit this class and implement the
setup() method to bind to appropriate addresses, set identities,
setup authentication, etc, etc. The socket will be created by the
start() method, which will then call the setup() method. Once
started, the socket may be polled for incoming messages and those
messages are handled/routed by calling the route() method. During
routing, the issue() method, which may be implemented, will be
called to allow for debugging and logging. Custom subsystems may be
implemented in the handle_subsystem() method. The socket will be
closed when the stop() method is called.
'''
_context_class = zmq.Context
_socket_class = zmq.Socket
_poller_class = zmq.Poller
def __init__(self, context=None, default_user_id=None):
'''Initialize the object instance.
If context is None (the default), the zmq global context will be
used for socket creation.
'''
self.context = context or self._context_class.instance()
self.default_user_id = default_user_id
self.socket = None
self._peers = set()
self._poller = self._poller_class()
self._ext_sockets = []
self._socket_id_mapping = {}
[docs] def run(self):
'''Main router loop.'''
self.start()
try:
while True:
self.poll_sockets()
finally:
self.stop()
[docs] def start(self):
'''Create the socket and call setup().
The socket is save in the socket attribute. The setup() method
is called at the end of the method to perform additional setup.
'''
self.socket = sock = self._socket_class(self.context, zmq.ROUTER)
sock.router_mandatory = True
sock.sndtimeo = 0
sock.tcp_keepalive = True
sock.tcp_keepalive_idle = 180
sock.tcp_keepalive_intvl = 20
sock.tcp_keepalive_cnt = 6
self.context.set(zmq.MAX_SOCKETS, 30690)
sock.set_hwm(6000)
_log.debug("ROUTER SENDBUF: {0}, {1}".format(sock.getsockopt(zmq.SNDBUF), sock.getsockopt(zmq.RCVBUF)))
self.setup()
[docs] def stop(self, linger=1):
'''Close the socket.'''
self.socket.close(linger)
[docs] def setup(self):
'''Called from start() method to setup the socket.
Implement this method to bind the socket, set identities and
options, etc.
'''
raise NotImplementedError()
[docs] def poll_sockets(self):
'''Called inside run method
Implement this method to poll for sockets for incoming messages.
'''
raise NotImplementedError()
@property
def poll(self):
'''Returns the underlying socket's poll method.'''
return self.socket.poll
[docs] def handle_subsystem(self, frames, user_id):
'''Handle additional subsystems and provide a response.
This method does nothing by default and may be implemented by
subclasses to provide additional subsystems.
frames is a list of zmq.Frame objects with the following
elements:
[SENDER, RECIPIENT, PROTOCOL, USER_ID, MSG_ID, SUBSYSTEM, ...]
The return value should be None, if the subsystem is unknown, an
empty list or False (or other False value) if the message was
handled but does not require/generate a response, or a list of
containing the following elements:
[RECIPIENT, SENDER, PROTOCOL, USER_ID, MSG_ID, SUBSYSTEM, ...]
'''
pass
[docs] def issue(self, topic, frames, extra=None):
pass
if zmq.zmq_version_info() >= (4, 1, 0):
def lookup_user_id(self, sender, recipient, auth_token):
'''Find and return a user identifier.
Returns the UTF-8 encoded User-Id property from the sender
frame or None if the authenticator did not set the User-Id
metadata. May be extended to perform additional lookups.
'''
# pylint: disable=unused-argument
# A user id might/should be set by the ZAP authenticator
try:
# _log.debug(f"THE TYPE IS:::::::: {type(recipient)}")
# recipient.get('User-Id').encode('utf-8') returns sender !!!
return sender
except ZMQError as exc:
if exc.errno != EINVAL:
raise
return self.default_user_id
else:
[docs] def lookup_user_id(self, sender, recipient, auth_token):
'''Find and return a user identifier.
A no-op by default, this method must be overridden to map
the sender and auth_token to a user ID. The returned value
must be a string or None (if the token was not found).
'''
return self.default_user_id
def _distribute(self, *parts):
drop = set()
empty = ''
frames = [empty, empty, 'VIP1', empty, empty]
frames.extend(parts)
# _log.debug(f"_distribute {parts}")
for peer in self._peers:
frames[0] = peer
drop.update(self._send(frames))
for peer in drop:
self._drop_peer(peer)
def _drop_pubsub_peers(self, peer):
'''Drop peers for pubsub subsystem. To be handled by subclasses'''
pass
def _add_pubsub_peers(self, peer):
'''Add peers for pubsub subsystem. To be handled by subclasses'''
pass
def _add_peer(self, peer):
if peer in self._peers:
return
self._distribute('peerlist', 'add', peer)
self._peers.add(peer)
self._add_pubsub_peers(peer)
def _drop_peer(self, peer):
try:
self._peers.remove(peer)
except KeyError:
return
self._distribute(b'peerlist', b'drop', peer)
self._drop_pubsub_peers(peer)
[docs] def route(self, frames):
'''Route one message and return.
One message is read from the socket and processed. If the
recipient is the router (empty recipient), the standard hello
and ping subsystems are handled. Other subsystems are sent to
handle_subsystem() for processing. Messages destined for other
entities are routed appropriately.
'''
socket = self.socket
issue = self.issue
issue(INCOMING, frames)
# _log.debug(f"ROUTER Receiving frames: {frames}")
if len(frames) < 6:
# Cannot route if there are insufficient frames, such as
# might happen with a router probe.
if len(frames) == 2 and frames[0] and not frames[1]:
issue(UNROUTABLE, frames, 'router probe')
self._add_peer(frames[0])
else:
issue(UNROUTABLE, frames, 'too few frames')
return
sender, recipient, proto, auth_token, msg_id = frames[:5]
# _log.debug(f"routing {sender}, {recipient}, {proto}, {auth_token}, {msg_id}")
if proto != 'VIP1':
# Peer is not talking a protocol we understand
issue(UNROUTABLE, frames, 'bad VIP signature')
return
user_id = self.lookup_user_id(sender, recipient, auth_token)
if user_id is None:
user_id = ''
# _log.debug(f"user_id is {user_id}")
self._add_peer(sender)
subsystem = frames[5]
if not recipient:
# Handle requests directed at the router
name = subsystem
if name == 'hello':
frames = [sender, recipient, proto, user_id, msg_id,
'hello', 'welcome', '1.0', socket.identity, sender]
elif name == 'ping':
frames[:7] = [
sender, recipient, proto, user_id, msg_id, 'ping', 'pong']
elif name == 'peerlist':
try:
op = frames[6]
except IndexError:
op = None
frames = [sender, recipient, proto, '', msg_id, subsystem]
if op == 'list':
frames.append('listing')
frames.extend(self._peers)
else:
error = ('unknown' if op else 'missing') + ' operation'
frames.extend(['error', error])
elif name == 'error':
return
else:
response = self.handle_subsystem(frames, user_id)
if response is None:
# Handler does not know of the subsystem
errnum, errmsg = error = _INVALID_SUBSYSTEM
issue(ERROR, frames, error)
frames = [sender, recipient, proto, '', msg_id,
'error', errnum, errmsg, '', subsystem]
elif not response:
# Subsystem does not require a response
return
else:
frames = response
else:
# Route all other requests to the recipient
frames[:4] = [recipient, sender, proto, user_id]
for peer in self._send(frames):
self._drop_peer(peer)
def _send(self, frames):
issue = self.issue
socket = self.socket
drop = []
recipient, sender = frames[:2]
# Expecting outgoing frames:
# [RECIPIENT, SENDER, PROTO, USER_ID, MSG_ID, SUBSYS, ...]
try:
# Try sending the message to its recipient
# This is a zmq socket so we need to serialize it before sending
serialized_frames = serialize_frames(frames)
socket.send_multipart(serialized_frames, flags=NOBLOCK, copy=False)
issue(OUTGOING, serialized_frames)
except ZMQError as exc:
try:
errnum, errmsg = error = _ROUTE_ERRORS[exc.errno]
except KeyError:
error = None
if error is None:
raise
issue(ERROR, frames, error)
if exc.errno == EHOSTUNREACH:
drop.append(recipient)
if exc.errno != EHOSTUNREACH or sender is not frames[0]:
# Only send errors if the sender and recipient differ
proto, user_id, msg_id, subsystem = frames[2:6]
frames = [sender, '', proto, user_id, msg_id,
'error', errnum, errmsg, recipient, subsystem]
serialized_frames = serialize_frames(frames)
try:
socket.send_multipart(serialized_frames, flags=NOBLOCK, copy=False)
issue(OUTGOING, serialized_frames)
except ZMQError as exc:
try:
errnum, errmsg = error = _ROUTE_ERRORS[exc.errno]
except KeyError:
error = None
if error is None:
raise
issue(ERROR, serialized_frames, error)
if exc.errno == EHOSTUNREACH:
drop.append(sender)
return drop