[haizea-commit] r539 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Thu Nov 6 17:54:18 CST 2008


Author: borja
Date: 2008-11-06 17:54:13 -0600 (Thu, 06 Nov 2008)
New Revision: 539

Modified:
   trunk/src/haizea/resourcemanager/configfile.py
   trunk/src/haizea/resourcemanager/datastruct.py
   trunk/src/haizea/resourcemanager/scheduler.py
Log:
Added ShutdownResource Reservation to explicitly schedule shutdowns.

Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py	2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/configfile.py	2008-11-06 23:54:13 UTC (rev 539)
@@ -286,6 +286,42 @@
             The default should be good for most configurations, but
             may need to be increased if you're dealing with exceptionally
             high loads.                
+            """),
+
+     Option(name        = "shutdown-time",
+            getter      = "shutdown-time",
+            type        = OPTTYPE_TIMEDELTA,
+            required    = False,
+            default     = TimeDelta(seconds=0),
+            doc         = """
+            The amount of time that will be allocated for a VM to shutdown.
+            When running in OpenNebula mode, it is advisable to set this to
+            a few seconds, so no operation gets scheduled right when a
+            VM is shutting down. The most common scenario is that a VM
+            will start resuming right when another VM shuts down. However,
+            since both these activities involve I/O, it can delay the resume
+            operation and affect Haizea's estimation of how long the resume
+            will take.
+            """),
+
+     Option(name        = "enactment-overhead",
+            getter      = "enactment-overhead",
+            type        = OPTTYPE_TIMEDELTA,
+            required    = False,
+            default     = TimeDelta(seconds=0),
+            doc         = """
+            The amount of time that is required to send
+            an enactment command. This value will affect suspend/resume
+            estimations and, in OpenNebula mode, will force a pause
+            of this much time between suspend/resume enactment
+            commands. When suspending/resuming many VMs at the same time
+            (which is likely to happen if suspendresume-exclusion is set
+            to "local"), it will take OpenNebula 1-2 seconds to process
+            each command (this is a small amount of time, but if 32 VMs
+            are being suspended at the same time, on in each physical node,
+            this time can compound up to 32-64 seconds, which has to be
+            taken into account when estimating when to start a suspend
+            operation that must be completed before another lease starts).
             """)
 
     ]

Modified: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py	2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/datastruct.py	2008-11-06 23:54:13 UTC (rev 539)
@@ -211,7 +211,8 @@
                  realdur = None):
         start = Timestamp(start)
         duration = Duration(duration)
-        duration.known = realdur # ONLY for simulation
+        if realdur != duration.requested:
+            duration.known = realdur # ONLY for simulation
         Lease.__init__(self, submit_time, start, duration, diskimage_id,
                            diskimage_size, numnodes, resreq, preemptible)
         
@@ -236,7 +237,8 @@
                  realdur = None):
         start = Timestamp(None) # i.e., start on a best-effort basis
         duration = Duration(duration)
-        duration.known = realdur # ONLY for simulation
+        if realdur != duration.requested:
+            duration.known = realdur # ONLY for simulation
         # When the images will be available
         self.imagesavail = None        
         Lease.__init__(self, submit_time, start, duration, diskimage_id,
@@ -368,6 +370,9 @@
     def is_suspending(self):
         return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
 
+    def is_shutting_down(self):
+        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
+
     def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
         for resmrr in self.pre_rrs:
             resmrr.print_contents(loglevel)
@@ -444,7 +449,25 @@
         rr = ResourceReservation.xmlrpc_marshall(self)
         rr["type"] = "RESM"
         return rr
+    
+class ShutdownResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vnodes, vmrr):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.vnodes = vnodes
 
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : SHUTDOWN")
+        ResourceReservation.print_contents(self, loglevel)
+        
+    def is_preemptible(self):
+        return True        
+        
+    def xmlrpc_marshall(self):
+        rr = ResourceReservation.xmlrpc_marshall(self)
+        rr["type"] = "SHTD"
+        return rr
+
 class MigrationResourceReservation(ResourceReservation):
     def __init__(self, lease, start, end, res, vmrr):
         ResourceReservation.__init__(self, lease, start, end, res)

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-11-06 23:54:13 UTC (rev 539)
@@ -119,6 +119,10 @@
                               on_start = Scheduler._handle_start_vm,
                               on_end   = Scheduler._handle_end_vm)
 
+        self.register_handler(type     = ds.ShutdownResourceReservation, 
+                              on_start = Scheduler._handle_start_shutdown,
+                              on_end   = Scheduler._handle_end_shutdown)
+
         self.register_handler(type     = ds.SuspensionResourceReservation, 
                               on_start = Scheduler._handle_start_suspend,
                               on_end   = Scheduler._handle_end_suspend)
@@ -299,6 +303,9 @@
             self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
         else:
             self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
+            lease_req.state = Lease.STATE_REJECTED
+            self.completedleases.add(lease_req)
+            self.leases.remove(lease_req)
         
         
     def __process_queue(self, nexttime):
@@ -347,21 +354,15 @@
     
     
     def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
-        start = lease_req.start.requested
-        end = lease_req.start.requested + lease_req.duration.requested
         try:
-            (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+            (vmrr, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
             
             if len(preemptions) > 0:
-                leases = self.__find_preemptable_leases(preemptions, start, end)
+                leases = self.__find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
                 self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
                 for lease in leases:
-                    self.__preempt(lease, preemption_time=start)
+                    self.__preempt(lease, preemption_time=vmrr.start)
 
-            # Create VM resource reservations
-            vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, False)
-            vmrr.state = ResourceReservation.STATE_SCHEDULED
-
             # Schedule deployment overhead
             self.deployment_scheduler.schedule(lease_req, vmrr, nexttime)
             
@@ -370,6 +371,10 @@
             # scheduling could still throw an exception)
             lease_req.append_vmrr(vmrr)
             self.slottable.addReservation(vmrr)
+            
+            # Post-VM RRs (if any)
+            for rr in vmrr.post_rrs:
+                self.slottable.addReservation(rr)
         except Exception, msg:
             raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
 
@@ -402,7 +407,7 @@
             
             # TODO: migrations
             
-            # Post-VM RRs (if any)
+            # Pre-VM RRs (if any)
             for rr in vmrr.pre_rrs:
                 self.slottable.addReservation(rr)
                 
@@ -426,12 +431,16 @@
         
     def __schedule_immediate_lease(self, req, nexttime):
         try:
-            (resmrr, vmrr, susprr, reservation) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
+            (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
             # Schedule deployment
             self.deployment_scheduler.schedule(req, vmrr, nexttime)
                         
             req.append_rr(vmrr)
             self.slottable.addReservation(vmrr)
+            
+            # Post-VM RRs (if any)
+            for rr in vmrr.post_rrs:
+                self.slottable.addReservation(rr)
                     
             req.print_contents()
         except SlotFittingException, msg:
@@ -440,7 +449,7 @@
     def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
         lease_id = leasereq.id
         start = leasereq.start.requested
-        end = leasereq.start.requested + leasereq.duration.requested
+        end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
         diskImageID = leasereq.diskimage_id
         numnodes = leasereq.numnodes
         resreq = leasereq.requested_resources
@@ -560,10 +569,16 @@
                     break
 
         if vnode <= numnodes:
-            raise SchedException, "Availability window indicated that request but feasible, but could not fit it"
+            raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
 
-        return nodeassignment, res, preemptions
+        # Create VM resource reservations
+        vmrr = ds.VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
+        vmrr.state = ResourceReservation.STATE_SCHEDULED
 
+        self.__schedule_shutdown(vmrr)
+
+        return vmrr, preemptions
+
     def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
         lease_id = lease.id
         remaining_duration = lease.duration.get_remaining_duration()
@@ -571,6 +586,7 @@
         requested_resources = lease.requested_resources
         preemptible = lease.preemptible
         mustresume = (lease.state == Lease.STATE_SUSPENDED)
+        shutdown_time = self.__estimate_shutdown_time(lease)
         susptype = get_config().get("suspension")
         if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
             suspendable = False
@@ -657,6 +673,7 @@
         else:
             duration = remaining_duration
 
+        duration += shutdown_time
 
         # First, assuming we can't make reservations in the future
         start, end, canfit = self.__find_fit_at_points(
@@ -745,10 +762,13 @@
         mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
         if mustsuspend:
             self.__schedule_suspension(vmrr, end)
+        else:
+            # Compensate for any overestimation
+            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
+                vmrr.end = vmrr.start + remaining_duration + shutdown_time
+            self.__schedule_shutdown(vmrr)
         
-        # Compensate for any overestimation
-        if (vmrr.end - vmrr.start) > remaining_duration:
-            vmrr.end = vmrr.start + remaining_duration
+
         
         susp_str = res_str = ""
         if mustresume:
@@ -849,39 +869,25 @@
         
         return times
 
+    def __schedule_shutdown(self, vmrr):
+        config = get_config()
+        shutdown_time = config.get("shutdown-time")
 
-    def __schedule_resumption(self, vmrr, resume_at):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        resm_exclusion = config.get("suspendresume-exclusion")        
-        rate = self.resourcepool.info.get_suspendresume_rate()
+        start = vmrr.end - shutdown_time
+        end = vmrr.end
+        
+        shutdown_rr = ds.ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
+        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
+                
+        vmrr.update_end(start)
+        
+        # If there are any post RRs, remove them
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
 
-        if resume_at < vmrr.start or resume_at > vmrr.end:
-            raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+        vmrr.post_rrs.append(shutdown_rr)
 
-        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate)
-        resume_rrs = []
-        for (start, end, pnode, vnodes) in times:
-            r = ds.ResourceTuple.create_empty()
-            mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-            r.set_by_type(constants.RES_MEM, mem)
-            r.set_by_type(constants.RES_DISK, mem)
-            resmres = {pnode: r}
-            resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, vnodes, vmrr)
-            resmrr.state = ResourceReservation.STATE_SCHEDULED
-            resume_rrs.append(resmrr)
-                
-        resume_rrs.sort(key=attrgetter("start"))
-            
-        resm_end = resume_rrs[-1].end
-        if resm_end > vmrr.end:
-            raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
-        
-        vmrr.update_start(resm_end)
-        for resmrr in resume_rrs:
-            vmrr.pre_rrs.append(resmrr)        
-           
-    
     def __schedule_suspension(self, vmrr, suspend_by):
         from haizea.resourcemanager.rm import ResourceManager
         config = ResourceManager.get_singleton().config
@@ -911,24 +917,55 @@
         
         vmrr.update_end(susp_start)
         
-        # If we're already suspending, remove previous susprrs
-        if vmrr.is_suspending():
-            for susprr in vmrr.post_rrs:
-                self.slottable.removeReservation(susprr)
-            vmrr.post_rrs = []
+        # If there are any post RRs, remove them
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
 
         for susprr in suspend_rrs:
-            vmrr.post_rrs.append(susprr)        
+            vmrr.post_rrs.append(susprr)       
+            
+    def __schedule_resumption(self, vmrr, resume_at):
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        resm_exclusion = config.get("suspendresume-exclusion")        
+        rate = self.resourcepool.info.get_suspendresume_rate()
 
+        if resume_at < vmrr.start or resume_at > vmrr.end:
+            raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+
+        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate)
+        resume_rrs = []
+        for (start, end, pnode, vnodes) in times:
+            r = ds.ResourceTuple.create_empty()
+            mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+            r.set_by_type(constants.RES_MEM, mem)
+            r.set_by_type(constants.RES_DISK, mem)
+            resmres = {pnode: r}
+            resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, vnodes, vmrr)
+            resmrr.state = ResourceReservation.STATE_SCHEDULED
+            resume_rrs.append(resmrr)
+                
+        resume_rrs.sort(key=attrgetter("start"))
+            
+        resm_end = resume_rrs[-1].end
+        if resm_end > vmrr.end:
+            raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+        
+        vmrr.update_start(resm_end)
+        for resmrr in resume_rrs:
+            vmrr.pre_rrs.append(resmrr)        
+           
+    
+ 
+
     def __compute_suspend_resume_time(self, mem, rate):
         time = float(mem) / rate
         time = round_datetime_delta(TimeDelta(seconds = time))
         return time
     
     def __estimate_suspend_resume_time(self, lease):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        susp_exclusion = config.get("suspendresume-exclusion")        
+        susp_exclusion = get_config().get("suspendresume-exclusion")        
         rate = self.resourcepool.info.get_suspendresume_rate()
         mem = lease.requested_resources.get_by_type(constants.RES_MEM)
         if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
@@ -937,6 +974,10 @@
             # Overestimating
             return lease.numnodes * self.__compute_suspend_resume_time(mem, rate)
 
+    def __estimate_shutdown_time(self, lease):
+        # Always uses fixed value in configuration file
+        return get_config().get("shutdown-time")
+
     def __estimate_suspend_time(self, lease):
         return self.__estimate_suspend_resume_time(lease)
 
@@ -1201,9 +1242,9 @@
             self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
             if vmrr.backfill_reservation == True:
                 self.numbesteffortres -= 1
-            if vmrr.is_suspending():
-                for susprr in vmrr.post_rrs:
-                    self.slottable.removeReservation(susprr)
+            # If there are any post RRs, remove them
+            for rr in vmrr.post_rrs:
+                self.slottable.removeReservation(rr)
             lease.remove_vmrr(vmrr)
             self.slottable.removeReservation(vmrr)
             for vnode, pnode in lease.diskimagemap.items():
@@ -1291,6 +1332,15 @@
                 vmrr.post_rrs = []
             else:
                 vmrr.update_end(vmrr.end - diff)
+                
+            if not vmrr.is_suspending():
+                # If the VM was set to shutdown, we need to slideback the shutdown RRs
+                for rr in vmrr.post_rrs:
+                    rr_old_start = rr.start
+                    rr_old_end = rr.end
+                    rr.start -= diff
+                    rr.end -= diff
+                    self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
 
             self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
             self.logger.vdebug("New lease descriptor (after slideback):")
@@ -1332,10 +1382,8 @@
         self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
         self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
 
-    # TODO: Replace enact with a saner way of handling leases that have failed or
-    #       ended prematurely.
-    #       Possibly factor out the "clean up" code to a separate function
-    def _handle_end_vm(self, l, rr, enact=True):
+
+    def _handle_end_vm(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
         self.logger.vdebug("LEASE-%i Before:" % l.id)
         l.print_contents()
@@ -1343,19 +1391,11 @@
         diff = now_time - rr.start
         l.duration.accumulate_duration(diff)
         rr.state = ResourceReservation.STATE_DONE
-        if not rr.is_suspending():
-            self.resourcepool.stop_vms(l, rr)
-            l.state = Lease.STATE_DONE
-            l.duration.actual = l.duration.accumulated
-            l.end = now_time
-            self.completedleases.add(l)
-            self.leases.remove(l)
-            if isinstance(l, ds.BestEffortLease):
-                get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
        
         if isinstance(l, ds.BestEffortLease):
             if rr.backfill_reservation == True:
                 self.numbesteffortres -= 1
+                
         self.logger.vdebug("LEASE-%i After:" % l.id)
         l.print_contents()
         self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
@@ -1364,17 +1404,46 @@
     def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
         self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
         self._handle_end_rr(l, vmrr)
-        if vmrr.is_suspending():
-            for susprr in vmrr.post_rrs:
-                self.slottable.removeReservation(susprr)
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
+        # TODO: slideback shutdown RRs
         vmrr.end = get_clock().get_time()
-        self._handle_end_vm(l, vmrr, enact=enact)
+        self._handle_end_vm(l, vmrr)
+        self._handle_end_lease(l)
         nexttime = get_clock().get_next_schedulable_time()
         if self.is_backfilling():
             # We need to reevaluate the schedule to see if there are any future
             # reservations that we can slide back.
             self.__reevaluate_schedule(l, vmrr.nodes.values(), nexttime, [])
 
+    def _handle_start_shutdown(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_ACTIVE
+        self.resourcepool.stop_vms(l, rr)
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
+
+    def _handle_end_shutdown(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_DONE
+        self._handle_end_lease(l)
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
+        self.logger.info("Lease %i shutdown." % (l.id))
+
+    def _handle_end_lease(self, l):
+        l.state = Lease.STATE_DONE
+        l.duration.actual = l.duration.accumulated
+        l.end = round_datetime(get_clock().get_time())
+        self.completedleases.add(l)
+        self.leases.remove(l)
+        if isinstance(l, ds.BestEffortLease):
+            get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+
+
     def _handle_start_suspend(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
         l.print_contents()



More information about the Haizea-commit mailing list