[haizea-commit] r540 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu
Fri Nov 7 16:37:20 CST 2008


Author: borja
Date: 2008-11-07 16:37:18 -0600 (Fri, 07 Nov 2008)
New Revision: 540

Modified:
   trunk/src/haizea/resourcemanager/datastruct.py
   trunk/src/haizea/resourcemanager/scheduler.py
Log:
- Modified suspend/resume scheduling so that, when doing local exclusion, suspend/resume operations happening at the same time are consolidated into a single RR.
- Included enactment overhead in suspend/resume scheduling
- Minor fixes
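
As a rough illustration of the consolidation described above, here is a minimal standalone sketch (not the committed code; the function name, the vnode_to_pnode argument, and the sample data are made up for illustration). Per-vnode suspend/resume operations that share the same (start, end) window are grouped into a single entry keyed by physical node, which then becomes one RR:

    # Sketch of the grouping done in __compute_susprem_times (local exclusion).
    # Names and sample data are illustrative only.
    def consolidate(pervnode_times, vnode_to_pnode):
        """Group per-vnode operations sharing a (start, end) window into
        entries of the form [start, end, {pnode: [vnodes]}]."""
        times = []
        uniq_windows = set((start, end) for (start, end, _vnode) in pervnode_times)
        for (start, end) in uniq_windows:
            vnodes = [v for (s, e, v) in pervnode_times if s == start and e == end]
            node_mappings = {}
            for vnode in vnodes:
                pnode = vnode_to_pnode[vnode]
                node_mappings.setdefault(pnode, []).append(vnode)
            times.append([start, end, node_mappings])
        return times

    # Vnodes 1 and 2 resume on pnode "A" during the same window, so they are
    # consolidated into one RR; vnode 3 keeps its own.
    print(consolidate([(0, 10, 1), (0, 10, 2), (10, 20, 3)],
                      {1: "A", 2: "A", 3: "B"}))
    # -> [[0, 10, {'A': [1, 2]}], [10, 20, {'B': [3]}]] (entry order may vary)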

Modified: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py	2008-11-06 23:54:13 UTC (rev 539)
+++ trunk/src/haizea/resourcemanager/datastruct.py	2008-11-07 22:37:18 UTC (rev 540)
@@ -404,6 +404,7 @@
 
     def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
         self.logger.log(loglevel, "Type           : SUSPEND")
+        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
         ResourceReservation.print_contents(self, loglevel)
         
     def is_first(self):
@@ -430,8 +431,9 @@
         self.vnodes = vnodes
 
     def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : RESUME")
+        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
         ResourceReservation.print_contents(self, loglevel)
-        self.logger.log(loglevel, "Type           : RESUME")
 
     def is_first(self):
         return (self == self.vmrr.pre_rrs[0])

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-11-06 23:54:13 UTC (rev 539)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-11-07 22:37:18 UTC (rev 540)
@@ -594,7 +594,7 @@
             suspendable = True
 
         # Determine earliest start time in each node
-        if lease.state == Lease.STATE_QUEUED:
+        if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
             # Figure out earliest start times based on
             # image schedule and reusable images
             earliest = self.deployment_scheduler.find_earliest_starting_times(lease, nexttime)
@@ -820,8 +820,9 @@
         return start, end, canfit
     
     def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate):
-        times = [] # (start, end, pnode, vnodes)
-
+        times = [] # (start, end, {pnode -> vnodes})
+        enactment_overhead = get_config().get("enactment-overhead") 
+        
         if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
             # Global exclusion (which represents, e.g., reading/writing the memory image files
             # from a global file system) meaning no two suspensions/resumptions can happen at 
@@ -833,19 +834,21 @@
             for (vnode,pnode) in vmrr.nodes.items():
                 mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
                 op_time = self.__compute_suspend_resume_time(mem, rate)
+                op_time += enactment_overhead
                 t_prev = t
                 
                 if direction == constants.DIRECTION_FORWARD:
                     t += op_time
-                    times.append((t_prev, t, pnode, [vnode]))
+                    times.append((t_prev, t, {pnode:vnode}))
                 elif direction == constants.DIRECTION_BACKWARD:
                     t -= op_time
-                    times.append((t, t_prev, pnode, [vnode]))
+                    times.append((t, t_prev, {pnode:vnode}))
 
         elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
             # Local exclusion (which represents, e.g., reading the memory image files
             # from a local file system) means no two resumptions can happen at the same
             # time in the same physical node.
+            pervnode_times = [] # (start, end, vnode)
             vnodes_in_pnode = {}
             for (vnode,pnode) in vmrr.nodes.items():
                 vnodes_in_pnode.setdefault(pnode, []).append(vnode)
@@ -855,18 +858,61 @@
                 for vnode in vnodes_in_pnode[pnode]:
                     mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
                     op_time = self.__compute_suspend_resume_time(mem, rate)
+                    
                     t_prev = t
                     
                     if direction == constants.DIRECTION_FORWARD:
                         t += op_time
-                        times.append((t_prev, t, pnode, [vnode]))
+                        pervnode_times.append((t_prev, t, vnode))
                     elif direction == constants.DIRECTION_BACKWARD:
                         t -= op_time
-                        times.append((t, t_prev, pnode, [vnode]))
-            # TODO: "consolidate" times (i.e., figure out what operations can be grouped
-            # into a single RR. This will not be an issue when running with real hardware,
-            # but might impact simulation performance.
+                        pervnode_times.append((t, t_prev, vnode))
+            
+            # Consolidate suspend/resume operations happening at the same time
+            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
+            for (start, end) in uniq_times:
+                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
+                node_mappings = {}
+                for vnode in vnodes:
+                    pnode = vmrr.nodes[vnode]
+                    node_mappings.setdefault(pnode, []).append(vnode)
+                times.append([start,end,node_mappings])
         
+            # Add the enactment overhead
+            for t in times:
+                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
+                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
+                if direction == constants.DIRECTION_FORWARD:
+                    t[1] += overhead
+                elif direction == constants.DIRECTION_BACKWARD:
+                    t[0] -= overhead
+                    
+            # Fix overlaps
+            if direction == constants.DIRECTION_FORWARD:
+                times.sort(key=itemgetter(0))
+            elif direction == constants.DIRECTION_BACKWARD:
+                times.sort(key=itemgetter(1))
+                times.reverse()
+                
+            prev_start = None
+            prev_end = None
+            for t in times:
+                if prev_start != None:
+                    start = t[0]
+                    end = t[1]
+                    if direction == constants.DIRECTION_FORWARD:
+                        if start < prev_end:
+                            diff = prev_end - start
+                            t[0] += diff
+                            t[1] += diff
+                    elif direction == constants.DIRECTION_BACKWARD:
+                        if end > prev_start:
+                            diff = end - prev_start
+                            t[0] -= diff
+                            t[1] -= diff
+                prev_start = t[0]
+                prev_end = t[1]
+        
         return times
 
     def __schedule_shutdown(self, vmrr):
@@ -899,13 +945,18 @@
 
         times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate)
         suspend_rrs = []
-        for (start, end, pnode, vnodes) in times:
-            r = ds.ResourceTuple.create_empty()
-            mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-            r.set_by_type(constants.RES_MEM, mem)
-            r.set_by_type(constants.RES_DISK, mem)
-            suspres = {pnode: r}
-            susprr = ds.SuspensionResourceReservation(vmrr.lease, start, end, suspres, vnodes, vmrr)
+        for (start, end, node_mappings) in times:
+            suspres = {}
+            all_vnodes = []
+            for (pnode,vnodes) in node_mappings.items():
+                num_vnodes = len(vnodes)
+                r = ds.ResourceTuple.create_empty()
+                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+                suspres[pnode] = r          
+                all_vnodes += vnodes                      
+            susprr = ds.SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
             susprr.state = ResourceReservation.STATE_SCHEDULED
             suspend_rrs.append(susprr)
                 
@@ -936,13 +987,18 @@
 
         times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate)
         resume_rrs = []
-        for (start, end, pnode, vnodes) in times:
-            r = ds.ResourceTuple.create_empty()
-            mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-            r.set_by_type(constants.RES_MEM, mem)
-            r.set_by_type(constants.RES_DISK, mem)
-            resmres = {pnode: r}
-            resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, vnodes, vmrr)
+        for (start, end, node_mappings) in times:
+            resmres = {}
+            all_vnodes = []
+            for (pnode,vnodes) in node_mappings.items():
+                num_vnodes = len(vnodes)
+                r = ds.ResourceTuple.create_empty()
+                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+                resmres[pnode] = r
+                all_vnodes += vnodes
+            resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
             resmrr.state = ResourceReservation.STATE_SCHEDULED
             resume_rrs.append(resmrr)
                 
@@ -966,13 +1022,14 @@
     
     def __estimate_suspend_resume_time(self, lease):
         susp_exclusion = get_config().get("suspendresume-exclusion")        
+        enactment_overhead = get_config().get("enactment-overhead") 
         rate = self.resourcepool.info.get_suspendresume_rate()
         mem = lease.requested_resources.get_by_type(constants.RES_MEM)
         if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
-            return lease.numnodes * self.__compute_suspend_resume_time(mem, rate)
+            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
         elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
             # Overestimating
-            return lease.numnodes * self.__compute_suspend_resume_time(mem, rate)
+            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
 
     def __estimate_shutdown_time(self, lease):
         # Always uses fixed value in configuration file
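
For reference, the estimate in __estimate_suspend_resume_time now charges the enactment overhead once per node. A minimal sketch of the arithmetic, assuming __compute_suspend_resume_time is roughly the per-vnode memory divided by the suspend/resume rate (that helper is not shown in this diff, and the numbers below are invented):

    # Illustrative values only; not taken from any Haizea configuration file.
    numnodes = 4             # vnodes in the lease
    mem = 1024               # MB of memory per vnode
    rate = 32                # MB/s suspend/resume rate
    enactment_overhead = 1   # seconds of enactment overhead per operation

    per_vnode = mem / rate + enactment_overhead   # 33 seconds per vnode
    estimate = numnodes * per_vnode               # 132 seconds for the lease
    print(estimate)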


