# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import os
import re
import zmq
import logging
from zmq import SNDMORE, EHOSTUNREACH, ZMQError, EAGAIN, NOBLOCK
from volttron.utils.frame_serialization import serialize_frames
from ..keystore import KeyStore
from zmq.utils import jsonapi
from ..vip.socket import Address
from zmq.utils.monitor import recv_monitor_message
import random
from zmq.green import ENOTSOCK
STATUS_CONNECTING = "CONNECTING"
STATUS_CONNECTED = "CONNECTED"
STATUS_CONNECTION_DELAY = "CONNECTION_DELAY"
STATUS_DISCONNECTED = "DISCONNECTED"
# Optimizing by pre-creating frames
_ROUTE_ERRORS = {
errnum: (zmq.Frame(str(errnum).encode('ascii')),
zmq.Frame(os.strerror(errnum).encode('ascii')))
for errnum in [zmq.EHOSTUNREACH, zmq.EAGAIN]
}
_log = logging.getLogger(__name__)
[docs]class RoutingService(object):
"""
This class maintains connection with external platforms.
"""
def __init__(self, socket, context, socket_class, poller, my_addr, instance_name, *args, **kwargs):
self._routing_table = dict()
self._poller = poller
self._instances = dict()
self._context = context
self._socket = socket
self._socket_class = socket_class
self._my_addr = my_addr
self._my_instance_name = instance_name
self._onconnect_pubsub_handler = None
self._ondisconnect_pubsub_handler = None
self._vip_sockets = set()
self._monitor_sockets = set()
self._socket_identities = dict()
self._web_addresses = []
[docs] def handle_subsystem(self, frames):
"""
Handler for incoming routing table frames. It calls appropriate action handler based on operation request.
:param frames list of frames
:type frames list
:returns: response frame to be sent back to the sender
:rtype: list
:Return Values:
response frame to be sent back to the sender
"""
response = []
result = False
try:
sender, recipient, proto, usr_id, msg_id, subsystem, op = frames[:7]
except (ValueError, TypeError): # TypeError will happen if frames is not subscriptable.
_log.error(f"Invalid number of frames handle_subsystem {frames}")
return False
if subsystem == 'routing_table':
# If Setup mode of operation, setup authorization
if op == 'setupmode_platform_connection':
instance_config = frames[7] # sonapi.loads(instance_config)
self._setup_authorization(instance_config)
# If Normal mode of operation, build authorized connection
elif op == 'normalmode_platform_connection':
instance_config = frames[7] # jsonapi.loads(instance_config)
self._build_connection(instance_config)
return False
# Respond to Hello/Welcome messages from other instances
elif op == 'hello':
handshake_request = frames[7]
try:
# Respond to 'hello' request with 'welcome'
if handshake_request == b'hello':
name = frames[8]
frames.pop(0)
_log.debug("HELLO Recieved hello, sending welcome to {}".format(name))
frames[6] = 'welcome'
frames[7] = self._my_instance_name
try:
_log.debug("Sending welcome message to sender {}".format(name))
self.send_external(name, frames)
except ZMQError as exc:
_log.error("ZMQ error: ")
# Respond to 'welcome' response by sending Pubsub subscription list
elif handshake_request == 'welcome':
name = frames[8]
_log.debug("HELLO Received welcome. Connection established with: {}".format(name))
try:
self._instances[name]['status'] = STATUS_CONNECTED
self._onconnect_pubsub_handler(name)
except KeyError as exc:
_log.error("Welcome message received from unknown platform: {}".format(name))
except IndexError as exc:
_log.error("Insufficient frames in hello message {}".format(exc))
elif op == "web-addresses":
self._web_addresses = frames[7]
self._web_addresses = self._web_addresses
# Update routing table entry
elif op == 'update':
result = self._update_entry(frames)
elif op == 'request_response':
pass
else:
_log.error("Unknown operation: {}".format(op))
if result:
#Form response frame
response = [sender, recipient, proto, usr_id, msg_id, subsystem]
response.append('request_response')
response.append(result)
else:
response = False
return response
def _setup_authorization(self, instance_info):
"""
Setup authorized connection with remote instance
:param instance_name: dicovery information(server key, name, vip-address) of remote instance
:return:
"""
try:
instance_name = instance_info['instance-name']
serverkey = instance_info['serverkey']
address = instance_info['vip-address']
web_address = instance_info['web-address']
except KeyError as exc:
_log.error("Missing parameter in instance info message {}".format(exc))
return
sock = zmq.Socket(zmq.Context(), zmq.DEALER)
num = random.random()
# socket identity and zap domain must be bytes
sock.identity = f"instance.{instance_name}.{num}".encode('utf-8')
sock.zap_domain = b'vip'
self._poller.register(sock, zmq.POLLIN)
keystore = KeyStore()
vip_address = f"{address}?serverkey={serverkey}&publickey={keystore.public}&secretkey={keystore.secret}"
ext_platform_address = Address(vip_address)
ext_platform_address.identity = sock.identity
try:
ext_platform_address.connect(sock)
except zmq.error.ZMQError as ex:
_log.error("ZMQ error on external connection {}".format(ex))
self._web_addresses.remove(web_address)
if not self._web_addresses:
_log.debug("MULTI_PLATFORM SETUP MODE COMPLETED")
def _build_connection(self, instance_info):
"""
Build connection with remote instance and send initial "hello" message.
:param instance_name: name of remote instance
:param serverkey: serverkey for establishing connection with remote instance
:return:
"""
_log.debug("instance_info {}".format(instance_info))
try:
instance_name = instance_info['instance-name']
serverkey = instance_info['serverkey']
address = instance_info['vip-address']
except KeyError as exc:
_log.error("Missing parameter in instance info message {}".format(exc))
return
# Return immediately if vip_address of external instance is same as self address
if address in self._my_addr:
_log.debug("Same instance: {}".format(address))
return
sock = zmq.Socket(zmq.Context(), zmq.DEALER)
sock.sndtimeo = 0
sock.tcp_keepalive = True
sock.tcp_keepalive_idle = 180
sock.tcp_keepalive_intvl = 20
sock.tcp_keepalive_cnt = 6
num = random.random()
sock.identity = f"instance.{instance_name}.{num}".encode('utf-8')
sock.zap_domain = b'vip'
mon_sock = sock.get_monitor_socket(
zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED | zmq.EVENT_CONNECT_DELAYED)
self._poller.register(mon_sock, zmq.POLLIN)
self._monitor_sockets.add(mon_sock)
self._instances[instance_name] = dict(platform_identity=sock.identity,
status=STATUS_CONNECTING,
socket=sock,
monitor_socket=mon_sock)
self._socket_identities[sock.identity] = instance_name
self._vip_sockets.add(sock)
self._poller.register(sock, zmq.POLLIN)
self._routing_table[instance_name] = [instance_name]
keystore = KeyStore()
sock = self._instances[instance_name]['socket']
vip_address = f"{address}?serverkey={serverkey}&publickey={keystore.public}&secretkey={keystore.secret}"
ext_platform_address = Address(vip_address)
ext_platform_address.identity = sock.identity
try:
ext_platform_address.connect(sock)
# Form VIP message to send to remote instance
frames = serialize_frames(['', 'VIP1', '', '', 'routing_table', 'hello', 'hello', self._my_instance_name])
_log.debug(f"HELLO Sending hello to: {instance_name}")
self.send_external(instance_name, frames)
except zmq.error.ZMQError as ex:
_log.error("ZMQ error on external connection {}".format(ex))
[docs] def handle_monitor_event(self, monitor_sock):
"""
Monitor external platform socket connections
:param monitor_sock: socket to monitor
:return:
"""
try:
message = recv_monitor_message(monitor_sock)
event = message['event']
instance_name = [name for name, instance_info in self._instances.items()
if instance_info['monitor_socket'] == monitor_sock]
if event & zmq.EVENT_CONNECTED:
_log.debug(
"CONNECTED to external platform: {}!! Sending MY subscriptions !!".format(instance_name[0]))
self._instances[instance_name[0]]['status'] = STATUS_CONNECTED
self._onconnect_pubsub_handler(instance_name[0])
elif event & zmq.EVENT_CONNECT_DELAYED:
# _log.debug("ROUTINGSERVICE socket DELAYED...Lets wait")
self._instances[instance_name[0]]['status'] = STATUS_CONNECTION_DELAY
elif event & zmq.EVENT_DISCONNECTED:
_log.debug("DISCONNECTED from external platform: {}. "
"Subscriptions will be resent on reconnect".format(instance_name[0]))
self._instances[instance_name[0]]['status'] = STATUS_DISCONNECTED
except ZMQError as exc:
if exc.errno == ENOTSOCK:
_log.error("Trying to use a non socket {}".format(exc))
except KeyError as exc:
_log.error("Unknown external instance: {}".format(instance_name))
[docs] def register(self, type, handler):
"""
Used by PubSubService to register for onconnect and ondisconnect handlers.
:param type: on_connect/on_disconnect
:param handler: handler function
:return:
"""
if type == 'on_connect':
self._onconnect_pubsub_handler = handler
else:
self._ondisconnect_pubsub_handler = handler
[docs] def my_instance_name(self):
"""
Name of my instance/platform.
:return:
"""
return self._my_instance_name
[docs] def disconnect_external_instances(self, instance_name):
"""
Close socket connections to remote platform
:param instance_name:
:return:
"""
try:
self._ondisconnect_pubsub_handler(instance_name)
instance_info = self._instances[instance_name]
sock = instance_info['socket']
mon_sock = instance_info['monitor_socket']
mon_sock.close()
except KeyError as exc:
_log.error("Unknown external instance name: {}".format(instance_name))
[docs] def get_name_for_identity(self, identity):
"""
Get instance name
:param identity: platform identity
:return:
"""
if self._socket_identities[identity]:
return self._socket_identities[identity]
else: return None
[docs] def send_external(self, instance_name, frames):
"""
Send frames to external instance
:param instance_name: name of remote instance
:param frames: frames to send
:return:
"""
success = False
instance_info = dict()
try:
instance_info = self._instances[instance_name]
_log.debug(f"Instance info is: {instance_info}")
try:
# Send using external socket
success = self._send_to_socket(instance_info['socket'], frames)
except ZMQError as exc:
_log.error("Could not send to {} using new socket".format(instance_name))
success = False
# if not success:
# #Try sending through router socket
# if bytes(frames[0]) == b'' and instance_info['status'] == STATUS_CONNECTING:
# frames[:0] = [self._my_instance_name]
#
# try:
# _log.debug("Trying to send with router socket")
# #success = self._send(self._socket, frames)
# except ZMQError as exc:
# _log.debug("Dropping or setting to disconnected {}".format(instance_name))
# # Let's just update status as 'DISCONNECTED' for now
# self._instances[instance_name]['status'] = STATUS_DISCONNECTED
# raise
except KeyError:
_log.debug(f"******************My instance name is: {self._my_instance_name}")
frames[:0] = [self._my_instance_name]
_log.debug("Key error for platform {0}".format(instance_name))
#success = self._send(self._socket, frames)
return success
def _send_to_socket(self, sock, frames):
"""
Send specified frames through the passed zmq.Socket. The frames do not
have to be true frames. This function will call `volttron.utils/.rame_serialization.serialize_frames``
on the list of frames before sending the data.
:param sock: zmq.Socket
:param frames:
A list of frames or data to be sent through a zmq socket.
:return:
bool - True if frames were successfully sent.
"""
success = True
try:
frames = serialize_frames(frames)
_log.debug(f"Frames sent to external {[x.bytes for x in frames]}")
# Try sending the message to its recipient
sock.send_multipart(frames, flags=NOBLOCK, copy=False)
except ZMQError as exc:
try:
errnum, errmsg = error = _ROUTE_ERRORS[exc.errno]
except KeyError:
success = False
error = None
if exc.errno == EHOSTUNREACH or exc.errno == EAGAIN:
success = False
raise
return success
def _update_entry(self, frames):
"""
NOT USED - FOR FUTURE
Update routing table entries.
:param frames:
:return:
"""
if len(frames) > 6:
sender = bytes(frames[0])
routing_table = bytes(frames[7])
routing_table = jsonapi.loads(routing_table)
_log.debug("ROUTING SERVICE Ext routing TABLE: {0}, MY {1} ".format(routing_table, self._routing_table))
for vip_id in routing_table:
if vip_id in self._routing_table:
if vip_id != self._my_vip_id:
my_route_list = self._routing_table[vip_id]
if len(routing_table[vip_id]) > 0 and len(routing_table[vip_id]) < len(my_route_list):
my_route_list = [sender]
self._routing_table[vip_id] = my_route_list.extend(routing_table[vip_id])
else:
route_list = [sender]
self._routing_table[vip_id] = route_list.extend(routing_table[vip_id])
_log.debug("ROUTING SERVICE my routing TABLE: {} ".format(self._routing_table))
return True
else:
return False
[docs] def close_external_connections(self):
"""
Close external platform socket connections
:return:
"""
for name in self._instances:
self.disconnect_external_instances(name)