Source code for volttron.platform.control

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2017, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}

from __future__ import absolute_import, print_function

import argparse
import collections
import json
import logging
import logging.handlers
import os
import re
import shutil
import sys
import tempfile
import traceback
import uuid
import hashlib
import tarfile
import subprocess

import gevent
import gevent.event
from volttron.platform.vip.agent.subsystems.query import Query
from volttron.platform import get_home, get_address

from .agent import utils
from .agent.known_identities import CONTROL_CONNECTION, CONFIGURATION_STORE
from .vip.agent import Agent as BaseAgent, Core, RPC
from . import aip as aipmod
from . import config
from .jsonrpc import RemoteError
from .vip.agent.errors import VIPError
from .auth import AuthEntry, AuthFile, AuthException
from .keystore import KeyStore, KnownHostsStore

try:
    import volttron.restricted
except ImportError:
    HAVE_RESTRICTED = False
else:
    from volttron.restricted import cgroups

    HAVE_RESTRICTED = True

_stdout = sys.stdout
_stderr = sys.stderr

_log = logging.getLogger(os.path.basename(sys.argv[0])
                         if __name__ == '__main__' else __name__)

CHUNK_SIZE = 4096


[docs]class ControlService(BaseAgent): def __init__(self, aip, *args, **kwargs): tracker = kwargs.pop('tracker', None) kwargs["enable_store"] = False super(ControlService, self).__init__(*args, **kwargs) self._aip = aip self._tracker = tracker @Core.receiver('onsetup') def _setup(self, sender, **kwargs): if not self._tracker: return self.vip.rpc.export(lambda: self._tracker.enabled, 'stats.enabled') self.vip.rpc.export(self._tracker.enable, 'stats.enable') self.vip.rpc.export(self._tracker.disable, 'stats.disable') self.vip.rpc.export(lambda: self._tracker.stats, 'stats.get')
[docs] @RPC.export def serverkey(self): q = Query(self.core) pk = q.query('serverkey').get(timeout=1) del q return pk
[docs] @RPC.export def clear_status(self, clear_all=False): self._aip.clear_status(clear_all)
[docs] @RPC.export def agent_status(self, uuid): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) return self._aip.agent_status(uuid)
[docs] @RPC.export def agent_name(self, uuid): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) return self._aip.agent_name(uuid)
[docs] @RPC.export def agent_version(self, uuid): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) return self._aip.agent_version(uuid)
[docs] @RPC.export def agent_versions(self): return self._aip.agent_versions()
[docs] @RPC.export def status_agents(self): return self._aip.status_agents()
[docs] @RPC.export def start_agent(self, uuid): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) self._aip.start_agent(uuid)
[docs] @RPC.export def stop_agent(self, uuid): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) identity = self.agent_vip_identity(uuid) self._aip.stop_agent(uuid) #Send message to router that agent is shutting down frames = [bytes(identity)] self.core.socket.send_vip(b'', 'agentstop', frames, copy=False)
[docs] @RPC.export def restart_agent(self, uuid): self.stop_agent(uuid) self.start_agent(uuid)
[docs] @RPC.export def shutdown(self): self._aip.shutdown()
[docs] @RPC.export def stop_platform(self): # XXX: Restrict call as it kills the process self.core.socket.send_vip(b'', b'quit')
[docs] @RPC.export def list_agents(self): tag = self._aip.agent_tag priority = self._aip.agent_priority return [{'name': name, 'uuid': uuid, 'tag': tag(uuid), 'priority': priority(uuid), 'identity': self.agent_vip_identity(uuid)} for uuid, name in self._aip.list_agents().iteritems()]
[docs] @RPC.export def tag_agent(self, uuid, tag): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) if not isinstance(tag, (type(None), basestring)): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'tag';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) return self._aip.tag_agent(uuid, tag)
[docs] @RPC.export def remove_agent(self, uuid, remove_auth=True): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) self._aip.remove_agent(uuid, remove_auth=remove_auth)
[docs] @RPC.export def prioritize_agent(self, uuid, priority='50'): if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) if not isinstance(priority, (type(None), basestring)): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string or null for 'priority';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) self._aip.prioritize_agent(uuid, priority)
[docs] @RPC.export def agent_vip_identity(self, uuid): """ Lookup the agent's vip identity based upon it's uuid. @param uuid: @return: """ if not isinstance(uuid, basestring): identity = bytes(self.vip.rpc.context.vip_message.peer) raise TypeError("expected a string for 'uuid';" "got {!r} from identity: {}".format( type(uuid).__name__, identity)) return self._aip.agent_identity(uuid)
[docs] @RPC.export def get_all_agent_publickeys(self): """ RPC method to retrieve the public keys of all of the agents installed on the VOLTTRON instance. This method does not differentiate between running and not running agents. .. note:: This method will only retrieve a publickey for an installed agents. It is recommended that dynamic agents use the context of the containing agent's publickey for connections to external instances. :return: mapping of identity to agent publickey :rtype: dict """ id_map = self._aip.get_agent_identity_to_uuid_mapping() retmap = {} for id, uuid in id_map.items(): retmap[id] = self._aip.get_agent_keystore(uuid).public return retmap
[docs] @RPC.export def install_agent_local(self, filename, vip_identity=None, publickey=None, secretkey=None): return self._aip.install_agent(filename, vip_identity=vip_identity, publickey=publickey, secretkey=secretkey)
[docs] @RPC.export def install_agent(self, filename, channel_name, vip_identity=None, publickey=None, secretkey=None): """ Installs an agent on the instance instance. The installation of an agent through this method involves sending the binary data of the agent file through a channel. The following example is the protocol for sending the agent across the wire: Example Protocol: .. code-block:: python # client creates channel to this agent (control) channel = agent.vip.channel('control', 'channel_name') # Begin sending data sha512 = hashlib.sha512() while True: request, file_offset, chunk_size = channel.recv_multipart() # Control has all of the file. Send hash for for it to verify. if request == b'checksum': channel.send(hash) assert request == b'fetch' # send a chunk of the file file_offset = int(file_offset) chunk_size = int(chunk_size) file.seek(file_offset) data = file.read(chunk_size) sha512.update(data) channel.send(data) agent_uuid = agent_uuid.get(timeout=10) # close and delete the channel channel.close(linger=0) del channel :param:string:filename: The name of the agent packaged file that is being written. :param:string:channel_name: The name of the channel that the agent file will be sent on. :param:string:publickey: Encoded public key the installed agent will use :param:string:secretkey: Encoded secret key the installed agent will use """ peer = bytes(self.vip.rpc.context.vip_message.peer) channel = self.vip.channel(peer, channel_name) try: tmpdir = tempfile.mkdtemp() path = os.path.join(tmpdir, os.path.basename(filename)) store = open(path, 'wb') file_offset = 0 sha512 = hashlib.sha512() try: while True: # request a chunk of the file channel.send_multipart([ b'fetch', bytes(file_offset), bytes(CHUNK_SIZE) ]) # get the requested data with gevent.Timeout(30): data = channel.recv() sha512.update(data) store.write(data) size = len(data) file_offset += size # let volttron-ctl know that we have everything if size < CHUNK_SIZE: channel.send_multipart([b'checksum', b'', b'']) with gevent.Timeout(30): checksum = channel.recv() assert checksum == sha512.digest() break except AssertionError: _log.warning("Checksum mismatch on received file") raise except gevent.Timeout: _log.warning("Gevent timeout trying to receive data") raise finally: store.close() _log.debug('Closing channel on server') channel.close(linger=0) del channel agent_uuid = self._aip.install_agent(path, vip_identity=vip_identity, publickey=publickey, secretkey=secretkey) return agent_uuid finally: shutil.rmtree(tmpdir, ignore_errors=True)
[docs]def log_to_file(file, level=logging.WARNING, handler_class=logging.StreamHandler): '''Direct log output to a file (or something like one).''' handler = handler_class(file) handler.setLevel(level) handler.setFormatter(utils.AgentFormatter( '%(asctime)s %(composite_name)s %(levelname)s: %(message)s')) root = logging.getLogger() root.setLevel(level) root.addHandler(handler)
Agent = collections.namedtuple('Agent', 'name tag uuid vip_identity') def _list_agents(aip): return [Agent(name, aip.agent_tag(uuid), uuid, aip.agent_identity(uuid)) for uuid, name in aip.list_agents().iteritems()]
[docs]def escape(pattern): strings = re.split(r'([*?])', pattern) if len(strings) == 1: return re.escape(pattern), False return ''.join('.*' if s == '*' else '.' if s == '?' else s if s in [r'\?', r'\*'] else re.escape(s) for s in strings), True
[docs]def filter_agents(agents, patterns, opts): by_name, by_tag, by_uuid = opts.by_name, opts.by_tag, opts.by_uuid for pattern in patterns: regex, _ = escape(pattern) result = set() if not (by_uuid or by_name or by_tag): reobj = re.compile(regex) matches = [agent for agent in agents if reobj.match(agent.uuid)] if len(matches) == 1: result.update(matches) else: reobj = re.compile(regex + '$') if by_uuid: result.update( agent for agent in agents if reobj.match(agent.uuid)) if by_name: result.update( agent for agent in agents if reobj.match(agent.name)) if by_tag: result.update( agent for agent in agents if reobj.match(agent.tag or '')) yield pattern, result
[docs]def filter_agent(agents, pattern, opts): return next(filter_agents(agents, [pattern], opts))[1]
[docs]def backup_agent_data(output_filename, source_dir): with tarfile.open(output_filename, "w:gz") as tar: tar.add(source_dir, arcname=os.path.sep) #os.path.basename(source_dir))
[docs]def restore_agent_data(source_file, output_dir): # Open tarfile with tarfile.open(mode="r:gz", fileobj=file(source_file)) as tar: tar.extractall(output_dir)
[docs]def find_agent_data_dir(opts, agent_uuid): agent_data_dir = None for x in os.listdir(opts.aip.agent_dir(agent_uuid)): if x.endswith("agent-data"): agent_data_dir = os.path.join(opts.aip.agent_dir(agent_uuid), x) break return agent_data_dir
[docs]def upgrade_agent(opts): publickey = None secretkey = None identity = opts.vip_identity if not identity: raise ValueError("Missing required VIP IDENTITY option") identity_to_uuid = opts.aip.get_agent_identity_to_uuid_mapping() agent_uuid = identity_to_uuid.get(identity, None) backup_agent_file = "/tmp/{}.tar.gz".format(agent_uuid) if agent_uuid: agent_data_dir = find_agent_data_dir(opts, agent_uuid) if agent_data_dir: backup_agent_data(backup_agent_file, agent_data_dir) keystore = opts.aip.get_agent_keystore(agent_uuid) publickey = keystore.public secretkey = keystore.secret _stdout.write('Removing previous version of agent "{}"\n' .format(identity)) opts.connection.call('remove_agent', agent_uuid, remove_auth=False) else: _stdout.write(('Could not find agent with VIP IDENTITY "{}". ' 'Installing as new agent\n').format(identity)) if secretkey is None or publickey is None: publickey = None secretkey = None def restore_agent_data(agent_uuid): # if we are upgrading transfer the old data on. if os.path.exists(backup_agent_file): new_agent_data_dir = find_agent_data_dir(opts, new_agent_uuid) restore_agent_data(backup_agent_file, new_agent_data_dir) os.remove(backup_agent_file) install_agent(opts, publickey=publickey, secretkey=secretkey, callback=restore_agent_data)
[docs]def install_agent(opts, publickey=None, secretkey=None, callback=None): aip = opts.aip filename = opts.wheel tag = opts.tag vip_identity = opts.vip_identity if opts.vip_address.startswith('ipc://'): _log.info("Installing wheel locally without channel subsystem") filename = config.expandall(filename) agent_uuid = opts.connection.call('install_agent_local', filename, vip_identity=vip_identity, publickey=publickey, secretkey=secretkey) if tag: opts.connection.call('tag_agent', agent_uuid, tag) else: try: _log.debug('Creating channel for sending the agent.') channel_name = str(uuid.uuid4()) channel = opts.connection.server.vip.channel('control', channel_name) _log.debug('calling control install agent.') agent_uuid = opts.connection.call_no_get('install_agent', filename, channel_name, vip_identity=vip_identity, publickey=publickey, secretkey=secretkey) _log.debug('Sending wheel to control') sha512 = hashlib.sha512() with open(filename, 'rb') as wheel_file_data: while True: # get a request with gevent.Timeout(60): request, file_offset, chunk_size = channel.recv_multipart() if request == b'checksum': channel.send(sha512.digest()) break assert request == b'fetch' # send a chunk of the file file_offset = int(file_offset) chunk_size = int(chunk_size) wheel_file_data.seek(file_offset) data = wheel_file_data.read(chunk_size) sha512.update(data) channel.send(data) agent_uuid = agent_uuid.get(timeout=10) except Exception as exc: if opts.debug: traceback.print_exc() _stderr.write( '{}: error: {}: {}\n'.format(opts.command, exc, filename)) return 10 else: if tag: opts.connection.call('tag_agent', agent_uuid, tag) finally: _log.debug('closing channel') channel.close(linger=0) del channel name = opts.connection.call('agent_name', agent_uuid) _stdout.write('Installed {} as {} {}\n'.format(filename, agent_uuid, name)) # Need to use a callback here rather than a return value. I am not 100% # sure why this is the reason for allowing our tests to pass. if callback: callback(agent_uuid)
[docs]def tag_agent(opts): agents = filter_agent(_list_agents(opts.aip), opts.agent, opts) if len(agents) != 1: if agents: msg = 'multiple agents selected' else: msg = 'agent not found' _stderr.write( '{}: error: {}: {}\n'.format(opts.command, msg, opts.agent)) return 10 agent, = agents if opts.tag: _stdout.write('Tagging {} {}\n'.format(agent.uuid, agent.name)) opts.aip.tag_agent(agent.uuid, opts.tag) elif opts.remove: if agent.tag is not None: _stdout.write( 'Removing tag for {} {}\n'.format(agent.uuid, agent.name)) opts.aip.tag_agent(agent.uuid, None) else: if agent.tag is not None: _stdout.writelines([agent.tag, '\n'])
[docs]def remove_agent(opts, remove_auth=True): agents = _list_agents(opts.aip) for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) elif len(match) > 1 and not opts.force: _stderr.write( '{}: error: pattern returned multiple agents: {}\n'.format( opts.command, pattern)) _stderr.write( 'Use -f or --force to force removal of multiple agents.\n') return 10 for agent in match: _stdout.write('Removing {} {}\n'.format(agent.uuid, agent.name)) opts.connection.call('remove_agent', agent.uuid, remove_auth=remove_auth)
def _calc_min_uuid_length(agents): n = 0 for agent1 in agents: for agent2 in agents: if agent1 is agent2: continue common_len = len(os.path.commonprefix([agent1.uuid, agent2.uuid])) if common_len > n: n = common_len return n + 1
[docs]def list_agents(opts): def get_priority(agent): return opts.aip.agent_priority(agent.uuid) or '' _show_filtered_agents(opts, 'PRI', get_priority)
[docs]def status_agents(opts): agents = {agent.uuid: agent for agent in _list_agents(opts.aip)} status = {} for uuid, name, stat in opts.connection.call('status_agents'): try: agent = agents[uuid] except KeyError: agents[uuid] = agent = Agent(name, None, uuid) status[uuid] = stat agents = agents.values() def get_status(agent): try: pid, stat = status[agent.uuid] except KeyError: pid = stat = None if stat is not None: return str(stat) if pid: return 'running [{}]'.format(pid) return '' def get_health(agent): try: return opts.connection.server.vip.rpc.call(agent.vip_identity, 'health.get_status_json').get(timeout=4)[ 'status'] except VIPError: return '' _show_filtered_agents_status(opts, get_status, get_health, agents)
[docs]def agent_health(opts): agents = {agent.uuid: agent for agent in _list_agents(opts.aip)}.values() agents = get_filtered_agents(opts, agents) if not agents: _stderr.write('No installed Agents found\n') return agent = agents.pop() try: _stderr.write(json.dumps( opts.connection.server.vip.rpc.call(agent.vip_identity, 'health.get_status_json').get(timeout=4), indent=4) + '\n' ) except VIPError: print("Agent {} is not running on the Volttron platform.".format(agent.uuid))
[docs]def clear_status(opts): opts.connection.call('clear_status', opts.clear_all)
[docs]def enable_agent(opts): agents = _list_agents(opts.aip) for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) for agent in match: _stdout.write('Enabling {} {} with priority {}\n'.format( agent.uuid, agent.name, opts.priority)) opts.aip.prioritize_agent(agent.uuid, opts.priority)
[docs]def disable_agent(opts): agents = _list_agents(opts.aip) for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) for agent in match: priority = opts.aip.agent_priority(agent.uuid) if priority is not None: _stdout.write( 'Disabling {} {}\n'.format(agent.uuid, agent.name)) opts.aip.prioritize_agent(agent.uuid, None)
[docs]def start_agent(opts): call = opts.connection.call agents = _list_agents(opts.aip) for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) for agent in match: pid, status = call('agent_status', agent.uuid) if pid is None or status is not None: _stdout.write( 'Starting {} {}\n'.format(agent.uuid, agent.name)) call('start_agent', agent.uuid)
[docs]def stop_agent(opts): call = opts.connection.call agents = _list_agents(opts.aip) for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) for agent in match: pid, status = call('agent_status', agent.uuid) if pid and status is None: _stdout.write( 'Stopping {} {}\n'.format(agent.uuid, agent.name)) call('stop_agent', agent.uuid)
[docs]def restart_agent(opts): stop_agent(opts) start_agent(opts)
[docs]def run_agent(opts): call = opts.connection.call for directory in opts.directory: call('run_agent', directory)
[docs]def shutdown_agents(opts): opts.connection.call('shutdown') if opts.platform: opts.connection.notify('stop_platform')
[docs]def create_cgroups(opts): try: cgroups.setup(user=opts.user, group=opts.group) except ValueError as exc: _stderr.write('{}: error: {}\n'.format(opts.command, exc)) return os.EX_NOUSER
def _send_agent(connection, peer, path): wheel = open(path, 'rb') channel = connection.vip.channel(peer) def send(): try: # Wait for peer to open compliment channel channel.recv() while True: data = wheel.read(8192) channel.send(data) if not data: break # Wait for peer to signal all data received channel.recv() finally: wheel.close() channel.close(linger=0) result = connection.vip.rpc.call( peer, 'install_agent', os.path.basename(path), channel.name) task = gevent.spawn(send) result.rawlink(lambda glt: task.kill(block=False)) return result
[docs]def send_agent(opts): connection = opts.connection for wheel in opts.wheel: uuid = _send_agent(connection.server, connection.peer, wheel).get() connection.call('start_agent', uuid) _stdout.write('Agent {} started as {}\n'.format(wheel, uuid))
[docs]def gen_keypair(opts): keypair = KeyStore.generate_keypair_dict() _stdout.write('{}\n'.format(json.dumps(keypair, indent=2)))
[docs]def add_server_key(opts): store = KnownHostsStore() store.add(opts.host, opts.serverkey) _stdout.write('server key written to {}\n'.format(store.filename))
[docs]def list_known_hosts(opts): store = KnownHostsStore() entries = store.load() if entries: _print_two_columns(entries, 'HOST', 'CURVE KEY') else: _stdout.write('No entries in {}\n'.format(store.filename))
[docs]def remove_known_host(opts): store = KnownHostsStore() store.remove(opts.host) _stdout.write('host "{}" removed from {}\n'.format(opts.host, store.filename))
[docs]def do_stats(opts): call = opts.connection.call if opts.op == 'status': _stdout.write( '%sabled\n' % ('en' if call('stats.enabled') else 'dis')) elif opts.op in ['dump', 'pprint']: stats = call('stats.get') if opts.op == 'pprint': import pprint pprint.pprint(stats, _stdout) else: _stdout.writelines([str(stats), '\n']) else: call('stats.' + opts.op) _stdout.write( '%sabled\n' % ('en' if call('stats.enabled') else 'dis'))
[docs]def show_serverkey(opts): """ write serverkey to standard out. return 0 if success, 1 if false """ q = Query(opts.connection.server.core) pk = q.query('serverkey').get(timeout=2) del q if pk is not None: _stdout.write('%s\n' % pk) return 0 return 1
def _get_auth_file(volttron_home): path = os.path.join(volttron_home, 'auth.json') return AuthFile(path) def _print_two_columns(dict_, key_name, value_name): padding = 2 key_lengths = [len(key) for key in dict_] + [len(key_name)] max_key_len = max(key_lengths) + padding _stdout.write('{}{}{}\n'.format(key_name, ' ' * (max_key_len - len(key_name)), value_name)) _stdout.write('{}{}{}\n'.format('-' * len(key_name), ' ' * (max_key_len - len(key_name)), '-' * len(value_name))) for key in sorted(dict_): value = dict_[key] if isinstance(value, list): value = sorted(value) _stdout.write('{}{}{}\n'.format(key, ' ' * (max_key_len - len(key)), value))
[docs]def list_auth(opts, indices=None): auth_file = _get_auth_file(opts.volttron_home) entries = auth_file.read_allow_entries() print_out = [] if entries: for index, entry in enumerate(entries): if indices is None or index in indices: _stdout.write('\nINDEX: {}\n'.format(index)) _stdout.write( '{}\n'.format(json.dumps(vars(entry), indent=2))) else: _stdout.write('No entries in {}\n'.format(auth_file.auth_file))
def _ask_for_auth_fields(domain=None, address=None, user_id=None, capabilities=None, roles=None, groups=None, mechanism='CURVE', credentials=None, comments=None, enabled=True, **kwargs): class Asker(object): def __init__(self): self._fields = collections.OrderedDict() def add(self, name, default=None, note=None, callback=lambda x: x, validate=lambda x,y: (True, '')): self._fields[name] = {'note': note, 'default': default, 'callback': callback, 'validate': validate} def ask(self): for name in self._fields: note = self._fields[name]['note'] default = self._fields[name]['default'] callback = self._fields[name]['callback'] validate = self._fields[name]['validate'] if isinstance(default, list): default_str = '{}'.format(','.join(default)) elif default is None: default_str = '' else: default_str = default note = '({}) '.format(note) if note else '' question = '{} {}[{}]: '.format(name, note, default_str) valid = False while not valid: response = raw_input(question).strip() if response == '': response = default if response == 'clear': if _ask_yes_no('Do you want to clear this field?'): response = None valid, msg = validate(response, self._fields) if not valid: _stderr.write('{}\n'.format(msg)) self._fields[name]['response'] = callback(response) return {k: self._fields[k]['response'] for k in self._fields} def to_true_or_false(response): if isinstance(response, basestring): return {'true': True, 'false': False}[response.lower()] return response def is_true_or_false(x, fields): if x is not None: if isinstance(x, bool) or x.lower() in ['true', 'false']: return True, None return False, 'Please enter True or False' def valid_creds(creds, fields): try: mechanism = fields['mechanism']['response'] AuthEntry.valid_credentials(creds, mechanism=mechanism) except AuthException as e: return False, e.message return True, None def valid_mech(mech, fields): try: AuthEntry.valid_mechanism(mech) except AuthException as e: return False, e.message return True, None asker = Asker() asker.add('domain', domain) asker.add('address', address) asker.add('user_id', user_id) asker.add('capabilities', capabilities, 'delimit multiple entries with comma', _comma_split) asker.add('roles', roles, 'delimit multiple entries with comma', _comma_split) asker.add('groups', groups, 'delimit multiple entries with comma', _comma_split) asker.add('mechanism', mechanism, validate=valid_mech) asker.add('credentials', credentials, validate=valid_creds) asker.add('comments', comments) asker.add('enabled', enabled, callback=to_true_or_false, validate=is_true_or_false) return asker.ask() def _comma_split(line): if not isinstance(line, basestring): return line line = line.strip() if not line: return [] return [word.strip() for word in line.split(',')]
[docs]def add_auth(opts): """Add authorization entry. If all options are None, then use interactive 'wizard.' """ fields = { "domain": opts.domain, "address": opts.address, "mechanism": opts.mechanism, "credentials": opts.credentials, "user_id": opts.user_id, "groups": _comma_split(opts.groups), "roles": _comma_split(opts.roles), "capabilities": _comma_split(opts.capabilities), "comments": opts.comments, } if any(fields.values()): # Remove unspecified options so the default parameters are used fields = {k: v for k, v in fields.items() if v} fields['enabled'] = not opts.disabled entry = AuthEntry(**fields) else: # No options were specified, use interactive wizard responses = _ask_for_auth_fields() entry = AuthEntry(**responses) if opts.add_known_host: if entry.address is None: raise ValueError('host (--address) is required when ' '--add-known-host is specified') if entry.credentials is None: raise ValueError('serverkey (--credentials) is required when ' '--add-known-host is specified') opts.host = entry.address opts.serverkey = entry.credentials add_server_key(opts) auth_file = _get_auth_file(opts.volttron_home) try: auth_file.add(entry, overwrite=False) _stdout.write('added entry {}\n'.format(entry)) except AuthException as err: _stderr.write('ERROR: %s\n' % err.message)
def _ask_yes_no(question, default='yes'): yes = set(['yes', 'ye', 'y']) no = set(['no', 'n']) y = 'y' n = 'n' if default in yes: y = 'Y' elif default in no: n = 'N' else: raise ValueError("invalid default answer: '%s'" % default) while True: choice = raw_input('{} [{}/{}] '.format(question, y, n)).lower() if choice == '': choice = default if choice in yes: return True if choice in no: return False _stderr.write("Please respond with 'yes' or 'no'\n")
[docs]def remove_auth(opts): auth_file = _get_auth_file(opts.volttron_home) entry_count = len(auth_file.read_allow_entries()) for i in opts.indices: if i < 0 or i >= entry_count: _stderr.write('ERROR: invalid index {}\n'.format(i)) return _stdout.write('This action will delete the following:\n') list_auth(opts, opts.indices) if not _ask_yes_no('Do you wish to delete?'): return try: auth_file.remove_by_indices(opts.indices) if len(opts.indices) > 1: msg = 'removed entries at indices {}'.format(opts.indices) else: msg = msg = 'removed entry at index {}'.format(opts.indices) _stdout.write(msg + '\n') except AuthException as err: _stderr.write('ERROR: %s\n' % err.message)
[docs]def update_auth(opts): auth_file = _get_auth_file(opts.volttron_home) entries = auth_file.read_allow_entries() try: if opts.index < 0: raise IndexError entry = entries[opts.index] _stdout.write('(For any field type "clear" to clear the value.)\n') response = _ask_for_auth_fields(**entry.__dict__) updated_entry = AuthEntry(**response) auth_file.update_by_index(updated_entry, opts.index) _stdout.write('updated entry at index {}\n'.format(opts.index)) except IndexError: _stderr.write('ERROR: invalid index %s\n' % opts.index) except AuthException as err: _stderr.write('ERROR: %s\n' % err.message)
[docs]def add_role(opts): auth_file = _get_auth_file(opts.volttron_home) roles = auth_file.read()[2] if opts.role in roles: _stderr.write('role "{}" already exists\n'.format(opts.role)) return roles[opts.role] = list(set(opts.capabilities)) auth_file.set_roles(roles) _stdout.write('added role "{}"\n'.format(opts.role))
[docs]def list_roles(opts): auth_file = _get_auth_file(opts.volttron_home) roles = auth_file.read()[2] _print_two_columns(roles, 'ROLE', 'CAPABILITIES')
[docs]def update_role(opts): auth_file = _get_auth_file(opts.volttron_home) roles = auth_file.read()[2] if opts.role not in roles: _stderr.write('role "{}" does not exist\n'.format(opts.role)) return caps = roles[opts.role] if opts.remove: roles[opts.role] = list(set(caps) - set(opts.capabilities)) else: roles[opts.role] = list(set(caps) | set(opts.capabilities)) auth_file.set_roles(roles) _stdout.write('updated role "{}"\n'.format(opts.role))
[docs]def remove_role(opts): auth_file = _get_auth_file(opts.volttron_home) roles = auth_file.read()[2] if opts.role not in roles: _stderr.write('role "{}" does not exist\n'.format(opts.role)) return del roles[opts.role] auth_file.set_roles(roles) _stdout.write('removed role "{}"\n'.format(opts.role))
[docs]def add_group(opts): auth_file = _get_auth_file(opts.volttron_home) groups = auth_file.read()[1] if opts.group in groups: _stderr.write('group "{}" already exists\n'.format(opts.group)) return groups[opts.group] = list(set(opts.roles)) auth_file.set_groups(groups) _stdout.write('added group "{}"\n'.format(opts.group))
[docs]def list_groups(opts): auth_file = _get_auth_file(opts.volttron_home) groups = auth_file.read()[1] _print_two_columns(groups, 'GROUPS', 'ROLES')
[docs]def update_group(opts): auth_file = _get_auth_file(opts.volttron_home) groups = auth_file.read()[1] if opts.group not in groups: _stderr.write('group "{}" does not exist\n'.format(opts.group)) return roles = groups[opts.group] if opts.remove: groups[opts.group] = list(set(roles) - set(opts.roles)) else: groups[opts.group] = list(set(roles) | set(opts.roles)) auth_file.set_groups(groups) _stdout.write('updated group "{}"\n'.format(opts.group))
[docs]def remove_group(opts): auth_file = _get_auth_file(opts.volttron_home) groups = auth_file.read()[1] if opts.group not in groups: _stderr.write('group "{}" does not exist\n'.format(opts.group)) return del groups[opts.group] auth_file.set_groups(groups) _stdout.write('removed group "{}"\n'.format(opts.group))
[docs]def get_filtered_agents(opts, agents=None): if opts.pattern: filtered = set() for pattern, match in filter_agents(agents, opts.pattern, opts): if not match: _stderr.write( '{}: error: agent not found: {}\n'.format(opts.command, pattern)) filtered |= match agents = list(filtered) return agents
def _show_filtered_agents(opts, field_name, field_callback, agents=None): """Provides generic way to filter and display agent information. The agents will be filtered by the provided opts.pattern and the following fields will be displayed: * UUID (or part of the UUID) * agent name * VIP identiy * tag * field_name @param:Namespace:opts: Options from argparse @param:string:field_name: Name of field to display about agents @param:function:field_callback: Function that takes an Agent as an argument and returns data to display @param:list:agents: List of agents to filter and display """ if not agents: agents = _list_agents(opts.aip) agents = get_filtered_agents(opts, agents) if not agents: _stderr.write('No installed Agents found\n') return agents.sort() if not opts.min_uuid_len: n = 36 else: n = max(_calc_min_uuid_length(agents), opts.min_uuid_len) name_width = max(5, max(len(agent.name) for agent in agents)) tag_width = max(3, max(len(agent.tag or '') for agent in agents)) identity_width = max(3, max(len(agent.vip_identity or '') for agent in agents)) fmt = '{} {:{}} {:{}} {:{}} {:>6}\n' _stderr.write( fmt.format(' ' * n, 'AGENT', name_width, 'IDENTITY', identity_width, 'TAG', tag_width, field_name)) for agent in agents: _stdout.write(fmt.format(agent.uuid[:n], agent.name, name_width, agent.vip_identity, identity_width, agent.tag or '', tag_width, field_callback(agent))) def _show_filtered_agents_status(opts, status_callback, health_callback, agents=None): """Provides generic way to filter and display agent information. The agents will be filtered by the provided opts.pattern and the following fields will be displayed: * UUID (or part of the UUID) * agent name * VIP identiy * tag * field_name @param:Namespace:opts: Options from argparse @param:string:field_name: Name of field to display about agents @param:function:field_callback: Function that takes an Agent as an argument and returns data to display @param:list:agents: List of agents to filter and display """ if not agents: agents = _list_agents(opts.aip) agents = get_filtered_agents(opts, agents) if not agents: _stderr.write('No installed Agents found\n') return agents.sort() if not opts.min_uuid_len: n = 36 else: n = max(_calc_min_uuid_length(agents), opts.min_uuid_len) name_width = max(5, max(len(agent.name) for agent in agents)) tag_width = max(3, max(len(agent.tag or '') for agent in agents)) identity_width = max(3, max(len(agent.vip_identity or '') for agent in agents)) fmt = '{} {:{}} {:{}} {:{}} {:>6} {:>15}\n' _stderr.write( fmt.format(' ' * n, 'AGENT', name_width, 'IDENTITY', identity_width, 'TAG', tag_width, 'STATUS', 'HEALTH')) fmt = '{} {:{}} {:{}} {:{}} {:<15} {:<}\n' for agent in agents: _stdout.write(fmt.format(agent.uuid[:n], agent.name, name_width, agent.vip_identity, identity_width, agent.tag or '', tag_width, status_callback(agent), health_callback(agent)))
[docs]def get_agent_publickey(opts): def get_key(agent): return opts.aip.get_agent_keystore(agent.uuid).public _show_filtered_agents(opts, 'PUBLICKEY', get_key)
# XXX: reimplement over VIP # def send_agent(opts): # _log.debug("send_agent: "+ str(opts)) # ssh_dir = os.path.join(opts.volttron_home, 'ssh') # _log.debug('ssh_dir: ' + ssh_dir) # try: # host_key, client = comms.client(ssh_dir, opts.host, opts.port) # except (OSError, IOError, PasswordRequiredException, SSHException) as exc: # if opts.debug: # traceback.print_exc() # _stderr.write('{}: error: {}\n'.format(opts.command, exc)) # if isinstance(exc, OSError): # return os.EX_OSERR # if isinstance(exc, IOError): # return os.EX_IOERR # return os.EX_SOFTWARE # if host_key is None: # _stderr.write('warning: no public key found for remote host\n') # with client: # for wheel in opts.wheel: # with open(wheel) as file: # client.send_and_start_agent(file)
[docs]def add_config_to_store(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call file_contents = opts.infile.read() call("manage_store", opts.identity, opts.name, file_contents, config_type=opts.config_type)
[docs]def delete_config_from_store(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call if opts.delete_store: call("manage_delete_store", opts.identity) return if opts.name is None: _stderr.write('ERROR: must specify a configuration when not deleting entire store\n') return call("manage_delete_config", opts.identity, opts.name)
[docs]def list_store(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call results = [] if opts.identity is None: results = call("manage_list_stores") else: results = call("manage_list_configs", opts.identity) for item in results: _stdout.write(item+"\n")
[docs]def get_config(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call results = call("manage_get", opts.identity, opts.name, raw=opts.raw) if opts.raw: _stdout.write(results) else: if isinstance(results, str): _stdout.write(results) else: _stdout.write(json.dumps(results, indent=2)) _stdout.write("\n")
[docs]def edit_config(opts): opts.connection.peer = CONFIGURATION_STORE call = opts.connection.call if opts.new_config: config_type = opts.config_type raw_data = '' else: try: results = call("manage_get_metadata", opts.identity, opts.name) config_type = results["type"] raw_data = results["data"] except RemoteError as e: if "No configuration file" not in e.message: raise config_type = opts.config_type raw_data = '' #Write raw data to temp file #This will not work on Windows, FYI with tempfile.NamedTemporaryFile(suffix=".txt") as f: f.write(raw_data) f.flush() success = True try: subprocess.check_call([opts.editor, f.name]) except subprocess.CalledProcessError as e: _stderr.write("Editor returned with code {}. Changes not committed.\n".format(e.returncode)) success = False if not success: return f.seek(0) new_raw_data = f.read() if new_raw_data == raw_data: _stderr.write("No changes detected.\n") return call("manage_store", opts.identity, opts.name, new_raw_data, config_type=config_type)
[docs]class ControlConnection(object): def __init__(self, address, peer='control', publickey=None, secretkey=None, serverkey=None): self.address = address self.peer = peer self._server = BaseAgent(address=self.address, publickey=publickey, secretkey=secretkey, serverkey=serverkey, enable_store=False, identity=CONTROL_CONNECTION, enable_channel=True) self._greenlet = None @property def server(self): if self._greenlet is None: event = gevent.event.Event() self._greenlet = gevent.spawn(self._server.core.run, event) event.wait() return self._server
[docs] def call(self, method, *args, **kwargs): return self.server.vip.rpc.call( self.peer, method, *args, **kwargs).get()
[docs] def call_no_get(self, method, *args, **kwargs): return self.server.vip.rpc.call( self.peer, method, *args, **kwargs)
[docs] def notify(self, method, *args, **kwargs): return self.server.vip.rpc.notify( self.peer, method, *args, **kwargs)
[docs] def kill(self, *args, **kwargs): if self._greenlet is not None: self._greenlet.kill(*args, **kwargs)
[docs]def priority(value): n = int(value) if not 0 <= n < 100: raise ValueError('invalid priority (0 <= n < 100): {}'.format(n)) return '{:02}'.format(n)
[docs]def get_keys(opts): '''Gets keys from keystore and known-hosts store''' hosts = KnownHostsStore() serverkey = hosts.serverkey(opts.vip_address) key_store = KeyStore() publickey = key_store.public secretkey = key_store.secret return {'publickey': publickey, 'secretkey': secretkey, 'serverkey': serverkey}
[docs]def main(argv=sys.argv): # Refuse to run as root if not getattr(os, 'getuid', lambda: -1)(): sys.stderr.write('%s: error: refusing to run as root to prevent ' 'potential damage.\n' % os.path.basename(argv[0])) sys.exit(77) volttron_home = get_home() os.environ['VOLTTRON_HOME'] = volttron_home global_args = config.ArgumentParser(description='global options', add_help=False) global_args.add_argument('-c', '--config', metavar='FILE', action='parse_config', ignore_unknown=True, sections=[None, 'global', 'volttron-ctl'], help='read configuration from FILE') global_args.add_argument('--debug', action='store_true', help='show tracbacks for errors rather than a brief message') global_args.add_argument('-t', '--timeout', type=float, metavar='SECS', help='timeout in seconds for remote calls (default: %(default)g)') global_args.add_argument('--msgdebug', help='route all messages to an agent while debugging') global_args.add_argument( '--vip-address', metavar='ZMQADDR', help='ZeroMQ URL to bind for VIP connections') global_args.set_defaults( vip_address=get_address(), timeout=30, ) filterable = config.ArgumentParser(add_help=False) filterable.add_argument('--name', dest='by_name', action='store_true', help='filter/search by agent name') filterable.add_argument('--tag', dest='by_tag', action='store_true', help='filter/search by tag name') filterable.add_argument('--uuid', dest='by_uuid', action='store_true', help='filter/search by UUID (default)') filterable.set_defaults(by_name=False, by_tag=False, by_uuid=False) parser = config.ArgumentParser( prog=os.path.basename(argv[0]), add_help=False, description='Manage and control VOLTTRON agents.', usage='%(prog)s command [OPTIONS] ...', argument_default=argparse.SUPPRESS, parents=[global_args] ) parser.add_argument('-l', '--log', metavar='FILE', default=None, help='send log output to FILE instead of stderr') parser.add_argument('-L', '--log-config', metavar='FILE', help='read logging configuration from FILE') parser.add_argument('-q', '--quiet', action='add_const', const=10, dest='verboseness', help='decrease logger verboseness; may be used multiple times') parser.add_argument('-v', '--verbose', action='add_const', const=-10, dest='verboseness', help='increase logger verboseness; may be used multiple times') parser.add_argument('--verboseness', type=int, metavar='LEVEL', default=logging.WARNING, help='set logger verboseness') parser.add_argument( '--show-config', action='store_true', help=argparse.SUPPRESS) parser.add_help_argument() parser.set_defaults( log_config=None, volttron_home=volttron_home, ) top_level_subparsers = parser.add_subparsers(title='commands', metavar='', dest='command') def add_parser(*args, **kwargs): parents = kwargs.get('parents', []) parents.append(global_args) kwargs['parents'] = parents subparser = kwargs.pop("subparser", top_level_subparsers) return subparser.add_parser(*args, **kwargs) install = add_parser('install', help='install agent from wheel', epilog='Optionally you may specify the --tag argument to tag the ' 'agent during install without requiring a separate call to ' 'the tag command. ') install.add_argument('wheel', help='path to agent wheel') install.add_argument('--tag', help='tag for the installed agent') install.add_argument('--vip-identity', help='VIP IDENTITY for the installed agent. ' 'Overrides any previously configured VIP IDENTITY.') if HAVE_RESTRICTED: install.add_argument('--verify', action='store_true', dest='verify_agents', help='verify agent integrity during install') install.add_argument('--no-verify', action='store_false', dest='verify_agents', help=argparse.SUPPRESS) install.set_defaults(func=install_agent, verify_agents=True) tag = add_parser('tag', parents=[filterable], help='set, show, or remove agent tag') tag.add_argument('agent', help='UUID or name of agent') group = tag.add_mutually_exclusive_group() group.add_argument('tag', nargs='?', const=None, help='tag to give agent') group.add_argument('-r', '--remove', action='store_true', help='remove tag') tag.set_defaults(func=tag_agent, tag=None, remove=False) remove = add_parser('remove', parents=[filterable], help='remove agent') remove.add_argument('pattern', nargs='+', help='UUID or name of agent') remove.add_argument('-f', '--force', action='store_true', help='force removal of multiple agents') remove.set_defaults(func=remove_agent, force=False) list_ = add_parser('list', parents=[filterable], help='list installed agent') list_.add_argument('pattern', nargs='*', help='UUID or name of agent') list_.add_argument('-n', dest='min_uuid_len', type=int, metavar='N', help='show at least N characters of UUID (0 to show all)') list_.set_defaults(func=list_agents, min_uuid_len=1) status = add_parser('status', parents=[filterable], help='show status of agents') status.add_argument('pattern', nargs='*', help='UUID or name of agent') status.add_argument('-n', dest='min_uuid_len', type=int, metavar='N', help='show at least N characters of UUID (0 to show all)') status.set_defaults(func=status_agents, min_uuid_len=1) health = add_parser('health', parents=[filterable], help='show agent health as JSON') health.add_argument('pattern', nargs=1, help='UUID or name of agent') health.set_defaults(func=agent_health, min_uuid_len=1) clear = add_parser('clear', help='clear status of defunct agents') clear.add_argument('-a', '--all', dest='clear_all', action='store_true', help='clear the status of all agents') clear.set_defaults(func=clear_status, clear_all=False) enable = add_parser('enable', parents=[filterable], help='enable agent to start automatically') enable.add_argument('pattern', nargs='+', help='UUID or name of agent') enable.add_argument('-p', '--priority', type=priority, help='2-digit priority from 00 to 99') enable.set_defaults(func=enable_agent, priority='50') disable = add_parser('disable', parents=[filterable], help='prevent agent from start automatically') disable.add_argument('pattern', nargs='+', help='UUID or name of agent') disable.set_defaults(func=disable_agent) start = add_parser('start', parents=[filterable], help='start installed agent') start.add_argument('pattern', nargs='+', help='UUID or name of agent') if HAVE_RESTRICTED: start.add_argument('--verify', action='store_true', dest='verify_agents', help='verify agent integrity during start') start.add_argument('--no-verify', action='store_false', dest='verify_agents', help=argparse.SUPPRESS) start.set_defaults(func=start_agent) stop = add_parser('stop', parents=[filterable], help='stop agent') stop.add_argument('pattern', nargs='+', help='UUID or name of agent') stop.set_defaults(func=stop_agent) restart = add_parser('restart', parents=[filterable], help='restart agent') restart.add_argument('pattern', nargs='+', help='UUID or name of agent') restart.set_defaults(func=restart_agent) run = add_parser('run', help='start any agent by path') run.add_argument('directory', nargs='+', help='path to agent directory') if HAVE_RESTRICTED: run.add_argument('--verify', action='store_true', dest='verify_agents', help='verify agent integrity during run') run.add_argument('--no-verify', action='store_false', dest='verify_agents', help=argparse.SUPPRESS) run.set_defaults(func=run_agent) upgrade = add_parser('upgrade', help='upgrade agent from wheel', epilog='Optionally you may specify the --tag argument to tag the ' 'agent during upgrade without requiring a separate call to ' 'the tag command. ') upgrade.add_argument('vip_identity', metavar='vip-identity', help='VIP IDENTITY of agent to upgrade') upgrade.add_argument('wheel', help='path to new agent wheel') upgrade.add_argument('--tag', help='tag for the upgraded agent') if HAVE_RESTRICTED: upgrade.add_argument('--verify', action='store_true', dest='verify_agents', help='verify agent integrity during upgrade') upgrade.add_argument('--no-verify', action='store_false', dest='verify_agents', help=argparse.SUPPRESS) upgrade.set_defaults(func=upgrade_agent, verify_agents=True) auth_cmds = add_parser("auth", help="manage authorization entries and encryption keys") auth_subparsers = auth_cmds.add_subparsers(title='subcommands', metavar='', dest='store_commands') auth_add = add_parser('add', help='add new authentication record', subparser=auth_subparsers) auth_add.add_argument('--domain', default=None) auth_add.add_argument('--address', default=None) auth_add.add_argument('--mechanism', default=None) auth_add.add_argument('--credentials', default=None) auth_add.add_argument('--user_id', default=None) auth_add.add_argument('--groups', default=None, help='delimit multiple entries with comma') auth_add.add_argument('--roles', default=None, help='delimit multiple entries with comma') auth_add.add_argument('--capabilities', default=None, help='delimit multiple entries with comma') auth_add.add_argument('--comments', default=None) auth_add.add_argument('--disabled', action='store_true') auth_add.add_argument('--add-known-host', action='store_true', help='adds entry in known host') auth_add.set_defaults(func=add_auth) auth_add_group = add_parser('add-group', subparser=auth_subparsers, help='associate a group name with a set of roles') auth_add_group.add_argument('group', metavar='GROUP', help='name of group') auth_add_group.add_argument('roles', metavar='ROLE', nargs='*', help='roles to associate with the group') auth_add_group.set_defaults(func=add_group) auth_add_known_host = add_parser('add-known-host', subparser=auth_subparsers, help='add server public key to known-hosts file') auth_add_known_host.add_argument('--host', required=True, help='hostname or IP address with optional port') auth_add_known_host.add_argument('--serverkey', required=True) auth_add_known_host.set_defaults(func=add_server_key) auth_add_role = add_parser('add-role', subparser=auth_subparsers, help='associate a role name with a set of capabilities') auth_add_role.add_argument('role', metavar='ROLE', help='name of role') auth_add_role.add_argument('capabilities', metavar='CAPABILITY', nargs='*', help='capabilities to associate with the role') auth_add_role.set_defaults(func=add_role) auth_keypair = add_parser('keypair', subparser=auth_subparsers, help='generate CurveMQ keys for encrypting VIP connections') auth_keypair.set_defaults(func=gen_keypair) auth_list = add_parser('list', help='list authentication records', subparser=auth_subparsers) auth_list.set_defaults(func=list_auth) auth_list_groups = add_parser('list-groups', subparser=auth_subparsers, help='show list of group names and their sets of roles') auth_list_groups.set_defaults(func=list_groups) auth_list_known_host = add_parser('list-known-hosts', subparser=auth_subparsers, help='list entries from known-hosts file') auth_list_known_host.set_defaults(func=list_known_hosts) auth_list_roles = add_parser('list-roles', subparser=auth_subparsers, help='show list of role names and their sets of capabilities') auth_list_roles.set_defaults(func=list_roles) auth_publickey = add_parser('publickey', parents=[filterable], subparser=auth_subparsers, help='show public key for each agent') auth_publickey.add_argument('pattern', nargs='*', help='UUID or name of agent') auth_publickey.add_argument('-n', dest='min_uuid_len', type=int, metavar='N', help='show at least N characters of UUID (0 to show all)') auth_publickey.set_defaults(func=get_agent_publickey, min_uuid_len=1) auth_remove = add_parser('remove', subparser=auth_subparsers, help='removes one or more authentication records by indices') auth_remove.add_argument('indices', nargs='+', type=int, help='index or indices of record(s) to remove') auth_remove.set_defaults(func=remove_auth) auth_remove_group = add_parser('remove-group', subparser=auth_subparsers, help='disassociate a group name from a set of roles') auth_remove_group.add_argument('group', help='name of group') auth_remove_group.set_defaults(func=remove_group) auth_remove_known_host = add_parser('remove-known-host', subparser=auth_subparsers, help='remove entry from known-hosts file') auth_remove_known_host.add_argument('host', metavar='HOST', help='hostname or IP address with optional port') auth_remove_known_host.set_defaults(func=remove_known_host) auth_remove_role = add_parser('remove-role', subparser=auth_subparsers, help='disassociate a role name from a set of capabilities') auth_remove_role.add_argument('role', help='name of role') auth_remove_role.set_defaults(func=remove_role) auth_serverkey = add_parser('serverkey', subparser=auth_subparsers, help="show the serverkey for the instance") auth_serverkey.set_defaults(func=show_serverkey) auth_update = add_parser('update', subparser=auth_subparsers, help='updates one authentication record by index') auth_update.add_argument('index', type=int, help='index of record to update') auth_update.set_defaults(func=update_auth) auth_update_group = add_parser('update-group', subparser=auth_subparsers, help='update group to include (or remove) given roles') auth_update_group.add_argument('group', metavar='GROUP', help='name of group') auth_update_group.add_argument('roles', nargs='*', metavar='ROLE', help='roles to append to (or remove from) the group') auth_update_group.add_argument('--remove', action='store_true', help='remove (rather than append) given roles') auth_update_group.set_defaults(func=update_group) auth_update_role = add_parser('update-role', subparser=auth_subparsers, help='update role to include (or remove) given capabilities') auth_update_role.add_argument('role', metavar='ROLE', help='name of role') auth_update_role.add_argument('capabilities', nargs='*', metavar='CAPABILITY', help='capabilities to append to (or remove from) the role') auth_update_role.add_argument('--remove', action='store_true', help='remove (rather than append) given capabilities') auth_update_role.set_defaults(func=update_role) config_store = add_parser("config", help="manage the platform configuration store") config_store_subparsers = config_store.add_subparsers(title='subcommands', metavar='', dest='store_commands') config_store_store = add_parser("store", help="store a configuration", subparser=config_store_subparsers) config_store_store.add_argument('identity', help='VIP IDENTITY of the store') config_store_store.add_argument('name', help='name used to reference the configuration by in the store') config_store_store.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help='file containing the contents of the configuration') config_store_store.add_argument('--raw', const="raw", dest="config_type" , action="store_const", help='interpret the input file as raw data') config_store_store.add_argument('--json', const="json", dest="config_type", action="store_const", help='interpret the input file as json') config_store_store.add_argument('--csv', const="csv", dest="config_type", action="store_const", help='interpret the input file as csv') config_store_store.set_defaults(func=add_config_to_store, config_type="json") config_store_edit = add_parser("edit", help="edit a configuration. (nano by default, respects EDITOR env variable)", subparser=config_store_subparsers) config_store_edit.add_argument('identity', help='VIP IDENTITY of the store') config_store_edit.add_argument('name', help='name used to reference the configuration by in the store') config_store_edit.add_argument('--editor', dest="editor", help='Set the editor to use to change the file. Defaults to nano if EDITOR is not set', default=os.getenv("EDITOR", "nano")) config_store_edit.add_argument('--raw', const="raw", dest="config_type", action="store_const", help='Interpret the configuration as raw data. If the file already exists this is ignored.') config_store_edit.add_argument('--json', const="json", dest="config_type", action="store_const", help='Interpret the configuration as json. If the file already exists this is ignored.') config_store_edit.add_argument('--csv', const="csv", dest="config_type", action="store_const", help='Interpret the configuration as csv. If the file already exists this is ignored.') config_store_edit.add_argument('--new', dest="new_config", action="store_true", help='Ignore any existing configuration and creates new empty file.' ' Configuration is not written if left empty. Type defaults to JSON.') config_store_edit.set_defaults(func=edit_config, config_type="json") config_store_delete = add_parser("delete", help="delete a configuration", subparser=config_store_subparsers) config_store_delete.add_argument('identity', help='VIP IDENTITY of the store') config_store_delete.add_argument('name', nargs='?', help='name used to reference the configuration by in the store') config_store_delete.add_argument('--all', dest="delete_store", action="store_true", help='delete all configurations in the store') config_store_delete.set_defaults(func=delete_config_from_store) config_store_list = add_parser("list", help="list stores or configurations in a store", subparser=config_store_subparsers) config_store_list.add_argument('identity', nargs='?', help='VIP IDENTITY of the store to list') config_store_list.set_defaults(func=list_store) config_store_get = add_parser("get", help="get the contents of a configuration", subparser=config_store_subparsers) config_store_get.add_argument('identity', help='VIP IDENTITY of the store') config_store_get.add_argument('name', help='name used to reference the configuration by in the store') config_store_get.add_argument('--raw', action="store_true", help='get the configuration as raw data') config_store_get.set_defaults(func=get_config) shutdown = add_parser('shutdown', help='stop all agents') shutdown.add_argument('--platform', action='store_true', help='also stop the platform process') shutdown.set_defaults(func=shutdown_agents, platform=False) send = add_parser('send', help='send agent and start on a remote platform') send.add_argument('wheel', nargs='+', help='agent package to send') send.set_defaults(func=send_agent) stats = add_parser('stats', help='manage router message statistics tracking') op = stats.add_argument( 'op', choices=['status', 'enable', 'disable', 'dump', 'pprint'], nargs='?') stats.set_defaults(func=do_stats, op='status') if HAVE_RESTRICTED: cgroup = add_parser('create-cgroups', help='setup VOLTTRON control group for restricted execution') cgroup.add_argument('-u', '--user', metavar='USER', help='owning user name or ID') cgroup.add_argument('-g', '--group', metavar='GROUP', help='owning group name or ID') cgroup.set_defaults(func=create_cgroups, user=None, group=None) # Parse and expand options args = argv[1:] conf = os.path.join(volttron_home, 'config') if os.path.exists(conf) and 'SKIP_VOLTTRON_CONFIG' not in os.environ: args = ['--config', conf] + args opts = parser.parse_args(args) if opts.log: opts.log = config.expandall(opts.log) if opts.log_config: opts.log_config = config.expandall(opts.log_config) opts.vip_address = config.expandall(opts.vip_address) if getattr(opts, 'show_config', False): for name, value in sorted(vars(opts).iteritems()): print(name, repr(value)) return # Configure logging level = max(1, opts.verboseness) if opts.log is None: log_to_file(sys.stderr, level) elif opts.log == '-': log_to_file(sys.stdout, level) elif opts.log: log_to_file( opts.log, level, handler_class=logging.handlers.WatchedFileHandler) else: log_to_file(None, 100, handler_class=lambda x: logging.NullHandler()) if opts.log_config: logging.config.fileConfig(opts.log_config) opts.aip = aipmod.AIPplatform(opts) opts.aip.setup() opts.connection = ControlConnection(opts.vip_address, **get_keys(opts)) try: with gevent.Timeout(opts.timeout): return opts.func(opts) except gevent.Timeout: _stderr.write('{}: operation timed out\n'.format(opts.command)) return 75 except RemoteError as exc: print_tb = exc.print_tb error = exc.message except Exception as exc: print_tb = traceback.print_exc error = str(exc) else: return 0 if opts.debug: print_tb() _stderr.write('{}: error: {}\n'.format(opts.command, error)) return 20
def _main(): try: sys.exit(main()) except KeyboardInterrupt: sys.exit(1) if __name__ == '__main__': _main()