Source code for volttron.platform.aip

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2017, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}


"""Component for the instantiation and packaging of agents."""


import contextlib
import errno
import logging
import os
import shutil
import signal
import sys
import uuid

import gevent
import gevent.event
from gevent.fileobject import FileObject
from gevent import subprocess
from gevent.subprocess import PIPE
from wheel.tool import unpack
import zmq

# Can't use zmq.utils.jsonapi because it is missing the load() method.
try:
    import simplejson as jsonapi
except ImportError:
    import json as jsonapi

from . import messaging
from .agent.utils import is_valid_identity
from .messaging import topics
from .packages import UnpackedPackage
from .vip.agent import Agent
from .keystore import KeyStore
from .auth import AuthFile, AuthEntry, AuthFileEntryAlreadyExists

try:
    from volttron.restricted import auth
    from volttron.restricted import certs
    from volttron.restricted.resmon import ResourceError
except ImportError:
    auth = None


_log = logging.getLogger(__name__)


def process_wait(p):
    """Poll process *p* until it exits and return its exit status.

    ``p.poll()`` is called repeatedly; while it returns ``None`` the caller
    sleeps through ``gevent.sleep``.  The sleep interval starts at 0.01
    seconds and is doubled after each sleep for as long as it is still below
    0.5 seconds.
    """
    delay = 0.01
    status = p.poll()
    while status is None:
        gevent.sleep(delay)
        if delay < 0.5:
            delay *= 2
        status = p.poll()
    return status
# LOG_* constants from syslog module (not available on Windows) _level_map = {7: logging.DEBUG, # LOG_DEBUG 6: logging.INFO, # LOG_INFO 5: logging.INFO, # LOG_NOTICE 4: logging.WARNING, # LOG_WARNING 3: logging.ERROR, # LOG_ERR 2: logging.CRITICAL, # LOG_CRIT 1: logging.CRITICAL, # LOG_ALERT 0: logging.CRITICAL,} # LOG_EMERG
def log_entries(name, agent, pid, level, stream):
    """Split the lines of *stream* into log records and plain text lines.

    Each line has trailing ``\\r``/``\\n`` characters removed.  A line that
    starts with ``{`` and ends with ``}`` is tried as a JSON-encoded log
    record: when it decodes and can be turned into a record, the record is
    handled by the logger called *name* (if the level is enabled) and the line
    is not yielded.  The record keeps its original logger name in
    ``remote_name`` and gets *agent* and *pid* as ``processName`` and
    ``process``.

    Every other line is yielded as a ``(level, text)`` tuple.  A line of the
    form ``<N>text`` with a single digit ``N`` yields the level found for
    ``N`` in ``_level_map`` (falling back to *level*) and the text after the
    prefix; anything else yields *level* and the whole line.
    """
    logger = logging.getLogger(name)
    overrides = {'processName': agent, 'process': pid}
    for raw in stream:
        text = raw.rstrip('\r\n')
        record = None
        if text[0:1] == '{' and text[-1:] == '}':
            try:
                payload = jsonapi.loads(text)
                try:
                    payload['args'] = tuple(payload['args'])
                except (KeyError, TypeError, ValueError):
                    pass
                record = logging.makeLogRecord(payload)
            except Exception:
                # Not a usable log record; treat the line as plain text below.
                record = None
        if record is not None:
            # Prefer the settings of a logger already known under the remote
            # name; otherwise fall back to the local logger's level.
            if record.name in logger.manager.loggerDict:
                enabled = logging.getLogger(
                    record.name).isEnabledFor(record.levelno)
            else:
                enabled = logger.isEnabledFor(record.levelno)
            if enabled:
                record.remote_name, record.name = record.name, name
                record.__dict__.update(overrides)
                logger.handle(record)
            continue
        has_priority = (text[0:1] == '<' and text[2:3] == '>' and
                        text[1:2].isdigit())
        if has_priority:
            yield _level_map.get(int(text[1]), level), text[3:]
        else:
            yield level, text
def log_stream(name, agent, pid, path, stream):
    """Emit every ``(level, line)`` pair of *stream* on the logger *name*.

    Pairs whose level is not enabled for the logger are dropped.  For the
    others a ``LogRecord`` is built with *path* as its pathname and line
    number 0; *agent* and *pid* are stored as ``processName`` and ``process``
    and the ``thread``, ``threadName`` and ``module`` attributes are set to
    ``None`` before the record is handled.
    """
    logger = logging.getLogger(name)
    # Single overlay applied to each record: agent identification plus the
    # attributes that are blanked out.
    overlay = {
        'processName': agent,
        'process': pid,
        'thread': None,
        'threadName': None,
        'module': None,
    }
    for level, line in stream:
        if not logger.isEnabledFor(level):
            continue
        record = logging.LogRecord(name, level, path, 0, line, [], None)
        record.__dict__.update(overlay)
        logger.handle(record)
class IgnoreErrno(object):
    """Context manager that suppresses exceptions with selected errno values.

    An exception leaving the ``with`` body is swallowed when it has an
    ``errno`` attribute whose value is one of the codes given to the
    constructor.  Exceptions without an ``errno`` attribute, or with a
    different code, propagate unchanged.
    """

    ignore = []

    def __init__(self, errno, *more):
        # ``errno`` here is the first error code, not the errno module.
        self.ignore = [errno] + list(more)

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            code = exc_value.errno
        except AttributeError:
            # No exception at all (exc_value is None) or one without errno:
            # returning None lets any exception propagate.
            return None
        return code in self.ignore


# Shared instance used to treat a missing file or directory as "not there".
ignore_enoent = IgnoreErrno(errno.ENOENT)
class ExecutionEnvironment(object):
    '''Environment reserved for agent execution.

    Deleting ExecutionEnvironment objects should cause the process to
    end and all resources to be returned to the system.
    '''

    def __init__(self):
        # Both attributes are filled in by execute().
        self.process = None
        self.env = None

    def execute(self, *args, **kwargs):
        '''Start a child process, forwarding all arguments to Popen.

        The ``env`` keyword (if any) is remembered in ``self.env`` and the
        new process object is stored in ``self.process``.  An ``OSError``
        that does not carry a filename is re-raised with the first
        positional argument appended to its arguments.
        '''
        self.env = kwargs.get('env', None)
        try:
            self.process = subprocess.Popen(*args, **kwargs)
        except OSError as exc:
            if exc.filename:
                raise
            extended = exc.args + (args[0],)
            raise OSError(*extended)

    def __call__(self, *args, **kwargs):
        '''Calling the object is the same as calling execute().'''
        self.execute(*args, **kwargs)
class AIPplatform(object):
    """Manages the main workflow of receiving and sending agents.

    Installed agents live in one directory per agent UUID beneath
    ``install_dir``.  Started agents are tracked in ``self.agents``, a dict
    mapping an agent UUID to the execution environment holding its process.
    """

    def __init__(self, env, **kwargs):
        """Store the platform environment; no agents are running yet.

        Extra keyword arguments are accepted and ignored.
        """
        self.env = env
        self.agents = {}

    @staticmethod
    def _check_agent_uuid(agent_uuid):
        """Raise ValueError if *agent_uuid* could escape ``install_dir``.

        The UUID is used as a single path component, so path separators and
        the special names ``.`` and ``..`` are rejected.
        """
        if '/' in agent_uuid or agent_uuid in ['.', '..']:
            raise ValueError('invalid agent')

    def setup(self):
        """Creates paths for used directories for the instance."""
        for path in [self.run_dir, self.config_dir, self.install_dir]:
            if not os.path.exists(path):
                os.makedirs(path, 0o755)

    def finish(self):
        """Signal every still-running agent process to end.

        Three passes are made without waiting in between: SIGINT, then
        ``terminate()``, then ``kill()``; each pass only touches processes
        whose ``poll()`` still returns ``None``.
        """
        # Snapshot so the passes do not depend on the dict staying unchanged.
        execenvs = list(self.agents.values())
        for exeenv in execenvs:
            if exeenv.process.poll() is None:
                exeenv.process.send_signal(signal.SIGINT)
        for exeenv in execenvs:
            if exeenv.process.poll() is None:
                exeenv.process.terminate()
        for exeenv in execenvs:
            if exeenv.process.poll() is None:
                exeenv.process.kill()

    def shutdown(self):
        """Stop all agents, then publish the platform shutdown message."""
        # stop_agent() yields to other greenlets while waiting, so iterate a
        # snapshot of the keys rather than the live dict.
        for agent_uuid in list(self.agents):
            self.stop_agent(agent_uuid)
        event = gevent.event.Event()
        agent = Agent(identity='aip', address='inproc://vip')
        task = gevent.spawn(agent.core.run, event)
        try:
            event.wait()
            agent.vip.pubsub.publish(
                'pubsub', topics.PLATFORM_SHUTDOWN,
                {'reason': 'Received shutdown command'}).get()
        finally:
            agent.core.stop()
            task.kill()

    # Addresses are taken straight from the environment object.
    subscribe_address = property(lambda me: me.env.subscribe_address)
    publish_address = property(lambda me: me.env.publish_address)
    # Directory layout: <volttron_home>/agents and <volttron_home>/run.
    config_dir = property(lambda me: os.path.abspath(me.env.volttron_home))
    install_dir = property(lambda me: os.path.join(me.config_dir, 'agents'))
    run_dir = property(lambda me: os.path.join(me.config_dir, 'run'))

    def autostart(self):
        """Start every installed agent that has an AUTOSTART priority.

        Agents are started in descending order of their (string) priority.
        Returns a list of ``(agent_uuid, message)`` tuples for agents whose
        priority could not be read or that failed to start.
        """
        agents, errors = [], []
        for agent_uuid in self.list_agents():
            try:
                priority = self._agent_priority(agent_uuid)
            except EnvironmentError as exc:
                errors.append((agent_uuid, str(exc)))
                continue
            if priority is not None:
                agents.append((priority, agent_uuid))
        agents.sort(reverse=True)
        for _, agent_uuid in agents:
            try:
                self.start_agent(agent_uuid)
            except Exception as exc:
                errors.append((agent_uuid, str(exc)))
        return errors

    def land_agent(self, agent_wheel):
        """Install, start and prioritize an agent; undo the install on error.

        Raises NotImplementedError when the restricted ``auth`` module is not
        available.  Returns the UUID of the new agent.
        """
        if auth is None:
            raise NotImplementedError()
        agent_uuid = self.install_agent(agent_wheel)
        try:
            self.start_agent(agent_uuid)
            self.prioritize_agent(agent_uuid)
        except:
            # Deliberately catches everything: the half-landed agent must be
            # removed before the error is re-raised.
            self.remove_agent(agent_uuid)
            raise
        return agent_uuid

    def install_agent(self, agent_wheel, vip_identity=None, publickey=None,
                      secretkey=None):
        """Unpack *agent_wheel* into a new agent directory.

        A fresh UUID is generated until one is found that is neither running
        nor present on disk.  After unpacking, the agent's VIP identity is
        set up, the given key pair (only when both keys are supplied) is
        stored in the agent keystore, and the agent's public key is
        authorized.  If any of these steps fails the agent directory is
        removed and the exception re-raised.  Returns the new agent UUID.
        """
        while True:
            agent_uuid = str(uuid.uuid4())
            if agent_uuid in self.agents:
                continue
            agent_path = os.path.join(self.install_dir, agent_uuid)
            try:
                os.mkdir(agent_path)
                break
            except OSError as exc:
                if exc.errno != errno.EEXIST:
                    raise
        try:
            if auth is not None and self.env.verify_agents:
                unpacker = auth.VolttronPackageWheelFile(
                    agent_wheel, certsobj=certs.Certs())
                unpacker.unpack(dest=agent_path)
            else:
                unpack(agent_wheel, dest=agent_path)
            final_identity = self._setup_agent_vip_id(
                agent_uuid, vip_identity=vip_identity)
            if publickey is not None and secretkey is not None:
                keystore = self.get_agent_keystore(agent_uuid)
                keystore.public = publickey
                keystore.secret = secretkey
            self._authorize_agent_keys(agent_uuid, final_identity)
        except Exception:
            shutil.rmtree(agent_path)
            raise
        return agent_uuid

    def _setup_agent_vip_id(self, agent_uuid, vip_identity=None):
        """Choose the agent's VIP identity and write it to its IDENTITY file.

        The name template is, in order of precedence: *vip_identity*, the
        first 64 bytes of the package's IDENTITY_TEMPLATE file, or
        ``<agent name>_{n}``.  Raises ValueError when the resulting identity
        is already in use or is rejected by ``is_valid_identity``.  The
        IDENTITY_TEMPLATE file, if it was read, is removed afterwards.
        Returns the identity that was written.
        """
        agent_path = os.path.join(self.install_dir, agent_uuid)
        name = self.agent_name(agent_uuid)
        pkg = UnpackedPackage(os.path.join(agent_path, name))
        identity_template_filename = os.path.join(pkg.distinfo,
                                                  "IDENTITY_TEMPLATE")
        rm_id_template = False
        if not os.path.exists(identity_template_filename):
            name_template = name + "_{n}"
        else:
            with open(identity_template_filename, 'rb') as fp:
                name_template = fp.read(64)
            rm_id_template = True
        if vip_identity is not None:
            name_template = vip_identity
        _log.debug(
            'Using name template "' + name_template + '" to generate VIP ID')
        final_identity = self._get_available_agent_identity(name_template)
        if final_identity is None:
            raise ValueError(
                "Agent with VIP ID {} already installed on platform.".format(
                    name_template))
        if not is_valid_identity(final_identity):
            # Report the rejected identity itself.
            raise ValueError(
                'Invalid identity detected: {}'.format(final_identity))
        identity_filename = os.path.join(agent_path, "IDENTITY")
        with open(identity_filename, 'wb') as fp:
            fp.write(final_identity)
        _log.info("Agent {uuid} setup to use VIP ID {vip_identity}".format(
            uuid=agent_uuid, vip_identity=final_identity))
        # Cleanup IDENTITY_TEMPLATE file.
        if rm_id_template:
            os.remove(identity_template_filename)
            _log.debug('IDENTITY_TEMPLATE file removed.')
        return final_identity

    def get_agent_keystore(self, agent_uuid):
        """Return a KeyStore for ``keystore.json`` in the agent's data dir."""
        agent_path = os.path.join(self.install_dir, agent_uuid)
        name = self.agent_name(agent_uuid)
        agent_path_with_name = os.path.join(agent_path, name)
        data_dir = self._get_data_dir(agent_path_with_name)
        keystore_path = os.path.join(data_dir, 'keystore.json')
        return KeyStore(keystore_path)

    def _authorize_agent_keys(self, agent_uuid, identity):
        """Add the agent's public key to the auth file under *identity*.

        An entry that already exists is left as it is.
        """
        publickey = self.get_agent_keystore(agent_uuid).public
        entry = AuthEntry(credentials=publickey, user_id=identity,
                          comments='Automatically added on agent install')
        try:
            AuthFile().add(entry)
        except AuthFileEntryAlreadyExists:
            pass

    def _unauthorize_agent_keys(self, agent_uuid):
        """Remove the auth file entry matching the agent's public key."""
        publickey = self.get_agent_keystore(agent_uuid).public
        AuthFile().remove_by_credentials(publickey)

    def _get_data_dir(self, agent_path):
        """Return the ``<package>.agent-data`` directory, creating it if needed.

        The directory sits next to the package's dist-info directory.
        """
        pkg = UnpackedPackage(agent_path)
        data_dir = os.path.join(os.path.dirname(pkg.distinfo),
                                '{}.agent-data'.format(pkg.package_name))
        if not os.path.exists(data_dir):
            os.mkdir(data_dir)
        return data_dir

    def get_agent_identity_to_uuid_mapping(self):
        """Return ``{identity: agent_uuid}`` for agents that have an identity.

        Agents whose UUID is rejected as invalid or that have no IDENTITY
        file are left out.
        """
        results = {}
        for agent_uuid in self.list_agents():
            try:
                agent_identity = self.agent_identity(agent_uuid)
            except ValueError:
                continue
            if agent_identity is not None:
                results[agent_identity] = agent_uuid
        return results

    def get_all_agent_identities(self):
        """Return a list of the identities of all installed agents."""
        return list(self.get_agent_identity_to_uuid_mapping())

    def _get_available_agent_identity(self, name_template):
        """Return an unused identity built from *name_template*.

        A template that does not change when formatted with ``n`` is static:
        it is returned as is, or ``None`` if it is already taken.  Otherwise
        ``n`` counts up from 1 until a free identity is found.
        """
        all_agent_identities = self.get_all_agent_identities()
        # Provided name template is static
        if name_template == name_template.format(n=0):
            return name_template if name_template not in all_agent_identities else None
        # Find a free ID
        count = 1
        while True:
            test_name = name_template.format(n=count)
            if test_name not in all_agent_identities:
                return test_name
            count += 1

    def remove_agent(self, agent_uuid, remove_auth=True):
        """Stop the agent and delete its install directory.

        Raises ValueError when *agent_uuid* is not a directory entry of
        ``install_dir``.  Unless *remove_auth* is false the agent's key is
        removed from the auth file as well.
        """
        if agent_uuid not in os.listdir(self.install_dir):
            raise ValueError('invalid agent')
        self.stop_agent(agent_uuid)
        self.agents.pop(agent_uuid, None)
        if remove_auth:
            self._unauthorize_agent_keys(agent_uuid)
        shutil.rmtree(os.path.join(self.install_dir, agent_uuid))

    def agent_name(self, agent_uuid):
        """Return the name of the package installed for *agent_uuid*.

        This is the first entry ``X`` of the agent directory for which
        ``X/X.dist-info`` exists.  Raises KeyError when there is none.
        """
        agent_path = os.path.join(self.install_dir, agent_uuid)
        for agent_name in os.listdir(agent_path):
            dist_info = os.path.join(
                agent_path, agent_name, agent_name + '.dist-info')
            if os.path.exists(dist_info):
                return agent_name
        raise KeyError(agent_uuid)

    def list_agents(self):
        """Return ``{agent_uuid: agent_name}`` for all installed agents.

        Entries of ``install_dir`` without a recognizable package are
        skipped.
        """
        agents = {}
        for agent_uuid in os.listdir(self.install_dir):
            try:
                agents[agent_uuid] = self.agent_name(agent_uuid)
            except KeyError:
                pass
        return agents

    def active_agents(self):
        """Return ``{agent_uuid: name}`` for every tracked agent."""
        return {agent_uuid: execenv.name
                for agent_uuid, execenv in list(self.agents.items())}

    def clear_status(self, clear_all=False):
        """Forget tracked agents whose process has exited.

        With *clear_all* every exited agent is dropped; otherwise only those
        whose install directory no longer exists.
        """
        remove = []
        for agent_uuid, execenv in list(self.agents.items()):
            if execenv.process.poll() is not None:
                if clear_all:
                    remove.append(agent_uuid)
                else:
                    path = os.path.join(self.install_dir, agent_uuid)
                    if not os.path.exists(path):
                        remove.append(agent_uuid)
        for agent_uuid in remove:
            self.agents.pop(agent_uuid, None)

    def status_agents(self):
        """Return ``(agent_uuid, name, status)`` for every tracked agent.

        ``status`` is the tuple returned by ``agent_status``.
        """
        return [(agent_uuid, agent_name, self.agent_status(agent_uuid))
                for agent_uuid, agent_name in list(self.active_agents().items())]

    def tag_agent(self, agent_uuid, tag):
        """Write the first 64 characters of *tag* to the agent's TAG file.

        A false *tag* removes the TAG file instead; a missing file is not an
        error.  Raises ValueError for a UUID containing ``/`` or equal to
        ``.`` or ``..``.
        """
        self._check_agent_uuid(agent_uuid)
        tag_file = os.path.join(self.install_dir, agent_uuid, 'TAG')
        if not tag:
            with ignore_enoent:
                os.unlink(tag_file)
        else:
            with open(tag_file, 'w') as file:
                file.write(tag[:64])

    def agent_identity(self, agent_uuid):
        """
        Return the identity of the agent that is installed.

        The IDENTITY file is written to the agent's install directory the
        first time the agent is installed.  This function reads that file
        and returns the read value.

        @param agent_uuid:
        @return: at most 64 characters of the first line of the IDENTITY
            file, or None when the file does not exist.
        """
        self._check_agent_uuid(agent_uuid)
        identity_file = os.path.join(self.install_dir, agent_uuid, 'IDENTITY')
        with ignore_enoent, open(identity_file, 'r') as file:
            return file.readline(64)

    def agent_tag(self, agent_uuid):
        """Return up to 64 characters of the TAG file's first line.

        Returns None when the agent has no TAG file.
        """
        self._check_agent_uuid(agent_uuid)
        tag_file = os.path.join(self.install_dir, agent_uuid, 'TAG')
        with ignore_enoent, open(tag_file, 'r') as file:
            return file.readline(64)

    def agent_version(self, agent_uuid):
        """Return the version reported by the agent's unpacked package."""
        self._check_agent_uuid(agent_uuid)
        agent_path = os.path.join(self.install_dir, agent_uuid)
        name = self.agent_name(agent_uuid)
        pkg = UnpackedPackage(os.path.join(agent_path, name))
        return pkg.version

    def agent_dir(self, agent_uuid):
        """Return the path of the agent's package directory."""
        self._check_agent_uuid(agent_uuid)
        return os.path.join(self.install_dir, agent_uuid,
                            self.agent_name(agent_uuid))

    def agent_versions(self):
        """Return ``{agent_uuid: (name, version)}`` for installed agents."""
        agents = {}
        for agent_uuid in os.listdir(self.install_dir):
            try:
                agents[agent_uuid] = (self.agent_name(agent_uuid),
                                      self.agent_version(agent_uuid))
            except KeyError:
                pass
        return agents

    def _agent_priority(self, agent_uuid):
        """Return the stripped first line of AUTOSTART, or None if missing."""
        autostart = os.path.join(self.install_dir, agent_uuid, 'AUTOSTART')
        with ignore_enoent, open(autostart) as file:
            return file.readline(100).strip()

    def agent_priority(self, agent_uuid):
        """Return the agent's autostart priority after validating the UUID."""
        self._check_agent_uuid(agent_uuid)
        return self._agent_priority(agent_uuid)

    def prioritize_agent(self, agent_uuid, priority='50'):
        """Set the agent's autostart priority.

        ``None`` removes the AUTOSTART file (a missing file is not an error);
        any other value is stripped and written to it.
        """
        self._check_agent_uuid(agent_uuid)
        autostart = os.path.join(self.install_dir, agent_uuid, 'AUTOSTART')
        if priority is None:
            with ignore_enoent:
                os.unlink(autostart)
        else:
            with open(autostart, 'w') as file:
                file.write(priority.strip())

    def _check_resources(self, resmon, execreqs, reserve=False):
        """Check (or reserve) the resources described by *execreqs*.

        Hard requirements are always checked first.  With *reserve* the soft
        requirements are reserved and the result of
        ``resmon.reserve_soft_resources`` is returned; otherwise they are only
        checked and ``None`` is returned on success.  Any unmet requirement
        is logged and reported as ValueError.
        """
        hard_reqs = execreqs.get('hard_requirements', {})
        failed_terms = resmon.check_hard_resources(hard_reqs)
        if failed_terms:
            msg = '\n'.join(' {}: {} ({})'.format(
                term, hard_reqs[term], avail)
                for term, avail in failed_terms.items())
            _log.error('hard resource requirements not met:\n%s', msg)
            raise ValueError('hard resource requirements not met')
        requirements = execreqs.get('requirements', {})
        try:
            if reserve:
                return resmon.reserve_soft_resources(requirements)
            else:
                failed_terms = resmon.check_soft_resources(requirements)
                if failed_terms:
                    errmsg = 'soft resource requirements not met'
                else:
                    return
        except ResourceError as exc:
            errmsg, failed_terms = exc.args
        msg = '\n'.join(' {}: {} ({})'.format(
            term, requirements.get(term, '<unset>'), avail)
            for term, avail in failed_terms.items())
        _log.error('%s:\n%s', errmsg, msg)
        raise ValueError(errmsg)

    def check_resources(self, execreqs):
        """Check *execreqs* against ``env.resmon``; no-op without a monitor."""
        resmon = getattr(self.env, 'resmon', None)
        if resmon:
            return self._check_resources(resmon, execreqs, reserve=False)

    def _reserve_resources(self, resmon, execreqs):
        """Reserve the resources described by *execreqs*."""
        return self._check_resources(resmon, execreqs, reserve=True)

    def get_execreqs(self, agent_uuid):
        """Return the execution requirements of an installed agent."""
        name = self.agent_name(agent_uuid)
        pkg = UnpackedPackage(os.path.join(self.install_dir, agent_uuid, name))
        return self._read_execreqs(pkg.distinfo)

    def _read_execreqs(self, dist_info):
        """Load ``execreqs.json`` from *dist_info*.

        A missing file is logged as a warning and yields ``{}``.  Any other
        failure is logged and raised as ValueError.
        """
        execreqs_json = os.path.join(dist_info, 'execreqs.json')
        try:
            with ignore_enoent, open(execreqs_json) as file:
                return jsonapi.load(file)
        except Exception as exc:
            msg = 'error reading execution requirements: {}: {}'.format(
                execreqs_json, exc)
            _log.error(msg)
            raise ValueError(msg)
        # Only reached when the file does not exist (ENOENT was suppressed).
        _log.warning('missing execution requirements: %s', execreqs_json)
        return {}

    def start_agent(self, agent_uuid):
        """Launch the installed agent as a child process.

        The launch target is taken from the package metadata
        (``volttron.agent``/``launch`` or, failing that,
        ``setuptools.installation``/``eggsecutable``).  The child runs in the
        agent's data directory with the AGENT_* variables set, and its stdout
        and stderr are forwarded to the ``agents.stdout`` / ``agents.stderr``
        loggers by spawned greenlets.

        Raises ValueError when the agent is already running or the package
        exports no usable entry point.  Returns ``agent_status(agent_uuid)``.
        """
        name = self.agent_name(agent_uuid)
        agent_path = os.path.join(self.install_dir, agent_uuid, name)
        execenv = self.agents.get(agent_uuid)
        if execenv and execenv.process.poll() is None:
            _log.warning('request to start already running agent %s',
                         agent_path)
            raise ValueError('agent is already running')
        pkg = UnpackedPackage(agent_path)
        if auth is not None and self.env.verify_agents:
            auth.UnpackedPackageVerifier(pkg.distinfo).verify()
        metadata = pkg.metadata
        try:
            exports = metadata['extensions']['python.exports']
        except KeyError:
            try:
                exports = metadata['exports']
            except KeyError:
                raise ValueError('no entry points exported')
        try:
            module = exports['volttron.agent']['launch']
        except KeyError:
            try:
                module = exports['setuptools.installation']['eggsecutable']
            except KeyError:
                _log.error('no agent launch class specified in package %s',
                           agent_path)
                raise ValueError('no agent launch class specified in package')
        config = os.path.join(pkg.distinfo, 'config')
        tag = self.agent_tag(agent_uuid)
        environ = os.environ.copy()
        environ['PYTHONPATH'] = ':'.join([agent_path] + sys.path)
        # Put the interpreter's directory first on PATH.  When PATH is unset
        # use that directory alone; an empty element would add the current
        # directory to the search path.
        bin_dir = os.path.abspath(os.path.dirname(sys.executable))
        inherited_path = environ.get('PATH')
        environ['PATH'] = ((bin_dir + ':' + inherited_path)
                           if inherited_path else bin_dir)
        if os.path.exists(config):
            environ['AGENT_CONFIG'] = config
        else:
            environ.pop('AGENT_CONFIG', None)
        if tag:
            environ['AGENT_TAG'] = tag
        else:
            environ.pop('AGENT_TAG', None)
        environ['AGENT_SUB_ADDR'] = self.subscribe_address
        environ['AGENT_PUB_ADDR'] = self.publish_address
        environ['AGENT_UUID'] = agent_uuid
        environ['_LAUNCHED_BY_PLATFORM'] = '1'
        # For backwards compatibility create the identity file if it does
        # not exist.
        identity_file = os.path.join(self.install_dir, agent_uuid, "IDENTITY")
        if not os.path.exists(identity_file):
            _log.debug('IDENTITY FILE MISSING: CREATING IDENTITY FILE WITH VALUE: {}'.format(agent_uuid))
            with open(identity_file, 'wb') as fp:
                fp.write(agent_uuid)
        with open(identity_file, 'rb') as fp:
            agent_vip_identity = fp.read()
        environ['AGENT_VIP_IDENTITY'] = agent_vip_identity
        # "module:func" is run through -c, a bare module through -m.
        module, _, func = module.partition(':')
        if func:
            code = '__import__({0!r}, fromlist=[{1!r}]).{1}()'.format(module,
                                                                     func)
            argv = [sys.executable, '-c', code]
        else:
            argv = [sys.executable, '-m', module]
        resmon = getattr(self.env, 'resmon', None)
        if resmon is None:
            execenv = ExecutionEnvironment()
        else:
            execreqs = self._read_execreqs(pkg.distinfo)
            execenv = self._reserve_resources(resmon, execreqs)
        execenv.name = name or agent_path
        _log.info('starting agent %s', agent_path)
        data_dir = self._get_data_dir(agent_path)
        # Close the parent's handle on the null device once the child has
        # been started so it is not leaked on every launch.
        with open(os.devnull) as devnull:
            execenv.execute(argv, cwd=data_dir, env=environ, close_fds=True,
                            stdin=devnull, stdout=PIPE, stderr=PIPE)
        self.agents[agent_uuid] = execenv
        proc = execenv.process
        _log.info('agent %s has PID %s', agent_path, proc.pid)
        gevent.spawn(log_stream, 'agents.stderr', name, proc.pid, argv[0],
                     log_entries('agents.log', name, proc.pid, logging.ERROR,
                                 proc.stderr))
        gevent.spawn(log_stream, 'agents.stdout', name, proc.pid, argv[0],
                     ((logging.INFO, line.rstrip('\r\n'))
                      for line in proc.stdout))
        return self.agent_status(agent_uuid)

    def agent_status(self, agent_uuid):
        """Return ``(pid, exit_status)`` of a tracked agent.

        ``exit_status`` is the result of ``poll()``.  ``(None, None)`` is
        returned for an agent that is not tracked.
        """
        execenv = self.agents.get(agent_uuid)
        if execenv is None:
            return (None, None)
        return (execenv.process.pid, execenv.process.poll())

    def stop_agent(self, agent_uuid):
        """Stop a tracked agent, escalating from SIGINT to terminate to kill.

        Each step waits up to 3 seconds for the process to exit.  Returns the
        exit status, or ``None`` when the agent is not tracked.  Raises
        ValueError when the process survives all three steps.
        """
        try:
            execenv = self.agents[agent_uuid]
        except KeyError:
            return
        if execenv.process.poll() is None:
            # pylint: disable=catching-non-exception
            execenv.process.send_signal(signal.SIGINT)
            try:
                return gevent.with_timeout(3, process_wait, execenv.process)
            except gevent.Timeout:
                _log.warning("First timeout")
                execenv.process.terminate()
            try:
                return gevent.with_timeout(3, process_wait, execenv.process)
            except gevent.Timeout:
                _log.warning("2nd timeout")
                execenv.process.kill()
            try:
                return gevent.with_timeout(3, process_wait, execenv.process)
            except gevent.Timeout:
                _log.error("last timeout")
                raise ValueError('process is unresponsive')
        return execenv.process.poll()

    def agent_uuid_from_pid(self, pid):
        """Return the UUID of the running agent whose process has *pid*.

        Returns ``None`` when no tracked agent has that PID or when the
        matching process has already exited.
        """
        for agent_uuid, execenv in list(self.agents.items()):
            if execenv.process.pid == pid:
                return agent_uuid if execenv.process.poll() is None else None