# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
'''
-------------------------------------------------------------------------------
History
03/30/16 - Initial.
08/15/16 - Remove whitespace in config file.
10/11/16 - Pass only device_id to VehicleDriver.
03/01/17 - Call agent.GetPoint in get_point.
04/17/17 - Updated for Volttron 4.0.
-------------------------------------------------------------------------------
'''
__author1__ = 'Carl Miller <carl.miller@pnnl.gov>'
__copyright__ = 'Copyright (c) 2019, Battelle Memorial Institute'
__license__ = 'Apache 2.0'
__version__ = '0.2.0'

from volttron.platform.agent import utils
try:
    # Preferred: relative import when loaded as part of the platform_driver
    # package.
    from ..interfaces import BaseRegister, BaseInterface, BasicRevert
except (ImportError, ValueError):
    # Fallback: absolute import for when this module is loaded outside of its
    # package (a failed relative import raises ImportError, or ValueError on
    # older interpreters).
    from services.core.PlatformDriverAgent.platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
from csv import DictReader
# from StringIO import StringIO - python 2
from io import StringIO  # python 3
import gevent
import logging
import sys

# set DRIVER_PATH to path to your specific driver agent
DRIVER_PATH = "/home/volttron/GridAgents/VolttronAgents/Drivers"
# The device driver packages imported below are resolved through DRIVER_PATH,
# so it has to be on sys.path before those imports run.
sys.path.insert(0, DRIVER_PATH)
from heaters.agent import HeaterDriver
from meters.agent import MeterDriver
from hvac.agent import ThermostatDriver
from blinds.agent import BlindsDriver
from vehicles.agent import VehicleDriver

_log = logging.getLogger(__name__)
# UDI - Universal Driver Interface
class Interface(BasicRevert, BaseInterface):
    """Universal Driver Interface (UDI) for the platform driver.

    Point reads and writes are delegated to ``self.agent`` (via its
    ``GetPoint`` / ``SetPoint`` methods), while scraping and resetting operate
    purely on the values cached in the registers.

    NOTE(review): ``self.agent`` is initialised to ``None`` here and is not
    assigned anywhere in this class; presumably the device-specific driver
    agent (or a ``configure`` implementation not shown here) sets it - confirm
    against the caller.
    """

    def __init__(self, **kwargs):
        """Initialise the interface and set the log level from ``-v`` flags.

        The number of ``-v`` occurrences on the command line selects the level
        of the module logger: none -> ERROR, ``-v`` -> WARNING, ``-vv`` ->
        INFO, ``-vvv`` (or more) -> DEBUG.

        :param kwargs: forwarded unchanged to the base class initialisers.
        """
        super(Interface, self).__init__(**kwargs)
        # the following are new in bacnet 4.0 driver, do we need to do too?
        # self.register_count = 10000
        # self.register_count_divisor = 1
        self.agent = None

        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('-v', '--verbose', action='count', dest="verbosity", default=0)
        # This code runs inside a host process whose command line is not owned
        # by this class; parse_known_args() ignores options we do not define
        # instead of terminating the process with a usage error.
        args, _unrecognised = parser.parse_known_args()
        self._verboseness = args.verbosity

        levels = {
            0: logging.ERROR,
            1: logging.WARNING,  # '-v'
            2: logging.INFO,     # '-vv'
        }
        # three or more '-v' flags ('-vvv') select DEBUG
        _log.setLevel(levels.get(self._verboseness, logging.DEBUG))

    # -------------------------------------------------------------------------
    # Notes on configuration (kept from the original author):
    #
    # config_dict: 'filename'.config, specified in the 'platform-driver.agent' file.
    # registry_config_str: points csv file
    #     def configure(self, config_dict, registry_config_str):
    #
    # When the 4.0 platform driver is started, class ConfigStore is instantiated:
    #     volttron/platform/vip/agent/subsystems/configstore.py which exports initial_update()
    #     which calls volttron/platform/store.py: def get_configs(self):
    #         self.vip.rpc.call(identity, "config.initial_update" sets list of registry_configs
    #
    # scripts/install_platform_driver_configs.py calls 'manage_store' rpc, which is in
    # volttron/platform/store.py, which calls process_raw_config(), which stores it as a dict.
    # process_raw_config() is also called by process_store() in store.py
    # when the platform starts (class ConfigStoreService):
    #     processing_raw_config 'registry_configs/meter.csv' (config_type: csv)
    # process_store() is called by _setup using a 'PersistentDict', i.e.:
    #     store_path '/home/carl/.volttron/configuration_store/platform.driver.store'
    #
    # install_platform_driver_configs.py stores them as config_type="csv"; it is useful for
    # batch processing a lot of files at once, like when upgrading from 3.5 to 4.0.
    #
    # To add a single config to the store, activate and start the platform, then:
    #   List current configs:
    #     volttron-ctl config list platform.driver
    #       config
    #       devices/PNNL/LABHOME_B/METER1
    #       registry_configs/meter.csv
    #   Delete current configs:
    #     volttron-ctl config delete platform.driver registry_configs/meter.csv  # note lack of prefix './GridAgents/configs/'
    #     volttron-ctl config delete platform.driver devices/PNNL/LABHOME_B/METER1
    #   To store the driver configuration run the command:
    #     delete any files from ../GridAgents/configs
    #     volttron-ctl config store platform.driver devices/PNNL/LABHOME_B ../GridAgents/configs/devices/PNNL/LABHOME_B/METER1
    #   To store the registry configuration run the command (note the **--raw option)
    #     volttron-ctl config store platform.driver registry_configs/meter.csv ../GridAgents/configs/registry_configs/meter.csv --raw
    #
    # ***** NOTE: you MUST install the csv file in --raw mode for universal drivers. *****
    # -------------------------------------------------------------------------

    def get_point(self, point_name):
        """Return the current value of ``point_name`` as reported by the agent.

        :param point_name: name of the point to look up.
        :return: whatever ``self.agent.GetPoint(register)`` returns.
        """
        register = self.get_register_by_name(point_name)
        value = self.agent.GetPoint(register)
        # if( self._verboseness == 2 ):
        #     _log.debug( "Universal get_point called for '{}', value: {}.".format(point_name, value))
        return value

    def _set_point(self, point_name, value):
        """Write ``value`` to ``point_name`` through the agent.

        The cached register value is only updated when ``agent.SetPoint``
        returns a truthy result; otherwise the previously cached value is
        returned unchanged.

        :param point_name: name of the point to write.
        :param value: new value; converted with ``register.reg_type`` before
            it is cached.
        :return: the register's cached value after the attempt.
        :raises IOError: if the register is configured read only.
        """
        register = self.get_register_by_name(point_name)
        if register.read_only:
            raise IOError("Trying to write to a point configured read only: " + point_name)
        if self.agent.SetPoint(register, value):
            register._value = register.reg_type(value)
            self.point_map[point_name]._value = register._value
        return register._value

    def _byte_registers(self):
        """Return the "byte" registers: read-only ones first, then writable."""
        read_registers = self.get_registers_by_type("byte", True)
        write_registers = self.get_registers_by_type("byte", False)
        return read_registers + write_registers

    # this gets called periodically via DriverAgent::periodic_read()
    # ( on behalf of PlatformDriverAgent )
    def _scrape_all(self):
        """Collect the cached value of every "byte" register.

        Only the values already stored on the registers are read; the agent
        is not queried here.

        :return: dict mapping each register's ``point_name`` to its ``_value``.
        """
        result = {}
        for register in self._byte_registers():
            if self._verboseness == 2:
                _log.info("Universal Scraping Value for '{}': {}".format(register.point_name, register._value))
            result[register.point_name] = register._value
        return result

    # this sets each register to its default value (if it has one)
    def _reset_all(self):
        """Overwrite every "byte" register's cached value with its default.

        We maybe could have used revert_point(register.point_name), but that
        is more for reverting the hardware to its default value (it calls
        set_point, which complains for read_only points). _reset_all is used
        to set the registry values to a default when the hardware is not
        reachable.
        """
        for register in self._byte_registers():
            old_value = register._value
            register._value = register._default_value
            # _log.info( "point_map[register]._value = {}".format(self.point_map[register.point_name]._value))
            if self._verboseness == 2:
                _log.info("Hardware not reachable, Resetting Value for '{}' from {} to {}".format(register.point_name,
                                                                                                  old_value,
                                                                                                  register._value))
            # Earlier approach, kept for reference:
            # if register in self.defaults:
            #     self.point_map[register]._value = self.defaults[register]
            #     if( self._verboseness == 2 ):
            #         _log.info( "Universal Resetting Value for '{}' from {} to {}".format(register.point_name, old_value, register._value))
            # else:
            #     if( self._verboseness == 2 ):
            #         _log.info( "No Default Value Found while Resetting '{}'.".format(register.point_name))

    def parse_config(self, agent, device_type, config_dict, reg_config_str):
        """Clean up the registry CSV text and hand it to the agent.

        Comments and surrounding whitespace are stripped from
        ``reg_config_str``, whitespace next to the comma delimiters and at the
        end of each line is removed, and the result is parsed with
        ``csv.DictReader`` and passed to ``agent.ConfigureAgent``.

        ***** NOTE: you MUST install the csv file in --raw mode for universal drivers. *****
            volttron-ctl config store platform.driver registry_configs/meter.csv
                ../GridAgents/configs/registry_configs/meter.csv --raw

        :param agent: object exposing ``ConfigureAgent(interface, config_dict, rows)``.
        :param device_type: only used in the debug log message.
        :param config_dict: driver configuration, forwarded to the agent.
        :param reg_config_str: raw registry CSV text; when ``None`` nothing is done.
        """
        if reg_config_str is None:
            return

        import re
        config_str = utils.strip_comments(reg_config_str).strip()

        # NOTE: the patterns below deliberately exclude line breaks. A plain
        # '\s' also matches '\n', which would glue a row that ends (or starts)
        # with an empty column onto its neighbouring row.

        # remove whitespace after delimiter, but not within delimited value:
        config_str = re.sub(r',[^\S\r\n]+', ',', config_str)
        # remove trailing whitespace within delimited value:
        config_str = re.sub(r'[^\S\r\n]+,', ',', config_str)
        # remove trailing whitespace (including a stray '\r') at end of line:
        # re.MULTILINE - When specified, '^' matches the beginning of the string and
        # the beginning of each line (immediately following each newline), and '$'
        # matches the end of the string and the end of each line (immediately
        # preceding each newline).
        config_str = re.sub(r'[^\S\n]+$', '', config_str, flags=re.MULTILINE)

        _log.debug('Configuring {} Driver with {} and config_str {}'.format(device_type, config_dict, config_str))

        reg_rows = DictReader(StringIO(config_str))
        agent.ConfigureAgent(self, config_dict, reg_rows)