[haizea-commit] r597 - in branches/TP2.0: src/haizea/core src/haizea/core/scheduler tests

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Fri Jul 3 06:19:46 CDT 2009


Author: borja
Date: 2009-07-03 06:19:38 -0500 (Fri, 03 Jul 2009)
New Revision: 597

Modified:
   branches/TP2.0/src/haizea/core/leases.py
   branches/TP2.0/src/haizea/core/scheduler/policy.py
   branches/TP2.0/src/haizea/core/scheduler/slottable.py
   branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
   branches/TP2.0/tests/test_slottable.py
Log:
Best-effort leases now work in TP2.0 (without preemption). This required some bug fixes in AvailabilityWindow.

Modified: branches/TP2.0/src/haizea/core/leases.py
===================================================================
--- branches/TP2.0/src/haizea/core/leases.py	2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/leases.py	2009-07-03 11:19:38 UTC (rev 597)
@@ -131,13 +131,18 @@
         self.state.change_state(state)
         
     def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "__________________________________________________")
         self.logger.log(loglevel, "Lease ID       : %i" % self.id)
+        self.logger.log(loglevel, "Type           : %s" % Lease.type_str[self.get_type()])
         self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
+        self.logger.log(loglevel, "Start          : %s" % self.start)
         self.logger.log(loglevel, "Duration       : %s" % self.duration)
         self.logger.log(loglevel, "State          : %s" % Lease.state_str[self.get_state()])
         self.logger.log(loglevel, "Resource req   : %s" % self.requested_resources)
         self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
         self.logger.log(loglevel, "Mem image map  : %s" % pretty_nodemap(self.memimagemap))
+        self.print_rrs(loglevel)
+        self.logger.log(loglevel, "--------------------------------------------------")
 
     def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
         if len(self.preparation_rrs) > 0:

Modified: branches/TP2.0/src/haizea/core/scheduler/policy.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/policy.py	2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/policy.py	2009-07-03 11:19:38 UTC (rev 597)
@@ -17,6 +17,7 @@
 # -------------------------------------------------------------------------- #
 
 from haizea.common.utils import abstract
+from haizea.core.leases import Lease
 import operator
 
 class Policy(object):

Modified: branches/TP2.0/src/haizea/core/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/slottable.py	2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/slottable.py	2009-07-03 11:19:38 UTC (rev 597)
@@ -259,15 +259,26 @@
         res = [x.value for x in self.reservations_by_start[startpos:endpos]]
         return res
 
-    # on or after
     def get_reservations_starting_after(self, start):
         startitem = KeyValueWrapper(start, None)
-        startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+        startpos = bisect.bisect_right(self.reservations_by_start, startitem)
         res = [x.value for x in self.reservations_by_start[startpos:]]
         return res
 
     def get_reservations_ending_after(self, end):
         startitem = KeyValueWrapper(end, None)
+        startpos = bisect.bisect_right(self.reservations_by_end, startitem)
+        res = [x.value for x in self.reservations_by_end[startpos:]]
+        return res
+
+    def get_reservations_starting_on_or_after(self, start):
+        startitem = KeyValueWrapper(start, None)
+        startpos = bisect.bisect_left(self.reservations_by_start, startitem)
+        res = [x.value for x in self.reservations_by_start[startpos:]]
+        return res
+
+    def get_reservations_ending_on_or_after(self, end):
+        startitem = KeyValueWrapper(end, None)
         startpos = bisect.bisect_left(self.reservations_by_end, startitem)
         res = [x.value for x in self.reservations_by_end[startpos:]]
         return res
@@ -290,6 +301,11 @@
         bystart = set(self.get_reservations_starting_after(time))
         byend = set(self.get_reservations_ending_after(time))
         return list(bystart | byend)
+    
+    def get_reservations_on_or_after(self, time):
+        bystart = set(self.get_reservations_starting_on_or_after(time))
+        byend = set(self.get_reservations_ending_on_or_after(time))
+        return list(bystart | byend)    
 
     def get_changepoints_after(self, after, until=None, nodes=None):
         changepoints = set()
@@ -604,10 +620,10 @@
         self.leases = set()
         
         self.cp_list = [self.time] + self.slottable.get_changepoints_after(time, nodes=onlynodes)
-        
+
         # Create initial changepoint hash table
         self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
-        
+ 
         for cp in self.changepoints.values():
             for node_id, node in enumerate(self.slottable.nodes):
                 cp.add_node(node_id + 1, node.capacity)
@@ -621,7 +637,7 @@
                 pos += 1
                 
             lease = rr.lease
-            
+
             self.leases.add(lease)
             
             if rr.start >= self.time:

Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-03 11:19:38 UTC (rev 597)
@@ -81,11 +81,11 @@
 
     def schedule(self, lease, nexttime, earliest):
         if lease.get_type() == Lease.BEST_EFFORT:
-            return self.__schedule_asap(lease, nexttime, earliest)
+            return self.__schedule_asap(lease, nexttime, earliest, allow_reservation_in_future = True)
         elif lease.get_type() == Lease.ADVANCE_RESERVATION:
             return self.__schedule_exact(lease, nexttime, earliest)
         elif lease.get_type() == Lease.IMMEDIATE:
-            return self.__schedule_asap(lease, nexttime, earliest)
+            return self.__schedule_asap(lease, nexttime, earliest, allow_reservation_in_future = False)
 
     def __schedule_exact(self, lease, nexttime, earliest):
         start = lease.start.requested
@@ -117,19 +117,11 @@
 
         return vmrr, preemptions
 
-    def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = None):
+    def __schedule_asap(self, lease, nexttime, earliest, allow_reservation_in_future = None):
         lease_id = lease.id
         remaining_duration = lease.duration.get_remaining_duration()
-        numnodes = lease.numnodes
-        requested_resources = lease.requested_resources
-        preemptible = lease.preemptible
         mustresume = (lease.get_state() == Lease.STATE_SUSPENDED_QUEUED)
         shutdown_time = self.__estimate_shutdown_time(lease)
-        susptype = get_config().get("suspension")
-        if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
-            suspendable = False
-        else:
-            suspendable = True
 
         if allow_reservation_in_future == None:
             allow_reservation_in_future = self.can_reserve_besteffort_in_future()
@@ -197,6 +189,7 @@
 
 
 
+
         #
         # STEP 3: SLOT FITTING
         #
@@ -209,85 +202,48 @@
 
         duration += shutdown_time
 
+        reservation = False
+
         # First, assuming we can't make reservations in the future
-        start, end, canfit = self.__find_fit_at_points(
-                                                       changepoints, 
-                                                       numnodes, 
-                                                       requested_resources, 
-                                                       duration, 
-                                                       suspendable, 
-                                                       min_duration)
+        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
+                                                                     changepoints, 
+                                                                     duration, 
+                                                                     min_duration)
         
-        if start == None:
-            if not allow_reservation_in_future:
+        if start == None and not allow_reservation_in_future:
                 # We did not find a suitable starting time. This can happen
                 # if we're unable to make future reservations
                 raise NotSchedulableException, "Could not find enough resources for this request"
-        else:
-            mustsuspend = (end - start) < duration
-            if mustsuspend and not suspendable:
-                if not allow_reservation_in_future:
-                    raise NotSchedulableException, "Scheduling this lease would require preempting it, which is not allowed"
-                else:
-                    start = None # No satisfactory start time
-            
+
         # If we haven't been able to fit the lease, check if we can
         # reserve it in the future
         if start == None and allow_reservation_in_future:
-            start, end, canfit = self.__find_fit_at_points(
-                                                           futurecp, 
-                                                           numnodes, 
-                                                           requested_resources, 
-                                                           duration, 
-                                                           suspendable, 
-                                                           min_duration
-                                                           )
+            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
+                                                                         futurecp, 
+                                                                         duration, 
+                                                                         min_duration
+                                                                         )
+            # TODO: The following will also raise an exception if a lease
+            # makes a request that could *never* be satisfied with the
+            # current resources.
+            if start == None:
+                raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
 
-
-        if start in [p[0] for p in futurecp]:
             reservation = True
-        else:
-            reservation = False
 
-
         #
-        # STEP 4: FINAL SLOT FITTING
+        # STEP 4: CREATE RESERVATIONS
         #
-        # At this point, we know the lease fits, but we have to map it to
-        # specific physical nodes.
-        
-        # Sort physical nodes
-        physnodes = canfit.keys()
-        if mustresume:
-            # If we're resuming, we prefer resuming in the nodes we're already
-            # deployed in, to minimize the number of transfers.
-            vmrr = lease.get_last_vmrr()
-            nodes = set(vmrr.nodes.values())
-            availnodes = set(physnodes)
-            deplnodes = availnodes.intersection(nodes)
-            notdeplnodes = availnodes.difference(nodes)
-            physnodes = list(deplnodes) + list(notdeplnodes)
-        else:
-            physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-        
-        # Map to physical nodes
-        mappings = {}
         res = {}
-        vmnode = 1
-        while vmnode <= numnodes:
-            for n in physnodes:
-                if canfit[n]>0:
-                    canfit[n] -= 1
-                    mappings[vmnode] = n
-                    if res.has_key(n):
-                        res[n].incr(requested_resources)
-                    else:
-                        res[n] = ResourceTuple.copy(requested_resources)
-                    vmnode += 1
-                    break
+        
+        for (vnode,pnode) in mapping.items():
+            vnode_res = lease.requested_resources[vnode]
+            if res.has_key(pnode):
+                res[pnode].incr(vnode_res)
+            else:
+                res[pnode] = ResourceTuple.copy(vnode_res)
 
-
-        vmrr = VMResourceReservation(lease, start, end, mappings, res, reservation)
+        vmrr = VMResourceReservation(lease, start, end, mapping, res, reservation)
         vmrr.state = ResourceReservation.STATE_SCHEDULED
 
         if mustresume:
@@ -305,15 +261,14 @@
         if reservation:
             self.numbesteffortres += 1
 
-        
         susp_str = res_str = ""
         if mustresume:
             res_str = " (resuming)"
         if mustsuspend:
             susp_str = " (suspending)"
-        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
 
-        return vmrr, []
+        return vmrr, preemptions
 
     def estimate_migration_time(self, lease):
         return self.__estimate_migration_time(lease)
@@ -492,45 +447,37 @@
         return self.maxres > 0
 
 
-    def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
-        start = None
-        end = None
-        canfit = None
-        availabilitywindow = self.slottable.availabilitywindow
-
-
-        for p in changepoints:
-            availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
-            availabilitywindow.printContents()
+    def __find_fit_at_points(self, lease, changepoints, duration, min_duration):
+        found = False
+        for time, onlynodes in changepoints:
+            start = time
+            end = start + duration
+            self.logger.debug("Attempting to map from %s to %s" % (start, end))
+            mapping, actualend, preemptions = self.mapper.map(lease, 
+                                                              lease.requested_resources,
+                                                              start, 
+                                                              end, 
+                                                              strictend = False,
+                                                              onlynodes = onlynodes)
             
-            if availabilitywindow.fitAtStart() >= numnodes:
-                start=p[0]
-                maxend = start + duration
-                end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-        
-                self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-                
-                if end < maxend:
-                    self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-                    
-                    if not suspendable:
-                        pass
-                        # If we can't suspend, this fit is no good, and we have to keep looking
+            if mapping != None:
+                if actualend < end:
+                    actualduration = actualend - start
+                    if actualduration >= min_duration:
+                        self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
+                        found = True
+                        break
                     else:
-                        # If we can suspend, we still have to check if the lease will
-                        # be able to run for the specified minimum duration
-                        if end-start > min_duration:
-                            break # We found a fit; stop looking
-                        else:
-                            self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
-                            # Set start back to None, to indicate that we haven't
-                            # found a satisfactory start time
-                            start = None
+                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
                 else:
-                    # We've found a satisfactory starting time
-                    break        
-                
-        return start, end, canfit
+                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
+                    found = True
+                    break
+        
+        if found:
+            return start, actualend, mapping, preemptions
+        else:
+            return None, None, None, None
     
     def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
         times = [] # (start, end, {pnode -> vnodes})

Modified: branches/TP2.0/tests/test_slottable.py
===================================================================
--- branches/TP2.0/tests/test_slottable.py	2009-07-03 08:46:59 UTC (rev 596)
+++ branches/TP2.0/tests/test_slottable.py	2009-07-03 11:19:38 UTC (rev 597)
@@ -526,3 +526,35 @@
         assert(len(avail.avail_list)==1)
         assert(avail.avail_list[0].available == EMPT_NODE)
         assert(avail.avail_list[0].until     == None)        
+
+        self.slottable.awcache = None
+        aw = self.slottable.get_availability_window(T1415)
+        # 14:15
+        avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 1, 
+                              leases = {}, next_cp = None)
+        avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 2, 
+                              leases = {}, next_cp = None)
+        avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 3, 
+                              leases = {}, next_cp = None)
+        avail_node_assertions(time = T1415, avail = FULL_NODE, node_id = 4, 
+                              leases = {}, next_cp = None)
+
+        avail = aw.get_availability_at_node(T1415, 1)
+        assert(len(avail.avail_list)==1)
+        assert(avail.avail_list[0].available == FULL_NODE)
+        assert(avail.avail_list[0].until     == None)
+        
+        avail = aw.get_availability_at_node(T1415, 2)
+        assert(len(avail.avail_list)==1)
+        assert(avail.avail_list[0].available == FULL_NODE)
+        assert(avail.avail_list[0].until     == None)
+        
+        avail = aw.get_availability_at_node(T1415, 3)
+        assert(len(avail.avail_list)==1)
+        assert(avail.avail_list[0].available == FULL_NODE)
+        assert(avail.avail_list[0].until     == None)
+        
+        avail = aw.get_availability_at_node(T1415, 4)
+        assert(len(avail.avail_list)==1)
+        assert(avail.avail_list[0].available == FULL_NODE)
+        assert(avail.avail_list[0].until     == None)                        
\ No newline at end of file



More information about the Haizea-commit mailing list