Source code for platform_driver.interfaces.fakedriver

# -*- coding: utf-8 -*- {{{
# ===----------------------------------------------------------------------===
#
#                 Component of Eclipse VOLTTRON
#
# ===----------------------------------------------------------------------===
#
# Copyright 2023 Battelle Memorial Institute
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# ===----------------------------------------------------------------------===
# }}}

import random
import datetime
import math
from math import pi

from platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
import logging

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

# Lookup from a type name (as written in a registry definition) to the Python
# callable used to convert values for a register of that type.
type_mapping = dict(
    string=str,
    int=int,
    integer=int,
    float=float,
    bool=bool,
    boolean=bool,
)


class FakeRegister(BaseRegister):
    """Simulated register that keeps a single value in memory.

    The register is always created with the ``"byte"`` register type. The
    current value is stored in ``self.value`` and the conversion callable in
    ``self.reg_type``.
    """

    def __init__(self, read_only, pointName, units, reg_type,
                 default_value=None, description=''):
        """Create the register and initialise its value.

        :param read_only: forwarded to ``BaseRegister``.
        :param pointName: forwarded to ``BaseRegister``.
        :param units: forwarded to ``BaseRegister``.
        :param reg_type: callable used to convert values (for example
            ``int``, ``float`` or ``str``); stored as ``self.reg_type``.
        :param default_value: initial value. When ``None`` the register
            starts at ``reg_type(random.uniform(0, 100))``.
        :param description: free-text description, forwarded to
            ``BaseRegister``.
        """
        # BaseRegister takes:
        #     register_type, read_only, pointName, units, description=''
        # Forward the caller's description rather than a hard-coded empty
        # string so the text supplied for the point is not lost.
        super(FakeRegister, self).__init__("byte", read_only, pointName, units,
                                           description=description)
        self.reg_type = reg_type

        if default_value is None:
            # No starting value configured: pick a random one.
            self.value = self.reg_type(random.uniform(0, 100))
        else:
            try:
                self.value = self.reg_type(default_value)
            except ValueError:
                # The starting value cannot be converted (e.g. int('abc'));
                # fall back to the type's own default (0, 0.0, '', False).
                self.value = self.reg_type()
class EKGregister(BaseRegister):
    """Simulated register whose value follows a math function of the clock.

    Reading ``value`` evaluates the selected ``math`` function on the current
    second of the minute (mapped to radians) and multiplies the result by the
    stored amplitude. Writing ``value`` replaces the amplitude.
    """

    def __init__(self, read_only, pointName, units, reg_type,
                 default_value=None, description=''):
        """Create the register and select its waveform function.

        :param read_only: forwarded to ``BaseRegister``.
        :param pointName: forwarded to ``BaseRegister``.
        :param units: forwarded to ``BaseRegister``.
        :param reg_type: conversion callable; stored as ``self.reg_type`` so
            code that converts written values through ``register.reg_type``
            works for this register as it does for ``FakeRegister``.
        :param default_value: name of the ``math`` function to use. Any value
            not in the supported list is logged and replaced by ``sin``.
        :param description: free-text description, forwarded to
            ``BaseRegister``.
        """
        # Forward the caller's description rather than a hard-coded empty
        # string so the text supplied for the point is not lost.
        super(EKGregister, self).__init__("byte", read_only, pointName, units,
                                          description=description)
        self.reg_type = reg_type
        # Amplitude multiplier applied to the function result.
        self._value = 1

        # Only single-argument functions are allowed: the ``value`` property
        # calls the function with exactly one argument, so ``atan2`` (which
        # requires two) would raise TypeError on every read.
        math_functions = ('acos', 'acosh', 'asin', 'asinh', 'atan', 'atanh',
                          'sin', 'sinh', 'sqrt', 'tan', 'tanh')
        if default_value in math_functions:
            self.math_func = getattr(math, default_value)
        else:
            _log.error('Invalid default_value in EKGregister.')
            _log.warning('Defaulting to sin(x)')
            self.math_func = math.sin

    @property
    def value(self):
        """Return the amplitude times the function of the current second.

        The current second (0-59) is scaled so that 60 seconds span a full
        ``2 * pi`` radians. NOTE: functions with a restricted domain
        (``acos``, ``asin``, ``acosh``, ``atanh``) raise ``ValueError`` for
        the seconds whose angle falls outside that domain.
        """
        now = datetime.datetime.now()
        seconds_in_radians = pi * float(now.second) / 30.0
        yval = self.math_func(seconds_in_radians)
        return self._value * yval

    @value.setter
    def value(self, x):
        """Store ``x`` as the amplitude multiplier."""
        self._value = x
class Interface(BasicRevert, BaseInterface):
    """Driver interface backed by in-memory simulated registers.

    Registers are created from a list of register definitions by
    ``parse_config``; points are then read and written by name.
    """

    def __init__(self, **kwargs):
        super(Interface, self).__init__(**kwargs)

    def configure(self, config_dict, registry_config_str):
        """Build the registers from the registry configuration.

        :param config_dict: driver configuration; not used by this interface.
        :param registry_config_str: iterable of register definitions, passed
            straight to ``parse_config``.
        """
        self.parse_config(registry_config_str)

    def get_point(self, point_name):
        """Return the current value of the register named ``point_name``."""
        register = self.get_register_by_name(point_name)
        return register.value

    def _set_point(self, point_name, value):
        """Write ``value`` to a register and return the value read back.

        The value is converted with the register's ``reg_type`` before it is
        stored.

        :raises RuntimeError: if the register is configured read only.
        """
        register = self.get_register_by_name(point_name)
        if register.read_only:
            raise RuntimeError("Trying to write to a point configured read only: " + point_name)

        register.value = register.reg_type(value)
        return register.value

    def _scrape_all(self):
        """Return a ``{point_name: value}`` dict covering every register."""
        result = {}
        # The boolean argument presumably selects read-only (True) versus
        # writable (False) registers -- confirm against BaseInterface.
        read_registers = self.get_registers_by_type("byte", True)
        write_registers = self.get_registers_by_type("byte", False)
        for register in read_registers + write_registers:
            result[register.point_name] = register.value

        return result

    def parse_config(self, configDict):
        """Create and insert one register per register definition.

        Each definition is a mapping with the keys ``Point Name``,
        ``Volttron Point Name``, ``Writable`` and ``Units``, plus the optional
        keys ``Notes``, ``Starting Value`` and ``Type``. Points whose
        ``Point Name`` starts with ``EKG`` become ``EKGregister`` instances;
        all others become ``FakeRegister`` instances.

        :param configDict: iterable of register definitions, or ``None`` to
            do nothing.
        """
        if configDict is None:
            return

        for regDef in configDict:
            # Skip rows that have no point name yet.
            if not regDef['Point Name']:
                continue

            # str() keeps the comparison working when the value is already a
            # boolean rather than text: str(True).lower() == 'true'.
            read_only = str(regDef['Writable']).lower() != 'true'
            volttron_point_name = regDef['Volttron Point Name']
            point_name = regDef['Point Name']
            description = regDef.get('Notes', '')
            units = regDef['Units']

            # A missing key defaults to 'sin'. Text values are stripped and an
            # empty result means "no starting value". Non-text values (None,
            # numbers, booleans) are passed through untouched instead of
            # failing on .strip().
            default_value = regDef.get("Starting Value", 'sin')
            if isinstance(default_value, str):
                default_value = default_value.strip()
                if not default_value:
                    default_value = None

            type_name = regDef.get("Type", 'string')
            # Unknown type names fall back to str.
            reg_type = type_mapping.get(type_name, str)

            register_type = FakeRegister if not point_name.startswith('EKG') else EKGregister

            register = register_type(
                read_only=read_only,
                pointName=volttron_point_name,
                reg_type=reg_type,
                default_value=default_value,
                description=description,
                units=units,
            )

            # Record the starting value as the point's default only when one
            # was configured (set_default is an inherited helper).
            if default_value is not None:
                self.set_default(volttron_point_name, register.value)

            self.insert_register(register)