"""
Source code for platform_driver.interfaces.universal

Change log:
    03/30/16 - Initial.
    08/15/16 - Remove whitespace in config file.
    10/11/16 - Pass only device_id to VehicleDriver.
    03/01/17 - Call agent.GetPoint in get_point.
    04/17/17 - Updated for Volttron 4.0.
"""
from volttron.platform.agent import utils

# The interface base classes are importable either relative to this package
# (when loaded as part of platform_driver) or via the full repository path.
# NOTE(review): the try/except wrapper was missing from this listing (the two
# imports were left indented with no enclosing statement); restored as a
# fallback import - confirm against the upstream file.
try:
    from ..interfaces import BaseRegister, BaseInterface, BasicRevert
except (ImportError, ValueError):
    from services.core.PlatformDriverAgent.platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert

from csv import DictReader
# from StringIO import StringIO - python 2
from io import StringIO  # python 3
import gevent
import logging
import sys

# set DRIVER_PATH to path to your specific driver agent
DRIVER_PATH = "/home/volttron/GridAgents/VolttronAgents/Drivers"
sys.path.insert(0, DRIVER_PATH)
from heaters.agent import HeaterDriver
from meters.agent import MeterDriver
from hvac.agent import ThermostatDriver
from blinds.agent import BlindsDriver
from vehicles.agent import VehicleDriver

_log = logging.getLogger(__name__)

# UDI - Universal Driver Interface
class Interface(BasicRevert, BaseInterface):
    """Universal Driver Interface (UDI).

    Wraps one device-specific driver agent (heater, meter, thermostat,
    blinds or vehicle), selected by the ``device_type`` entry of the driver
    configuration, and forwards point reads/writes to that agent through its
    ``GetPoint`` / ``SetPoint`` / ``ConfigureAgent`` methods.
    """

    def __init__(self, **kwargs):
        """Initialise the interface and set the module log level.

        The log level is derived from the number of ``-v`` flags found on the
        command line of the hosting process:
        none -> ERROR, ``-v`` -> WARNING, ``-vv`` -> INFO, ``-vvv`` (or more)
        -> DEBUG.

        :param kwargs: forwarded unchanged to the base class constructors.
        """
        super(Interface, self).__init__(**kwargs)
        # the following are new in bacnet 4.0 driver, do we need to do too?
        # self.register_count = 10000
        # self.register_count_divisor = 1
        self.agent = None

        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('-v', '--verbose', action='count', dest="verbosity", default=0)
        # parse_known_args() instead of parse_args(): sys.argv belongs to the
        # hosting process, and any option this parser does not know about
        # would otherwise terminate the process with SystemExit(2).
        args, _unknown = parser.parse_known_args()
        self._verboseness = args.verbosity

        levels = {
            0: logging.ERROR,
            1: logging.WARNING,  # '-v'
            2: logging.INFO,     # '-vv'
        }
        # three or more '-v' flags ('-vvv') select DEBUG
        _log.setLevel(levels.get(self._verboseness, logging.DEBUG))

    # ------------------------------------------------------------------
    # Configuration notes
    #
    # NOTE(review): several module/file names appear to be missing from the
    # paths in these notes; they are kept exactly as found.
    #
    # config_dict: 'filename'.config, specified in the
    #     'platform-driver.agent' file.
    # registry_config_str: points csv file
    #
    # def configure(self, config_dict, registry_config_str):
    #
    # when 4.0 platform driver is started, class ConfigStore is instantiated:
    #     volttron/platform/vip/agent/subsystems/ which exports
    #     initial_update() which calls
    #     volttron/platform/ def get_configs(self):,
    #     "config.initial_update" sets list of registry_configs
    #
    # scripts/ calls 'manage_store' rpc, which is in volttron/platform/ which
    # calls process_raw_config(), which stores it as a dict.
    # process_raw_config() is also called by process_store() in when the
    # platform starts ( class ConfigStoreService):
    #     processing_raw_config 'registry_configs/meter.csv' (config_type: csv)
    # process_store() is called by _setup using a 'PersistentDict', i.e.:
    #     store_path '/home/carl/.volttron/configuration_store/'
    #     stores them as config_type="csv", it is useful for batch processing
    #     a lot of files at once, like when upgrading from 3.5 to 4.0
    #
    # to add single config to store, activate and start platform then:
    #     List current configs:
    #         volttron-ctl config list platform.driver
    #             config
    #             devices/PNNL/LABHOME_B/METER1
    #             registry_configs/meter.csv
    #     Delete current configs:
    #         volttron-ctl config delete platform.driver registry_configs/meter.csv
    #             # note lack of prefix './GridAgents/configs/'
    #         volttron-ctl config delete platform.driver devices/PNNL/LABHOME_B/METER1
    #     To store the driver configuration run the command:
    #         delete any files from ../GridAgents/configs
    #         volttron-ctl config store platform.driver devices/PNNL/LABHOME_B ../GridAgents/configs/devices/PNNL/LABHOME_B/METER1
    #     To store the registry configuration run the command
    #     (note the **--raw option)
    #         volttron-ctl config store platform.driver registry_configs/meter.csv ../GridAgents/configs/registry_configs/meter.csv --raw
    #
    # ***** NOTE: you MUST install the csv file in --raw mode for universal
    # drivers. *****
    # ------------------------------------------------------------------

    def configure(self, config_dict, registry_config_dict):  # 4.0 passes in a reg DICT not string now
        """Create the device driver agent named by the configuration and start it.

        :param config_dict: driver configuration; must contain the keys
            ``'device_type'`` (one of ``heater``, ``meter``, ``thermostat``,
            ``blinds``, ``vehicle``) and ``'device_id'``.
        :param registry_config_dict: registry configuration, handed on
            unchanged to :meth:`parse_config`.
        :raises SystemExit: on any failure (missing key, unsupported device
            type, or any other exception); the failure is logged first.
        """
        try:
            device_type = config_dict['device_type']

            # see ./volttron/volttron/platform/vip/agent/ for Agent object
            # definition; every agent has a .core and .vip:
            #     vip.rpc, vip.hello, vip.pubsub, vip.heartbeat, vip.config
            drivers = {
                "heater": HeaterDriver,
                "meter": MeterDriver,
                "thermostat": ThermostatDriver,
                "blinds": BlindsDriver,
                "vehicle": VehicleDriver,
            }
            driver_cls = drivers.get(device_type)
            if driver_cls is None:
                raise RuntimeError("Unsupported Device Type: '{}'".format(device_type))
            self.agent = driver_cls(None, config_dict['device_id'])

            self.parse_config(self.agent, device_type, config_dict, registry_config_dict)

            # Run the agent in its own greenlet and wait (at most 5 seconds)
            # on the event handed to it.
            # NOTE(review): the spawn target was missing from this listing
            # ("gevent.spawn(, event)"); self.agent.core.run is the
            # reconstructed target - confirm against the upstream file.
            event = gevent.event.Event()
            gevent.spawn(self.agent.core.run, event)
            event.wait(timeout=5)
        except KeyError as e:
            _log.fatal("configure Failed accessing Key({}) in configuration file: {}".format(e, config_dict))
            raise SystemExit
        except RuntimeError as e:
            _log.fatal("configure Failed using configuration file: {}".format(config_dict))
            raise SystemExit(e)
        except Exception as e:
            _log.fatal("configure Failed({}) using configuration file: {}".format(e, config_dict))
            raise SystemExit

    # get_point
    def get_point(self, point_name):
        """Return the current value of ``point_name`` as reported by the agent.

        :param point_name: name of the point to read.
        :return: whatever ``self.agent.GetPoint`` returns for the register.
        """
        register = self.get_register_by_name(point_name)
        value = self.agent.GetPoint(register)
        # if( self._verboseness == 2 ):
        #     _log.debug( "Universal get_point called for '{}', value: {}.".format(point_name, value))
        return value

    # _set_point
    def _set_point(self, point_name, value):
        """Write ``value`` to ``point_name`` through the agent.

        The cached register value is only updated when ``agent.SetPoint``
        returns a truthy result.

        :param point_name: name of the point to write.
        :param value: new value; converted with the register's ``reg_type``
            before being cached.
        :return: the register's cached value after the attempt.
        :raises IOError: if the register is configured read only.
        """
        register = self.get_register_by_name(point_name)
        if register.read_only:
            raise IOError("Trying to write to a point configured read only: " + point_name)
        if self.agent.SetPoint(register, value):
            register._value = register.reg_type(value)
            self.point_map[point_name]._value = register._value
        return register._value

    # this gets called periodically via DriverAgent::periodic_read()
    # ( on behalf of PlatformDriverAgent )
    def _scrape_all(self):
        """Return a dict mapping every "byte" register's point name to its cached value."""
        result = {}
        read_registers = self.get_registers_by_type("byte", True)
        write_registers = self.get_registers_by_type("byte", False)
        for register in read_registers + write_registers:
            if self._verboseness == 2:
                # NOTE(review): the logging call was missing from this
                # listing; _log.info is the reconstructed call - confirm.
                _log.info("Universal Scraping Value for '{}': {}".format(register.point_name, register._value))
            result[register.point_name] = register._value
        return result

    # this set each register to its default value (if it has one)
    def _reset_all(self):
        """Overwrite every "byte" register's cached value with its ``_default_value``.

        We maybe could have used revert_point( register.point_name ), but
        that is more for reverting the hardware to its default value (calls
        set_point, which complains for read_only points); _reset_all is used
        to set the registry values to a default when the hardware is not
        reachable.
        """
        read_registers = self.get_registers_by_type("byte", True)
        write_registers = self.get_registers_by_type("byte", False)
        for register in read_registers + write_registers:
            old_value = register._value
            register._value = register._default_value
            if self._verboseness == 2:
                # NOTE(review): the logging call was missing from this
                # listing; _log.info is the reconstructed call - confirm.
                _log.info("Hardware not reachable, Resetting Value for '{}' from {} to {}".format(register.point_name, old_value, register._value))

    @staticmethod
    def _normalize_registry_csv(config_str):
        """Strip whitespace around delimiters and at line ends of CSV text."""
        import re
        # Only horizontal whitespace ([^\S\r\n]) is removed next to a comma:
        # a plain \s would also swallow the newline after a trailing comma
        # (or before a leading one) and merge two CSV rows into one.
        # remove whitespace after delimiter:
        config_str = re.sub(r',[^\S\r\n]+', ',', config_str)
        # remove trailing whitespace within delimited value:
        config_str = re.sub(r'[^\S\r\n]+,', ',', config_str)
        # remove trailing whitespace at end of line:
        # re.MULTILINE - '$' matches at the end of the string and at the end
        # of each line (immediately preceding each newline).
        config_str = re.sub(r'[\s]+$', '', config_str, flags=re.MULTILINE)
        return config_str

    def parse_config(self, agent, device_type, config_dict, reg_config_str):
        """Clean up the registry CSV text and hand it to the agent.

        ***** NOTE: you MUST install the csv file in --raw mode for universal
        drivers. *****

            volttron-ctl config store platform.driver registry_configs/meter.csv ../GridAgents/configs/registry_configs/meter.csv --raw

        :param agent: driver agent whose ``ConfigureAgent`` is called.
        :param device_type: device type name, used for logging only.
        :param config_dict: driver configuration, passed on to the agent.
        :param reg_config_str: registry CSV text; when ``None`` nothing is
            done.
        """
        if reg_config_str is None:
            return

        config_str = utils.strip_comments(reg_config_str).lstrip().rstrip()
        config_str = self._normalize_registry_csv(config_str)

        _log.debug('Configuring {} Driver with {} and config_str {}'.format(device_type, config_dict, config_str))

        f = StringIO(config_str)
        regDict = DictReader(f)

        agent.ConfigureAgent(self, config_dict, regDict)