# -*- coding: utf-8 -*- {{{
# ===----------------------------------------------------------------------===
#
# Component of Eclipse VOLTTRON
#
# ===----------------------------------------------------------------------===
#
# Copyright 2023 Battelle Memorial Institute
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# ===----------------------------------------------------------------------===
# }}}
import random
import datetime
import math
from math import pi
from platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
import logging
# Module-level logger named after this module.
_log = logging.getLogger(__name__)

# Translates the textual "Type" field of a registry row into the Python
# callable used to coerce register values. Both the short and the long
# spellings of the integer and boolean types are accepted.
type_mapping = dict(
    string=str,
    int=int,
    integer=int,
    float=float,
    bool=bool,
    boolean=bool,
)
class FakeRegister(BaseRegister):
    """In-memory register that simply stores a value of a configured type.

    The register is always created with the ``"byte"`` register type on the
    base class; ``reg_type`` is the Python callable (``str``, ``int``,
    ``float``, ``bool``) used to coerce values.
    """

    def __init__(self, read_only, pointName, units, reg_type, default_value=None, description=''):
        """Create the register and give it an initial value.

        :param read_only: whether the point rejects writes.
        :param pointName: name under which the point is published.
        :param units: units string for the point.
        :param reg_type: callable used to convert values for this register.
        :param default_value: initial value; when ``None`` a random number
            drawn from ``random.uniform(0, 100)`` is converted with
            ``reg_type`` and used instead.
        :param description: free-form description forwarded to the base
            register.
        """
        # Forward the caller's description; it used to be replaced by ''
        # here, which silently dropped the configured text.
        super(FakeRegister, self).__init__("byte", read_only, pointName, units,
                                           description=description)
        self.reg_type = reg_type

        if default_value is None:
            self.value = self.reg_type(random.uniform(0, 100))
        else:
            # NOTE(review): reg_type is applied to the raw default as-is, so
            # for bool any non-empty string (including "false" or "0")
            # becomes True -- confirm whether that is intended.
            try:
                self.value = self.reg_type(default_value)
            except ValueError:
                # Unparseable default: fall back to the type's zero value.
                self.value = self.reg_type()
class EKGregister(BaseRegister):
    """Register whose reads trace a waveform driven by the wall clock.

    Reading ``value`` evaluates a function from the ``math`` module at an
    angle derived from the current second of the minute and scales the
    result by the last value written to the register (initially 1).
    """

    def __init__(self, read_only, pointName, units, reg_type, default_value=None, description=''):
        """Create the register and select its waveform function.

        :param read_only: whether the point rejects writes.
        :param pointName: name under which the point is published.
        :param units: units string for the point.
        :param reg_type: callable used by the interface to convert written
            values before they are stored.
        :param default_value: name of the ``math`` function to use; any
            value outside the accepted list is logged and replaced by
            ``math.sin``.
        :param description: free-form description forwarded to the base
            register.
        """
        # Forward the caller's description; it used to be replaced by ''
        # here, which silently dropped the configured text.
        super(EKGregister, self).__init__("byte", read_only, pointName, units,
                                          description=description)
        # Interface._set_point converts written values with
        # register.reg_type, so this attribute must exist here as well;
        # without it every write to an EKG point raised AttributeError.
        self.reg_type = reg_type
        # Scale factor applied to the waveform; replaced on write.
        self._value = 1

        # NOTE(review): math.atan2 needs two arguments, so selecting it makes
        # every read raise TypeError, and acos/asin/atanh/acosh reject part
        # of the [0, 2*pi) input range with ValueError -- consider pruning.
        math_functions = ('acos', 'acosh', 'asin', 'asinh', 'atan', 'atan2', 'atanh', 'sin',
                          'sinh', 'sqrt', 'tan', 'tanh')
        if default_value in math_functions:
            self.math_func = getattr(math, default_value)
        else:
            _log.error('Invalid default_value in EKGregister.')
            _log.warning('Defaulting to sin(x)')
            self.math_func = math.sin

    @property
    def value(self):
        """Return the scale factor times the waveform at the current second."""
        now = datetime.datetime.now()
        # Map seconds 0..59 onto [0, 2*pi) so the input sweeps one full
        # turn per minute.
        seconds_in_radians = pi * float(now.second) / 30.0
        yval = self.math_func(seconds_in_radians)
        return self._value * yval

    @value.setter
    def value(self, x):
        """Store *x* as the scale factor used by subsequent reads."""
        self._value = x
class Interface(BasicRevert, BaseInterface):
    """Driver interface whose points live in in-memory registers.

    Registers are built from the registry configuration by
    ``parse_config``; reads and writes go straight to the register
    objects.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def get_point(self, point_name):
        """Return the current value of the register named *point_name*."""
        return self.get_register_by_name(point_name).value

    def _set_point(self, point_name, value):
        """Write *value* to a point and return the value read back.

        :raises RuntimeError: if the register is configured read only.
        """
        target = self.get_register_by_name(point_name)
        if target.read_only:
            raise RuntimeError("Trying to write to a point configured read only: " + point_name)
        # Convert with the register's own type before storing.
        target.value = target.reg_type(value)
        # Read back through the attribute rather than returning the
        # converted input: a register may compute its value on access.
        return target.value

    def _scrape_all(self):
        """Return a ``{point_name: value}`` dict covering every "byte" register."""
        flagged_true = self.get_registers_by_type("byte", True)
        flagged_false = self.get_registers_by_type("byte", False)
        return {reg.point_name: reg.value for reg in flagged_true + flagged_false}

    def parse_config(self, configDict):
        """Create and insert one register per usable row of *configDict*.

        Rows with an empty ``'Point Name'`` are skipped. Points whose name
        starts with ``'EKG'`` become ``EKGregister`` instances; all others
        become ``FakeRegister`` instances. When a row supplies a non-empty
        starting value, the register's initial value is also passed to
        ``set_default``.
        """
        if configDict is None:
            return

        for row in configDict:
            point_name = row['Point Name']
            # Rows without a point name carry nothing to register.
            if not point_name:
                continue

            writable = row['Writable'].lower() == 'true'
            volttron_point_name = row['Volttron Point Name']
            description = row.get('Notes', '')
            units = row['Units']

            # A blank starting value is treated the same as no value at all.
            default_value = row.get("Starting Value", 'sin').strip() or None

            # Unknown or missing type names fall back to str.
            reg_type = type_mapping.get(row.get("Type", 'string'), str)

            if point_name.startswith('EKG'):
                register_cls = EKGregister
            else:
                register_cls = FakeRegister

            register = register_cls(
                read_only=not writable,
                pointName=volttron_point_name,
                units=units,
                reg_type=reg_type,
                default_value=default_value,
                description=description,
            )

            if default_value is not None:
                self.set_default(volttron_point_name, register.value)

            self.insert_register(register)