[haizea-commit] r489 - trunk/src/haizea/resourcemanager
haizea-commit at mailman.cs.uchicago.edu
Fri Sep 5 17:38:40 CDT 2008
Author: borja
Date: 2008-09-05 17:38:40 -0500 (Fri, 05 Sep 2008)
New Revision: 489
Modified:
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Log:
Factored scheduling code out of SlotTable, and placed it in Scheduler
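
As a rough sketch of the new division of labor (hypothetical, heavily simplified classes and method names; not the actual Haizea code): the fitting and preemption policy now lives in private Scheduler methods, while SlotTable is left with availability and reservation queries.

    class SlotTable(object):
        # After this change, SlotTable only answers data queries
        # (availabilitywindow, getReservationsAt, findChangePointsAfter, ...).
        def getReservationsAt(self, time):
            return []  # placeholder

    class Scheduler(object):
        def __init__(self, slottable):
            self.slottable = slottable

        def schedule_ar(self, lease_req, start, end, avoidpreempt=True):
            # Before r489: self.slottable.fitExact(lease_req, ...)
            # After r489:  a private Scheduler method that merely queries the slot table
            return self.__fit_exact(lease_req, start, end, avoidpreempt)

        def __fit_exact(self, lease_req, start, end, avoidpreempt):
            # The slot-fitting / preemption policy would go here, using
            # self.slottable purely as a data source.
            reservations = self.slottable.getReservationsAt(start)
            return {}, {}, {}  # (nodeassignment, res, preemptions)

The diffs below show the methods that were actually moved: fitExact -> __fit_exact, findLeasesToPreempt -> __find_preemptable_leases, fitBestEffort -> __fit_besteffort, fitBestEffortInChangepoints -> __find_fit_at_points, and prioritizenodes -> __choose_nodes.
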
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-09-04 17:29:03 UTC (rev 488)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-09-05 22:38:40 UTC (rev 489)
@@ -38,6 +38,7 @@
from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation
from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from operator import attrgetter, itemgetter
import logging
@@ -289,10 +290,10 @@
start = lease_req.start.requested
end = lease_req.start.requested + lease_req.duration.requested
try:
- (nodeassignment, res, preemptions) = self.slottable.fitExact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+ (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
if len(preemptions) > 0:
- leases = self.slottable.findLeasesToPreempt(preemptions, start, end)
+ leases = self.__find_preemptable_leases(preemptions, start, end)
self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
for lease in leases:
self.preempt(lease, time=start)
@@ -363,7 +364,7 @@
try:
mustresume = (req.state == constants.LEASE_STATE_SUSPENDED)
canreserve = self.canReserveBestEffort()
- (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
+ (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
# Schedule deployment
if req.state != constants.LEASE_STATE_SUSPENDED:
@@ -425,7 +426,7 @@
# Determine earliest start time in each node
earliest = self.deployment.find_earliest_starting_times(req, nexttime)
try:
- (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
+ (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
# Schedule deployment
self.deployment.schedule(req, vmrr, nexttime)
@@ -436,6 +437,551 @@
except SlotFittingException, msg:
raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+ def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
+ lease_id = leasereq.id
+ start = leasereq.start.requested
+ end = leasereq.start.requested + leasereq.duration.requested
+ diskImageID = leasereq.diskimage_id
+ numnodes = leasereq.numnodes
+ resreq = leasereq.requested_resources
+
+ availabilitywindow = self.slottable.availabilitywindow
+
+ availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
+ availabilitywindow.printContents(withpreemption = False)
+ availabilitywindow.printContents(withpreemption = True)
+
+ mustpreempt = False
+ unfeasiblewithoutpreemption = False
+
+ fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
+ if fitatstart < numnodes:
+ if not canpreempt:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ unfeasiblewithoutpreemption = True
+ feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
+ fitatend = sum([n for n in canfitnopreempt.values()])
+ if fitatend < numnodes:
+ if not canpreempt:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ unfeasiblewithoutpreemption = True
+
+ canfitpreempt = None
+ if canpreempt:
+ fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
+ if fitatstart < numnodes:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
+ fitatend = sum([n for n in canfitpreempt.values()])
+ if fitatend < numnodes:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ if unfeasiblewithoutpreemption:
+ mustpreempt = True
+ else:
+ mustpreempt = False
+
+ # At this point we know whether the lease is feasible, and whether
+ # it will require preemption.
+ if not mustpreempt:
+ self.logger.debug("The VM reservations for this lease are feasible without preemption.")
+ else:
+ self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
+
+ # merge canfitnopreempt and canfitpreempt
+ canfit = {}
+ for node in canfitnopreempt:
+ vnodes = canfitnopreempt[node]
+ canfit[node] = [vnodes, vnodes]
+ for node in canfitpreempt:
+ vnodes = canfitpreempt[node]
+ if canfit.has_key(node):
+ canfit[node][1] = vnodes
+ else:
+ canfit[node] = [0, vnodes]
+
+ orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
+
+ self.logger.debug("Node ordering: %s" % orderednodes)
+
+ # vnode -> pnode
+ nodeassignment = {}
+
+ # pnode -> resourcetuple
+ res = {}
+
+ # physnode -> how many vnodes
+ preemptions = {}
+
+ vnode = 1
+ if avoidpreempt:
+ # First pass, without preemption
+ for physnode in orderednodes:
+ canfitinnode = canfit[physnode][0]
+ for i in range(1, canfitinnode+1):
+ nodeassignment[vnode] = physnode
+ if res.has_key(physnode):
+ res[physnode].incr(resreq)
+ else:
+ res[physnode] = ds.ResourceTuple.copy(resreq)
+ canfit[physnode][0] -= 1
+ canfit[physnode][1] -= 1
+ vnode += 1
+ if vnode > numnodes:
+ break
+ if vnode > numnodes:
+ break
+
+ # Second pass, with preemption
+ if mustpreempt or not avoidpreempt:
+ for physnode in orderednodes:
+ canfitinnode = canfit[physnode][1]
+ for i in range(1, canfitinnode+1):
+ nodeassignment[vnode] = physnode
+ if res.has_key(physnode):
+ res[physnode].incr(resreq)
+ else:
+ res[physnode] = ds.ResourceTuple.copy(resreq)
+ canfit[physnode][1] -= 1
+ vnode += 1
+ # Check if this will actually result in a preemption
+ if canfit[physnode][0] == 0:
+ if preemptions.has_key(physnode):
+ preemptions[physnode].incr(resreq)
+ else:
+ preemptions[physnode] = ds.ResourceTuple.copy(resreq)
+ else:
+ canfit[physnode][0] -= 1
+ if vnode > numnodes:
+ break
+ if vnode > numnodes:
+ break
+
+ if vnode <= numnodes:
+ raise SchedException, "Availability window indicated that request but feasible, but could not fit it"
+
+ return nodeassignment, res, preemptions
+
+
+ def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
+ def comparepreemptability(rrX, rrY):
+ if rrX.lease.submit_time > rrY.lease.submit_time:
+ return constants.BETTER
+ elif rrX.lease.submit_time < rrY.lease.submit_time:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+
+ def preemptedEnough(amountToPreempt):
+ for node in amountToPreempt:
+ if not amountToPreempt[node].is_zero_or_less():
+ return False
+ return True
+
+ # Get allocations at the specified time
+ atstart = set()
+ atmiddle = set()
+ nodes = set(mustpreempt.keys())
+
+ reservationsAtStart = self.slottable.getReservationsAt(startTime)
+ reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtMiddle = self.slottable.getReservationsStartingBetween(startTime, endTime)
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtStart.sort(comparepreemptability)
+ reservationsAtMiddle.sort(comparepreemptability)
+
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+
+ # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+ for r in reservationsAtStart:
+ # The following will really only come into play when we have
+ # multiple VMs per node
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ # Don't need to preempt if we've already preempted all
+ # the needed resources in node n
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atstart.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+ if len(reservationsAtMiddle)>0:
+ changepoints = set()
+ for r in reservationsAtMiddle:
+ changepoints.add(r.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+
+ for cp in changepoints:
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+ reservations = [r for r in reservationsAtMiddle
+ if r.start <= cp and cp < r.end]
+ for r in reservations:
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atmiddle.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+ self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+
+ leases = [r.lease for r in atstart|atmiddle]
+
+ return leases
+
+
+ def __fit_besteffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
+ lease_id = lease.id
+ remdur = lease.duration.get_remaining_duration()
+ numnodes = lease.numnodes
+ resreq = lease.requested_resources
+ preemptible = lease.preemptible
+ suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
+ migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
+
+ #
+ # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
+ #
+
+ curnodes=None
+ # If we can't migrate, we have to stay in the
+ # nodes where the lease is currently deployed
+ if mustresume and not canmigrate:
+ vmrr, susprr = lease.get_last_vmrr()
+ curnodes = set(vmrr.nodes.values())
+ suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
+
+ if mustresume and canmigrate:
+ # If we have to resume this lease, make sure that
+ # we have enough time to transfer the images.
+ migratetime = lease.estimate_migration_time(migration_bandwidth)
+ earliesttransfer = self.rm.clock.get_time() + migratetime
+
+ for n in earliest:
+ earliest[n][0] = max(earliest[n][0], earliesttransfer)
+ suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
+
+ if mustresume:
+ resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
+ # Must allocate time for resumption too
+ remdur += resumetime
+ else:
+ suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
+
+
+ #
+ # STEP 2: FIND THE CHANGEPOINTS
+ #
+
+ # Find the changepoints, and the nodes we can use at each changepoint
+ # Nodes may not be available at a changepoint because images
+ # cannot be transferred at that time.
+ if not mustresume:
+ cps = [(node, e[0]) for node, e in earliest.items()]
+ cps.sort(key=itemgetter(1))
+ curcp = None
+ changepoints = []
+ nodes = []
+ for node, time in cps:
+ nodes.append(node)
+ if time != curcp:
+ changepoints.append([time, nodes[:]])
+ curcp = time
+ else:
+ changepoints[-1][1] = nodes[:]
+ else:
+ changepoints = list(set([x[0] for x in earliest.values()]))
+ changepoints.sort()
+ changepoints = [(x, curnodes) for x in changepoints]
+
+ # If we can make reservations for best-effort leases,
+ # we also consider future changepoints
+ # (otherwise, we only allow the VMs to start "now", accounting
+ # for the fact that VM images will have to be deployed)
+ if canreserve:
+ futurecp = self.slottable.findChangePointsAfter(changepoints[-1][0])
+ futurecp = [(p,None) for p in futurecp]
+ else:
+ futurecp = []
+
+
+
+ #
+ # STEP 3: SLOT FITTING
+ #
+
+ # First, assuming we can't make reservations in the future
+ start, end, canfit, mustsuspend = self.__find_fit_at_points(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
+
+ if not canreserve:
+ if start == None:
+ # We did not find a suitable starting time. This can happen
+ # if we're unable to make future reservations
+ raise SlotFittingException, "Could not find enough resources for this request"
+ elif mustsuspend and not suspendable:
+ raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
+
+ if start != None and mustsuspend and not suspendable:
+ start = None # No satisfactory start time
+
+ # If we haven't been able to fit the lease, check if we can
+ # reserve it in the future
+ if start == None and canreserve:
+ start, end, canfit, mustsuspend = self.__find_fit_at_points(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
+
+ if mustsuspend and not suspendable:
+ raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
+
+ if start in [p[0] for p in futurecp]:
+ reservation = True
+ else:
+ reservation = False
+
+
+ #
+ # STEP 4: FINAL SLOT FITTING
+ #
+ # At this point, we know the lease fits, but we have to map it to
+ # specific physical nodes.
+
+ # Sort physical nodes
+ physnodes = canfit.keys()
+ if mustresume:
+ # If we're resuming, we prefer resuming in the nodes we're already
+ # deployed in, to minimize the number of transfers.
+ vmrr, susprr = lease.get_last_vmrr()
+ nodes = set(vmrr.nodes.values())
+ availnodes = set(physnodes)
+ deplnodes = availnodes.intersection(nodes)
+ notdeplnodes = availnodes.difference(nodes)
+ physnodes = list(deplnodes) + list(notdeplnodes)
+ else:
+ physnodes.sort() # Arbitrary ordering; should prioritize nodes as in the exact-fit case
+
+ # Adjust times in case the lease has to be suspended/resumed
+ if mustsuspend:
+ suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
+ end -= suspendtime
+
+ if mustresume:
+ start += resumetime
+
+ # Map to physical nodes
+ mappings = {}
+ res = {}
+ vmnode = 1
+ while vmnode <= numnodes:
+ for n in physnodes:
+ if canfit[n]>0:
+ canfit[n] -= 1
+ mappings[vmnode] = n
+ if res.has_key(n):
+ res[n].incr(resreq)
+ else:
+ res[n] = ds.ResourceTuple.copy(resreq)
+ vmnode += 1
+ break
+
+
+
+ #
+ # STEP 5: CREATE RESOURCE RESERVATIONS
+ #
+
+ if mustresume:
+ resmres = {}
+ for n in mappings.values():
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
+ resmres[n] = r
+ resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
+ resmrr.state = constants.RES_STATE_SCHEDULED
+ else:
+ resmrr = None
+ if mustsuspend:
+ suspres = {}
+ for n in mappings.values():
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
+ suspres[n] = r
+ susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
+ susprr.state = constants.RES_STATE_SCHEDULED
+ oncomplete = constants.ONCOMPLETE_SUSPEND
+ else:
+ susprr = None
+ oncomplete = constants.ONCOMPLETE_ENDLEASE
+
+ vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
+ vmrr.state = constants.RES_STATE_SCHEDULED
+
+ susp_str = res_str = ""
+ if mustresume:
+ res_str = " (resuming)"
+ if mustsuspend:
+ susp_str = " (suspending)"
+ self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+
+ return resmrr, vmrr, susprr, reservation
+
+ def __find_fit_at_points(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
+ start = None
+ end = None
+ canfit = None
+ mustsuspend = None
+ availabilitywindow = self.slottable.availabilitywindow
+
+
+ for p in changepoints:
+ availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
+ availabilitywindow.printContents()
+
+ if availabilitywindow.fitAtStart() >= numnodes:
+ start=p[0]
+ maxend = start + remdur
+ end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
+
+ self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
+
+ if end < maxend:
+ mustsuspend=True
+ self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
+
+ if suspendable:
+ # If the lease is suspendable...
+ if suspendthreshold != None:
+ if end-start > suspendthreshold:
+ break
+ else:
+ self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
+ start = None
+ else:
+ pass
+ else:
+ # Keep looking
+ pass
+ else:
+ mustsuspend=False
+ # We've found a satisfactory starting time
+ break
+
+ return start, end, canfit, mustsuspend
+
+ def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
+ # TODO2: Choose appropriate prioritizing function based on a
+ # config file, instead of hardcoding it
+ #
+ # TODO3: Basing decisions only on CPU allocations. This is ok for now,
+ # since the memory allocation is proportional to the CPU allocation.
+ # Later on we need to come up with some sort of weighted average.
+
+ nodes = canfit.keys()
+
+ # TODO: The deployment module should just provide a list of nodes
+ # it prefers
+ nodeswithimg=[]
+ #self.lease_deployment_type = self.rm.config.get("lease-preparation")
+ #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+ # reusealg = self.rm.config.get("diskimage-reuse")
+ # if reusealg==constants.REUSE_IMAGECACHES:
+ # nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
+
+ # Compares node x and node y.
+ # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
+ def comparenodes(x, y):
+ hasimgX = x in nodeswithimg
+ hasimgY = y in nodeswithimg
+
+ # First comparison: A node with no preemptible VMs is preferable
+ # to one with preemptible VMs (i.e. we want to avoid preempting)
+ canfitnopreemptionX = canfit[x][0]
+ canfitpreemptionX = canfit[x][1]
+ hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
+
+ canfitnopreemptionY = canfit[y][0]
+ canfitpreemptionY = canfit[y][1]
+ hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
+
+ # TODO: Factor out common code
+ if avoidpreempt:
+ if hasPreemptibleX and not hasPreemptibleY:
+ return constants.WORSE
+ elif not hasPreemptibleX and hasPreemptibleY:
+ return constants.BETTER
+ elif not hasPreemptibleX and not hasPreemptibleY:
+ if hasimgX and not hasimgY:
+ return constants.BETTER
+ elif not hasimgX and hasimgY:
+ return constants.WORSE
+ else:
+ if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
+ elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
+ else: return constants.EQUAL
+ elif hasPreemptibleX and hasPreemptibleY:
+ # If both have (some) preemptible resources, we prefer those
+ # that involve fewer preemptions
+ preemptX = canfitpreemptionX - canfitnopreemptionX
+ preemptY = canfitpreemptionY - canfitnopreemptionY
+ if preemptX < preemptY:
+ return constants.BETTER
+ elif preemptX > preemptY:
+ return constants.WORSE
+ else:
+ if hasimgX and not hasimgY: return constants.BETTER
+ elif not hasimgX and hasimgY: return constants.WORSE
+ else: return constants.EQUAL
+ elif not avoidpreempt:
+ # First criterion: Can we reuse the image?
+ if hasimgX and not hasimgY:
+ return constants.BETTER
+ elif not hasimgX and hasimgY:
+ return constants.WORSE
+ else:
+ # Now we just want to avoid preemption
+ if hasPreemptibleX and not hasPreemptibleY:
+ return constants.WORSE
+ elif not hasPreemptibleX and hasPreemptibleY:
+ return constants.BETTER
+ elif hasPreemptibleX and hasPreemptibleY:
+ # If both have (some) preemptible resources, we prefer those
+ # that involve fewer preemptions
+ preemptX = canfitpreemptionX - canfitnopreemptionX
+ preemptY = canfitpreemptionY - canfitnopreemptionY
+ if preemptX < preemptY:
+ return constants.BETTER
+ elif preemptX > preemptY:
+ return constants.WORSE
+ else:
+ if hasimgX and not hasimgY: return constants.BETTER
+ elif not hasimgX and hasimgY: return constants.WORSE
+ else: return constants.EQUAL
+ else:
+ return constants.EQUAL
+
+ # Order nodes
+ nodes.sort(comparenodes)
+ return nodes
+
def preempt(self, req, time):
self.logger.info("Preempting lease #%i..." % (req.id))
self.logger.vdebug("Lease before preemption:")
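
A side note on STEP 2 of the new __fit_besteffort above: the loop over the sorted (node, earliest-start-time) pairs collapses them into a list of changepoints, each carrying the cumulative set of nodes usable from that time onward. A standalone illustration of that collapsing logic, with made-up node IDs and times (not Haizea code):

    from operator import itemgetter

    # node -> earliest time a VM could start on it (in the real code this is
    # the first element of earliest[node])
    earliest = {1: 10, 2: 10, 3: 15, 4: 20}

    cps = sorted(earliest.items(), key=itemgetter(1))
    changepoints = []
    nodes = []
    curcp = None
    for node, time in cps:
        nodes.append(node)
        if time != curcp:
            changepoints.append([time, nodes[:]])
            curcp = time
        else:
            changepoints[-1][1] = nodes[:]

    print(changepoints)
    # [[10, [1, 2]], [15, [1, 2, 3]], [20, [1, 2, 3, 4]]]
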
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-09-04 17:29:03 UTC (rev 488)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-09-05 22:38:40 UTC (rev 489)
@@ -268,452 +268,7 @@
self.changepointcache.pop()
return p
- def fitExact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
- lease_id = leasereq.id
- start = leasereq.start.requested
- end = leasereq.start.requested + leasereq.duration.requested
- diskImageID = leasereq.diskimage_id
- numnodes = leasereq.numnodes
- resreq = leasereq.requested_resources
- self.availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
- self.availabilitywindow.printContents(withpreemption = False)
- self.availabilitywindow.printContents(withpreemption = True)
-
- mustpreempt = False
- unfeasiblewithoutpreemption = False
-
- fitatstart = self.availabilitywindow.fitAtStart(canpreempt = False)
- if fitatstart < numnodes:
- if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- unfeasiblewithoutpreemption = True
- feasibleend, canfitnopreempt = self.availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
- fitatend = sum([n for n in canfitnopreempt.values()])
- if fitatend < numnodes:
- if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- unfeasiblewithoutpreemption = True
-
- canfitpreempt = None
- if canpreempt:
- fitatstart = self.availabilitywindow.fitAtStart(canpreempt = True)
- if fitatstart < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
- feasibleendpreempt, canfitpreempt = self.availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
- fitatend = sum([n for n in canfitpreempt.values()])
- if fitatend < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- if unfeasiblewithoutpreemption:
- mustpreempt = True
- else:
- mustpreempt = False
-
- # At this point we know if the lease is feasible, and if
- # will require preemption.
- if not mustpreempt:
- self.logger.debug("The VM reservations for this lease are feasible without preemption.")
- else:
- self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
-
- # merge canfitnopreempt and canfitpreempt
- canfit = {}
- for node in canfitnopreempt:
- vnodes = canfitnopreempt[node]
- canfit[node] = [vnodes, vnodes]
- for node in canfitpreempt:
- vnodes = canfitpreempt[node]
- if canfit.has_key(node):
- canfit[node][1] = vnodes
- else:
- canfit[node] = [0, vnodes]
-
- orderednodes = self.prioritizenodes(canfit, diskImageID, start, canpreempt, avoidpreempt)
-
- self.logger.debug("Node ordering: %s" % orderednodes)
-
- # vnode -> pnode
- nodeassignment = {}
-
- # pnode -> resourcetuple
- res = {}
-
- # physnode -> how many vnodes
- preemptions = {}
-
- vnode = 1
- if avoidpreempt:
- # First pass, without preemption
- for physnode in orderednodes:
- canfitinnode = canfit[physnode][0]
- for i in range(1, canfitinnode+1):
- nodeassignment[vnode] = physnode
- if res.has_key(physnode):
- res[physnode].incr(resreq)
- else:
- res[physnode] = ds.ResourceTuple.copy(resreq)
- canfit[physnode][0] -= 1
- canfit[physnode][1] -= 1
- vnode += 1
- if vnode > numnodes:
- break
- if vnode > numnodes:
- break
-
- # Second pass, with preemption
- if mustpreempt or not avoidpreempt:
- for physnode in orderednodes:
- canfitinnode = canfit[physnode][1]
- for i in range(1, canfitinnode+1):
- nodeassignment[vnode] = physnode
- if res.has_key(physnode):
- res[physnode].incr(resreq)
- else:
- res[physnode] = ds.ResourceTuple.copy(resreq)
- canfit[physnode][1] -= 1
- vnode += 1
- # Check if this will actually result in a preemption
- if canfit[physnode][0] == 0:
- if preemptions.has_key(physnode):
- preemptions[physnode].incr(resreq)
- else:
- preemptions[physnode] = ds.ResourceTuple.copy(resreq)
- else:
- canfit[physnode][0] -= 1
- if vnode > numnodes:
- break
- if vnode > numnodes:
- break
-
- if vnode <= numnodes:
- raise CriticalSlotFittingException, "Availability window indicated that request but feasible, but could not fit it"
-
- return nodeassignment, res, preemptions
-
-
- def findLeasesToPreempt(self, mustpreempt, startTime, endTime):
- def comparepreemptability(rrX, rrY):
- if rrX.lease.submit_time > rrY.lease.submit_time:
- return constants.BETTER
- elif rrX.lease.submit_time < rrY.lease.submit_time:
- return constants.WORSE
- else:
- return constants.EQUAL
-
- def preemptedEnough(amountToPreempt):
- for node in amountToPreempt:
- if not amountToPreempt[node].is_zero_or_less():
- return False
- return True
-
- # Get allocations at the specified time
- atstart = set()
- atmiddle = set()
- nodes = set(mustpreempt.keys())
-
- reservationsAtStart = self.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtMiddle = self.getReservationsStartingBetween(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtStart.sort(comparepreemptability)
- reservationsAtMiddle.sort(comparepreemptability)
-
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-
- # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
- for r in reservationsAtStart:
- # The following will really only come into play when we have
- # multiple VMs per node
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- # Don't need to preempt if we've already preempted all
- # the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atstart.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
- if len(reservationsAtMiddle)>0:
- changepoints = set()
- for r in reservationsAtMiddle:
- changepoints.add(r.start)
- changepoints = list(changepoints)
- changepoints.sort()
-
- for cp in changepoints:
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
- reservations = [r for r in reservationsAtMiddle
- if r.start <= cp and cp < r.end]
- for r in reservations:
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atmiddle.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
- self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-
- leases = [r.lease for r in atstart|atmiddle]
-
- return leases
-
-
- def fitBestEffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
- lease_id = lease.id
- remdur = lease.duration.get_remaining_duration()
- numnodes = lease.numnodes
- resreq = lease.requested_resources
- preemptible = lease.preemptible
- suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
- migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
-
- #
- # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
- #
-
- curnodes=None
- # If we can't migrate, we have to stay in the
- # nodes where the lease is currently deployed
- if mustresume and not canmigrate:
- vmrr, susprr = lease.get_last_vmrr()
- curnodes = set(vmrr.nodes.values())
- suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
-
- if mustresume and canmigrate:
- # If we have to resume this lease, make sure that
- # we have enough time to transfer the images.
- migratetime = lease.estimate_migration_time(migration_bandwidth)
- earliesttransfer = self.rm.clock.get_time() + migratetime
-
- for n in earliest:
- earliest[n][0] = max(earliest[n][0], earliesttransfer)
- suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
-
- if mustresume:
- resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
- # Must allocate time for resumption too
- remdur += resumetime
- else:
- suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
-
-
- #
- # STEP 2: FIND THE CHANGEPOINTS
- #
-
- # Find the changepoints, and the nodes we can use at each changepoint
- # Nodes may not be available at a changepoint because images
- # cannot be transferred at that time.
- if not mustresume:
- cps = [(node, e[0]) for node, e in earliest.items()]
- cps.sort(key=itemgetter(1))
- curcp = None
- changepoints = []
- nodes = []
- for node, time in cps:
- nodes.append(node)
- if time != curcp:
- changepoints.append([time, nodes[:]])
- curcp = time
- else:
- changepoints[-1][1] = nodes[:]
- else:
- changepoints = list(set([x[0] for x in earliest.values()]))
- changepoints.sort()
- changepoints = [(x, curnodes) for x in changepoints]
-
- # If we can make reservations for best-effort leases,
- # we also consider future changepoints
- # (otherwise, we only allow the VMs to start "now", accounting
- # for the fact that vm images will have to be deployed)
- if canreserve:
- futurecp = self.findChangePointsAfter(changepoints[-1][0])
- futurecp = [(p,None) for p in futurecp]
- else:
- futurecp = []
-
-
-
- #
- # STEP 3: SLOT FITTING
- #
-
- # First, assuming we can't make reservations in the future
- start, end, canfit, mustsuspend = self.fitBestEffortInChangepoints(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
- if not canreserve:
- if start == None:
- # We did not find a suitable starting time. This can happen
- # if we're unable to make future reservations
- raise SlotFittingException, "Could not find enough resources for this request"
- elif mustsuspend and not suspendable:
- raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
- if start != None and mustsuspend and not suspendable:
- start = None # No satisfactory start time
-
- # If we haven't been able to fit the lease, check if we can
- # reserve it in the future
- if start == None and canreserve:
- start, end, canfit, mustsuspend = self.fitBestEffortInChangepoints(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
- if mustsuspend and not suspendable:
- raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
- if start in [p[0] for p in futurecp]:
- reservation = True
- else:
- reservation = False
-
-
- #
- # STEP 4: FINAL SLOT FITTING
- #
- # At this point, we know the lease fits, but we have to map it to
- # specific physical nodes.
-
- # Sort physical nodes
- physnodes = canfit.keys()
- if mustresume:
- # If we're resuming, we prefer resuming in the nodes we're already
- # deployed in, to minimize the number of transfers.
- vmrr, susprr = lease.get_last_vmrr()
- nodes = set(vmrr.nodes.values())
- availnodes = set(physnodes)
- deplnodes = availnodes.intersection(nodes)
- notdeplnodes = availnodes.difference(nodes)
- physnodes = list(deplnodes) + list(notdeplnodes)
- else:
- physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
- # Adjust times in case the lease has to be suspended/resumed
- if mustsuspend:
- suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
- end -= suspendtime
-
- if mustresume:
- start += resumetime
-
- # Map to physical nodes
- mappings = {}
- res = {}
- vmnode = 1
- while vmnode <= numnodes:
- for n in physnodes:
- if canfit[n]>0:
- canfit[n] -= 1
- mappings[vmnode] = n
- if res.has_key(n):
- res[n].incr(resreq)
- else:
- res[n] = ds.ResourceTuple.copy(resreq)
- vmnode += 1
- break
-
-
-
- #
- # STEP 5: CREATE RESOURCE RESERVATIONS
- #
-
- if mustresume:
- resmres = {}
- for n in mappings.values():
- r = ds.ResourceTuple.create_empty()
- r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
- r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
- resmres[n] = r
- resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
- resmrr.state = constants.RES_STATE_SCHEDULED
- else:
- resmrr = None
- if mustsuspend:
- suspres = {}
- for n in mappings.values():
- r = ds.ResourceTuple.create_empty()
- r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
- r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
- suspres[n] = r
- susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
- susprr.state = constants.RES_STATE_SCHEDULED
- oncomplete = constants.ONCOMPLETE_SUSPEND
- else:
- susprr = None
- oncomplete = constants.ONCOMPLETE_ENDLEASE
-
- vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
- vmrr.state = constants.RES_STATE_SCHEDULED
-
- susp_str = res_str = ""
- if mustresume:
- res_str = " (resuming)"
- if mustsuspend:
- susp_str = " (suspending)"
- self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
-
- return resmrr, vmrr, susprr, reservation
-
- def fitBestEffortInChangepoints(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
- start = None
- end = None
- canfit = None
- mustsuspend = None
-
- for p in changepoints:
- self.availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
- self.availabilitywindow.printContents()
-
- if self.availabilitywindow.fitAtStart() >= numnodes:
- start=p[0]
- maxend = start + remdur
- end, canfit = self.availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-
- self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-
- if end < maxend:
- mustsuspend=True
- self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-
- if suspendable:
- # It the lease is suspendable...
- if suspendthreshold != None:
- if end-start > suspendthreshold:
- break
- else:
- self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
- start = None
- else:
- pass
- else:
- # Keep looking
- pass
- else:
- mustsuspend=False
- # We've found a satisfactory starting time
- break
-
- return start, end, canfit, mustsuspend
-
def suspend(self, lease, time):
suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
@@ -798,101 +353,8 @@
lease.print_contents()
- def prioritizenodes(self, canfit, diskImageID, start, canpreempt, avoidpreempt):
- # TODO2: Choose appropriate prioritizing function based on a
- # config file, instead of hardcoding it)
- #
- # TODO3: Basing decisions only on CPU allocations. This is ok for now,
- # since the memory allocation is proportional to the CPU allocation.
- # Later on we need to come up with some sort of weighed average.
-
- nodes = canfit.keys()
-
- # TODO: The deployment module should just provide a list of nodes
- # it prefers
- nodeswithimg=[]
- #self.lease_deployment_type = self.rm.config.get("lease-preparation")
- #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- # reusealg = self.rm.config.get("diskimage-reuse")
- # if reusealg==constants.REUSE_IMAGECACHES:
- # nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
- # Compares node x and node y.
- # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
- def comparenodes(x, y):
- hasimgX = x in nodeswithimg
- hasimgY = y in nodeswithimg
-
- # First comparison: A node with no preemptible VMs is preferible
- # to one with preemptible VMs (i.e. we want to avoid preempting)
- canfitnopreemptionX = canfit[x][0]
- canfitpreemptionX = canfit[x][1]
- hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
-
- canfitnopreemptionY = canfit[y][0]
- canfitpreemptionY = canfit[y][1]
- hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
-
- # TODO: Factor out common code
- if avoidpreempt:
- if hasPreemptibleX and not hasPreemptibleY:
- return constants.WORSE
- elif not hasPreemptibleX and hasPreemptibleY:
- return constants.BETTER
- elif not hasPreemptibleX and not hasPreemptibleY:
- if hasimgX and not hasimgY:
- return constants.BETTER
- elif not hasimgX and hasimgY:
- return constants.WORSE
- else:
- if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
- elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
- else: return constants.EQUAL
- elif hasPreemptibleX and hasPreemptibleY:
- # If both have (some) preemptible resources, we prefer those
- # that involve the less preemptions
- preemptX = canfitpreemptionX - canfitnopreemptionX
- preemptY = canfitpreemptionY - canfitnopreemptionY
- if preemptX < preemptY:
- return constants.BETTER
- elif preemptX > preemptY:
- return constants.WORSE
- else:
- if hasimgX and not hasimgY: return constants.BETTER
- elif not hasimgX and hasimgY: return constants.WORSE
- else: return constants.EQUAL
- elif not avoidpreempt:
- # First criteria: Can we reuse image?
- if hasimgX and not hasimgY:
- return constants.BETTER
- elif not hasimgX and hasimgY:
- return constants.WORSE
- else:
- # Now we just want to avoid preemption
- if hasPreemptibleX and not hasPreemptibleY:
- return constants.WORSE
- elif not hasPreemptibleX and hasPreemptibleY:
- return constants.BETTER
- elif hasPreemptibleX and hasPreemptibleY:
- # If both have (some) preemptible resources, we prefer those
- # that involve the less preemptions
- preemptX = canfitpreemptionX - canfitnopreemptionX
- preemptY = canfitpreemptionY - canfitnopreemptionY
- if preemptX < preemptY:
- return constants.BETTER
- elif preemptX > preemptY:
- return constants.WORSE
- else:
- if hasimgX and not hasimgY: return constants.BETTER
- elif not hasimgX and hasimgY: return constants.WORSE
- else: return constants.EQUAL
- else:
- return constants.EQUAL
- # Order nodes
- nodes.sort(comparenodes)
- return nodes
-
def isFull(self, time):
nodes = self.getAvailability(time)
avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])