[haizea-commit] r583 - in branches/TP2.0: src/haizea/resourcemanager/scheduler tests
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Mon Jun 8 13:05:10 CDT 2009
Author: borja
Date: 2009-06-08 13:05:04 -0500 (Mon, 08 Jun 2009)
New Revision: 583
Added:
branches/TP2.0/tests/test_slottable.py
Modified:
branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
Log:
New slottable implementation and unit tests. TP2.0 branch is now broken, and will remain that way until all the changes to the scheduler are done.
Modified: branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py 2009-03-02 19:42:39 UTC (rev 582)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py 2009-06-08 18:05:04 UTC (rev 583)
@@ -21,83 +21,26 @@
from math import floor
import bisect
import logging
+from operator import attrgetter
-class Node(object):
- def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
- self.capacity = ResourceTuple.copy(capacity)
- if capacitywithpreemption == None:
- self.capacitywithpreemption = None
- else:
- self.capacitywithpreemption = ResourceTuple.copy(capacitywithpreemption)
- self.resourcepoolnode = resourcepoolnode
-
- @classmethod
- def from_resourcepool_node(cls, node):
- capacity = node.get_capacity()
- return cls(capacity, capacity, node)
+"""This module provides an in-memory slot table data structure.
-class NodeList(object):
- def __init__(self):
- self.nodelist = []
+A slot table is essentially just a collection of resource reservations.
+See the documentation for ResourceReservation, SlotTable, and AvailabilityWindow
+for additional implementation details.
- def add(self, node):
- self.nodelist.append(node)
-
- def __getitem__(self, n):
- return self.nodelist[n-1]
- def copy(self):
- nodelist = NodeList()
- for n in self.nodelist:
- nodelist.add(Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode))
- return nodelist
-
- def toDict(self):
- nodelist = self.copy()
- return dict([(i+1, v) for i, v in enumerate(nodelist)])
-
-class KeyValueWrapper(object):
- def __init__(self, key, value):
- self.key = key
- self.value = value
-
- def __cmp__(self, other):
- return cmp(self.key, other.key)
-class ResourceReservation(object):
-
- # Resource reservation states
- STATE_SCHEDULED = 0
- STATE_ACTIVE = 1
- STATE_DONE = 2
- state_str = {STATE_SCHEDULED : "Scheduled",
- STATE_ACTIVE : "Active",
- STATE_DONE : "Done"}
-
- def __init__(self, lease, start, end, res):
- self.lease = lease
- self.start = start
- self.end = end
- self.state = None
- self.resources_in_pnode = res
- self.logger = logging.getLogger("LEASES")
-
- def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Start : %s" % self.start)
- self.logger.log(loglevel, "End : %s" % self.end)
- self.logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state])
- self.logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
-
- def xmlrpc_marshall(self):
- # Convert to something we can send through XMLRPC
- rr = {}
- rr["start"] = xmlrpc_marshall_singlevalue(self.start)
- rr["end"] = xmlrpc_marshall_singlevalue(self.end)
- rr["state"] = self.state
- return rr
+"""
+
class ResourceTuple(object):
+ """A resource tuple
+
+ This class ...
+
+ """
def __init__(self, res):
self._res = res
@@ -159,243 +102,263 @@
r += "%s:%.2f " % (self.descriptions[i], x)
return r
+ def __eq__(self, res2):
+ return self._res == res2._res
+class ResourceReservation(object):
+ """A resource reservation
+
+ A resource reservation (or RR) is a data structure specifying that a certain
+ quantities of resources (represented as a ResourceTuple) are reserved across
+ several nodes (each node can have a different resource tuple; e.g., 1 CPU and
+ 512 MB of memory in node 1 and 2 CPUs and 1024 MB of memory in node 2). An RR
+ has a specific start and end time for all the nodes. Thus, if some nodes are
+ reserved for an interval of time, and other nodes are reserved for a different
+ interval (even if these reservations are for the same lease), two separate RRs
+ would have to be added to the slot table.
+
+ """
+
+ # Resource reservation states
+ STATE_SCHEDULED = 0
+ STATE_ACTIVE = 1
+ STATE_DONE = 2
+
+ # Mapping from state to a descriptive string
+ state_str = {STATE_SCHEDULED : "Scheduled",
+ STATE_ACTIVE : "Active",
+ STATE_DONE : "Done"}
+
+ def __init__(self, lease, start, end, res):
+ self.lease = lease
+ self.start = start
+ self.end = end
+ self.state = None
+ self.resources_in_pnode = res # pnode -> ResourceTuple
+ self.logger = logging.getLogger("LEASES")
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Start : %s" % self.start)
+ self.logger.log(loglevel, "End : %s" % self.end)
+ self.logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state])
+ self.logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
+
+ def xmlrpc_marshall(self):
+ # Convert to something we can send through XMLRPC
+ rr = {}
+ rr["start"] = xmlrpc_marshall_singlevalue(self.start)
+ rr["end"] = xmlrpc_marshall_singlevalue(self.end)
+ rr["state"] = self.state
+ return rr
+
+class Node(object):
+ def __init__(self, capacity, resourcepoolnode):
+ self.capacity = ResourceTuple.copy(capacity)
+ self.resourcepoolnode = resourcepoolnode
+
+ @classmethod
+ def from_resourcepool_node(cls, node):
+ capacity = node.get_capacity()
+ return cls(capacity, node)
+
+class NodeList(object):
+ def __init__(self):
+ self.nodelist = []
+
+ def add(self, node):
+ self.nodelist.append(node)
+
+ def __getitem__(self, n):
+ return self.nodelist[n-1]
+
+ def __iter__(self):
+ return iter(self.nodelist)
+
+ def copy(self):
+ nodelist = NodeList()
+ for n in self.nodelist:
+ nodelist.add(Node(n.capacity, n.resourcepoolnode))
+ return nodelist
+
+ def to_dict(self):
+ nodelist = self.copy()
+ return dict([(i+1, v) for i, v in enumerate(nodelist)])
+
+class KeyValueWrapper(object):
+ def __init__(self, key, value):
+ self.key = key
+ self.value = value
+
+ def __cmp__(self, other):
+ return cmp(self.key, other.key)
+
+
+
+
+
+
class SlotTable(object):
+ """Slot Table
+
+ The slot table is, by far, the most used data structure in Haizea, we need to
+ optimize access to these reservations. In particular, we will often need to quickly
+ access reservations starting or ending at a specific time (or in an interval of time).
+ The current slot table implementation stores the RRs in two ordered lists: one
+ by starting time and another by ending time. Access is done by binary search in O(log n)
+ time. Insertion and removal require O(n) time, since lists are implemented internally
+ as arrays in CPython. We could improve these times in the future by using a
+ tree structure (which Python doesn't have natively, so we'd have to include
+ our own tree implementation), although slot table accesses far outweight insertion
+ and removal operations. The slot table is implemented with classes SlotTable,
+ Node, NodeList, and KeyValueWrapper.
+
+ """
+
def __init__(self):
self.logger = logging.getLogger("SLOT")
self.nodes = NodeList()
self.reservations = []
- self.reservationsByStart = []
- self.reservationsByEnd = []
+ self.reservations_by_start = []
+ self.reservations_by_end = []
self.availabilitycache = {}
self.changepointcache = None
-
- self.availabilitywindow = AvailabilityWindow(self)
def add_node(self, resourcepoolnode):
self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
def is_empty(self):
- return (len(self.reservationsByStart) == 0)
+ return (len(self.reservations_by_start) == 0)
- def dirty(self):
- # You're a dirty, dirty slot table and you should be
- # ashamed of having outdated caches!
- self.availabilitycache = {}
- self.changepointcache = None
-
- def getAvailabilityCacheMiss(self, time):
- allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
- onlynodes = None
- nodes = {}
- reservations = self.getReservationsAt(time)
- # Find how much resources are available on each node
- canpreempt = True
- for r in reservations:
- for node in r.resources_in_pnode:
- if onlynodes == None or (onlynodes != None and node in onlynodes):
- if not nodes.has_key(node):
- n = self.nodes[node]
- if canpreempt:
- nodes[node] = Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode)
- else:
- nodes[node] = Node(n.capacity, None, n.resourcepoolnode)
- nodes[node].capacity.decr(r.resources_in_pnode[node])
- if canpreempt and not r.is_preemptible:
- nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
+ def is_full(self, time, restype = constants.RES_CPU):
+ nodes = self.get_availability(time)
+ avail = sum([node.capacity.get_by_type(restype) for node in nodes.values()])
+ return (avail == 0)
- # For the remaining nodes, use a reference to the original node, not a copy
- if onlynodes == None:
- missing = allnodes - set(nodes.keys())
- else:
- missing = onlynodes - set(nodes.keys())
-
- for node in missing:
- nodes[node] = self.nodes[node]
-
- self.availabilitycache[time] = nodes
-
- def getAvailability(self, time, resreq=None, onlynodes=None, canpreempt=False):
- if not self.availabilitycache.has_key(time):
- self.getAvailabilityCacheMiss(time)
- # Cache miss
-
- nodes = self.availabilitycache[time]
-
- if onlynodes != None:
- onlynodes = set(onlynodes)
- nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
-
- # Keep only those nodes with enough resources
- if resreq != None:
- newnodes = {}
- for n, node in nodes.items():
- if (not canpreempt and resreq.fits_in(node.capacity)) or (canpreempt and resreq.fits_in(node.capacitywithpreemption)):
- newnodes[n]=node
- else:
- pass
- nodes = newnodes
-
- return nodes
-
def get_total_capacity(self, restype = constants.RES_CPU):
return sum([n.capacity.get_by_type(restype) for n in self.nodes.nodelist])
- def getReservationsAt(self, time):
+ def get_reservations_at(self, time):
item = KeyValueWrapper(time, None)
- startpos = bisect.bisect_right(self.reservationsByStart, item)
- bystart = set([x.value for x in self.reservationsByStart[:startpos]])
- endpos = bisect.bisect_right(self.reservationsByEnd, item)
- byend = set([x.value for x in self.reservationsByEnd[endpos:]])
+ startpos = bisect.bisect_right(self.reservations_by_start, item)
+ bystart = set([x.value for x in self.reservations_by_start[:startpos]])
+ endpos = bisect.bisect_right(self.reservations_by_end, item)
+ byend = set([x.value for x in self.reservations_by_end[endpos:]])
res = bystart & byend
return list(res)
def get_reservations_starting_between(self, start, end):
startitem = KeyValueWrapper(start, None)
enditem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByStart, startitem)
- endpos = bisect.bisect_right(self.reservationsByStart, enditem)
- res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+ startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+ endpos = bisect.bisect_right(self.reservations_by_start, enditem)
+ res = [x.value for x in self.reservations_by_start[startpos:endpos]]
return res
+ # on or after
def get_reservations_starting_after(self, start):
startitem = KeyValueWrapper(start, None)
- startpos = bisect.bisect_left(self.reservationsByStart, startitem)
- res = [x.value for x in self.reservationsByStart[startpos:]]
+ startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+ res = [x.value for x in self.reservations_by_start[startpos:]]
return res
def get_reservations_ending_after(self, end):
startitem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
- res = [x.value for x in self.reservationsByEnd[startpos:]]
+ startpos = bisect.bisect_left(self.reservations_by_end, startitem)
+ res = [x.value for x in self.reservations_by_end[startpos:]]
return res
def get_reservations_ending_between(self, start, end):
startitem = KeyValueWrapper(start, None)
enditem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
- endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
- res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
+ startpos = bisect.bisect_left(self.reservations_by_end, startitem)
+ endpos = bisect.bisect_right(self.reservations_by_end, enditem)
+ res = [x.value for x in self.reservations_by_end[startpos:endpos]]
return res
def get_reservations_starting_at(self, time):
return self.get_reservations_starting_between(time, time)
def get_reservations_ending_at(self, time):
- return self.get_reservations_ending_between(time, time)
-
- # ONLY for simulation
- def getNextPrematureEnd(self, after):
- from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
- # Inefficient, but ok since this query seldom happens
- res = [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after]
- if len(res) > 0:
- prematureends = [r.prematureend for r in res]
- prematureends.sort()
- return prematureends[0]
- else:
- return None
-
- # ONLY for simulation
- def getPrematurelyEndingRes(self, t):
- from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
- return [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend == t]
+ return self.get_reservations_ending_between(time, time)
+ def get_reservations_after(self, time):
+ bystart = set(self.get_reservations_starting_after(time))
+ byend = set(self.get_reservations_ending_after(time))
+ return list(bystart | byend)
+
+ def get_changepoints_after(self, after, until=None, nodes=None):
+ changepoints = set()
+ res = self.get_reservations_after(after)
+ for rr in res:
+ if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
+ if rr.start > after:
+ changepoints.add(rr.start)
+ if rr.end > after:
+ changepoints.add(rr.end)
+ changepoints = list(changepoints)
+ if until != None:
+ changepoints = [c for c in changepoints if c < until]
+ changepoints.sort()
+ return changepoints
- def get_reservations_starting_or_ending_after(self, after):
- item = KeyValueWrapper(after, None)
- startpos = bisect.bisect_right(self.reservationsByStart, item)
- bystart = set([x.value for x in self.reservationsByStart[:startpos]])
- endpos = bisect.bisect_right(self.reservationsByEnd, item)
- byend = set([x.value for x in self.reservationsByEnd[endpos:]])
- res = bystart | byend
- return list(res)
-
- def addReservation(self, rr):
+ def add_reservation(self, rr):
startitem = KeyValueWrapper(rr.start, rr)
enditem = KeyValueWrapper(rr.end, rr)
- bisect.insort(self.reservationsByStart, startitem)
- bisect.insort(self.reservationsByEnd, enditem)
- self.dirty()
+ bisect.insort(self.reservations_by_start, startitem)
+ bisect.insort(self.reservations_by_end, enditem)
+ self.__dirty()
# If the slot table keys are not modified (start / end time)
# Just remove and reinsert.
- def updateReservation(self, rr):
+ def update_reservation(self, rr):
# TODO: Might be more efficient to resort lists
- self.removeReservation(rr)
- self.addReservation(rr)
- self.dirty()
+ self.remove_reservation(rr)
+ self.add_reservation(rr)
+ self.__dirty()
# If the slot table keys are modified (start and/or end time)
# provide the old keys (so we can remove it using
# the m) and updated reservation
def update_reservation_with_key_change(self, rr, old_start, old_end):
# TODO: Might be more efficient to resort lists
- self.removeReservation(rr, old_start, old_end)
- self.addReservation(rr)
- self.dirty()
-
-
- def getIndexOfReservation(self, rlist, rr, key):
- item = KeyValueWrapper(key, None)
- pos = bisect.bisect_left(rlist, item)
- found = False
- while not found:
- if rlist[pos].value == rr:
- found = True
- else:
- pos += 1
- return pos
-
- def removeReservation(self, rr, start=None, end=None):
+ self.remove_reservation(rr, old_start, old_end)
+ self.add_reservation(rr)
+ self.__dirty()
+
+ def remove_reservation(self, rr, start=None, end=None):
if start == None:
start = rr.start
if end == None:
- end = rr.start
- posstart = self.getIndexOfReservation(self.reservationsByStart, rr, start)
- posend = self.getIndexOfReservation(self.reservationsByEnd, rr, end)
- self.reservationsByStart.pop(posstart)
- self.reservationsByEnd.pop(posend)
- self.dirty()
+ end = rr.end
+ posstart = self.__get_reservation_index(self.reservations_by_start, rr, start)
+ posend = self.__get_reservation_index(self.reservations_by_end, rr, end)
+ self.reservations_by_start.pop(posstart)
+ self.reservations_by_end.pop(posend)
+ self.__dirty()
+
+ def get_availability(self, time, min_capacity=None, onlynodes=None):
+ if not self.availabilitycache.has_key(time):
+ self.__get_availability_cache_miss(time)
+ # Cache miss
+
+ nodes = self.availabilitycache[time]
-
- def findChangePointsAfter(self, after, until=None, nodes=None):
- changepoints = set()
- res = self.get_reservations_starting_or_ending_after(after)
- for rr in res:
- if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
- if rr.start > after:
- changepoints.add(rr.start)
- if rr.end > after:
- changepoints.add(rr.end)
- changepoints = list(changepoints)
- if until != None:
- changepoints = [c for c in changepoints if c < until]
- changepoints.sort()
- return changepoints
-
- def peekNextChangePoint(self, time):
- if self.changepointcache == None:
- # Cache is empty
- changepoints = self.findChangePointsAfter(time)
- changepoints.reverse()
- self.changepointcache = changepoints
- if len(self.changepointcache) == 0:
- return None
- else:
- return self.changepointcache[-1]
-
- def getNextChangePoint(self, time):
- p = self.peekNextChangePoint(time)
- if p != None:
- self.changepointcache.pop()
- return p
-
- def isFull(self, time):
- nodes = self.getAvailability(time)
- avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])
- return (avail == 0)
-
+ if onlynodes != None:
+ onlynodes = set(onlynodes)
+ nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
+
+ # Keep only those nodes with enough resources
+ if min_capacity != None:
+ newnodes = {}
+ for n, node in nodes.items():
+ if min_capacity.fits_in(node.capacity):
+ newnodes[n]=node
+ else:
+ pass
+ nodes = newnodes
+
+ return nodes
+
def get_next_reservations_in_nodes(self, time, nodes, rr_type=None, immediately_next = False):
nodes = set(nodes)
rrs_in_nodes = []
@@ -429,107 +392,267 @@
rrs_in_nodes = list(rr_nodes_excl)
return rrs_in_nodes
-
-class AvailEntry(object):
- def __init__(self, time, avail, availpreempt, resreq):
- self.time = time
- self.avail = avail
- self.availpreempt = availpreempt
+
+ def get_next_changepoint(self, time):
+ item = KeyValueWrapper(time, None)
- if avail == None and availpreempt == None:
- self.canfit = 0
- self.canfitpreempt = 0
+ startpos = bisect.bisect_right(self.reservations_by_start, item)
+ if startpos == len(self.reservations_by_start):
+ time1 = None
else:
- self.canfit = resreq.get_num_fits_in(avail)
- if availpreempt == None:
- self.canfitpreempt = 0
+ time1 = self.reservations_by_start[startpos].value.start
+
+ endpos = bisect.bisect_right(self.reservations_by_end, item)
+ if endpos == len(self.reservations_by_end):
+ time2 = None
+ else:
+ time2 = self.reservations_by_end[endpos].value.end
+
+ if time1==None and time2==None:
+ return None
+ elif time1==None:
+ return time2
+ elif time2==None:
+ return time1
+ else:
+ return min(time1, time2)
+
+ # ONLY for simulation
+ def get_next_premature_end(self, after):
+ from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+ # Inefficient, but ok since this query seldom happens
+ res = [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after]
+ if len(res) > 0:
+ prematureends = [r.prematureend for r in res]
+ prematureends.sort()
+ return prematureends[0]
+ else:
+ return None
+
+ # ONLY for simulation
+ def get_prematurely_ending_res(self, t):
+ from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+ return [i.value for i in self.reservations_by_end if isinstance(i.value, VMResourceReservation) and i.value.prematureend == t]
+
+
+ def __get_reservation_index(self, rlist, rr, key):
+ item = KeyValueWrapper(key, None)
+ pos = bisect.bisect_left(rlist, item)
+ found = False
+ while not found:
+ if rlist[pos].value == rr:
+ found = True
else:
- self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
+ pos += 1
+ return pos
- def getCanfit(self, canpreempt):
- if canpreempt:
- return self.canfitpreempt
+
+ def __get_availability_cache_miss(self, time):
+ allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
+ onlynodes = None
+ nodes = {}
+ reservations = self.get_reservations_at(time)
+ # Find how much resources are available on each node
+ for r in reservations:
+ for node in r.resources_in_pnode:
+ if onlynodes == None or (onlynodes != None and node in onlynodes):
+ if not nodes.has_key(node):
+ n = self.nodes[node]
+ nodes[node] = Node(n.capacity, n.resourcepoolnode)
+ nodes[node].capacity.decr(r.resources_in_pnode[node])
+
+ # For the remaining nodes, use a reference to the original node, not a copy
+ if onlynodes == None:
+ missing = allnodes - set(nodes.keys())
else:
- return self.canfit
+ missing = onlynodes - set(nodes.keys())
+
+ for node in missing:
+ nodes[node] = self.nodes[node]
+
+ self.availabilitycache[time] = nodes
+ def __dirty(self):
+ # You're a dirty, dirty slot table and you should be
+ # ashamed of having outdated caches!
+ self.availabilitycache = {}
+
+
+
+
+
+class ChangepointAvail(object):
+ def __init__(self):
+ self.nodes = {}
+ self.leases = set()
+
+ def add_node(self, node, capacity):
+ self.nodes[node] = ChangepointNodeAvail(capacity)
+
+class ChangepointNodeAvail(object):
+ def __init__(self, available):
+ self.available = ResourceTuple.copy(available)
+ self.leases = set()
+ self.available_if_preempting = {}
+ self.next_cp = None
+ self.next_nodeavail = None
+
+ def decr(self, capacity):
+ self.available.decr(capacity)
+
+ def add_lease(self, lease_id, capacity):
+ if not lease_id in self.leases:
+ self.leases.add(lease_id)
+ self.available_if_preempting[lease_id] = ResourceTuple.create_empty()
+ self.available_if_preempting[lease_id].incr(capacity)
+
+ def get_avail_withpreemption(self, leases):
+ avail = ResourceTuple.copy(available)
+ for l in leases:
+ avail.decr(self.available_if_preempting[lease])
+
+class AvailEntry(object):
+ def __init__(self, available, until):
+ self.available = available
+ self.until = until
+
+
class AvailabilityWindow(object):
- def __init__(self, slottable):
+ """An availability window
+
+ A particularly important operation with the slot table is determining the
+ "availability window" of resources starting at a given time. In a nutshell,
+ an availability window provides a convenient abstraction over the slot table,
+ with methods to answer questions like "If I want to start a least at time T,
+ are there enough resources available to start the lease?" "Will those resources
+ be available until time T+t?" "If not, what's the longest period of time those
+ resources will be available?"
+
+ """
+ def __init__(self, slottable, time, onlynodes = None):
self.slottable = slottable
self.logger = logging.getLogger("SLOTTABLE.WIN")
- self.time = None
- self.resreq = None
- self.onlynodes = None
- self.avail = None
-
- # Create avail structure
- def initWindow(self, time, resreq, onlynodes = None, canpreempt=False):
self.time = time
- self.resreq = resreq
self.onlynodes = onlynodes
- self.avail = {}
+ self.leases = set()
+
+ self.cp_list = self.slottable.get_changepoints_after(time, nodes=onlynodes)
+
+ # Create initial changepoint hash table
+ self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
+
+ for cp in self.changepoints.values():
+ for node_id, node in enumerate(self.slottable.nodes):
+ cp.add_node(node_id + 1, node.capacity)
+
+ # Initial time
+ avail_at_start = self.slottable.get_availability(self.time, onlynodes=onlynodes)
+ self.cp_list = [self.time] + self.cp_list
+ self.changepoints[self.time] = ChangepointAvail()
+ for node_id, node in avail_at_start.items():
+ self.changepoints[self.time].add_node(node_id, node.capacity)
+
+ rrs = self.slottable.get_reservations_after(time)
+ rrs.sort(key=attrgetter("start"))
+ pos = 0
+ # Fill in rest of changepoint hash table
+ for rr in rrs:
+ while rr.start >= self.time and self.cp_list[pos] != rr.start:
+ pos += 1
+
+ lease_id = rr.lease.id
+
+ self.leases.add(lease_id)
+
+ if rr.start >= self.time:
+ start_cp = self.changepoints[rr.start]
+ start_cp.leases.add(lease_id)
+ for node in rr.resources_in_pnode:
+ start_cp.nodes[node].decr(rr.resources_in_pnode[node])
+ start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+ else:
+ start_cp = self.changepoints[self.time]
+ start_cp.leases.add(lease_id)
+ for node in rr.resources_in_pnode:
+ start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+
+ pos2 = pos + 1
- # Availability at initial time
- availatstart = self.slottable.getAvailability(self.time, self.resreq, self.onlynodes, canpreempt)
- for node in availatstart:
- capacity = availatstart[node].capacity
- if canpreempt:
- capacitywithpreemption = availatstart[node].capacitywithpreemption
- else:
- capacitywithpreemption = None
- self.avail[node] = [AvailEntry(self.time, capacity, capacitywithpreemption, self.resreq)]
+ while self.cp_list[pos2] < rr.end:
+ cp = self.changepoints[self.cp_list[pos2]]
+ cp.leases.add(lease_id)
+ for node in rr.resources_in_pnode:
+ cp.nodes[node].decr(rr.resources_in_pnode[node])
+ cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+
+ pos2 += 1
- # Determine the availability at the subsequent change points
- nodes = set(availatstart.keys())
- res = self.slottable.get_reservations_starting_after(self.time)
- changepoints = set()
- for rr in res:
- if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
- changepoints.add(rr.start)
- changepoints = list(changepoints)
- changepoints.sort()
- for p in changepoints:
- availatpoint = self.slottable.getAvailability(p, self.resreq, nodes, canpreempt)
- newnodes = set(availatpoint.keys())
+ prev_nodeavail = {}
+ for node_id, node in self.changepoints[self.time].nodes.items():
+ prev_nodeavail[node_id] = [node]
+
+ # Link node entries
+ for cp in self.cp_list[1:]:
+ for node_id, node in self.changepoints[cp].nodes.items():
+ prev_nodes = prev_nodeavail[node_id]
+ if prev_nodes[-1].available == node.available and prev_nodes[-1].leases == node.leases:
+ prev_nodes.append(node)
+ else:
+ for prev_node in prev_nodes:
+ prev_node.next_cp = cp
+ prev_node.next_nodeavail = node
+ prev_nodeavail[node_id] = [node]
+
+
+ def get_availability_at_node(self, time, node, preempted_leases = []):
+ avails = []
+
+ node = self.changepoints[time].nodes[node]
+ prev_avail = None
+ prev_node = None
+
+ while node != None:
+ available = ResourceTuple.copy(node.available)
+ for l in preempted_leases:
+ if node.available_if_preempting.has_key(l):
+ available.incr(node.available_if_preempting[l])
- # Add entries for nodes that have no resources available
- # (for, at least, one VM)
- fullnodes = nodes - newnodes
- for node in fullnodes:
- self.avail[node].append(AvailEntry(p, None, None, None))
- nodes.remove(node)
-
- # For the rest, only interested if the available resources
- # Decrease in the window
- for node in newnodes:
- capacity = availatpoint[node].capacity
- fits = self.resreq.get_num_fits_in(capacity)
- if canpreempt:
- capacitywithpreemption = availatpoint[node].capacitywithpreemption
- fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
- prevavail = self.avail[node][-1]
- if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
- self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
- elif canpreempt and (prevavail.getCanfit(canpreempt=False) > fits or prevavail.getCanfit(canpreempt=True) > fitswithpreemption):
- self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
+ if prev_avail != None and available.fits_in(prev_avail.available):
+ availentry = AvailEntry(available, None)
+ avails.append(availentry)
+ prev_avail.until = prev_node.next_cp
+ prev_avail = availentry
+ elif prev_avail == None:
+ availentry = AvailEntry(available, None)
+ avails.append(availentry)
+ prev_avail = availentry
+
+ prev_node = node
+ node = node.next_nodeavail
+
+ return avails
+
+ def get_nodes_at(self, time):
+ return self.changepoints[time].nodes.items()
- def fitAtStart(self, nodes = None, canpreempt = False):
+### Everything after this is from the old implementation
+
+ def fit_at_start(self, nodes = None):
if nodes != None:
avail = [v for (k, v) in self.avail.items() if k in nodes]
else:
avail = self.avail.values()
- if canpreempt:
- return sum([e[0].canfitpreempt for e in avail])
- else:
- return sum([e[0].canfit for e in avail])
+
+ return sum([e[0].canfit for e in avail])
# TODO: Also return the amount of resources that would have to be
# preempted in each physnode
- def findPhysNodesForVMs(self, numnodes, maxend, strictend=False, canpreempt=False):
+ def find_pnodes_for_vms(self, numnodes, maxend, strictend=False):
# Returns the physical nodes that can run all VMs, and the
# time at which the VMs must end
- canfit = dict([(n, v[0].getCanfit(canpreempt)) for (n, v) in self.avail.items()])
+ canfit = dict([(n, v[0].canfit) for (n, v) in self.avail.items()])
entries = []
for n in self.avail.keys():
entries += [(n, e) for e in self.avail[n][1:]]
@@ -547,7 +670,7 @@
# Can run to its maximum duration
break
else:
- diff = canfit[physnode] - entry.getCanfit(canpreempt)
+ diff = canfit[physnode] - entry.canfit
totalcanfit = sum([n for n in canfit.values()]) - diff
if totalcanfit < numnodes and not strictend:
# Not enough resources. Must end here
@@ -555,7 +678,7 @@
break
else:
# Update canfit
- canfit[physnode] = entry.getCanfit(canpreempt)
+ canfit[physnode] = entry.canfit
# Filter out nodes where we can't fit any vms
canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
@@ -563,42 +686,25 @@
return end, canfit
- def printContents(self, nodes = None, withpreemption = False):
+ def print_contents(self, nodes = None):
if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
if nodes == None:
physnodes = self.avail.keys()
else:
physnodes = [k for k in self.avail.keys() if k in nodes]
physnodes.sort()
- if withpreemption:
- p = "(with preemption)"
- else:
- p = "(without preemption)"
+
self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
for n in physnodes:
contents = "Node %i --- " % n
for x in self.avail[n]:
contents += "[ %s " % x.time
contents += "{ "
- if x.avail == None and x.availpreempt == None:
+ if x.avail == None :
contents += "END "
else:
- if withpreemption:
- res = x.availpreempt
- canfit = x.canfitpreempt
- else:
- res = x.avail
- canfit = x.canfit
+ res = x.avail
+ canfit = x.canfit
contents += "%s" % res
contents += "} (Fits: %i) ] " % canfit
self.logger.vdebug(contents)
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
Added: branches/TP2.0/tests/test_slottable.py
===================================================================
--- branches/TP2.0/tests/test_slottable.py (rev 0)
+++ branches/TP2.0/tests/test_slottable.py 2009-06-08 18:05:04 UTC (rev 583)
@@ -0,0 +1,603 @@
+from haizea.resourcemanager.leases import Lease
+from haizea.resourcemanager.scheduler.resourcepool import Node
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple, SlotTable, ResourceReservation, AvailabilityWindow
+from mx import DateTime
+import haizea.common.constants as constants
+
+T1255 = DateTime.DateTime(2006,11,25,12,55)
+T1300 = DateTime.DateTime(2006,11,25,13,00)
+T1305 = DateTime.DateTime(2006,11,25,13,05)
+T1315 = DateTime.DateTime(2006,11,25,13,15)
+T1325 = DateTime.DateTime(2006,11,25,13,25)
+T1330 = DateTime.DateTime(2006,11,25,13,30)
+T1335 = DateTime.DateTime(2006,11,25,13,35)
+T1345 = DateTime.DateTime(2006,11,25,13,45)
+T1350 = DateTime.DateTime(2006,11,25,13,50)
+T1355 = DateTime.DateTime(2006,11,25,13,55)
+T1400 = DateTime.DateTime(2006,11,25,14,00)
+T1415 = DateTime.DateTime(2006,11,25,14,15)
+T1420 = DateTime.DateTime(2006,11,25,14,20)
+
+resource_types = [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
+ (constants.RES_MEM, constants.RESTYPE_INT, "Mem")]
+ResourceTuple.set_resource_types(resource_types)
+
+FULL_NODE = ResourceTuple.create_empty()
+FULL_NODE.set_by_type(constants.RES_CPU, 1.0)
+FULL_NODE.set_by_type(constants.RES_MEM, 1024)
+
+HALF_NODE = ResourceTuple.create_empty()
+HALF_NODE.set_by_type(constants.RES_CPU, 0.5)
+HALF_NODE.set_by_type(constants.RES_MEM, 512)
+
+QRTR_NODE = ResourceTuple.create_empty()
+QRTR_NODE.set_by_type(constants.RES_CPU, 0.25)
+QRTR_NODE.set_by_type(constants.RES_MEM, 256)
+
+EMPT_NODE = ResourceTuple.create_empty()
+
+class TestSlotTable(object):
+ def __init__(self):
+ self.slottable = None
+
+ def test_slottable(self):
+ def assert_capacity(node, percent):
+ assert node.capacity.get_by_type(constants.RES_CPU) == percent * 1.0
+ assert node.capacity.get_by_type(constants.RES_MEM) == percent * 1024
+
+ def reservations_1_assertions():
+ assert not self.slottable.is_empty()
+ nodes = self.slottable.get_availability(T1300)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ nodes = self.slottable.get_availability(T1330)
+ assert_capacity(nodes[1], 1.0)
+ assert_capacity(nodes[2], 1.0)
+
+ def reservations_2_assertions():
+ nodes = self.slottable.get_availability(T1300)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1300)
+ assert len(rrs) == 1
+ assert rrs[0] == rr1
+
+ nodes = self.slottable.get_availability(T1330)
+ assert_capacity(nodes[1], 0.75)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1330)
+ assert len(rrs) == 1
+ assert rrs[0] == rr2
+
+ nodes = self.slottable.get_availability(T1400)
+ assert_capacity(nodes[1], 1.0)
+ assert_capacity(nodes[2], 1.0)
+ rrs = self.slottable.get_reservations_at(T1400)
+ assert len(rrs) == 0
+
+ def reservations_3_assertions():
+ nodes = self.slottable.get_availability(T1300)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1300)
+ assert len(rrs) == 1
+ assert rrs[0] == rr1
+
+ nodes = self.slottable.get_availability(T1315)
+ assert_capacity(nodes[1], 0.25)
+ assert_capacity(nodes[2], 0.25)
+ rrs = self.slottable.get_reservations_at(T1315)
+ assert len(rrs) == 2
+ assert rr1 in rrs and rr3 in rrs
+
+ nodes = self.slottable.get_availability(T1330)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.25)
+ rrs = self.slottable.get_reservations_at(T1330)
+ assert len(rrs) == 2
+ assert rr2 in rrs and rr3 in rrs
+
+ nodes = self.slottable.get_availability(T1345)
+ assert_capacity(nodes[1], 0.75)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1345)
+ assert len(rrs) == 1
+ assert rrs[0] == rr2
+
+ nodes = self.slottable.get_availability(T1400)
+ assert_capacity(nodes[1], 1.0)
+ assert_capacity(nodes[2], 1.0)
+ rrs = self.slottable.get_reservations_at(T1400)
+ assert len(rrs) == 0
+
+ def reservations_4_assertions():
+ nodes = self.slottable.get_availability(T1300)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1300)
+ assert len(rrs) == 1
+ assert rrs[0] == rr1
+
+ nodes = self.slottable.get_availability(T1315)
+ assert_capacity(nodes[1], 0.25)
+ assert_capacity(nodes[2], 0.25)
+ rrs = self.slottable.get_reservations_at(T1315)
+ assert len(rrs) == 2
+ assert rr1 in rrs and rr3 in rrs
+
+ nodes = self.slottable.get_availability(T1330)
+ assert_capacity(nodes[1], 0)
+ assert_capacity(nodes[2], 0)
+ rrs = self.slottable.get_reservations_at(T1330)
+ assert len(rrs) == 3
+ assert rr4 in rrs and rr2 in rrs and rr3 in rrs
+
+ nodes = self.slottable.get_availability(T1345)
+ assert_capacity(nodes[1], 0.25)
+ assert_capacity(nodes[2], 0.25)
+ rrs = self.slottable.get_reservations_at(T1345)
+ assert len(rrs) == 2
+ assert rr2 in rrs and rr4 in rrs
+
+ nodes = self.slottable.get_availability(T1400)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.75)
+ rrs = self.slottable.get_reservations_at(T1400)
+ assert len(rrs) == 1
+ assert rrs[0] == rr4
+
+ nodes = self.slottable.get_availability(T1415)
+ assert_capacity(nodes[1], 1.0)
+ assert_capacity(nodes[2], 1.0)
+ rrs = self.slottable.get_reservations_at(T1415)
+ assert len(rrs) == 0
+
+ def reservations_5_assertions():
+ nodes = self.slottable.get_availability(T1300)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1300)
+ assert len(rrs) == 1
+ assert rrs[0] == rr1
+
+ nodes = self.slottable.get_availability(T1315)
+ assert_capacity(nodes[1], 0.25)
+ assert_capacity(nodes[2], 0.25)
+ rrs = self.slottable.get_reservations_at(T1315)
+ assert len(rrs) == 2
+ assert set(rrs) == set([rr1,rr3])
+
+ nodes = self.slottable.get_availability(T1330)
+ assert_capacity(nodes[1], 0)
+ assert_capacity(nodes[2], 0)
+ rrs = self.slottable.get_reservations_at(T1330)
+ assert len(rrs) == 3
+ assert set(rrs) == set([rr2,rr3,rr4])
+
+ nodes = self.slottable.get_availability(T1345)
+ assert_capacity(nodes[1], 0.25)
+ assert_capacity(nodes[2], 0)
+ rrs = self.slottable.get_reservations_at(T1345)
+ assert len(rrs) == 3
+ assert set(rrs) == set([rr2,rr4,rr5])
+
+ nodes = self.slottable.get_availability(T1400)
+ assert_capacity(nodes[1], 0.5)
+ assert_capacity(nodes[2], 0.5)
+ rrs = self.slottable.get_reservations_at(T1400)
+ assert len(rrs) == 2
+ assert set(rrs) == set([rr4,rr5])
+
+ nodes = self.slottable.get_availability(T1415)
+ assert_capacity(nodes[1], 1.0)
+ assert_capacity(nodes[2], 1.0)
+ rrs = self.slottable.get_reservations_at(T1415)
+ assert len(rrs) == 0
+
+ rrs = self.slottable.get_reservations_starting_between(T1300, T1315)
+ assert set(rrs) == set([rr1,rr3])
+ rrs = self.slottable.get_reservations_starting_between(T1300, T1330)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4])
+ rrs = self.slottable.get_reservations_starting_between(T1300, T1345)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_between(T1315, T1330)
+ assert set(rrs) == set([rr2,rr3,rr4])
+ rrs = self.slottable.get_reservations_starting_between(T1315, T1345)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_between(T1330, T1345)
+ assert set(rrs) == set([rr2,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_between(T1400, T1415)
+ assert len(rrs) == 0
+ rrs = self.slottable.get_reservations_starting_between(T1305, T1335)
+ assert set(rrs) == set([rr3,rr2,rr4])
+
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1305)
+ assert len(rrs) == 0
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1315)
+ assert len(rrs) == 0
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1330)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1335)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1345)
+ assert set(rrs) == set([rr1,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1400)
+ assert set(rrs) == set([rr1,rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1300, T1415)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1315)
+ assert len(rrs) == 0
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1330)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1335)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1345)
+ assert set(rrs) == set([rr1,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1400)
+ assert set(rrs) == set([rr1,rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1305, T1415)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1315, T1330)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1315, T1335)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1315, T1345)
+ assert set(rrs) == set([rr1,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1315, T1400)
+ assert set(rrs) == set([rr1,rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1315, T1415)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1330, T1335)
+ assert set(rrs) == set([rr1])
+ rrs = self.slottable.get_reservations_ending_between(T1330, T1345)
+ assert set(rrs) == set([rr1,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1330, T1400)
+ assert set(rrs) == set([rr1,rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1330, T1415)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1335, T1345)
+ assert set(rrs) == set([rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1335, T1400)
+ assert set(rrs) == set([rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1335, T1415)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1345, T1400)
+ assert set(rrs) == set([rr2,rr3])
+ rrs = self.slottable.get_reservations_ending_between(T1345, T1415)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_between(T1400, T1415)
+ assert set(rrs) == set([rr2,rr4,rr5])
+
+ rrs = self.slottable.get_reservations_starting_after(T1300)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1305)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1315)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1330)
+ assert set(rrs) == set([rr2,rr4,rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1335)
+ assert set(rrs) == set([rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1345)
+ assert set(rrs) == set([rr5])
+ rrs = self.slottable.get_reservations_starting_after(T1400)
+ assert len(rrs) == 0
+ rrs = self.slottable.get_reservations_starting_after(T1415)
+ assert len(rrs) == 0
+
+ rrs = self.slottable.get_reservations_ending_after(T1300)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1305)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1315)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1330)
+ assert set(rrs) == set([rr1,rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1335)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1345)
+ assert set(rrs) == set([rr2,rr3,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1400)
+ assert set(rrs) == set([rr2,rr4,rr5])
+ rrs = self.slottable.get_reservations_ending_after(T1415)
+ assert set(rrs) == set([rr4,rr5])
+
+ assert self.slottable.get_next_changepoint(T1255) == T1300
+ assert self.slottable.get_next_changepoint(T1300) == T1315
+ assert self.slottable.get_next_changepoint(T1315) == T1330
+ assert self.slottable.get_next_changepoint(T1330) == T1345
+ assert self.slottable.get_next_changepoint(T1335) == T1345
+ assert self.slottable.get_next_changepoint(T1345) == T1400
+ assert self.slottable.get_next_changepoint(T1400) == T1415
+ assert self.slottable.get_next_changepoint(T1415) == None
+ assert self.slottable.get_next_changepoint(T1420) == None
+
+ self.slottable = SlotTable()
+
+ self.slottable.add_node(Node(1, "test-1", FULL_NODE))
+ self.slottable.add_node(Node(2, "test-2", FULL_NODE))
+
+ assert self.slottable.get_total_capacity(constants.RES_CPU) == 2.0
+ assert self.slottable.get_total_capacity(constants.RES_MEM) == 2048
+ assert self.slottable.is_empty()
+
+ res1 = {1: HALF_NODE, 2: HALF_NODE}
+ rr1 = ResourceReservation(None, T1300, T1330, res1)
+ self.slottable.add_reservation(rr1)
+ reservations_1_assertions()
+
+ res2 = {1: QRTR_NODE, 2: HALF_NODE}
+ rr2 = ResourceReservation(None, T1330, T1400, res2)
+ self.slottable.add_reservation(rr2)
+ reservations_2_assertions()
+
+ res3 = {1: QRTR_NODE, 2: QRTR_NODE}
+ rr3 = ResourceReservation(None, T1315, T1345, res3)
+ self.slottable.add_reservation(rr3)
+ reservations_3_assertions()
+
+ res4 = {1: HALF_NODE, 2: QRTR_NODE}
+ rr4 = ResourceReservation(None, T1330, T1415, res4)
+ self.slottable.add_reservation(rr4)
+ reservations_4_assertions()
+
+ res5 = {2: QRTR_NODE}
+ rr5 = ResourceReservation(None, T1345, T1415, res5)
+ self.slottable.add_reservation(rr5)
+ reservations_5_assertions()
+
+ self.slottable.remove_reservation(rr5)
+ reservations_4_assertions()
+ self.slottable.remove_reservation(rr4)
+ reservations_3_assertions()
+ self.slottable.remove_reservation(rr3)
+ reservations_2_assertions()
+ self.slottable.remove_reservation(rr2)
+ reservations_1_assertions()
+ self.slottable.remove_reservation(rr1)
+
+ assert self.slottable.is_empty()
+
+ def test_availabilitywindow(self):
+ def avail_node_assertions(time, avail, node_id, leases, next_cp):
+ node = aw.changepoints[time].nodes[node_id]
+ nleases = len(leases)
+ assert(node.available == avail)
+
+ assert(len(node.leases)==nleases)
+ for l in leases:
+ assert(l in node.leases)
+ assert(len(node.available_if_preempting) == nleases)
+ for l in leases:
+ assert(node.available_if_preempting[l] == leases[l])
+ assert(node.next_cp == next_cp)
+ if next_cp != None:
+ assert(node.next_nodeavail == aw.changepoints[next_cp].nodes[node_id])
+
+ self.slottable = SlotTable()
+
+ self.slottable.add_node(Node(1, "test-1", FULL_NODE))
+ self.slottable.add_node(Node(2, "test-2", FULL_NODE))
+ self.slottable.add_node(Node(3, "test-3", FULL_NODE))
+ self.slottable.add_node(Node(4, "test-4", FULL_NODE))
+
+ lease1 = Lease(None,None,None,None,None,1,None,None)
+ lease1.id = 1
+ res1 = {2: HALF_NODE}
+ rr1_1 = ResourceReservation(lease1, T1315, T1325, res1)
+ rr1_2 = ResourceReservation(lease1, T1325, T1330, res1)
+ self.slottable.add_reservation(rr1_1)
+ self.slottable.add_reservation(rr1_2)
+
+ lease2 = Lease(None,None,None,None,None,2,None,None)
+ lease2.id = 2
+ res2 = {2: FULL_NODE, 3: FULL_NODE}
+ rr2 = ResourceReservation(lease2, T1330, T1345, res2)
+ self.slottable.add_reservation(rr2)
+
+ lease3 = Lease(None,None,None,None,None,1,None,None)
+ lease3.id = 3
+ res3 = {4: FULL_NODE}
+ rr3_1 = ResourceReservation(lease3, T1330, T1355, res3)
+ rr3_2 = ResourceReservation(lease3, T1355, T1400, res3)
+ self.slottable.add_reservation(rr3_1)
+ self.slottable.add_reservation(rr3_2)
+
+ lease4 = Lease(None,None,None,None,None,1,None,None)
+ lease4.id = 4
+ res4 = {2: QRTR_NODE, 3: HALF_NODE}
+ rr4 = ResourceReservation(lease4, T1350, T1415, res4)
+ self.slottable.add_reservation(rr4)
+
+ lease5 = Lease(None,None,None,None,None,1,None,None)
+ lease5.id = 5
+ res5 = {2: QRTR_NODE}
+ rr5 = ResourceReservation(lease5, T1350, T1415, res5)
+ self.slottable.add_reservation(rr5)
+
+ lease6 = Lease(None,None,None,None,None,1,None,None)
+ lease6.id = 6
+ res6 = {1: FULL_NODE}
+ rr6 = ResourceReservation(lease6, T1255, T1305, res6)
+ self.slottable.add_reservation(rr6)
+
+ aw = AvailabilityWindow(self.slottable, T1300)
+
+
+ # TODO: Factor out data into a data structure so we can do more
+ # elaborate assertions
+
+ # 13:00
+ avail_node_assertions(time = T1300, avail = EMPT_NODE, node_id = 1,
+ leases = {6:FULL_NODE}, next_cp = T1305)
+ avail_node_assertions(time = T1300, avail = FULL_NODE, node_id = 2,
+ leases = {}, next_cp = T1315)
+ avail_node_assertions(time = T1300, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = T1330)
+ avail_node_assertions(time = T1300, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = T1330)
+
+
+ # 13:05
+ avail_node_assertions(time = T1305, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1305, avail = FULL_NODE, node_id = 2,
+ leases = {}, next_cp = T1315)
+ avail_node_assertions(time = T1305, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = T1330)
+ avail_node_assertions(time = T1305, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = T1330)
+
+ # 13:15
+ avail_node_assertions(time = T1315, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1315, avail = HALF_NODE, node_id = 2,
+ leases = {1:HALF_NODE}, next_cp = T1330)
+ avail_node_assertions(time = T1315, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = T1330)
+ avail_node_assertions(time = T1315, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = T1330)
+
+ # 13:25
+ avail_node_assertions(time = T1325, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1325, avail = HALF_NODE, node_id = 2,
+ leases = {1:HALF_NODE}, next_cp = T1330)
+ avail_node_assertions(time = T1325, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = T1330)
+ avail_node_assertions(time = T1325, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = T1330)
+
+ # 13:30
+ avail_node_assertions(time = T1330, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1330, avail = EMPT_NODE, node_id = 2,
+ leases = {2:FULL_NODE}, next_cp = T1345)
+ avail_node_assertions(time = T1330, avail = EMPT_NODE, node_id = 3,
+ leases = {2:FULL_NODE}, next_cp = T1345)
+ avail_node_assertions(time = T1330, avail = EMPT_NODE, node_id = 4,
+ leases = {3:FULL_NODE}, next_cp = T1400)
+
+ # 13:45
+ avail_node_assertions(time = T1345, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1345, avail = FULL_NODE, node_id = 2,
+ leases = {}, next_cp = T1350)
+ avail_node_assertions(time = T1345, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = T1350)
+ avail_node_assertions(time = T1345, avail = EMPT_NODE, node_id = 4,
+ leases = {3:FULL_NODE}, next_cp = T1400)
+
+ # 13:50
+ avail_node_assertions(time = T1350, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1350, avail = HALF_NODE, node_id = 2,
+ leases = {4:QRTR_NODE,5:QRTR_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1350, avail = HALF_NODE, node_id = 3,
+ leases = {4:HALF_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1350, avail = EMPT_NODE, node_id = 4,
+ leases = {3:FULL_NODE}, next_cp = T1400)
+
+ # 13:55
+ avail_node_assertions(time = T1355, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1355, avail = HALF_NODE, node_id = 2,
+ leases = {4:QRTR_NODE,5:QRTR_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1355, avail = HALF_NODE, node_id = 3,
+ leases = {4:HALF_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1355, avail = EMPT_NODE, node_id = 4,
+ leases = {3:FULL_NODE}, next_cp = T1400)
+
+ # 14:00
+ avail_node_assertions(time = T1400, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1400, avail = HALF_NODE, node_id = 2,
+ leases = {4:QRTR_NODE,5:QRTR_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1400, avail = HALF_NODE, node_id = 3,
+ leases = {4:HALF_NODE}, next_cp = T1415)
+ avail_node_assertions(time = T1400, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = None)
+
+ # 14:15
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 2,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = None)
+
+ avail = aw.get_availability_at_node(T1300, 1)
+ assert(len(avail)==1)
+ assert(avail[0].available == EMPT_NODE)
+ assert(avail[0].until == None)
+
+ avail = aw.get_availability_at_node(T1300, 2)
+ assert(len(avail)==3)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == T1315)
+ assert(avail[1].available == HALF_NODE)
+ assert(avail[1].until == T1330)
+ assert(avail[2].available == EMPT_NODE)
+ assert(avail[2].until == None)
+
+ avail = aw.get_availability_at_node(T1300, 3)
+ assert(len(avail)==2)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == T1330)
+ assert(avail[1].available == EMPT_NODE)
+ assert(avail[1].until == None)
+
+ avail = aw.get_availability_at_node(T1300, 4)
+ assert(len(avail)==2)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == T1330)
+ assert(avail[1].available == EMPT_NODE)
+ assert(avail[1].until == None)
+
+
+ avail = aw.get_availability_at_node(T1330, 1)
+ assert(len(avail)==1)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == None)
+
+ avail = aw.get_availability_at_node(T1330, 2)
+ assert(len(avail)==1)
+ assert(avail[0].available == EMPT_NODE)
+ assert(avail[0].until == None)
+
+ avail = aw.get_availability_at_node(T1330, 3)
+ assert(len(avail)==1)
+ assert(avail[0].available == EMPT_NODE)
+ assert(avail[0].until == None)
+
+ avail = aw.get_availability_at_node(T1330, 4)
+ assert(len(avail)==1)
+ assert(avail[0].available == EMPT_NODE)
+ assert(avail[0].until == None)
+
+
+ avail = aw.get_availability_at_node(T1345, 1)
+ assert(len(avail)==1)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == None)
+
+ avail = aw.get_availability_at_node(T1345, 2)
+ assert(len(avail)==2)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == T1350)
+ assert(avail[1].available == HALF_NODE)
+ assert(avail[1].until == None)
+
+ avail = aw.get_availability_at_node(T1345, 3)
+ assert(len(avail)==2)
+ assert(avail[0].available == FULL_NODE)
+ assert(avail[0].until == T1350)
+ assert(avail[1].available == HALF_NODE)
+ assert(avail[1].until == None)
+
+ avail = aw.get_availability_at_node(T1345, 4)
+ assert(len(avail)==1)
+ assert(avail[0].available == EMPT_NODE)
+ assert(avail[0].until == None)
More information about the Haizea-commit
mailing list