Source code for platform_driver.interfaces.fakedriver

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}


import random
import datetime
import math
from math import pi

from platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
from csv import DictReader
from io import StringIO
import logging

_log = logging.getLogger(__name__)
type_mapping = {"string": str,
                "int": int,
                "integer": int,
                "float": float,
                "bool": bool,
                "boolean": bool}


[docs]class FakeRegister(BaseRegister): def __init__(self, read_only, pointName, units, reg_type, default_value=None, description=''): # register_type, read_only, pointName, units, description = ''): super(FakeRegister, self).__init__("byte", read_only, pointName, units, description='') self.reg_type = reg_type if default_value is None: self.value = self.reg_type(random.uniform(0, 100)) else: try: self.value = self.reg_type(default_value) except ValueError: self.value = self.reg_type()
[docs]class EKGregister(BaseRegister): def __init__(self, read_only, pointName, units, reg_type, default_value=None, description=''): super(EKGregister, self).__init__("byte", read_only, pointName, units, description='') self._value = 1; math_functions = ('acos', 'acosh', 'asin', 'asinh', 'atan', 'atan2', 'atanh', 'sin', 'sinh', 'sqrt', 'tan', 'tanh') if default_value in math_functions: self.math_func = getattr(math, default_value) else: _log.error('Invalid default_value in EKGregister.') _log.warning('Defaulting to sin(x)') self.math_func = math.sin @property def value(self): now = datetime.datetime.now() seconds_in_radians = pi * float(now.second) / 30.0 yval = self.math_func(seconds_in_radians) return self._value * yval @value.setter def value(self, x): self._value = x
[docs]class Interface(BasicRevert, BaseInterface): def __init__(self, **kwargs): super(Interface, self).__init__(**kwargs)
[docs] def configure(self, config_dict, registry_config_str): self.parse_config(registry_config_str)
[docs] def get_point(self, point_name): register = self.get_register_by_name(point_name) return register.value
def _set_point(self, point_name, value): register = self.get_register_by_name(point_name) if register.read_only: raise RuntimeError( "Trying to write to a point configured read only: " + point_name) register.value = register.reg_type(value) return register.value def _scrape_all(self): result = {} read_registers = self.get_registers_by_type("byte", True) write_registers = self.get_registers_by_type("byte", False) for register in read_registers + write_registers: result[register.point_name] = register.value return result
[docs] def parse_config(self, configDict): if configDict is None: return for regDef in configDict: # Skip lines that have no address yet. if not regDef['Point Name']: continue read_only = regDef['Writable'].lower() != 'true' point_name = regDef['Volttron Point Name'] description = regDef.get('Notes', '') units = regDef['Units'] default_value = regDef.get("Starting Value", 'sin').strip() if not default_value: default_value = None type_name = regDef.get("Type", 'string') reg_type = type_mapping.get(type_name, str) register_type = FakeRegister if not point_name.startswith('EKG') else EKGregister register = register_type( read_only, point_name, units, reg_type, default_value=default_value, description=description) if default_value is not None: self.set_default(point_name, register.value) self.insert_register(register)