[haizea-commit] r489 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu
Fri Sep 5 17:38:40 CDT 2008


Author: borja
Date: 2008-09-05 17:38:40 -0500 (Fri, 05 Sep 2008)
New Revision: 489

Modified:
   trunk/src/haizea/resourcemanager/scheduler.py
   trunk/src/haizea/resourcemanager/slottable.py
Log:
Factored scheduling code out of SlotTable, and placed it in Scheduler
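The net effect: the slot-fitting policy now lives in Scheduler as private methods, and SlotTable is only queried for availability data. Below is a minimal, self-contained sketch of the resulting call pattern (hypothetical stub classes; only the method names fitExact, __fit_exact, and getReservationsAt come from this diff, and bodies/signatures are simplified):

    # Hypothetical sketch, not the actual Haizea implementation: stub classes
    # illustrating where the fitting logic lives after this commit.
    class SlotTable(object):
        def getReservationsAt(self, time):
            return []  # stub; the real method returns reservations active at 'time'

    class Scheduler(object):
        def __init__(self, slottable):
            self.slottable = slottable

        def schedule(self, lease_req):
            # Before r489: self.slottable.fitExact(lease_req, ...)
            # After r489:  the fitting policy is a private Scheduler method.
            return self.__fit_exact(lease_req)

        def __fit_exact(self, lease_req):
            # The slot table is consulted only for raw availability data.
            existing = self.slottable.getReservationsAt(lease_req["start"])
            return {"assignment": {}, "preemptions": {}, "existing": existing}

    if __name__ == "__main__":
        print(Scheduler(SlotTable()).schedule({"start": 0}))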

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-09-04 17:29:03 UTC (rev 488)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-09-05 22:38:40 UTC (rev 489)
@@ -38,6 +38,7 @@
 from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
 from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation 
 from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from operator import attrgetter, itemgetter
 
 import logging
 
@@ -289,10 +290,10 @@
         start = lease_req.start.requested
         end = lease_req.start.requested + lease_req.duration.requested
         try:
-            (nodeassignment, res, preemptions) = self.slottable.fitExact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+            (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
             
             if len(preemptions) > 0:
-                leases = self.slottable.findLeasesToPreempt(preemptions, start, end)
+                leases = self.__find_preemptable_leases(preemptions, start, end)
                 self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
                 for lease in leases:
                     self.preempt(lease, time=start)
@@ -363,7 +364,7 @@
         try:
             mustresume = (req.state == constants.LEASE_STATE_SUSPENDED)
             canreserve = self.canReserveBestEffort()
-            (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
+            (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
             
             # Schedule deployment
             if req.state != constants.LEASE_STATE_SUSPENDED:
@@ -425,7 +426,7 @@
         # Determine earliest start time in each node
         earliest = self.deployment.find_earliest_starting_times(req, nexttime)
         try:
-            (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
+            (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
             # Schedule deployment
             self.deployment.schedule(req, vmrr, nexttime)
                         
@@ -436,6 +437,551 @@
         except SlotFittingException, msg:
             raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
         
+    def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
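+        """Map the lease's virtual nodes onto physical nodes for the exact
+        requested interval, optionally resorting to preemption.
+
+        Returns (nodeassignment, res, preemptions): a vnode -> pnode mapping,
+        the resources allocated on each physical node, and the resources that
+        must be preempted on each physical node (empty if none are needed).
+        """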
+        lease_id = leasereq.id
+        start = leasereq.start.requested
+        end = leasereq.start.requested + leasereq.duration.requested
+        diskImageID = leasereq.diskimage_id
+        numnodes = leasereq.numnodes
+        resreq = leasereq.requested_resources
+
+        availabilitywindow = self.slottable.availabilitywindow
+
+        availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
+        availabilitywindow.printContents(withpreemption = False)
+        availabilitywindow.printContents(withpreemption = True)
+
+        mustpreempt = False
+        unfeasiblewithoutpreemption = False
+        
+        fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
+        if fitatstart < numnodes:
+            if not canpreempt:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                unfeasiblewithoutpreemption = True
+        feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
+        fitatend = sum([n for n in canfitnopreempt.values()])
+        if fitatend < numnodes:
+            if not canpreempt:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                unfeasiblewithoutpreemption = True
+
+        canfitpreempt = None
+        if canpreempt:
+            fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
+            if fitatstart < numnodes:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
+            fitatend = sum([n for n in canfitpreempt.values()])
+            if fitatend < numnodes:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                if unfeasiblewithoutpreemption:
+                    mustpreempt = True
+                else:
+                    mustpreempt = False
+
+        # At this point we know whether the lease is feasible, and whether
+        # it will require preemption.
+        if not mustpreempt:
+           self.logger.debug("The VM reservations for this lease are feasible without preemption.")
+        else:
+           self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
+
+        # merge canfitnopreempt and canfitpreempt
+        canfit = {}
+        for node in canfitnopreempt:
+            vnodes = canfitnopreempt[node]
+            canfit[node] = [vnodes, vnodes]
+        for node in canfitpreempt:
+            vnodes = canfitpreempt[node]
+            if canfit.has_key(node):
+                canfit[node][1] = vnodes
+            else:
+                canfit[node] = [0, vnodes]
+
+        orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
+            
+        self.logger.debug("Node ordering: %s" % orderednodes)
+        
+        # vnode -> pnode
+        nodeassignment = {}
+        
+        # pnode -> resourcetuple
+        res = {}
+        
+        # physnode -> resources that must be preempted on that node
+        preemptions = {}
+        
+        vnode = 1
+        if avoidpreempt:
+            # First pass, without preemption
+            for physnode in orderednodes:
+                canfitinnode = canfit[physnode][0]
+                for i in range(1, canfitinnode+1):
+                    nodeassignment[vnode] = physnode
+                    if res.has_key(physnode):
+                        res[physnode].incr(resreq)
+                    else:
+                        res[physnode] = ds.ResourceTuple.copy(resreq)
+                    canfit[physnode][0] -= 1
+                    canfit[physnode][1] -= 1
+                    vnode += 1
+                    if vnode > numnodes:
+                        break
+                if vnode > numnodes:
+                    break
+            
+        # Second pass, with preemption
+        if mustpreempt or not avoidpreempt:
+            for physnode in orderednodes:
+                canfitinnode = canfit[physnode][1]
+                for i in range(1, canfitinnode+1):
+                    nodeassignment[vnode] = physnode
+                    if res.has_key(physnode):
+                        res[physnode].incr(resreq)
+                    else:
+                        res[physnode] = ds.ResourceTuple.copy(resreq)
+                    canfit[physnode][1] -= 1
+                    vnode += 1
+                    # Check if this will actually result in a preemption
+                    if canfit[physnode][0] == 0:
+                        if preemptions.has_key(physnode):
+                            preemptions[physnode].incr(resreq)
+                        else:
+                            preemptions[physnode] = ds.ResourceTuple.copy(resreq)
+                    else:
+                        canfit[physnode][0] -= 1
+                    if vnode > numnodes:
+                        break
+                if vnode > numnodes:
+                    break
+
+        if vnode <= numnodes:
+            raise SchedException, "Availability window indicated that the request was feasible, but it could not be fit"
+
+        return nodeassignment, res, preemptions
+
+
+    def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
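+        """Select which preemptible leases to preempt so that the resources in
+        mustpreempt (a pnode -> ResourceTuple map) become available between
+        startTime and endTime. More recently submitted leases are preferred as
+        preemption victims. Returns the list of leases to preempt.
+        """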
+        def comparepreemptability(rrX, rrY):
+            if rrX.lease.submit_time > rrY.lease.submit_time:
+                return constants.BETTER
+            elif rrX.lease.submit_time < rrY.lease.submit_time:
+                return constants.WORSE
+            else:
+                return constants.EQUAL        
+            
+        def preemptedEnough(amountToPreempt):
+            for node in amountToPreempt:
+                if not amountToPreempt[node].is_zero_or_less():
+                    return False
+            return True
+        
+        # Get allocations at the specified time
+        atstart = set()
+        atmiddle = set()
+        nodes = set(mustpreempt.keys())
+        
+        reservationsAtStart = self.slottable.getReservationsAt(startTime)
+        reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtMiddle = self.slottable.getReservationsStartingBetween(startTime, endTime)
+        reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtStart.sort(comparepreemptability)
+        reservationsAtMiddle.sort(comparepreemptability)
+        
+        amountToPreempt = {}
+        for n in mustpreempt:
+            amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+
+        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+        for r in reservationsAtStart:
+            # The following will really only come into play when we have
+            # multiple VMs per node
+            mustpreemptres = False
+            for n in r.resources_in_pnode.keys():
+                # Don't need to preempt if we've already preempted all
+                # the needed resources in node n
+                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                    amountToPreempt[n].decr(r.resources_in_pnode[n])
+                    mustpreemptres = True
+            if mustpreemptres:
+                atstart.add(r)
+            if preemptedEnough(amountToPreempt):
+                break
+        
+        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+        if len(reservationsAtMiddle)>0:
+            changepoints = set()
+            for r in reservationsAtMiddle:
+                changepoints.add(r.start)
+            changepoints = list(changepoints)
+            changepoints.sort()        
+            
+            for cp in changepoints:
+                amountToPreempt = {}
+                for n in mustpreempt:
+                    amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+                reservations = [r for r in reservationsAtMiddle 
+                                if r.start <= cp and cp < r.end]
+                for r in reservations:
+                    mustpreemptres = False
+                    for n in r.resources_in_pnode.keys():
+                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                            amountToPreempt[n].decr(r.resources_in_pnode[n])
+                            mustpreemptres = True
+                    if mustpreemptres:
+                        atmiddle.add(r)
+                    if preemptedEnough(amountToPreempt):
+                        break
+            
+        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+        
+        leases = [r.lease for r in atstart|atmiddle]
+        
+        return leases
+
+
+    def __fit_besteffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
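+        """Schedule a best-effort lease, taking into account resumption from a
+        suspended state, migration, and suspension if the full remaining
+        duration does not fit.
+
+        Returns (resmrr, vmrr, susprr, reservation): the resumption, VM, and
+        suspension resource reservations (resmrr and susprr may be None), plus
+        a flag indicating whether the lease was scheduled as a future reservation.
+        """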
+        lease_id = lease.id
+        remdur = lease.duration.get_remaining_duration()
+        numnodes = lease.numnodes
+        resreq = lease.requested_resources
+        preemptible = lease.preemptible
+        suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
+        migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
+
+        #
+        # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
+        #
+        
+        curnodes=None
+        # If we can't migrate, we have to stay in the
+        # nodes where the lease is currently deployed
+        if mustresume and not canmigrate:
+            vmrr, susprr = lease.get_last_vmrr()
+            curnodes = set(vmrr.nodes.values())
+            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
+        
+        if mustresume and canmigrate:
+            # If we have to resume this lease, make sure that
+            # we have enough time to transfer the images.
+            migratetime = lease.estimate_migration_time(migration_bandwidth)
+            earliesttransfer = self.rm.clock.get_time() + migratetime
+
+            for n in earliest:
+                earliest[n][0] = max(earliest[n][0], earliesttransfer)
+            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
+                    
+        if mustresume:
+            resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
+            # Must allocate time for resumption too
+            remdur += resumetime
+        else:
+            suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
+
+
+        #
+        # STEP 2: FIND THE CHANGEPOINTS
+        #
+
+        # Find the changepoints, and the nodes we can use at each changepoint
+        # Nodes may not be available at a changepoint because images
+        # cannot be transferred at that time.
+        if not mustresume:
+            cps = [(node, e[0]) for node, e in earliest.items()]
+            cps.sort(key=itemgetter(1))
+            curcp = None
+            changepoints = []
+            nodes = []
+            for node, time in cps:
+                nodes.append(node)
+                if time != curcp:
+                    changepoints.append([time, nodes[:]])
+                    curcp = time
+                else:
+                    changepoints[-1][1] = nodes[:]
+        else:
+            changepoints = list(set([x[0] for x in earliest.values()]))
+            changepoints.sort()
+            changepoints = [(x, curnodes) for x in changepoints]
+
+        # If we can make reservations for best-effort leases,
+        # we also consider future changepoints
+        # (otherwise, we only allow the VMs to start "now", accounting
+        #  for the fact that vm images will have to be deployed)
+        if canreserve:
+            futurecp = self.slottable.findChangePointsAfter(changepoints[-1][0])
+            futurecp = [(p,None) for p in futurecp]
+        else:
+            futurecp = []
+
+
+
+        #
+        # STEP 3: SLOT FITTING
+        #
+
+        # First, assuming we can't make reservations in the future
+        start, end, canfit, mustsuspend = self.__find_fit_at_points(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
+
+        if not canreserve:
+            if start == None:
+                # We did not find a suitable starting time. This can happen
+                # if we're unable to make future reservations
+                raise SlotFittingException, "Could not find enough resources for this request"
+            elif mustsuspend and not suspendable:
+                raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
+
+        if start != None and mustsuspend and not suspendable:
+            start = None # No satisfactory start time
+            
+        # If we haven't been able to fit the lease, check if we can
+        # reserve it in the future
+        if start == None and canreserve:
+            start, end, canfit, mustsuspend = self.__find_fit_at_points(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
+
+        if mustsuspend and not suspendable:
+            raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
+
+        if start in [p[0] for p in futurecp]:
+            reservation = True
+        else:
+            reservation = False
+
+
+        #
+        # STEP 4: FINAL SLOT FITTING
+        #
+        # At this point, we know the lease fits, but we have to map it to
+        # specific physical nodes.
+        
+        # Sort physical nodes
+        physnodes = canfit.keys()
+        if mustresume:
+            # If we're resuming, we prefer resuming in the nodes we're already
+            # deployed in, to minimize the number of transfers.
+            vmrr, susprr = lease.get_last_vmrr()
+            nodes = set(vmrr.nodes.values())
+            availnodes = set(physnodes)
+            deplnodes = availnodes.intersection(nodes)
+            notdeplnodes = availnodes.difference(nodes)
+            physnodes = list(deplnodes) + list(notdeplnodes)
+        else:
+            physnodes.sort() # Arbitrary ordering; ideally we would prioritize nodes as in the exact fit
+
+        # Adjust times in case the lease has to be suspended/resumed
+        if mustsuspend:
+            suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
+            end -= suspendtime
+                
+        if mustresume:
+            start += resumetime
+        
+        # Map to physical nodes
+        mappings = {}
+        res = {}
+        vmnode = 1
+        while vmnode <= numnodes:
+            for n in physnodes:
+                if canfit[n]>0:
+                    canfit[n] -= 1
+                    mappings[vmnode] = n
+                    if res.has_key(n):
+                        res[n].incr(resreq)
+                    else:
+                        res[n] = ds.ResourceTuple.copy(resreq)
+                    vmnode += 1
+                    break
+
+
+
+        #
+        # STEP 5: CREATE RESOURCE RESERVATIONS
+        #
+        
+        if mustresume:
+            resmres = {}
+            for n in mappings.values():
+                r = ds.ResourceTuple.create_empty()
+                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
+                resmres[n] = r
+            resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
+            resmrr.state = constants.RES_STATE_SCHEDULED
+        else:
+            resmrr = None
+        if mustsuspend:
+            suspres = {}
+            for n in mappings.values():
+                r = ds.ResourceTuple.create_empty()
+                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
+                suspres[n] = r
+            susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
+            susprr.state = constants.RES_STATE_SCHEDULED
+            oncomplete = constants.ONCOMPLETE_SUSPEND
+        else:
+            susprr = None
+            oncomplete = constants.ONCOMPLETE_ENDLEASE
+
+        vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
+        vmrr.state = constants.RES_STATE_SCHEDULED
+        
+        susp_str = res_str = ""
+        if mustresume:
+            res_str = " (resuming)"
+        if mustsuspend:
+            susp_str = " (suspending)"
+        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+
+        return resmrr, vmrr, susprr, reservation
+
+    def __find_fit_at_points(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
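+        """Try each changepoint as a candidate starting time and return
+        (start, end, canfit, mustsuspend) for the first one where numnodes VMs
+        fit and either the whole remaining duration fits or the schedulable
+        portion exceeds the suspend threshold.
+        """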
+        start = None
+        end = None
+        canfit = None
+        mustsuspend = None
+        availabilitywindow = self.slottable.availabilitywindow
+
+
+        for p in changepoints:
+            availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
+            availabilitywindow.printContents()
+            
+            if availabilitywindow.fitAtStart() >= numnodes:
+                start=p[0]
+                maxend = start + remdur
+                end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
+        
+                self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
+                
+                if end < maxend:
+                    mustsuspend=True
+                    self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
+                    
+                    if suspendable:
+                        # If the lease is suspendable...
+                        if suspendthreshold != None:
+                            if end-start > suspendthreshold:
+                                break
+                            else:
+                                self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
+                                start = None
+                        else:
+                            pass
+                    else:
+                        # Keep looking
+                        pass
+                else:
+                    mustsuspend=False
+                    # We've found a satisfactory starting time
+                    break        
+                
+        return start, end, canfit, mustsuspend
+
+    def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
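+        """Order the candidate physical nodes by scheduling preference, trying
+        to avoid (or minimize) preemptions and to reuse cached disk images
+        (image reuse is currently disabled; see the TODOs below).
+        """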
+        # TODO2: Choose the appropriate prioritizing function based on a
+        # config file, instead of hardcoding it.
+        #
+        # TODO3: Decisions are currently based only on CPU allocations. This is
+        # OK for now, since the memory allocation is proportional to the CPU
+        # allocation. Later on we need some sort of weighted average.
+        
+        nodes = canfit.keys()
+        
+        # TODO: The deployment module should just provide a list of nodes
+        # it prefers
+        nodeswithimg=[]
+        #self.lease_deployment_type = self.rm.config.get("lease-preparation")
+        #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+        #    reusealg = self.rm.config.get("diskimage-reuse")
+        #    if reusealg==constants.REUSE_IMAGECACHES:
+        #        nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
+
+        # Compares node x and node y. 
+        # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
+        def comparenodes(x, y):
+            hasimgX = x in nodeswithimg
+            hasimgY = y in nodeswithimg
+
+            # First comparison: a node with no preemptible VMs is preferable
+            # to one with preemptible VMs (i.e. we want to avoid preempting)
+            canfitnopreemptionX = canfit[x][0]
+            canfitpreemptionX = canfit[x][1]
+            hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
+            
+            canfitnopreemptionY = canfit[y][0]
+            canfitpreemptionY = canfit[y][1]
+            hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
+
+            # TODO: Factor out common code
+            if avoidpreempt:
+                if hasPreemptibleX and not hasPreemptibleY:
+                    return constants.WORSE
+                elif not hasPreemptibleX and hasPreemptibleY:
+                    return constants.BETTER
+                elif not hasPreemptibleX and not hasPreemptibleY:
+                    if hasimgX and not hasimgY: 
+                        return constants.BETTER
+                    elif not hasimgX and hasimgY: 
+                        return constants.WORSE
+                    else:
+                        if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
+                        elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
+                        else: return constants.EQUAL
+                elif hasPreemptibleX and hasPreemptibleY:
+                    # If both have (some) preemptible resources, we prefer those
+                    # that involve the fewest preemptions
+                    preemptX = canfitpreemptionX - canfitnopreemptionX
+                    preemptY = canfitpreemptionY - canfitnopreemptionY
+                    if preemptX < preemptY:
+                        return constants.BETTER
+                    elif preemptX > preemptY:
+                        return constants.WORSE
+                    else:
+                        if hasimgX and not hasimgY: return constants.BETTER
+                        elif not hasimgX and hasimgY: return constants.WORSE
+                        else: return constants.EQUAL
+            elif not avoidpreempt:
+                # First criterion: can we reuse the image?
+                if hasimgX and not hasimgY: 
+                    return constants.BETTER
+                elif not hasimgX and hasimgY: 
+                    return constants.WORSE
+                else:
+                    # Now we just want to avoid preemption
+                    if hasPreemptibleX and not hasPreemptibleY:
+                        return constants.WORSE
+                    elif not hasPreemptibleX and hasPreemptibleY:
+                        return constants.BETTER
+                    elif hasPreemptibleX and hasPreemptibleY:
+                        # If both have (some) preemptible resources, we prefer those
+                        # that involve the fewest preemptions
+                        preemptX = canfitpreemptionX - canfitnopreemptionX
+                        preemptY = canfitpreemptionY - canfitnopreemptionY
+                        if preemptX < preemptY:
+                            return constants.BETTER
+                        elif preemptX > preemptY:
+                            return constants.WORSE
+                        else:
+                            if hasimgX and not hasimgY: return constants.BETTER
+                            elif not hasimgX and hasimgY: return constants.WORSE
+                            else: return constants.EQUAL
+                    else:
+                        return constants.EQUAL
+        
+        # Order nodes
+        nodes.sort(comparenodes)
+        return nodes        
+        
     def preempt(self, req, time):
         self.logger.info("Preempting lease #%i..." % (req.id))
         self.logger.vdebug("Lease before preemption:")

Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py	2008-09-04 17:29:03 UTC (rev 488)
+++ trunk/src/haizea/resourcemanager/slottable.py	2008-09-05 22:38:40 UTC (rev 489)
@@ -268,452 +268,7 @@
             self.changepointcache.pop()
         return p
     
-    def fitExact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
-        lease_id = leasereq.id
-        start = leasereq.start.requested
-        end = leasereq.start.requested + leasereq.duration.requested
-        diskImageID = leasereq.diskimage_id
-        numnodes = leasereq.numnodes
-        resreq = leasereq.requested_resources
 
-        self.availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
-        self.availabilitywindow.printContents(withpreemption = False)
-        self.availabilitywindow.printContents(withpreemption = True)
-
-        mustpreempt = False
-        unfeasiblewithoutpreemption = False
-        
-        fitatstart = self.availabilitywindow.fitAtStart(canpreempt = False)
-        if fitatstart < numnodes:
-            if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                unfeasiblewithoutpreemption = True
-        feasibleend, canfitnopreempt = self.availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
-        fitatend = sum([n for n in canfitnopreempt.values()])
-        if fitatend < numnodes:
-            if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                unfeasiblewithoutpreemption = True
-
-        canfitpreempt = None
-        if canpreempt:
-            fitatstart = self.availabilitywindow.fitAtStart(canpreempt = True)
-            if fitatstart < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            feasibleendpreempt, canfitpreempt = self.availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
-            fitatend = sum([n for n in canfitpreempt.values()])
-            if fitatend < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                if unfeasiblewithoutpreemption:
-                    mustpreempt = True
-                else:
-                    mustpreempt = False
-
-        # At this point we know if the lease is feasible, and if
-        # will require preemption.
-        if not mustpreempt:
-           self.logger.debug("The VM reservations for this lease are feasible without preemption.")
-        else:
-           self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
-
-        # merge canfitnopreempt and canfitpreempt
-        canfit = {}
-        for node in canfitnopreempt:
-            vnodes = canfitnopreempt[node]
-            canfit[node] = [vnodes, vnodes]
-        for node in canfitpreempt:
-            vnodes = canfitpreempt[node]
-            if canfit.has_key(node):
-                canfit[node][1] = vnodes
-            else:
-                canfit[node] = [0, vnodes]
-
-        orderednodes = self.prioritizenodes(canfit, diskImageID, start, canpreempt, avoidpreempt)
-            
-        self.logger.debug("Node ordering: %s" % orderednodes)
-        
-        # vnode -> pnode
-        nodeassignment = {}
-        
-        # pnode -> resourcetuple
-        res = {}
-        
-        # physnode -> how many vnodes
-        preemptions = {}
-        
-        vnode = 1
-        if avoidpreempt:
-            # First pass, without preemption
-            for physnode in orderednodes:
-                canfitinnode = canfit[physnode][0]
-                for i in range(1, canfitinnode+1):
-                    nodeassignment[vnode] = physnode
-                    if res.has_key(physnode):
-                        res[physnode].incr(resreq)
-                    else:
-                        res[physnode] = ds.ResourceTuple.copy(resreq)
-                    canfit[physnode][0] -= 1
-                    canfit[physnode][1] -= 1
-                    vnode += 1
-                    if vnode > numnodes:
-                        break
-                if vnode > numnodes:
-                    break
-            
-        # Second pass, with preemption
-        if mustpreempt or not avoidpreempt:
-            for physnode in orderednodes:
-                canfitinnode = canfit[physnode][1]
-                for i in range(1, canfitinnode+1):
-                    nodeassignment[vnode] = physnode
-                    if res.has_key(physnode):
-                        res[physnode].incr(resreq)
-                    else:
-                        res[physnode] = ds.ResourceTuple.copy(resreq)
-                    canfit[physnode][1] -= 1
-                    vnode += 1
-                    # Check if this will actually result in a preemption
-                    if canfit[physnode][0] == 0:
-                        if preemptions.has_key(physnode):
-                            preemptions[physnode].incr(resreq)
-                        else:
-                            preemptions[physnode] = ds.ResourceTuple.copy(resreq)
-                    else:
-                        canfit[physnode][0] -= 1
-                    if vnode > numnodes:
-                        break
-                if vnode > numnodes:
-                    break
-
-        if vnode <= numnodes:
-            raise CriticalSlotFittingException, "Availability window indicated that request but feasible, but could not fit it"
-
-        return nodeassignment, res, preemptions
-
-
-    def findLeasesToPreempt(self, mustpreempt, startTime, endTime):
-        def comparepreemptability(rrX, rrY):
-            if rrX.lease.submit_time > rrY.lease.submit_time:
-                return constants.BETTER
-            elif rrX.lease.submit_time < rrY.lease.submit_time:
-                return constants.WORSE
-            else:
-                return constants.EQUAL        
-            
-        def preemptedEnough(amountToPreempt):
-            for node in amountToPreempt:
-                if not amountToPreempt[node].is_zero_or_less():
-                    return False
-            return True
-        
-        # Get allocations at the specified time
-        atstart = set()
-        atmiddle = set()
-        nodes = set(mustpreempt.keys())
-        
-        reservationsAtStart = self.getReservationsAt(startTime)
-        reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtMiddle = self.getReservationsStartingBetween(startTime, endTime)
-        reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtStart.sort(comparepreemptability)
-        reservationsAtMiddle.sort(comparepreemptability)
-        
-        amountToPreempt = {}
-        for n in mustpreempt:
-            amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-
-        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
-        for r in reservationsAtStart:
-            # The following will really only come into play when we have
-            # multiple VMs per node
-            mustpreemptres = False
-            for n in r.resources_in_pnode.keys():
-                # Don't need to preempt if we've already preempted all
-                # the needed resources in node n
-                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                    amountToPreempt[n].decr(r.resources_in_pnode[n])
-                    mustpreemptres = True
-            if mustpreemptres:
-                atstart.add(r)
-            if preemptedEnough(amountToPreempt):
-                break
-        
-        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
-        if len(reservationsAtMiddle)>0:
-            changepoints = set()
-            for r in reservationsAtMiddle:
-                changepoints.add(r.start)
-            changepoints = list(changepoints)
-            changepoints.sort()        
-            
-            for cp in changepoints:
-                amountToPreempt = {}
-                for n in mustpreempt:
-                    amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-                reservations = [r for r in reservationsAtMiddle 
-                                if r.start <= cp and cp < r.end]
-                for r in reservations:
-                    mustpreemptres = False
-                    for n in r.resources_in_pnode.keys():
-                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                            amountToPreempt[n].decr(r.resources_in_pnode[n])
-                            mustpreemptres = True
-                    if mustpreemptres:
-                        atmiddle.add(r)
-                    if preemptedEnough(amountToPreempt):
-                        break
-            
-        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
-        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-        
-        leases = [r.lease for r in atstart|atmiddle]
-        
-        return leases
-
-
-    def fitBestEffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
-        lease_id = lease.id
-        remdur = lease.duration.get_remaining_duration()
-        numnodes = lease.numnodes
-        resreq = lease.requested_resources
-        preemptible = lease.preemptible
-        suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
-        migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
-
-        #
-        # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
-        #
-        
-        curnodes=None
-        # If we can't migrate, we have to stay in the
-        # nodes where the lease is currently deployed
-        if mustresume and not canmigrate:
-            vmrr, susprr = lease.get_last_vmrr()
-            curnodes = set(vmrr.nodes.values())
-            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
-        
-        if mustresume and canmigrate:
-            # If we have to resume this lease, make sure that
-            # we have enough time to transfer the images.
-            migratetime = lease.estimate_migration_time(migration_bandwidth)
-            earliesttransfer = self.rm.clock.get_time() + migratetime
-
-            for n in earliest:
-                earliest[n][0] = max(earliest[n][0], earliesttransfer)
-            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
-                    
-        if mustresume:
-            resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
-            # Must allocate time for resumption too
-            remdur += resumetime
-        else:
-            suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
-
-
-        #
-        # STEP 2: FIND THE CHANGEPOINTS
-        #
-
-        # Find the changepoints, and the nodes we can use at each changepoint
-        # Nodes may not be available at a changepoint because images
-        # cannot be transferred at that time.
-        if not mustresume:
-            cps = [(node, e[0]) for node, e in earliest.items()]
-            cps.sort(key=itemgetter(1))
-            curcp = None
-            changepoints = []
-            nodes = []
-            for node, time in cps:
-                nodes.append(node)
-                if time != curcp:
-                    changepoints.append([time, nodes[:]])
-                    curcp = time
-                else:
-                    changepoints[-1][1] = nodes[:]
-        else:
-            changepoints = list(set([x[0] for x in earliest.values()]))
-            changepoints.sort()
-            changepoints = [(x, curnodes) for x in changepoints]
-
-        # If we can make reservations for best-effort leases,
-        # we also consider future changepoints
-        # (otherwise, we only allow the VMs to start "now", accounting
-        #  for the fact that vm images will have to be deployed)
-        if canreserve:
-            futurecp = self.findChangePointsAfter(changepoints[-1][0])
-            futurecp = [(p,None) for p in futurecp]
-        else:
-            futurecp = []
-
-
-
-        #
-        # STEP 3: SLOT FITTING
-        #
-
-        # First, assuming we can't make reservations in the future
-        start, end, canfit, mustsuspend = self.fitBestEffortInChangepoints(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
-        if not canreserve:
-            if start == None:
-                # We did not find a suitable starting time. This can happen
-                # if we're unable to make future reservations
-                raise SlotFittingException, "Could not find enough resources for this request"
-            elif mustsuspend and not suspendable:
-                raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
-        if start != None and mustsuspend and not suspendable:
-            start = None # No satisfactory start time
-            
-        # If we haven't been able to fit the lease, check if we can
-        # reserve it in the future
-        if start == None and canreserve:
-            start, end, canfit, mustsuspend = self.fitBestEffortInChangepoints(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
-        if mustsuspend and not suspendable:
-            raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
-        if start in [p[0] for p in futurecp]:
-            reservation = True
-        else:
-            reservation = False
-
-
-        #
-        # STEP 4: FINAL SLOT FITTING
-        #
-        # At this point, we know the lease fits, but we have to map it to
-        # specific physical nodes.
-        
-        # Sort physical nodes
-        physnodes = canfit.keys()
-        if mustresume:
-            # If we're resuming, we prefer resuming in the nodes we're already
-            # deployed in, to minimize the number of transfers.
-            vmrr, susprr = lease.get_last_vmrr()
-            nodes = set(vmrr.nodes.values())
-            availnodes = set(physnodes)
-            deplnodes = availnodes.intersection(nodes)
-            notdeplnodes = availnodes.difference(nodes)
-            physnodes = list(deplnodes) + list(notdeplnodes)
-        else:
-            physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
-        # Adjust times in case the lease has to be suspended/resumed
-        if mustsuspend:
-            suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
-            end -= suspendtime
-                
-        if mustresume:
-            start += resumetime
-        
-        # Map to physical nodes
-        mappings = {}
-        res = {}
-        vmnode = 1
-        while vmnode <= numnodes:
-            for n in physnodes:
-                if canfit[n]>0:
-                    canfit[n] -= 1
-                    mappings[vmnode] = n
-                    if res.has_key(n):
-                        res[n].incr(resreq)
-                    else:
-                        res[n] = ds.ResourceTuple.copy(resreq)
-                    vmnode += 1
-                    break
-
-
-
-        #
-        # STEP 5: CREATE RESOURCE RESERVATIONS
-        #
-        
-        if mustresume:
-            resmres = {}
-            for n in mappings.values():
-                r = ds.ResourceTuple.create_empty()
-                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
-                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
-                resmres[n] = r
-            resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
-            resmrr.state = constants.RES_STATE_SCHEDULED
-        else:
-            resmrr = None
-        if mustsuspend:
-            suspres = {}
-            for n in mappings.values():
-                r = ds.ResourceTuple.create_empty()
-                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
-                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
-                suspres[n] = r
-            susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
-            susprr.state = constants.RES_STATE_SCHEDULED
-            oncomplete = constants.ONCOMPLETE_SUSPEND
-        else:
-            susprr = None
-            oncomplete = constants.ONCOMPLETE_ENDLEASE
-
-        vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
-        vmrr.state = constants.RES_STATE_SCHEDULED
-        
-        susp_str = res_str = ""
-        if mustresume:
-            res_str = " (resuming)"
-        if mustsuspend:
-            susp_str = " (suspending)"
-        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
-
-        return resmrr, vmrr, susprr, reservation
-
-    def fitBestEffortInChangepoints(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
-        start = None
-        end = None
-        canfit = None
-        mustsuspend = None
-
-        for p in changepoints:
-            self.availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
-            self.availabilitywindow.printContents()
-            
-            if self.availabilitywindow.fitAtStart() >= numnodes:
-                start=p[0]
-                maxend = start + remdur
-                end, canfit = self.availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-        
-                self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-                
-                if end < maxend:
-                    mustsuspend=True
-                    self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-                    
-                    if suspendable:
-                        # It the lease is suspendable...
-                        if suspendthreshold != None:
-                            if end-start > suspendthreshold:
-                                break
-                            else:
-                                self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
-                                start = None
-                        else:
-                            pass
-                    else:
-                        # Keep looking
-                        pass
-                else:
-                    mustsuspend=False
-                    # We've found a satisfactory starting time
-                    break        
-                
-        return start, end, canfit, mustsuspend
-
     def suspend(self, lease, time):
         suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
         
@@ -798,101 +353,8 @@
             lease.print_contents()
 
 
-    def prioritizenodes(self, canfit, diskImageID, start, canpreempt, avoidpreempt):
-        # TODO2: Choose appropriate prioritizing function based on a
-        # config file, instead of hardcoding it)
-        #
-        # TODO3: Basing decisions only on CPU allocations. This is ok for now,
-        # since the memory allocation is proportional to the CPU allocation.
-        # Later on we need to come up with some sort of weighed average.
-        
-        nodes = canfit.keys()
-        
-        # TODO: The deployment module should just provide a list of nodes
-        # it prefers
-        nodeswithimg=[]
-        #self.lease_deployment_type = self.rm.config.get("lease-preparation")
-        #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
-        #    reusealg = self.rm.config.get("diskimage-reuse")
-        #    if reusealg==constants.REUSE_IMAGECACHES:
-        #        nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
 
-        # Compares node x and node y. 
-        # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
-        def comparenodes(x, y):
-            hasimgX = x in nodeswithimg
-            hasimgY = y in nodeswithimg
-
-            # First comparison: A node with no preemptible VMs is preferible
-            # to one with preemptible VMs (i.e. we want to avoid preempting)
-            canfitnopreemptionX = canfit[x][0]
-            canfitpreemptionX = canfit[x][1]
-            hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
-            
-            canfitnopreemptionY = canfit[y][0]
-            canfitpreemptionY = canfit[y][1]
-            hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
-
-            # TODO: Factor out common code
-            if avoidpreempt:
-                if hasPreemptibleX and not hasPreemptibleY:
-                    return constants.WORSE
-                elif not hasPreemptibleX and hasPreemptibleY:
-                    return constants.BETTER
-                elif not hasPreemptibleX and not hasPreemptibleY:
-                    if hasimgX and not hasimgY: 
-                        return constants.BETTER
-                    elif not hasimgX and hasimgY: 
-                        return constants.WORSE
-                    else:
-                        if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
-                        elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
-                        else: return constants.EQUAL
-                elif hasPreemptibleX and hasPreemptibleY:
-                    # If both have (some) preemptible resources, we prefer those
-                    # that involve the less preemptions
-                    preemptX = canfitpreemptionX - canfitnopreemptionX
-                    preemptY = canfitpreemptionY - canfitnopreemptionY
-                    if preemptX < preemptY:
-                        return constants.BETTER
-                    elif preemptX > preemptY:
-                        return constants.WORSE
-                    else:
-                        if hasimgX and not hasimgY: return constants.BETTER
-                        elif not hasimgX and hasimgY: return constants.WORSE
-                        else: return constants.EQUAL
-            elif not avoidpreempt:
-                # First criteria: Can we reuse image?
-                if hasimgX and not hasimgY: 
-                    return constants.BETTER
-                elif not hasimgX and hasimgY: 
-                    return constants.WORSE
-                else:
-                    # Now we just want to avoid preemption
-                    if hasPreemptibleX and not hasPreemptibleY:
-                        return constants.WORSE
-                    elif not hasPreemptibleX and hasPreemptibleY:
-                        return constants.BETTER
-                    elif hasPreemptibleX and hasPreemptibleY:
-                        # If both have (some) preemptible resources, we prefer those
-                        # that involve the less preemptions
-                        preemptX = canfitpreemptionX - canfitnopreemptionX
-                        preemptY = canfitpreemptionY - canfitnopreemptionY
-                        if preemptX < preemptY:
-                            return constants.BETTER
-                        elif preemptX > preemptY:
-                            return constants.WORSE
-                        else:
-                            if hasimgX and not hasimgY: return constants.BETTER
-                            elif not hasimgX and hasimgY: return constants.WORSE
-                            else: return constants.EQUAL
-                    else:
-                        return constants.EQUAL
         
-        # Order nodes
-        nodes.sort(comparenodes)
-        return nodes
-        
     def isFull(self, time):
         nodes = self.getAvailability(time)
         avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])


