Source code for ambient.agent

# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright (c) 2017, Battelle Memorial Institute
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
#  FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official,
# policies either expressed or implied, of the FreeBSD Project.
#

# This material was prepared as an account of work sponsored by an
# agency of the United States Government.  Neither the United States
# Government nor the United States Department of Energy, nor Battelle,
# nor any of their employees, nor any jurisdiction or organization
# that has cooperated in the development of these materials, makes
# any warranty, express or implied, or assumes any legal liability
# or responsibility for the accuracy, completeness, or usefulness or
# any information, apparatus, product, software, or process disclosed,
# or represents that its use would not infringe privately owned rights.
#
# Reference herein to any specific commercial product, process, or
# service by trade name, trademark, manufacturer, or otherwise does
# not necessarily constitute or imply its endorsement, recommendation,
# r favoring by the United States Government or any agency thereof,
# or Battelle Memorial Institute. The views and opinions of authors
# expressed herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY
# operated by BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830

# }}}

__docformat__ = 'reStructuredText'

import logging
import datetime
import pytz
import sys

import grequests
# requests should be imported after grequests as requests imports ssl and grequests patches ssl
import requests

import pkg_resources
from volttron.platform.agent import utils
from volttron.platform.vip.agent import RPC
from volttron.platform.agent.utils import format_timestamp
from volttron.platform.agent.base_weather import BaseWeatherAgent
from volttron.platform import jsonapi

_log = logging.getLogger(__name__)
utils.setup_logging()
__version__ = "0.1"


[docs]def ambient(config_path, **kwargs): """ Parses the Agent configuration and returns an instance of the agent created using that configuration. :param config_path: Path to a configuration file. :type config_path: str :returns: Ambient :rtype: Ambient """ try: config = utils.load_config(config_path) except Exception: config = {} if not config: _log.error("Ambient agent configuration: ".format(config)) for key in ["api_key", "application_key"]: if not config.get(key) or not isinstance(config.get(key), str): raise RuntimeError("Ambient agent must be configured with '{}' key.".format(key)) _log.debug("config_dict before init: {}".format(config)) utils.update_kwargs_with_config(kwargs, config) return Ambient(**kwargs)
[docs]class Ambient(BaseWeatherAgent): """ The Ambient agent requires having an API key to interact with the remote API. The agent offers a performance_mode configuration option which allows users to limit the amount of data returned by the API. """ def __init__(self, application_key="", **kwargs): super(Ambient, self).__init__(**kwargs) _log.debug("vip_identity: " + self.core.identity) self.headers = {"Accept": "application/json", "Accept-Language": "en-US" } self.remove_service("get_hourly_historical") self.remove_service("get_hourly_forecast") self.app_key = application_key self.last_service_call_timestamp = None
[docs] @RPC.export def get_version(self): """ Provides the current version of the agent. :return: current version number in string format. """ return __version__
[docs] def validate_location(self, service_name, location): """ Indicates whether the location dictionary provided matches the format required by the remote weather API :param service_name: name of the remote API service :param location: location dictionary to provide in the remote API url :return: True if the location matches the required format else False """ return isinstance(location.get("location", None), str)
[docs] def get_update_interval(self, service_name): """ Indicates the interval between remote API updates :param service_name: requested service endpoint :return: datetime timedelta representing the time interval """ if service_name == "get_current_weather": return datetime.timedelta(minutes=5) else: return None
[docs] def get_api_description(self, service_name): """ Provides a human-readable description of the various endpoints provided by the agent :param service_name: requested service endpoint :return: Human-readable description string """ if service_name is "get_current_weather": "Provides current weather observations for locations by their corresponding Ambient weather station name " \ "via RPC (Requires {'location': <station location string>})" else: raise RuntimeError( "Service {} is not implemented by Ambient.".format(service_name))
[docs] def get_point_name_defs_file(self): """ Constructs the point name mapping dict from the mapping csv. :return: dictionary containing a mapping of service point names to standard point names with optional """ # returning resource file instead of stream, as csv.DictReader require file path or file like object opened in # text mode. return pkg_resources.resource_filename(__name__, "data/name_mapping.csv")
[docs] def query_current_weather(self, location): """ Retrieve data from the Ambient API, return formatted current data and store forecast data in cache :param location: location dictionary requested by the user :return: Timestamp and data for current data from the Ambient API """ ambient_response = self.make_request() location_response = None current_time = None for record in ambient_response: record_location = None record_info = record.pop("info") if record_info: record_location = record_info.get("location", "") if record_location: weather_data = record.get("lastData", {}) weather_data["macAddress"] = record.pop("macAddress", "") weather_data["name"] = record_info.get("name", "") # "date": "2019-04-25T17:09:00.000Z" weather_tz_string = weather_data.get('tz', None) if weather_tz_string: weather_tz = pytz.timezone(weather_tz_string) else: weather_tz = pytz.utc weather_date = datetime.datetime.strptime( weather_data.pop("date"), "%Y-%m-%dT%H:%M:%S.%fZ").astimezone(weather_tz) if location["location"] == record_location: current_time = format_timestamp(weather_date) location_response = weather_data else: weather_data = self.apply_mapping(weather_data) self.store_weather_records("get_current_weather", [jsonapi.dumps({"location": record_location}), weather_date, jsonapi.dumps(weather_data)]) else: raise RuntimeError("API record contained improper 'info' format") return current_time, location_response
[docs] def query_forecast_service(self, service, location, quantity, forecast_start): """ Unimplemented method stub :param service: forecast service type of weather data to return :param location: location dictionary requested during the RPC call :param quantity: number of records to return, used to generate Time Machine requests after the forecast request :param forecast_start: forecast results that are prior to this timestamp will be filtered by base weather agent :return: Timestamp and data returned by the Ambient weather API response """ raise NotImplementedError
[docs] def make_request(self): """ Request data from the Ambient Weather API An example of the return value is as follows [ { "macAddress": "18:93:D7:3B:89:0C", "lastData": { "dateutc": 1556212140000, "tempinf": 71.9, "humidityin": 31, "battout": "1", "temp1f": 68.7, "humidity1": 36, "batt1": "1", "date": "2019-04-25T17:09:00.000Z" }, "info": { "name": "Home B WS", "location": "Lab Home B" } }, { "macAddress": "50:F1:4A:F7:3C:C4", "lastData": { "dateutc": 1556211960000, "tempinf": 82.5, "humidityin": 27, "battout": "1", "temp1f": 68.5, "humidity1": 42, "batt1": "1", "date": "2019-04-25T17:06:00.000Z" }, "info": { "name": "Home A WS", "location": "Lab Home A" } } ] :return: """ # AuthenticationTwo API Keys are required for all REST API requests:applicationKey - identifies the # developer / application. To request an application key please email support@ambient.comapiKey - # grants access to past/present data for a given user's devices. A typical consumer-facing application will # initially ask the user to create an apiKey on their Ambient.net account page # (https://dashboard.ambientweather.net/account) and paste it into the app. Developers for personal or # in-house apps will also need to create an apiKey on their own account page. # Rate LimitingAPI requests are capped at 1 request/second for each user's apiKey and 3 requests/second # per applicationKey. When this limit is exceeded, the API will return a 429 response code. # Please be kind to our servers :) # If the previous call to the API was at least 3 seconds ago - this is a constraint set by Ambient if not self.last_service_call_timestamp or ( datetime.datetime.now() - self.last_service_call_timestamp).total_seconds() > 3: url = 'https://api.ambientweather.net/v1/devices?applicationKey=' + self.app_key + '&apiKey=' + \ self._api_key _log.info("requesting url: {}".format(url)) grequest = [grequests.get(url, verify=requests.certs.where(), headers=self.headers, timeout=30)] gresponse = grequests.map(grequest)[0] if gresponse is None: raise RuntimeError("get request did not return any response") try: response = jsonapi.loads(gresponse.content) self.last_service_call_timestamp = datetime.datetime.now() return response except ValueError: self.last_service_call_timestamp = datetime.datetime.now() self.generate_response_error(url, gresponse.status_code) else: raise RuntimeError("Previous API call to Ambient service is too recent, please wait at least 3 seconds " "between API calls.")
[docs] def query_hourly_forecast(self, location): """ Unimplemented method stub :param location: currently accepts lat/long location dictionary format only :return: time of forecast prediction as a timestamp string, and a list of """ raise NotImplementedError
[docs] def query_hourly_historical(self, location, start_date, end_date): """ Unimplemented method stub :param location: no format currently determined for history. :param start_date: Starting date for historical weather period. :param end_date: Ending date for historical weather period. :return: NotImplementedError """ raise NotImplementedError
[docs] def generate_response_error(self, url, response_code): """ Raises a descriptive runtime error based on the response code returned by a service. :param url: actual url used for requesting data from Ambient :param response_code: Http response code returned by a service following a request """ code_x100 = int(response_code / 100) if code_x100 == 2: raise RuntimeError("Remote API returned no data(code:{}, url:{})".format(response_code, url)) elif code_x100 == 3: raise RuntimeError( "Remote API redirected request, but redirect failed (code:{}, url:{})".format(response_code, url)) elif code_x100 == 4: raise RuntimeError( "Request ({}) rejected by remote API: Remote API returned Code {}".format(url, response_code)) elif code_x100 == 5: raise RuntimeError( "Remote API returned invalid response (code:{}, url:{})".format(response_code, url)) else: raise RuntimeError( "API request failed with unexpected response code (code:{}, url:{})".format(response_code, url))
[docs]def main(): """Main method called to start the agent.""" utils.vip_main(ambient, version=__version__)
if __name__ == '__main__': # Entry point for script try: sys.exit(main()) except KeyboardInterrupt: pass