volttron.platform.vip package

VIP - VOLTTRON™ Interconnect Protocol implementation

See https://volttron.readthedocs.io/en/develop/core_services/messagebus/VIP/VIP-Overview.html for protocol specification.

This module is useful for using VIP outside of gevent. ZeroMQ sockets are not thread-safe, so take care when sharing a socket across threads (or avoid doing so altogether). Unlike the gevent version in the green sub-module, this module places no locking around the VIP state.
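One way to respect the thread-safety caveat above is to give each thread its own socket rather than sharing one. The following is a minimal standard-library sketch of that idea; `PerThread` and its `factory` argument are hypothetical names introduced for illustration, and `object` merely stands in for whatever would create a real socket.

```python
import threading


class PerThread:
    """Lazily create one resource per thread (hypothetical helper)."""

    def __init__(self, factory):
        self._factory = factory          # called once in each thread
        self._local = threading.local()  # per-thread attribute storage

    def get(self):
        # Create the resource on first use in the calling thread only.
        if not hasattr(self._local, "resource"):
            self._local.resource = self._factory()
        return self._local.resource


# `object` is a stand-in for a real socket constructor (assumption).
sockets = PerThread(object)
seen = {}


def worker(name):
    seen[name] = sockets.get()


threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread ends up with a distinct object, while repeated calls within one thread return the same one.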

class volttron.platform.vip.BaseConnection(url, identity, instance_name)[source]

Bases: object

Base connection class for message bus connection.

class volttron.platform.vip.Socket(context=None, socket_type=5, shadow=None)[source]

Bases: volttron.platform.vip.socket._Socket, zmq.sugar.socket.Socket

Submodules

volttron.platform.vip.externalrpcservice module

class volttron.platform.vip.externalrpcservice.ExternalRPCService(socket, routing_service, *args, **kwargs)[source]

Bases: object

Class to manage routing of RPC calls between external platforms and internal agents (peers).

handle_subsystem(frames)[source]
EXT_RPC subsystem handler on the server end.

Parameters:
  • frames (list) – list of frames

volttron.platform.vip.green module

VIP - VOLTTRON™ Interconnect Protocol implementation

See https://volttron.readthedocs.io/en/develop/core_services/messagebus/VIP/VIP-Overview.html for protocol specification.

This module is for use within gevent. It provides some locking around send operations to protect the VIP state. It should be safe to use a single socket in multiple greenlets without any kind of locking.

class volttron.platform.vip.green.BaseRouter(context=None, default_user_id=None)[source]

Bases: volttron.platform.vip.router.BaseRouter

class volttron.platform.vip.green.Socket(*args, **kwargs)[source]

Bases: volttron.platform.vip.socket._Socket, zmq.green.core._Socket

volttron.platform.vip.keydiscovery module

exception volttron.platform.vip.keydiscovery.DiscoveryError[source]

Bases: Exception

Raised when a different volttron central tries to register.

class volttron.platform.vip.keydiscovery.KeyDiscoveryAgent(address, serverkey, identity, external_address_config, setup_mode, bind_web_address, *args, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

Class to get server key, instance name and vip address of external/remote platforms

startup(sender, **kwargs)[source]

Try to get platform discovery info of all the remote platforms. If unsuccessful, set up events to try again later.

Parameters:
  • sender – caller
  • kwargs – optional arguments

volttron.platform.vip.proxy_zmq_router module

class volttron.platform.vip.proxy_zmq_router.ZMQProxyRouter(address, identity, zmq_router, *args, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

The proxy ZMQ-based router agent is implemented for backward compatibility with the ZeroMQ-based message bus. In a single-instance setup, either the ZeroMQ or the RabbitMQ message bus will be running, and all agents will use the same message bus. In a multi-platform setup, however, some instances may be running the RabbitMQ message bus and others the ZeroMQ message bus. The proxy router agent manages the routing between local and external instances in such cases.

Please note that if all instances in a multi-platform setup are RabbitMQ based, then RabbitMQ federation/shovel needs to be used.

on_stop(sender, **kwargs)[source]

Stop the ZMQ router.

Parameters:
  • sender
  • kwargs

outbound_request_handler(ch, method, props, body)[source]

Handler for receiving external platform PubSub/RPC requests from internal agents. It then calls the external PubSub/RPC router handler to forward the request to the external platform.

Parameters:
  • ch – channel
  • method – contains the routing key
  • props – message properties
  • body – message body

outbound_response_handler(ch, method, props, body)[source]

Message received from an internal agent to send to a remote agent in ZMQ VIP message format.

Parameters:
  • ch – channel
  • method – contains the routing key
  • props – message properties such as VIP header information
  • body – message

publish_callback(peer, sender, bus, topic, headers, message)[source]

Callback method registered with the local message bus to receive PubSub messages subscribed to by external platform agents. The PubSub component of the router will route the message to the appropriate external platform subscribers.

rpc_message_handler(ch, method, props, body)[source]
Parameters:
  • ch
  • method
  • props
  • body
Returns:

startup(sender, **kwargs)[source]
On startup, it does the following:
  • Start ZMQ Router loop.
  • Establish RMQ queue bindings to handle routing of messages between internal and external agents.

Parameters:
  • sender
  • kwargs
Returns:

vip_loop()[source]

Infinite VIP loop to receive and send messages over the ZMQ message bus.

volttron.platform.vip.pubsubservice module

class volttron.platform.vip.pubsubservice.ProtectedPubSubTopics[source]

Bases: object

Simple class to contain protected pubsub topics

add(topic, capabilities)[source]
get(topic)[source]
get_topic_caps()[source]
class volttron.platform.vip.pubsubservice.PubSubService(socket, protected_topics, routing_service, *args, **kwargs)[source]

Bases: object

add_rabbitmq_agent(agent)[source]
external_platform_add(instance_name)[source]
external_platform_drop(instance_name)[source]
handle_subsystem(frames, user_id='')[source]
Handler for incoming pubsub frames. It checks the operation frame and directs it to the appropriate action handler.

Parameters:
  • frames (list) – list of frames
  • user_id (UTF-8 encoded User-Id property) – user id of the publishing agent. This is required for the protected topics check.
Returns: response frame to be sent back to the sender
Return type: list

peer_add(peer)[source]
peer_drop(peer, **kwargs)[source]

Drop/Remove subscriptions related to the peer as it is no longer reachable/available.

Parameters:
  • peer (str) – agent to be dropped
  • kwargs (pointer to arguments) – optional arguments

publish_callback(peer, sender, bus, topic, headers, message)[source]

Callback method to receive PubSub messages from the internal RabbitMQ message bus and send them to external platform subscribers over the ZMQ message bus.

Parameters:
  • peer – pubsub
  • sender – publisher
  • bus – bus
  • topic – publisher topic
  • headers – message header
  • message – message body

volttron.platform.vip.pubsubwrapper module

class volttron.platform.vip.pubsubwrapper.PubSubWrapper(identity, **kwargs)[source]

Bases: volttron.platform.vip.agent.Agent

PubSubWrapper Agent acts as a wrapper agent for the PubSub subsystem when connected to a remote platform that is using the old pubsub (RPC-based implementation). When it receives PubSub requests from the remote platform, it:
  • calls the appropriate method of the new platform
  • returns the result

add_bus(name)[source]
onsetup(sender, **kwargs)[source]
volttron.platform.vip.pubsubwrapper.decode_peer(peer)[source]
volttron.platform.vip.pubsubwrapper.encode_peer(peer)[source]

volttron.platform.vip.rmq_connection module

class volttron.platform.vip.rmq_connection.RMQConnection(url, identity, instance_name, reconnect_delay=30, vc_url=None)[source]

Bases: volttron.platform.vip.BaseConnection

Connection class for the RabbitMQ message bus. It:
  1. Maintains the connection with the RabbitMQ broker using Pika library APIs
  2. Translates from VIP message format to RabbitMQ message format and vice versa
  3. Sends and receives messages using Pika library APIs

add_on_channel_close_callback()[source]

This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.

close_connection(linger=None)[source]

This method closes the connection to RabbitMQ.

connect(connection_callback=None, connection_error_callback=None)[source]

Connect to the RabbitMQ broker. Save the callback method to be invoked after the connection steps are completed.

Parameters:
  • connection_callback
  • connection_error_callback

disconnect()[source]

Disconnect from the channel, i.e., stop consuming from it.

on_bind_ok(unused_frame)[source]

Callback method invoked by Pika when the VIP queue bind has completed. At this point we will start consuming messages by calling start_consuming.

Parameters:
  • unused_frame – The Queue.BindOk response frame

on_cancel_ok()[source]

Callback method invoked by Pika when RabbitMQ acknowledges the cancellation of a consumer. The next step is to close the channel.

on_channel_closed(channel, reply_code, reply_text)[source]

Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that violates the protocol, such as re-declaring an exchange or queue with different parameters. In this case, we’ll close the connection to shut down the object.

Parameters:
  • channel (pika.channel.Channel) – The closed channel
  • reply_code (int) – The numeric reason the channel was closed
  • reply_text (str) – The text reason the channel was closed
on_channel_open(channel)[source]

This method is invoked by pika when the channel has been opened. Declares the VIP queue to handle messages.

Parameters:
  • channel – new channel object

on_connection_closed(connection, reply_code, reply_text)[source]

Try to reconnect to the broker after a few seconds.

Parameters:
  • connection – connection object
  • reply_code – connection code
  • reply_text – connection reply message

on_connection_open(unused_connection)[source]

This method is invoked by pika when the connection has been opened.

Parameters:
  • unused_connection – new connection object

on_open_error(_connection_unused, error_message=None)[source]

Call the registered error handler.

Parameters:
  • _connection_unused
  • error_message – connection error message

on_queue_declare_ok(method_frame)[source]

Callback method invoked after the VIP queue has been declared. Next, we bind the queue to the exchange with the VIP routing key.

Parameters:
  • method_frame – The Queue.DeclareOk frame
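Taken together, the descriptions of on_channel_open, on_queue_declare_ok, and on_bind_ok imply a chain in which each completed step triggers the next: declare the queue, bind it, then start consuming. The sketch below imitates that chain with a stub broker that calls back immediately. `StubBroker`, `ChainedSetup`, and the queue and routing-key values are hypothetical, the stub is not Pika's API, and it is an assumption here that the connection-open callback is what opens the channel.

```python
class StubBroker:
    """Stand-in broker that completes every request immediately."""

    def open_channel(self, on_open):
        on_open("channel-1")

    def queue_declare(self, queue, callback):
        callback({"queue": queue})

    def queue_bind(self, queue, routing_key, callback):
        callback({"queue": queue, "routing_key": routing_key})


class ChainedSetup:
    """Each callback starts the next step of the setup sequence."""

    def __init__(self, broker, queue, routing_key):
        self.broker = broker
        self.queue = queue
        self.routing_key = routing_key
        self.steps = []        # records the order in which callbacks fire
        self.consuming = False

    def on_connection_open(self):
        self.steps.append("connection_open")
        self.broker.open_channel(self.on_channel_open)  # assumed step

    def on_channel_open(self, channel):
        self.steps.append("channel_open")
        self.broker.queue_declare(self.queue, self.on_queue_declare_ok)

    def on_queue_declare_ok(self, method_frame):
        self.steps.append("queue_declare_ok")
        self.broker.queue_bind(self.queue, self.routing_key, self.on_bind_ok)

    def on_bind_ok(self, unused_frame):
        self.steps.append("bind_ok")
        self.consuming = True  # a real connection would start consuming here


setup = ChainedSetup(StubBroker(), queue="demo.queue", routing_key="demo.key")
setup.on_connection_open()
```

After the call, `setup.steps` lists the callbacks in the order they fired, which makes the sequence easy to check in a test.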

open_connection()[source]

Open a gevent adapter connection.

register(vip_handler, error_handler=None)[source]

Register the VIP handler to be invoked to handle incoming messages.

Parameters:
  • vip_handler – VIP handler callback method
  • error_handler

rmq_message_handler(channel, method, props, body)[source]

Message handler for incoming messages. Reformats the incoming message into a VIP message object and hands it over to the VIP message handler.

Parameters:
  • channel – channel object
  • method – method frame; contains the routing key
  • props – message properties containing VIP details such as [SENDER, RECIPIENT, PROTO, USER_ID, MSG_ID, SUBSYS,]
  • body – message body
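To illustrate the kind of translation this handler performs, the sketch below packs the VIP fields listed above into a properties dictionary and unpacks them again. Only the field order comes from the list above; the function names, the dictionary layout, and the sample values are hypothetical and do not reflect how RMQConnection actually stores these details.

```python
VIP_FIELDS = ("SENDER", "RECIPIENT", "PROTO", "USER_ID", "MSG_ID", "SUBSYS")


def frames_to_props(frames):
    """Split a frame list into named header fields and remaining args."""
    if len(frames) < len(VIP_FIELDS):
        raise ValueError("expected at least %d frames" % len(VIP_FIELDS))
    props = dict(zip(VIP_FIELDS, frames))
    args = list(frames[len(VIP_FIELDS):])
    return props, args


def props_to_frames(props, args):
    """Rebuild the frame list in the field order given by VIP_FIELDS."""
    return [props[name] for name in VIP_FIELDS] + list(args)


# Sample values are made up for illustration.
frames = ["agent.a", "agent.b", "VIP1", "user", "msg-1", "pubsub",
          "publish", "topic"]
props, args = frames_to_props(frames)
roundtrip = props_to_frames(props, args)
```

The round trip returns the original frame list, which is the property any such translation layer needs to preserve.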
send_via_proxy(peer, subsystem, args=None, msg_id='', user='', via=None, flags=0, copy=False, track=False)[source]
send_vip(peer, subsystem, args=None, msg_id='', user='', via=None, flags=0, copy=True, track=False, platform=None)[source]

Send a VIP message over the RabbitMQ message bus.

Parameters:
  • peer – peer
  • subsystem – subsystem type
  • args – actual message
  • msg_id – message id
  • user – user
  • via
  • flags – unused
  • copy – unused
  • track – unused
  • platform – instance name

send_vip_object(message, flags=0, copy=True, track=False)[source]

Send the VIP message over RabbitMQ message bus.

Parameters:
  • message – VIP message object
send_vip_object_via_proxy(vip_object)[source]

Send the VIP object to the proxy router agent.

Parameters:
  • vip_object – VIP message

set_properties(flags)[source]

Set queue properties.

Parameters:
  • flags

class volttron.platform.vip.rmq_connection.RMQRouterConnection(url, identity, instance_name, reconnect_delay=30, vc_url=None)[source]

Bases: volttron.platform.vip.rmq_connection.RMQConnection

RabbitMQ message bus connection class for Router module

loop()[source]

Connect to the RabbitMQ broker and run an infinite loop to listen for incoming messages.

on_alternate_queue_bind_ok(unused_frame)[source]

Callback method invoked by Pika when the alternate queue bind has completed. At this point we will start consuming messages by calling start_consuming.

Parameters:
  • unused_frame – The Queue.BindOk response frame

on_alternate_queue_declare_ok(method_frame)[source]

Callback method invoked after the alternate queue has been declared. Next, we bind the queue to the alternate exchange to receive unroutable messages.

Parameters:
  • method_frame – The Queue.DeclareOk frame

on_channel_open(channel)[source]

This method is invoked by pika when the channel has been opened. Declares the VIP queue to handle messages.

Parameters:
  • channel – new channel object

on_open_error(_connection_unused, error_message=None)[source]

Stop the infinite loop and call the registered error handler.

Parameters:
  • _connection_unused
  • error_message – connection error message

open_connection()[source]

Open an asynchronous connection for the router/platform.

volttron.platform.vip.rmq_router module

volttron.platform.vip.router module

class volttron.platform.vip.router.BaseRouter(context=None, default_user_id=None)[source]

Bases: object

Abstract base class of VIP router implementation.

Router implementers should inherit this class and implement the setup() method to bind to appropriate addresses, set identities, set up authentication, etc. The socket will be created by the start() method, which will then call the setup() method. Once started, the socket may be polled for incoming messages, and those messages are handled/routed by calling the route() method. During routing, the issue() method, which may be implemented, will be called to allow for debugging and logging. Custom subsystems may be implemented in the handle_subsystem() method. The socket will be closed when the stop() method is called.
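The lifecycle described above can be sketched with a toy class. `ToyRouter` is a hypothetical stand-in that only mirrors the method names and call order described in this section; it does not import or subclass BaseRouter, a plain list replaces the socket, and the frame values are made up.

```python
class ToyRouter:
    """Toy stand-in that mimics the start/setup/route/stop call order."""

    def __init__(self):
        self.socket = None
        self.log = []

    def start(self):
        self.socket = []      # a list stands in for the real socket
        self.setup()          # start() finishes by calling setup()

    def setup(self):
        # A real subclass would bind addresses and set identities here.
        self.log.append("setup")

    def issue(self, topic, frames, extra=None):
        # Hook for debugging and logging during routing.
        self.log.append(("issue", topic))

    def route(self, frames):
        self.issue("incoming", frames)
        self.socket.append(frames)   # "deliver" by storing the frames

    def stop(self):
        self.socket = None           # stands in for closing the socket
        self.log.append("stop")


router = ToyRouter()
router.start()
router.route(["sender", "", "VIP1", "", "1", "hello"])
delivered = list(router.socket)
router.stop()
```

The recorded log shows setup() running as part of start(), issue() being called during routing, and stop() releasing the socket.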

handle_subsystem(frames, user_id)[source]

Handle additional subsystems and provide a response.

This method does nothing by default and may be implemented by subclasses to provide additional subsystems.

frames is a list of zmq.Frame objects with the following elements:

[SENDER, RECIPIENT, PROTOCOL, USER_ID, MSG_ID, SUBSYSTEM, …]

The return value should be None if the subsystem is unknown, an empty list or False (or another false value) if the message was handled but does not require/generate a response, or a list containing the following elements:

[RECIPIENT, SENDER, PROTOCOL, USER_ID, MSG_ID, SUBSYSTEM, …]
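The three-way return convention can be sketched as follows. The subsystem names `echo` and `log` and the `dispatch` helper are hypothetical, and frames are plain strings here rather than zmq.Frame objects.

```python
def handle_subsystem(frames, user_id):
    """Return None (unknown), [] (handled, no reply), or response frames."""
    sender, recipient, proto, _, msg_id, subsystem = frames[:6]
    if subsystem == "echo":
        # The response swaps sender and recipient and returns the payload.
        return [recipient, sender, proto, user_id, msg_id, subsystem] + frames[6:]
    if subsystem == "log":
        return []          # handled, but nothing to send back
    return None            # unknown subsystem


def dispatch(frames, user_id=""):
    """Classify the handler result the way a caller might."""
    response = handle_subsystem(frames, user_id)
    if response is None:
        return "unknown"
    if not response:
        return "handled"
    return response
```

For example, `dispatch(["agent.a", "", "VIP1", "", "7", "log", "x"])` yields `"handled"`, while the same frames with an unrecognized subsystem yield `"unknown"`.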
issue(topic, frames, extra=None)[source]
lookup_user_id(sender, recipient, auth_token)[source]

Find and return a user identifier.

Returns the UTF-8 encoded User-Id property from the sender frame or None if the authenticator did not set the User-Id metadata. May be extended to perform additional lookups.

poll

Returns the underlying socket’s poll method.

poll_sockets()[source]

Called inside run method

Implement this method to poll sockets for incoming messages.

route(frames)[source]

Route one message and return.

One message is read from the socket and processed. If the recipient is the router (empty recipient), the standard hello and ping subsystems are handled. Other subsystems are sent to handle_subsystem() for processing. Messages destined for other entities are routed appropriately.

run()[source]

Main router loop.

setup()[source]

Called from start() method to setup the socket.

Implement this method to bind the socket, set identities and options, etc.

start()[source]

Create the socket and call setup().

The socket is saved in the socket attribute. The setup() method is called at the end of the method to perform additional setup.

stop(linger=1)[source]

Close the socket.

volttron.platform.vip.routingservice module

class volttron.platform.vip.routingservice.RoutingService(socket, context, socket_class, poller, my_addr, instance_name, *args, **kwargs)[source]

Bases: object

This class maintains connection with external platforms.

close_external_connections()[source]

Close external platform socket connections.

disconnect_external_instances(instance_name)[source]

Close socket connections to the remote platform.

Parameters:
  • instance_name

get_connected_platforms()[source]

Get the list of connected instances.

get_name_for_identity(identity)[source]

Get the instance name.

Parameters:
  • identity – platform identity

handle_monitor_event(monitor_sock)[source]

Monitor external platform socket connections.

Parameters:
  • monitor_sock – socket to monitor

handle_subsystem(frames)[source]
Handler for incoming routing table frames. It calls the appropriate action handler based on the operation request.

Parameters:
  • frames (list) – list of frames
Returns: response frame to be sent back to the sender
Return type: list

my_instance_name()[source]

Name of my instance/platform.

register(type, handler)[source]

Used by PubSubService to register on_connect and on_disconnect handlers.

Parameters:
  • type – on_connect/on_disconnect
  • handler – handler function
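A registration call of this shape usually implies a small registry keyed by event type. The sketch below shows one way such a registry might work; `HandlerRegistry`, its `fire` method, and the instance name are hypothetical and are not taken from RoutingService.

```python
from collections import defaultdict


class HandlerRegistry:
    """Hypothetical registry of callbacks keyed by event type."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, type, handler):
        if type not in ("on_connect", "on_disconnect"):
            raise ValueError("unsupported event type: %r" % (type,))
        self._handlers[type].append(handler)

    def fire(self, type, instance_name):
        # Call every handler registered for this event type, in order.
        for handler in self._handlers[type]:
            handler(instance_name)


connected = []
registry = HandlerRegistry()
registry.register("on_connect", connected.append)
registry.register("on_disconnect", connected.remove)
registry.fire("on_connect", "platform-2")
after_connect = list(connected)
registry.fire("on_disconnect", "platform-2")
```

Here the connect handler adds the instance name to a list and the disconnect handler removes it again.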

send_external(instance_name, frames)[source]

Send frames to an external instance.

Parameters:
  • instance_name – name of remote instance
  • frames – frames to send

volttron.platform.vip.socket module

VIP - VOLTTRON™ Interconnect Protocol implementation

See https://volttron.readthedocs.io/en/develop/core_services/messagebus/VIP/VIP-Overview.html for protocol specification.

This file contains an abstract _Socket class which should be extended to provide missing features for different threading models. The standard Socket class is defined in __init__.py. A gevent-friendly version is defined in green.py.

class volttron.platform.vip.socket.Address(address, **defaults)[source]

Bases: object

Parse and hold a URL-style address.

The URL given by address may contain optional query string parameters and a URL fragment which, if given, will be interpreted as the socket identity for the given address.

Valid parameters:
  • server: Server authentication method; must be one of NULL, PLAIN, or CURVE.
  • domain: ZAP domain for server authentication.
  • serverkey: Encoded CURVE server public key.
  • secretkey: Encoded CURVE secret key.
  • publickey: Encoded CURVE public key.
  • ipv6: Boolean value indicating use of IPv6.
  • username: Username to use with PLAIN authentication.
  • password: Password to use with PLAIN authentication.
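To show how a URL-style address of this kind decomposes, the sketch below splits one into its base URL, query parameters, and fragment using only `urllib.parse`. It does not use the Address class; `split_address` and the sample address are hypothetical, and the real class may parse or validate differently.

```python
from urllib.parse import urlsplit, parse_qs


def split_address(address):
    """Split a URL-style address into base URL, parameters, and identity."""
    parts = urlsplit(address)
    base = "%s://%s" % (parts.scheme, parts.netloc + parts.path)
    # parse_qs returns lists; keep the last value given for each key.
    params = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    identity = parts.fragment or None   # the fragment acts as the identity
    return base, params, identity


# The address below is made up for illustration.
base, params, identity = split_address(
    "tcp://127.0.0.1:22916?server=CURVE&domain=vip#my.agent"
)
```

The query string supplies the connection parameters, and the fragment, when present, is read as the socket identity.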

bind(sock, bind_fn=None)[source]

Extended zmq.Socket.bind() to include options in the address.

connect(sock, connect_fn=None)[source]

Extended zmq.Socket.connect() to include options in the address.

qs
reset(sock)[source]
exception volttron.platform.vip.socket.ProtocolError[source]

Bases: Exception

Error raised for invalid use of Socket object.

class volttron.platform.vip.socket.Message(**kwargs)[source]

Bases: object

Message object returned from Socket.recv_vip_object().

volttron.platform.vip.socket.nonblocking(sock)[source]

volttron.platform.vip.tracking module

Utilities for tracking VIP message statistics at the router.

class volttron.platform.vip.tracking.Tracker[source]

Bases: object

Object for sharing data between the router and control objects.

disable()[source]

Disable tracking.

enable()[source]

Enable tracking.

hit(topic, frames, extra)[source]

Increment counters for given topic and frames.

reset()[source]

Reset all counters to default values and set start time.
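The enable/disable/hit/reset interface above suggests a simple counter store. The following sketch shows one plausible shape for it. `ToyTracker` is hypothetical, the choice of what to count is an assumption, and so is the behavior of ignoring hits while tracking is disabled.

```python
import time
from collections import Counter


class ToyTracker:
    """Hypothetical counter-based tracker; not the real Tracker class."""

    def __init__(self):
        self.enabled = False
        self.reset()

    def enable(self):
        self.enabled = True

    def disable(self):
        self.enabled = False

    def reset(self):
        # Clear all counters and remember when counting (re)started.
        self.counts = Counter()
        self.start_time = time.time()

    def hit(self, topic, frames, extra=None):
        if not self.enabled:
            return                    # assumed: hits are ignored when disabled
        self.counts[topic] += 1
        self.counts["frames"] += len(frames)


tracker = ToyTracker()
tracker.hit("incoming", ["a", "b"])       # ignored: tracking is disabled
tracker.enable()
tracker.hit("incoming", ["a", "b", "c"])
tracker.hit("outgoing", ["a"])
snapshot = dict(tracker.counts)
tracker.reset()
```

Taking a snapshot before reset() keeps the counts available after the counters are cleared.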

volttron.platform.vip.zmq_connection module

class volttron.platform.vip.zmq_connection.ZMQConnection(url, identity, instance_name, context)[source]

Bases: volttron.platform.vip.BaseConnection

Maintains ZMQ socket connection

bind()[source]
close_connection(linger=5)[source]

This method closes the ZeroMQ socket.

connect(callback=None)[source]
disconnect()[source]
open_connection(type)[source]
recv_vip_object(flags=0, copy=True, track=False)[source]
register(handler)[source]
send_vip(peer, subsystem, args=None, msg_id: bytes = b'', user=b'', via=None, flags=0, copy=True, track=False)[source]
send_vip_object(message, flags=0, copy=True, track=False)[source]
set_properties(flags)[source]