[haizea-commit] r584 - branches/TP2.0/src/haizea/resourcemanager/scheduler
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Wed Jun 17 10:29:16 CDT 2009
Author: borja
Date: 2009-06-17 10:29:09 -0500 (Wed, 17 Jun 2009)
New Revision: 584
Added:
branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py
branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py
Modified:
branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
Log:
Added greedy mapper. Misc. changes to slot table.
Added: branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py (rev 0)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py 2009-06-17 15:29:09 UTC (rev 584)
@@ -0,0 +1,171 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import abstract
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple, AvailabilityWindow
+import haizea.common.constants as constants
+import operator
+
+class Mapper(object):
+ """Base class for mappers
+
+ """
+ def __init__(self, slottable, policy):
+ self.slottable = slottable
+ self.policy = policy
+
+ def map(self, requested_resources, start, end, strictend):
+ abstract()
+
+
+class GreedyMapper(Mapper):
+ def __init__(self, slottable, policy):
+ Mapper.__init__(self, slottable, policy)
+
+ def map(self, lease, requested_resources, start, end, strictend, onlynodes = None):
+ aw = self.slottable.get_availability_window(start, onlynodes)
+
+ pnodes = self.__sort_pnodes(lease, start, aw, onlynodes)
+ vnodes = self.__sort_vnodes(requested_resources)
+ vnodes.reverse()
+ leases = aw.get_leases_until(end)
+ mapping = {}
+ print "pnodes:", pnodes
+ print "vnodes:", vnodes
+ print "leases:", [l.id for l in leases]
+ leases = self.policy.sort_leases(lease, leases)
+ preemptable_leases = leases
+ preempting = []
+
+ done = False
+ while not done:
+ vnodes_pos = 0
+ vnode = vnodes[vnodes_pos]
+ capacity = requested_resources[vnode]
+ maxend = end
+ for pnode in pnodes:
+ need_to_map = ResourceTuple.create_empty()
+ need_to_map.incr(capacity)
+ avail=aw.get_availability_at_node(start, pnode, preempted_leases = preempting)
+ pnode_done = False
+ while not pnode_done:
+ if avail.fits(need_to_map, until = maxend):
+ mapping[vnode] = pnode
+ vnodes_pos += 1
+ if vnodes_pos >= len(vnodes):
+ done = True
+ break
+ else:
+ vnode = vnodes[vnodes_pos]
+ capacity = requested_resources[vnode]
+ need_to_map.incr(capacity)
+ else:
+ if strictend:
+ pnode_done = True
+ else:
+ latest = avail.latest_fit(need_to_map)
+ if latest == None:
+ pnode_done = True
+ else:
+ maxend = latest
+
+ if done:
+ break
+
+ if len(preemptable_leases) == 0:
+ done = True
+ else:
+ preempting.append(preemptable_leases.pop())
+
+ if len(mapping) != len(requested_resources):
+ return None
+ else:
+ return mapping, maxend, preempting
+
+
+
+ def __sort_pnodes(self, lease, time, aw,onlynodes):
+ if onlynodes == None:
+ nodes = aw.get_nodes_at(time)
+ else:
+ nodes = onlynodes
+
+ # Compares node x and node y.
+ # Returns how x ranks against y: constants.BETTER, constants.WORSE or constants.EQUAL.
+ def comparenodes(node_x, node_y):
+
+ leases_x = aw.get_leases_at(node_x, time)
+ leases_y = aw.get_leases_at(node_y, time)
+ # 1st: We prefer nodes with fewer leases to preempt
+ if len(leases_x) < len(leases_y):
+ return constants.BETTER
+ elif len(leases_x) > len(leases_y):
+ return constants.WORSE
+ else:
+ # 2nd: we prefer nodes with the highest available capacity
+ avail_x = aw.get_availability_at(node_x, time)
+ avail_y = aw.get_availability_at(node_y, time)
+ if avail_x > avail_y:
+ return constants.BETTER
+ elif avail_x < avail_y:
+ return constants.WORSE
+ else:
+ # 3rd: we prefer nodes where the current capacity
+ # doesn't change for the longest time.
+ duration_x = aw.get_capacity_interval(node_x, time)
+ duration_y = aw.get_capacity_interval(node_y, time)
+ if (duration_x == None and duration_y != None) or duration_x > duration_y:
+ return constants.BETTER
+ elif (duration_x != None and duration_y == None) or duration_x < duration_y:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+ # TODO
+ #elif len(node_x.leases) == 0 and len(node_y.leases) == 0:
+
+ # preemptable_in_x = [l for l in node_x.leases if policy.can_preempt(lease, l)]
+ # preemptable_in_y = [l for l in node_y.leases if policy.can_preempt(lease, l)]
+
+ # avail_x = node_x.get_avail_withpreemption(preemptable_in_x)
+ # avail_y = node_y.get_avail_withpreemption(preemptable_in_y)
+
+ # Order nodes
+ nodes.sort(comparenodes)
+ return nodes
+
+ def __sort_vnodes(self, requested_resources):
+ max_res = ResourceTuple.create_empty()
+ for res in requested_resources.values():
+ for t in ResourceTuple.get_resource_types():
+ v = res.get_by_type(t)
+ if v > max_res.get_by_type(t):
+ max_res.set_by_type(t,v)
+
+ norm_res = {}
+ for k,v in requested_resources.items():
+ norm_capacity = 0
+ for t in ResourceTuple.get_resource_types():
+ if max_res.get_by_type(t) > 0:
+ norm_capacity += v.get_by_type(t) / float(max_res.get_by_type(t))
+ norm_res[k] = norm_capacity
+
+ vnodes = norm_res.items()
+ vnodes.sort(key=operator.itemgetter(1))
+ vnodes = [k for k,v in vnodes]
+ return vnodes
+
Added: branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py (rev 0)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py 2009-06-17 15:29:09 UTC (rev 584)
@@ -0,0 +1,42 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import abstract
+import operator
+
+class Policy(object):
+ """Base class for policy module
+
+ """
+ def __init__(self, slottable):
+ self.slottable = slottable
+
+ def sort_leases(self, preemptor, preemptees):
+ leases_score = [(preemptee, self.get_lease_preemptability_score(preemptor,preemptee)) for preemptee in preemptees]
+ leases_score = [(preemptee,score) for preemptee,score in leases_score if score != -1]
+ leases_score.sort(key=operator.itemgetter(1))
+ return [preempte for preempte,score in leases_score]
+
+ def get_lease_preemptability_score(self, preemptor, preemptee):
+ abstract()
+
+ def accept_lease(self, lease):
+ abstract()
+
+# def get_host_score(self, node):
+# abstract()
Modified: branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py 2009-06-08 18:05:04 UTC (rev 583)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py 2009-06-17 15:29:09 UTC (rev 584)
@@ -59,6 +59,10 @@
cls.tuplelength = len(resourcetypes)
@classmethod
+ def get_resource_types(cls):
+ return cls.type2pos.keys()
+
+ @classmethod
def create_empty(cls):
return cls([0 for x in range(cls.tuplelength)])
@@ -105,6 +109,9 @@
def __eq__(self, res2):
return self._res == res2._res
+ def __cmp__(self, res2):
+ return cmp(self._res, res2._res)
+
class ResourceReservation(object):
"""A resource reservation
@@ -220,8 +227,7 @@
self.reservations = []
self.reservations_by_start = []
self.reservations_by_end = []
- self.availabilitycache = {}
- self.changepointcache = None
+ self.__dirty()
def add_node(self, resourcepoolnode):
self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
@@ -416,6 +422,24 @@
return time1
else:
return min(time1, time2)
+
+ def get_availability_window(self, start, onlynodes = None):
+ cache_miss = False
+ if self.awcache == None:
+ cache_miss = True
+ else:
+ if self.awcache_onlynodes != None:
+ if onlynodes == None:
+ cache_miss = True
+ elif not set(onlynodes).issubset(set(self.awcache_onlynodes)):
+ cache_miss = True
+ if start < self.awcache_time:
+ cache_miss = True
+
+ if cache_miss:
+ self.__get_aw_cache_miss(start, onlynodes)
+
+ return self.awcache
# ONLY for simulation
def get_next_premature_end(self, after):
@@ -448,35 +472,36 @@
def __get_availability_cache_miss(self, time):
- allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
- onlynodes = None
+ allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
nodes = {}
reservations = self.get_reservations_at(time)
# Find how much resources are available on each node
for r in reservations:
for node in r.resources_in_pnode:
- if onlynodes == None or (onlynodes != None and node in onlynodes):
- if not nodes.has_key(node):
- n = self.nodes[node]
- nodes[node] = Node(n.capacity, n.resourcepoolnode)
- nodes[node].capacity.decr(r.resources_in_pnode[node])
+ if not nodes.has_key(node):
+ n = self.nodes[node]
+ nodes[node] = Node(n.capacity, n.resourcepoolnode)
+ nodes[node].capacity.decr(r.resources_in_pnode[node])
# For the remaining nodes, use a reference to the original node, not a copy
- if onlynodes == None:
- missing = allnodes - set(nodes.keys())
- else:
- missing = onlynodes - set(nodes.keys())
-
+ missing = allnodes - set(nodes.keys())
for node in missing:
nodes[node] = self.nodes[node]
self.availabilitycache[time] = nodes
-
+ def __get_aw_cache_miss(self, time, onlynodes):
+ self.awcache = AvailabilityWindow(self, time, onlynodes)
+ self.awcache_time = time
+ self.awcache_onlynodes = onlynodes
+
def __dirty(self):
# You're a dirty, dirty slot table and you should be
# ashamed of having outdated caches!
self.availabilitycache = {}
+ self.awcache_time = None
+ self.awcache_onlynodes = None
+ self.awcache = None
@@ -501,11 +526,11 @@
def decr(self, capacity):
self.available.decr(capacity)
- def add_lease(self, lease_id, capacity):
- if not lease_id in self.leases:
- self.leases.add(lease_id)
- self.available_if_preempting[lease_id] = ResourceTuple.create_empty()
- self.available_if_preempting[lease_id].incr(capacity)
+ def add_lease(self, lease, capacity):
+ if not lease in self.leases:
+ self.leases.add(lease)
+ self.available_if_preempting[lease] = ResourceTuple.create_empty()
+ self.available_if_preempting[lease].incr(capacity)
def get_avail_withpreemption(self, leases):
avail = ResourceTuple.copy(available)
@@ -517,7 +542,26 @@
self.available = available
self.until = until
+class AvailabilityInNode(object):
+ def __init__(self, avail_list):
+ self.avail_list = avail_list
+
+ def fits(self, capacity, until):
+ for avail in self.avail_list:
+ if avail.until == None or avail.until >= until:
+ return capacity.fits_in(avail.available)
+ def latest_fit(self, capacity):
+ prev = None
+ for avail in self.avail_list:
+ if not capacity.fits_in(avail.available):
+ return prev
+ else:
+ prev = avail.until
+
+ def get_avail_at_end(self):
+ return self.avail_list[-1]
+
class AvailabilityWindow(object):
"""An availability window
@@ -537,7 +581,7 @@
self.onlynodes = onlynodes
self.leases = set()
- self.cp_list = self.slottable.get_changepoints_after(time, nodes=onlynodes)
+ self.cp_list = [self.time] + self.slottable.get_changepoints_after(time, nodes=onlynodes)
# Create initial changepoint hash table
self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
@@ -546,13 +590,6 @@
for node_id, node in enumerate(self.slottable.nodes):
cp.add_node(node_id + 1, node.capacity)
- # Initial time
- avail_at_start = self.slottable.get_availability(self.time, onlynodes=onlynodes)
- self.cp_list = [self.time] + self.cp_list
- self.changepoints[self.time] = ChangepointAvail()
- for node_id, node in avail_at_start.items():
- self.changepoints[self.time].add_node(node_id, node.capacity)
-
rrs = self.slottable.get_reservations_after(time)
rrs.sort(key=attrgetter("start"))
pos = 0
@@ -561,30 +598,29 @@
while rr.start >= self.time and self.cp_list[pos] != rr.start:
pos += 1
- lease_id = rr.lease.id
+ lease = rr.lease
- self.leases.add(lease_id)
+ self.leases.add(lease)
if rr.start >= self.time:
start_cp = self.changepoints[rr.start]
- start_cp.leases.add(lease_id)
- for node in rr.resources_in_pnode:
- start_cp.nodes[node].decr(rr.resources_in_pnode[node])
- start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
else:
start_cp = self.changepoints[self.time]
- start_cp.leases.add(lease_id)
- for node in rr.resources_in_pnode:
- start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+
+ start_cp.leases.add(lease)
+ for node in rr.resources_in_pnode:
+ start_cp.nodes[node].decr(rr.resources_in_pnode[node])
+ start_cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
+
pos2 = pos + 1
while self.cp_list[pos2] < rr.end:
cp = self.changepoints[self.cp_list[pos2]]
- cp.leases.add(lease_id)
+ cp.leases.add(lease)
for node in rr.resources_in_pnode:
cp.nodes[node].decr(rr.resources_in_pnode[node])
- cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+ cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
pos2 += 1
@@ -617,7 +653,7 @@
for l in preempted_leases:
if node.available_if_preempting.has_key(l):
available.incr(node.available_if_preempting[l])
-
+
if prev_avail != None and available.fits_in(prev_avail.available):
availentry = AvailEntry(available, None)
avails.append(availentry)
@@ -631,80 +667,29 @@
prev_node = node
node = node.next_nodeavail
- return avails
+ return AvailabilityInNode(avails)
def get_nodes_at(self, time):
- return self.changepoints[time].nodes.items()
-
+ return self.changepoints[time].nodes.keys()
+
+ def get_leases_at(self, node, time):
+ return self.changepoints[time].nodes[node].leases
-### Everything after this is from the old implementation
+ def get_availability_at(self, node, time):
+ return self.changepoints[time].nodes[node].available
- def fit_at_start(self, nodes = None):
- if nodes != None:
- avail = [v for (k, v) in self.avail.items() if k in nodes]
+ def get_capacity_interval(self, node, time):
+ next_cp = self.changepoints[time].nodes[node].next_cp
+ if next_cp == None:
+ return None
else:
- avail = self.avail.values()
-
- return sum([e[0].canfit for e in avail])
+ return next_cp - time
- # TODO: Also return the amount of resources that would have to be
- # preempted in each physnode
- def find_pnodes_for_vms(self, numnodes, maxend, strictend=False):
- # Returns the physical nodes that can run all VMs, and the
- # time at which the VMs must end
- canfit = dict([(n, v[0].canfit) for (n, v) in self.avail.items()])
- entries = []
- for n in self.avail.keys():
- entries += [(n, e) for e in self.avail[n][1:]]
- getTime = lambda x: x[1].time
- entries.sort(key=getTime)
- if strictend:
- end = None
- else:
- end = maxend
- for e in entries:
- physnode = e[0]
- entry = e[1]
-
- if entry.time >= maxend:
- # Can run to its maximum duration
+ def get_leases_until(self, until):
+ leases = set()
+ for cp in self.cp_list:
+ if until <= cp:
break
- else:
- diff = canfit[physnode] - entry.canfit
- totalcanfit = sum([n for n in canfit.values()]) - diff
- if totalcanfit < numnodes and not strictend:
- # Not enough resources. Must end here
- end = entry.time
- break
- else:
- # Update canfit
- canfit[physnode] = entry.canfit
+ leases.update(self.changepoints[cp].leases)
+ return list(leases)
- # Filter out nodes where we can't fit any vms
- canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
-
- return end, canfit
-
-
- def print_contents(self, nodes = None):
- if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
- if nodes == None:
- physnodes = self.avail.keys()
- else:
- physnodes = [k for k in self.avail.keys() if k in nodes]
- physnodes.sort()
-
- self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
- for n in physnodes:
- contents = "Node %i --- " % n
- for x in self.avail[n]:
- contents += "[ %s " % x.time
- contents += "{ "
- if x.avail == None :
- contents += "END "
- else:
- res = x.avail
- canfit = x.canfit
- contents += "%s" % res
- contents += "} (Fits: %i) ] " % canfit
- self.logger.vdebug(contents)
More information about the Haizea-commit mailing list