Source code for platform_driver.interfaces.bacnet

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}


import logging
from datetime import datetime, timedelta

from platform_driver.driver_exceptions import DriverConfigError
from platform_driver.interfaces import BaseInterface, BaseRegister
from volttron.platform.vip.agent import errors
from volttron.platform.jsonrpc import RemoteError

# Logging is completely configured by now.
_log = logging.getLogger(__name__)

DEFAULT_COV_LIFETIME = 180
COV_UPDATE_BUFFER = 3
BACNET_TYPE_MAPPING = {"multiStateValue": int, "multiStateInput": int, "multiStateOutput": int,
                       "analogValue": float, "analogInput": float, "analogOutput": float,
                       "binaryValue": bool, "binaryInput": bool, "binaryOutput": bool
                      }


[docs]class Register(BaseRegister): def __init__(self, instance_number, object_type, property_name, read_only, point_name, units, description='', priority=None, list_index=None): super(Register, self).__init__("byte", read_only, point_name, units, description=description) self.instance_number = int(instance_number) self.object_type = object_type self.property = property_name self.priority = priority self.index = list_index self.python_type = BACNET_TYPE_MAPPING[object_type]
[docs]class Interface(BaseInterface): def __init__(self, **kwargs): super(Interface, self).__init__(**kwargs) self.register_count = 10000 self.register_count_divisor = 1 self.cov_points = []
[docs] def configure(self, config_dict, registry_config_str): self.min_priority = config_dict.get("min_priority", 8) self.parse_config(registry_config_str) self.target_address = config_dict.get("device_address") self.device_id = int(config_dict.get("device_id")) self.cov_lifetime = config_dict.get("cov_lifetime", DEFAULT_COV_LIFETIME) self.proxy_address = config_dict.get("proxy_address", "platform.bacnet_proxy") self.max_per_request = config_dict.get("max_per_request", 24) self.use_read_multiple = config_dict.get("use_read_multiple", True) self.timeout = float(config_dict.get("timeout", 30.0)) self.ping_retry_interval = timedelta(seconds=config_dict.get("ping_retry_interval", 5.0)) self.scheduled_ping = None self.ping_target() # list of points to establish change of value subscriptions with, generated from the registry config for point_name in self.cov_points: self.establish_cov_subscription(point_name, self.cov_lifetime, True)
[docs] def schedule_ping(self): if self.scheduled_ping is None: now = datetime.now() next_try = now + self.ping_retry_interval self.scheduled_ping = self.core.schedule(next_try, self.ping_target)
[docs] def ping_target(self): # Some devices (mostly RemoteStation addresses behind routers) will not be reachable without # first establishing the route to the device. Sending a directed WhoIsRequest is will # settle that for us when the response comes back. pinged = False try: self.vip.rpc.call(self.proxy_address, 'ping_device', self.target_address, self.device_id).get(timeout=self.timeout) pinged = True except errors.Unreachable: _log.warning("Unable to reach BACnet proxy.") except errors.VIPError: _log.warning("Error trying to ping device.") self.scheduled_ping = None # Schedule retry. if not pinged: self.schedule_ping()
[docs] def get_point(self, point_name, get_priority_array=False): register = self.get_register_by_name(point_name) property_name = "priorityArray" if get_priority_array else register.property register_index = None if get_priority_array else register.index result = self.vip.rpc.call(self.proxy_address, 'read_property', self.target_address, register.object_type, register.instance_number, property_name, register_index).get(timeout=self.timeout) return result
[docs] def set_point(self, point_name, value, priority=None): # TODO: support writing from an array. register = self.get_register_by_name(point_name) if register.read_only: raise IOError("Trying to write to a point configured read only: " + point_name) if priority is not None and priority < self.min_priority: raise IOError("Trying to write with a priority lower than the minimum of " + str(self.min_priority)) # We've already validated the register priority against the min priority. args = [self.target_address, value, register.object_type, register.instance_number, register.property, priority if priority is not None else register.priority, register.index] result = self.vip.rpc.call(self.proxy_address, 'write_property', *args).get(timeout=self.timeout) return result
[docs] def scrape_all(self): # TODO: support reading from an array. point_map = {} read_registers = self.get_registers_by_type("byte", True) write_registers = self.get_registers_by_type("byte", False) for register in read_registers + write_registers: point_map[register.point_name] = [register.object_type, register.instance_number, register.property, register.index] while True: try: result = self.vip.rpc.call(self.proxy_address, 'read_properties', self.target_address, point_map, self.max_per_request, self.use_read_multiple).get(timeout=self.timeout) except RemoteError as e: if "segmentationNotSupported" in e.message: if self.max_per_request <= 1: _log.error("Receiving a segmentationNotSupported error with 'max_per_request' setting of 1.") raise self.register_count_divisor += 1 self.max_per_request = max(int(self.register_count/self.register_count_divisor), 1) _log.info("Device requires a lower max_per_request setting. Trying: "+str(self.max_per_request)) continue elif e.message.endswith("rejected the request: 9") and self.use_read_multiple: _log.info("Device rejected request with 'unrecognized-service' error, attempting to access with use_read_multiple false") self.use_read_multiple = False continue else: raise except errors.Unreachable: # If the Proxy is not running bail. _log.warning("Unable to reach BACnet proxy.") self.schedule_ping() raise else: break return result
[docs] def revert_all(self, priority=None): """ Revert entrire device to it's default state """ # TODO: Add multipoint write support write_registers = self.get_registers_by_type("byte", False) for register in write_registers: self.revert_point(register.point_name, priority=priority)
[docs] def revert_point(self, point_name, priority=None): """ Revert point to it's default state """ self.set_point(point_name, None, priority=priority)
[docs] def parse_config(self, configDict): if configDict is None: return self.register_count = len(configDict) for regDef in configDict: # Skip lines that have no address yet. if not regDef.get('Volttron Point Name'): continue io_type = regDef.get('BACnet Object Type') read_only = regDef.get('Writable').lower() != 'true' point_name = regDef.get('Volttron Point Name') # checks if the point is flagged for change of value is_cov = regDef.get("COV Flag", 'false').lower() == "true" index = int(regDef.get('Index')) list_index = regDef.get('Array Index', '') list_index = list_index.strip() if not list_index: list_index = None else: list_index = int(list_index) priority = regDef.get('Write Priority', '') priority = priority.strip() if not priority: priority = None else: priority = int(priority) if priority < self.min_priority: message = "{point} configured with a priority {priority} which is lower than than minimum {min}." raise DriverConfigError(message.format(point=point_name, priority=priority, min=self.min_priority)) description = regDef.get('Notes', '') units = regDef.get('Units') property_name = regDef.get('Property') register = Register(index, io_type, property_name, read_only, point_name, units, description=description, priority=priority, list_index=list_index) self.insert_register(register) if is_cov: self.cov_points.append(point_name)
[docs] def establish_cov_subscription(self, point_name, lifetime, renew=False): """ Asks the BACnet proxy to establish a COV subscription for the point via RPC. If lifetime is specified, the subscription will live for that period, else the subscription will last indefinitely. Default period of 3 minutes. If renew is True, the the core scheduler will call this method again near the expiration of the subscription. """ register = self.get_register_by_name(point_name) try: self.vip.rpc.call(self.proxy_address, 'create_cov_subscription', self.target_address, self.device_path, point_name, register.object_type, register.instance_number, lifetime=lifetime) except errors.Unreachable: _log.warning("Unable to establish a subscription via the bacnet proxy as it was unreachable.") # Schedule COV resubscribe if renew and (lifetime > COV_UPDATE_BUFFER): now = datetime.now() next_sub_update = now + timedelta(seconds=(lifetime - COV_UPDATE_BUFFER)) self.core.schedule(next_sub_update, self.establish_cov_subscription, point_name, lifetime, renew)