# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
# Copyright 2019, Battelle Memorial Institute.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
# under Contract DE-AC05-76RL01830
# }}}

import logging
import weakref
import gevent
from collections import defaultdict
from bacpypes.basetypes import EngineeringUnits
from bacpypes.object import get_datatype
from bacpypes.primitivedata import Enumerated, Unsigned, Boolean, Integer, Real, Double

from volttron.platform.jsonrpc import RemoteError
from volttron.platform.messaging import topics

# Deals with the largest numbers that can be reported: minPresValue /
# maxPresValue readings at or beyond +/- this magnitude are treated as
# "no limit configured" when the unit details are built.
# NOTE(review): the assignment and the reference that followed "see" were
# missing from this copy of the file; 1.0e+20 is the assumed value --
# confirm against the upstream source.
MAX_RANGE_REPORT = 1.0e+20

_log = logging.getLogger(__name__)

class BACnetReader(object):
    """
    Discover BACnet devices and read their point metadata through a
    BACnet proxy agent.

    All traffic goes through the VIP ``rpc`` and ``pubsub`` subsystems of
    the ``vip`` object given to the constructor; only weak references to
    those subsystems are kept.  Results are delivered through the callback
    functions supplied by the caller.
    """

    # Object types for which minPresValue / maxPresValue are not queried.
    _NO_RANGE_TYPES = ('largeAnalogValue', 'integerValue',
                       'positiveIntegerValue')

    def __init__(self, vip, bacnet_proxy_identity, iam_response_fn=None,
                 config_response_fn=None, batch_size=20):
        """
        :param vip: Object exposing ``pubsub`` and ``rpc`` subsystems.
        :param bacnet_proxy_identity: Identity of the BACnet proxy agent
            that the rpc calls are sent to.
        :param iam_response_fn: Called with the I-Am message for every
            response received while a who-is scan is active.
        :param config_response_fn: Called with ``(context, results)`` for
            every point that is read from a device.  When omitted,
            ``iam_response_fn`` is used for this purpose as well, which
            keeps callers that pass a single positional callback working.
        :param batch_size: Number of objects that are requested from the
            proxy in a single rpc call.
        :raises ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be larger than 0")

        self._batch_size = batch_size
        self._pubsub = weakref.ref(vip.pubsub)
        self._rpc = weakref.ref(vip.rpc)
        self._proxy_identity = bacnet_proxy_identity
        # Prefer the dedicated configuration callback; previously it was
        # stored but never used.
        if config_response_fn is not None:
            self._response_function = config_response_fn
        else:
            self._response_function = iam_response_fn
        self._iam_response_fn = iam_response_fn
        self._config_response_fn = config_response_fn
        self._iam_callbacks = {}
        self._send_iam_responses = False
        # device_id -> one-shot callback registered through get_iam.
        self._caller_callback = {}

    def start_whois(self, low_device_id=None, high_device_id=None,
                    target_address=None):
        """
        Subscribe to I-Am publishes and ask the proxy to send a who-is.

        Responses are forwarded to ``iam_response_fn`` until
        `stop_iam_responses` is called.

        :param low_device_id: Passed to the proxy's ``who_is`` call.
        :param high_device_id: Passed to the proxy's ``who_is`` call.
        :param target_address: Passed to the proxy's ``who_is`` call.
        """
        _log.info("Starting WHOIS")
        self._pubsub().subscribe(peer='pubsub',
                                 prefix=topics.BACNET_I_AM,
                                 callback=self._iam_handler).get(timeout=3)
        self._send_iam_responses = True
        self._rpc().call(self._proxy_identity, "who_is",
                         low_device_id=low_device_id,
                         high_device_id=high_device_id,
                         target_address=target_address).get(timeout=5.0)

    def stop_iam_responses(self):
        """
        Unsubscribe from I-Am publishes and stop forwarding responses.
        """
        _log.info("Stopping WHOIS")
        self._pubsub().unsubscribe(peer='pubsub',
                                   prefix=topics.BACNET_I_AM,
                                   callback=self._iam_handler).get(timeout=3)
        self._send_iam_responses = False

    def get_iam(self, device_id, callback, address=None, timeout=10):
        """
        Request the I-Am of a single device.

        :param device_id: Device id used as both the low and the high limit
            of the who-is request.
        :param callback: Called once with the I-Am message of the device.
        :param address: Target address passed to the proxy's ``who_is``.
        :param timeout: Accepted for interface compatibility; the body does
            not currently use it.
        """
        _log.debug("Getting iam callback")
        self._caller_callback[device_id] = callback
        self._pubsub().subscribe(peer='pubsub',
                                 prefix=topics.BACNET_I_AM,
                                 callback=self._iam_handler).get(timeout=3)
        self._rpc().call(self._proxy_identity, "who_is",
                         low_device_id=device_id,
                         high_device_id=device_id,
                         target_address=address).get(timeout=5.0)

    def _iam_handler(self, peer, sender, bus, topic, headers, message):
        """
        Handle I-Am publishes that result from a who-is request.

        Two modes are supported.  If a callback was registered for the
        device through `get_iam`, that callback is invoked once with the
        message and the subscription is removed a few seconds later.
        Otherwise, while a scan started by `start_whois` is active, every
        message is forwarded to ``iam_response_fn``.
        """
        device_id = message["device_id"]

        if device_id in self._caller_callback:
            callback = self._caller_callback.pop(device_id)
            _log.info("Received iam for device_id {}".format(device_id))
            _log.debug("Callback received: {}".format(message))
            callback(message)
            # Not 100% sure this won't have issues, but it didn't throw
            # any errors during scripts nor from the vc/vcp combinations.
            gevent.spawn_later(3, self._pubsub().unsubscribe,
                               peer='pubsub',
                               prefix=topics.BACNET_I_AM,
                               callback=self._iam_handler)
            return

        if self._iam_response_fn is None:
            _log.error("No handler set for iam responses.")
            _log.error("IAM response was {}".format(message))
            return

        if self._send_iam_responses:
            _log.info("Received iam for device_id {}".format(device_id))
            _log.debug("iam message is: {}".format(message))
            self._iam_response_fn(message)
        else:
            _log.debug("IAM response not processed {}".format(device_id))

    def read_device_name(self, address, device_id):
        """
        Reads the device name from the specified address and device_id

        :param address: Address of the bacnet device
        :param device_id: The device id of the bacnet device.
        :return: The device name, "MISSING DEVICE NAME" when a TypeError
            is raised while reading, "Device Timeout" on a gevent timeout,
            or the repr of any other exception.
        """
        try:
            _log.debug("Reading device name.")
            device_name = self._read_prop(address, "device", device_id,
                                          "objectName")
            _log.debug('device_name = ' + str(device_name))
        except TypeError:
            _log.debug("device missing objectName")
            device_name = "MISSING DEVICE NAME"
        except gevent.Timeout:
            device_name = "Device Timeout"
        except RemoteError as ex:
            device_name = repr(ex)
        except Exception as ex:
            device_name = repr(ex)
        return device_name

    def read_device_description(self, address, device_id):
        """
        Reads the device description from the specified address and
        device_id

        :param address: Address of the bacnet device
        :param device_id: The device id of the bacnet device.
        :return: The device description, an empty string when it is missing
            or a remote error occurs, or the repr of any other exception.
        """
        try:
            _log.debug("Reading device description.")
            device_description = self._read_prop(address, "device",
                                                 device_id, "description")
            _log.debug('description = ' + str(device_description))
        except TypeError:
            _log.debug('device missing description')
            device_description = ""
        except RemoteError as e:
            _log.error("REMOTE ERROR")
            _log.error(e.args)
            device_description = ""
        except Exception as ex:
            device_description = repr(ex)
        return device_description

    def read_device_properties(self, target_address, device_id, filter=None):
        """
        Starts the process of reading a device's meta data.

        The device is first queried for the number of entries in its
        objectList (falling back to structuredObjectList on a TypeError).
        The entries are then read in batches of ``batch_size``; only
        objects whose type has a presentValue are processed further.  Each
        point is reported through the response function set in the
        constructor, followed by a final call whose context carries
        ``status="COMPLETE"``.

        If the object count cannot be read because of a RemoteError or
        RuntimeError the error is logged and nothing is reported.

        :param target_address: The address of the bacnet device
        :param device_id: The device_id of the bacnet device
        :param filter: A list of two-tuples with (bacnet_type, [index]).
            Accepted for interface compatibility; the body does not
            currently use it.
        """
        _log.info(
            'read_device_properties called target_address: {} device_id: {}'
            .format(target_address, device_id))

        try:
            try:
                _log.debug("Reading objectList from device index 0")
                object_count = self._read_prop(target_address, "device",
                                               device_id, "objectList",
                                               index=0)
                list_property = "objectList"
            except TypeError:
                _log.debug(
                    "Type error so reading structuredObjectList of index 0")
                object_count = self._read_prop(target_address, "device",
                                               device_id,
                                               "structuredObjectList",
                                               index=0)
                list_property = "structuredObjectList"
        except RemoteError as e:
            # Without an object count there is nothing to iterate over.
            _log.error("REMOTE ERROR read_device_properties")
            _log.error(e.args)
            return
        except RuntimeError as ex:
            _log.error(repr(ex))
            return

        _log.debug('object_count = ' + str(object_count))

        # Index 0 of the list property holds the count, so the object
        # identifiers themselves live at indexes 1..object_count.
        query_map = {}
        for object_index in range(1, object_count + 1):
            query_map[object_index] = [
                "device", device_id, list_property, object_index
            ]
            if len(query_map) >= self._batch_size:
                self._read_and_process_batch(target_address, device_id,
                                             query_map)
                query_map = {}

        if query_map:
            self._read_and_process_batch(target_address, device_id,
                                         query_map)

        self._response_function(dict(device_id=device_id,
                                     address=target_address,
                                     status="COMPLETE"), {})

    def _read_and_process_batch(self, target_address, device_id, query_map):
        """
        Read one batch of object-list entries and process those that have
        a presentValue.
        """
        _log.debug("query_map: {}".format(query_map))
        results = self._read_props(target_address, query_map)
        present_values = self._filter_present_value_from_results(results)
        self._process_input(target_address, device_id, present_values)

    def _build_query_map_for_type(self, object_type, index):
        """
        Build a map that can be sent to the `_read_props` function.

        Keys have the form ``'<index>-<property>'`` so that the results of
        the read can be regrouped per object by `_build_results`.  All of
        the properties needed for one object are collected into a single
        map so that they can be fetched with one rpc call.

        :param object_type: string representation of the bacnet type
        :param index: instance number of the object
        :returns: dict mapping key to ``[object_type, index, property]``
        """
        query_map = {}

        def request(property_name):
            key = '{}-{}'.format(index, property_name)
            query_map[key] = [object_type, index, property_name]

        present_value_type = get_datatype(object_type, 'presentValue')

        # The object name translates into the point name of the object.
        request("objectName")
        request("description")

        if issubclass(present_value_type, Enumerated):
            request("relinquishDefault")
        elif issubclass(present_value_type, Boolean):
            pass
        elif get_datatype(object_type, 'units') is None:
            request("numberOfStates")
            request("stateText")
            # 'multiStateInput' was previously misspelled here, so the
            # exclusion never matched.
            if object_type != 'multiStateInput':
                request("relinquishDefault")
        else:
            request("units")
            request("resolution")
            if object_type not in self._NO_RANGE_TYPES:
                request("minPresValue")
                request("maxPresValue")
            if object_type != 'analogInput':
                request("relinquishDefault")

        return query_map

    def _build_results(self, query_map, result_map):
        """
        Regroup the flat results of a read into one dictionary per object.

        Each key of ``query_map`` is split on the first ``-`` into an
        (index, property) pair; the property value is stored in the
        dictionary belonging to that index.  Keys that are absent from
        ``result_map`` are skipped.

        :param query_map: mapping of request values that were sent to the
            `_read_props` function.
        :param result_map: mapping that was returned from the
            `_read_props` function.
        :returns: dict of dicts keyed by the (string) index of the object.
        """
        objects = defaultdict(dict)
        for key, request in query_map.items():
            if key not in result_map:
                _log.debug("MISSING KEY {}".format(key))
                continue

            index, property_name = key.split('-', 1)
            value = result_map[key]

            obj = objects[index]
            obj['index'] = index
            if property_name == 'objectName':
                obj['object_name'] = value if value else 'MISSING OBJECT NAME'
            obj[property_name] = value
            if 'object_type' not in obj:
                obj['object_type'] = request[0]

        _log.debug("Built objects")
        return objects

    @staticmethod
    def _description_of(obj):
        """Return the stripped description of obj, '' when absent/None."""
        return (obj.get('description') or '').strip()

    @staticmethod
    def _describe_limits(min_value, max_value, limit):
        """
        Format the min/max present-value limits of an object.

        A bound that is None, or whose magnitude reaches ``limit``, is
        treated as not set.
        """
        has_min = min_value is not None and min_value > -limit
        has_max = max_value is not None and max_value < limit
        if has_min and has_max:
            return '{min:.2f} to {max:.2f}'.format(min=min_value,
                                                   max=max_value)
        if has_min:
            return 'Min: {min:.2f}'.format(min=min_value)
        if has_max:
            return 'Max: {max:.2f}'.format(max=max_value)
        return 'No limits.'

    def _process_enumerated(self, object_type, obj):
        """
        Build (units, units_details, notes) for an object whose
        presentValue is an Enumerated type.
        """
        units = 'Enum'
        notes = self._description_of(obj)

        present_value_type = get_datatype(object_type, 'presentValue')
        enumerations = present_value_type.enumerations
        values = list(enumerations.values())

        vendor_range = ''
        if hasattr(present_value_type, 'vendor_range'):
            vendor_min, vendor_max = present_value_type.vendor_range
            vendor_range = ' (vendor {min}-{max})'.format(min=vendor_min,
                                                          max=vendor_max)

        units_details = '{min}-{max}{vendor}'.format(min=min(values),
                                                     max=max(values),
                                                     vendor=vendor_range)

        if not object_type.endswith('Input') and "relinquishDefault" in obj:
            default_value = obj['relinquishDefault']
            _log.debug('DEFAULT VALUE IS: {}'.format(default_value))
            _log.debug('ENUMERATION VALUES: {}'.format(enumerations))
            # Only report the default when it is a known enumeration value.
            if default_value in enumerations.values():
                units_details += ' (default {default})'.format(
                    default=default_value)

        if not notes:
            # No description: document the enumeration itself instead.
            enum_items = sorted(present_value_type(0).enumerations.items())
            enum_strings = [str(value) + '=' + name
                            for name, value in enum_items]
            notes = present_value_type.__name__ + ': ' + ', '.join(
                enum_strings)

        return units, units_details, notes

    def _process_units(self, object_type, obj):
        """
        Build (units, units_details, notes) for an object type that has no
        'units' property (multi-state objects, loops and anything else).
        """
        units_details = ''
        notes = self._description_of(obj)

        if object_type.startswith('multiState'):
            units = 'State'
            state_count = obj.get('numberOfStates')
            if state_count:
                units_details = 'State count: {}'.format(state_count)

            state_list = obj.get('stateText')
            if state_list:
                # The first element is skipped, as in the original code.
                notes = ', '.join(
                    '{}={}'.format(number, name)
                    for number, name in enumerate(state_list[1:], start=1))

            if object_type != 'multiStateInput':
                default_value = obj.get('relinquishDefault')
                if default_value:
                    units_details += ' (default {default})'.format(
                        default=default_value)
            units_details = units_details.strip()
        elif object_type == 'loop':
            units = 'Loop'
        else:
            units = 'UNKNOWN UNITS'

        return units, units_details, notes

    def _process_unknown(self, object_type, obj):
        """
        Build (units, units_details, notes) for an object type that has a
        'units' property (analog and numeric value objects).

        Relies on the module-level ``MAX_RANGE_REPORT`` constant to decide
        whether a min/max limit is meaningful.
        """
        obj_units = "UNKNOWN UNIT ENUM VALUE"
        try:
            obj_units = EngineeringUnits(obj.get('units')).value
            if isinstance(obj_units, int):
                # The value could not be translated into a unit name.
                obj_units = 'UNKNOWN UNIT ENUM VALUE: ' + str(
                    obj.get('units'))
        except ValueError:
            if obj.get('units'):
                obj_units += ": " + str(obj.get('units'))

        units_details = ''
        notes = self._description_of(obj)

        if (object_type.startswith('analog')
                or object_type in self._NO_RANGE_TYPES):
            if not object_type.endswith('Value'):
                res_value = obj.get('resolution')
                if res_value:
                    notes = 'Resolution: {resolution:.6g}'.format(
                        resolution=res_value)

            if object_type not in self._NO_RANGE_TYPES:
                # A missing or None bound counts as "not set" rather than
                # raising on the comparison.
                units_details = self._describe_limits(
                    obj.get('minPresValue'), obj.get('maxPresValue'),
                    MAX_RANGE_REPORT)

            if object_type != 'analogInput' and 'relinquishDefault' in obj:
                units_details += ' (default {default})'.format(
                    default=obj.get('relinquishDefault'))
                units_details = units_details.strip()

        return obj_units, units_details, notes

    def _emit_responses(self, device_id, target_address, objects):
        """
        Report one result dictionary per object through the response
        function.

        Each result has the keys 'Reference Point Name',
        'Volttron Point Name', 'Units', 'Unit Details',
        'BACnet Object Type', 'Property', 'Writable', 'Index' and 'Notes'.

        :param device_id: device id placed in the response context
        :param target_address: address placed in the response context
        :param objects: dict of dicts as produced by `_build_results`
        """
        _log.debug('emit_responses: objects: {}'.format(objects))
        for index, obj in objects.items():
            object_type = obj['object_type']
            present_value_type = get_datatype(object_type, 'presentValue')

            object_units_details = ''
            object_notes = ''

            if issubclass(present_value_type, Boolean):
                object_units = 'Boolean'
            elif issubclass(present_value_type, Enumerated):
                object_units, object_units_details, object_notes = \
                    self._process_enumerated(object_type, obj)
            elif get_datatype(object_type, 'units') is None:
                object_units, object_units_details, object_notes = \
                    self._process_units(object_type, obj)
            else:
                object_units, object_units_details, object_notes = \
                    self._process_unknown(object_type, obj)

            # objectName may not have come back from the device at all.
            object_name = obj.get('object_name', 'MISSING OBJECT NAME')

            results = {}
            results['Reference Point Name'] = results[
                'Volttron Point Name'] = object_name
            results['Units'] = object_units
            results['Unit Details'] = object_units_details
            results['BACnet Object Type'] = object_type
            results['Property'] = 'presentValue'
            results['Writable'] = 'FALSE'
            results['Index'] = obj['index']
            results['Notes'] = object_notes

            self._response_function(dict(device_id=device_id,
                                         address=target_address), results)

    def _process_input(self, target_address, device_id, input_items):
        """
        Read the properties of the given objects in batches and emit the
        results.

        :param target_address: address of the bacnet device
        :param device_id: device id of the bacnet device
        :param input_items: iterable of dicts with at least the keys
            'index' and 'bacnet_type' (see
            `_filter_present_value_from_results`).
        """
        _log.debug('process_input: items: {}'.format(input_items))

        query_mapping = {}
        pending = 0
        processed = set()

        for item in input_items:
            index = item['index']
            object_type = item['bacnet_type']
            key = (target_address, device_id, object_type, index)
            if key in processed:
                _log.debug("Duplicate detected continuing")
                continue
            processed.add(key)

            new_map = self._build_query_map_for_type(object_type, index)
            # Query keys are '<index>-<property>'; two object types with
            # the same instance number would overwrite each other, so the
            # pending batch is flushed first.
            if any(k in query_mapping for k in new_map):
                self._read_and_emit(target_address, device_id, query_mapping)
                query_mapping = {}
                pending = 0

            query_mapping.update(new_map)
            pending += 1

            if pending >= self._batch_size:
                self._read_and_emit(target_address, device_id, query_mapping)
                query_mapping = {}
                pending = 0

        if query_mapping:
            self._read_and_emit(target_address, device_id, query_mapping)

    def _read_and_emit(self, target_address, device_id, query_mapping):
        """
        Read one batch of object properties and emit the built results.

        Errors are logged and the batch is dropped so that the remaining
        batches are still attempted.
        """
        try:
            results = self._read_props(target_address, query_mapping)
            objects = self._build_results(query_mapping, results)
            _log.debug('Built bacnet Objects: {}'.format(objects))
            self._emit_responses(device_id, target_address, objects)
        except RemoteError as e:
            # Must precede the generic handler to be reachable.
            _log.error('REMOTE ERROR: {}'.format(e))
        except Exception as ex:
            _log.error(repr(ex))

    def _filter_present_value_from_results(self, results):
        """
        Filter the results so that only presentValue datatypes are kept.

        :param results: The results from a `_read_props` call; each value
            is indexed as ``(bacnet_type, index)``.
        :return: A list of dicts with the keys 'index', 'writable',
            'datatype' and 'bacnet_type'.
        """
        # Currently our driver only deals with objects that have a
        # presentValue datatype.
        present_values = []
        for value in results.values():
            bacnet_type, index = value[0], value[1]
            datatype = get_datatype(bacnet_type, 'presentValue')
            if datatype:
                present_values.append(dict(index=index,
                                           writable="FALSE",
                                           datatype=datatype,
                                           bacnet_type=bacnet_type))
        return present_values

    def _read_props(self, address, parameters):
        """
        Call ``read_properties`` on the proxy and wait up to 20 seconds
        for the result.
        """
        _log.debug("_read_props for address: {} params: {}".format(
            address, parameters))
        return self._rpc().call(self._proxy_identity, "read_properties",
                                address, parameters).get(timeout=20)

    def _read_prop(self, address, obj_type, obj_inst, prop_id, index=None):
        """
        Read a single property.

        :return: The value of the property, or None when the proxy's
            response does not contain it.
        """
        point_map = {"result": [obj_type, obj_inst, prop_id, index]}
        result = self._read_props(address, point_map)
        try:
            return result["result"]
        except KeyError:
            return None

    # =========================================================================
    # Untested legacy code whose callers are commented out.  It is left in
    # so that it does not have to be rewritten later.
    # =========================================================================
    def _process_device_object_reference(self, address, obj_type, obj_inst,
                                         property_name, max_range_report,
                                         config_writer):
        """
        Process every object referenced by a list property of an object.

        :param config_writer: Accepted for interface compatibility;
            `_process_object` does not take it.
        """
        object_count = self._read_prop(address, obj_type, obj_inst,
                                       property_name, index=0)

        for object_index in range(1, object_count + 1):
            _log.debug('property_name index = ' + repr(object_index))

            object_reference = self._read_prop(address, obj_type, obj_inst,
                                               property_name,
                                               index=object_index)

            # Skip references to objects on other devices.
            if object_reference.deviceIdentifier is not None:
                continue

            sub_obj_type, sub_obj_index = object_reference.objectIdentifier

            # _process_object takes four arguments; passing config_writer
            # as a fifth raised a TypeError.
            self._process_object(address, sub_obj_type, sub_obj_index,
                                 max_range_report)

    def _process_object(self, address, obj_type, index, max_range_report):
        """
        Read the meta data of one object property-by-property and report
        it through the response function.

        This is the legacy, one-rpc-call-per-property variant of the
        batched `_build_query_map_for_type` / `_emit_responses` path.
        """
        _log.debug('obj_type = ' + repr(obj_type))
        _log.debug('bacnet_index = ' + repr(index))
        context = None
        if obj_type == "device":
            context = dict(address=address, device=index)

        writable = 'FALSE'

        # Structured views and life safety zones only reference other
        # objects; following those references is currently disabled.
        subordinate_list_property = get_datatype(obj_type, 'subordinateList')
        if subordinate_list_property is not None:
            _log.debug('Processing StructuredViewObject')
            return

        subordinate_list_property = get_datatype(obj_type, 'zoneMembers')
        if subordinate_list_property is not None:
            _log.debug('Processing LifeSafetyZoneObject')
            return

        present_value_type = get_datatype(obj_type, 'presentValue')
        if present_value_type is None:
            _log.debug('This object type has no presentValue. Skipping.')
            return

        if not issubclass(present_value_type,
                          (Enumerated, Unsigned, Boolean, Integer, Real,
                           Double)):
            _log.debug('presenValue is an unsupported type: ' + repr(
                present_value_type))
            return

        try:
            object_name = self._read_prop(address, obj_type, index,
                                          "objectName")
            # A None name makes this concatenation raise TypeError, which
            # selects the placeholder below.
            _log.debug('object name = ' + object_name)
        except TypeError:
            object_name = "NO NAME! PLEASE NAME THIS."

        try:
            object_notes = self._read_prop(address, obj_type, index,
                                           "description")
        except TypeError:
            object_notes = ''

        object_units_details = ''

        if issubclass(present_value_type, Enumerated):
            object_units = 'Enum'
            values = list(present_value_type.enumerations.values())
            min_value = min(values)
            max_value = max(values)

            vendor_range = ''
            if hasattr(present_value_type, 'vendor_range'):
                vendor_min, vendor_max = present_value_type.vendor_range
                vendor_range = ' (vendor {min}-{max})'.format(
                    min=vendor_min, max=vendor_max)

            object_units_details = '{min}-{max}{vendor}'.format(
                min=min_value, max=max_value, vendor=vendor_range)

            if not obj_type.endswith('Input'):
                try:
                    default_value = self._read_prop(address, obj_type,
                                                    index,
                                                    "relinquishDefault")
                    object_units_details += ' (default {default})'.format(
                        default=present_value_type.enumerations[
                            default_value])
                except (KeyError, TypeError, ValueError):
                    # Best effort: the default is optional detail.
                    pass

            if not object_notes:
                enum_items = sorted(
                    present_value_type(0).enumerations.items())
                enum_strings = [str(value) + '=' + name
                                for name, value in enum_items]
                object_notes = present_value_type.__name__ + ': ' + \
                    ', '.join(enum_strings)

        elif issubclass(present_value_type, Boolean):
            object_units = 'Boolean'

        elif get_datatype(obj_type, 'units') is None:
            if obj_type.startswith('multiState'):
                object_units = 'State'
                try:
                    state_count = self._read_prop(address, obj_type, index,
                                                  "numberOfStates")
                    object_units_details = 'State count: {}'.format(
                        state_count)
                except TypeError:
                    pass

                try:
                    state_list = self._read_prop(address, obj_type, index,
                                                 "stateText")
                    object_notes = ', '.join(
                        '{}={}'.format(number, name)
                        for number, name in enumerate(state_list[1:],
                                                      start=1))
                except TypeError:
                    pass

                if obj_type != 'multiStateInput':
                    try:
                        default_value = self._read_prop(address, obj_type,
                                                        index,
                                                        "relinquishDefault")
                        object_units_details += \
                            ' (default {default})'.format(
                                default=default_value)
                        object_units_details = object_units_details.strip()
                    except (TypeError, ValueError):
                        pass
            elif obj_type == 'loop':
                object_units = 'Loop'
            else:
                object_units = 'UNKNOWN UNITS'

        else:
            try:
                object_units = self._read_prop(address, obj_type, index,
                                               "units")
            except TypeError:
                object_units = 'UNKNOWN UNITS'

            if isinstance(object_units, int):
                object_units = 'UNKNOWN UNIT ENUM VALUE: ' + str(
                    object_units)

            if (obj_type.startswith('analog')
                    or obj_type in self._NO_RANGE_TYPES):
                # Value objects never have a resolution property in
                # practice.
                if not object_notes and not obj_type.endswith('Value'):
                    try:
                        res_value = self._read_prop(address, obj_type,
                                                    index, "resolution")
                        object_notes = \
                            'Resolution: {resolution:.6g}'.format(
                                resolution=res_value)
                    except (TypeError, ValueError):
                        pass

                if obj_type not in self._NO_RANGE_TYPES:
                    try:
                        min_value = self._read_prop(address, obj_type,
                                                    index, "minPresValue")
                        max_value = self._read_prop(address, obj_type,
                                                    index, "maxPresValue")
                        object_units_details = self._describe_limits(
                            min_value, max_value, max_range_report)
                    except (TypeError, ValueError):
                        pass

                if obj_type != 'analogInput':
                    try:
                        default_value = self._read_prop(address, obj_type,
                                                        index,
                                                        "relinquishDefault")
                        object_units_details += \
                            ' (default {default})'.format(
                                default=default_value)
                        object_units_details = object_units_details.strip()
                    except (TypeError, ValueError):
                        pass

        _log.debug('  object units = ' + str(object_units))
        _log.debug('  object units details = ' + str(object_units_details))
        _log.debug('  object notes = ' + str(object_notes))

        results = {}
        results['Reference Point Name'] = results[
            'Volttron Point Name'] = object_name
        results['Units'] = object_units
        results['Unit Details'] = object_units_details
        results['BACnet Object Type'] = obj_type
        results['Property'] = 'presentValue'
        results['Writable'] = writable
        results['Index'] = index
        results['Notes'] = object_notes

        self._response_function(context, results)