[haizea-commit] r608 - in branches/TP2.0/src/haizea: common core core/scheduler core/scheduler/preparation_schedulers

haizea-commit at mailman.cs.uchicago.edu
Fri Jul 17 10:02:16 CDT 2009


Author: borja
Date: 2009-07-17 10:02:10 -0500 (Fri, 17 Jul 2009)
New Revision: 608

Modified:
   branches/TP2.0/src/haizea/common/constants.py
   branches/TP2.0/src/haizea/core/configfile.py
   branches/TP2.0/src/haizea/core/manager.py
   branches/TP2.0/src/haizea/core/scheduler/__init__.py
   branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
   branches/TP2.0/src/haizea/core/scheduler/mapper.py
   branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
   branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
   branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
Log:
Migration is working again. It is now separated into two steps: migrating the software environment (usually a disk image) and migrating the memory state files.
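
To illustrate the new two-step flow: the preparation scheduler now moves the software environment first, and the VM scheduler schedules the memory-state transfer immediately afterwards. A minimal sketch of how the two sets of migration reservations are chained; the wrapper function is made up, but its body mirrors the new code in lease_scheduler.py below:

# Sketch only: a made-up helper showing how the two migration steps are chained.
# "preparation_scheduler" and "vm_scheduler" stand for the corresponding Haizea
# scheduler objects; the body mirrors the new lease_scheduler.py code in this commit.
def schedule_migration_steps(preparation_scheduler, vm_scheduler, lease, vmrr, nexttime):
    # Step 1: migrate the software environment (usually a disk image)
    migr_rrs = preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
    end_migr = migr_rrs[-1].end if migr_rrs else nexttime
    # Step 2: migrate the memory state files, starting when step 1 ends
    migr_rrs += vm_scheduler.schedule_migration(lease, vmrr, end_migr)
    # Prepend all migration RRs to the VM reservation, preserving their order
    migr_rrs.reverse()
    for migr_rr in migr_rrs:
        vmrr.pre_rrs.insert(0, migr_rr)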

Modified: branches/TP2.0/src/haizea/common/constants.py
===================================================================
--- branches/TP2.0/src/haizea/common/constants.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/common/constants.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -42,9 +42,9 @@
 SUSPRES_EXCLUSION_LOCAL="local"
 SUSPRES_EXCLUSION_GLOBAL="global"
 
-MIGRATE_NONE="nothing"
-MIGRATE_MEM="mem"
-MIGRATE_MEMDISK="mem+disk"
+MIGRATE_NO="no"
+MIGRATE_YES="yes"
+MIGRATE_YES_NOTRANSFER="yes-notransfer"
 
 TRANSFER_UNICAST="unicast"
 TRANSFER_MULTICAST="multicast"

Modified: branches/TP2.0/src/haizea/core/configfile.py
===================================================================
--- branches/TP2.0/src/haizea/core/configfile.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/configfile.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -341,31 +341,20 @@
 
      Option(name        = "migration",
             getter      = "migration",
-            type        = OPTTYPE_BOOLEAN,
-            required    = True,
-            doc         = """
-            Specifies whether leases can be migrated from one
-            physical node to another. Valid values are "True" or "False"                
-            """),
-
-     Option(name        = "what-to-migrate",
-            getter      = "what-to-migrate",
             type        = OPTTYPE_STRING,
             required    = False,
-            required_if = [(("scheduling","migration"),True)],
-            default     = constants.MIGRATE_NONE,
-            valid       = [constants.MIGRATE_NONE,
-                           constants.MIGRATE_MEM,
-                           constants.MIGRATE_MEMDISK],
+            default     = constants.MIGRATE_NO,          
+            valid       = [constants.MIGRATE_NO,
+                           constants.MIGRATE_YES,
+                           constants.MIGRATE_YES_NOTRANSFER],              
             doc         = """
-            Specifies what data has to be moved around when
-            migrating a lease. Valid values are:
-            
-             - nothing: migration can be performed without transferring any
-               files.
-             - mem: only the memory must be transferred
-             - mem+disk: both the memory and the VM disk image must be
-               transferred                
+            Specifies whether leases can be migrated from one
+            physical node to another. Valid values are: 
+                           
+             - no
+             - yes
+             - yes-notransfer: migration can be performed without
+               transferring any files. 
             """),
 
      Option(name        = "non-schedulable-interval",

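For reference, a minimal (assumed) excerpt of how the reworked option would look in a configuration file; the section name comes from the required_if clause removed above, and the exact key/value syntax is whatever Haizea's config parser accepts:

[scheduling]
# one of: no, yes, yes-notransfer
migration: yes
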
Modified: branches/TP2.0/src/haizea/core/manager.py
===================================================================
--- branches/TP2.0/src/haizea/core/manager.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/manager.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -405,16 +405,27 @@
             self.__unexpected_exception(exc)
 
 
-    def process_reservations(self, time):
+    def process_starting_reservations(self, time):
         """Process reservations starting/stopping at specified time"""
         
         # The scheduler takes care of this.
         try:
-            self.scheduler.process_reservations(time)
+            self.scheduler.process_starting_reservations(time)
         except UnrecoverableError, exc:
             self.__unrecoverable_error(exc)
         except Exception, exc:
             self.__unexpected_exception(exc)
+
+    def process_ending_reservations(self, time):
+        """Process reservations starting/stopping at specified time"""
+        
+        # The scheduler takes care of this.
+        try:
+            self.scheduler.process_ending_reservations(time)
+        except UnrecoverableError, exc:
+            self.__unrecoverable_error(exc)
+        except Exception, exc:
+            self.__unexpected_exception(exc)
          
     def notify_event(self, lease_id, event):
         """Notifies an asynchronous event to Haizea.
@@ -630,12 +641,15 @@
                 
             # Process reservations starting/stopping at the current time and
             # check if there are any new requests.
-            self.manager.process_reservations(self.time)
+            self.manager.process_ending_reservations(self.time)
+            self.manager.process_starting_reservations(self.time)
             self.manager.process_requests(self.time)
             
             # Since processing requests may have resulted in new reservations
             # starting now, we process reservations again.
-            self.manager.process_reservations(self.time)
+            self.manager.process_starting_reservations(self.time)
+            # And one final call to deal with nil-duration reservations
+            self.manager.process_ending_reservations(self.time)
             
             # Print a status message
             if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
@@ -808,7 +822,8 @@
             self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
             
             # Wake up the resource manager
-            self.manager.process_reservations(self.lastwakeup)
+            self.manager.process_ending_reservations(self.lastwakeup)
+            self.manager.process_starting_reservations(self.lastwakeup)
             # TODO: Compute nextschedulable here, before processing requests
             self.manager.process_requests(self.nextschedulable)
             

Modified: branches/TP2.0/src/haizea/core/scheduler/__init__.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/__init__.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/__init__.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -16,6 +16,8 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
+from haizea.core.scheduler.slottable import ResourceReservation
+import haizea.common.constants as constants
 import sys
 
 class SchedException(Exception):
@@ -89,4 +91,10 @@
     
     def __init__(self, time, type):
         self.time = time
-        self.type = type  
\ No newline at end of file
+        self.type = type  
+        
+class MigrationResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vmrr, transfers):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.transfers = transfers         
\ No newline at end of file

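The new MigrationResourceReservation base class factors out the fields shared by the disk-image and memory-state migration reservations defined further down. A rough sketch of what an instance carries (the concrete values are invented):

# transfers maps each virtual node to an (origin pnode, destination pnode) pair;
# lease, start, end, res and vmrr are the usual ResourceReservation constructor
# arguments, assumed to already exist in this sketch.
transfers = {1: (1, 3), 2: (2, 4)}   # vnodes 1 and 2 move from pnodes 1,2 to 3,4
migr_rr = MigrationResourceReservation(lease, start, end, res, vmrr, transfers)
migr_rr.vmrr        # the VM reservation this migration leads into
migr_rr.transfers   # the vnode -> (origin, dest) mapping above
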
Modified: branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -31,7 +31,7 @@
 import haizea.common.constants as constants
 from haizea.common.utils import round_datetime, get_config, get_accounting, get_clock
 from haizea.core.leases import Lease
-from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException
+from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
 from haizea.core.scheduler.slottable import ResourceReservation
 from haizea.core.scheduler.vm_scheduler import VMResourceReservation
 from operator import attrgetter
@@ -169,7 +169,7 @@
         self.__process_queue(nexttime)
         
     
-    def process_reservations(self, nowtime):
+    def process_starting_reservations(self, nowtime):
         """Processes starting/ending reservations
         
         This method checks the slottable to see if there are any reservations that are
@@ -182,6 +182,53 @@
         # Find starting/ending reservations
         starting = self.slottable.get_reservations_starting_at(nowtime)
         starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
+        
+        # Process starting reservations
+        for rr in starting:
+            lease = rr.lease
+            # Call the appropriate handler, and catch exceptions and errors.
+            try:
+                self.handlers[type(rr)].on_start(lease, rr)
+                
+            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
+            # state. This is usually indicative of a programming error, but not necessarily
+            # one that affects all leases, so we just fail this lease. Note that Haizea can also
+            # be configured to stop immediately when a lease fails.
+            except InconsistentLeaseStateError, exc:
+                self.fail_lease(lease, exc)
+            # An EnactmentError is raised when the handler had to perform an enactment action
+            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
+            # as a non-recoverable error for the lease, and the lease is failed.
+            except EnactmentError, exc:
+                self.fail_lease(lease, exc)
+
+            # Other exceptions are not expected, and generally indicate a programming error.
+            # Thus, they are propagated upwards to the Manager where they will make
+            # Haizea crash and burn.
+            
+
+        # TODO: Move up to manager
+        # Each time we process reservations, we report resource utilization to the accounting
+        # module. This utilization information shows what portion of the physical resources
+        # is used by each type of reservation (e.g., 70% are running a VM, 5% are doing suspensions,
+        # etc.) See the get_utilization module for details on how this data is stored.
+        # Currently we only collect utilization from the VM Scheduler (in the future,
+        # information will also be gathered from the preparation scheduler).
+        #util = self.vm_scheduler.get_utilization(nowtime)
+        #get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)        
+
+
+    def process_ending_reservations(self, nowtime):
+        """Processes starting/ending reservations
+        
+        This method checks the slottable to see if there are any reservations that are
+        starting or ending at "nowtime". If so, the appropriate handler is called.
+
+        Arguments:
+        nowtime -- Time at which to check for starting/ending reservations.
+        """
+
+        # Find starting/ending reservations
         ending = self.slottable.get_reservations_ending_at(nowtime)
         ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
 
@@ -228,42 +275,7 @@
             # Thus, they are propagated upwards to the Manager where they will make
             # Haizea crash and burn.
                 
-        
-        # Process starting reservations
-        for rr in starting:
-            lease = rr.lease
-            # Call the appropriate handler, and catch exceptions and errors.
-            try:
-                self.handlers[type(rr)].on_start(lease, rr)
-                
-                
-            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
-            # state. This is usually indicative of a programming error, but not necessarily
-            # one that affects all leases, so we just fail this lease. Note that Haizea can also
-            # be configured to stop immediately when a lease fails.
-            except InconsistentLeaseStateError, exc:
-                self.fail_lease(lease, exc)
-            # An EnactmentError is raised when the handler had to perform an enactment action
-            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
-            # as a non-recoverable error for the lease, and the lease is failed.
-            except EnactmentError, exc:
-                self.fail_lease(lease, exc)
 
-            # Other exceptions are not expected, and generally indicate a programming error.
-            # Thus, they are propagated upwards to the Manager where they will make
-            # Haizea crash and burn.
-            
-
-        # Each time we process reservations, we report resource utilization to the accounting
-        # module. This utilization information shows what portion of the physical resources
-        # is used by each type of reservation (e.g., 70% are running a VM, 5% are doing suspensions,
-        # etc.) See the get_utilization module for details on how this data is stored.
-        # Currently we only collect utilization from the VM Scheduler (in the future,
-        # information will also be gathered from the preparation scheduler).
-        util = self.vm_scheduler.get_utilization(nowtime)
-        get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)        
-
-
     def get_lease_by_id(self, lease_id):
         """Gets a lease with the given ID
         
@@ -382,7 +394,11 @@
             self.vm_scheduler.cancel_vm(vmrr)
             l.remove_vmrr(vmrr)
             # TODO: Clean up (transfers, etc.)
-            l.state = Lease.STATE_PENDING
+            if l.state in (Lease.STATE_READY, Lease.STATE_SCHEDULED):
+                l.state = Lease.STATE_PENDING
+            elif l.state == Lease.STATE_SUSPENDED_SCHEDULED:
+                l.state = Lease.STATE_SUSPENDED_PENDING
+                
             self.__schedule_lease(l, nexttime)
 
     def is_queue_empty(self):
@@ -446,19 +462,26 @@
         """       
                 
         lease_state = lease.get_state()
-
+        migration = get_config().get("migration")
+        
         # Determine earliest start time in each node
         if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
             # Figure out earliest start times based on
             # image schedule and reusable images
             earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
-        elif lease_state == Lease.STATE_SUSPENDED_QUEUED or lease_state == Lease.STATE_SUSPENDED_SCHEDULED:
-            # No need to transfer images from repository
-            # (only intra-node transfer)
-            earliest = self.preparation_scheduler.find_earliest_migration_times(lease, nexttime)            
-            migr_time = self.preparation_scheduler.estimate_migration_time(lease)
-            for pnode in earliest:
-                earliest[pnode].time += migr_time
+        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
+            # Migration
+
+            node_ids = self.slottable.nodes.keys()
+            earliest = {}
+            if migration == constants.MIGRATE_NO:
+                for node in node_ids:
+                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
+            else:
+                prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)            
+                vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
+                for node in node_ids:
+                    earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
         else:
             raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
 
@@ -472,10 +495,17 @@
         # Schedule deployment
         is_ready = False
         preparation_rrs = []
-        if lease_state == Lease.STATE_SUSPENDED_QUEUED:
-            #self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
-            self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
-        else:
+        if lease_state == Lease.STATE_SUSPENDED_QUEUED and migration != constants.MIGRATE_NO:
+            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
+            if len(migr_rrs) > 0:
+                end_migr = migr_rrs[-1].end
+            else:
+                end_migr = nexttime
+            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
+            migr_rrs.reverse()
+            for migr_rr in migr_rrs:
+                vmrr.pre_rrs.insert(0, migr_rr)
+        elif lease_state != Lease.STATE_SUSPENDED_QUEUED:
             preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
 
         # At this point, the lease is feasible.

Modified: branches/TP2.0/src/haizea/core/scheduler/mapper.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/mapper.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/mapper.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -39,11 +39,13 @@
     def __init__(self, slottable, policy):
         Mapper.__init__(self, slottable, policy)
         
-    def map(self, lease, requested_resources, start, end, strictend):
+    def map(self, lease, requested_resources, start, end, strictend, onlynodes=None):
         aw = self.slottable.get_availability_window(start)
          
         nodes = aw.get_nodes_at(start)     
-        
+        if onlynodes != None:
+            nodes = list(set(nodes) & onlynodes)
+
         pnodes = self.policy.sort_hosts(nodes, start, lease)
         vnodes = self.__sort_vnodes(requested_resources)
         vnodes.reverse()

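A usage note on the new onlynodes parameter: it restricts the mapping to a given set of physical nodes, which the VM scheduler (further down) uses to pin a suspended lease to its current hosts when migration is disabled. A hedged sketch of that caller's side; names follow the vm_scheduler.py hunk below, and the return values of map() are omitted here:

# Assumed caller-side sketch (not part of this changeset).
if mustresume and get_config().get("migration") == constants.MIGRATE_NO:
    onlynodes = set(lease.get_last_vmrr().nodes.values())   # only the current hosts
else:
    onlynodes = None                                         # no restriction
mapper.map(lease, requested_resources, start, end,
           strictend = False, onlynodes = onlynodes)
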
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -19,10 +19,12 @@
 import haizea.common.constants as constants
 from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
 from haizea.core.scheduler.slottable import ResourceReservation
-from haizea.core.leases import Lease, Capacity
+from haizea.core.scheduler import MigrationResourceReservation
+from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
 from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
 from haizea.common.utils import estimate_transfer_time, get_config
 from haizea.core.scheduler.slottable import ResourceTuple
+from mx.DateTime import TimeDelta
 
 import copy
 import bisect
@@ -58,11 +60,91 @@
                                 on_end   = ImageTransferPreparationScheduler._handle_end_migrate)
 
     def schedule(self, lease, vmrr, earliest):
+        if type(lease.software) == UnmanagedSoftwareEnvironment:
+            return [], True
         if lease.get_type() == Lease.ADVANCE_RESERVATION:
             return self.__schedule_deadline(lease, vmrr, earliest)
         elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
             return self.__schedule_asap(lease, vmrr, earliest)
 
+    def schedule_migration(self, lease, vmrr, nexttime):
+        if type(lease.software) == UnmanagedSoftwareEnvironment:
+            return []
+        
+        # This code is the same as the one in vm_scheduler
+        # Should be factored out
+        last_vmrr = lease.get_last_vmrr()
+        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
+        
+        mustmigrate = False
+        for vnode in vnode_migrations:
+            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
+                mustmigrate = True
+                break
+            
+        if not mustmigrate:
+            return []
+
+        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
+            start = nexttime
+            end = nexttime
+            res = {}
+            migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
+            migr_rr.state = ResourceReservation.STATE_SCHEDULED
+            return [migr_rr]
+
+        # Figure out what migrations can be done simultaneously
+        migrations = []
+        while len(vnode_migrations) > 0:
+            pnodes = set()
+            migration = {}
+            for vnode in vnode_migrations:
+                origin = vnode_migrations[vnode][0]
+                dest = vnode_migrations[vnode][1]
+                if not origin in pnodes and not dest in pnodes:
+                    migration[vnode] = vnode_migrations[vnode]
+                    pnodes.add(origin)
+                    pnodes.add(dest)
+            for vnode in migration:
+                del vnode_migrations[vnode]
+            migrations.append(migration)
+        
+        # Create migration RRs
+        start = max(last_vmrr.post_rrs[-1].end, nexttime)
+        bandwidth = self.resourcepool.info.get_migration_bandwidth()
+        migr_rrs = []
+        for m in migrations:
+            mb_to_migrate = lease.software.image_size * len(m.keys())
+            migr_time = estimate_transfer_time(mb_to_migrate, bandwidth)
+            end = start + migr_time
+            res = {}
+            for (origin,dest) in m.values():
+                resorigin = Capacity([constants.RES_NETOUT])
+                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
+                resdest = Capacity([constants.RES_NETIN])
+                resdest.set_quantity(constants.RES_NETIN, bandwidth)
+                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
+                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
+            migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+            migr_rr.state = ResourceReservation.STATE_SCHEDULED
+            migr_rrs.append(migr_rr)
+            start = end
+        
+        return migr_rrs
+
+    def estimate_migration_time(self, lease):
+        migration = get_config().get("migration")
+        if migration == constants.MIGRATE_YES:
+            vmrr = lease.get_last_vmrr()
+            images_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
+            for (vnode,pnode) in vmrr.nodes.items():
+                images_in_pnode[pnode] += lease.software.image_size
+            max_to_transfer = max(images_in_pnode.values())
+            bandwidth = self.resourcepool.info.get_migration_bandwidth()
+            return estimate_transfer_time(max_to_transfer, bandwidth)
+        elif migration == constants.MIGRATE_YES_NOTRANSFER:
+            return TimeDelta(seconds=0)
+
     def find_earliest_starting_times(self, lease, nexttime):
         node_ids = [node.id for node in self.resourcepool.get_nodes()]  
         config = get_config()
@@ -70,6 +152,12 @@
         reusealg = config.get("diskimage-reuse")
         avoidredundant = config.get("avoid-redundant-transfers")
         
+        if type(lease.software) == UnmanagedSoftwareEnvironment:
+            earliest = {}
+            for node in node_ids:
+                earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
+            return earliest
+        
         # Figure out earliest times assuming we have to transfer the images
         transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
         if mechanism == constants.TRANSFER_UNICAST:
@@ -386,14 +474,30 @@
         sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
         sched.logger.info("Completed image transfer for lease %i" % (lease.id))
 
-    @staticmethod
-    def _handle_start_migrate(sched, lease, rr):
-        pass
+    def _handle_start_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_ACTIVE
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+        self.logger.info("Migrating lease %i..." % (l.id))
 
-    @staticmethod
-    def _handle_end_migrate(sched, lease, rr):
-        pass
+    def _handle_end_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+        l.print_contents()
+
+        for vnode in rr.transfers:
+            origin = rr.transfers[vnode][0]
+            dest = rr.transfers[vnode][1]
+            
+            self.resourcepool.remove_diskimage(origin, l.id, vnode)
+            self.resourcepool.add_diskimage(dest, l.software.image_id, l.software.image_size, l.id, vnode)
         
+        rr.state = ResourceReservation.STATE_DONE
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+        self.logger.info("Migrated lease %i..." % (l.id))
+        
     def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
         self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
 
@@ -442,11 +546,7 @@
                 self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
                     
         pnode.print_files()
-        
 
-class DiskImageMigrationResourceReservation(ResourceReservation):
-    pass
-
 class FileTransferResourceReservation(ResourceReservation):
     def __init__(self, lease, res, start=None, end=None):
         ResourceReservation.__init__(self, lease, start, end, res)
@@ -484,4 +584,11 @@
         self.transfer_start = None
         self.piggybacking_on = None
 
+class DiskImageMigrationResourceReservation(MigrationResourceReservation):
+    def __init__(self, lease, start, end, res, vmrr, transfers):
+        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
 
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : DISK IMAGE MIGRATION")
+        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
+        ResourceReservation.print_contents(self, loglevel)     
\ No newline at end of file

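The batching loop above (also present in vm_scheduler.py; the commit itself notes it should be factored out) groups migrations so that each physical node takes part in at most one transfer per round. A standalone sketch of the same grouping logic with a made-up input:

def group_migrations(vnode_migrations):
    # vnode_migrations: vnode -> (origin pnode, dest pnode); mutated, as in the original
    migrations = []
    while len(vnode_migrations) > 0:
        pnodes = set()
        migration = {}
        for vnode in vnode_migrations:
            origin, dest = vnode_migrations[vnode]
            # greedily add a transfer only if neither endpoint is already busy this round
            if origin not in pnodes and dest not in pnodes:
                migration[vnode] = (origin, dest)
                pnodes.add(origin)
                pnodes.add(dest)
        for vnode in migration:
            del vnode_migrations[vnode]
        migrations.append(migration)
    return migrations

# Example: vnodes 1 and 2 both leave pnode 1, so they end up in two separate rounds:
# group_migrations({1: (1, 3), 2: (1, 4)})  ->  [{1: (1, 3)}, {2: (1, 4)}]  (order may vary)
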
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -20,6 +20,7 @@
 from haizea.core.scheduler import EarliestStartingTime
 from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
 import haizea.common.constants as constants
+from mx.DateTime import TimeDelta
 
 class UnmanagedPreparationScheduler(PreparationScheduler):
     def __init__(self, slottable, resourcepool, deployment_enact):
@@ -38,6 +39,12 @@
             earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
         return earliest
             
+    def estimate_migration_time(self, lease):
+        return TimeDelta(seconds=0)     
+            
+    def schedule_migration(self, lease, vmrr, nexttime):
+        return []
+                
     def cancel_preparation(self, lease):
         self.cleanup(lease)
 

Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-17 15:02:10 UTC (rev 608)
@@ -20,7 +20,7 @@
 from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_policy
 from haizea.core.leases import Lease, Capacity
 from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
-from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError
+from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
 from haizea.core.scheduler.mapper import GreedyMapper
 from operator import attrgetter, itemgetter
 from mx.DateTime import TimeDelta
@@ -62,7 +62,7 @@
                                 on_start = VMScheduler._handle_start_resume,
                                 on_end   = VMScheduler._handle_end_resume)
 
-        self.handlers[MigrationResourceReservation] = ReservationEventHandler(
+        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
                                 sched    = self,
                                 on_start = VMScheduler._handle_start_migrate,
                                 on_end   = VMScheduler._handle_end_migrate)
@@ -129,8 +129,6 @@
         if allow_reservation_in_future == None:
             allow_reservation_in_future = self.can_reserve_in_future()
 
-        canmigrate = get_config().get("migration")
-
         #
         # STEP 1: FIGURE OUT THE MINIMUM DURATION
         #
@@ -145,6 +143,12 @@
         # Find the changepoints, and the nodes we can use at each changepoint
         # Nodes may not be available at a changepoint because images
         # cannot be transferred at that time.
+        if mustresume and get_config().get("migration") == constants.MIGRATE_NO:
+            vmrr = lease.get_last_vmrr()
+            onlynodes = set(vmrr.nodes.values())
+        else:
+            onlynodes = None        
+            
         if not mustresume:
             cps = [(node, e.time) for node, e in earliest.items()]
             cps.sort(key=itemgetter(1))
@@ -154,27 +158,14 @@
             for node, time in cps:
                 nodes.append(node)
                 if time != curcp:
-                    changepoints.append([time, nodes[:]])
+                    changepoints.append([time, set(nodes)])
                     curcp = time
                 else:
-                    changepoints[-1][1] = nodes[:]
+                    changepoints[-1][1] = set(nodes)
         else:
-            if not canmigrate:
-                vmrr = lease.get_last_vmrr()
-                curnodes = set(vmrr.nodes.values())
-            else:
-                curnodes=None
-                # If we have to resume this lease, make sure that
-                # we have enough time to transfer the images.
-                migratetime = self.__estimate_migration_time(lease)
-                earliesttransfer = get_clock().get_time() + migratetime
-    
-                for n in earliest:
-                    earliest[n][0] = max(earliest[n][0], earliesttransfer)
-
-            changepoints = list(set([x[0] for x in earliest.values()]))
+            changepoints = list(set([x.time for x in earliest.values()]))
             changepoints.sort()
-            changepoints = [(x, curnodes) for x in changepoints]
+            changepoints = [(x, onlynodes) for x in changepoints]
 
         # If we can make reservations in the future,
         # we also consider future changepoints
@@ -186,7 +177,10 @@
             # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
             # included in futurecp.
             futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
-            futurecp = [(p,None) for p in futurecp]
+            if not mustresume:
+                futurecp = [(p,None) for p in futurecp]
+            else:
+                futurecp = [(p,onlynodes) for p in futurecp]                
         else:
             futurecp = []
 
@@ -278,7 +272,18 @@
         return vmrr, preemptions
 
     def estimate_migration_time(self, lease):
-        return self.__estimate_migration_time(lease)
+        migration = get_config().get("migration")
+        if migration == constants.MIGRATE_YES:
+            vmrr = lease.get_last_vmrr()
+            mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
+            for (vnode,pnode) in vmrr.nodes.items():
+                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
+                mem_in_pnode[pnode] += mem
+            max_mem_to_transfer = max(mem_in_pnode.values())
+            bandwidth = self.resourcepool.info.get_migration_bandwidth()
+            return estimate_transfer_time(max_mem_to_transfer, bandwidth)
+        elif migration == constants.MIGRATE_YES_NOTRANSFER:
+            return TimeDelta(seconds=0)        
 
     def schedule_migration(self, lease, vmrr, nexttime):
         last_vmrr = lease.get_last_vmrr()
@@ -291,8 +296,16 @@
                 break
             
         if not mustmigrate:
-            return
+            return []
 
+        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
+            start = nexttime
+            end = nexttime
+            res = {}
+            migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
+            migr_rr.state = ResourceReservation.STATE_SCHEDULED
+            return [migr_rr]
+
         # Figure out what migrations can be done simultaneously
         migrations = []
         while len(vnode_migrations) > 0:
@@ -311,10 +324,12 @@
         
         # Create migration RRs
         start = max(last_vmrr.post_rrs[-1].end, nexttime)
-        migr_time = self.__estimate_migration_time(lease)
         bandwidth = self.resourcepool.info.get_migration_bandwidth()
         migr_rrs = []
         for m in migrations:
+            vnodes_to_migrate = m.keys()
+            max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
+            migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
             end = start + migr_time
             res = {}
             for (origin,dest) in m.values():
@@ -324,15 +339,13 @@
                 resdest.set_quantity(constants.RES_NETIN, bandwidth)
                 res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
                 res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
-            migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+            migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
             migr_rr.state = ResourceReservation.STATE_SCHEDULED
             migr_rrs.append(migr_rr)
             start = end
+            
+        return migr_rrs
 
-        migr_rrs.reverse()
-        for migr_rr in migr_rrs:
-            vmrr.pre_rrs.insert(0, migr_rr)
-
     def cancel_vm(self, vmrr):
 
         if vmrr.backfill_reservation == True:
@@ -405,9 +418,11 @@
                                                               requested_resources,
                                                               start, 
                                                               end, 
-                                                              strictend = False)
+                                                              strictend = False,
+                                                              onlynodes = onlynodes)
             
             if mapping != None:
+                # TODO: Take into account case where suspension is disabled.
                 if actualend < end:
                     actualduration = actualend - start
                     if actualduration >= min_duration:
@@ -669,21 +684,6 @@
         else:
             return self.__estimate_suspend_resume_time(lease, rate)
 
-    def __estimate_migration_time(self, lease):
-        whattomigrate = get_config().get("what-to-migrate")
-        if whattomigrate == constants.MIGRATE_NONE:
-            # TODO: At this point, giving an RR a duration of 0 seconds
-            # will produce unexpected results. So, we need to give
-            # migrations a symbolic duration of one second,
-            # even when we are assuming that migrations are instantaneous
-            return TimeDelta(seconds=1)
-        else:
-            bandwidth = self.resourcepool.info.get_migration_bandwidth()
-            if whattomigrate == constants.MIGRATE_MEM:
-                mbtotransfer = lease.requested_resources.get_quantity(constants.RES_MEM)
-            elif whattomigrate == constants.MIGRATE_MEMDISK:
-                mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
-            return estimate_transfer_time(mbtotransfer, bandwidth)
 
     # TODO: Take into account other things like boot overhead, migration overhead, etc.
     def __compute_scheduling_threshold(self, lease):
@@ -897,11 +897,6 @@
             origin = rr.transfers[vnode][0]
             dest = rr.transfers[vnode][1]
             
-            # Commenting for now
-            # Has to be moved to preparation scheduler migrate handler
-            #self.resourcepool.remove_diskimage(origin, l.id, vnode)
-            #self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
-
             # Update RAM files
             self.resourcepool.remove_ramfile(origin, l.id, vnode)
             self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))
@@ -913,8 +908,6 @@
 
 
 
-            
-
 class VMResourceReservation(ResourceReservation):
     def __init__(self, lease, start, end, nodes, res, backfill_reservation):
         ResourceReservation.__init__(self, lease, start, end, res)
@@ -978,9 +971,6 @@
         for susprr in self.post_rrs:
             self.logger.log(loglevel, "--")
             susprr.print_contents(loglevel)
-        
-    def is_preemptible(self):
-        return self.lease.preemptible
 
     def xmlrpc_marshall(self):
         rr = ResourceReservation.xmlrpc_marshall(self)
@@ -1004,14 +994,8 @@
         return (self == self.vmrr.post_rrs[0])
 
     def is_last(self):
-        return (self == self.vmrr.post_rrs[-1])
+        return (self == self.vmrr.post_rrs[-1])   
         
-    # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
-    # has wider implications (with a non-trivial handling). For now, we leave them 
-    # as non-preemptible, since the probability of preempting a suspension RR is slim.
-    def is_preemptible(self):
-        return False        
-        
     def xmlrpc_marshall(self):
         rr = ResourceReservation.xmlrpc_marshall(self)
         rr["type"] = "SUSP"
@@ -1035,13 +1019,7 @@
     def is_last(self):
         resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
         return (self == resm_rrs[-1])
-
-    # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
-    # has wider implications (with a non-trivial handling). For now, we leave them 
-    # as non-preemptible, since the probability of preempting a resumption RR is slim.
-    def is_preemptible(self):
-        return False        
-        
+    
     def xmlrpc_marshall(self):
         rr = ResourceReservation.xmlrpc_marshall(self)
         rr["type"] = "RESM"
@@ -1056,26 +1034,17 @@
     def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
         self.logger.log(loglevel, "Type           : SHUTDOWN")
         ResourceReservation.print_contents(self, loglevel)
-        
-    def is_preemptible(self):
-        return True        
-        
+
     def xmlrpc_marshall(self):
         rr = ResourceReservation.xmlrpc_marshall(self)
         rr["type"] = "SHTD"
         return rr
 
-class MigrationResourceReservation(ResourceReservation):
+class MemImageMigrationResourceReservation(MigrationResourceReservation):
     def __init__(self, lease, start, end, res, vmrr, transfers):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.vmrr = vmrr
-        self.transfers = transfers
-        
+        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
+  
     def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Type           : MIGRATE")
+        self.logger.log(loglevel, "Type           : MEM IMAGE MIGRATION")
         self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
-        ResourceReservation.print_contents(self, loglevel)        
-
-    def is_preemptible(self):
-        return False        
-
+        ResourceReservation.print_contents(self, loglevel)   
\ No newline at end of file

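To make the reworked memory-migration estimate concrete, a small sketch of the per-pnode aggregation it performs; the mapping and memory figures are invented, and the actual duration is then estimate_transfer_time(max, bandwidth) with whatever migration bandwidth the resource pool reports:

# vnode -> pnode mapping as in vmrr.nodes, plus per-vnode memory in MB (made up)
nodes = {1: 'A', 2: 'A', 3: 'B'}
mem   = {1: 1024, 2: 1024, 3: 1024}

mem_in_pnode = dict([(pnode, 0) for pnode in set(nodes.values())])
for vnode, pnode in nodes.items():
    mem_in_pnode[pnode] += mem[vnode]
# mem_in_pnode == {'A': 2048, 'B': 1024}; the estimate is driven by the busiest
# node, i.e. max(mem_in_pnode.values()) == 2048 MB worth of memory state files.
max_mem_to_transfer = max(mem_in_pnode.values())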

