volttron.utils package

class volttron.utils.AbsolutePathFileReloader(filetowatch, callback)[source]

Bases: watchdog.events.PatternMatchingEventHandler

Extends PatternMatchingEvent handler to watch changes to a singlefile/file pattern within volttron home. filetowatch should be path relative to volttron home. For example filetowatch auth.json with watch file <volttron_home>/auth.json. filetowatch *.json will watch all json files in <volttron_home>

on_any_event(event)[source]

Catch-all event handler.

Parameters:event (FileSystemEvent) – The event object representing the file system event.
watchfile
class volttron.utils.VolttronHomeFileReloader(filetowatch, callback)[source]

Bases: watchdog.events.PatternMatchingEventHandler

Extends PatternMatchingEvent handler to watch changes to a singlefile/file pattern within volttron home. filetowatch should be path relative to volttron home. For example filetowatch auth.json with watch file <volttron_home>/auth.json. filetowatch *.json will watch all json files in <volttron_home>

on_any_event(event)[source]

Catch-all event handler.

Parameters:event (FileSystemEvent) – The event object representing the file system event.
volttron.utils.get_hostname()[source]
volttron.utils.get_random_key(length: int = 65) → str[source]

Returns a hex random key of specified length. The length must be > 0 in order for the key to be valid. Raises a ValueError if the length is invalid.

The default length is 65, which is 130 in length when hexlify is run.

Parameters:length
Returns:
volttron.utils.is_ip_private(vip_address)[source]

Determines if the passed vip_address is a private ip address or not.

Parameters:vip_address – A valid ip address.
Returns:True if an internal ip address.

Submodules

volttron.utils.docs module

doc_inherit decorator

Usage:

class Foo(object):
def foo(self):
“Frobber” pass
class Bar(Foo):

@doc_inherit def foo(self):

pass

Now, Bar.foo.__doc__ == Bar().foo.__doc__ == Foo.foo.__doc__ == “Frobber”

class volttron.utils.docs.DocInherit(mthd)[source]

Bases: object

Docstring inheriting method descriptor

The class itself is also used as a decorator

get_no_inst(cls)[source]
get_with_inst(obj, cls)[source]
use_parent_doc(func, source)[source]
volttron.utils.docs.doc_inherit

alias of volttron.utils.docs.DocInherit

volttron.utils.frame_serialization module

volttron.utils.frame_serialization.deserialize_frames(frames: List[zmq.sugar.frame.Frame]) → List[source]
volttron.utils.frame_serialization.serialize_frames(data: List[Any]) → List[zmq.sugar.frame.Frame][source]

volttron.utils.frozendict module

class volttron.utils.frozendict.FrozenDict(*args, **kwargs)[source]

Bases: dict

A wrapper around a dictionary that allows us to “freeze” a dictionary so that we can no longer set values on the object itself. This does that we can’t change the value instance object if its mutable.

freeze()[source]

volttron.utils.persistance module

class volttron.utils.persistance.PersistentDict(filename, flag='c', mode=None, format='pickle', *args, **kwds)[source]

Bases: dict

Persistent dictionary with an API compatible with shelve and anydbm.

The dict is kept in memory, so the dictionary operations run as fast as a regular dictionary.

Write to disk is delayed until close or sync (similar to gdbm’s fast mode).

Input file format is automatically discovered. Output file format is selectable between pickle, json, and csv. All three serialization formats are backed by fast C implementations.

async_sync()[source]

Write dict to disk via worker thread. Don’t mix with sync if it can be helped

close()[source]
sync()[source]

Write dict to disk

volttron.utils.persistance.load_create_store(filename)[source]

volttron.utils.prompt module

volttron.utils.prompt.prompt_response(prompt, valid_answers=None, default=None, echo=True, mandatory=False)[source]

volttron.utils.rmq_config_params module

class volttron.utils.rmq_config_params.RMQConfig[source]

Bases: object

Utility class to read/write RabbitMQ related configuration

admin_pwd
admin_user
amqp_port
amqp_port_ssl
certificate_data
hostname
is_ssl
load_rmq_config(volttron_home=None)[source]

Load RabbitMQ config from VOLTTRON_HOME :param volttron_home: VOLTTRON_HOME path :return:

local_password
local_user
mgmt_port
mgmt_port_ssl
node_name
reconnect_delay()[source]
rmq_home
use_existing_certs
virtual_host
write_rmq_config(volttron_home=None)[source]

Write new config options into $VOLTTRON_HOME/rabbitmq_config.yml :param volttron_home: VOLTTRON_HOME path :return:

volttron.utils.rmq_mgmt module

class volttron.utils.rmq_mgmt.RabbitMQMgmt[source]

Bases: object

build_agent_connection(identity, instance_name)[source]

Check if RabbitMQ user and certs exists for this agent, if not create a new one. Add access control/permissions if necessary. Return connection parameters. :param identity: Identity of agent :param instance_name: instance name of the platform :param is_ssl: Flag to indicate if SSL connection or not :return: Return connection parameters

build_connection_param(rmq_user, ssl_auth=None, retry_attempt=30, retry_delay=2)[source]

Build Pika Connection parameters :param rmq_user: RabbitMQ user :param ssl_auth: If SSL based connection or not :return:

build_remote_connection_param(rmq_user, rmq_address, ssl_auth=None, cert_dir=None, retry_attempt=30, retry_delay=2)[source]

Build Pika Connection parameters :param rmq_user: RabbitMQ user :param ssl_auth: If SSL based connection or not :return:

build_rmq_address(user=None, password=None, host=None, port=None, vhost=None, ssl_auth=None, ssl_params=None)[source]

Build RMQ address for federation or shovel connection :param ssl_auth: :param config: :return:

build_router_connection(identity, instance_name)[source]

Check if RabbitMQ user and certs exists for the router, if not create a new one. Add access control/permissions if necessary. Return connection parameters. :param identity: Identity of agent :param permissions: Configure+Read+Write permissions :param is_ssl: Flag to indicate if SSL connection or not :return:

build_shovel_connection(identity, instance_name, host, port, vhost, is_ssl)[source]

Check if RabbitMQ user and certs exists for this agent, if not create a new one. Add access control/permissions if necessary. Return connection parameters. :param identity: Identity of agent :param instance_name: instance name of the platform :param host: hostname :param port: amqp/amqps port :param vhost: virtual host :param is_ssl: Flag to indicate if SSL connection or not :return: Return connection uri

create_exchange(exchange, properties, vhost=None, ssl_auth=None)[source]

Create a new exchange :param exchange: exchange name :param properties: dict containing properties :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

create_queue(queue, properties, vhost=None, ssl_auth=None)[source]

Create a new queue :param queue: queue :param properties: dict containing properties :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

create_user(user, password=None, tags='administrator', ssl_auth=None)[source]

Create a new RabbitMQ user :param user: Username :param password: Password :param tags: “adminstrator/management” :param ssl_auth: Flag for SSL connection :return:

create_user_with_permissions(user, permissions, ssl_auth=None)[source]

Create RabbitMQ user. Set permissions for it. :param identity: Identity of agent :param permissions: Configure+Read+Write permissions :param is_ssl: Flag to indicate if SSL connection or not :return:

create_vhost(vhost='volttron', ssl_auth=None)[source]

Create a new virtual host :param vhost: virtual host :param ssl_auth :return:

delete_connection(name, ssl_auth=None)[source]

Delete open connection :param name: connection name :param ssl: Flag for SSL connection :return:

delete_exchange(exchange, vhost=None, ssl_auth=None)[source]

Delete a exchange :param exchange: exchange name :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

delete_multiplatform_parameter(component, parameter_name, vhost=None)[source]

Delete a component parameter :param component: component name :param parameter_name: parameter :param vhost: virtual host :return:

delete_parameter(component, parameter_name, vhost=None, ssl_auth=None)[source]

Delete a component parameter :param component: component name :param parameter_name: parameter :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

delete_policy(name, vhost=None, ssl_auth=None)[source]

Delete a policy :param name: policy name :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

delete_queue(queue, user=None, password=None, vhost=None, ssl_auth=None)[source]

Delete a queue :param queue: queue :param vhost: virtual host :return:

delete_user(user, ssl_auth=None)[source]

Delete specific user :param user: user :param ssl_auth: Flag for SSL connection :return:

delete_users_in_bulk(users, ssl_auth=None)[source]

Delete a list of users at once :param users: :param ssl_auth: :return:

delete_vhost(vhost, ssl_auth=None)[source]

Delete a virtual host :param vhost: virtual host :param user: username :param password: password :return:

get_bindings(exchange, ssl_auth=None)[source]

List all bindings in which a given exchange is the source :param exchange: source exchange :param ssl: Flag for SSL connection :return: list of bindings

get_connection(name, ssl_auth=None)[source]

Get status of a connection :param name: connection name :param ssl: Flag for SSL connection :return:

get_connections(vhost=None, ssl_auth=None)[source]

Get all connections :param user: username :param password: password :param vhost: virtual host :return:

get_default_permissions(fq_identity)[source]
get_exchanges(vhost=None, ssl_auth=None)[source]

List all exchanges :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

get_exchanges_with_props(vhost=None, ssl_auth=None)[source]

List all exchanges with properties :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

get_parameter(component, vhost=None, ssl_auth=None)[source]

Get component parameters, namely federation-upstream :param component: component name :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

get_policies(vhost=None, ssl_auth=None)[source]

Get all policies :param vhost: virtual host :param ssl_auth_auth: Flag for ssl_auth connection :return:

get_policy(name, vhost=None, ssl_auth=None)[source]

Get a specific policy :param name: policy name :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

get_queues(user=None, password=None, vhost=None, ssl_auth=None)[source]

Get list of queues :param user: username :param password: password :param vhost: virtual host :param ssl: Flag for SSL connection :return:

get_queues_with_props(vhost=None, ssl_auth=None)[source]

Get properties of all queues :param vhost: virtual host :param ssl: Flag for SSL connection :return:

get_ssl_url_params(user=None)[source]

Return SSL parameter string :return:

get_topic_permissions_for_user(user, vhost=None, ssl_auth=None)[source]

Get permissions for all topics :param user: user :param vhost: :param ssl_auth: Flag for SSL connection :return:

get_user_permissions(user, vhost=None, ssl_auth=None)[source]

Get permissions (configure, read, write) for the user :param user: user :param password: password :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

get_user_props(user, ssl_auth=None)[source]

Get properties of the user :param user: username :param ssl_auth: Flag for SSL connection :return:

get_users(ssl_auth=None)[source]

Get list of all users :param ssl_auth: Flag for SSL connection :return:

get_virtualhost(vhost, ssl_auth=None)[source]

Get properties for this virtual host :param vhost: :param ssl_auth: Flag indicating ssl based connection :return:

get_virtualhosts(ssl_auth=None)[source]
Parameters:ssl_auth
Returns:
init_rabbitmq_setup()[source]
Create a RabbitMQ resources for VOLTTRON instance.
  • Create a new virtual host: default is “volttron”
  • Create a new topic exchange: “volttron”
  • Create alternate exchange: “undeliverable” to capture unrouteable messages
Returns:
is_valid_amqp_port()[source]
is_valid_mgmt_port()[source]
list_channels_for_connection(connection, ssl_auth=None)[source]

List all open channels for a given channel :param connection: connnection name :param ssl: Flag for SSL connection :return:

list_channels_for_vhost(vhost=None, ssl_auth=None)[source]

List all open channels for a given vhost :param vhost: virtual host :param ssl: Flag for SSL connection :return:

set_parameter(component, parameter_name, parameter_properties, vhost=None, ssl_auth=None)[source]

Set parameter on a component :param component: component name (for example, federation-upstream) :param parameter_name: parameter name :param parameter_properties: parameter properties :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

set_policy(name, value, vhost=None, ssl_auth=None)[source]

Set a policy. For example a federation policy :param name: policy name :param value: policy value :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

set_topic_permissions_for_user(permissions, user, vhost=None, ssl_auth=None)[source]

Set read, write permissions for a topic and user :param permissions: dict containing exchange name and read/write permissions {“exchange”:”volttron”, read: “.*”, write: “^__pubsub__”} :param user: username :param ssl_auth: Flag for SSL connection :return:

set_user_permissions(permissions, user, vhost=None, ssl_auth=None)[source]

Set permissions for the user :param permissions: dict containing configure, read and write settings :param user: username :param password: password :param vhost: virtual host :param ssl_auth: Flag for SSL connection :return:

volttron.utils.rmq_setup module

RabbitMQ setup script to 1. setup single instance of RabbitMQ VOLTTRON 2. Federation 3. Shovel

exception volttron.utils.rmq_setup.RabbitMQSetupAlreadyError[source]

Bases: BaseException

exception volttron.utils.rmq_setup.RabbitMQStartError[source]

Bases: BaseException

volttron.utils.rmq_setup.check_rabbit_status(rmq_home=None, env=None)[source]
volttron.utils.rmq_setup.is_file_readable(file_path)[source]
volttron.utils.rmq_setup.prompt_port(default_port, prompt)[source]
volttron.utils.rmq_setup.prompt_shovels(vhome)[source]

Prompt for shovel configuration and save in rabbitmq_shovel_config.yml :return:

volttron.utils.rmq_setup.prompt_upstream_servers(vhome)[source]

Prompt for upstream server configurations and save in rabbitmq_federation_config.yml :return:

volttron.utils.rmq_setup.restart_ssl(rmq_home, env=None)[source]

Runs rabbitmqctl eval “ssl:stop(), ssl:start().” to make rmq reload ssl certificates. Client connection will get dropped and client should reconnect. :param rmq_home: :param env: Environment to run the RabbitMQ command. :return:

volttron.utils.rmq_setup.setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False, instance_name=None, rmq_conf_file=None, env=None)[source]

Setup VOLTTRON instance to run with RabbitMQ message bus. :param setup_type:

single - Setup to run as single instance federation - Setup to connect multiple VOLTTRON instances as

a federation

shovel - Setup shovels to forward local messages to remote instances

:param verbose :param prompt :raises RabbitMQSetupAlreadyError

volttron.utils.rmq_setup.start_rabbit(rmq_home, env=None)[source]

Start RabbitMQ server.

The function assumes that rabbitmq.conf in rmq_home/etc/rabbitmq is setup before this funciton is called.

If the function cannot detect that rabbit was started within roughly 60 seconds then class:RabbitMQStartError will be raised.

Parameters:
  • rmq_home – RabbitMQ installation path
  • env – Environment to start RabbitMQ with.
Raises:

RabbitMQStartError

volttron.utils.rmq_setup.stop_rabbit(rmq_home, env=None, quite=False)[source]

Stop RabbitMQ Server :param rmq_home: RabbitMQ installation path :param env: Environment to run the RabbitMQ command. :param quite: :return:

volttron.utils.rmq_setup.write_env_file(rmq_config, conf_file, env=None)[source]

Write rabbitmq-env.conf file :param conf_file: :param env: Environment to get the RABBITMQ_CONF_ENV_FILE out of. :param rmq_config: :return:

volttron.utils.valid_uuid module

volttron.utils.valid_uuid.validate_uuid4(uuid_string)[source]

Validate that a UUID string is in fact a valid uuid4. Happily, the uuid module does the actual checking for us. It is vital that the ‘version’ kwarg be passed to the UUID() call, otherwise any 32-character hex string is considered valid.