[haizea-commit] r563 - in trunk: src/haizea/common src/haizea/resourcemanager src/haizea/resourcemanager/enact src/haizea/resourcemanager/scheduler src/haizea/resourcemanager/scheduler/preparation_schedulers tests

haizea-commit at mailman.cs.uchicago.edu
Wed Feb 4 19:16:47 CST 2009


Author: borja
Date: 2009-02-04 19:16:39 -0600 (Wed, 04 Feb 2009)
New Revision: 563

Modified:
   trunk/src/haizea/common/constants.py
   trunk/src/haizea/resourcemanager/configfile.py
   trunk/src/haizea/resourcemanager/enact/__init__.py
   trunk/src/haizea/resourcemanager/enact/opennebula.py
   trunk/src/haizea/resourcemanager/rm.py
   trunk/src/haizea/resourcemanager/scheduler/__init__.py
   trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
   trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
   trunk/src/haizea/resourcemanager/scheduler/slottable.py
   trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
   trunk/tests/base_config_simulator.conf
Log:
Sanitized and improved exception handling

Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/common/constants.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -111,4 +111,8 @@
 LOGLEVEL_VDEBUG = 5
 LOGLEVEL_STATUS = 25
 
-NO_MEMORY_OVERRIDE = -1
\ No newline at end of file
+NO_MEMORY_OVERRIDE = -1
+
+ONFAILURE_CANCEL = "cancel"
+ONFAILURE_EXIT = "exit"
+ONFAILURE_EXIT_RAISE = "exit-raise"
\ No newline at end of file

Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/configfile.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -101,6 +101,26 @@
                from a repository node before the lease can start.
             """),
 
+     Option(name        = "lease-failure-handling",
+            getter      = "lease-failure-handling",
+            type        = OPTTYPE_STRING,
+            required    = False,
+            default     = constants.ONFAILURE_CANCEL,
+            valid       = [constants.ONFAILURE_CANCEL,
+                           constants.ONFAILURE_EXIT,
+                           constants.ONFAILURE_EXIT_RAISE],
+            doc         = """
+            Sets how the scheduler will handle a failure in
+            a lease. Valid values are:
+            
+             - cancel: The lease is cancelled and marked as "FAILED"
+             - exit: Haizea will exit cleanly, printing relevant debugging
+               information to its log.
+             - exit-raise: Haizea will exit by raising an exception. This is
+               useful for debugging, as IDEs will recognize this as an exception
+               and will facilitate debugging it.
+            """),
+
      Option(name        = "datafile",
             getter      = "datafile",
             type        = OPTTYPE_STRING,

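As a quick illustration, the new option would be set in the Haizea
configuration file along these lines (the [general] section name is a guess;
tests/base_config_simulator.conf, also touched by this commit, shows the
actual placement):

    [general]
    # one of: cancel, exit, exit-raise
    lease-failure-handling: exit-raise
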
Modified: trunk/src/haizea/resourcemanager/enact/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/__init__.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/enact/__init__.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -49,4 +49,3 @@
 class DeploymentEnactment(object):
     def __init__(self):
         pass
-    

Modified: trunk/src/haizea/resourcemanager/enact/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/enact/opennebula.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -16,6 +16,7 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
+from haizea.resourcemanager.scheduler import EnactmentError
 from haizea.resourcemanager.scheduler.resourcepool import Node
 from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
@@ -26,6 +27,14 @@
 import commands
 from time import sleep
 
+class OpenNebulaEnactmentError(EnactmentError):
+    def __init__(self, cmd, status, output):
+        self.cmd = cmd
+        self.status = status
+        self.output = output
+        
+        self.message = "Error when running '%s' (status=%i, output='%s')" % (cmd, status, output)
+
 class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
     ONEATTR2HAIZEA = { "TOTALCPU": constants.RES_CPU,
                    "TOTALMEMORY": constants.RES_MEM }
@@ -114,7 +123,7 @@
             if status == 0:
                 self.logger.debug("Command returned succesfully.")
             else:
-                raise Exception, "Error when running onevm deploy (status=%i, output='%s')" % (status, output)
+                raise OpenNebulaEnactmentError("onevm deploy", status, output)
             
     def stop(self, action):
         for vnode in action.vnodes:
@@ -125,7 +134,8 @@
             if status == 0:
                 self.logger.debug("Command returned succesfully.")
             else:
-                raise Exception, "Error when running onevm shutdown (status=%i, output='%s')" % (status, output)
+                raise OpenNebulaEnactmentError("onevm shutdown", status, output)
+
             # TODO: We should spawn out a thread to do this, so Haizea isn't
             # blocking until all these commands end
             interval = get_config().get("enactment-overhead").seconds
@@ -140,7 +150,8 @@
             if status == 0:
                 self.logger.debug("Command returned succesfully.")
             else:
-                raise Exception, "Error when running onevm suspend (status=%i, output='%s')" % (status, output)
+                raise OpenNebulaEnactmentError("onevm suspend", status, output)
+
             # Space out commands to avoid OpenNebula from getting saturated
             # TODO: We should spawn out a thread to do this
             # TODO: We should spawn out a thread to do this, so Haizea isn't
@@ -157,7 +168,8 @@
             if status == 0:
                 self.logger.debug("Command returned succesfully.")
             else:
-                raise Exception, "Error when running onevm resume (status=%i, output='%s')" % (status, output)
+                raise OpenNebulaEnactmentError("onevm resume", status, output)
+
             # Space out commands to avoid OpenNebula from getting saturated
             # TODO: We should spawn out a thread to do this, so Haizea isn't
             # blocking until all these commands end

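For reference, a minimal sketch (Python 2, matching the codebase) of the
pattern the hunks above converge on; the command string and VM id are
illustrative only, and OpenNebulaEnactmentError is the class defined earlier
in this file:

    import commands

    cmd = "onevm deploy 42"
    (status, output) = commands.getstatusoutput(cmd)
    if status != 0:
        raise OpenNebulaEnactmentError("onevm deploy", status, output)
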
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/rm.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -42,6 +42,7 @@
 from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
 from haizea.resourcemanager.frontends.rpc import RPCFrontend
 from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler import UnrecoverableError
 from haizea.resourcemanager.scheduler.lease_scheduler import LeaseScheduler
 from haizea.resourcemanager.scheduler.vm_scheduler import VMScheduler
 from haizea.resourcemanager.scheduler.slottable import SlotTable
@@ -53,6 +54,7 @@
 import logging
 import signal
 import sys, os
+import traceback
 from time import sleep
 from math import ceil
 from mx.DateTime import now, TimeDelta
@@ -352,25 +354,48 @@
         # Run the scheduling function.
         try:
             self.scheduler.schedule(nexttime)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            self.logger.error("Exception in scheduling function. Dumping state..." )
-            self.print_stats(logging.getLevelName("ERROR"), verbose=True)
-            raise      
+        except UnrecoverableError, exc:
+            self.logger.error("An unrecoverable error has occurred in the scheduling function.")
+            self.logger.error("Original exception:")
+            self.print_exception(exc.exc, exc.get_traceback())
+            self.logger.error("Unrecoverable error traceback:")
+            self.print_exception(exc, sys.exc_traceback)
+            self.panic()
+        except Exception, exc:
+            self.logger.error("An unexpected exception has occurred in the scheduling function.")
+            self.print_exception(exc, sys.exc_traceback)
+            self.panic()
 
+
     def process_reservations(self, time):
         """Process reservations starting/stopping at specified time"""
         
         # The scheduler takes care of this.
         try:
             self.scheduler.process_reservations(time)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            self.logger.error("Exception when processing reservations. Dumping state..." )
-            self.print_stats(logging.getLevelName("ERROR"), verbose=True)
-            raise      
-
+        except UnrecoverableError, exc:
+            self.logger.error("An unrecoverable error has occurred while processing reservations.")
+            self.logger.error("Original exception:")
+            self.print_exception(exc.exc, exc.get_traceback())
+            self.logger.error("Unrecoverable error traceback:")
+            self.print_exception(exc, sys.exc_traceback)
+            self.panic()
+        except Exception, exc:
+            self.logger.error("An unexpected exception has occurred while processing reservations.")
+            self.print_exception(exc, sys.exc_traceback)
+            self.panic()
+            
+    def print_exception(self, exc, exc_traceback):
+        tb = traceback.format_tb(exc_traceback)
+        for line in tb:
+            self.logger.error(line)
+        self.logger.error("Message: %s" % exc)
+    
+    def panic(self):
+        #self.print_stats(logging.getLevelName("ERROR"), verbose=True)
+        exit(1)
         
+        
     def print_stats(self, loglevel, verbose=False):
         """Print some basic statistics in the log
         

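A minimal sketch of what print_exception does with a saved traceback
(standard library only; the print statements stand in for self.logger.error):

    import sys, traceback

    try:
        1/0
    except Exception, exc:
        for line in traceback.format_tb(sys.exc_traceback):
            print line,            # each entry already ends with a newline
        print "Message: %s" % exc
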
Modified: trunk/src/haizea/resourcemanager/scheduler/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/__init__.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/__init__.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -16,42 +16,55 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
+import sys
 
 class SchedException(Exception):
-    """A simple exception class used for scheduling exceptions"""
+    """The base class for scheduling exceptions"""
     pass
 
 class NotSchedulableException(SchedException):
     """A simple exception class used when a lease cannot be scheduled
     
     This exception must be raised when a lease cannot be scheduled
-    (this is not necessarily an error condition, but the scheduler will
-    have to react to it)
     """
     pass
 
-class CriticalSchedException(SchedException):
-    """A simple exception class used for critical scheduling exceptions
-    
-    This exception must be raised when a non-recoverable error happens
-    (e.g., when there are unexplained inconsistencies in the schedule,
-    typically resulting from a code error)
-    """
+class CancelLeaseException(SchedException):
     pass
 
-class PreparationSchedException(SchedException):
+class NormalEndLeaseException(SchedException):
     pass
 
-class CancelLeaseException(Exception):
+class RescheduleLeaseException(SchedException):
     pass
 
-class NormalEndLeaseException(Exception):
+
+class SchedulingError(Exception):
+    """The base class for scheduling errors"""
     pass
 
-class RescheduleLeaseException(SchedException):
+class InconsistentScheduleError(SchedulingError):
     pass
 
+class InconsistentLeaseStateError(SchedulingError):
+    def __init__(self, lease, doing):
+        self.lease = lease
+        self.doing = doing
+        
+        self.message = "Lease %i is in an inconsistent state (%i) when %s" % (lease.id, lease.get_state(), doing)
 
+class EnactmentError(SchedulingError):
+    pass
+
+class UnrecoverableError(SchedulingError):
+    def __init__(self, exc):
+        self.exc = exc
+        self.exc_info = sys.exc_info()
+        
+    def get_traceback(self):
+        return self.exc_info[2]
+
+
 class ReservationEventHandler(object):
     """A wrapper for reservation event handlers.
     

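A short sketch of how the new hierarchy is meant to be used. UnrecoverableError
captures sys.exc_info() at construction time, so it has to be instantiated
inside the except block that caught the original error (the EnactmentError here
stands in for a real enactment failure):

    try:
        raise EnactmentError("onevm deploy failed")
    except EnactmentError, exc:
        wrapped = UnrecoverableError(exc)
        # wrapped.exc is the original exception, and
        # wrapped.get_traceback() is the traceback captured at wrap time,
        # which is what rm.py logs before calling panic()
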
Modified: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -22,17 +22,12 @@
 done to prepare a lease) happens in the modules inside the 
 haizea.resourcemanager.deployment package.
 
-This module provides the following classes:
-
-* SchedException: A scheduling exception
-* ReservationEventHandler: A simple wrapper class
-* Scheduler: Do I really need to spell this one out for you?
 """
 import haizea.common.constants as constants
 from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
 from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
-from haizea.resourcemanager.scheduler import SchedException, RescheduleLeaseException, NormalEndLeaseException
-from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException, ResourceReservation
+from haizea.resourcemanager.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException
+from haizea.resourcemanager.scheduler.slottable import SlotTable, ResourceReservation
 from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
 from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation, SuspensionResourceReservation, ResumptionResourceReservation, ShutdownResourceReservation
 from operator import attrgetter, itemgetter
@@ -96,7 +91,8 @@
         
         # Queue best-effort requests
         for lease in be_leases:
-            self.enqueue(lease)
+            self.__enqueue(lease)
+            lease.set_state(Lease.STATE_QUEUED)
         
         # Process immediate requests
         for lease in im_leases:
@@ -127,12 +123,27 @@
                         self.__enqueue_in_order(lease)
                         lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
                     else:
-                        raise CriticalSchedException, "Lease is an inconsistent state (tried to reschedule best-effort lease when state is %s)" % lease_state
+                        raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
             except NormalEndLeaseException, msg:
                 self._handle_end_lease(lease)
+            except InconsistentLeaseStateError, exc:
+                self.fail_lease(lease, exc)
+            except EnactmentError, exc:
+                self.fail_lease(lease, exc)
+            # Everything else gets propagated upwards to ResourceManager
+            # and will make Haizea crash and burn
+                
         
         for rr in starting:
-            self.handlers[type(rr)].on_start(rr.lease, rr)
+            lease = rr.lease
+            try:
+                self.handlers[type(rr)].on_start(lease, rr)
+            except InconsistentLeaseStateError, exc:
+                self.fail_lease(lease, exc)
+            except EnactmentError, exc:
+                self.fail_lease(lease, exc)
+            # Everything else gets propagated upwards to ResourceManager
+            # and will make Haizea crash and burn
             
 
         # TODO: Should be in VMScheduler
@@ -144,14 +155,6 @@
         get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)        
         get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)        
 
-    
-    def enqueue(self, lease):
-        """Queues a best-effort lease request"""
-        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
-        lease.set_state(Lease.STATE_QUEUED)
-        self.queue.enqueue(lease)
-        self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
-
     def request_lease(self, lease):
         """
         Request a lease. At this point, it is simply marked as "Pending" and,
@@ -203,7 +206,7 @@
                 self.completedleases.add(lease)
                 self.leases.remove(lease)
             else:
-                raise CriticalSchedException, "Lease is an inconsistent state (tried to cancel lease when state is %s)" % lease_state
+                raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
             
         elif self.queue.has_lease(lease_id):
             # The lease is in the queue, waiting to be scheduled.
@@ -211,21 +214,27 @@
             self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
             l = self.queue.get_lease(lease_id)
             self.queue.remove_lease(lease)
+            
     
-    def fail_lease(self, lease_id):
+    def fail_lease(self, lease, exc=None):
         """Transitions a lease to a failed state, and does any necessary cleaning up
         
-        TODO: For now, just use the cancelling algorithm
-        
         Arguments:
         lease -- Lease to fail
-        """    
-        try:
-            raise
-            self.cancel_lease(lease_id)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            raise CriticalSchedException()      
+        exc -- The exception that made the lease fail
+        """
+        treatment = get_config().get("lease-failure-handling")
+        
+        if treatment == constants.ONFAILURE_CANCEL:
+            rrs = lease.get_scheduled_reservations()
+            for r in rrs:
+                self.slottable.removeReservation(r)
+            lease.set_state(Lease.STATE_FAILED)
+            self.completedleases.add(lease)
+            self.leases.remove(lease)
+        elif treatment == constants.ONFAILURE_EXIT:
+            raise UnrecoverableError(exc)
+            
     
     def notify_event(self, lease_id, event):
         time = get_clock().get_time()
@@ -238,11 +247,8 @@
             nexttime = get_clock().get_next_schedulable_time()
             # We need to reevaluate the schedule to see if there are any future
             # reservations that we can slide back.
-            self.vm_scheduler.reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
+            self.__reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
 
-            
-        
-
     
     def __process_ar_request(self, lease, nexttime):
         self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
@@ -256,20 +262,20 @@
             self.leases.add(lease)
             get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
             accepted = True
-        except SchedException, msg:
+        except NotSchedulableException, exc:
             # Our first try avoided preemption, try again
             # without avoiding preemption.
             # TODO: Roll this into the exact slot fitting algorithm
             try:
-                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
                 self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease.id)
                 self.__schedule_ar_lease(lease, nexttime, avoidpreempt=False)
                 self.leases.add(lease)
                 get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
                 accepted = True
-            except SchedException, msg:
+            except NotSchedulableException, exc:
                 get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
-                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
 
         if accepted:
             self.logger.info("AR lease request #%i has been accepted." % lease.id)
@@ -296,7 +302,7 @@
                     self.__schedule_besteffort_lease(lease, nexttime)
                     self.leases.add(lease)
                     get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease.id)
-                except SchedException, msg:
+                except NotSchedulableException, msg:
                     # Put back on queue
                     newqueue.enqueue(lease)
                     self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
@@ -320,142 +326,129 @@
             self.leases.add(lease)
             get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
             self.logger.info("Immediate lease request #%i has been accepted." % lease.id)
-        except SchedException, msg:
+        except NotSchedulableException, msg:
             get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
             self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
     
     
     def __schedule_ar_lease(self, lease, nexttime, avoidpreempt=True):
-        try:
-            (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-            
-            if len(preemptions) > 0:
-                leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
-                self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
-                for l in leases:
-                    self.__preempt(l, preemption_time=vmrr.start)
+        (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+        
+        if len(preemptions) > 0:
+            leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
+            self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
+            for l in leases:
+                self.__preempt(l, preemption_time=vmrr.start)
 
-            # Schedule deployment overhead
-            deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+        # Schedule deployment overhead
+        deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+        
+        # Commit reservation to slot table
+        # (we don't do this until the very end because the deployment overhead
+        # scheduling could still throw an exception)
+        for rr in deploy_rrs:
+            lease.append_deployrr(rr)
             
-            # Commit reservation to slot table
-            # (we don't do this until the very end because the deployment overhead
-            # scheduling could still throw an exception)
-            for rr in deploy_rrs:
-                lease.append_deployrr(rr)
-                
-            for rr in deploy_rrs:
-                self.slottable.addReservation(rr)
-                
-            lease.append_vmrr(vmrr)
-            self.slottable.addReservation(vmrr)
+        for rr in deploy_rrs:
+            self.slottable.addReservation(rr)
             
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
-                
-            lease.set_state(Lease.STATE_SCHEDULED)
+        lease.append_vmrr(vmrr)
+        self.slottable.addReservation(vmrr)
+        
+        # Post-VM RRs (if any)
+        for rr in vmrr.post_rrs:
+            self.slottable.addReservation(rr)
+            
+        lease.set_state(Lease.STATE_SCHEDULED)
 
-            if is_ready:
-                lease.set_state(Lease.STATE_READY)
-        except SchedException, msg:
-            raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
-        except Exception, msg:
-            raise
+        if is_ready:
+            lease.set_state(Lease.STATE_READY)
 
 
     def __schedule_besteffort_lease(self, lease, nexttime):            
-        try:
-            # Schedule the VMs
-            canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
-            
-            lease_state = lease.get_state()
-            
-            # Determine earliest start time in each node
-            if lease_state == Lease.STATE_QUEUED:
-                # Figure out earliest start times based on
-                # image schedule and reusable images
-                earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
-            elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
-                # No need to transfer images from repository
-                # (only intra-node transfer)
-                earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
-            else:
-                raise CriticalSchedException, "Lease is an inconsistent state (tried to schedule best-effort lease when state is %s)" % lease_state
-            
-            (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
-            
-            # Schedule deployment
-            is_ready = False
-            deploy_rrs = []
-            if lease_state == Lease.STATE_SUSPENDED_QUEUED:
-                self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
-            else:
-                deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+        # Schedule the VMs
+        canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
+        
+        lease_state = lease.get_state()
+        
+        # Determine earliest start time in each node
+        if lease_state == Lease.STATE_QUEUED:
+            # Figure out earliest start times based on
+            # image schedule and reusable images
+            earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
+        elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
+            # No need to transfer images from repository
+            # (only intra-node transfer)
+            earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
+        else:
+            raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
+        
+        (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
+        
+        # Schedule deployment
+        is_ready = False
+        deploy_rrs = []
+        if lease_state == Lease.STATE_SUSPENDED_QUEUED:
+            self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
+        else:
+            deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
 
-            # At this point, the lease is feasible.
-            # Commit changes by adding RRs to lease and to slot table
-            
-            # Add deployment RRs (if any) to lease
-            for rr in deploy_rrs:
-                lease.append_deployrr(rr)
-            
-            # Add VMRR to lease
-            lease.append_vmrr(vmrr)
-            
+        # At this point, the lease is feasible.
+        # Commit changes by adding RRs to lease and to slot table
+        
+        # Add deployment RRs (if any) to lease
+        for rr in deploy_rrs:
+            lease.append_deployrr(rr)
+        
+        # Add VMRR to lease
+        lease.append_vmrr(vmrr)
+        
 
-            # Add resource reservations to slottable
+        # Add resource reservations to slottable
+        
+        # Deployment RRs (if any)
+        for rr in deploy_rrs:
+            self.slottable.addReservation(rr)
+        
+        # Pre-VM RRs (if any)
+        for rr in vmrr.pre_rrs:
+            self.slottable.addReservation(rr)
             
-            # Deployment RRs (if any)
-            for rr in deploy_rrs:
-                self.slottable.addReservation(rr)
+        # VM
+        self.slottable.addReservation(vmrr)
+        
+        # Post-VM RRs (if any)
+        for rr in vmrr.post_rrs:
+            self.slottable.addReservation(rr)
+       
+        if in_future:
+            self.numbesteffortres += 1
             
-            # Pre-VM RRs (if any)
-            for rr in vmrr.pre_rrs:
-                self.slottable.addReservation(rr)
-                
-            # VM
-            self.slottable.addReservation(vmrr)
-            
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
-           
-            if in_future:
-                self.numbesteffortres += 1
-                
-            if lease_state == Lease.STATE_QUEUED:
-                lease.set_state(Lease.STATE_SCHEDULED)
-                if is_ready:
-                    lease.set_state(Lease.STATE_READY)
-            elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
-                lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
+        if lease_state == Lease.STATE_QUEUED:
+            lease.set_state(Lease.STATE_SCHEDULED)
+            if is_ready:
+                lease.set_state(Lease.STATE_READY)
+        elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
+            lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
 
-                
-            lease.print_contents()
+        lease.print_contents()
 
-        except SchedException, msg:
-            raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
 
-
     def __schedule_immediate_lease(self, req, nexttime):
-        try:
-            (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
-            # Schedule deployment
-            self.preparation_scheduler.schedule(req, vmrr, nexttime)
-                        
-            req.append_rr(vmrr)
-            self.slottable.addReservation(vmrr)
-            
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
+        (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
+        # Schedule deployment
+        self.preparation_scheduler.schedule(req, vmrr, nexttime)
                     
-            req.print_contents()
-        except SlotFittingException, msg:
-            raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+        req.append_rr(vmrr)
+        self.slottable.addReservation(vmrr)
         
+        # Post-VM RRs (if any)
+        for rr in vmrr.post_rrs:
+            self.slottable.addReservation(rr)
+                
+        req.print_contents()
         
+        
     def __preempt(self, lease, preemption_time):
         
         self.logger.info("Preempting lease #%i..." % (lease.id))
@@ -506,6 +499,24 @@
         self.logger.vdebug("Lease after preemption:")
         lease.print_contents()
         
+    def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+        self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime)) 
+        leases = []
+        vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
+        leases = set([rr.lease for rr in vmrrs])
+        leases = [l for l in leases if isinstance(l, BestEffortLease) and l.get_state() in (Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY) and not l in checkedleases]
+        for lease in leases:
+            self.logger.debug("Found lease %i" % lease.id)
+            lease.print_contents()
+            # Earliest time can't be earlier than time when images will be
+            # available in node
+            earliest = max(nexttime, lease.imagesavail)
+            self.vm_scheduler.slideback(lease, earliest)
+            checkedleases.append(lease)
+        #for l in leases:
+        #    vmrr, susprr = l.getLastVMRR()
+        #    self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)        
+        
     # TODO: Should be in VMScheduler
     def __get_utilization(self, time):
         total = self.slottable.get_total_capacity()
@@ -525,6 +536,12 @@
             
         return util        
 
+    def __enqueue(self, lease):
+        """Queues a best-effort lease request"""
+        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+        self.queue.enqueue(lease)
+        self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
+
     def __enqueue_in_order(self, lease):
         get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
         self.queue.enqueue_in_order(lease)

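In condensed form, the failure path introduced above works roughly like this
(a sketch, not the literal code; note that the exit-raise setting is not
visibly handled in the fail_lease hunk of this revision):

    try:
        self.handlers[type(rr)].on_start(lease, rr)
    except EnactmentError, exc:
        # fail_lease consults lease-failure-handling:
        #  - cancel: remove the lease's reservations and mark it FAILED
        #  - exit:   wrap exc in UnrecoverableError so the ResourceManager
        #            logs both tracebacks and shuts down
        self.fail_lease(lease, exc)
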
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -28,6 +28,3 @@
         
     def cleanup(self, lease):
         abstract()
-        
-class PreparationSchedException(Exception):
-    pass
\ No newline at end of file

Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -17,13 +17,13 @@
 # -------------------------------------------------------------------------- #
 
 import haizea.common.constants as constants
-from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler, PreparationSchedException
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler
 from haizea.resourcemanager.scheduler.slottable import ResourceReservation
 from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
 from haizea.resourcemanager.scheduler import ReservationEventHandler
 from haizea.common.utils import estimate_transfer_time, get_config
 from haizea.resourcemanager.scheduler.slottable import ResourceTuple
-from haizea.resourcemanager.scheduler import ReservationEventHandler, PreparationSchedException, CriticalSchedException
+from haizea.resourcemanager.scheduler import ReservationEventHandler, NotSchedulableException, InconsistentLeaseStateError
 
 
 import copy
@@ -109,7 +109,7 @@
             elif mechanism == constants.TRANSFER_MULTICAST:
                 try:
                     filetransfer = self.schedule_imagetransfer_edf(lease, musttransfer, nexttime)
-                except PreparationSchedException, msg:
+                except NotSchedulableException, exc:
                     raise
  
         # No chance of scheduling exception at this point. It's safe
@@ -288,7 +288,7 @@
             startTime = t.end
              
         if not fits:
-             raise PreparationSchedException, "Adding this lease results in an unfeasible image transfer schedule."
+             raise NotSchedulableException, "Adding this lease results in an unfeasible image transfer schedule."
 
         # Push image transfers as close as possible to their deadlines. 
         feasibleEndTime=newtransfers[-1].deadline
@@ -403,7 +403,7 @@
             rr.state = ResourceReservation.STATE_ACTIVE
             # TODO: Enactment
         else:
-            raise CriticalSchedException, "Lease is an inconsistent state (tried to start file transfer when state is %s)" % Lease.state_str[lease_state]
+            raise InconsistentLeaseStateError(lease, doing = "starting a file transfer")
             
         lease.print_contents()
         sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
@@ -438,7 +438,7 @@
                 # TODO: ENACTMENT: Verify the image was transferred correctly
                 sched.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
         else:
-            raise CriticalSchedException, "Lease is an inconsistent state (tried to start file transfer when state is %s)" % lease_state
+            raise InconsistentLeaseStateError(lease, doing = "ending a file transfer")
 
         lease.print_contents()
         sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)

Modified: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/resourcepool.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/resourcepool.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -19,10 +19,9 @@
 from haizea.common.utils import vnodemapstr, get_accounting
 import haizea.common.constants as constants
 import haizea.resourcemanager.enact.actions as actions
+from haizea.resourcemanager.scheduler import EnactmentError
 import logging 
 
-class FailedEnactmentException(Exception):
-    pass
 
 class ResourcePool(object):
     def __init__(self, info_enact, vm_enact, deploy_enact):
@@ -50,18 +49,18 @@
 
         try:
             self.vm.start(start_action)
-        except Exception, msg:
-            self.logger.error("Enactment of start VM failed: %s" % msg)
-            raise FailedEnactmentException()
+        except EnactmentError, exc:
+            self.logger.error("Enactment of start VM failed: %s" % exc.message)
+            raise
         
     def stop_vms(self, lease, rr):
         stop_action = actions.VMEnactmentStopAction()
         stop_action.from_rr(rr)
         try:
             self.vm.stop(stop_action)
-        except Exception, msg:
-            self.logger.error("Enactment of end VM failed: %s" % msg)
-            raise FailedEnactmentException()
+        except EnactmentError, exc:
+            self.logger.error("Enactment of end VM failed: %s" % exc.message)
+            raise
          
     def suspend_vms(self, lease, rr):
         # Add memory image files
@@ -74,9 +73,9 @@
         suspend_action.from_rr(rr)
         try:
             self.vm.suspend(suspend_action)
-        except Exception, msg:
-            self.logger.error("Enactment of suspend VM failed: %s" % msg)
-            raise FailedEnactmentException()
+        except EnactmentError, exc:
+            self.logger.error("Enactment of suspend VM failed: %s" % exc.message)
+            raise
     
     def verify_suspend(self, lease, rr):
         verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
@@ -94,9 +93,9 @@
         resume_action.from_rr(rr)
         try:
             self.vm.resume(resume_action)
-        except Exception, msg:
-            self.logger.error("Enactment of resume VM failed: %s" % msg)
-            raise FailedEnactmentException()
+        except EnactmentError, exc:
+            self.logger.error("Enactment of resume VM failed: %s" % exc.message)
+            raise
     
     def verify_resume(self, lease, rr):
         verify_resume_action = actions.VMEnactmentConfirmResumeAction()

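The changes above replace the old FailedEnactmentException wrapper with a
log-and-re-raise idiom: the resource pool records the failure but lets the
original EnactmentError propagate, so the schedulers (ultimately fail_lease)
decide how to react. The pattern, shown once for the start case:

    try:
        self.vm.start(start_action)
    except EnactmentError, exc:
        self.logger.error("Enactment of start VM failed: %s" % exc.message)
        raise   # re-raise the same EnactmentError unchanged
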
Modified: trunk/src/haizea/resourcemanager/scheduler/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/slottable.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/slottable.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -24,13 +24,6 @@
 import copy
 import logging
 
-class SlotFittingException(Exception):
-    pass
-
-class CriticalSlotFittingException(Exception):
-    pass
-
-
 class Node(object):
     def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
         self.capacity = ResourceTuple.copy(capacity)

Modified: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py	2009-02-05 01:16:39 UTC (rev 563)
@@ -18,11 +18,11 @@
 
 import haizea.common.constants as constants
 from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_accounting, get_clock
-from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException
+from haizea.resourcemanager.scheduler.slottable import SlotTable
 from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
 from haizea.resourcemanager.scheduler.slottable import ResourceReservation, ResourceTuple
 from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, CriticalSchedException
+from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError
 from operator import attrgetter, itemgetter
 from mx.DateTime import TimeDelta
 
@@ -97,14 +97,14 @@
         fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
         if fitatstart < numnodes:
             if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
+                raise NotSchedulableException, "Not enough resources in specified interval"
             else:
                 unfeasiblewithoutpreemption = True
         feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
         fitatend = sum([n for n in canfitnopreempt.values()])
         if fitatend < numnodes:
             if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
+                raise NotSchedulableException, "Not enough resources in specified interval"
             else:
                 unfeasiblewithoutpreemption = True
 
@@ -112,11 +112,11 @@
         if canpreempt:
             fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
             if fitatstart < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
+                raise NotSchedulableException, "Not enough resources in specified interval"
             feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
             fitatend = sum([n for n in canfitpreempt.values()])
             if fitatend < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
+                raise NotSchedulableException, "Not enough resources in specified interval"
             else:
                 if unfeasiblewithoutpreemption:
                     mustpreempt = True
@@ -200,7 +200,7 @@
                     break
 
         if vnode <= numnodes:
-            raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
+            raise InconsistentScheduleError, "Availability window indicated that request is feasible, but could not fit it"
 
         # Create VM resource reservations
         vmrr = VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
@@ -312,12 +312,12 @@
             if not allow_reservation_in_future:
                 # We did not find a suitable starting time. This can happen
                 # if we're unable to make future reservations
-                raise SchedException, "Could not find enough resources for this request"
+                raise NotSchedulableException, "Could not find enough resources for this request"
         else:
             mustsuspend = (end - start) < duration
             if mustsuspend and not suspendable:
                 if not allow_reservation_in_future:
-                    raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+                    raise NotSchedulableException, "Scheduling this lease would require preempting it, which is not allowed"
                 else:
                     start = None # No satisfactory start time
             
@@ -466,8 +466,151 @@
         for susprr in vmrr.post_rrs:
             self.slottable.addReservation(susprr)
         
+    def find_preemptable_leases(self, mustpreempt, startTime, endTime):
+        def comparepreemptability(rrX, rrY):
+            if rrX.lease.submit_time > rrY.lease.submit_time:
+                return constants.BETTER
+            elif rrX.lease.submit_time < rrY.lease.submit_time:
+                return constants.WORSE
+            else:
+                return constants.EQUAL        
+            
+        def preemptedEnough(amountToPreempt):
+            for node in amountToPreempt:
+                if not amountToPreempt[node].is_zero_or_less():
+                    return False
+            return True
         
+        # Get allocations at the specified time
+        atstart = set()
+        atmiddle = set()
+        nodes = set(mustpreempt.keys())
         
+        reservationsAtStart = self.slottable.getReservationsAt(startTime)
+        reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+        reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtStart.sort(comparepreemptability)
+        reservationsAtMiddle.sort(comparepreemptability)
+        
+        amountToPreempt = {}
+        for n in mustpreempt:
+            amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+
+        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+        for r in reservationsAtStart:
+            # The following will really only come into play when we have
+            # multiple VMs per node
+            mustpreemptres = False
+            for n in r.resources_in_pnode.keys():
+                # Don't need to preempt if we've already preempted all
+                # the needed resources in node n
+                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                    amountToPreempt[n].decr(r.resources_in_pnode[n])
+                    mustpreemptres = True
+            if mustpreemptres:
+                atstart.add(r)
+            if preemptedEnough(amountToPreempt):
+                break
+        
+        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+        if len(reservationsAtMiddle)>0:
+            changepoints = set()
+            for r in reservationsAtMiddle:
+                changepoints.add(r.start)
+            changepoints = list(changepoints)
+            changepoints.sort()        
+            
+            for cp in changepoints:
+                amountToPreempt = {}
+                for n in mustpreempt:
+                    amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+                reservations = [r for r in reservationsAtMiddle 
+                                if r.start <= cp and cp < r.end]
+                for r in reservations:
+                    mustpreemptres = False
+                    for n in r.resources_in_pnode.keys():
+                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                            amountToPreempt[n].decr(r.resources_in_pnode[n])
+                            mustpreemptres = True
+                    if mustpreemptres:
+                        atmiddle.add(r)
+                    if preemptedEnough(amountToPreempt):
+                        break
+            
+        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+        
+        leases = [r.lease for r in atstart|atmiddle]
+        
+        return leases
+        
+        
+    def slideback(self, lease, earliest):
+        vmrr = lease.get_last_vmrr()
+        # Save original start and end time of the vmrr
+        old_start = vmrr.start
+        old_end = vmrr.end
+        nodes = vmrr.nodes.values()
+        if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
+            originalstart = vmrr.pre_rrs[0].start
+        else:
+            originalstart = vmrr.start
+        cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+        cp = [earliest] + cp
+        newstart = None
+        for p in cp:
+            self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+            self.slottable.availabilitywindow.printContents()
+            if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+                (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+                if end == originalstart and set(nodes) <= set(canfit.keys()):
+                    self.logger.debug("Can slide back to %s" % p)
+                    newstart = p
+                    break
+        if newstart == None:
+            # Can't slide back. Leave as is.
+            pass
+        else:
+            diff = originalstart - newstart
+            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
+                resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+                for resmrr in resmrrs:
+                    resmrr_old_start = resmrr.start
+                    resmrr_old_end = resmrr.end
+                    resmrr.start -= diff
+                    resmrr.end -= diff
+                    self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
+            vmrr.update_start(vmrr.start - diff)
+            
+            # If the lease was going to be suspended, check to see if
+            # we don't need to suspend any more.
+            remdur = lease.duration.get_remaining_duration()
+            if vmrr.is_suspending() and vmrr.end - newstart >= remdur: 
+                vmrr.update_end(vmrr.start + remdur)
+                for susprr in vmrr.post_rrs:
+                    self.slottable.removeReservation(susprr)
+                vmrr.post_rrs = []
+            else:
+                vmrr.update_end(vmrr.end - diff)
+                
+            if not vmrr.is_suspending():
+                # If the VM was set to shutdown, we need to slideback the shutdown RRs
+                for rr in vmrr.post_rrs:
+                    rr_old_start = rr.start
+                    rr_old_end = rr.end
+                    rr.start -= diff
+                    rr.end -= diff
+                    self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
+
+            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+            self.logger.vdebug("New lease descriptor (after slideback):")
+            lease.print_contents()        
+        
     def can_suspend_at(self, lease, t):
         # TODO: Make more general, should determine vmrr based on current time
         vmrr = lease.get_last_vmrr()
@@ -655,7 +798,7 @@
         rate = config.get("suspend-rate") 
 
         if suspend_by < vmrr.start or suspend_by > vmrr.end:
-            raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
+            raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
 
         times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
         suspend_rrs = []
@@ -678,7 +821,7 @@
             
         susp_start = suspend_rrs[0].start
         if susp_start < vmrr.start:
-            raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
+            raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
         
         vmrr.update_end(susp_start)
         
@@ -698,7 +841,7 @@
         rate = config.get("resume-rate") 
 
         if resume_at < vmrr.start or resume_at > vmrr.end:
-            raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
 
         times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
         resume_rrs = []
@@ -721,7 +864,7 @@
             
         resm_end = resume_rrs[-1].end
         if resm_end > vmrr.end:
-            raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
         
         vmrr.update_start(resm_end)
         for resmrr in resume_rrs:
@@ -902,171 +1045,6 @@
         nodes.sort(comparenodes)
         return nodes        
 
-    def find_preemptable_leases(self, mustpreempt, startTime, endTime):
-        def comparepreemptability(rrX, rrY):
-            if rrX.lease.submit_time > rrY.lease.submit_time:
-                return constants.BETTER
-            elif rrX.lease.submit_time < rrY.lease.submit_time:
-                return constants.WORSE
-            else:
-                return constants.EQUAL        
-            
-        def preemptedEnough(amountToPreempt):
-            for node in amountToPreempt:
-                if not amountToPreempt[node].is_zero_or_less():
-                    return False
-            return True
-        
-        # Get allocations at the specified time
-        atstart = set()
-        atmiddle = set()
-        nodes = set(mustpreempt.keys())
-        
-        reservationsAtStart = self.slottable.getReservationsAt(startTime)
-        reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
-        reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtStart.sort(comparepreemptability)
-        reservationsAtMiddle.sort(comparepreemptability)
-        
-        amountToPreempt = {}
-        for n in mustpreempt:
-            amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
-
-        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
-        for r in reservationsAtStart:
-            # The following will really only come into play when we have
-            # multiple VMs per node
-            mustpreemptres = False
-            for n in r.resources_in_pnode.keys():
-                # Don't need to preempt if we've already preempted all
-                # the needed resources in node n
-                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                    amountToPreempt[n].decr(r.resources_in_pnode[n])
-                    mustpreemptres = True
-            if mustpreemptres:
-                atstart.add(r)
-            if preemptedEnough(amountToPreempt):
-                break
-        
-        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
-        if len(reservationsAtMiddle)>0:
-            changepoints = set()
-            for r in reservationsAtMiddle:
-                changepoints.add(r.start)
-            changepoints = list(changepoints)
-            changepoints.sort()        
-            
-            for cp in changepoints:
-                amountToPreempt = {}
-                for n in mustpreempt:
-                    amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
-                reservations = [r for r in reservationsAtMiddle 
-                                if r.start <= cp and cp < r.end]
-                for r in reservations:
-                    mustpreemptres = False
-                    for n in r.resources_in_pnode.keys():
-                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                            amountToPreempt[n].decr(r.resources_in_pnode[n])
-                            mustpreemptres = True
-                    if mustpreemptres:
-                        atmiddle.add(r)
-                    if preemptedEnough(amountToPreempt):
-                        break
-            
-        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
-        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-        
-        leases = [r.lease for r in atstart|atmiddle]
-        
-        return leases
-                
-    # TODO: Should be moved to LeaseScheduler
-    def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
-        self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime)) 
-        leases = []
-        vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
-        leases = set([rr.lease for rr in vmrrs])
-        leases = [l for l in leases if isinstance(l, BestEffortLease) and l.get_state() in (Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY) and not l in checkedleases]
-        for lease in leases:
-            self.logger.debug("Found lease %i" % l.id)
-            l.print_contents()
-            # Earliest time can't be earlier than time when images will be
-            # available in node
-            earliest = max(nexttime, lease.imagesavail)
-            self.__slideback(lease, earliest)
-            checkedleases.append(l)
-        #for l in leases:
-        #    vmrr, susprr = l.getLastVMRR()
-        #    self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
-          
-    def __slideback(self, lease, earliest):
-        vmrr = lease.get_last_vmrr()
-        # Save original start and end time of the vmrr
-        old_start = vmrr.start
-        old_end = vmrr.end
-        nodes = vmrr.nodes.values()
-        if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
-            originalstart = vmrr.pre_rrs[0].start
-        else:
-            originalstart = vmrr.start
-        cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
-        cp = [earliest] + cp
-        newstart = None
-        for p in cp:
-            self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
-            self.slottable.availabilitywindow.printContents()
-            if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
-                (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
-                if end == originalstart and set(nodes) <= set(canfit.keys()):
-                    self.logger.debug("Can slide back to %s" % p)
-                    newstart = p
-                    break
-        if newstart == None:
-            # Can't slide back. Leave as is.
-            pass
-        else:
-            diff = originalstart - newstart
-            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
-                resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
-                for resmrr in resmrrs:
-                    resmrr_old_start = resmrr.start
-                    resmrr_old_end = resmrr.end
-                    resmrr.start -= diff
-                    resmrr.end -= diff
-                    self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
-            vmrr.update_start(vmrr.start - diff)
-            
-            # If the lease was going to be suspended, check to see if
-            # we don't need to suspend any more.
-            remdur = lease.duration.get_remaining_duration()
-            if vmrr.is_suspending() and vmrr.end - newstart >= remdur: 
-                vmrr.update_end(vmrr.start + remdur)
-                for susprr in vmrr.post_rrs:
-                    self.slottable.removeReservation(susprr)
-                vmrr.post_rrs = []
-            else:
-                vmrr.update_end(vmrr.end - diff)
-                
-            if not vmrr.is_suspending():
-                # If the VM was set to shutdown, we need to slideback the shutdown RRs
-                for rr in vmrr.post_rrs:
-                    rr_old_start = rr.start
-                    rr_old_end = rr.end
-                    rr.start -= diff
-                    rr.end -= diff
-                    self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
-
-            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
-            self.logger.vdebug("New lease descriptor (after slideback):")
-            lease.print_contents()
-    
-          
-
     #-------------------------------------------------------------------#
     #                                                                   #
     #                  SLOT TABLE EVENT HANDLERS                        #
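
The hunk above removes find_preemptable_leases, reevaluate_schedule and __slideback from the VM scheduler (the in-code TODO already marked this logic as belonging with the lease scheduler). For reference, a condensed sketch of the greedy choice the removed find_preemptable_leases made at the start of a reservation; ResourceTuple is simplified to a plain per-node number, so the names below are illustrative rather than Haizea's API:

def choose_preemptable(candidates, must_preempt):
    # candidates: preemptible VM reservations overlapping the start of the new
    # reservation, sorted so the most recently submitted leases come first
    # (see comparepreemptability in the removed code).
    # must_preempt: {physical node: amount of resources that must be freed}
    to_free = dict(must_preempt)
    chosen = []
    for rr in candidates:
        helps = False
        for node, used in rr.resources_in_pnode.items():
            if to_free.get(node, 0) > 0:
                to_free[node] -= used
                helps = True           # preempting rr frees something we still need
        if helps:
            chosen.append(rr)
        if all(amount <= 0 for amount in to_free.values()):
            break                      # enough resources freed on every node
    return [rr.lease for rr in chosen]
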
@@ -1089,16 +1067,21 @@
                 # appropriate inside the resourcepool module
                 for (vnode, pnode) in rr.nodes.items():
                     l.diskimagemap[vnode] = pnode
-            except Exception, e:
-                self.logger.error("ERROR when starting VMs.")
+            except EnactmentError, exc:
+                self.logger.error("Enactment error when starting VMs.")
+                # Right now, this is a non-recoverable error, so we just
+                # propagate it upwards to the lease scheduler.
+                # In the future, it may be possible to react to this
+                # kind of error.
                 raise
+                
         elif lease_state == Lease.STATE_RESUMED_READY:
             l.set_state(Lease.STATE_ACTIVE)
             rr.state = ResourceReservation.STATE_ACTIVE
             # No enactment to do here, since all the suspend/resume actions are
             # handled during the suspend/resume RRs
         else:
-            raise CriticalSchedException, "Lease is an inconsistent state (tried to start VM when state is %s)" % Lease.state_str[lease_state]
+            raise InconsistentLeaseStateError(l, doing = "starting a VM")
         
         l.print_contents()
         self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
@@ -1136,7 +1119,16 @@
         self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
         l.print_contents()
         rr.state = ResourceReservation.STATE_ACTIVE
-        self.resourcepool.stop_vms(l, rr)
+        try:
+            self.resourcepool.stop_vms(l, rr)
+        except EnactmentError, exc:
+            self.logger.error("Enactment error when shutting down VMs.")
+            # Right now, this is a non-recoverable error, so we just
+            # propagate it upwards to the lease scheduler.
+            # In the future, it may be possible to react to this
+            # kind of error.
+            raise
+        
         l.print_contents()
         self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
 
@@ -1155,7 +1147,17 @@
         self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
         l.print_contents()
         rr.state = ResourceReservation.STATE_ACTIVE
-        self.resourcepool.suspend_vms(l, rr)
+        
+        try:
+            self.resourcepool.suspend_vms(l, rr)
+        except EnactmentError, exc:
+            self.logger.error("Enactment error when suspending VMs.")
+            # Right now, this is a non-recoverable error, so we just
+            # propagate it upwards to the lease scheduler.
+            # In the future, it may be possible to react to this
+            # kind of error.
+            raise            
+        
         for vnode in rr.vnodes:
             pnode = rr.vmrr.nodes[vnode]
             l.memimagemap[vnode] = pnode
@@ -1183,7 +1185,17 @@
     def _handle_start_resume(self, l, rr):
         self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
         l.print_contents()
-        self.resourcepool.resume_vms(l, rr)
+        
+        try:
+            self.resourcepool.resume_vms(l, rr)
+        except EnactmentError, exc:
+            self.logger.error("Enactment error when resuming VMs.")
+            # Right now, this is a non-recoverable error, so we just
+            # propagate it upwards to the lease scheduler.
+            # In the future, it may be possible to react to this
+            # kind of error.
+            raise
+                    
         rr.state = ResourceReservation.STATE_ACTIVE
         if rr.is_first():
             l.set_state(Lease.STATE_RESUMING)
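
Three exception classes carry the new error handling in this file: EnactmentError (raised by the enactment layer), InconsistentScheduleError and InconsistentLeaseStateError. Their actual definitions live in the modified enact/__init__.py and scheduler/__init__.py, which are not shown in this excerpt, so the sketch below is only a plausible shape; the base classes and constructor details are assumptions (the doing keyword argument is taken from the call site above):

class EnactmentError(Exception):
    """An action sent to the enactment backend failed."""

class SchedulingError(Exception):
    """Assumed base class for errors detected by the schedulers."""

class InconsistentScheduleError(SchedulingError):
    """The scheduler produced a schedule that violates its own invariants."""

class InconsistentLeaseStateError(SchedulingError):
    """A lease is in a state that makes the attempted action invalid."""
    def __init__(self, lease, doing):
        self.lease = lease
        self.doing = doing
        SchedulingError.__init__(self, "Lease %i is in an inconsistent state (%s) when %s"
                                 % (lease.id, lease.get_state(), doing))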

Modified: trunk/tests/base_config_simulator.conf
===================================================================
--- trunk/tests/base_config_simulator.conf	2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/tests/base_config_simulator.conf	2009-02-05 01:16:39 UTC (rev 563)
@@ -3,6 +3,7 @@
 mode: simulated
 lease-preparation: unmanaged
 datafile: /var/tmp/haizea/results.dat
+lease-failure-handling: exit
 
 [simulation]
 clock: simulated
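
The test configuration switches lease-failure-handling to exit so that a lease failure during a test run stops the simulator instead of being quietly cancelled. A hypothetical sketch of how such a policy might be consulted once one of the re-raised enactment errors reaches the lease scheduler; the dispatching code lives in the modified lease_scheduler.py and rm.py, which are not shown here, so everything below except the option name and the value used in this file is an assumption:

class UnrecoverableError(Exception):
    """Hypothetical wrapper signalling that Haizea should shut down."""

def handle_lease_failure(policy, lease_scheduler, lease, exc):
    # policy is the value of lease-failure-handling from the config file.
    if policy == "cancel":
        lease_scheduler.fail_lease(lease)   # fail_lease is a hypothetical name
    elif policy == "exit":
        raise UnrecoverableError(exc)       # caught at the top level: log and exit cleanly
    else:  # "exit-raise"
        raise exc                           # surface the original exception, e.g. into a debugger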


