[haizea-commit] r491 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu
Thu Sep 11 11:47:36 CDT 2008


Author: borja
Date: 2008-09-11 11:47:36 -0500 (Thu, 11 Sep 2008)
New Revision: 491

Modified:
   trunk/src/haizea/resourcemanager/scheduler.py
   trunk/src/haizea/resourcemanager/slottable.py
Log:
Sanitized the scheduling code:
 - Moved more scheduling code out of the slottable and into the Scheduler class.
 - Factored out resumption/suspension scheduling code, to facilitate supporting more complex suspend/resume scenarios.
 - Adapted code to the new lease data structure topology.
 - All leases (except those that have been completed) are now contained in a single "leases" table (see the sketch below).
 
 Note that, at this point, lease preemption is broken but should be fixed soon.
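
A rough sketch of the new request flow, condensed from the code below
(error handling, accounting and the AR path omitted):

    # Incoming leases are now just marked as pending and stored in the
    # single "leases" table...
    def request_lease(self, lease):
        lease.state = Lease.STATE_PENDING
        self.leases.add(lease)

    # ...and the scheduling function pulls them back out by state,
    # instead of draining a separate pending_leases list.
    def schedule(self, nexttime):
        pending = self.leases.get_leases_by_state(Lease.STATE_PENDING)
        be_leases = [l for l in pending if isinstance(l, BestEffortLease)]
        im_leases = [l for l in pending if isinstance(l, ImmediateLease)]
        for lease in be_leases:
            self.enqueue(lease)            # queued, scheduled later
        for lease in im_leases:
            self.__process_im_request(lease, nexttime)

Likewise, suspension scheduling is now a helper that trims the VM
reservation and hangs the suspension RR off of it, so more complex
suspend/resume scenarios only need changes in one place:

    def __schedule_suspension(self, vmrr, suspend_by):
        suspendtime = self.__estimate_suspend_time(vmrr.lease)
        vmrr.update_end(suspend_by - suspendtime)
        # ... build a SuspensionResourceReservation covering
        # [suspend_by - suspendtime, suspend_by] on the VMRR's nodes ...
        vmrr.susp_rrs.append(susprr)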

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-09-11 16:41:18 UTC (rev 490)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-09-11 16:47:36 UTC (rev 491)
@@ -32,13 +32,15 @@
 
 import haizea.resourcemanager.datastruct as ds
 import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, estimate_transfer_time
 from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
 from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeployment
 from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
 from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
-from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation 
+from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation 
 from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
 from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
 
 import logging
 
@@ -85,9 +87,8 @@
             self.resourcepool = ResourcePool(self)
         self.slottable = SlotTable(self)
         self.queue = ds.Queue(self)
-        self.scheduledleases = ds.LeaseTable(self)
+        self.leases = ds.LeaseTable(self)
         self.completedleases = ds.LeaseTable(self)
-        self.pending_leases = []
             
         for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
             self.slottable.add_node(n)
@@ -126,11 +127,16 @@
 
         self.numbesteffortres = 0
     
-    def schedule(self, nexttime):        
-        ar_leases = [req for req in self.pending_leases if isinstance(req, ARLease)]
-        im_leases = [req for req in self.pending_leases if isinstance(req, ImmediateLease)]
-        self.pending_leases = []
+    def schedule(self, nexttime):      
+        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
+        ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
+        im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
+        be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
         
+        # Queue best-effort requests
+        for lease in be_leases:
+            self.enqueue(lease)
+        
         # Process immediate requests
         for lease_req in im_leases:
             self.__process_im_request(lease_req, nexttime)
@@ -144,18 +150,14 @@
         
     
     def process_reservations(self, nowtime):
-        starting = [l for l in self.scheduledleases.entries.values() if l.has_starting_reservations(nowtime)]
-        ending = [l for l in self.scheduledleases.entries.values() if l.has_ending_reservations(nowtime)]
-        for l in ending:
-            rrs = l.get_ending_reservations(nowtime)
-            for rr in rrs:
-                self._handle_end_rr(l, rr)
-                self.handlers[type(rr)].on_end(self, l, rr)
+        starting = self.slottable.get_reservations_starting_at(nowtime)
+        ending = self.slottable.get_reservations_ending_at(nowtime)
+        for rr in ending:
+            self._handle_end_rr(rr.lease, rr)
+            self.handlers[type(rr)].on_end(self, rr.lease, rr)
         
-        for l in starting:
-            rrs = l.get_starting_reservations(nowtime)
-            for rr in rrs:
-                self.handlers[type(rr)].on_start(self, l, rr)
+        for rr in starting:
+            self.handlers[type(rr)].on_start(self, rr.lease, rr)
 
         util = self.slottable.getUtilization(nowtime)
         self.rm.accounting.append_stat(constants.COUNTER_CPUUTILIZATION, util)
@@ -170,18 +172,19 @@
     def enqueue(self, lease_req):
         """Queues a best-effort lease request"""
         self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+        lease_req.state = Lease.STATE_QUEUED
         self.queue.enqueue(lease_req)
         self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
 
-    def add_pending_lease(self, lease_req):
+    def request_lease(self, lease):
         """
-        Adds a pending lease request, to be scheduled as soon as
-        the scheduling function is called. Unlike best-effort leases,
-        if one these leases can't be scheduled immediately, it is
-        rejected (instead of being placed on a queue, in case resources
-        become available later on).
+        Request a lease. At this point, it is simply marked as "Pending";
+        the next time the scheduling function is called, the fate of the
+        lease will be determined (right now, AR and immediate leases get
+        scheduled right away, and best-effort leases get placed on a queue).
         """
-        self.pending_leases.append(lease_req)
+        lease.state = Lease.STATE_PENDING
+        self.leases.add(lease)
 
     def is_queue_empty(self):
         """Return True is the queue is empty, False otherwise"""
@@ -190,7 +193,7 @@
     
     def exists_scheduled_leases(self):
         """Return True if there are any leases scheduled in the future"""
-        return not self.scheduledleases.is_empty()    
+        return not self.slottable.is_empty()    
 
     def cancel_lease(self, lease_id):
         """Cancels a lease.
@@ -201,26 +204,26 @@
         time = self.rm.clock.get_time()
         
         self.logger.info("Cancelling lease %i..." % lease_id)
-        if self.scheduledleases.has_lease(lease_id):
+        if self.leases.has_lease(lease_id):
             # The lease is either running, or scheduled to run
-            lease = self.scheduledleases.get_lease(lease_id)
+            lease = self.leases.get_lease(lease_id)
             
-            if lease.state == constants.LEASE_STATE_ACTIVE:
+            if lease.state == Lease.STATE_ACTIVE:
                 self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
                 rr = lease.get_active_reservations(time)[0]
                 if isinstance(rr, VMResourceReservation):
                     self._handle_unscheduled_end_vm(lease, rr, enact=True)
                 # TODO: Handle cancelations in middle of suspensions and
                 # resumptions                
-            elif lease.state in [constants.LEASE_STATE_SCHEDULED, constants.LEASE_STATE_DEPLOYED]:
+            elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
                 self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
                 rrs = lease.get_scheduled_reservations()
                 for r in rrs:
                     lease.remove_rr(r)
                     self.slottable.removeReservation(r)
-                lease.state = constants.LEASE_STATE_DONE
+                lease.state = Lease.STATE_CANCELLED
                 self.completedleases.add(lease)
-                self.scheduledleases.remove(lease)
+                self.leases.remove(lease)
         elif self.queue.has_lease(lease_id):
             # The lease is in the queue, waiting to be scheduled.
             # Cancelling is as simple as removing it from the queue
@@ -248,7 +251,7 @@
     def notify_event(self, lease_id, event):
         time = self.rm.clock.get_time()
         if event == constants.EVENT_END_VM:
-            lease = self.scheduledleases.get_lease(lease_id)
+            lease = self.leases.get_lease(lease_id)
             rr = lease.get_active_reservations(time)[0]
             self._handle_unscheduled_end_vm(lease, rr, enact=False)
 
@@ -262,7 +265,7 @@
         accepted = False
         try:
             self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
-            self.scheduledleases.add(lease_req)
+            self.leases.add(lease_req)
             self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
             accepted = True
         except SchedException, msg:
@@ -273,7 +276,7 @@
                 self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
                 self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
                 self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
-                self.scheduledleases.add(lease_req)
+                self.leases.add(lease_req)
                 self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
                 accepted = True
             except SchedException, msg:
@@ -285,34 +288,7 @@
         else:
             self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
         
-    
-    def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
-        start = lease_req.start.requested
-        end = lease_req.start.requested + lease_req.duration.requested
-        try:
-            (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-            
-            if len(preemptions) > 0:
-                leases = self.__find_preemptable_leases(preemptions, start, end)
-                self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
-                for lease in leases:
-                    self.preempt(lease, time=start)
-
-            # Create VM resource reservations
-            vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, constants.ONCOMPLETE_ENDLEASE, False)
-            vmrr.state = constants.RES_STATE_SCHEDULED
-
-            # Schedule deployment overhead
-            self.deployment.schedule(lease_req, vmrr, nexttime)
-            
-            # Commit reservation to slot table
-            # (we don't do this until the very end because the deployment overhead
-            # scheduling could still throw an exception)
-            lease_req.append_rr(vmrr)
-            self.slottable.addReservation(vmrr)
-        except SlotFittingException, msg:
-            raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
-
+        
     def __process_queue(self, nexttime):
         done = False
         newqueue = ds.Queue(self)
@@ -327,7 +303,7 @@
                     self.logger.debug("  Duration: %s" % lease_req.duration)
                     self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
                     self.__schedule_besteffort_lease(lease_req, nexttime)
-                    self.scheduledleases.add(lease_req)
+                    self.leases.add(lease_req)
                     self.rm.accounting.decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
                 except SchedException, msg:
                     # Put back on queue
@@ -341,92 +317,104 @@
             newqueue.enqueue(lease)
         
         self.queue = newqueue 
+
+
+    def __process_im_request(self, lease_req, nexttime):
+        self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
+        self.logger.debug("  Duration: %s" % lease_req.duration)
+        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
         
+        try:
+            self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+            self.leases.add(lease_req)
+            self.rm.accounting.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
+            self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
+        except SchedException, msg:
+            self.rm.accounting.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
+            self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+    
+    
+    def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+        start = lease_req.start.requested
+        end = lease_req.start.requested + lease_req.duration.requested
+        try:
+            (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+            
+            if len(preemptions) > 0:
+                leases = self.__find_preemptable_leases(preemptions, start, end)
+                self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
+                for lease in leases:
+                    self.__preempt(lease, preemption_time=start)
 
-    def __schedule_besteffort_lease(self, req, nexttime):
-        # Determine earliest start time in each node
-        if req.state == constants.LEASE_STATE_PENDING:
-            # Figure out earliest start times based on
-            # image schedule and reusable images
-            earliest = self.deployment.find_earliest_starting_times(req, nexttime)
-        elif req.state == constants.LEASE_STATE_SUSPENDED:
-            # No need to transfer images from repository
-            # (only intra-node transfer)
-            earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(req.numnodes)])
+            # Create VM resource reservations
+            vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, False)
+            vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+            # Schedule deployment overhead
+            self.deployment.schedule(lease_req, vmrr, nexttime)
             
-        susptype = self.rm.config.get("suspension")
-        if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and req.numnodes == 1):
-            cansuspend = False
-        else:
-            cansuspend = True
+            # Commit reservation to slot table
+            # (we don't do this until the very end because the deployment overhead
+            # scheduling could still throw an exception)
+            lease_req.append_vmrr(vmrr)
+            self.slottable.addReservation(vmrr)
+        except SlotFittingException, msg:
+            raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
 
-        canmigrate = self.rm.config.get("migration")
+
+    def __schedule_besteffort_lease(self, lease, nexttime):            
         try:
-            mustresume = (req.state == constants.LEASE_STATE_SUSPENDED)
-            canreserve = self.canReserveBestEffort()
-            (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
+            # Schedule the VMs
+            canreserve = self.__can_reserve_besteffort_in_future()
+            (vmrr, in_future) = self.__fit_asap(lease, nexttime, allow_reservation_in_future = canreserve)
             
             # Schedule deployment
-            if req.state != constants.LEASE_STATE_SUSPENDED:
-                self.deployment.schedule(req, vmrr, nexttime)
+            if lease.state != Lease.STATE_SUSPENDED:
+                self.deployment.schedule(lease, vmrr, nexttime)
+            else:
+                # TODO: schedule migrations
+                pass
+
+            # At this point, the lease is feasible.
+            # Commit changes by adding RRs to lease and to slot table
             
-            # TODO: The following would be more correctly handled in the RR handle functions.
-            # We need to have an explicit MigrationResourceReservation before doing that.
-            if req.state == constants.LEASE_STATE_SUSPENDED:
-                # Update VM image mappings, since we might be resuming
-                # in different nodes.
-                for vnode, pnode in req.vmimagemap.items():
-                    self.resourcepool.remove_diskimage(pnode, req.id, vnode)
-                req.vmimagemap = vmrr.nodes
-                for vnode, pnode in req.vmimagemap.items():
-                    self.resourcepool.add_diskimage(pnode, req.diskimage_id, req.diskimage_size, req.id, vnode)
+            # Add resource reservations to lease
+            # TODO: deployment
+            # TODO: migrations
+            lease.append_vmrr(vmrr)
+            
+
+            # Add resource reservations to slottable
+            
+            # TODO: deployment
+            
+            # TODO: migrations
+            
+            # Resumptions (if any)
+            for resmrr in vmrr.resm_rrs:
+                self.slottable.addReservation(resmrr)
                 
-                # Update RAM file mappings
-                for vnode, pnode in req.memimagemap.items():
-                    self.resourcepool.remove_ramfile(pnode, req.id, vnode)
-                for vnode, pnode in vmrr.nodes.items():
-                    self.resourcepool.add_ramfile(pnode, req.id, vnode, req.requested_resources.get_by_type(constants.RES_MEM))
-                    req.memimagemap[vnode] = pnode
-                    
-            # Add resource reservations
-            if resmrr != None:
-                req.append_rr(resmrr)
-                self.slottable.addReservation(resmrr)
-            req.append_rr(vmrr)
+            # VM
             self.slottable.addReservation(vmrr)
-            if susprr != None:
-                req.append_rr(susprr)
+            
+            # Suspensions (if any)
+            for susprr in vmrr.susp_rrs:
                 self.slottable.addReservation(susprr)
            
-            if reservation:
+            if in_future:
                 self.numbesteffortres += 1
                 
-            req.print_contents()
-            
-        except SlotFittingException, msg:
+            lease.print_contents()
+
+        except SchedException, msg:
             raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
 
         
-    def __process_im_request(self, lease_req, nexttime):
-        self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
-        self.logger.debug("  Duration: %s" % lease_req.duration)
-        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
         
-        try:
-            self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
-            self.scheduledleases.add(lease_req)
-            self.rm.accounting.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
-            self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
-        except SchedException, msg:
-            self.rm.accounting.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
-            self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
         
-        
     def __schedule_immediate_lease(self, req, nexttime):
-        # Determine earliest start time in each node
-        earliest = self.deployment.find_earliest_starting_times(req, nexttime)
         try:
-            (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
+            (vmrr, reservation) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
             # Schedule deployment
             self.deployment.schedule(req, vmrr, nexttime)
                         
@@ -564,130 +552,39 @@
 
         return nodeassignment, res, preemptions
 
+    def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
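+        # Schedule the lease to start as soon as possible, given the earliest
+        # start time on each node. If the lease does not fit in its entirety,
+        # it may be scheduled to suspend before its requested duration is up
+        # or, if allowed, be given a reservation at a future changepoint.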
+        lease_id = lease.id
+        remaining_duration = lease.duration.get_remaining_duration()
+        numnodes = lease.numnodes
+        requested_resources = lease.requested_resources
+        preemptible = lease.preemptible
+        mustresume = (lease.state == Lease.STATE_SUSPENDED)
+        susptype = self.rm.config.get("suspension")
+        if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
+            suspendable = False
+        else:
+            suspendable = True
 
-    def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
-        def comparepreemptability(rrX, rrY):
-            if rrX.lease.submit_time > rrY.lease.submit_time:
-                return constants.BETTER
-            elif rrX.lease.submit_time < rrY.lease.submit_time:
-                return constants.WORSE
-            else:
-                return constants.EQUAL        
-            
-        def preemptedEnough(amountToPreempt):
-            for node in amountToPreempt:
-                if not amountToPreempt[node].is_zero_or_less():
-                    return False
-            return True
-        
-        # Get allocations at the specified time
-        atstart = set()
-        atmiddle = set()
-        nodes = set(mustpreempt.keys())
-        
-        reservationsAtStart = self.slottable.getReservationsAt(startTime)
-        reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtMiddle = self.slottable.getReservationsStartingBetween(startTime, endTime)
-        reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtStart.sort(comparepreemptability)
-        reservationsAtMiddle.sort(comparepreemptability)
-        
-        amountToPreempt = {}
-        for n in mustpreempt:
-            amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+        # Determine earliest start time in each node
+        if lease.state == Lease.STATE_QUEUED:
+            # Figure out earliest start times based on
+            # image schedule and reusable images
+            earliest = self.deployment.find_earliest_starting_times(lease, nexttime)
+        elif lease.state == Lease.STATE_SUSPENDED:
+            # No need to transfer images from repository
+            # (only intra-node transfer)
+            earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
 
-        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
-        for r in reservationsAtStart:
-            # The following will really only come into play when we have
-            # multiple VMs per node
-            mustpreemptres = False
-            for n in r.resources_in_pnode.keys():
-                # Don't need to preempt if we've already preempted all
-                # the needed resources in node n
-                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                    amountToPreempt[n].decr(r.resources_in_pnode[n])
-                    mustpreemptres = True
-            if mustpreemptres:
-                atstart.add(r)
-            if preemptedEnough(amountToPreempt):
-                break
-        
-        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
-        if len(reservationsAtMiddle)>0:
-            changepoints = set()
-            for r in reservationsAtMiddle:
-                changepoints.add(r.start)
-            changepoints = list(changepoints)
-            changepoints.sort()        
-            
-            for cp in changepoints:
-                amountToPreempt = {}
-                for n in mustpreempt:
-                    amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-                reservations = [r for r in reservationsAtMiddle 
-                                if r.start <= cp and cp < r.end]
-                for r in reservations:
-                    mustpreemptres = False
-                    for n in r.resources_in_pnode.keys():
-                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                            amountToPreempt[n].decr(r.resources_in_pnode[n])
-                            mustpreemptres = True
-                    if mustpreemptres:
-                        atmiddle.add(r)
-                    if preemptedEnough(amountToPreempt):
-                        break
-            
-        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
-        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-        
-        leases = [r.lease for r in atstart|atmiddle]
-        
-        return leases
 
+        canmigrate = self.rm.config.get("migration")
 
-    def __fit_besteffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
-        lease_id = lease.id
-        remdur = lease.duration.get_remaining_duration()
-        numnodes = lease.numnodes
-        resreq = lease.requested_resources
-        preemptible = lease.preemptible
-        suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
-        migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
-
         #
-        # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
+        # STEP 1: FIGURE OUT THE MINIMUM DURATION
         #
         
-        curnodes=None
-        # If we can't migrate, we have to stay in the
-        # nodes where the lease is currently deployed
-        if mustresume and not canmigrate:
-            vmrr, susprr = lease.get_last_vmrr()
-            curnodes = set(vmrr.nodes.values())
-            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
-        
-        if mustresume and canmigrate:
-            # If we have to resume this lease, make sure that
-            # we have enough time to transfer the images.
-            migratetime = lease.estimate_migration_time(migration_bandwidth)
-            earliesttransfer = self.rm.clock.get_time() + migratetime
+        min_duration = self.__compute_scheduling_threshold(lease)
 
-            for n in earliest:
-                earliest[n][0] = max(earliest[n][0], earliesttransfer)
-            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
-                    
-        if mustresume:
-            resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
-            # Must allocate time for resumption too
-            remdur += resumetime
-        else:
-            suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
 
-
         #
         # STEP 2: FIND THE CHANGEPOINTS
         #
@@ -709,15 +606,28 @@
                 else:
                     changepoints[-1][1] = nodes[:]
         else:
+            if not canmigrate:
+                vmrr = lease.get_last_vmrr()
+                curnodes = set(vmrr.nodes.values())
+            else:
+                curnodes=None
+                # If we have to resume this lease, make sure that
+                # we have enough time to transfer the images.
+                migratetime = self.__estimate_migration_time(lease)
+                earliesttransfer = self.rm.clock.get_time() + migratetime
+    
+                for n in earliest:
+                    earliest[n][0] = max(earliest[n][0], earliesttransfer)
+
             changepoints = list(set([x[0] for x in earliest.values()]))
             changepoints.sort()
             changepoints = [(x, curnodes) for x in changepoints]
 
-        # If we can make reservations for best-effort leases,
+        # If we can make reservations in the future,
         # we also consider future changepoints
         # (otherwise, we only allow the VMs to start "now", accounting
         #  for the fact that vm images will have to be deployed)
-        if canreserve:
+        if allow_reservation_in_future:
             futurecp = self.slottable.findChangePointsAfter(changepoints[-1][0])
             futurecp = [(p,None) for p in futurecp]
         else:
@@ -728,28 +638,48 @@
         #
         # STEP 3: SLOT FITTING
         #
+        
+        # If resuming, we also have to allocate enough for the resumption
+        if mustresume:
+            duration = remaining_duration + self.__estimate_resume_time(lease)
+        else:
+            duration = remaining_duration
 
+
         # First, assuming we can't make reservations in the future
-        start, end, canfit, mustsuspend = self.__find_fit_at_points(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
-        if not canreserve:
-            if start == None:
+        start, end, canfit = self.__find_fit_at_points(
+                                                       changepoints, 
+                                                       numnodes, 
+                                                       requested_resources, 
+                                                       duration, 
+                                                       suspendable, 
+                                                       min_duration)
+        
+        if start == None:
+            if not allow_reservation_in_future:
                 # We did not find a suitable starting time. This can happen
                 # if we're unable to make future reservations
-                raise SlotFittingException, "Could not find enough resources for this request"
-            elif mustsuspend and not suspendable:
-                raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
-        if start != None and mustsuspend and not suspendable:
-            start = None # No satisfactory start time
+                raise SchedException, "Could not find enough resources for this request"
+        else:
+            mustsuspend = (end - start) < duration
+            if mustsuspend and not suspendable:
+                if not allow_reservation_in_future:
+                    raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+                else:
+                    start = None # No satisfactory start time
             
         # If we haven't been able to fit the lease, check if we can
         # reserve it in the future
-        if start == None and canreserve:
-            start, end, canfit, mustsuspend = self.__find_fit_at_points(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
+        if start == None and allow_reservation_in_future:
+            start, end, canfit = self.__find_fit_at_points(
+                                                           futurecp, 
+                                                           numnodes, 
+                                                           requested_resources, 
+                                                           duration, 
+                                                           suspendable, 
+                                                           min_duration
+                                                           )
 
-        if mustsuspend and not suspendable:
-            raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
 
         if start in [p[0] for p in futurecp]:
             reservation = True
@@ -768,7 +698,7 @@
         if mustresume:
             # If we're resuming, we prefer resuming in the nodes we're already
             # deployed in, to minimize the number of transfers.
-            vmrr, susprr = lease.get_last_vmrr()
+            vmrr = lease.get_last_vmrr()
             nodes = set(vmrr.nodes.values())
             availnodes = set(physnodes)
             deplnodes = availnodes.intersection(nodes)
@@ -776,14 +706,6 @@
             physnodes = list(deplnodes) + list(notdeplnodes)
         else:
             physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
-        # Adjust times in case the lease has to be suspended/resumed
-        if mustsuspend:
-            suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
-            end -= suspendtime
-                
-        if mustresume:
-            start += resumetime
         
         # Map to physical nodes
         mappings = {}
@@ -795,45 +717,22 @@
                     canfit[n] -= 1
                     mappings[vmnode] = n
                     if res.has_key(n):
-                        res[n].incr(resreq)
+                        res[n].incr(requested_resources)
                     else:
-                        res[n] = ds.ResourceTuple.copy(resreq)
+                        res[n] = ds.ResourceTuple.copy(requested_resources)
                     vmnode += 1
                     break
 
 
+        vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, reservation)
+        vmrr.state = ResourceReservation.STATE_SCHEDULED
 
-        #
-        # STEP 5: CREATE RESOURCE RESERVATIONS
-        #
-        
         if mustresume:
-            resmres = {}
-            for n in mappings.values():
-                r = ds.ResourceTuple.create_empty()
-                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
-                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
-                resmres[n] = r
-            resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
-            resmrr.state = constants.RES_STATE_SCHEDULED
-        else:
-            resmrr = None
+            self.__schedule_resumption(vmrr, start)
+
+        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
         if mustsuspend:
-            suspres = {}
-            for n in mappings.values():
-                r = ds.ResourceTuple.create_empty()
-                r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
-                r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
-                suspres[n] = r
-            susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
-            susprr.state = constants.RES_STATE_SCHEDULED
-            oncomplete = constants.ONCOMPLETE_SUSPEND
-        else:
-            susprr = None
-            oncomplete = constants.ONCOMPLETE_ENDLEASE
-
-        vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
-        vmrr.state = constants.RES_STATE_SCHEDULED
+            self.__schedule_suspension(vmrr, end)
         
         susp_str = res_str = ""
         if mustresume:
@@ -842,51 +741,142 @@
             susp_str = " (suspending)"
         self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
 
-        return resmrr, vmrr, susprr, reservation
+        return vmrr, reservation
 
-    def __find_fit_at_points(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
+    def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
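+        # Walk the changepoints in order, looking for the first point where
+        # the requested number of nodes fits. Returns (start, end, canfit),
+        # where canfit maps each physical node to the number of VMs it can
+        # hold; start is None if no satisfactory starting time was found.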
         start = None
         end = None
         canfit = None
-        mustsuspend = None
         availabilitywindow = self.slottable.availabilitywindow
 
 
         for p in changepoints:
-            availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
+            availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
             availabilitywindow.printContents()
             
             if availabilitywindow.fitAtStart() >= numnodes:
                 start=p[0]
-                maxend = start + remdur
+                maxend = start + duration
                 end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
         
                 self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
                 
                 if end < maxend:
-                    mustsuspend=True
                     self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
                     
-                    if suspendable:
-                        # It the lease is suspendable...
-                        if suspendthreshold != None:
-                            if end-start > suspendthreshold:
-                                break
-                            else:
-                                self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
-                                start = None
+                    if not suspendable:
+                        # If we can't suspend, this fit is no good, and we have to keep looking
+                        pass
+                    else:
+                        # If we can suspend, we still have to check if the lease will
+                        # be able to run for the specified minimum duration
+                        if end-start > min_duration:
+                            break # We found a fit; stop looking
                         else:
-                            pass
-                    else:
-                        # Keep looking
-                        pass
+                            self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
+                            # Set start back to None, to indicate that we haven't
+                            # found a satisfactory start time
+                            start = None
                 else:
-                    mustsuspend=False
                     # We've found a satisfactory starting time
                     break        
                 
-        return start, end, canfit, mustsuspend
+        return start, end, canfit
 
+    def __schedule_resumption(self, vmrr, resume_at):
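+        # Factored-out resumption scheduling: delay the start of the VM
+        # reservation until the resumption can complete, and attach a
+        # resumption RR (allocating the memory and disk needed to restore
+        # the VM state on each node) to the VMRR. Adding the RR to the
+        # slot table is left to the caller.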
+        resumetime = self.__estimate_resume_time(vmrr.lease)
+        vmrr.update_start(resume_at + resumetime)
+        
+        mappings = vmrr.nodes
+        resmres = {}
+        for n in mappings.values():
+            r = ds.ResourceTuple.create_empty()
+            r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
+            r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
+            resmres[n] = r
+        resmrr = ds.ResumptionResourceReservation(vmrr.lease, resume_at, resume_at + resumetime, resmres, vmrr)
+        resmrr.state = ResourceReservation.STATE_SCHEDULED
+
+        vmrr.resm_rrs.append(resmrr)        
+    
+    def __schedule_suspension(self, vmrr, suspend_by):
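+        # Factored-out suspension scheduling: move the end of the VM
+        # reservation up so the suspension can complete by "suspend_by",
+        # and attach a suspension RR (allocating the memory and disk needed
+        # to save the VM state on each node) to the VMRR. Adding the RR to
+        # the slot table is left to the caller.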
+        suspendtime = self.__estimate_suspend_time(vmrr.lease)
+        vmrr.update_end(suspend_by - suspendtime)
+        
+        mappings = vmrr.nodes
+        suspres = {}
+        for n in mappings.values():
+            r = ds.ResourceTuple.create_empty()
+            r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
+            r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
+            suspres[n] = r
+        
+        susprr = ds.SuspensionResourceReservation(vmrr.lease, suspend_by - suspendtime, suspend_by, suspres, vmrr)
+        susprr.state = ResourceReservation.STATE_SCHEDULED
+        
+        vmrr.susp_rrs.append(susprr)        
+
+    def __estimate_suspend_resume_time(self, lease):
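+        # Estimated as the lease's memory allocation divided by the
+        # suspend/resume rate reported by the resource pool.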
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        rate = self.resourcepool.info.get_suspendresume_rate()
+        time = float(lease.requested_resources.get_by_type(constants.RES_MEM)) / rate
+        time = round_datetime_delta(TimeDelta(seconds = time))
+        return time
+
+    def __estimate_suspend_time(self, lease):
+        return self.__estimate_suspend_resume_time(lease)
+
+    def __estimate_resume_time(self, lease):
+        return self.__estimate_suspend_resume_time(lease)
+
+
+    def __estimate_migration_time(self, lease):
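+        # Estimate the time needed to transfer the lease's VM state at the
+        # configured migration bandwidth: nothing, just memory, or memory
+        # plus disk image, depending on the "what-to-migrate" option.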
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        whattomigrate = config.get("what-to-migrate")
+        bandwidth = self.resourcepool.info.get_migration_bandwidth()
+        if whattomigrate == constants.MIGRATE_NONE:
+            return TimeDelta(seconds=0)
+        else:
+            if whattomigrate == constants.MIGRATE_MEM:
+                mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
+            elif whattomigrate == constants.MIGRATE_MEMDISK:
+                mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
+            return estimate_transfer_time(mbtotransfer, bandwidth)
+
+    # TODO: Take into account other things like boot overhead, migration overhead, etc.
+    def __compute_scheduling_threshold(self, lease):
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        threshold = config.get("force-scheduling-threshold")
+        if threshold != None:
+            # If there is a hard-coded threshold, use that
+            return threshold
+        else:
+            factor = config.get("scheduling-threshold-factor")
+            susp_overhead = self.__estimate_suspend_time(lease)
+            safe_duration = susp_overhead
+            
+            if lease.state == Lease.STATE_SUSPENDED:
+                resm_overhead = self.__estimate_resume_time(lease)
+                safe_duration += resm_overhead
+            
+            # TODO: Incorporate other overheads into the minimum duration
+            min_duration = safe_duration
+            
+            # At the very least, we want to allocate enough time for the
+            # safe duration (otherwise, we'll end up with incorrect schedules,
+            # where a lease is scheduled to suspend, but isn't even allocated
+            # enough time to suspend). 
+            # The factor is assumed to be non-negative; i.e., a factor of 0
+            # means we only allocate enough time for potential suspend/resume
+            # operations, while a factor of 1 means the lease will get as much
+            # running time as is spent on the runtime overheads involved in
+            # setting it up.
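+            # For example (hypothetical numbers): if suspending this lease
+            # is estimated to take 30s and the factor is 2, the threshold is
+            # 30 + (30 * 2) = 90s, i.e., a fit is only acceptable if the
+            # lease can run for at least 90 seconds before being suspended.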
+            threshold = safe_duration + (min_duration * factor)
+            return threshold
+
     def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
         # TODO2: Choose appropriate prioritizing function based on a
         # config file, instead of hardcoding it)
@@ -981,85 +971,224 @@
         # Order nodes
         nodes.sort(comparenodes)
         return nodes        
+
+    def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
+        def comparepreemptability(rrX, rrY):
+            if rrX.lease.submit_time > rrY.lease.submit_time:
+                return constants.BETTER
+            elif rrX.lease.submit_time < rrY.lease.submit_time:
+                return constants.WORSE
+            else:
+                return constants.EQUAL        
+            
+        def preemptedEnough(amountToPreempt):
+            for node in amountToPreempt:
+                if not amountToPreempt[node].is_zero_or_less():
+                    return False
+            return True
         
-    def preempt(self, req, time):
-        self.logger.info("Preempting lease #%i..." % (req.id))
+        # Get allocations at the specified time
+        atstart = set()
+        atmiddle = set()
+        nodes = set(mustpreempt.keys())
+        
+        reservationsAtStart = self.slottable.getReservationsAt(startTime)
+        reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+        reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtStart.sort(comparepreemptability)
+        reservationsAtMiddle.sort(comparepreemptability)
+        
+        amountToPreempt = {}
+        for n in mustpreempt:
+            amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+
+        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+        for r in reservationsAtStart:
+            # The following will really only come into play when we have
+            # multiple VMs per node
+            mustpreemptres = False
+            for n in r.resources_in_pnode.keys():
+                # Don't need to preempt if we've already preempted all
+                # the needed resources in node n
+                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                    amountToPreempt[n].decr(r.resources_in_pnode[n])
+                    mustpreemptres = True
+            if mustpreemptres:
+                atstart.add(r)
+            if preemptedEnough(amountToPreempt):
+                break
+        
+        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+        if len(reservationsAtMiddle)>0:
+            changepoints = set()
+            for r in reservationsAtMiddle:
+                changepoints.add(r.start)
+            changepoints = list(changepoints)
+            changepoints.sort()        
+            
+            for cp in changepoints:
+                amountToPreempt = {}
+                for n in mustpreempt:
+                    amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+                reservations = [r for r in reservationsAtMiddle 
+                                if r.start <= cp and cp < r.end]
+                for r in reservations:
+                    mustpreemptres = False
+                    for n in r.resources_in_pnode.keys():
+                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                            amountToPreempt[n].decr(r.resources_in_pnode[n])
+                            mustpreemptres = True
+                    if mustpreemptres:
+                        atmiddle.add(r)
+                    if preemptedEnough(amountToPreempt):
+                        break
+            
+        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+        
+        leases = [r.lease for r in atstart|atmiddle]
+        
+        return leases
+        
+    def __preempt(self, lease, preemption_time):
+        self.logger.info("Preempting lease #%i..." % (lease.id))
         self.logger.vdebug("Lease before preemption:")
-        req.print_contents()
-        vmrr, susprr  = req.get_last_vmrr()
+        lease.print_contents()
+        vmrr = lease.get_last_vmrr()
         suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
         
-        if vmrr.state == constants.RES_STATE_SCHEDULED and vmrr.start >= time:
-            self.logger.info("... lease #%i has been cancelled and requeued." % req.id)
+        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
+            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
             self.logger.debug("Lease was set to start in the middle of the preempting lease.")
-            req.state = constants.LEASE_STATE_PENDING
+            lease.state = Lease.STATE_PENDING
             if vmrr.backfill_reservation == True:
                 self.numbesteffortres -= 1
-            req.remove_rr(vmrr)
+            lease.remove_rr(vmrr)
             self.slottable.removeReservation(vmrr)
-            if susprr != None:
-                req.remove_rr(susprr)
-                self.slottable.removeReservation(susprr)
-            for vnode, pnode in req.vmimagemap.items():
-                self.resourcepool.remove_diskimage(pnode, req.id, vnode)
-            self.deployment.cancel_deployment(req)
-            req.vmimagemap = {}
-            self.scheduledleases.remove(req)
-            self.queue.enqueue_in_order(req)
-            self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
+            # if susprr != None:
+            #     lease.remove_rr(susprr)
+            #     self.slottable.removeReservation(susprr)
+            for vnode, pnode in lease.vmimagemap.items():
+                self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+            self.deployment.cancel_deployment(lease)
+            lease.vmimagemap = {}
+            # TODO: Change state back to queued
+            self.queue.enqueue_in_order(lease)
+            self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
         else:
             susptype = self.rm.config.get("suspension")
-            timebeforesuspend = time - vmrr.start
+            timebeforesuspend = preemption_time - vmrr.start
             # TODO: Determine if it is in fact the initial VMRR or not. Right now
             # we conservatively overestimate
             canmigrate = self.rm.config.get("migration")
-            suspendthreshold = req.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
+            suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
             # We can't suspend if we're under the suspend threshold
             suspendable = timebeforesuspend >= suspendthreshold
-            if suspendable and (susptype == constants.SUSPENSION_ALL or (req.numnodes == 1 and susptype == constants.SUSPENSION_SERIAL)):
-                self.logger.info("... lease #%i will be suspended at %s." % (req.id, time))
-                self.slottable.suspend(req, time)
+            if suspendable and (susptype == constants.SUSPENSION_ALL or (lease.numnodes == 1 and susptype == constants.SUSPENSION_SERIAL)):
+                self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
+                # Careful: VMRR update, etc. will have to be done here
+                self.__schedule_suspension(vmrr, preemption_time)
             else:
-                self.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % req.id)
-                req.state = constants.LEASE_STATE_PENDING
+                self.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % lease.id)
+                lease.state = Lease.STATE_PENDING
                 if vmrr.backfill_reservation == True:
                     self.numbesteffortres -= 1
-                req.remove_rr(vmrr)
+                lease.remove_rr(vmrr)
                 self.slottable.removeReservation(vmrr)
-                if susprr != None:
-                    req.remove_rr(susprr)
-                    self.slottable.removeReservation(susprr)
-                if req.state == constants.LEASE_STATE_SUSPENDED:
-                    resmrr = req.prev_rr(vmrr)
-                    req.remove_rr(resmrr)
+                #if susprr != None:
+                #    lease.remove_rr(susprr)
+                #    self.slottable.removeReservation(susprr)
+                if lease.state == Lease.STATE_SUSPENDED:
+                    resmrr = lease.prev_rr(vmrr)
+                    lease.remove_rr(resmrr)
                     self.slottable.removeReservation(resmrr)
-                for vnode, pnode in req.vmimagemap.items():
-                    self.resourcepool.remove_diskimage(pnode, req.id, vnode)
-                self.deployment.cancel_deployment(req)
-                req.vmimagemap = {}
-                self.scheduledleases.remove(req)
-                self.queue.enqueue_in_order(req)
-                self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
+                for vnode, pnode in lease.vmimagemap.items():
+                    self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+                self.deployment.cancel_deployment(lease)
+                lease.vmimagemap = {}
+                # TODO: Change state back to queued
+                self.queue.enqueue_in_order(lease)
+                self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
         self.logger.vdebug("Lease after preemption:")
-        req.print_contents()
+        lease.print_contents()
         
-    def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+    def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
         self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime)) 
-        leases = self.scheduledleases.getNextLeasesScheduledInNodes(nexttime, nodes)
+        leases = []
+        # TODO: "getNextLeasesScheduledInNodes" has to be moved to the slot table
+        #leases = self.scheduledleases.getNextLeasesScheduledInNodes(nexttime, nodes)
         leases = [l for l in leases if isinstance(l, ds.BestEffortLease) and not l in checkedleases]
-        for l in leases:
+        for lease in leases:
             self.logger.debug("Found lease %i" % l.id)
             l.print_contents()
             # Earliest time can't be earlier than time when images will be
             # available in node
-            earliest = max(nexttime, l.imagesavail)
-            self.slottable.slideback(l, earliest)
+            earliest = max(nexttime, lease.imagesavail)
+            self.__slideback(lease, earliest)
-            checkedleases.append(l)
+            checkedleases.append(lease)
         #for l in leases:
         #    vmrr, susprr = l.getLastVMRR()
         #    self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
           
-            
+    def __slideback(self, lease, earliest):
+        pass
+#        (vmrr, susprr) = lease.get_last_vmrr()
+#        vmrrnew = copy.copy(vmrr)
+#        nodes = vmrrnew.nodes.values()
+#        if lease.state == Lease.STATE_SUSPENDED:
+#            resmrr = lease.prev_rr(vmrr)
+#            originalstart = resmrr.start
+#        else:
+#            resmrr = None
+#            originalstart = vmrrnew.start
+#        cp = self.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+#        cp = [earliest] + cp
+#        newstart = None
+#        for p in cp:
+#            self.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+#            self.availabilitywindow.printContents()
+#            if self.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+#                (end, canfit) = self.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+#                if end == originalstart and set(nodes) <= set(canfit.keys()):
+#                    self.logger.debug("Can slide back to %s" % p)
+#                    newstart = p
+#                    break
+#        if newstart == None:
+#            # Can't slide back. Leave as is.
+#            pass
+#        else:
+#            diff = originalstart - newstart
+#            if resmrr != None:
+#                resmrrnew = copy.copy(resmrr)
+#                resmrrnew.start -= diff
+#                resmrrnew.end -= diff
+#                self.updateReservationWithKeyChange(resmrr, resmrrnew)
+#            vmrrnew.start -= diff
+#            
+#            # If the lease was going to be suspended, check to see if
+#            # we don't need to suspend any more.
+#            remdur = lease.duration.get_remaining_duration()
+#            if susprr != None and vmrrnew.end - newstart >= remdur: 
+#                vmrrnew.end = vmrrnew.start + remdur
+#                #vmrrnew.oncomplete = constants.ONCOMPLETE_ENDLEASE
+#                lease.remove_rr(susprr)
+#                self.removeReservation(susprr)
+#            else:
+#                vmrrnew.end -= diff
+#            # ONLY for simulation
+#            if vmrrnew.prematureend != None:
+#                vmrrnew.prematureend -= diff
+#            self.updateReservationWithKeyChange(vmrr, vmrrnew)
+#            self.dirty()
+#            self.logger.vdebug("New lease descriptor (after slideback):")
+#            lease.print_contents()
+
+
 
     #-------------------------------------------------------------------#
     #                                                                   #
@@ -1070,9 +1199,9 @@
     def _handle_start_vm(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
         l.print_contents()
-        if l.state == constants.LEASE_STATE_DEPLOYED:
-            l.state = constants.LEASE_STATE_ACTIVE
-            rr.state = constants.RES_STATE_ACTIVE
+        if l.state == Lease.STATE_READY:
+            l.state = Lease.STATE_ACTIVE
+            rr.state = ResourceReservation.STATE_ACTIVE
             now_time = self.rm.clock.get_time()
             l.start.actual = now_time
             
@@ -1082,13 +1211,13 @@
                 # The next two lines have to be moved somewhere more
                 # appropriate inside the resourcepool module
                 for (vnode, pnode) in rr.nodes.items():
-                    l.vmimagemap[vnode] = pnode
+                    l.diskimagemap[vnode] = pnode
             except Exception, e:
                 self.logger.error("ERROR when starting VMs.")
                 raise
-        elif l.state == constants.LEASE_STATE_SUSPENDED:
-            l.state = constants.LEASE_STATE_ACTIVE
-            rr.state = constants.RES_STATE_ACTIVE
+        elif l.state == Lease.STATE_RESUMED_READY:
+            l.state = Lease.STATE_ACTIVE
+            rr.state = ResourceReservation.STATE_ACTIVE
             # No enactment to do here, since all the suspend/resume actions are
             # handled during the suspend/resume RRs
         l.print_contents()
@@ -1106,14 +1235,14 @@
         now_time = self.rm.clock.get_time()
         diff = now_time - rr.start
         l.duration.accumulate_duration(diff)
-        rr.state = constants.RES_STATE_DONE
-        if rr.oncomplete == constants.ONCOMPLETE_ENDLEASE:
+        rr.state = ResourceReservation.STATE_DONE
+        if not rr.is_suspending():
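+            # No suspension follows, so this is the definitive end of the
+            # lease: stop the VMs and retire the lease.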
             self.resourcepool.stop_vms(l, rr)
-            l.state = constants.LEASE_STATE_DONE
+            l.state = Lease.STATE_DONE
             l.duration.actual = l.duration.accumulated
             l.end = now_time
             self.completedleases.add(l)
-            self.scheduledleases.remove(l)
+            self.leases.remove(l)
             self.deployment.cleanup(l, rr)
             if isinstance(l, ds.BestEffortLease):
                 self.rm.accounting.incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
@@ -1130,30 +1259,29 @@
     def _handle_unscheduled_end_vm(self, l, rr, enact=False):
         self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
         self._handle_end_rr(l, rr)
-        if rr.oncomplete == constants.ONCOMPLETE_SUSPEND:
+        if rr.is_suspending():
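+            # The VM ended before its scheduled suspension, so the
+            # suspension (and any reservations that follow it) is no
+            # longer needed.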
             rrs = l.next_rrs(rr)
             for r in rrs:
                 l.remove_rr(r)
                 self.slottable.removeReservation(r)
-        rr.oncomplete = constants.ONCOMPLETE_ENDLEASE
         rr.end = self.rm.clock.get_time()
         self._handle_end_vm(l, rr, enact=enact)
         nexttime = self.rm.clock.get_next_schedulable_time()
         if self.is_backfilling():
             # We need to reevaluate the schedule to see if there are any future
             # reservations that we can slide back.
-            self.reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
+            self.__reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
 
     def _handle_start_suspend(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
         l.print_contents()
-        rr.state = constants.RES_STATE_ACTIVE
+        rr.state = ResourceReservation.STATE_ACTIVE
         self.resourcepool.suspend_vms(l, rr)
-        for vnode, pnode in rr.nodes.items():
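+        # Register the RAM files that suspending the VMs will create on
+        # each physical node.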
+        for vnode, pnode in rr.vmrr.nodes.items():
             self.resourcepool.add_ramfile(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
             l.memimagemap[vnode] = pnode
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
+        self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
         self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
         self.logger.info("Suspending lease %i..." % (l.id))
 
@@ -1162,13 +1290,12 @@
         l.print_contents()
         # TODO: React to incomplete suspend
         self.resourcepool.verify_suspend(l, rr)
-        rr.state = constants.RES_STATE_DONE
-        l.state = constants.LEASE_STATE_SUSPENDED
-        self.scheduledleases.remove(l)
+        rr.state = ResourceReservation.STATE_DONE
+        l.state = Lease.STATE_SUSPENDED
         self.queue.enqueue_in_order(l)
         self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, l.id)
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
+        self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_IDLE, l.id)
         self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
         self.logger.info("Lease %i suspended." % (l.id))
 
@@ -1176,9 +1303,9 @@
         self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
         l.print_contents()
         self.resourcepool.resume_vms(l, rr)
-        rr.state = constants.RES_STATE_ACTIVE
+        rr.state = ResourceReservation.STATE_ACTIVE
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME, l.id)
+        self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_VM_RESUME, l.id)
         self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
         self.logger.info("Resuming lease %i..." % (l.id))
 
@@ -1187,14 +1314,46 @@
         l.print_contents()
         # TODO: React to incomplete resume
         self.resourcepool.verify_resume(l, rr)
-        rr.state = constants.RES_STATE_DONE
-        for vnode, pnode in rr.nodes.items():
+        rr.state = ResourceReservation.STATE_DONE
+        for vnode, pnode in rr.vmrr.nodes.items():
             self.resourcepool.remove_ramfile(pnode, l.id, vnode)
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
+        self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_IDLE, l.id)
         self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
         self.logger.info("Resumed lease %i" % (l.id))
 
+    def _handle_start_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+        l.print_contents()
+
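+        # TODO: The migration actions themselves still have to be added here.
+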
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+        self.logger.info("Migrating lease %i..." % (l.id))
+
+    def _handle_end_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+        l.print_contents()
+
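+        # TODO: The disk image and RAM file mapping updates below still
+        # have to be adapted to the new lease data structures before they
+        # can be enabled: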
+#        if lease.state == Lease.STATE_SUSPENDED:
+#            # Update VM image mappings, since we might be resuming
+#            # in different nodes.
+#            for vnode, pnode in lease.vmimagemap.items():
+#                self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+#            lease.vmimagemap = vmrr.nodes
+#            for vnode, pnode in lease.vmimagemap.items():
+#                self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
+#            
+#            # Update RAM file mappings
+#            for vnode, pnode in lease.memimagemap.items():
+#                self.resourcepool.remove_ramfile(pnode, lease.id, vnode)
+#            for vnode, pnode in vmrr.nodes.items():
+#                self.resourcepool.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
+#                lease.memimagemap[vnode] = pnode
+
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+        self.logger.info("Migrated lease %i..." % (l.id))
+
     def _handle_end_rr(self, l, rr):
         self.slottable.removeReservation(rr)
 
@@ -1207,7 +1366,7 @@
     
     
         
-    def canReserveBestEffort(self):
+    def __can_reserve_besteffort_in_future(self):
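+        # Limit how many best-effort leases may be scheduled in the future
+        # (i.e., given a reservation instead of waiting in the queue) at
+        # any one time.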
         return self.numbesteffortres < self.maxres
     
                         

Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py	2008-09-11 16:41:18 UTC (rev 490)
+++ trunk/src/haizea/resourcemanager/slottable.py	2008-09-11 16:47:36 UTC (rev 491)
@@ -20,7 +20,6 @@
 from operator import attrgetter, itemgetter
 import haizea.common.constants as constants
 import haizea.resourcemanager.datastruct as ds
-from haizea.common.utils import roundDateTimeDelta
 import bisect
 import copy
 import logging
@@ -96,6 +95,9 @@
     def add_node(self, resourcepoolnode):
         self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
 
+    def is_empty(self):
+        return (len(self.reservationsByStart) == 0)
+
     def dirty(self):
         # You're a dirty, dirty slot table and you should be
         # ashamed of having outdated caches!
@@ -154,14 +156,28 @@
         res = bystart & byend
         return list(res)
     
-    def getReservationsStartingBetween(self, start, end):
+    def get_reservations_starting_between(self, start, end):
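+        # Binary search (bisect) over the list of reservations kept sorted
+        # by start time.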
         startitem = KeyValueWrapper(start, None)
         enditem = KeyValueWrapper(end, None)
         startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+        endpos = bisect.bisect_right(self.reservationsByStart, enditem)
+        res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+        return res
+
+    def get_reservations_ending_between(self, start, end):
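+        # Same as above, but over the list of reservations kept sorted by
+        # end time.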
+        startitem = KeyValueWrapper(start, None)
+        enditem = KeyValueWrapper(end, None)
+        startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
         endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
-        res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+        res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
         return res
     
+    def get_reservations_starting_at(self, time):
+        return self.get_reservations_starting_between(time, time)
+
+    def get_reservations_ending_at(self, time):
+        return self.get_reservations_ending_between(time, time)
+    
     # ONLY for simulation
     def getNextPrematureEnd(self, after):
         # Inefficient, but ok since this query seldom happens
@@ -181,7 +197,7 @@
     def getReservationsWithChangePointsAfter(self, after):
         item = KeyValueWrapper(after, None)
         startpos = bisect.bisect_right(self.reservationsByStart, item)
-        bystart = set([x.value for x in self.reservationsByStart[startpos:]])
+        bystart = set([x.value for x in self.reservationsByStart[:startpos]])
         endpos = bisect.bisect_right(self.reservationsByEnd, item)
         byend = set([x.value for x in self.reservationsByEnd[endpos:]])
         res = bystart | byend
@@ -267,94 +283,7 @@
         if p != None:
             self.changepointcache.pop()
         return p
-    
-
-    def suspend(self, lease, time):
-        suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
         
-        (vmrr, susprr) = lease.get_last_vmrr()
-        vmrrnew = copy.copy(vmrr)
-        
-        suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
-        vmrrnew.end = time - suspendtime
-            
-        vmrrnew.oncomplete = constants.ONCOMPLETE_SUSPEND
-
-        self.updateReservationWithKeyChange(vmrr, vmrrnew)
-       
-        if susprr != None:
-            lease.remove_rr(susprr)
-            self.removeReservation(susprr)
-        
-        mappings = vmrr.nodes
-        suspres = {}
-        for n in mappings.values():
-            r = ds.ResourceTuple.create_empty()
-            r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
-            r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
-            suspres[n] = r
-        
-        newsusprr = ds.SuspensionResourceReservation(lease, time - suspendtime, time, suspres, mappings)
-        newsusprr.state = constants.RES_STATE_SCHEDULED
-        lease.append_rr(newsusprr)
-        self.addReservation(newsusprr)
-        
-
-    def slideback(self, lease, earliest):
-        (vmrr, susprr) = lease.get_last_vmrr()
-        vmrrnew = copy.copy(vmrr)
-        nodes = vmrrnew.nodes.values()
-        if lease.state == constants.LEASE_STATE_SUSPENDED:
-            resmrr = lease.prev_rr(vmrr)
-            originalstart = resmrr.start
-        else:
-            resmrr = None
-            originalstart = vmrrnew.start
-        cp = self.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
-        cp = [earliest] + cp
-        newstart = None
-        for p in cp:
-            self.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
-            self.availabilitywindow.printContents()
-            if self.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
-                (end, canfit) = self.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
-                if end == originalstart and set(nodes) <= set(canfit.keys()):
-                    self.logger.debug("Can slide back to %s" % p)
-                    newstart = p
-                    break
-        if newstart == None:
-            # Can't slide back. Leave as is.
-            pass
-        else:
-            diff = originalstart - newstart
-            if resmrr != None:
-                resmrrnew = copy.copy(resmrr)
-                resmrrnew.start -= diff
-                resmrrnew.end -= diff
-                self.updateReservationWithKeyChange(resmrr, resmrrnew)
-            vmrrnew.start -= diff
-            
-            # If the lease was going to be suspended, check to see if
-            # we don't need to suspend any more.
-            remdur = lease.duration.get_remaining_duration()
-            if susprr != None and vmrrnew.end - newstart >= remdur: 
-                vmrrnew.end = vmrrnew.start + remdur
-                vmrrnew.oncomplete = constants.ONCOMPLETE_ENDLEASE
-                lease.remove_rr(susprr)
-                self.removeReservation(susprr)
-            else:
-                vmrrnew.end -= diff
-            # ONLY for simulation
-            if vmrrnew.prematureend != None:
-                vmrrnew.prematureend -= diff
-            self.updateReservationWithKeyChange(vmrr, vmrrnew)
-            self.dirty()
-            self.logger.vdebug("New lease descriptor (after slideback):")
-            lease.print_contents()
-
-
-
-        
     def isFull(self, time):
         nodes = self.getAvailability(time)
         avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])


