[haizea-commit] r597 - in branches/TP2.0: src/haizea/core src/haizea/core/scheduler tests
haizea-commit at mailman.cs.uchicago.edu
Fri Jul 3 06:19:46 CDT 2009
Author: borja
Date: 2009-07-03 06:19:38 -0500 (Fri, 03 Jul 2009)
New Revision: 597
Modified:
branches/TP2.0/src/haizea/core/leases.py
branches/TP2.0/src/haizea/core/scheduler/policy.py
branches/TP2.0/src/haizea/core/scheduler/slottable.py
branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
branches/TP2.0/tests/test_slottable.py
Log:
Best-effort leases working in TP2.0 (without preemption). Required some bug fixes in AvailabilityWindow.
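
The key change for best-effort leases is in vm_scheduler.py below: the ASAP path first tries to fit the lease at the current changepoints and, only if that fails and future reservations are allowed, retries at future changepoints and flags the result as a reservation. A simplified, standalone sketch of that control flow (find_fit_now and find_fit_future are stand-ins for the two __find_fit_at_points calls in the diff; the real method also handles suspension, resumption and shutdown-time estimates):

class NotSchedulableException(Exception):
    pass

def schedule_asap(find_fit_now, find_fit_future, allow_reservation_in_future):
    # Illustrative sketch of the control flow in __schedule_asap only.
    reservation = False

    # First, assume no reservation in the future will be made.
    start, end = find_fit_now()

    if start is None and not allow_reservation_in_future:
        # Immediate leases (and best-effort leases when future reservations
        # are disabled) must fit now or scheduling fails.
        raise NotSchedulableException("Could not find enough resources for this request")

    # Otherwise fall back to reserving the lease in the future.
    if start is None and allow_reservation_in_future:
        start, end = find_fit_future()
        reservation = True

    return start, end, reservation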
Modified: branches/TP2.0/src/haizea/core/leases.py
===================================================================
--- branches/TP2.0/src/haizea/core/leases.py 2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/leases.py 2009-07-03 11:19:38 UTC (rev 597)
@@ -131,13 +131,18 @@
self.state.change_state(state)
def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "__________________________________________________")
self.logger.log(loglevel, "Lease ID : %i" % self.id)
+ self.logger.log(loglevel, "Type : %s" % Lease.type_str[self.get_type()])
self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
+ self.logger.log(loglevel, "Start : %s" % self.start)
self.logger.log(loglevel, "Duration : %s" % self.duration)
self.logger.log(loglevel, "State : %s" % Lease.state_str[self.get_state()])
self.logger.log(loglevel, "Resource req : %s" % self.requested_resources)
self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
self.logger.log(loglevel, "Mem image map : %s" % pretty_nodemap(self.memimagemap))
+ self.print_rrs(loglevel)
+ self.logger.log(loglevel, "--------------------------------------------------")
def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
if len(self.preparation_rrs) > 0:
Modified: branches/TP2.0/src/haizea/core/scheduler/policy.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/policy.py 2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/policy.py 2009-07-03 11:19:38 UTC (rev 597)
@@ -17,6 +17,7 @@
# -------------------------------------------------------------------------- #
from haizea.common.utils import abstract
+from haizea.core.leases import Lease
import operator
class Policy(object):
Modified: branches/TP2.0/src/haizea/core/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/slottable.py 2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/slottable.py 2009-07-03 11:19:38 UTC (rev 597)
@@ -259,15 +259,26 @@
res = [x.value for x in self.reservations_by_start[startpos:endpos]]
return res
- # on or after
def get_reservations_starting_after(self, start):
startitem = KeyValueWrapper(start, None)
- startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+ startpos = bisect.bisect_right(self.reservations_by_start, startitem)
res = [x.value for x in self.reservations_by_start[startpos:]]
return res
def get_reservations_ending_after(self, end):
startitem = KeyValueWrapper(end, None)
+ startpos = bisect.bisect_right(self.reservations_by_end, startitem)
+ res = [x.value for x in self.reservations_by_end[startpos:]]
+ return res
+
+ def get_reservations_starting_on_or_after(self, start):
+ startitem = KeyValueWrapper(start, None)
+ startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+ res = [x.value for x in self.reservations_by_start[startpos:]]
+ return res
+
+ def get_reservations_ending_on_or_after(self, end):
+ startitem = KeyValueWrapper(end, None)
startpos = bisect.bisect_left(self.reservations_by_end, startitem)
res = [x.value for x in self.reservations_by_end[startpos:]]
return res
@@ -290,6 +301,11 @@
bystart = set(self.get_reservations_starting_after(time))
byend = set(self.get_reservations_ending_after(time))
return list(bystart | byend)
+
+ def get_reservations_on_or_after(self, time):
+ bystart = set(self.get_reservations_starting_on_or_after(time))
+ byend = set(self.get_reservations_ending_on_or_after(time))
+ return list(bystart | byend)
def get_changepoints_after(self, after, until=None, nodes=None):
changepoints = set()
@@ -604,10 +620,10 @@
self.leases = set()
self.cp_list = [self.time] + self.slottable.get_changepoints_after(time, nodes=onlynodes)
-
+
# Create initial changepoint hash table
self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
-
+
for cp in self.changepoints.values():
for node_id, node in enumerate(self.slottable.nodes):
cp.add_node(node_id + 1, node.capacity)
@@ -621,7 +637,7 @@
pos += 1
lease = rr.lease
-
+
self.leases.add(lease)
if rr.start >= self.time:
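
A note on the bisect change above: slicing a sorted list from bisect_left's index keeps entries equal to the key (on or after), while slicing from bisect_right's index skips them (strictly after). The old get_reservations_starting_after therefore already behaved like the new *_on_or_after variants; the fix switches the "after" methods to bisect_right and adds explicit on-or-after methods. A standalone illustration of the boundary semantics, using plain start times rather than Haizea's KeyValueWrapper objects:

import bisect

starts = [5, 10, 10, 15]   # sorted reservation start times (made-up values)

def starting_after(t):
    # bisect_right places the cursor after entries equal to t: strictly after.
    return starts[bisect.bisect_right(starts, t):]

def starting_on_or_after(t):
    # bisect_left places the cursor before entries equal to t: on or after.
    return starts[bisect.bisect_left(starts, t):]

print(starting_after(10))        # -> [15]
print(starting_on_or_after(10))  # -> [10, 10, 15]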
Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-03 11:19:38 UTC (rev 597)
@@ -81,11 +81,11 @@
def schedule(self, lease, nexttime, earliest):
if lease.get_type() == Lease.BEST_EFFORT:
- return self.__schedule_asap(lease, nexttime, earliest)
+ return self.__schedule_asap(lease, nexttime, earliest, allow_reservation_in_future = True)
elif lease.get_type() == Lease.ADVANCE_RESERVATION:
return self.__schedule_exact(lease, nexttime, earliest)
elif lease.get_type() == Lease.IMMEDIATE:
- return self.__schedule_asap(lease, nexttime, earliest)
+ return self.__schedule_asap(lease, nexttime, earliest, allow_reservation_in_future = False)
def __schedule_exact(self, lease, nexttime, earliest):
start = lease.start.requested
@@ -117,19 +117,11 @@
return vmrr, preemptions
- def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = None):
+ def __schedule_asap(self, lease, nexttime, earliest, allow_reservation_in_future = None):
lease_id = lease.id
remaining_duration = lease.duration.get_remaining_duration()
- numnodes = lease.numnodes
- requested_resources = lease.requested_resources
- preemptible = lease.preemptible
mustresume = (lease.get_state() == Lease.STATE_SUSPENDED_QUEUED)
shutdown_time = self.__estimate_shutdown_time(lease)
- susptype = get_config().get("suspension")
- if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
- suspendable = False
- else:
- suspendable = True
if allow_reservation_in_future == None:
allow_reservation_in_future = self.can_reserve_besteffort_in_future()
@@ -197,6 +189,7 @@
+
#
# STEP 3: SLOT FITTING
#
@@ -209,85 +202,48 @@
duration += shutdown_time
+ reservation = False
+
# First, assuming we can't make reservations in the future
- start, end, canfit = self.__find_fit_at_points(
- changepoints,
- numnodes,
- requested_resources,
- duration,
- suspendable,
- min_duration)
+ start, end, mapping, preemptions = self.__find_fit_at_points(lease,
+ changepoints,
+ duration,
+ min_duration)
- if start == None:
- if not allow_reservation_in_future:
+ if start == None and not allow_reservation_in_future:
# We did not find a suitable starting time. This can happen
# if we're unable to make future reservations
raise NotSchedulableException, "Could not find enough resources for this request"
- else:
- mustsuspend = (end - start) < duration
- if mustsuspend and not suspendable:
- if not allow_reservation_in_future:
- raise NotSchedulableException, "Scheduling this lease would require preempting it, which is not allowed"
- else:
- start = None # No satisfactory start time
-
+
# If we haven't been able to fit the lease, check if we can
# reserve it in the future
if start == None and allow_reservation_in_future:
- start, end, canfit = self.__find_fit_at_points(
- futurecp,
- numnodes,
- requested_resources,
- duration,
- suspendable,
- min_duration
- )
+ start, end, mapping, preemptions = self.__find_fit_at_points(lease,
+ futurecp,
+ duration,
+ min_duration
+ )
+ # TODO: The following will also raise an exception if a lease
+ # makes a request that could *never* be satisfied with the
+ # current resources.
+ if start == None:
+ raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
-
- if start in [p[0] for p in futurecp]:
reservation = True
- else:
- reservation = False
-
#
- # STEP 4: FINAL SLOT FITTING
+ # STEP 4: CREATE RESERVATIONS
#
- # At this point, we know the lease fits, but we have to map it to
- # specific physical nodes.
-
- # Sort physical nodes
- physnodes = canfit.keys()
- if mustresume:
- # If we're resuming, we prefer resuming in the nodes we're already
- # deployed in, to minimize the number of transfers.
- vmrr = lease.get_last_vmrr()
- nodes = set(vmrr.nodes.values())
- availnodes = set(physnodes)
- deplnodes = availnodes.intersection(nodes)
- notdeplnodes = availnodes.difference(nodes)
- physnodes = list(deplnodes) + list(notdeplnodes)
- else:
- physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
- # Map to physical nodes
- mappings = {}
res = {}
- vmnode = 1
- while vmnode <= numnodes:
- for n in physnodes:
- if canfit[n]>0:
- canfit[n] -= 1
- mappings[vmnode] = n
- if res.has_key(n):
- res[n].incr(requested_resources)
- else:
- res[n] = ResourceTuple.copy(requested_resources)
- vmnode += 1
- break
+
+ for (vnode,pnode) in mapping.items():
+ vnode_res = lease.requested_resources[vnode]
+ if res.has_key(pnode):
+ res[pnode].incr(vnode_res)
+ else:
+ res[pnode] = ResourceTuple.copy(vnode_res)
-
- vmrr = VMResourceReservation(lease, start, end, mappings, res, reservation)
+ vmrr = VMResourceReservation(lease, start, end, mapping, res, reservation)
vmrr.state = ResourceReservation.STATE_SCHEDULED
if mustresume:
@@ -305,15 +261,14 @@
if reservation:
self.numbesteffortres += 1
-
susp_str = res_str = ""
if mustresume:
res_str = " (resuming)"
if mustsuspend:
susp_str = " (suspending)"
- self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+ self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
- return vmrr, []
+ return vmrr, preemptions
def estimate_migration_time(self, lease):
return self.__estimate_migration_time(lease)
@@ -492,45 +447,37 @@
return self.maxres > 0
- def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
- start = None
- end = None
- canfit = None
- availabilitywindow = self.slottable.availabilitywindow
-
-
- for p in changepoints:
- availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
- availabilitywindow.printContents()
+ def __find_fit_at_points(self, lease, changepoints, duration, min_duration):
+ found = False
+ for time, onlynodes in changepoints:
+ start = time
+ end = start + duration
+ self.logger.debug("Attempting to map from %s to %s" % (start, end))
+ mapping, actualend, preemptions = self.mapper.map(lease,
+ lease.requested_resources,
+ start,
+ end,
+ strictend = False,
+ onlynodes = onlynodes)
- if availabilitywindow.fitAtStart() >= numnodes:
- start=p[0]
- maxend = start + duration
- end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-
- self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-
- if end < maxend:
- self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-
- if not suspendable:
- pass
- # If we can't suspend, this fit is no good, and we have to keep looking
+ if mapping != None:
+ if actualend < end:
+ actualduration = actualend - start
+ if actualduration >= min_duration:
+ self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
+ found = True
+ break
else:
- # If we can suspend, we still have to check if the lease will
- # be able to run for the specified minimum duration
- if end-start > min_duration:
- break # We found a fit; stop looking
- else:
- self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
- # Set start back to None, to indicate that we haven't
- # found a satisfactory start time
- start = None
+ self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
else:
- # We've found a satisfactory starting time
- break
-
- return start, end, canfit
+ self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
+ found = True
+ break
+
+ if found:
+ return start, actualend, mapping, preemptions
+ else:
+ return None, None, None, None
def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
times = [] # (start, end, {pnode -> vnodes})
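
The rewritten __find_fit_at_points above delegates node selection to self.mapper.map with strictend = False, and accepts a shortened end time only when the portion that fits still covers the minimum duration (so the lease can later be suspended). A simplified, self-contained sketch of that loop; the mapper argument is a stub standing in for Haizea's mapper, assumed to return a mapping (or None), the time the mapping actually holds until, and any preemptions:

def find_fit_at_points(mapper, lease, changepoints, duration, min_duration):
    # Illustrative sketch of the per-changepoint fitting loop (not the
    # actual method, which also logs each attempt and passes
    # lease.requested_resources through to the mapper).
    for time, onlynodes in changepoints:
        start = time
        end = start + duration
        mapping, actualend, preemptions = mapper(lease, start, end, onlynodes)

        if mapping is None:
            continue  # no fit at this changepoint; try the next one

        if actualend < end:
            # The lease fits, but not for its full duration: it would have
            # to be suspended.  Accept only if the part that does fit
            # covers the requested minimum duration.
            if actualend - start >= min_duration:
                return start, actualend, mapping, preemptions
        else:
            # The full duration fits starting at this changepoint.
            return start, actualend, mapping, preemptions

    return None, None, None, None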
Modified: branches/TP2.0/tests/test_slottable.py
===================================================================
--- branches/TP2.0/tests/test_slottable.py 2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/tests/test_slottable.py 2009-07-03 11:19:38 UTC (rev 597)
@@ -526,3 +526,35 @@
assert(len(avail.avail_list)==1)
assert(avail.avail_list[0].available == EMPT_NODE)
assert(avail.avail_list[0].until == None)
+
+ self.slottable.awcache = None
+ aw = self.slottable.get_availability_window(T1415)
+ # 14:15
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 1,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 2,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 3,
+ leases = {}, next_cp = None)
+ avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 4,
+ leases = {}, next_cp = None)
+
+ avail = aw.get_availability_at_node(T1415, 1)
+ assert(len(avail.avail_list)==1)
+ assert(avail.avail_list[0].available == FULL_NODE)
+ assert(avail.avail_list[0].until == None)
+
+ avail = aw.get_availability_at_node(T1415, 2)
+ assert(len(avail.avail_list)==1)
+ assert(avail.avail_list[0].available == FULL_NODE)
+ assert(avail.avail_list[0].until == None)
+
+ avail = aw.get_availability_at_node(T1415, 3)
+ assert(len(avail.avail_list)==1)
+ assert(avail.avail_list[0].available == FULL_NODE)
+ assert(avail.avail_list[0].until == None)
+
+ avail = aw.get_availability_at_node(T1415, 4)
+ assert(len(avail.avail_list)==1)
+ assert(avail.avail_list[0].available == FULL_NODE)
+ assert(avail.avail_list[0].until == None)
\ No newline at end of file