[haizea-commit] r542 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Tue Nov 11 18:59:16 CST 2008


Author: borja
Date: 2008-11-11 18:59:08 -0600 (Tue, 11 Nov 2008)
New Revision: 542

Modified:
   trunk/src/haizea/resourcemanager/datastruct.py
   trunk/src/haizea/resourcemanager/scheduler.py
Log:
Explicitly schedule cold migrations

Modified: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py	2008-11-07 22:38:08 UTC (rev 541)
+++ trunk/src/haizea/resourcemanager/datastruct.py	2008-11-12 00:59:08 UTC (rev 542)
@@ -337,7 +337,7 @@
 class VMResourceReservation(ResourceReservation):
     def __init__(self, lease, start, end, nodes, res, backfill_reservation):
         ResourceReservation.__init__(self, lease, start, end, res)
-        self.nodes = nodes
+        self.nodes = nodes # { vnode -> pnode }
         self.backfill_reservation = backfill_reservation
         self.pre_rrs = []
         self.post_rrs = []
@@ -471,10 +471,19 @@
         return rr
 
 class MigrationResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, res, vmrr):
+    def __init__(self, lease, start, end, res, vmrr, transfers):
         ResourceReservation.__init__(self, lease, start, end, res)
         self.vmrr = vmrr
+        self.transfers = transfers # { vnode -> (origin pnode, dest pnode) }
+        
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : MIGRATE")
+        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
+        ResourceReservation.print_contents(self, loglevel)        
 
+    def is_preemptible(self):
+        return False        
+
 #-------------------------------------------------------------------#
 #                                                                   #
 #                         LEASE CONTAINERS                          #

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-11-07 22:38:08 UTC (rev 541)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-11-12 00:59:08 UTC (rev 542)
@@ -40,7 +40,7 @@
 import haizea.common.constants as constants
 from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
 from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation 
+from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation, MigrationResourceReservation
 from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
 from operator import attrgetter, itemgetter
 from mx.DateTime import TimeDelta
@@ -131,6 +131,10 @@
                               on_start = Scheduler._handle_start_resume,
                               on_end   = Scheduler._handle_end_resume)
         
+        self.register_handler(type     = ds.MigrationResourceReservation, 
+                              on_start = Scheduler._handle_start_migrate,
+                              on_end   = Scheduler._handle_end_migrate)
+        
         for (type, handler) in self.deployment_scheduler.handlers.items():
             self.handlers[type] = handler
 
@@ -389,24 +393,19 @@
             if lease.state != Lease.STATE_SUSPENDED:
                 self.deployment_scheduler.schedule(lease, vmrr, nexttime)
             else:
-                # TODO: schedule migrations
-                pass
+                self.__schedule_migration(lease, vmrr, nexttime)
 
             # At this point, the lease is feasible.
             # Commit changes by adding RRs to lease and to slot table
             
-            # Add resource reservations to lease
-            # TODO: deployment
-            # TODO: migrations
+            # Add VMRR to lease
             lease.append_vmrr(vmrr)
             
 
             # Add resource reservations to slottable
             
-            # TODO: deployment
+            # TODO: deployment RRs should be added here, not in the preparation scheduler
             
-            # TODO: migrations
-            
             # Pre-VM RRs (if any)
             for rr in vmrr.pre_rrs:
                 self.slottable.addReservation(rr)
@@ -1012,9 +1011,61 @@
         for resmrr in resume_rrs:
             vmrr.pre_rrs.append(resmrr)        
            
-    
- 
 
+    # TODO: This has to be tied in with the preparation scheduler
+    def __schedule_migration(self, lease, vmrr, nexttime):
+        """Add to vmrr.pre_rrs the RRs that cold-migrate a suspended lease.
+
+        Only vnodes whose pnode differs from the lease's last VMRR are moved.
+        """
+        last_vmrr = lease.get_last_vmrr()
+        # { vnode -> (origin pnode, dest pnode) }, restricted to vnodes that move
+        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode]))
+                                 for vnode in vmrr.nodes if last_vmrr.nodes[vnode] != vmrr.nodes[vnode]])
+        if not vnode_migrations:
+            return
+
+        # Group transfers that can run simultaneously (each pnode used once per group)
+        migrations = []
+        while len(vnode_migrations) > 0:
+            pnodes = set()
+            migration = {}
+            for vnode in vnode_migrations:
+                origin = vnode_migrations[vnode][0]
+                dest = vnode_migrations[vnode][1]
+                if not origin in pnodes and not dest in pnodes:
+                    migration[vnode] = vnode_migrations[vnode]
+                    pnodes.add(origin)
+                    pnodes.add(dest)
+            for vnode in migration:
+                del vnode_migrations[vnode]
+            migrations.append(migration)
+        
+        # One migration RR per group, back to back, from the end of the last post-VM RR
+        start = last_vmrr.post_rrs[-1].end
+        migr_time = self.__estimate_migration_time(lease)
+        bandwidth = self.resourcepool.info.get_migration_bandwidth()
+        migr_rrs = []
+        for m in migrations:
+            end = start + migr_time
+            res = {}
+            for (origin,dest) in m.values():
+                resorigin = ds.ResourceTuple.create_empty()
+                resorigin.set_by_type(constants.RES_NETOUT, bandwidth)
+                resdest = ds.ResourceTuple.create_empty()
+                resdest.set_by_type(constants.RES_NETIN, bandwidth)
+                res[origin] = resorigin
+                res[dest] = resdest
+            migr_rr = MigrationResourceReservation(lease, start, end, res, vmrr, m)
+            migr_rr.state = ResourceReservation.STATE_SCHEDULED
+            migr_rrs.append(migr_rr)
+            start = end
+
+        # Reversed, then inserted one by one at index 0, so time order is kept
+        migr_rrs.reverse()
+        for migr_rr in migr_rrs:
+            vmrr.pre_rrs.insert(0, migr_rr)
+
     def __compute_suspend_resume_time(self, mem, rate):
         time = float(mem) / rate
         time = round_datetime_delta(TimeDelta(seconds = time))
@@ -1043,9 +1094,7 @@
 
 
     def __estimate_migration_time(self, lease):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        whattomigrate = config.get("what-to-migrate")
+        whattomigrate = get_config().get("what-to-migrate")
         bandwidth = self.resourcepool.info.get_migration_bandwidth()
         if whattomigrate == constants.MIGRATE_NONE:
             return TimeDelta(seconds=0)
@@ -1556,7 +1605,7 @@
     def _handle_start_migrate(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
         l.print_contents()
-
+        rr.state = ResourceReservation.STATE_ACTIVE
         l.print_contents()
         self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
         self.logger.info("Migrating lease %i..." % (l.id))
@@ -1565,22 +1614,21 @@
         self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
         l.print_contents()
 
-#        if lease.state == Lease.STATE_SUSPENDED:
-#            # Update VM image mappings, since we might be resuming
-#            # in different nodes.
-#            for vnode, pnode in lease.vmimagemap.items():
-#                self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
-#            lease.vmimagemap = vmrr.nodes
-#            for vnode, pnode in lease.vmimagemap.items():
-#                self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
-#            
-#            # Update RAM file mappings
-#            for vnode, pnode in lease.memimagemap.items():
-#                self.resourcepool.remove_ramfile(pnode, lease.id, vnode)
-#            for vnode, pnode in vmrr.nodes.items():
-#                self.resourcepool.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
-#                lease.memimagemap[vnode] = pnode
+        for vnode in rr.transfers:
+            origin = rr.transfers[vnode][0]
+            dest = rr.transfers[vnode][1]
+            
+            # Update VM image mappings
+            self.resourcepool.remove_diskimage(origin, l.id, vnode)
+            self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
+            l.diskimagemap[vnode] = dest
 
+            # Update RAM file mappings
+            self.resourcepool.remove_ramfile(origin, l.id, vnode)
+            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
+            l.memimagemap[vnode] = dest
+        
+        rr.state = ResourceReservation.STATE_DONE
         l.print_contents()
         self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
         self.logger.info("Migrated lease %i..." % (l.id))



More information about the Haizea-commit mailing list