# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import datetime
import logging
import os
import sys
import statistics
from volttron.platform.vip.agent import Agent, RPC, Core
from volttron.platform.agent import utils
from volttron.platform.agent.utils import get_aware_utc_now
utils.setup_logging()
_log = logging.getLogger(__name__)
__version__ = '1.0'
[docs]def log_statistics(config_path, **kwargs):
"""
Load the LogStatisticsAgent agent configuration and returns and instance
of the agent created using that configuration.
:param config_path: Path to a configuration file.
:type config_path: str
:returns: LogStatisticsAgent agent instance
:rtype: LogStatisticsAgent agent
"""
config = utils.load_config(config_path)
return LogStatisticsAgent(config, **kwargs)
[docs]class LogStatisticsAgent(Agent):
"""
LogStatisticsAgent reads volttron.log file size every hour, compute the size delta from previous hour and publish
the difference with timestamp. It also publishes standard deviation every 24 hours.
:param config: Configuration dict
:type config: dict
Example configuration:
.. code-block:: python
{
"file_path" : "/home/volttron/volttron.log",
"analysis_interval_sec" : 60,
"publish_topic" : "platform/log_statistics",
"historian_topic" : "analysis/log_statistics"
}
"""
def __init__(self, config, **kwargs):
super(LogStatisticsAgent, self).__init__(**kwargs)
self.analysis_interval_sec = config["analysis_interval_sec"]
self.file_path = config["file_path"]
self.publish_topic = config["publish_topic"]
self.historian_topic = config["historian_topic"]
self.size_delta_list = []
self.file_start_size = None
self.prev_file_size = None
self._scheduled_event = None
[docs] @Core.receiver('onstart')
def starting(self, sender, **kwargs):
_log.info("Starting " + self.__class__.__name__ + " agent")
self.publish_analysis()
[docs] def publish_analysis(self):
"""
Publishes file's size increment in previous time interval (60 minutes) with timestamp.
Also publishes standard deviation of file's hourly size differences every 24 hour.
"""
if self._scheduled_event is not None:
self._scheduled_event.cancel()
if self.prev_file_size is None:
self.prev_file_size = self.get_file_size()
_log.debug("init_file_size = {}".format(self.prev_file_size))
else:
# read file size
curr_file_size = self.get_file_size()
# calculate size delta
size_delta = curr_file_size - self.prev_file_size
self.prev_file_size = curr_file_size
self.size_delta_list.append(size_delta)
headers = {'Date': datetime.datetime.utcnow().isoformat() + 'Z'}
publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
'log_size_delta': size_delta}
historian_message = [{"log_size_delta ": size_delta},
{"log_size_delta ": {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}}]
if len(self.size_delta_list) == 24:
standard_deviation = statistics.stdev(self.size_delta_list)
publish_message['log_std_dev'] = standard_deviation
historian_message[0]['log_std_dev'] = standard_deviation
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}
_log.debug('publishing message {} with header {} on historian topic {}'
.format(historian_message, headers, self.historian_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.historian_topic, headers=headers,
message=historian_message)
self.size_delta_list = []
_log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message)
_log.debug('Scheduling next periodic call')
now = get_aware_utc_now()
next_update_time = now + datetime.timedelta(seconds=self.analysis_interval_sec)
self._scheduled_event = self.core.schedule(next_update_time, self.publish_analysis)
[docs] def get_file_size(self):
try:
return os.path.getsize(self.file_path)
except OSError as e:
_log.error(e)
[docs]def main(argv=sys.argv):
"""
Main method called by the platform.
"""
utils.vip_main(log_statistics, identity='platform.logstatisticsagent')
if __name__ == '__main__':
# Entry point for script
try:
sys.exit(main())
except KeyboardInterrupt:
pass