# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import random
import datetime
import math
from math import pi
from platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
from csv import DictReader
from io import StringIO
import logging
_log = logging.getLogger(__name__)
type_mapping = {"string": str,
"int": int,
"integer": int,
"float": float,
"bool": bool,
"boolean": bool}
[docs]class FakeRegister(BaseRegister):
def __init__(self, read_only, pointName, units, reg_type,
default_value=None, description=''):
# register_type, read_only, pointName, units, description = ''):
super(FakeRegister, self).__init__("byte", read_only, pointName, units,
description='')
self.reg_type = reg_type
if default_value is None:
self.value = self.reg_type(random.uniform(0, 100))
else:
try:
self.value = self.reg_type(default_value)
except ValueError:
self.value = self.reg_type()
[docs]class EKGregister(BaseRegister):
def __init__(self, read_only, pointName, units, reg_type,
default_value=None, description=''):
super(EKGregister, self).__init__("byte", read_only, pointName, units,
description='')
self._value = 1;
math_functions = ('acos', 'acosh', 'asin', 'asinh', 'atan', 'atan2',
'atanh', 'sin', 'sinh', 'sqrt', 'tan', 'tanh')
if default_value in math_functions:
self.math_func = getattr(math, default_value)
else:
_log.error('Invalid default_value in EKGregister.')
_log.warning('Defaulting to sin(x)')
self.math_func = math.sin
@property
def value(self):
now = datetime.datetime.now()
seconds_in_radians = pi * float(now.second) / 30.0
yval = self.math_func(seconds_in_radians)
return self._value * yval
@value.setter
def value(self, x):
self._value = x
[docs]class Interface(BasicRevert, BaseInterface):
def __init__(self, **kwargs):
super(Interface, self).__init__(**kwargs)
[docs] def get_point(self, point_name):
register = self.get_register_by_name(point_name)
return register.value
def _set_point(self, point_name, value):
register = self.get_register_by_name(point_name)
if register.read_only:
raise RuntimeError(
"Trying to write to a point configured read only: " + point_name)
register.value = register.reg_type(value)
return register.value
def _scrape_all(self):
result = {}
read_registers = self.get_registers_by_type("byte", True)
write_registers = self.get_registers_by_type("byte", False)
for register in read_registers + write_registers:
result[register.point_name] = register.value
return result
[docs] def parse_config(self, configDict):
if configDict is None:
return
for regDef in configDict:
# Skip lines that have no address yet.
if not regDef['Point Name']:
continue
read_only = regDef['Writable'].lower() != 'true'
point_name = regDef['Volttron Point Name']
description = regDef.get('Notes', '')
units = regDef['Units']
default_value = regDef.get("Starting Value", 'sin').strip()
if not default_value:
default_value = None
type_name = regDef.get("Type", 'string')
reg_type = type_mapping.get(type_name, str)
register_type = FakeRegister if not point_name.startswith('EKG') else EKGregister
register = register_type(
read_only,
point_name,
units,
reg_type,
default_value=default_value,
description=description)
if default_value is not None:
self.set_default(point_name, register.value)
self.insert_register(register)