[haizea-commit] r584 - branches/TP2.0/src/haizea/resourcemanager/scheduler

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Wed Jun 17 10:29:16 CDT 2009


Author: borja
Date: 2009-06-17 10:29:09 -0500 (Wed, 17 Jun 2009)
New Revision: 584

Added:
   branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py
   branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py
Modified:
   branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
Log:
Added greedy mapper. Misc. changes to slot table.

Added: branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py	                        (rev 0)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/mapper.py	2009-06-17 15:29:09 UTC (rev 584)
@@ -0,0 +1,171 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import abstract
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple, AvailabilityWindow
+import haizea.common.constants as constants
+import operator
+
+class Mapper(object):
+    """Base class for mappers
+    
+    """
+    def __init__(self, slottable, policy):
+        self.slottable = slottable
+        self.policy = policy
+    
+    def map(self, requested_resources, start, end, strictend):
+        abstract()
+
+
+class GreedyMapper(Mapper):
+    def __init__(self, slottable, policy):
+        Mapper.__init__(self, slottable, policy)
+        
+    def map(self, lease, requested_resources, start, end, strictend, onlynodes = None):
+        aw = self.slottable.get_availability_window(start, onlynodes)
+        
+        pnodes = self.__sort_pnodes(lease, start, aw, onlynodes)
+        vnodes = self.__sort_vnodes(requested_resources)
+        vnodes.reverse()
+        leases = aw.get_leases_until(end)
+        mapping = {}
+        print "pnodes:", pnodes
+        print "vnodes:", vnodes
+        print "leases:", [l.id for l in leases]
+        leases = self.policy.sort_leases(lease, leases)
+        preemptable_leases = leases
+        preempting = []
+        
+        done = False
+        while not done:
+            vnodes_pos = 0
+            vnode = vnodes[vnodes_pos]
+            capacity = requested_resources[vnode]
+            maxend = end 
+            for pnode in pnodes:
+                need_to_map = ResourceTuple.create_empty()
+                need_to_map.incr(capacity)
+                avail=aw.get_availability_at_node(start, pnode, preempted_leases = preempting)
+                pnode_done = False
+                while not pnode_done:
+                    if avail.fits(need_to_map, until = maxend):
+                        mapping[vnode] = pnode
+                        vnodes_pos += 1
+                        if vnodes_pos >= len(vnodes):
+                            done = True
+                            break
+                        else:
+                            vnode = vnodes[vnodes_pos]
+                            capacity = requested_resources[vnode]
+                            need_to_map.incr(capacity)
+                    else:
+                        if strictend:
+                            pnode_done = True
+                        else:
+                            latest = avail.latest_fit(need_to_map)
+                            if latest == None:
+                                pnode_done = True
+                            else:
+                                maxend = latest
+                    
+                if done:
+                    break
+
+            if len(preemptable_leases) == 0:
+                done = True
+            else:
+                preempting.append(preemptable_leases.pop())
+            
+        if len(mapping) != len(requested_resources):
+            return None
+        else:
+            return mapping, maxend, preempting
+        
+    
+    
+    def __sort_pnodes(self, lease, time, aw,onlynodes):    
+        if onlynodes == None:    
+            nodes = aw.get_nodes_at(time)
+        else:
+            nodes = onlynodes
+
+        # Comparator used to order the nodes: compares node_x against node_y
+        # and returns whether x is BETTER, WORSE or EQUAL relative to y.
+        def comparenodes(node_x, node_y):
+            
+            leases_x = aw.get_leases_at(node_x, time)
+            leases_y = aw.get_leases_at(node_y, time)            
+            # 1st: We prefer nodes with fewer leases to preempt
+            if len(leases_x) < len(leases_y):
+                return constants.BETTER
+            elif len(leases_x) > len(leases_y):
+                return constants.WORSE
+            else:
+                # 2nd: we prefer nodes with the most available capacity
+                avail_x = aw.get_availability_at(node_x, time)
+                avail_y = aw.get_availability_at(node_y, time)
+                if avail_x > avail_y:
+                    return constants.BETTER
+                elif avail_x < avail_y:
+                    return constants.WORSE
+                else:
+                    # 3rd: we prefer nodes where the current capacity doesn't change
+                    # for the longest time (a duration of None = no next changepoint).
+                    duration_x = aw.get_capacity_interval(node_x, time)
+                    duration_y = aw.get_capacity_interval(node_y, time)
+                    if (duration_x == None and duration_y != None) or duration_x > duration_y:
+                        return constants.BETTER
+                    elif (duration_x != None and duration_y == None) or duration_x < duration_y:
+                        return constants.WORSE
+                    else:
+                        return constants.EQUAL
+            # TODO
+            #elif len(node_x.leases) == 0 and len(node_y.leases) == 0:
+                
+            #    preemptable_in_x = [l for l in node_x.leases if policy.can_preempt(lease, l)]
+            #    preemptable_in_y = [l for l in node_x.leases if policy.can_preempt(lease, l)]
+                
+            #    avail_x = node_x.get_avail_withpreemption(preemptable_in_x)
+            #    avail_y = node_x.get_avail_withpreemption(preemptable_in_y)
+
+        # Order nodes
+        nodes.sort(comparenodes)
+        return nodes        
+
+    def __sort_vnodes(self, requested_resources):
+        max_res = ResourceTuple.create_empty()
+        for res in requested_resources.values():
+            for t in ResourceTuple.get_resource_types():
+                v = res.get_by_type(t)
+                if v > max_res.get_by_type(t):
+                    max_res.set_by_type(t,v)
+                    
+        norm_res = {}
+        for k,v in requested_resources.items():
+            norm_capacity = 0
+            for t in ResourceTuple.get_resource_types():
+                if max_res.get_by_type(t) > 0:
+                    norm_capacity += v.get_by_type(t) / float(max_res.get_by_type(t))
+            norm_res[k] = norm_capacity
+             
+        vnodes = norm_res.items()
+        vnodes.sort(key=operator.itemgetter(1))
+        vnodes = [k for k,v in vnodes]
+        return vnodes      
+                    

Added: branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py	                        (rev 0)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/policy.py	2009-06-17 15:29:09 UTC (rev 584)
@@ -0,0 +1,42 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import abstract
+import operator
+
+class Policy(object):
+    """Base class for policy module
+    
+    """    
+    def __init__(self, slottable):
+        self.slottable = slottable
+    
+    def sort_leases(self, preemptor, preemptees):
+        leases_score = [(preemptee, self.get_lease_preemptability_score(preemptor,preemptee)) for preemptee in preemptees]
+        leases_score = [(preemptee,score) for preemptee,score in leases_score if score != -1]
+        leases_score.sort(key=operator.itemgetter(1))
+        return [preempte for preempte,score in leases_score]
+    
+    def get_lease_preemptability_score(self, preemptor, preemptee):
+        abstract()
+        
+    def accept_lease(self, lease):
+        abstract()
+
+#    def get_host_score(self, node):
+#        abstract()

Modified: branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py	2009-06-08 18:05:04 UTC (rev 583)
+++ branches/TP2.0/src/haizea/resourcemanager/scheduler/slottable.py	2009-06-17 15:29:09 UTC (rev 584)
@@ -59,6 +59,10 @@
         cls.tuplelength = len(resourcetypes)
 
     @classmethod
+    def get_resource_types(cls):
+        return cls.type2pos.keys()
+
+    @classmethod
     def create_empty(cls):
         return cls([0 for x in range(cls.tuplelength)])
         
@@ -105,6 +109,9 @@
     def __eq__(self, res2):
         return self._res == res2._res
 
+    def __cmp__(self, res2):
+        return cmp(self._res, res2._res)
+
 class ResourceReservation(object):
     """A resource reservation
     
@@ -220,8 +227,7 @@
         self.reservations = []
         self.reservations_by_start = []
         self.reservations_by_end = []
-        self.availabilitycache = {}
-        self.changepointcache = None
+        self.__dirty()
 
     def add_node(self, resourcepoolnode):
         self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
@@ -416,6 +422,24 @@
             return time1
         else:
             return min(time1, time2)
+        
+    def get_availability_window(self, start, onlynodes = None):
+        cache_miss = False
+        if self.awcache == None:
+            cache_miss = True
+        else:
+            if self.awcache_onlynodes != None:
+                if onlynodes == None:
+                    cache_miss = True
+                elif not set(onlynodes).issubset(set(self.awcache_onlynodes)):
+                    cache_miss = True
+            if start < self.awcache_time:
+                cache_miss = True
+           
+        if cache_miss:
+            self.__get_aw_cache_miss(start, onlynodes)
+         
+        return self.awcache
 
     # ONLY for simulation
     def get_next_premature_end(self, after):
@@ -448,35 +472,36 @@
         
         
     def __get_availability_cache_miss(self, time):
-        allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
-        onlynodes = None       
+        allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])   
         nodes = {} 
         reservations = self.get_reservations_at(time)
         # Find how much resources are available on each node
         for r in reservations:
             for node in r.resources_in_pnode:
-                if onlynodes == None or (onlynodes != None and node in onlynodes):
-                    if not nodes.has_key(node):
-                        n = self.nodes[node]
-                        nodes[node] = Node(n.capacity, n.resourcepoolnode)
-                    nodes[node].capacity.decr(r.resources_in_pnode[node])
+                if not nodes.has_key(node):
+                    n = self.nodes[node]
+                    nodes[node] = Node(n.capacity, n.resourcepoolnode)
+                nodes[node].capacity.decr(r.resources_in_pnode[node])
 
         # For the remaining nodes, use a reference to the original node, not a copy
-        if onlynodes == None:
-            missing = allnodes - set(nodes.keys())
-        else:
-            missing = onlynodes - set(nodes.keys())
-            
+        missing = allnodes - set(nodes.keys())
         for node in missing:
             nodes[node] = self.nodes[node]                    
             
         self.availabilitycache[time] = nodes
 
-
+    def __get_aw_cache_miss(self, time, onlynodes):
+        self.awcache = AvailabilityWindow(self, time, onlynodes)
+        self.awcache_time = time
+        self.awcache_onlynodes = onlynodes
+        
     def __dirty(self):
         # You're a dirty, dirty slot table and you should be
         # ashamed of having outdated caches!
         self.availabilitycache = {}
+        self.awcache_time = None
+        self.awcache_onlynodes = None
+        self.awcache = None
 
     
     
@@ -501,11 +526,11 @@
     def decr(self, capacity):
         self.available.decr(capacity)
 
-    def add_lease(self, lease_id, capacity):
-        if not lease_id in self.leases:
-            self.leases.add(lease_id)
-            self.available_if_preempting[lease_id] = ResourceTuple.create_empty()
-        self.available_if_preempting[lease_id].incr(capacity)
+    def add_lease(self, lease, capacity):
+        if not lease in self.leases:
+            self.leases.add(lease)
+            self.available_if_preempting[lease] = ResourceTuple.create_empty()
+        self.available_if_preempting[lease].incr(capacity)
         
     def get_avail_withpreemption(self, leases):
         avail = ResourceTuple.copy(available)
@@ -517,7 +542,26 @@
         self.available = available
         self.until = until
     
+class AvailabilityInNode(object):
+    def __init__(self, avail_list):
+        self.avail_list = avail_list
+        
+    def fits(self, capacity, until):
+        for avail in self.avail_list:
+            if avail.until == None or avail.until >= until:
+                return capacity.fits_in(avail.available)
 
+    def latest_fit(self, capacity):
+        prev = None
+        for avail in self.avail_list:
+            if not capacity.fits_in(avail.available):
+                return prev
+            else:
+                prev = avail.until
+
+    def get_avail_at_end(self):
+        return self.avail_list[-1]
+
 class AvailabilityWindow(object):
     """An availability window
     
@@ -537,7 +581,7 @@
         self.onlynodes = onlynodes
         self.leases = set()
         
-        self.cp_list = self.slottable.get_changepoints_after(time, nodes=onlynodes)
+        self.cp_list = [self.time] + self.slottable.get_changepoints_after(time, nodes=onlynodes)
         
         # Create initial changepoint hash table
         self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
@@ -546,13 +590,6 @@
             for node_id, node in enumerate(self.slottable.nodes):
                 cp.add_node(node_id + 1, node.capacity)
         
-        # Initial time
-        avail_at_start = self.slottable.get_availability(self.time, onlynodes=onlynodes)
-        self.cp_list = [self.time] + self.cp_list
-        self.changepoints[self.time] = ChangepointAvail()
-        for node_id, node in avail_at_start.items():
-            self.changepoints[self.time].add_node(node_id, node.capacity)
-        
         rrs = self.slottable.get_reservations_after(time)
         rrs.sort(key=attrgetter("start"))
         pos = 0
@@ -561,30 +598,29 @@
             while rr.start >= self.time and self.cp_list[pos] != rr.start:
                 pos += 1
                 
-            lease_id = rr.lease.id
+            lease = rr.lease
             
-            self.leases.add(lease_id)
+            self.leases.add(lease)
             
             if rr.start >= self.time:
                 start_cp = self.changepoints[rr.start]
-                start_cp.leases.add(lease_id)
-                for node in rr.resources_in_pnode:
-                    start_cp.nodes[node].decr(rr.resources_in_pnode[node])
-                    start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
             else:
                 start_cp = self.changepoints[self.time]
-                start_cp.leases.add(lease_id)
-                for node in rr.resources_in_pnode:
-                    start_cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])            
+            
+            start_cp.leases.add(lease)
+            for node in rr.resources_in_pnode:
+                start_cp.nodes[node].decr(rr.resources_in_pnode[node])
+                start_cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
+         
                 
             pos2 = pos + 1
 
             while self.cp_list[pos2] < rr.end:
                 cp = self.changepoints[self.cp_list[pos2]]
-                cp.leases.add(lease_id)
+                cp.leases.add(lease)
                 for node in rr.resources_in_pnode:
                     cp.nodes[node].decr(rr.resources_in_pnode[node])
-                    cp.nodes[node].add_lease(lease_id, rr.resources_in_pnode[node])
+                    cp.nodes[node].add_lease(lease, rr.resources_in_pnode[node])
                     
                 pos2 += 1
         
@@ -617,7 +653,7 @@
             for l in preempted_leases:
                 if node.available_if_preempting.has_key(l):
                     available.incr(node.available_if_preempting[l])
-            
+
             if prev_avail != None and available.fits_in(prev_avail.available):
                 availentry = AvailEntry(available, None)
                 avails.append(availentry)
@@ -631,80 +667,29 @@
             prev_node = node
             node = node.next_nodeavail
         
-        return avails
+        return AvailabilityInNode(avails)
     
     def get_nodes_at(self, time):
-        return self.changepoints[time].nodes.items()
-                  
+        return self.changepoints[time].nodes.keys()
+
+    def get_leases_at(self, node, time):
+        return self.changepoints[time].nodes[node].leases
     
-### Everything after this is from the old implementation
+    def get_availability_at(self, node, time):
+        return self.changepoints[time].nodes[node].available
     
-    def fit_at_start(self, nodes = None):
-        if nodes != None:
-            avail = [v for (k, v) in self.avail.items() if k in nodes]
+    def get_capacity_interval(self, node, time):
+        next_cp = self.changepoints[time].nodes[node].next_cp
+        if next_cp == None:
+            return None
         else:
-            avail = self.avail.values()
-
-        return sum([e[0].canfit for e in avail])
+            return next_cp - time
         
-    # TODO: Also return the amount of resources that would have to be
-    # preempted in each physnode
-    def find_pnodes_for_vms(self, numnodes, maxend, strictend=False):
-        # Returns the physical nodes that can run all VMs, and the
-        # time at which the VMs must end
-        canfit = dict([(n, v[0].canfit) for (n, v) in self.avail.items()])
-        entries = []
-        for n in self.avail.keys():
-            entries += [(n, e) for e in self.avail[n][1:]]
-        getTime = lambda x: x[1].time
-        entries.sort(key=getTime)
-        if strictend:
-            end = None
-        else:
-            end = maxend
-        for e in entries:
-            physnode = e[0]
-            entry = e[1]
-       
-            if entry.time >= maxend:
-                # Can run to its maximum duration
+    def get_leases_until(self, until):
+        leases = set()
+        for cp in self.cp_list:
+            if until <= cp:
                 break
-            else:
-                diff = canfit[physnode] - entry.canfit
-                totalcanfit = sum([n for n in canfit.values()]) - diff
-                if totalcanfit < numnodes and not strictend:
-                    # Not enough resources. Must end here
-                    end = entry.time
-                    break
-                else:
-                    # Update canfit
-                    canfit[physnode] = entry.canfit
+            leases.update(self.changepoints[cp].leases)
+        return list(leases)
 
-        # Filter out nodes where we can't fit any vms
-        canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
-        
-        return end, canfit
-            
-                    
-    def print_contents(self, nodes = None):
-        if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
-            if nodes == None:
-                physnodes = self.avail.keys()
-            else:
-                physnodes = [k for k in self.avail.keys() if k in nodes]
-            physnodes.sort()
-
-            self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
-            for n in physnodes:
-                contents = "Node %i --- " % n
-                for x in self.avail[n]:
-                    contents += "[ %s " % x.time
-                    contents += "{ "
-                    if x.avail == None :
-                        contents += "END "
-                    else:
-                        res = x.avail
-                        canfit = x.canfit
-                        contents += "%s" % res
-                    contents += "} (Fits: %i) ]  " % canfit
-                self.logger.vdebug(contents)



More information about the Haizea-commit mailing list