# Source code for volttron.utils.rmq_setup

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2019, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}

"""
RabbitMQ setup script to
1. setup single instance of RabbitMQ VOLTTRON
2. Federation
3. Shovel
"""

import argparse
import logging
import os
from socket import getfqdn
from shutil import copy
import gevent
import yaml

from . rmq_mgmt import RabbitMQMgmt
from . rmq_config_params import RMQConfig

from volttron.platform import certs
from volttron.platform import get_home
from volttron.platform.agent.utils import (store_message_bus_config,
                                           execute_command)
from volttron.utils.prompt import prompt_response, y, y_or_n
from volttron.platform.agent.utils import get_platform_instance_name

_log = logging.getLogger(os.path.basename(__file__))


class RabbitMQStartError(BaseException):
    """Raised when the RabbitMQ server cannot be verified as started."""
    # NOTE(review): derives from BaseException, so ``except Exception``
    # handlers will not catch it - confirm this is intended.
class RabbitMQSetupAlreadyError(BaseException):
    """Error type for an already existing RabbitMQ setup."""
    # NOTE(review): no code visible in this file raises this exception, and
    # it derives from BaseException rather than Exception - confirm both.
def _start_rabbitmq_without_ssl(rmq_config, conf_file, env=None):
    """
    Check if basic RabbitMQ configuration is available. Start RabbitMQ in
    non ssl mode so that we can login as guest to create volttron users,
    exchanges, and vhost.

    :param rmq_config: RabbitMQ configuration object. ``volttron_home`` is
        filled in from ``get_home()`` when it is empty.
    :param conf_file: path of the rabbitmq.conf file that is written when a
        non default (non ssl) port is configured
    :param env: environment handed to the stop/start commands and to
        ``write_env_file``
    :return: None. The process exits with status 1 when no valid RabbitMQ
        installation directory can be found.
    """
    if not rmq_config.volttron_home:
        rmq_config.volttron_home = get_home()

    rmq_home = rmq_config.rmq_home
    if not rmq_home:
        # No rmq-home configured: fall back to the default install location.
        rmq_home = os.path.join(os.path.expanduser("~"),
                                "rabbitmq_server/rabbitmq_server-3.7.7")
        if not os.path.exists(rmq_home):
            print("\nERROR:\n"
                  "Missing key 'rmq_home' in RabbitMQ config and RabbitMQ is "
                  "not installed in default path: \n"
                  "~/rabbitmq_server/rabbitmq_server-3.7.7 \n"
                  "Please set the correct RabbitMQ installation path in "
                  "rabbitmq_config.yml")
            # SystemExit instead of the site-provided exit() helper, which is
            # meant for interactive use.
            raise SystemExit(1)
    elif not os.path.exists(rmq_home) or not os.path.exists(
            os.path.join(rmq_home, 'sbin/rabbitmq-server')):
        print("\nERROR:\n"
              "Invalid rmq-home value ({}). Please fix rmq-home "
              "in {} and rerun this script".format(
                  rmq_home, rmq_config.volttron_rmq_config))
        raise SystemExit(1)

    os.environ['RABBITMQ_HOME'] = rmq_home

    # attempt to stop
    stop_rabbit(rmq_home, env, quite=True)

    # If either non ssl port is not the default, write a rabbitmq.conf before
    # restarting. (The original test used "and", which skipped the file when
    # only one of the two ports was customised.)
    # NOTE(review): the ports are compared against ints; if the config holds
    # them as strings the file is always written - confirm the stored type.
    if rmq_config.amqp_port != 5672 or rmq_config.mgmt_port != 15672:
        new_conf = "listeners.tcp.default = {}\n" \
                   "management.listener.port = {}".format(
                       rmq_config.amqp_port, rmq_config.mgmt_port)
        with open(conf_file, 'w+') as r_conf:
            r_conf.write(new_conf)

    # Need to write env file even when starting without ssl mode since env
    # file will provide the right node name, tcp port and conf file to use.
    # This is essential for tests as we don't use default port, paths or node
    # name.
    # TODO - we should probably not use default node name even for non test
    # use case to avoid node name clash when you have more than one instance
    # of RMQ on the same machine
    write_env_file(rmq_config, conf_file, env)

    # Start RabbitMQ server from the directory that was just validated, so the
    # default-path fallback above is honoured.
    _log.info("Starting RabbitMQ server")
    start_rabbit(rmq_home, env=env)
def write_env_file(rmq_config, conf_file, env=None):
    """
    Write rabbitmq-env.conf file

    The file is only written when ``rmq_config.node_name`` is not the default
    'rabbit'. It sets the node name and AMQP port, points to ``conf_file`` and
    places the mnesia, log, plugin, pid and generated-config paths in the
    directory that contains ``conf_file``.

    :param rmq_config: configuration object providing ``node_name`` and
        ``amqp_port``
    :param conf_file: path of the rabbitmq.conf file
    :param env: Environment to get the RABBITMQ_CONF_ENV_FILE out of.
        ``os.environ`` is used when this is empty.
    :raises ValueError: if a custom node name is configured but
        RABBITMQ_CONF_ENV_FILE is not set in ``env``
    :return:
    """
    if not env:
        env = os.environ

    # With the default node name there is nothing to write.
    if rmq_config.node_name == 'rabbit':
        return

    env_file = env.get('RABBITMQ_CONF_ENV_FILE')
    if not env_file:
        # Fail with a clear message instead of the TypeError open(None) gives.
        raise ValueError(
            "RABBITMQ_CONF_ENV_FILE is not set in the environment; cannot "
            "write the RabbitMQ env file for node {}".format(
                rmq_config.node_name))

    # Custom node name with custom port: create an env file with an entry
    # that points to the conf file.
    nodebase = os.path.dirname(conf_file)
    entries = (
        ("NODENAME", rmq_config.node_name),
        ("NODE_PORT", rmq_config.amqp_port),
        ("MNESIA_DIR", os.path.join(nodebase, 'mnesia')),
        ("CONFIG_FILE", conf_file),
        ("LOG_BASE", os.path.join(nodebase, 'logs')),
        ("PLUGINS_EXPAND_DIR", os.path.join(nodebase, 'plugins-expand')),
        ("PID_FILE", os.path.join(nodebase, 'rabbitmq.pid')),
        ("RABBITMQ_GENERATED_CONFIG_DIR",
         os.path.join(nodebase, 'generated_config')),
    )
    with open(env_file, 'w+') as env_conf:
        env_conf.write(
            "\n".join("{}={}".format(key, value) for key, value in entries))
def _create_federation_setup(admin_user, admin_password, is_ssl, vhost, vhome):
    """
    Creates a RabbitMQ federation of multiple VOLTTRON instances based on
    rabbitmq_federation_config.yml in ``vhome``.
        - Builds AMQP/S address for each upstream server
        - Creates upstream servers
        - Adds policy to make "volttron" exchange "federated".

    :param admin_user: user name used to build each upstream address
    :param admin_password: password used to build each upstream address
    :param is_ssl: when true, ssl url parameters are fetched from the
        management helper and added to each address
    :param vhost: local virtual host on which parameters/policy are set
    :param vhome: directory containing rabbitmq_federation_config.yml
    :return:
    """
    rmq_mgmt = RabbitMQMgmt()

    federation_config_file = os.path.join(vhome,
                                          'rabbitmq_federation_config.yml')
    # An empty YAML file loads as None; treat it like "no configuration".
    federation_config = _read_config_file(federation_config_file) or {}
    federation = federation_config.get('federation-upstream')
    if not federation:
        return

    ssl_params = rmq_mgmt.get_ssl_url_params() if is_ssl else None

    for host, upstream in federation.items():
        try:
            name = "upstream-{vhost}-{host}".format(
                vhost=upstream['virtual-host'], host=host)
            _log.debug("Upstream Server: {name} ".format(name=name))

            address = rmq_mgmt.build_rmq_address(admin_user,
                                                 admin_password,
                                                 host,
                                                 upstream['port'],
                                                 upstream['virtual-host'],
                                                 is_ssl,
                                                 ssl_params)
            prop = dict(vhost=vhost,
                        component="federation-upstream",
                        name=name,
                        value={"uri": address})
            rmq_mgmt.set_parameter('federation-upstream', name, prop, vhost)

            policy_name = 'volttron-federation'
            policy_value = {"pattern": "^volttron",
                            "definition": {"federation-upstream-set": "all"},
                            "priority": 0,
                            "apply-to": "exchanges"}
            rmq_mgmt.set_policy(policy_name, policy_value, vhost)
        except KeyError as ex:
            # A missing 'port'/'virtual-host' only skips this upstream.
            _log.error("Federation setup did not complete. "
                       "Missing Key {key} in upstream config "
                       "{upstream}".format(key=ex, upstream=upstream))


def _create_shovel_setup(instance_name, local_host, port, vhost, vhome,
                         is_ssl):
    """
    Create RabbitMQ shovels based on rabbitmq_shovel_config.yml in ``vhome``.

    For every remote host one shovel is created per configured pubsub topic
    and one per configured [local agent, remote agent] RPC pair.

    :param instance_name: name of the local VOLTTRON instance
    :param local_host: host name used for the source (local) connection
    :param port: port used for the source (local) connection
    :param vhost: local virtual host
    :param vhome: directory containing rabbitmq_shovel_config.yml
    :param is_ssl: passed through to the connection builder
    :return:
    """
    shovel_config_file = os.path.join(vhome, 'rabbitmq_shovel_config.yml')
    # An empty YAML file loads as None; treat it like "no configuration".
    shovel_config = _read_config_file(shovel_config_file) or {}
    shovels = shovel_config.get('shovel', {})

    rmq_mgmt = RabbitMQMgmt()
    _log.debug("shovel config: {}".format(shovel_config))
    try:
        for remote_host, shovel in shovels.items():
            pubsub_config = shovel.get("pubsub", {})
            _log.debug("shovel parameters: {}".format(shovel))
            for identity, topics in pubsub_config.items():
                # Build source address
                src_uri = rmq_mgmt.build_shovel_connection(
                    identity, instance_name, local_host, port, vhost, is_ssl)
                # Build destination address
                dest_uri = rmq_mgmt.build_shovel_connection(
                    identity, instance_name, remote_host, shovel['port'],
                    shovel['virtual-host'], is_ssl)

                # A single topic may be given as a scalar.
                if not isinstance(topics, list):
                    topics = [topics]
                for topic in topics:
                    _log.debug(
                        "Creating shovel to forward PUBSUB topic {}".format(
                            topic))
                    name = "shovel-{host}-{topic}".format(host=remote_host,
                                                          topic=topic)
                    routing_key = "__pubsub__.{instance}.{topic}.#".format(
                        instance=instance_name, topic=topic)
                    prop = dict(vhost=vhost,
                                component="shovel",
                                name=name,
                                value={"src-uri": src_uri,
                                       "src-exchange": "volttron",
                                       "src-exchange-key": routing_key,
                                       "dest-uri": dest_uri,
                                       "dest-exchange": "volttron"})
                    _log.debug("shovel property: {}".format(prop))
                    rmq_mgmt.set_parameter("shovel", name, prop)

            rpc_config = shovel.get("rpc", {})
            _log.debug("RPC config: {}".format(rpc_config))
            for remote_instance, agent_ids in rpc_config.items():
                for ids in agent_ids:
                    local_identity = ids[0]
                    remote_identity = ids[1]
                    src_uri = rmq_mgmt.build_shovel_connection(
                        local_identity, instance_name, local_host, port,
                        vhost, is_ssl)
                    dest_uri = rmq_mgmt.build_shovel_connection(
                        local_identity, instance_name, remote_host,
                        shovel['port'], shovel['virtual-host'], is_ssl)
                    _log.info("Creating shovel to make RPC call to remote Agent"
                              ": {}".format(remote_identity))

                    name = "shovel-{host}-{identity}".format(
                        host=remote_host, identity=local_identity)
                    routing_key = "{instance}.{identity}.#".format(
                        instance=remote_instance, identity=remote_identity)
                    prop = dict(vhost=vhost,
                                component="shovel",
                                name=name,
                                value={"src-uri": src_uri,
                                       "src-exchange": "volttron",
                                       "src-exchange-key": routing_key,
                                       "dest-uri": dest_uri,
                                       "dest-exchange": "volttron"})
                    rmq_mgmt.set_parameter("shovel", name, prop)
    except KeyError as exc:
        # The first missing key aborts the remaining shovels as well.
        _log.error("Shovel setup did not complete. Missing Key: {}".format(
            exc))


def _setup_for_ssl_auth(rmq_config, rmq_conf_file, env=None):
    """
    Utility method to create
    1. Root CA
    2. RabbitMQ server certificates (public and private)
    3. RabbitMQ config with SSL setting
    4. Admin user to connect to RabbitMQ management Web interface

    After writing the config and env file the RabbitMQ server is stopped and
    started again, and a summary of the setup is logged.

    :param rmq_config: RabbitMQ configuration object
    :param rmq_conf_file: path the SSL enabled rabbitmq.conf is written to
    :param env: environment used to write the env file and restart RabbitMQ
    :return:
    """
    _log.info('\nChecking for CA certificate\n')
    _, server_name, admin_client_name = \
        certs.Certs.get_admin_cert_names(rmq_config.instance_name)
    vhome = get_home()
    white_list_dir = os.path.join(vhome, "certificates", "whitelist")
    # makedirs also copes with a missing parent directory and with the
    # directory appearing between a check and the creation.
    os.makedirs(white_list_dir, exist_ok=True)

    _create_certs(rmq_config, admin_client_name, server_name)

    # if all was well, create the rabbitmq.conf file for user to copy
    # /etc/rabbitmq and update VOLTTRON_HOME/rabbitmq_config.json
    # NOTE(review): management.listener.port appears twice in this template
    # (plain and ssl port) - confirm which entry RabbitMQ is meant to honour.
    new_conf = """listeners.tcp.default = {tcp_port}
management.listener.port = {mgmt_port}
listeners.ssl.default = {amqp_port_ssl}
ssl_options.cacertfile = {ca}
ssl_options.certfile = {server_cert}
ssl_options.keyfile = {server_key}
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
auth_mechanisms.1 = EXTERNAL
ssl_cert_login_from = common_name
ssl_options.versions.1 = tlsv1.2
ssl_options.versions.2 = tlsv1.1
ssl_options.versions.3 = tlsv1
management.listener.port = {mgmt_port_ssl}
management.listener.ssl = true
management.listener.ssl_opts.cacertfile = {ca}
management.listener.ssl_opts.certfile = {server_cert}
management.listener.ssl_opts.keyfile = {server_key}
trust_store.directory={ca_dir}
trust_store.refresh_interval=0""".format(
        tcp_port=rmq_config.amqp_port,
        mgmt_port=rmq_config.mgmt_port,
        mgmt_port_ssl=rmq_config.mgmt_port_ssl,
        amqp_port_ssl=rmq_config.amqp_port_ssl,
        ca=rmq_config.crts.cert_file(rmq_config.crts.trusted_ca_name),
        server_cert=rmq_config.crts.cert_file(server_name),
        server_key=rmq_config.crts.private_key_file(server_name),
        ca_dir=white_list_dir
    )

    with open(rmq_conf_file, 'w') as rconf:
        rconf.write(new_conf)

    write_env_file(rmq_config, rmq_conf_file, env)

    # Stop server, move new config file with ssl params, start server
    stop_rabbit(rmq_config.rmq_home, env=env)
    start_rabbit(rmq_config.rmq_home, env=env)

    default_vhome = os.path.abspath(
        os.path.normpath(
            os.path.expanduser(
                os.path.expandvars('~/.volttron'))))

    additional_to_do = ""
    if vhome != default_vhome:
        additional_to_do = "\n - Please set environment variable " \
                           "VOLTTRON_HOME " \
                           "to {vhome} before starting volttron"

    msg = "\n\n#######################\n\nSetup complete for volttron home " \
          "{vhome} " \
          "with instance name={}\nNotes:" + additional_to_do + \
          "\n - On production environments, restrict write access to {" \
          "root_ca} to only admin user. For example: " \
          "sudo chown root {root_ca} and {trusted_ca}" \
          "\n - A new admin user was created with user name: {} and " \
          "password={}.\n You could change this user's password by logging " \
          "into https://{}:{}/ Please update {} if you change password" \
          "\n\n#######################"
    _log.info(msg.format(rmq_config.instance_name,
                         rmq_config.admin_user,
                         rmq_config.admin_pwd,
                         rmq_config.hostname,
                         rmq_config.mgmt_port_ssl,
                         rmq_config.volttron_rmq_config,
                         root_ca=rmq_config.crts.cert_file(
                             rmq_config.crts.root_ca_name),
                         trusted_ca=rmq_config.crts.cert_file(
                             rmq_config.crts.trusted_ca_name),
                         vhome=vhome))


def _create_certs(rmq_config, admin_client_name, server_cert_name):
    """
    Utility method to create certificates

    When a root CA already exists the user (or
    ``rmq_config.use_existing_certs``) decides whether it is reused. Choosing
    a new CA deletes every file in the cert, private and CA db directories.
    The root CA is then either copied from the configured key pair or newly
    created, copied to the trusted CA file, and used to sign a server and an
    admin client certificate.

    :param rmq_config: RabbitMQ configuration object
    :param admin_client_name: name of the admin client certificate
    :param server_cert_name: RabbitMQ server certificate name
    :return:
    """
    crts = rmq_config.crts

    if crts.ca_exists():
        if rmq_config.use_existing_certs is None:
            attributes = crts.get_cert_subject(crts.root_ca_name)
            prompt_str = "Found {} with the following attributes. \n {} \n Do " \
                         "you want to use this certificate: ".format(
                             crts.cert_file(crts.root_ca_name), attributes)
            prompt = prompt_response(prompt_str,
                                     valid_answers=y_or_n,
                                     default='Y')
            if prompt in y:
                return
        elif rmq_config.use_existing_certs:
            return

        if rmq_config.use_existing_certs is None:
            prompt_str = "\n**IMPORTANT:**\nCreating a new Root CA will " \
                         "invalidate " \
                         "any existing agent certificate and hence any existing " \
                         "certificates will be deleted. If you have federation " \
                         "or shovel setup, you will have to share the new " \
                         "certificate with the other volttron instance(s) for " \
                         "the shovel/federation connections to work. " \
                         "Do you want to create a new Root CA."
            prompt = prompt_response(prompt_str,
                                     valid_answers=y_or_n,
                                     default='N')
            if prompt not in y:
                return

        # We are creating new CA cert so delete any existing certs. The user
        # has already been warned
        for directory in [crts.cert_dir, crts.private_dir, crts.ca_db_dir]:
            for entry in os.listdir(directory):
                os.remove(os.path.join(directory, entry))

    _log.info('\n Creating root ca for volttron instance: {}'.format(
        crts.cert_file(crts.root_ca_name)))
    cert_data = rmq_config.certificate_data

    # Either the full subject for a self signed CA or an existing key pair.
    subject_keys = ['country', 'state', 'location', 'organization',
                    'organization-unit', 'common-name']
    existing_ca_keys = ['ca-public-key', 'ca-private-key']
    if not cert_data or \
            not (all(k in cert_data for k in subject_keys) or
                 all(k in cert_data for k in existing_ca_keys)):
        print(
            "\nERROR:\n"
            "No certificate data found in {} or certificate data is "
            "incomplete. certificate-data should either contain all "
            "the details necessary to create a self signed CA or "
            "point to the file path of an existing CA's public and "
            "private key. Please refer to example "
            "config at examples/configurations/rabbitmq/rabbitmq_config.yml"
            " to see list of ssl certificate data to be configured".format(
                rmq_config.volttron_rmq_config))
        # SystemExit instead of the site-provided exit() helper.
        raise SystemExit(1)

    if cert_data.get('ca-public-key'):
        # using existing CA
        copy(cert_data['ca-public-key'],
             crts.cert_file(crts.root_ca_name))
        copy(cert_data['ca-private-key'],
             crts.private_key_file(crts.root_ca_name))
    else:
        data = {'C': cert_data.get('country'),
                'ST': cert_data.get('state'),
                'L': cert_data.get('location'),
                'O': cert_data.get('organization'),
                'OU': cert_data.get('organization-unit'),
                'CN': cert_data.get('common-name')}
        _log.info("Creating root ca with the following info: {}".format(data))
        crts.create_root_ca(overwrite=False, **data)

    # create a copy of the root ca as instance_name-trusted-cas.crt.
    copy(crts.cert_file(crts.root_ca_name),
         crts.cert_file(crts.trusted_ca_name))

    crts.create_signed_cert_files(server_cert_name, cert_type='server',
                                  fqdn=rmq_config.hostname)
    crts.create_signed_cert_files(admin_client_name, cert_type='client')


def _verify_and_save_instance_ca(rmq_config, instance_ca_path,
                                 instance_ca_key):
    """
    Save instance CA in VOLTTRON HOME

    :param rmq_config: configuration object whose ``crts`` saves the files
    :param instance_ca_path: path of the instance CA certificate
    :param instance_ca_key: path of the instance CA private key
    :return: True when both paths were given and exist (and were saved),
        otherwise False
    """
    found = bool(instance_ca_path and os.path.exists(instance_ca_path) and
                 instance_ca_key and os.path.exists(instance_ca_key))
    if found:
        # TODO: check content of file
        # openssl crl2pkcs7 -nocrl -certfile volttron2-ca.crt | openssl pkcs7 -print_certs -noout
        # this should list subject, issuer of both root CA and
        # intermediate CA
        rmq_config.crts.save_cert(instance_ca_path)
        rmq_config.crts.save_key(instance_ca_key)
    return found
def _create_rabbitmq_utility_scripts(rmq_config):
    """Write start-rabbitmq / stop-rabbitmq scripts and the rabbitmq.log
    symlink into the directory two levels above this module's directory."""
    module_dir = os.path.dirname(os.path.realpath(__file__))
    src_home = os.path.dirname(os.path.dirname(module_dir))

    start_script = os.path.join(src_home, 'start-rabbitmq')
    server_bin = os.path.join(rmq_config.rmq_home, 'sbin', 'rabbitmq-server')
    with open(start_script, 'w+') as script:
        script.write(server_bin + ' -detached')
        script.write(os.linesep)
        # give a few seconds for all plugins to be ready
        script.write("sleep 5")
    os.chmod(start_script, 0o755)

    stop_script = os.path.join(src_home, 'stop-rabbitmq')
    ctl_bin = os.path.join(rmq_config.rmq_home, 'sbin', 'rabbitmqctl')
    with open(stop_script, 'w+') as script:
        script.write(ctl_bin + ' stop')
    os.chmod(stop_script, 0o755)

    # symlink to rmq log; any previous link or file of that name is replaced
    log_link = os.path.join(src_home, 'rabbitmq.log')
    if os.path.lexists(log_link):
        os.unlink(log_link)
    short_host = rmq_config.hostname.split('.')[0]
    log_target = os.path.join(rmq_config.rmq_home,
                              'var/log/rabbitmq',
                              rmq_config.node_name + "@" + short_host + ".log")
    os.symlink(log_target, log_link)


def setup_rabbitmq_volttron(setup_type, verbose=False, prompt=False,
                            instance_name=None, rmq_conf_file=None, env=None):
    """
    Setup VOLTTRON instance to run with RabbitMQ message bus.

    :param setup_type:
            single - Setup to run as single instance
            federation - Setup to connect multiple VOLTTRON instances as
                         a federation
            shovel - Setup shovels to forward local messages to remote
                     instances
            all - all of the above
    :param verbose: log at DEBUG level when true, INFO otherwise
    :param prompt: when true, prompt the user and generate a new
        rabbitmq_config.yml before loading it
    :param instance_name: platform instance name; looked up through
        ``get_platform_instance_name`` when empty
    :param rmq_conf_file: rabbitmq.conf path; defaults to
        etc/rabbitmq/rabbitmq.conf below the configured RabbitMQ home
    :param env: environment passed on to the RabbitMQ start/stop helpers
    :return: the YAML exception when the config file cannot be parsed,
        otherwise None
    :raises IOError: when the RabbitMQ config file cannot be opened
    """
    if not instance_name:
        instance_name = get_platform_instance_name(prompt=True)
    # Store config this is checked at startup
    store_message_bus_config(message_bus='rmq', instance_name=instance_name)

    rmq_config = RMQConfig()

    pool_logger = logging.getLogger(
        "requests.packages.urllib3.connectionpool")
    if verbose:
        _log.setLevel(logging.DEBUG)
        _log.debug("verbose set to True")
        _log.debug(get_home())
        pool_logger.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)
        pool_logger.setLevel(logging.WARN)

    if prompt:
        # ignore any existing rabbitmq_config.yml in vhome. Prompt user and
        # generate a new rabbitmq_config.yml
        _create_rabbitmq_config(rmq_config, setup_type)

    # Load either the newly created config or config passed
    try:
        rmq_config.load_rmq_config()
    except yaml.YAMLError as exc:
        # YAMLError is the base class of the parser and scanner errors.
        _log.error("Error: YAML file cannot parsed properly. Check the contents of the file")
        return exc
    except IOError:
        _log.error("Error opening {}. Please create a rabbitmq_config.yml "
                   "file in your volttron home. If you want to point to a "
                   "volttron home other than {} please set it as the "
                   "environment variable VOLTTRON_HOME".format(
                       rmq_config.volttron_rmq_config,
                       rmq_config.volttron_home))
        _log.error("\nFor single setup, configuration file must at least "
                   "contain host and ssl certificate details. For federation "
                   "and shovel setup, config should contain details about the "
                   "volttron instance with which communication needs "
                   "to be established. Please refer to example config file "
                   "at examples/configurations/rabbitmq/rabbitmq_config.yml")
        raise

    if not rmq_conf_file:
        rmq_conf_file = os.path.join(rmq_config.rmq_home,
                                     "etc/rabbitmq/rabbitmq.conf")

    if setup_type in ("all", "single"):
        # Verify that the rmq_conf_file if exists is removed before
        # continuing; keeps asking until the file is gone.
        if os.path.exists(rmq_conf_file):
            print(f"A rabbitmq conf file {rmq_conf_file} already exists.\n"
                  "In order for setup to proceed it must be removed.\n")
            while os.path.exists(rmq_conf_file):
                answer = prompt_response(f"Remove {rmq_conf_file}? ", y_or_n)
                if answer in y:
                    os.remove(rmq_conf_file)

        _start_rabbitmq_without_ssl(rmq_config, rmq_conf_file, env=env)
        _log.debug("Creating rabbitmq virtual hosts and required users for "
                   "volttron")
        # Create local RabbitMQ setup - vhost, exchange etc.
        # should be called after _start_rabbitmq_without_ssl
        rmq_mgmt = RabbitMQMgmt()
        success = rmq_mgmt.init_rabbitmq_setup()
        if success and rmq_config.is_ssl:
            _setup_for_ssl_auth(rmq_config, rmq_conf_file, env=env)

        # Create utility scripts
        _create_rabbitmq_utility_scripts(rmq_config)

    if setup_type in ("all", "federation"):
        # Create a multi-platform federation setup
        _create_federation_setup(rmq_config.admin_user,
                                 rmq_config.admin_pwd,
                                 rmq_config.is_ssl,
                                 rmq_config.virtual_host,
                                 rmq_config.volttron_home)

    if setup_type in ("all", "shovel"):
        # Create shovel setup
        port = rmq_config.amqp_port_ssl if rmq_config.is_ssl \
            else rmq_config.amqp_port
        _create_shovel_setup(rmq_config.instance_name,
                             rmq_config.hostname,
                             port,
                             rmq_config.virtual_host,
                             rmq_config.volttron_home,
                             rmq_config.is_ssl)

    if setup_type not in ("all", "single", "federation", "shovel"):
        _log.error("Unknown option. Exiting....")
def _prompt_readable_file(prompt):
    """Prompt until a readable file is given; return its path with ``~`` and
    environment variables expanded."""
    while True:
        answer = prompt_response(prompt, mandatory=True)
        if is_file_readable(answer):
            # is_file_readable validates the expanded path, so hand back that
            # same expanded path for later validation and copying.
            return os.path.expanduser(os.path.expandvars(answer))


def _create_rabbitmq_config(rmq_config, setup_type):
    """
    Prompt user for required details and create a rabbitmq_config.yml file in
    volttron home

    For 'single' and 'all' the RabbitMQ home, host name, root CA details,
    ports and virtual host are collected and written through
    ``rmq_config.write_rmq_config()``. If the user chooses to keep an existing
    rabbitmq_config.yml the function returns immediately. For 'federation',
    'shovel' and 'all' the upstream server / shovel prompts are run.

    :param rmq_config: RabbitMQ configuration object that is filled in
    :param setup_type: type of rmq setup - single, federation, shovel or all
    """
    if setup_type in ('single', 'all'):
        if os.path.exists(rmq_config.volttron_rmq_config):
            prompt = "rabbitmq_config.yml exists in {} Do you wish to " \
                     "use this file to configure the instance".format(
                         get_home())
            prompt = prompt_response(prompt,
                                     valid_answers=y_or_n,
                                     default='Y')
            if prompt in y:
                return
            _log.info("New input data will be used to overwrite existing "
                      "{}".format(rmq_config.volttron_rmq_config))
            # TODO: ideally we can load existing file and set values in it
            # default and the compare what changed. If rmq-home changed
            # and existing config those should get cleared. If cert details
            # get changed - overwrite ca, server, admin cert and delete all
            # other certs.

        rmq_config.rmq_home = _prompt_rmq_home(rmq_config.rabbitmq_server)

        prompt = 'Fully qualified domain name of the system:'
        rmq_config.hostname = prompt_response(prompt, default=getfqdn())

        rmq_config.is_ssl = True

        if rmq_config.is_ssl:
            # The original literal ran "CA" and "certificate" together.
            prompt = "Would you like to create a new self signed root CA " \
                     "certificate for this instance:"
            prompt = prompt_response(prompt,
                                     valid_answers=y_or_n,
                                     default='Y')
            if prompt in y:
                cert_data = {}
                print(
                    "\nPlease enter the following details for root CA certificate")
                cert_data['country'] = prompt_response('\tCountry:',
                                                       default='US')
                cert_data['state'] = prompt_response('\tState:',
                                                     mandatory=True)
                cert_data['location'] = prompt_response('\tLocation:',
                                                        mandatory=True)
                cert_data['organization'] = prompt_response(
                    '\tOrganization:', mandatory=True)
                cert_data['organization-unit'] = prompt_response(
                    '\tOrganization Unit:', mandatory=True)
                cert_data['common-name'] = \
                    rmq_config.instance_name + '-root-ca'
                rmq_config.certificate_data = cert_data
            else:
                # Keep asking until a matching public/private key pair is
                # supplied.
                while True:
                    root_public = _prompt_readable_file(
                        'Enter the root CA certificate public key file:')
                    root_key = _prompt_readable_file(
                        'Enter the root CA certificate private key file:')
                    if certs.Certs.validate_key_pair(root_public, root_key):
                        rmq_config.certificate_data = {
                            'ca-public-key': root_public,
                            'ca-private-key': root_key
                        }
                        break
                    print("Error: Given public key and private key do not "
                          "match or is invalid. public and private key "
                          "files should be PEM encoded and private key "
                          "should use RSA encryption")

        prompt = "Do you want to use default values for RabbitMQ home, " \
                 "ports, and virtual host:"
        prompt = prompt_response(prompt,
                                 valid_answers=y_or_n,
                                 default='Y')
        if prompt in y:
            # NOTE(review): defaults are stored as strings while prompt_port
            # returns ints - confirm which type the config expects.
            rmq_config.amqp_port = '5672'
            rmq_config.mgmt_port = '15672'
            rmq_config.amqp_port_ssl = '5671'
            rmq_config.mgmt_port_ssl = '15671'
            rmq_config.virtual_host = 'volttron'
        else:
            rmq_config.virtual_host = _prompt_vhost(rmq_config.config_opts)

            prompt = 'AMQP port for RabbitMQ:'
            rmq_config.amqp_port = prompt_port(5672, prompt)

            prompt = 'http port for the RabbitMQ management plugin:'
            rmq_config.mgmt_port = prompt_port(15672, prompt)

            if rmq_config.is_ssl:
                prompt = 'AMQPS (SSL) port RabbitMQ address:'
                rmq_config.amqp_port_ssl = prompt_port(5671, prompt)

                prompt = 'https port for the RabbitMQ management plugin:'
                rmq_config.mgmt_port_ssl = prompt_port(15671, prompt)

        # Write the new config options back to config file
        rmq_config.write_rmq_config()

    if setup_type in ['federation', 'all']:
        # if option was all then config_opts would be not null
        # if this was called with just setup_type = federation, load existing
        # config so that we don't overwrite existing federation configs
        prompt_upstream_servers(rmq_config.volttron_home)

    if setup_type in ['shovel', 'all']:
        # if option was all then config_opts would be not null
        # if this was called with just setup_type = shovel, load existing
        # config so that we don't overwrite existing list
        prompt_shovels(rmq_config.volttron_home)
def is_file_readable(file_path):
    """
    Check that ``file_path`` names an existing, readable regular file.

    ``~`` and environment variables in the path are expanded before the
    check. A message is printed when the check fails.

    :param file_path: path to check
    :return: True if the expanded path is a file the process may read,
        False otherwise (including when the path is a directory)
    """
    file_path = os.path.expanduser(os.path.expandvars(file_path))
    # isfile rather than exists: a directory is not an acceptable "file".
    if os.path.isfile(file_path) and os.access(file_path, os.R_OK):
        return True
    print("\nInvalid file path. Path does not exists or is not readable")
    return False
def prompt_port(default_port, prompt):
    """
    Ask for a port until the answer converts to an integer.

    :param default_port: value offered as the default answer
    :param prompt: prompt text shown to the user
    :return: the entered port as an int
    """
    while True:
        answer = prompt_response(prompt, default=default_port)
        try:
            return int(answer)
        except ValueError:
            _log.error("Invalid port. Port should be an integer")
def _prompt_rmq_home(rabbitmq_server):
    """
    Ask for the RabbitMQ installation directory until one is entered that
    contains sbin/rabbitmq-server.

    :param rabbitmq_server: directory name appended to ~/rabbitmq_server to
        form the default answer
    :return: the accepted directory
    """
    suggested = os.path.join(os.path.expanduser("~"),
                             "rabbitmq_server",
                             rabbitmq_server)
    while True:
        rmq_home = prompt_response('RabbitMQ server home:',
                                   default=suggested)
        server_bin = os.path.join(rmq_home, 'sbin/rabbitmq-server')
        if os.path.exists(rmq_home) and os.path.exists(server_bin):
            return rmq_home
        _log.error("Invalid install directory. Unable to find {} ".format(
            os.path.join(rmq_home, 'sbin/rabbitmq-server')))


def _prompt_vhost(config_opts):
    """
    Ask for the RabbitMQ virtual host name.

    :param config_opts: mapping whose 'virtual-host' entry (default
        'volttron') is offered as the default answer
    :return: the user's answer
    """
    current = config_opts.get('virtual-host', 'volttron')
    question = 'Name of the virtual host under which RabbitMQ ' \
               'VOLTTRON will be running:'
    return prompt_response(question, default=current)


def _prompt_ssl():
    """
    Ask whether SSL authentication should be enabled (default yes).

    :return: True for a yes answer, False otherwise
    """
    answer = prompt_response('\nEnable SSL Authentication:',
                             valid_answers=y_or_n,
                             default='Y')
    return answer in y
def prompt_upstream_servers(vhome):
    """
    Prompt for upstream server configurations and save in
    rabbitmq_federation_config.yml

    Existing entries in the file are kept; an entry for a host that is
    entered again is replaced.

    :param vhome: directory containing rabbitmq_federation_config.yml
    :raises ValueError: if the number of servers entered is not an integer
    :return:
    """
    federation_config_file = os.path.join(vhome,
                                          'rabbitmq_federation_config.yml')

    federation_config = {}
    if os.path.exists(federation_config_file):
        # An empty YAML file loads as None; fall back to an empty mapping.
        federation_config = _read_config_file(federation_config_file) or {}

    # "or {}" also covers a 'federation-upstream:' key without a value.
    upstream_servers = federation_config.get('federation-upstream') or {}

    prompt = 'Number of upstream servers to configure:'
    count = int(prompt_response(prompt, default=1))

    for _ in range(count):
        prompt = 'Hostname of the upstream server: '
        host = prompt_response(prompt, mandatory=True)
        prompt = 'Port of the upstream server: '
        port = prompt_response(prompt, default=5671)
        prompt = 'Virtual host of the upstream server: '
        vhost = prompt_response(prompt, default='volttron')

        upstream_servers[host] = {'port': port,
                                  'virtual-host': vhost}

    federation_config['federation-upstream'] = upstream_servers
    _write_to_config_file(federation_config_file, federation_config)
def prompt_shovels(vhome):
    """
    Prompt for shovel configuration and save in rabbitmq_shovel_config.yml

    Existing entries under the 'shovel' key are kept; the entry for a host
    that is entered again is replaced. Nothing is written when a non-integer
    is entered for the number of RPC agent pairs.

    :param vhome: directory containing rabbitmq_shovel_config.yml
    :raises ValueError: if the number of destination hosts entered is not an
        integer
    :return:
    """
    shovel_config_file = os.path.join(vhome, 'rabbitmq_shovel_config.yml')

    shovel_config = {}
    if os.path.exists(shovel_config_file):
        # An empty YAML file loads as None; fall back to an empty mapping.
        shovel_config = _read_config_file(shovel_config_file) or {}

    # Read the same 'shovel' key that is written below and consumed by
    # _create_shovel_setup; the original read 'shovels' and therefore always
    # discarded the existing configuration.
    shovels = shovel_config.get('shovel') or {}

    prompt = 'Number of destination hosts to configure:'
    count = int(prompt_response(prompt, default=1))

    try:
        for _ in range(count):
            prompt = 'Hostname of the destination server: '
            host = prompt_response(prompt, mandatory=True)
            prompt = 'Port of the destination server: '
            port = prompt_response(prompt, default=5671)
            prompt = 'Virtual host of the destination server: '
            vhost = prompt_response(prompt, default='volttron')

            shovels[host] = {'port': port,
                             'virtual-host': vhost}

            prompt = prompt_response('\nDo you want shovels for '
                                     'PUBSUB communication? ',
                                     valid_answers=y_or_n,
                                     default='N')
            if prompt in y:
                prompt = 'Name of the agent publishing the topic:'
                agent_id = prompt_response(prompt, mandatory=True)

                prompt = 'List of PUBSUB topics to publish to ' \
                         'this remote instance (comma seperated)'
                topics = prompt_response(prompt, mandatory=True)
                # Strip blanks so "a, b" yields "a" and "b" rather than " b",
                # and drop empty items left by stray commas.
                topics = [topic.strip() for topic in topics.split(",")
                          if topic.strip()]
                shovels[host]['pubsub'] = {agent_id: topics}

            prompt = prompt_response(
                '\nDo you want shovels for RPC communication? ',
                valid_answers=y_or_n, default='N')
            if prompt in y:
                prompt = 'Name of the remote instance: '
                remote_instance = prompt_response(prompt, mandatory=True)
                prompt = 'Number of Local to Remote pairs:'
                agent_count = int(prompt_response(prompt, default=1))

                agent_ids = []
                for _ in range(agent_count):
                    prompt = 'Local agent that wants to make RPC'
                    local_agent_id = prompt_response(prompt, mandatory=True)
                    prompt = 'Remote agent on which to make the RPC'
                    remote_agent_id = prompt_response(prompt, mandatory=True)
                    agent_ids.append([local_agent_id, remote_agent_id])
                shovels[host]['rpc'] = {remote_instance: agent_ids}
    except ValueError as e:
        _log.error("Invalid choice in the configuration: {}".format(e))
    else:
        shovel_config['shovel'] = shovels
        _write_to_config_file(shovel_config_file, shovel_config)
def _read_config_file(filename):
    """
    Load a YAML file.

    :param filename: path of the YAML file
    :return: the parsed content; an empty dict when the file is empty, cannot
        be read or cannot be parsed (read/parse errors are logged)
    """
    data = {}
    try:
        with open(filename, 'r') as yaml_file:
            # safe_load returns None for an empty document; callers call
            # .get() on the result, so hand back an empty dict instead.
            data = yaml.safe_load(yaml_file) or {}
    except IOError:
        _log.error("Error reading from file: {}".format(filename))
    except yaml.YAMLError:
        _log.error("Yaml Error: {}".format(filename))
    return data


def _write_to_config_file(filename, data):
    """
    Dump ``data`` to ``filename`` as block style YAML.

    Write and YAML errors are logged, not raised.

    :param filename: path of the file to (over)write
    :param data: object to serialise
    """
    try:
        with open(filename, 'w') as yaml_file:
            yaml.dump(data, yaml_file, default_flow_style=False)
    except IOError:
        _log.error("Error writing to file: {}".format(filename))
    except yaml.YAMLError:
        _log.error("Yaml Error: {}".format(filename))
def stop_rabbit(rmq_home, env=None, quite=False):
    """
    Stop RabbitMQ Server

    Runs ``rabbitmqctl stop`` and then sleeps for two seconds.

    :param rmq_home: RabbitMQ installation path
    :param env: Environment to run the RabbitMQ command.
    :param quite: when true, a RuntimeError from the stop command is
        swallowed and the "stopped" message is not logged
    :return:
    """
    try:
        if env:
            _log.debug("Stop RMQ: {}".format(env.get('VOLTTRON_HOME')))
        rabbitmqctl = os.path.join(rmq_home, "sbin/rabbitmqctl")
        execute_command([rabbitmqctl, "stop"], env=env)
        gevent.sleep(2)
        if not quite:
            _log.info("**Stopped rmq server")
    except RuntimeError:
        if quite:
            return
        raise
def restart_ssl(rmq_home, env=None):
    """
    Runs rabbitmqctl eval "ssl:stop(), ssl:start()." to make rmq reload ssl
    certificates. Client connection will get dropped and client should
    reconnect.

    :param rmq_home: RabbitMQ installation path
    :param env: Environment to run the RabbitMQ command.
    :return:
    """
    rabbitmqctl = os.path.join(rmq_home, "sbin/rabbitmqctl")
    reload_cmd = [rabbitmqctl, "eval", "ssl:stop(), ssl:start()."]
    execute_command(reload_cmd,
                    err_prefix="Error reloading ssl certificates",
                    env=env,
                    logger=_log)
def check_rabbit_status(rmq_home=None, env=None):
    """
    Report whether ``rabbitmqctl shovel_status`` runs successfully.

    :param rmq_home: RabbitMQ installation path; read from a new RMQConfig
        when empty
    :param env: Environment to run the RabbitMQ command.
    :return: True if the command succeeded, False if it raised RuntimeError
    """
    if not rmq_home:
        rmq_home = RMQConfig().rmq_home

    rabbitmqctl = os.path.join(rmq_home, "sbin/rabbitmqctl")
    try:
        execute_command([rabbitmqctl, "shovel_status"], env=env)
    except RuntimeError:
        return False
    return True
def start_rabbit(rmq_home, env=None):
    """
    Start RabbitMQ server.

    The function assumes that rabbitmq.conf in rmq_home/etc/rabbitmq is setup
    before this function is called.

    If the function cannot detect that rabbit was started within roughly 60
    seconds then `class:RabbitMQStartError` will be raised.

    :param rmq_home: RabbitMQ installation path
    :param env: Environment to start RabbitMQ with.

    :raises RabbitMQStartError:
    """
    # rabbitmqctl status returns true as soon as the erlang vm and does not
    # wait for all the plugins and database to be initialized and rmq is
    # ready to accept incoming connection.
    # Nor does rabbitmqctl wait, rabbitmqctl await_online_nodes work for this
    # purpose. shovel_status comes close...
    probe_cmd = [os.path.join(rmq_home, "sbin/rabbitmqctl"), "shovel_status"]
    launch_cmd = [os.path.join(rmq_home, "sbin/rabbitmq-server"), "-detached"]

    waited = 0
    launch_attempted = False
    while True:
        try:
            # we expect this call to raise a RuntimeError until the rabbitmq
            # server is up and running.
            execute_command(probe_cmd, env=env)
            if launch_attempted:
                # the server was started by this call: give it a second just
                # to be sure
                gevent.sleep(1)
            _log.info("Rmq server at {} is running at ".format(rmq_home))
            return
        except RuntimeError:
            if not launch_attempted:
                # First failed probe: attempt to start the server, once.
                _log.debug("Rabbitmq is not running. Attempting to start")
                msg = "Error starting rabbitmq at {}".format(rmq_home)
                execute_command(launch_cmd, env=env, err_prefix=msg,
                                logger=_log)
                launch_attempted = True
            elif waited > 60:
                # waited more than 60 seconds between probes: assume failure
                raise RabbitMQStartError("Unable to verify rabbitmq server has started in a resonable time.")
            else:
                # sleep for another 2 seconds and check status again
                gevent.sleep(2)
                waited += 2
if __name__ == "__main__":
    def _as_bool(value):
        """Interpret a command line token as a boolean flag."""
        return str(value).strip().lower() in ("1", "true", "t", "yes", "y")

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('setup_type',
                        help='Instance type: all, single, federation or shovel')
    # Optional positional: a plain positional ignores its default and hands
    # over a string, which is truthy even for "False".
    parser.add_argument('prompt', nargs='?', default=False, type=_as_bool,
                        help='true/false - prompt for the configuration and '
                             'generate a new rabbitmq_config.yml '
                             '(default: false)')
    args = parser.parse_args()
    try:
        # Pass by keyword: the second positional parameter of
        # setup_rabbitmq_volttron is "verbose", not "prompt".
        setup_rabbitmq_volttron(args.setup_type, prompt=args.prompt)
    except KeyboardInterrupt:
        _log.info("Exiting setup process")