# -*- coding: utf-8 -*- {{{
# vim: set fenc=utf-8 ft=python sw=4 ts=4 sts=4 et:
#
# Copyright 2020, Battelle Memorial Institute.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This material was prepared as an account of work sponsored by an agency of
# the United States Government. Neither the United States Government nor the
# United States Department of Energy, nor Battelle, nor any of their
# employees, nor any jurisdiction or organization that has cooperated in the
# development of these materials, makes any warranty, express or
# implied, or assumes any legal liability or responsibility for the accuracy,
# completeness, or usefulness or any information, apparatus, product,
# software, or process disclosed, or represents that its use would not infringe
# privately owned rights. Reference herein to any specific commercial product,
# process, or service by trade name, trademark, manufacturer, or otherwise
# does not necessarily constitute or imply its endorsement, recommendation, or
# favoring by the United States Government or any agency thereof, or
# Battelle Memorial Institute. The views and opinions of authors expressed
# herein do not necessarily state or reflect those of the
# United States Government or any agency thereof.
#
# PACIFIC NORTHWEST NATIONAL LABORATORY operated by
# BATTELLE for the UNITED STATES DEPARTMENT OF ENERGY
# under Contract DE-AC05-76RL01830
# }}}
import bisect
import logging
from pickle import dumps, loads
from collections import defaultdict, namedtuple
from copy import deepcopy
from datetime import timedelta
from volttron.platform.agent import utils
PRIORITY_HIGH = 'HIGH'
PRIORITY_LOW = 'LOW'
PRIORITY_LOW_PREEMPT = 'LOW_PREEMPT'
ALL_PRIORITIES = {PRIORITY_HIGH, PRIORITY_LOW, PRIORITY_LOW_PREEMPT}
# RequestResult - Result of a schedule request returned from the schedule
# manager.
RequestResult = namedtuple('RequestResult', ['success', 'data', 'info_string'])
DeviceState = namedtuple('DeviceState',
['agent_id', 'task_id', 'time_remaining'])
_log = logging.getLogger(__name__)
[docs]class TimeSlice(object):
def __init__(self, start=None, end=None):
if end is None:
end = start
if start is not None:
if end < start:
raise ValueError('Invalid start and end values.')
self._start = start
self._end = end
def __repr__(self):
return 'TimeSlice({start!r},{end!r})'.format(start=self._start,
end=self._end)
def __str__(self):
return '({start} <-> {end})'.format(start=self._start, end=self._end)
@property
def end(self):
return self._end
@property
def start(self):
return self._start
def __cmp__(self, other):
if self._start >= other._end:
return 1
if self._end <= other._start:
return -1
return 0
# def __ne__(self, other):
# return self.__cmp__(other) != 0
#
# def __gt__(self, other):
# return self.__cmp__(other) > 0
def __lt__(self, other):
return self.__cmp__(other) < 0
# def __ge__(self, other):
# return self.__cmp__(other) >= 0
#
# def __le__(self, other):
# return self.__cmp__(other) <= 0
def __contains__(self, other):
return self._start < other < self._end
[docs] def stretch_to_include(self, time_slice):
if self._start is None or time_slice._start < self._start:
self._start = time_slice._start
if self._end is None or time_slice._end > self._end:
self._end = time_slice._end
[docs] def contains_include_start(self, other):
"""Similar to == or "in" but includes time == self.start"""
return other in self or other == self.start
[docs]class Task(object):
STATE_PRE_RUN = 'PRE_RUN'
STATE_RUNNING = 'RUNNING'
STATE_PREEMPTED = 'PREEMPTED'
STATE_FINISHED = 'FINISHED'
def __init__(self, agent_id, priority, requests):
self.agent_id = agent_id
self.priority = priority
self.time_slice = TimeSlice()
self.devices = defaultdict(Schedule)
self.state = Task.STATE_PRE_RUN
self.populate_schedule(requests)
[docs] def change_state(self, new_state):
if self.state == new_state:
return
# TODO: We can put code here for managing state changes.
self.state = new_state
[docs] def populate_schedule(self, requests):
for request in requests:
device, start, end = request
time_slice = TimeSlice(start, end)
if not isinstance(device, str):
raise ValueError('Device not string.')
self.devices[device].schedule_slot(time_slice)
self.time_slice.stretch_to_include(time_slice)
[docs] def make_current(self, now):
if self.state == Task.STATE_FINISHED:
self.devices.clear()
return
for device, schedule in list(self.devices.items()):
if schedule.finished(now):
del self.devices[device]
if self.time_slice.contains_include_start(now):
if self.state != Task.STATE_PREEMPTED:
self.change_state(Task.STATE_RUNNING)
elif self.time_slice > TimeSlice(now):
self.change_state(Task.STATE_PRE_RUN)
elif self.time_slice < TimeSlice(now):
self.change_state(Task.STATE_FINISHED)
[docs] def get_current_slots(self, now):
result = {}
for device, schedule in self.devices.items():
time_slot = schedule.get_current_slot(now)
if time_slot is not None:
result[device] = time_slot
return result
[docs] def get_conflicts(self, other):
results = []
for device, schedule in self.devices.items():
if device in other.devices:
conflicts = other.devices[device].get_conflicts(schedule)
results.extend(
[device, str(x.start), str(x.end)] for x in conflicts)
return results
[docs] def check_can_preempt_other(self, other):
if self.priority != PRIORITY_HIGH:
return False
if other.priority == PRIORITY_HIGH:
return False
if other.state == Task.STATE_RUNNING and other.priority != \
PRIORITY_LOW_PREEMPT:
return False
return True
[docs] def preempt(self, grace_time, now):
"""Return true if there are time slots that have a grace period left"""
self.make_current(now)
if self.state == Task.STATE_PREEMPTED:
return True
if self.state == Task.STATE_FINISHED:
return False
current_time_slots = []
for schedule in self.devices.values():
current_time_slots.extend(
schedule.prune_to_current(grace_time, now))
self.change_state(
Task.STATE_FINISHED if not current_time_slots else
Task.STATE_PREEMPTED)
if self.state == Task.STATE_PREEMPTED:
self.time_slice = TimeSlice(now, now + grace_time)
return True
return False
[docs] def get_next_event_time(self, now):
device_schedules = (x.get_next_event_time(now) for x in
self.devices.values())
events = [x for x in device_schedules if x is not None]
if events:
return min(events)
return None
[docs]class ScheduleError(Exception):
pass
[docs]class Schedule(object):
def __init__(self):
self.time_slots = []
[docs] def check_availability(self, time_slot):
start_slice = bisect.bisect_left(self.time_slots, time_slot)
end_slice = bisect.bisect_right(self.time_slots, time_slot)
return set(self.time_slots[start_slice:end_slice])
[docs] def make_current(self, now):
"""Should be called before working with a schedule.
Updates the state to the schedule to eliminate stuff in the past."""
now_slice = bisect.bisect_left(self.time_slots, TimeSlice(now))
_log.debug("now_slice in make_current {}".format(now_slice))
if now_slice > 0:
del self.time_slots[:now_slice]
[docs] def schedule_slot(self, time_slot):
if self.check_availability(time_slot):
raise ScheduleError('DERP! We messed up the scheduling!')
bisect.insort(self.time_slots, time_slot)
[docs] def get_next_event_time(self, now):
"""Run this to know when to the next state change is going to happen
with this schedule"""
self.make_current(now)
if not self.time_slots:
return None
_log.debug("in schedule get_next_event_time timeslots {} now {}"
.format(self.time_slots[0], now))
next_time = self.time_slots[0].end if self.time_slots[
0].contains_include_start(now) else self.time_slots[0].start
# Round to the next second to fix timer goofyness in agent timers.
if next_time.microsecond:
next_time = next_time.replace(microsecond=0) + timedelta(seconds=1)
return next_time
[docs] def get_current_slot(self, now):
self.make_current(now)
if not self.time_slots:
return None
if self.time_slots[0].contains_include_start(now):
return self.time_slots[0]
return None
[docs] def prune_to_current(self, grace_time, now):
"""Use this to prune a schedule due to preemption."""
current_slot = self.get_current_slot(now)
if current_slot is not None:
latest_end = now + grace_time
if current_slot.end > latest_end:
current_slot = TimeSlice(current_slot.start, latest_end)
self.time_slots = [current_slot]
else:
self.time_slots = []
return self.time_slots
[docs] def get_conflicts(self, other):
"""Returns a list of our time_slices that conflict with the other
schedule"""
return [x for x in self.time_slots if other.check_availability(x)]
[docs] def finished(self, now):
self.make_current(now)
return not bool(self.time_slots)
[docs] def get_schedule(self):
return deepcopy(self.time_slots)
def __len__(self):
return len(self.time_slots)
def __repr__(self):
pass
[docs]class ScheduleManager(object):
def __init__(self, grace_time, now=None, save_state_callback=None, initial_state_string=None):
self.tasks = {}
self.running_tasks = set()
self.preempted_tasks = set()
self.set_grace_period(grace_time)
self.save_state_callback = save_state_callback
if now is None:
now = utils.get_aware_utc_now()
self.load_state(now, initial_state_string)
[docs] def set_grace_period(self, seconds):
self.grace_time = timedelta(seconds=seconds)
[docs] def load_state(self, now, initial_state_string):
if initial_state_string is None:
return
try:
self.tasks = loads(initial_state_string)
self._cleanup(now)
except Exception:
self.tasks = {}
_log.error ('Scheduler state file corrupted!')
[docs] def save_state(self, now):
if self.save_state_callback is None:
return
try:
self._cleanup(now)
self.save_state_callback(dumps(self.tasks))
except Exception:
_log.error('Failed to save scheduler state!')
[docs] def request_slots(self, agent_id, id_, requests, priority, now=None):
if now is None:
now = utils.get_aware_utc_now()
self._cleanup(now)
if id_ in self.tasks:
return RequestResult(False, {}, 'TASK_ID_ALREADY_EXISTS')
if id_ is None:
return RequestResult(False, {}, 'MISSING_TASK_ID')
if priority is None:
return RequestResult(False, {}, 'MISSING_PRIORITY')
if priority not in ALL_PRIORITIES:
return RequestResult(False, {}, 'INVALID_PRIORITY')
if agent_id is None:
return RequestResult(False, {}, 'MISSING_AGENT_ID')
if requests is None or not requests:
return RequestResult(False, {}, 'MALFORMED_REQUEST_EMPTY')
if not isinstance(agent_id, str) or not agent_id:
return RequestResult(False, {},
'MALFORMED_REQUEST: TypeError: agentid must '
'be a nonempty string')
if not isinstance(id_, str) or not id_:
return RequestResult(False, {},
'MALFORMED_REQUEST: TypeError: taskid must '
'be a nonempty string')
try:
new_task = Task(agent_id, priority, requests)
except ScheduleError:
return RequestResult(False, {}, 'REQUEST_CONFLICTS_WITH_SELF')
except Exception as ex:
return RequestResult(False, {},
'MALFORMED_REQUEST: ' +
ex.__class__.__name__ + ': ' + str(
ex))
conflicts = defaultdict(dict)
preempted_tasks = set()
for task_id, task in self.tasks.items():
conflict_list = new_task.get_conflicts(task)
agent_id = task.agent_id
if conflict_list:
if not new_task.check_can_preempt_other(task):
conflicts[agent_id][task_id] = conflict_list
else:
preempted_tasks.add((agent_id, task_id))
if conflicts:
return RequestResult(False, conflicts,
'CONFLICTS_WITH_EXISTING_SCHEDULES')
# By this point we know that any remaining conflicts can be
# preempted
# and the request will succeed.
self.tasks[id_] = new_task
for _, task_id in preempted_tasks:
task = self.tasks[task_id]
task.preempt(self.grace_time, now)
self.save_state(now)
return RequestResult(True, preempted_tasks, '')
[docs] def cancel_task(self, agent_id, task_id, now):
if task_id not in self.tasks:
return RequestResult(False, {}, 'TASK_ID_DOES_NOT_EXIST')
task = self.tasks[task_id]
if task.agent_id != agent_id:
return RequestResult(False, {}, 'AGENT_ID_TASK_ID_MISMATCH')
del self.tasks[task_id]
self.save_state(now)
return RequestResult(True, {}, '')
[docs] def get_schedule_state(self, now):
self._cleanup(now)
running_results = {}
preempted_results = {}
for task_id in self.running_tasks:
task = self.tasks[task_id]
agent_id = task.agent_id
current_task_slots = task.get_current_slots(now)
_log.debug("current_task_slots {}".format(current_task_slots))
for device, time_slot in current_task_slots.items():
assert (device not in running_results)
running_results[device] = DeviceState(agent_id, task_id, (
time_slot.end - now).total_seconds())
for task_id in self.preempted_tasks:
task = self.tasks[task_id]
agent_id = task.agent_id
current_task_slots = task.get_current_slots(now)
for device, time_slot in current_task_slots.items():
assert (device not in preempted_results)
preempted_results[device] = DeviceState(agent_id, task_id, (
time_slot.end - now).total_seconds())
running_results.update(preempted_results)
return running_results
[docs] def get_next_event_time(self, now):
task_times = (x.get_next_event_time(now) for x in self.tasks.values())
events = [x for x in task_times if x is not None]
if events:
return min(events)
return None
def _cleanup(self, now):
"""Cleans up self and contained tasks to reflect the current time.
Should be called:
1. Before serializing to disk.
2. After reading from disk.
3. Before handling a schedule submission request.
4. After handling a schedule submission request.
5. Before handling a state request."""
# Reset the running tasks.
self.running_tasks = set()
self.preempted_tasks = set()
for task_id in list(self.tasks.keys()):
task = self.tasks[task_id]
task.make_current(now)
if task.state == Task.STATE_FINISHED:
del self.tasks[task_id]
elif task.state == Task.STATE_RUNNING:
self.running_tasks.add(task_id)
elif task.state == Task.STATE_PREEMPTED:
self.preempted_tasks.add(task_id)
def __repr__(self):
pass