# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2017, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import datetime
import gevent
import grequests
import logging
import requests
from requests.exceptions import HTTPError
from requests.packages.urllib3.connection import ConnectionError, NewConnectionError
from volttron.platform import jsonapi
from volttron.platform.agent import utils
from volttron.platform.agent.known_identities import CONFIGURATION_STORE, PLATFORM_DRIVER
from volttron.utils.persistance import PersistentDict
from platform_driver.interfaces import BaseInterface, BaseRegister, BasicRevert
AUTH_CONFIG_PATH = "drivers/auth/ecobee_{}"
THERMOSTAT_URL = 'https://api.ecobee.com/1/thermostat'
THERMOSTAT_HEADERS = {
'Content-Type': 'application/json;charset=UTF-8',
'Authorization': 'Bearer {}'
}
_log = logging.getLogger(__name__)
__version__ = "1.0"
[docs]class Interface(BasicRevert, BaseInterface):
"""
Interface implementation for wrapping around the Ecobee thermostat API
"""
def __init__(self, **kwargs):
super(Interface, self).__init__(**kwargs)
# Configuration value defaults
self.config_dict = {}
self.api_key = ""
self.ecobee_id = -1
# which agent is being used as the caching agent
self.cache = None
# Authorization tokens
self.refresh_token = None
self.access_token = None
self.authorization_code = None
self.authorization_stage = "UNAUTHORIZED"
# Config path for storing Ecobee auth information in config store, not user facing
self.auth_config_path = ""
# Un-initialized data response from Driver Cache agent
self.thermostat_data = None
# Un-initialized greenlet for querying cache agent
self.poll_greenlet_thermostats = None
[docs] def parse_config(self, config_dict):
"""
Parse driver registry configuration and create device registers
:param config_dict: Registry configuration in dictionary representation
"""
first_hold = True
_log.debug("Parsing Ecobee registry configuration.")
if not config_dict:
return
# Parse configuration file for registry parameters, then add new register to the interface
for index, regDef in enumerate(config_dict):
point_name = regDef.get("Point Name")
if not point_name:
_log.warning(f"Registry configuration contained entry without a point name: {regDef}")
continue
read_only = regDef.get('Writable', "").lower() != 'true'
readable = regDef.get('Readable', "").lower() == 'true'
volttron_point_name = regDef.get('Volttron Point Name')
if not volttron_point_name:
volttron_point_name = point_name
description = regDef.get('Notes', '')
units = regDef.get('Units', None)
default_value = regDef.get("Default Value", "").strip()
# Truncate empty string or 0 values to None
if not default_value:
default_value = None
type_name = regDef.get("Type", 'string')
# Create an instance of the register class based on the register type
if type_name.lower().startswith("setting"):
register = Setting(self.ecobee_id, read_only, readable, volttron_point_name, point_name, units,
description=description)
elif type_name.lower() == "hold":
if first_hold:
_log.warning("Hold registers' set_point requires dictionary value, for best practices, visit "
"https://www.ecobee.com/home/developer/api/documentation/v1/functions/SetHold.shtml")
first_hold = False
register = Hold(self.ecobee_id, read_only, readable, volttron_point_name, point_name, units,
description=description)
else:
_log.warning(f"Unsupported register type {type_name} in Ecobee registry configuration")
continue
if default_value is not None:
self.set_default(point_name, default_value)
# Add the register instance to our list of registers
self.insert_register(register)
# Each Ecobee thermostat has one Status reporting "register", one programs register and one vacation "register
# Status is a static point which reports a list of running HVAC systems reporting to the thermostat
status_register = Status(self.ecobee_id)
self.insert_register(status_register)
# Vacation can be used to manage all Vacation programs for the thermostat
vacation_register = Vacation(self.ecobee_id)
self.insert_register(vacation_register)
# Add a register for listing events and resuming programs
program_register = Program(self.ecobee_id)
self.insert_register(program_register)
[docs] def update_authorization(self):
if self.authorization_stage == "UNAUTHORIZED":
self.authorize_application()
if self.authorization_stage == "REQUEST_TOKENS":
self.request_tokens()
if self.authorization_stage == "REFRESH_TOKENS":
self.refresh_tokens()
self.update_auth_config()
[docs] def authorize_application(self):
auth_url = "https://api.ecobee.com/authorize"
params = {
"response_type": "ecobeePin",
"client_id": self.api_key,
"scope": "smartWrite"
}
try:
response = make_ecobee_request("GET", auth_url, params=params)
except (ConnectionError, NewConnectionError) as re:
_log.error(re)
_log.warning("Error connecting to Ecobee, Could not request pin.")
return
for auth_item in ['code', 'ecobeePin']:
if auth_item not in response:
raise RuntimeError(f"Ecobee authorization response was missing required item: {auth_item}, response "
"contained {response}")
self.authorization_code = response.get('code')
pin = response.get('ecobeePin')
_log.warning("***********************************************************")
_log.warning(
f'Please authorize your Ecobee developer app with PIN code {pin}.\nGo to '
'https://www.ecobee.com/consumerportal /index.html, click My Apps, Add application, Enter Pin and click '
'Authorize.')
_log.warning("***********************************************************")
self.authorization_stage = "REQUEST_TOKENS"
gevent.sleep(60)
[docs] def request_tokens(self):
"""
Request up to date Auth tokens from Ecobee using API key and authorization code
"""
# Generate auth request and extract returned value
_log.debug("Requesting new auth tokens from Ecobee.")
url = 'https://api.ecobee.com/token'
params = {
'grant_type': 'ecobeePin',
'code': self.authorization_code,
'client_id': self.api_key
}
response = make_ecobee_request("POST", url, data=params)
for token in ["access_token", "refresh_token"]:
if token not in response:
raise RuntimeError(f"Request tokens response did not contain {token}: {response}")
self.access_token = response.get('access_token')
self.refresh_token = response.get('refresh_token')
self.authorization_stage = "AUTHORIZED"
[docs] def refresh_tokens(self):
"""
Refresh Ecobee API authentication tokens via API endpoint - asks Ecobee to reset tokens then updates config with
new tokens from Ecobee
"""
_log.info('Refreshing Ecobee auth tokens.')
url = 'https://api.ecobee.com/token'
params = {
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
'client_id': self.api_key
}
# Generate auth request and extract returned value
response = make_ecobee_request("POST", url, data=params)
for token in 'access_token', 'refresh_token':
if token not in response:
raise RuntimeError(f"Ecobee response did not contain token {token}:, response was {response}")
self.access_token = response['access_token']
self.refresh_token = response['refresh_token']
self.authorization_stage = "AUTHORIZED"
[docs] def update_auth_config(self):
"""
Update the platform driver configuration for this device with new values from auth functions
"""
auth_config = {"AUTH_CODE": self.authorization_code,
"ACCESS_TOKEN": self.access_token,
"REFRESH_TOKEN": self.refresh_token}
_log.debug("Updating Ecobee auth configuration with new tokens.")
self.vip.rpc.call(CONFIGURATION_STORE, "set_config", self.auth_config_path, auth_config, trigger_callback=False,
send_update=False).get(timeout=3)
[docs] def get_auth_config_from_store(self):
"""
:return: Fetch currently stored auth configuration info from config store, returns empty dict if none is
present
"""
configs = self.vip.rpc.call(CONFIGURATION_STORE, "manage_list_configs", PLATFORM_DRIVER).get(timeout=3)
if self.auth_config_path in configs:
return jsonapi.loads(self.vip.rpc.call(
CONFIGURATION_STORE, "manage_get", PLATFORM_DRIVER, self.auth_config_path).get(timeout=3))
else:
_log.warning("No Ecobee auth file found in config store")
return {}
[docs] def get_thermostat_data(self, refresh=False):
"""
Collects most up to date thermostat object data for the configured Ecobee thermostat ID
:param refresh: whether or not to force obtaining new data from the remote Ecobee API
"""
params = {
"json": jsonapi.dumps({
"selection": {
"selectionType": "thermostats",
"selectionMatch": self.ecobee_id,
"includeSensors": True,
"includeRuntime": True,
"includeEvents": True,
"includeEquipmentStatus": True,
"includeSettings": True
}
})
}
headers = populate_thermostat_headers(self.access_token)
self.thermostat_data = self.get_ecobee_data("GET", THERMOSTAT_URL, 180, refresh=refresh, headers=headers,
params=params)
[docs] def get_ecobee_data(self, request_type, url, update_frequency, refresh=False, **kwargs):
"""
Checks cache for up to date Ecobee data. If none is available for the URL, makes a request to remote Ecobee API.
:param refresh: force Ecobee data to be obtained from the remote API rather than cache
:param request_type: HTTP request type for request sent to remote
:param url: URL of remote Ecobee API endpoint
:param update_frequency: period for which cached data is considered up to date
:param kwargs: HTTP request arguments
:return: Up to date Ecobee data for URL
"""
cache_data = self.get_data_cache(url, update_frequency)
if refresh or not (isinstance(cache_data, dict) and len(cache_data)):
try:
response = self.get_data_remote(request_type, url, **kwargs)
except HTTPError as he:
self.store_remote_data(url, None)
raise he
self.store_remote_data(url, response)
return response
else:
return cache_data
[docs] def get_data_remote(self, request_type, url, **kwargs):
"""
Make request to Ecobee remote API for "register" data, updating authorization tokens as necessary
:param request_type: HTTP request type for making request
:param url: URL corresponding to "register" data
:param kwargs: HTTP request arguments
:return: remote API response body
"""
try:
response = make_ecobee_request(request_type, url, **kwargs)
self.authorization_stage = "AUTHORIZED"
return response
except HTTPError:
_log.warning(f"HTTPError occurred while fetching data from Ecobee API url: {url}")
# The request to the remote failed, try refreshing the tokens and trying again using the refresh token
self.authorization_stage = "REFRESH_TOKENS"
try:
self.update_authorization()
except HTTPError:
_log.warning("HTTPError occurred while refreshing Ecobee API tokens")
# if tokens could not be refreshed, try obtaining new tokens using the existing authorization key
self.authorization_stage = "REQUEST_TOKENS"
# if we fail to request new tokens, the authorization key is no longer valid, the driver will need
# to be restarted
self.update_authorization()
response = make_ecobee_request(request_type, url, **kwargs)
self.authorization_stage = "AUTHORIZED"
return response
[docs] def get_data_cache(self, url, update_frequency):
"""
Fetches data from cache dict if it is up to date
:param url: URL to use to use as lookup value in cache dict
:param update_frequency: duration in seconds for which data in cache is considered up to date
:return: Data stored in cache if up to date, otherwise None
"""
url_data = self.cache.get(url)
if url_data:
timestamp = utils.parse_timestamp_string(url_data.get("request_timestamp"))
if (datetime.datetime.now() - timestamp).total_seconds() < update_frequency:
return url_data.get("request_response")
else:
_log.info("Cached Ecobee data out of date.")
return None
[docs] def store_remote_data(self, url, response):
"""
Store response body with a timestamp for a given URL
:param url: url to use to use as lookup value in cache dict
:param response: request response body to store in cache
"""
timestamp = utils.format_timestamp(datetime.datetime.now())
self.cache.update({
url: {
"request_timestamp": timestamp,
"request_response": response
}
})
_log.info(f"Last Ecobee update occurred at {timestamp}")
self.cache.sync()
[docs] def get_point(self, point_name, **kwargs):
"""
Return a point's most recent stored value from remote API
:param point_name: The name of the point corresponding to a register to get the state of
:return: register's most recent state from remote API response
"""
# Find the named register and get its current state from the periodic Ecobee API data
register = self.get_register_by_name(point_name)
try:
return register.get_state(self.thermostat_data)
except (ValueError, KeyError, TypeError):
self.get_thermostat_data(refresh=True)
return register.get_state(self.thermostat_data)
def _set_point(self, point_name, value, **kwargs):
"""
Send request to remote API to update a point based on provided parameters
:param point_name: Name of the point to update
:param value: Intended update value
:return: Updated state from remote API
"""
# Find the correct register by name, set its state, then fetch the new state based on the register's type
register = self.get_register_by_name(point_name)
if register.read_only:
raise IOError(f"Trying to write to a point configured read only: {point_name}")
try:
if isinstance(register, Setting) or isinstance(register, Hold):
register.set_state(value, self.access_token)
elif isinstance(register, Vacation) or isinstance(register, Program):
register.set_state(value, self.access_token, **kwargs)
except HTTPError:
self.refresh_tokens()
if isinstance(register, Setting) or isinstance(register, Hold):
register.set_state(value, self.access_token)
elif isinstance(register, Vacation) or isinstance(register, Program):
register.set_state(value, self.access_token, **kwargs)
self.get_thermostat_data(refresh=True)
if register.readable:
return register.get_state(self.thermostat_data)
def _scrape_all(self):
"""
Fetch point data for all configured points
:return: dictionary of most recent data for all points configured for the driver
"""
result = {}
byte_registers = self.get_registers_by_type("byte", True) + self.get_registers_by_type("byte", False)
registers = [register for register in byte_registers if register.readable]
refresh = True
# Add data for all holds and settings to our results
for register in registers:
try:
register_data = register.get_state(self.thermostat_data)
if isinstance(register_data, dict):
result.update(register_data)
else:
result[register.point_name] = register_data
except ValueError:
if refresh is True:
# refresh data, but don't create a non-deterministic loop of refreshes
self.get_thermostat_data(refresh=refresh)
refresh = False
register_data = register.get_state(self.thermostat_data)
if isinstance(register_data, dict):
result.update(register_data)
else:
result[register.point_name] = register_data
return result
[docs]class Setting(BaseRegister):
"""
Register to wrap around points contained in setting field of Ecobee API's thermostat data response
"""
def __init__(self, thermostat_identifier, read_only, readable, point_name, point_path, units,
description=''):
super(Setting, self).__init__("byte", read_only, point_name, units, description=description)
self.thermostat_id = thermostat_identifier
self.readable = readable
self.point_path = point_path
[docs] def set_state(self, value, access_token):
"""
Set Ecobee thermostat setting value by configured point name and provided value
:param value: Arbitrarily specified value to request as set point
:param access_token: Ecobee access token to provide as bearer auth in request
:return: request response values from settings request
"""
# Generate set state request content and send request
params = {"format": "json"}
thermostat_body = {
"thermostat": {
"settings": {
self.point_path: value
}
}
}
headers, body = populate_selection_objects(access_token, "thermostats", self.thermostat_id, thermostat_body)
make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, params=params, json=body)
[docs] def get_state(self, ecobee_data):
"""
:param ecobee_data: Ecobee data dictionary obtained from Driver HTTP Cache agent
:return: Most recently available data for this setting register
"""
if not self.readable:
raise RuntimeError("Requested read of write-only point {}".format(self.point_name))
if not ecobee_data:
raise ValueError("No Ecobee data from cache available during point scrape.")
# Parse the state out of the data dictionary
for thermostat in ecobee_data.get("thermostatList"):
if int(thermostat["identifier"]) == self.thermostat_id:
if self.point_path not in thermostat.get("settings") or \
thermostat["settings"].get(self.point_path) is None:
raise ValueError(f"Point name {self.point_name} could not be found in latest Ecobee data")
else:
return thermostat["settings"].get(self.point_path)
raise ValueError(
f"Point {self.point_path} not available in Ecobee data (Volttron Point Name {self.point_name}).")
[docs]class Hold(BaseRegister):
"""
Register to wrap around points contained in hold field of Ecobee API's thermostat data response
"""
def __init__(self, thermostat_identifier, read_only, readable, point_name, point_path, units, description=''):
super(Hold, self).__init__("byte", read_only, point_name, units, description=description)
self.thermostat_id = thermostat_identifier
self.readable = readable
self.python_type = int
self.point_path = point_path
[docs] def set_state(self, value, access_token):
"""
Set Ecobee thermostat hold by configured point name and provided value dictionary
:param value: Arbitrarily specified value dictionary. Ecobee API documentation provides best practice
information for each hold.
:param access_token: Ecobee access token to provide as bearer auth in request
:return: request response values from settings request
"""
if not isinstance(value, dict):
raise ValueError(f"Hold register set_state expects dict, received {type(value)}")
if "holdType" not in value:
raise ValueError('Hold register requires "holdType" in value dict')
if self.point_path not in value:
raise ValueError(f"Point name {self.point_name} not found in Hold set_state value dict")
# Generate set state request content and send request
params = {"format": "json"}
function_body = {
"functions": [
{
"type": "setHold",
"params": value
}
]
}
headers, body = populate_selection_objects(access_token, "thermostats", self.thermostat_id, function_body)
make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, params=params, json=body)
[docs] def get_state(self, ecobee_data):
"""
:param ecobee_data: Ecobee data dictionary obtained from Driver HTTP Cache agent
:return: Most recently available data for this setting register
"""
if not self.readable:
raise RuntimeError(f"Requested read of write-only point {self.point_name}")
if not ecobee_data:
raise ValueError("No Ecobee data from cache available during point scrape.")
# Parse the value from the data dictionary
for thermostat in ecobee_data.get("thermostatList"):
if int(thermostat.get("identifier")) == self.thermostat_id:
runtime_data = thermostat.get("runtime")
if not runtime_data or runtime_data.get(self.point_path) is None:
raise ValueError(f"Point name {self.point_name} could not be found in latest Ecobee data")
return runtime_data.get(self.point_path)
raise ValueError(
f"Point {self.point_path} not available in Ecobee data (Volttron Point Name {self.point_name}).")
# TODO deleting a vacation is currently broken
[docs]class Vacation(BaseRegister):
"""
Wrapper register for adding and deleting vacations, and getting vacation status
Note: Since vacations are transient, only 1 vacation register will be
created per driver. The driver can be used to add, delete, or get the status
of all vacations for the device
"""
def __init__(self, thermostat_identifier):
vacation_description = "Add, remove and fetch Vacations on this Ecobee device."
super(Vacation, self).__init__("byte", False, "Vacations", "", description=vacation_description)
self.thermostat_id = thermostat_identifier
self.readable = True
self.python_type = str
[docs] def set_state(self, vacation, access_token, delete=False):
"""
Send delete or create vacation request to Ecobee API for the configured thermostat
:param vacation: Vacation name for delete, or vacation object dictionary for create
:param access_token: Ecobee access token to provide as bearer auth in request
:param delete: Whether to delete the named vacation
"""
if delete:
if isinstance(vacation, dict):
vacation = vacation.get("name")
if not vacation:
raise ValueError('Deleting vacation on Ecobee thermostat requires either vacation name string or '
'dict with "name" string')
_log.debug("Creating Ecobee vacation deletion request")
# Generate and send delete vacation request to remote API
params = {"format": "json"}
function_body = {
"functions": [
{
"type": "deleteVacation",
"params": {
"name": vacation
}
}
]
}
headers, body = populate_selection_objects(access_token, "registered", self.thermostat_id, function_body)
make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, params=params, json=body)
else:
# Do some basic format validation for vacation dict, but user is ultimately responsible for formatting
# Ecobee API docs describe expected format, link provided below
valid_vacation = True
required_items = ["name", "coolHoldTemp", "heatHoldTemp", "startDate", "startTime", "endDate", "endTime"]
if not isinstance(vacation, dict):
valid_vacation = False
else:
for item in required_items:
if item not in vacation:
valid_vacation = False
break
if not valid_vacation:
raise ValueError('Creating vacation on Ecobee thermostat requires dict: {"name": <name string>, '
'"coolHoldTemp": <temp>, "heatHoldTemp": <temp>, "startDate": <date string>, '
'"startTime": <time string>, "endDate": <date string>, "endTime": <time string>}. '
'Date format required is "YYYY-mm-dd", time format is "HH:MM:SS". See '
'https://www.ecobee.com/home/developer/api/examples/ex9.shtml for more information')
# Generate create vacation request and send
params = {"format": "json"}
function_body = {
"functions": [
{
"type": "createVacation",
"params": vacation
}
]
}
headers, body = populate_selection_objects(access_token, "registered", self.thermostat_id, function_body)
make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, params=params, json=body)
[docs] def get_state(self, ecobee_data):
"""
:param ecobee_data: Ecobee data dictionary obtained from Driver HTTP Cache agent
:return: List of vacation dictionaries returned by Ecobee remote API
"""
if not ecobee_data:
raise ValueError("No Ecobee data from cache available during point scrape.")
# Parse out vacations from Ecobee API data dictionary
for thermostat in ecobee_data.get("thermostatList"):
if int(thermostat.get("identifier")) == self.thermostat_id:
events_data = thermostat.get("events")
if not isinstance(events_data, list):
raise ValueError(f"Point name {self.point_name} could not be found in latest Ecobee data")
return [event for event in events_data if event.get("type") == "vacation"]
raise ValueError(f"Point {self.point_name} not available in Ecobee data.")
# TODO deleting a program currently broken
[docs]class Program(BaseRegister):
"""
Wrapper register for managing Ecobee thermostat programs, and getting program status
"""
def __init__(self, thermostat_identifier):
program_description = "List or resume non-vacation programs stored on Ecobee thermostat"
super(Program, self).__init__("byte", False, "Programs", "", description=program_description)
self.thermostat_id = thermostat_identifier
self.readable = True
self.python_type = str
[docs] def set_state(self, program, access_token, resume_all=False):
"""
Set a new program, resume the next program on the programs stack, or "resume all"
:param program: Program dictionary as specified by Ecobee API docs if setting a new program, else None
:param access_token: Ecobee access token to provide as bearer auth in request
:param resume_all: Whether or not to "resume all" if using the resume program function
"""
params = {"format": "json"}
if not isinstance(program, dict) and not len(program):
if not resume_all:
_log.warning("No program specified, resuming next event on Ecobee event stack. To learn how to create "
"an Ecobee program, Visit "
"https://www.ecobee.com/home/developer/api/examples/ex11.shtml for more information")
else:
_log.info("No program specified and resume all is set to true, resuming all stored programs.")
_log.debug("Resuming scheduled Ecobee program(s)")
function_body = {
"functions": [
{
"type": "resumeProgram",
"params": {
"resumeAll": resume_all
}
}
]
}
headers, body = populate_selection_objects(access_token, "thermostats", self.thermostat_id, function_body)
else:
program_body = {
"thermostat": {
"program": program
}
}
headers, body = populate_selection_objects(access_token, "registered", self.thermostat_id, program_body)
make_ecobee_request("POST", THERMOSTAT_URL, headers=headers, params=params, json=body)
[docs] def get_state(self, ecobee_data):
"""
:param ecobee_data: Ecobee data dictionary obtained from Driver HTTP Cache agent
:return: List of Ecobee event objects minus vacation events
"""
if not ecobee_data:
raise ValueError("No Ecobee data from cache available during point scrape.")
# Parse out event objects from Ecobee API data
for thermostat in ecobee_data.get("thermostatList"):
if int(thermostat.get("identifier")) == self.thermostat_id:
events_data = thermostat.get("events")
if not isinstance(events_data, list):
raise ValueError(f"Point name {self.point_name} could not be found in latest Ecobee data")
return [event for event in events_data if event.get("type") != "vacation"]
raise ValueError(f"Point {self.point_name} not available in Ecobee data.")
[docs]class Status(BaseRegister):
"""
Status request wrapper register for Ecobee thermostats.
Note: There is a single status point for each thermostat, which is set by the device.
"""
def __init__(self, thermostat_identifier):
status_description = "Reports device status as a list of running HVAC devices interfacing with this thermostat."
super(Status, self).__init__("byte", True, "Status", "", description=status_description)
self.thermostat_id = thermostat_identifier
self.readable = True
self.python_type = int
[docs] def set_state(self, value, access_token):
"""
Set state is not supported for the static Status register.
"""
raise NotImplementedError("Setting thermostat status is not supported.")
[docs] def get_state(self, ecobee_data):
"""
:return: List of currently running equipment connected to Ecobee thermostat
"""
if not ecobee_data:
raise ValueError("No Ecobee data from cache available during point scrape.")
# Parse out event objects from Ecobee API data
for thermostat in ecobee_data.get("thermostatList"):
if int(thermostat.get("identifier")) == self.thermostat_id:
status_string = thermostat.get("equipmentStatus")
if not isinstance(status_string, str):
raise ValueError(f"Point name {self.point_name} could not be found in latest Ecobee data")
return [status for status in status_string.split(",") if len(status)]
raise ValueError(f"Point {self.point_name} not available in Ecobee data.")
[docs]def populate_selection_objects(access_token, selection_type, selection_match, specification):
"""
Utility method for generating set point request bodies for Ecobee remote api
:param access_token: Ecobee access token from auth steps/configuration (bearer in request header)
:param selection_type: Ecobee identity selection type
:param selection_match: Ecobee identity selection match id
:param specification: dictionary specifying the Ecobee object for updating the point on the remote API
:return: request body JSON as dictionary
"""
body = {
"selection": {
"selectionType": selection_type,
"selectionMatch": selection_match
},
}
body.update(specification)
return populate_thermostat_headers(access_token), body
[docs]def call_grequest(method_name, url, **kwargs):
"""
Make grequest calls to remote api
:param method_name: method type - put/get/delete
:param url: http URL suffix
:param kwargs: Additional arguments for http request
:return: grequest response
"""
try:
fn = getattr(grequests, method_name)
request = fn(url, **kwargs)
response = grequests.map([request])[0]
if response and isinstance(response, list):
response = response[0]
response.raise_for_status()
return response
except (ConnectionError, NewConnectionError) as e:
_log.error(f"Error connecting to {url} with args {kwargs}: {e}")
raise e
[docs]def make_ecobee_request(request_type, url, **kwargs):
"""
Wrapper around making arbitrary GET and POST requests to remote Ecobee API
:return: Ecobee API response using provided request content
"""
# Generate appropriate grequests object
if request_type.lower() in ["get", "post"]:
response = call_grequest(request_type.lower(), url, verify=requests.certs.where(), timeout=30, **kwargs)
else:
raise ValueError(f"Unsupported request type {request_type} for Ecobee driver.")
# Send request and extract data from response
headers = response.headers
if "json" in headers.get("Content-Type"):
return response.json()
else:
content = response.content
if isinstance(content, bytes):
content = jsonapi.loads(response.decode("UTF-8"))
return content