[haizea-commit] r563 - in trunk: src/haizea/common src/haizea/resourcemanager src/haizea/resourcemanager/enact src/haizea/resourcemanager/scheduler src/haizea/resourcemanager/scheduler/preparation_schedulers tests
haizea-commit at mailman.cs.uchicago.edu
Wed Feb 4 19:16:47 CST 2009
Author: borja
Date: 2009-02-04 19:16:39 -0600 (Wed, 04 Feb 2009)
New Revision: 563
Modified:
trunk/src/haizea/common/constants.py
trunk/src/haizea/resourcemanager/configfile.py
trunk/src/haizea/resourcemanager/enact/__init__.py
trunk/src/haizea/resourcemanager/enact/opennebula.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler/__init__.py
trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
trunk/src/haizea/resourcemanager/scheduler/slottable.py
trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
trunk/tests/base_config_simulator.conf
Log:
Sanitized and improved exception handling
Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/common/constants.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -111,4 +111,8 @@
LOGLEVEL_VDEBUG = 5
LOGLEVEL_STATUS = 25
-NO_MEMORY_OVERRIDE = -1
\ No newline at end of file
+NO_MEMORY_OVERRIDE = -1
+
+ONFAILURE_CANCEL = "cancel"
+ONFAILURE_EXIT = "exit"
+ONFAILURE_EXIT_RAISE = "exit-raise"
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/configfile.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -101,6 +101,26 @@
from a repository node before the lease can start.
"""),
+ Option(name = "lease-failure-handling",
+ getter = "lease-failure-handling",
+ type = OPTTYPE_STRING,
+ required = False,
+ default = constants.ONFAILURE_CANCEL,
+ valid = [constants.ONFAILURE_CANCEL,
+ constants.ONFAILURE_EXIT,
+ constants.ONFAILURE_EXIT_RAISE],
+ doc = """
+ Sets how the scheduler will handle a failure in
+ a lease. Valid values are:
+
+ - cancel: The lease is cancelled and marked as "FAILED".
+ - exit: Haizea will exit cleanly, printing relevant debugging
+ information to its log.
+ - exit-raise: Haizea will exit by raising an exception. This is
+ useful when debugging, since IDEs and debuggers will trap the
+ exception and make it easier to inspect.
+ """),
+
Option(name = "datafile",
getter = "datafile",
type = OPTTYPE_STRING,
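Illustrative note (not part of this changeset): the new option lives in the
same configuration section as the other general options, so enabling it is a
one-line change, e.g.

    lease-failure-handling: cancel

If the option is omitted, the default is "cancel"; the change to
tests/base_config_simulator.conf at the end of this diff sets it to "exit".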
Modified: trunk/src/haizea/resourcemanager/enact/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/__init__.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/enact/__init__.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -49,4 +49,3 @@
class DeploymentEnactment(object):
def __init__(self):
pass
-
Modified: trunk/src/haizea/resourcemanager/enact/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/enact/opennebula.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -16,6 +16,7 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+from haizea.resourcemanager.scheduler import EnactmentError
from haizea.resourcemanager.scheduler.resourcepool import Node
from haizea.resourcemanager.scheduler.slottable import ResourceTuple
from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
@@ -26,6 +27,14 @@
import commands
from time import sleep
+class OpenNebulaEnactmentError(EnactmentError):
+ def __init__(self, cmd, status, output):
+ self.cmd = cmd
+ self.status = status
+ self.output = output
+
+ self.message = "Error when running '%s' (status=%i, output='%s')" % (cmd, status, output)
+
class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
ONEATTR2HAIZEA = { "TOTALCPU": constants.RES_CPU,
"TOTALMEMORY": constants.RES_MEM }
@@ -114,7 +123,7 @@
if status == 0:
self.logger.debug("Command returned succesfully.")
else:
- raise Exception, "Error when running onevm deploy (status=%i, output='%s')" % (status, output)
+ raise OpenNebulaEnactmentError("onevm deploy", status, output)
def stop(self, action):
for vnode in action.vnodes:
@@ -125,7 +134,8 @@
if status == 0:
self.logger.debug("Command returned succesfully.")
else:
- raise Exception, "Error when running onevm shutdown (status=%i, output='%s')" % (status, output)
+ raise OpenNebulaEnactmentError("onevm shutdown", status, output)
+
# TODO: We should spawn out a thread to do this, so Haizea isn't
# blocking until all these commands end
interval = get_config().get("enactment-overhead").seconds
@@ -140,7 +150,8 @@
if status == 0:
self.logger.debug("Command returned succesfully.")
else:
- raise Exception, "Error when running onevm suspend (status=%i, output='%s')" % (status, output)
+ raise OpenNebulaEnactmentError("onevm suspend", status, output)
+
# Space out commands to avoid saturating OpenNebula
# TODO: We should spawn out a thread to do this
# TODO: We should spawn out a thread to do this, so Haizea isn't
@@ -157,7 +168,8 @@
if status == 0:
self.logger.debug("Command returned succesfully.")
else:
- raise Exception, "Error when running onevm resume (status=%i, output='%s')" % (status, output)
+ raise OpenNebulaEnactmentError("onevm resume", status, output)
+
# Space out commands to avoid saturating OpenNebula
# TODO: We should spawn out a thread to do this, so Haizea isn't
# blocking until all these commands end
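Illustrative sketch (not part of this changeset): the new exception simply
packages the failed command, its exit status and its output into a single
message, so every "onevm" failure is reported uniformly. A self-contained
Python 2 example, using stand-ins that mirror the classes added above:

    class EnactmentError(Exception):
        pass

    class OpenNebulaEnactmentError(EnactmentError):
        def __init__(self, cmd, status, output):
            self.cmd = cmd
            self.status = status
            self.output = output
            self.message = "Error when running '%s' (status=%i, output='%s')" % (cmd, status, output)

    # Hypothetical failure of an "onevm deploy" invocation
    err = OpenNebulaEnactmentError("onevm deploy", 255, "Connection refused")
    print err.message
    # prints: Error when running 'onevm deploy' (status=255, output='Connection refused')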
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/rm.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -42,6 +42,7 @@
from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
from haizea.resourcemanager.frontends.rpc import RPCFrontend
from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler import UnrecoverableError
from haizea.resourcemanager.scheduler.lease_scheduler import LeaseScheduler
from haizea.resourcemanager.scheduler.vm_scheduler import VMScheduler
from haizea.resourcemanager.scheduler.slottable import SlotTable
@@ -53,6 +54,7 @@
import logging
import signal
import sys, os
+import traceback
from time import sleep
from math import ceil
from mx.DateTime import now, TimeDelta
@@ -352,25 +354,48 @@
# Run the scheduling function.
try:
self.scheduler.schedule(nexttime)
- except Exception, msg:
- # Exit if something goes horribly wrong
- self.logger.error("Exception in scheduling function. Dumping state..." )
- self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- raise
+ except UnrecoverableError, exc:
+ self.logger.error("Unrecoverable error has happened in scheduling function.")
+ self.logger.error("Original exception:")
+ self.print_exception(exc.exc, exc.get_traceback())
+ self.logger.error("Unrecoverable error traceback:")
+ self.print_exception(exc, sys.exc_traceback)
+ self.panic()
+ except Exception, exc:
+ self.logger.error("Unexpected exception has happened in scheduling function.")
+ self.print_exception(exc, sys.exc_traceback)
+ self.panic()
+
def process_reservations(self, time):
"""Process reservations starting/stopping at specified time"""
# The scheduler takes care of this.
try:
self.scheduler.process_reservations(time)
- except Exception, msg:
- # Exit if something goes horribly wrong
- self.logger.error("Exception when processing reservations. Dumping state..." )
- self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- raise
-
+ except UnrecoverableError, exc:
+ self.logger.error("Unrecoverable error has happened when processing reservations.")
+ self.logger.error("Original exception:")
+ self.print_exception(exc.exc, exc.get_traceback())
+ self.logger.error("Unrecoverable error traceback:")
+ self.print_exception(exc, sys.exc_traceback)
+ self.panic()
+ except Exception, exc:
+ self.logger.error("Unexpected exception has happened when processing reservations.")
+ self.print_exception(exc, sys.exc_traceback)
+ self.panic()
+
+ def print_exception(self, exc, exc_traceback):
+ tb = traceback.format_tb(exc_traceback)
+ for line in tb:
+ self.logger.error(line)
+ self.logger.error("Message: %s" % exc)
+
+ def panic(self):
+ #self.print_stats(logging.getLevelName("ERROR"), verbose=True)
+ sys.exit(1)
+
def print_stats(self, loglevel, verbose=False):
"""Print some basic statistics in the log
Modified: trunk/src/haizea/resourcemanager/scheduler/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/__init__.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/__init__.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -16,42 +16,55 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+import sys
class SchedException(Exception):
- """A simple exception class used for scheduling exceptions"""
+ """The base class for scheduling exceptions"""
pass
class NotSchedulableException(SchedException):
"""A simple exception class used when a lease cannot be scheduled
This exception must be raised when a lease cannot be scheduled
- (this is not necessarily an error condition, but the scheduler will
- have to react to it)
"""
pass
-class CriticalSchedException(SchedException):
- """A simple exception class used for critical scheduling exceptions
-
- This exception must be raised when a non-recoverable error happens
- (e.g., when there are unexplained inconsistencies in the schedule,
- typically resulting from a code error)
- """
+class CancelLeaseException(SchedException):
pass
-class PreparationSchedException(SchedException):
+class NormalEndLeaseException(SchedException):
pass
-class CancelLeaseException(Exception):
+class RescheduleLeaseException(SchedException):
pass
-class NormalEndLeaseException(Exception):
+
+class SchedulingError(Exception):
+ """The base class for scheduling errors"""
pass
-class RescheduleLeaseException(SchedException):
+class InconsistentScheduleError(SchedulingError):
pass
+class InconsistentLeaseStateError(SchedulingError):
+ def __init__(self, lease, doing):
+ self.lease = lease
+ self.doing = doing
+
+ self.message = "Lease %i is in an inconsistent state (%i) when %s" % (lease.id, lease.get_state(), doing)
+class EnactmentError(SchedulingError):
+ pass
+
+class UnrecoverableError(SchedulingError):
+ def __init__(self, exc):
+ self.exc = exc
+ self.exc_info = sys.exc_info()
+
+ def get_traceback(self):
+ return self.exc_info[2]
+
+
class ReservationEventHandler(object):
"""A wrapper for reservation event handlers.
Modified: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -22,17 +22,12 @@
done to prepare a lease) happens in the modules inside the
haizea.resourcemanager.deployment package.
-This module provides the following classes:
-
-* SchedException: A scheduling exception
-* ReservationEventHandler: A simple wrapper class
-* Scheduler: Do I really need to spell this one out for you?
"""
import haizea.common.constants as constants
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
-from haizea.resourcemanager.scheduler import SchedException, RescheduleLeaseException, NormalEndLeaseException
-from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException, ResourceReservation
+from haizea.resourcemanager.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException
+from haizea.resourcemanager.scheduler.slottable import SlotTable, ResourceReservation
from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation, SuspensionResourceReservation, ResumptionResourceReservation, ShutdownResourceReservation
from operator import attrgetter, itemgetter
@@ -96,7 +91,8 @@
# Queue best-effort requests
for lease in be_leases:
- self.enqueue(lease)
+ self.__enqueue(lease)
+ lease.set_state(Lease.STATE_QUEUED)
# Process immediate requests
for lease in im_leases:
@@ -127,12 +123,27 @@
self.__enqueue_in_order(lease)
lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to reschedule best-effort lease when state is %s)" % lease_state
+ raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
except NormalEndLeaseException, msg:
self._handle_end_lease(lease)
+ except InconsistentLeaseStateError, exc:
+ self.fail_lease(lease, exc)
+ except EnactmentError, exc:
+ self.fail_lease(lease, exc)
+ # Everything else gets propagated upwards to ResourceManager
+ # and will make Haizea crash and burn
+
for rr in starting:
- self.handlers[type(rr)].on_start(rr.lease, rr)
+ lease = rr.lease
+ try:
+ self.handlers[type(rr)].on_start(lease, rr)
+ except InconsistentLeaseStateError, exc:
+ self.fail_lease(lease, exc)
+ except EnactmentError, exc:
+ self.fail_lease(lease, exc)
+ # Everything else gets propagated upwards to ResourceManager
+ # and will make Haizea crash and burn
# TODO: Should be in VMScheduler
@@ -144,14 +155,6 @@
get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)
get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
-
- def enqueue(self, lease):
- """Queues a best-effort lease request"""
- get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
- lease.set_state(Lease.STATE_QUEUED)
- self.queue.enqueue(lease)
- self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
-
def request_lease(self, lease):
"""
Request a lease. At this point, it is simply marked as "Pending" and,
@@ -203,7 +206,7 @@
self.completedleases.add(lease)
self.leases.remove(lease)
else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to cancel lease when state is %s)" % lease_state
+ raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
elif self.queue.has_lease(lease_id):
# The lease is in the queue, waiting to be scheduled.
@@ -211,21 +214,27 @@
self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
l = self.queue.get_lease(lease_id)
self.queue.remove_lease(lease)
+
- def fail_lease(self, lease_id):
+ def fail_lease(self, lease, exc=None):
"""Transitions a lease to a failed state, and does any necessary cleaning up
- TODO: For now, just use the cancelling algorithm
-
Arguments:
lease -- Lease to fail
- """
- try:
- raise
- self.cancel_lease(lease_id)
- except Exception, msg:
- # Exit if something goes horribly wrong
- raise CriticalSchedException()
+ exc -- The exception that made the lease fail
+ """
+ treatment = get_config().get("lease-failure-handling")
+
+ if treatment == constants.ONFAILURE_CANCEL:
+ rrs = lease.get_scheduled_reservations()
+ for r in rrs:
+ self.slottable.removeReservation(r)
+ lease.set_state(Lease.STATE_FAILED)
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
+ elif treatment == constants.ONFAILURE_EXIT:
+ raise UnrecoverableError(exc)
+
def notify_event(self, lease_id, event):
time = get_clock().get_time()
@@ -238,11 +247,8 @@
nexttime = get_clock().get_next_schedulable_time()
# We need to reevaluate the schedule to see if there are any future
# reservations that we can slide back.
- self.vm_scheduler.reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
+ self.__reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
-
-
-
def __process_ar_request(self, lease, nexttime):
self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
@@ -256,20 +262,20 @@
self.leases.add(lease)
get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
accepted = True
- except SchedException, msg:
+ except NotSchedulableException, exc:
# Our first try avoided preemption, try again
# without avoiding preemption.
# TODO: Roll this into the exact slot fitting algorithm
try:
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease.id)
self.__schedule_ar_lease(lease, nexttime, avoidpreempt=False)
self.leases.add(lease)
get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
accepted = True
- except SchedException, msg:
+ except NotSchedulableException, exc:
get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
if accepted:
self.logger.info("AR lease request #%i has been accepted." % lease.id)
@@ -296,7 +302,7 @@
self.__schedule_besteffort_lease(lease, nexttime)
self.leases.add(lease)
get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease.id)
- except SchedException, msg:
+ except NotSchedulableException, msg:
# Put back on queue
newqueue.enqueue(lease)
self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
@@ -320,142 +326,129 @@
self.leases.add(lease)
get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
self.logger.info("Immediate lease request #%i has been accepted." % lease.id)
- except SchedException, msg:
+ except NotSchedulableException, msg:
get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
def __schedule_ar_lease(self, lease, nexttime, avoidpreempt=True):
- try:
- (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-
- if len(preemptions) > 0:
- leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
- self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
- for l in leases:
- self.__preempt(l, preemption_time=vmrr.start)
+ (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+
+ if len(preemptions) > 0:
+ leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
+ self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
+ for l in leases:
+ self.__preempt(l, preemption_time=vmrr.start)
- # Schedule deployment overhead
- deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+ # Schedule deployment overhead
+ deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+
+ # Commit reservation to slot table
+ # (we don't do this until the very end because the deployment overhead
+ # scheduling could still throw an exception)
+ for rr in deploy_rrs:
+ lease.append_deployrr(rr)
- # Commit reservation to slot table
- # (we don't do this until the very end because the deployment overhead
- # scheduling could still throw an exception)
- for rr in deploy_rrs:
- lease.append_deployrr(rr)
-
- for rr in deploy_rrs:
- self.slottable.addReservation(rr)
-
- lease.append_vmrr(vmrr)
- self.slottable.addReservation(vmrr)
+ for rr in deploy_rrs:
+ self.slottable.addReservation(rr)
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- lease.set_state(Lease.STATE_SCHEDULED)
+ lease.append_vmrr(vmrr)
+ self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+
+ lease.set_state(Lease.STATE_SCHEDULED)
- if is_ready:
- lease.set_state(Lease.STATE_READY)
- except SchedException, msg:
- raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
- except Exception, msg:
- raise
+ if is_ready:
+ lease.set_state(Lease.STATE_READY)
def __schedule_besteffort_lease(self, lease, nexttime):
- try:
- # Schedule the VMs
- canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
-
- lease_state = lease.get_state()
-
- # Determine earliest start time in each node
- if lease_state == Lease.STATE_QUEUED:
- # Figure out earliest start times based on
- # image schedule and reusable images
- earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
- elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
- # No need to transfer images from repository
- # (only intra-node transfer)
- earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
- else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to schedule best-effort lease when state is %s)" % lease_state
-
- (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
-
- # Schedule deployment
- is_ready = False
- deploy_rrs = []
- if lease_state == Lease.STATE_SUSPENDED_QUEUED:
- self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
- else:
- deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+ # Schedule the VMs
+ canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
+
+ lease_state = lease.get_state()
+
+ # Determine earliest start time in each node
+ if lease_state == Lease.STATE_QUEUED:
+ # Figure out earliest start times based on
+ # image schedule and reusable images
+ earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
+ elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ # No need to transfer images from repository
+ # (only intra-node transfer)
+ earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
+ else:
+ raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
+
+ (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
+
+ # Schedule deployment
+ is_ready = False
+ deploy_rrs = []
+ if lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
+ else:
+ deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
- # At this point, the lease is feasible.
- # Commit changes by adding RRs to lease and to slot table
-
- # Add deployment RRs (if any) to lease
- for rr in deploy_rrs:
- lease.append_deployrr(rr)
-
- # Add VMRR to lease
- lease.append_vmrr(vmrr)
-
+ # At this point, the lease is feasible.
+ # Commit changes by adding RRs to lease and to slot table
+
+ # Add deployment RRs (if any) to lease
+ for rr in deploy_rrs:
+ lease.append_deployrr(rr)
+
+ # Add VMRR to lease
+ lease.append_vmrr(vmrr)
+
- # Add resource reservations to slottable
+ # Add resource reservations to slottable
+
+ # Deployment RRs (if any)
+ for rr in deploy_rrs:
+ self.slottable.addReservation(rr)
+
+ # Pre-VM RRs (if any)
+ for rr in vmrr.pre_rrs:
+ self.slottable.addReservation(rr)
- # Deployment RRs (if any)
- for rr in deploy_rrs:
- self.slottable.addReservation(rr)
+ # VM
+ self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+
+ if in_future:
+ self.numbesteffortres += 1
- # Pre-VM RRs (if any)
- for rr in vmrr.pre_rrs:
- self.slottable.addReservation(rr)
-
- # VM
- self.slottable.addReservation(vmrr)
-
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- if in_future:
- self.numbesteffortres += 1
-
- if lease_state == Lease.STATE_QUEUED:
- lease.set_state(Lease.STATE_SCHEDULED)
- if is_ready:
- lease.set_state(Lease.STATE_READY)
- elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
- lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
+ if lease_state == Lease.STATE_QUEUED:
+ lease.set_state(Lease.STATE_SCHEDULED)
+ if is_ready:
+ lease.set_state(Lease.STATE_READY)
+ elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
-
- lease.print_contents()
+ lease.print_contents()
- except SchedException, msg:
- raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
-
def __schedule_immediate_lease(self, req, nexttime):
- try:
- (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
- # Schedule deployment
- self.preparation_scheduler.schedule(req, vmrr, nexttime)
-
- req.append_rr(vmrr)
- self.slottable.addReservation(vmrr)
-
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
+ (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
+ # Schedule deployment
+ self.preparation_scheduler.schedule(req, vmrr, nexttime)
- req.print_contents()
- except SlotFittingException, msg:
- raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+ req.append_rr(vmrr)
+ self.slottable.addReservation(vmrr)
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+
+ req.print_contents()
+
def __preempt(self, lease, preemption_time):
self.logger.info("Preempting lease #%i..." % (lease.id))
@@ -506,6 +499,24 @@
self.logger.vdebug("Lease after preemption:")
lease.print_contents()
+ def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+ self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
+ leases = []
+ vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
+ leases = set([rr.lease for rr in vmrrs])
+ leases = [l for l in leases if isinstance(l, BestEffortLease) and l.get_state() in (Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY) and not l in checkedleases]
+ for lease in leases:
+ self.logger.debug("Found lease %i" % l.id)
+ l.print_contents()
+ # Earliest time can't be earlier than time when images will be
+ # available in node
+ earliest = max(nexttime, lease.imagesavail)
+ self.vm_scheduler.slideback(lease, earliest)
+ checkedleases.append(lease)
+ #for l in leases:
+ # vmrr, susprr = l.getLastVMRR()
+ # self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
+
# TODO: Should be in VMScheduler
def __get_utilization(self, time):
total = self.slottable.get_total_capacity()
@@ -525,6 +536,12 @@
return util
+ def __enqueue(self, lease):
+ """Queues a best-effort lease request"""
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ self.queue.enqueue(lease)
+ self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
+
def __enqueue_in_order(self, lease):
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.queue.enqueue_in_order(lease)
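Illustrative sketch (not part of this changeset) of the failure path these
changes establish: a slot table event handler raises EnactmentError,
process_reservations() catches it and calls fail_lease(), and fail_lease()
either cancels the lease or escalates to UnrecoverableError depending on the
new lease-failure-handling option. Stand-in types only:

    ONFAILURE_CANCEL = "cancel"
    ONFAILURE_EXIT = "exit"

    class EnactmentError(Exception):
        pass

    class UnrecoverableError(Exception):
        def __init__(self, exc):
            self.exc = exc

    def fail_lease(lease, exc, treatment):
        if treatment == ONFAILURE_CANCEL:
            lease["state"] = "FAILED"      # the real method also removes the lease's reservations
        elif treatment == ONFAILURE_EXIT:
            raise UnrecoverableError(exc)  # rm.py catches this, logs both tracebacks and panics

    def process_reservations(lease, handler, treatment):
        try:
            handler(lease)
        except EnactmentError, exc:
            fail_lease(lease, exc, treatment)

    lease = {"state": "ACTIVE"}
    def failing_handler(lease):
        raise EnactmentError("onevm shutdown failed")

    process_reservations(lease, failing_handler, ONFAILURE_CANCEL)
    print lease["state"]  # prints: FAILED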
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -28,6 +28,3 @@
def cleanup(self, lease):
abstract()
-
-class PreparationSchedException(Exception):
- pass
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -17,13 +17,13 @@
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
-from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler, PreparationSchedException
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler
from haizea.resourcemanager.scheduler.slottable import ResourceReservation
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
from haizea.resourcemanager.scheduler import ReservationEventHandler
from haizea.common.utils import estimate_transfer_time, get_config
from haizea.resourcemanager.scheduler.slottable import ResourceTuple
-from haizea.resourcemanager.scheduler import ReservationEventHandler, PreparationSchedException, CriticalSchedException
+from haizea.resourcemanager.scheduler import ReservationEventHandler, NotSchedulableException, InconsistentLeaseStateError
import copy
@@ -109,7 +109,7 @@
elif mechanism == constants.TRANSFER_MULTICAST:
try:
filetransfer = self.schedule_imagetransfer_edf(lease, musttransfer, nexttime)
- except PreparationSchedException, msg:
+ except NotSchedulableException, exc:
raise
# No chance of scheduling exception at this point. It's safe
@@ -288,7 +288,7 @@
startTime = t.end
if not fits:
- raise PreparationSchedException, "Adding this lease results in an unfeasible image transfer schedule."
+ raise NotSchedulableException, "Adding this lease results in an unfeasible image transfer schedule."
# Push image transfers as close as possible to their deadlines.
feasibleEndTime=newtransfers[-1].deadline
@@ -403,7 +403,7 @@
rr.state = ResourceReservation.STATE_ACTIVE
# TODO: Enactment
else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to start file transfer when state is %s)" % Lease.state_str[lease_state]
+ raise InconsistentLeaseStateError(lease, doing = "starting a file transfer")
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
@@ -438,7 +438,7 @@
# TODO: ENACTMENT: Verify the image was transferred correctly
sched.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to start file transfer when state is %s)" % lease_state
+ raise InconsistentLeaseStateError(lease, doing = "ending a file transfer")
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
Modified: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/resourcepool.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/resourcepool.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -19,10 +19,9 @@
from haizea.common.utils import vnodemapstr, get_accounting
import haizea.common.constants as constants
import haizea.resourcemanager.enact.actions as actions
+from haizea.resourcemanager.scheduler import EnactmentError
import logging
-class FailedEnactmentException(Exception):
- pass
class ResourcePool(object):
def __init__(self, info_enact, vm_enact, deploy_enact):
@@ -50,18 +49,18 @@
try:
self.vm.start(start_action)
- except Exception, msg:
- self.logger.error("Enactment of start VM failed: %s" % msg)
- raise FailedEnactmentException()
+ except EnactmentError, exc:
+ self.logger.error("Enactment of start VM failed: %s" % exc.message)
+ raise
def stop_vms(self, lease, rr):
stop_action = actions.VMEnactmentStopAction()
stop_action.from_rr(rr)
try:
self.vm.stop(stop_action)
- except Exception, msg:
- self.logger.error("Enactment of end VM failed: %s" % msg)
- raise FailedEnactmentException()
+ except EnactmentError, exc:
+ self.logger.error("Enactment of end VM failed: %s" % exc.message)
+ raise
def suspend_vms(self, lease, rr):
# Add memory image files
@@ -74,9 +73,9 @@
suspend_action.from_rr(rr)
try:
self.vm.suspend(suspend_action)
- except Exception, msg:
- self.logger.error("Enactment of suspend VM failed: %s" % msg)
- raise FailedEnactmentException()
+ except EnactmentError, exc:
+ self.logger.error("Enactment of suspend VM failed: %s" % exc.message)
+ raise
def verify_suspend(self, lease, rr):
verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
@@ -94,9 +93,9 @@
resume_action.from_rr(rr)
try:
self.vm.resume(resume_action)
- except Exception, msg:
- self.logger.error("Enactment of resume VM failed: %s" % msg)
- raise FailedEnactmentException()
+ except EnactmentError, exc:
+ self.logger.error("Enactment of resume VM failed: %s" % exc.message)
+ raise
def verify_resume(self, lease, rr):
verify_resume_action = actions.VMEnactmentConfirmResumeAction()
Modified: trunk/src/haizea/resourcemanager/scheduler/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/slottable.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/slottable.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -24,13 +24,6 @@
import copy
import logging
-class SlotFittingException(Exception):
- pass
-
-class CriticalSlotFittingException(Exception):
- pass
-
-
class Node(object):
def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
self.capacity = ResourceTuple.copy(capacity)
Modified: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-02-05 01:16:39 UTC (rev 563)
@@ -18,11 +18,11 @@
import haizea.common.constants as constants
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_accounting, get_clock
-from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException
+from haizea.resourcemanager.scheduler.slottable import SlotTable
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
from haizea.resourcemanager.scheduler.slottable import ResourceReservation, ResourceTuple
from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, CriticalSchedException
+from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError
from operator import attrgetter, itemgetter
from mx.DateTime import TimeDelta
@@ -97,14 +97,14 @@
fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
if fitatstart < numnodes:
if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
+ raise NotSchedulableException, "Not enough resources in specified interval"
else:
unfeasiblewithoutpreemption = True
feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
fitatend = sum([n for n in canfitnopreempt.values()])
if fitatend < numnodes:
if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
+ raise NotSchedulableException, "Not enough resources in specified interval"
else:
unfeasiblewithoutpreemption = True
@@ -112,11 +112,11 @@
if canpreempt:
fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
if fitatstart < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
+ raise NotSchedulableException, "Not enough resources in specified interval"
feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
fitatend = sum([n for n in canfitpreempt.values()])
if fitatend < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
+ raise NotSchedulableException, "Not enough resources in specified interval"
else:
if unfeasiblewithoutpreemption:
mustpreempt = True
@@ -200,7 +200,7 @@
break
if vnode <= numnodes:
- raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
+ raise InconsistentScheduleError, "Availability window indicated that request is feasible, but could not fit it"
# Create VM resource reservations
vmrr = VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
@@ -312,12 +312,12 @@
if not allow_reservation_in_future:
# We did not find a suitable starting time. This can happen
# if we're unable to make future reservations
- raise SchedException, "Could not find enough resources for this request"
+ raise NotSchedulableException, "Could not find enough resources for this request"
else:
mustsuspend = (end - start) < duration
if mustsuspend and not suspendable:
if not allow_reservation_in_future:
- raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+ raise NotSchedulableException, "Scheduling this lease would require preempting it, which is not allowed"
else:
start = None # No satisfactory start time
@@ -466,8 +466,151 @@
for susprr in vmrr.post_rrs:
self.slottable.addReservation(susprr)
+ def find_preemptable_leases(self, mustpreempt, startTime, endTime):
+ def comparepreemptability(rrX, rrY):
+ if rrX.lease.submit_time > rrY.lease.submit_time:
+ return constants.BETTER
+ elif rrX.lease.submit_time < rrY.lease.submit_time:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+
+ def preemptedEnough(amountToPreempt):
+ for node in amountToPreempt:
+ if not amountToPreempt[node].is_zero_or_less():
+ return False
+ return True
+ # Get allocations at the specified time
+ atstart = set()
+ atmiddle = set()
+ nodes = set(mustpreempt.keys())
+ reservationsAtStart = self.slottable.getReservationsAt(startTime)
+ reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtStart.sort(comparepreemptability)
+ reservationsAtMiddle.sort(comparepreemptability)
+
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+
+ # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+ for r in reservationsAtStart:
+ # The following will really only come into play when we have
+ # multiple VMs per node
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ # Don't need to preempt if we've already preempted all
+ # the needed resources in node n
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atstart.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+ if len(reservationsAtMiddle)>0:
+ changepoints = set()
+ for r in reservationsAtMiddle:
+ changepoints.add(r.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+
+ for cp in changepoints:
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+ reservations = [r for r in reservationsAtMiddle
+ if r.start <= cp and cp < r.end]
+ for r in reservations:
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atmiddle.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+ self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+
+ leases = [r.lease for r in atstart|atmiddle]
+
+ return leases
+
+
+ def slideback(self, lease, earliest):
+ vmrr = lease.get_last_vmrr()
+ # Save original start and end time of the vmrr
+ old_start = vmrr.start
+ old_end = vmrr.end
+ nodes = vmrr.nodes.values()
+ if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
+ originalstart = vmrr.pre_rrs[0].start
+ else:
+ originalstart = vmrr.start
+ cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+ cp = [earliest] + cp
+ newstart = None
+ for p in cp:
+ self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+ self.slottable.availabilitywindow.printContents()
+ if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+ (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+ if end == originalstart and set(nodes) <= set(canfit.keys()):
+ self.logger.debug("Can slide back to %s" % p)
+ newstart = p
+ break
+ if newstart == None:
+ # Can't slide back. Leave as is.
+ pass
+ else:
+ diff = originalstart - newstart
+ if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
+ resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+ for resmrr in resmrrs:
+ resmrr_old_start = resmrr.start
+ resmrr_old_end = resmrr.end
+ resmrr.start -= diff
+ resmrr.end -= diff
+ self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
+ vmrr.update_start(vmrr.start - diff)
+
+ # If the lease was going to be suspended, check to see if
+ # we don't need to suspend any more.
+ remdur = lease.duration.get_remaining_duration()
+ if vmrr.is_suspending() and vmrr.end - newstart >= remdur:
+ vmrr.update_end(vmrr.start + remdur)
+ for susprr in vmrr.post_rrs:
+ self.slottable.removeReservation(susprr)
+ vmrr.post_rrs = []
+ else:
+ vmrr.update_end(vmrr.end - diff)
+
+ if not vmrr.is_suspending():
+ # If the VM was set to shutdown, we need to slideback the shutdown RRs
+ for rr in vmrr.post_rrs:
+ rr_old_start = rr.start
+ rr_old_end = rr.end
+ rr.start -= diff
+ rr.end -= diff
+ self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
+
+ self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+ self.logger.vdebug("New lease descriptor (after slideback):")
+ lease.print_contents()
+
def can_suspend_at(self, lease, t):
# TODO: Make more general, should determine vmrr based on current time
vmrr = lease.get_last_vmrr()
@@ -655,7 +798,7 @@
rate = config.get("suspend-rate")
if suspend_by < vmrr.start or suspend_by > vmrr.end:
- raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
+ raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
suspend_rrs = []
@@ -678,7 +821,7 @@
susp_start = suspend_rrs[0].start
if susp_start < vmrr.start:
- raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
+ raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
vmrr.update_end(susp_start)
@@ -698,7 +841,7 @@
rate = config.get("resume-rate")
if resume_at < vmrr.start or resume_at > vmrr.end:
- raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+ raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
resume_rrs = []
@@ -721,7 +864,7 @@
resm_end = resume_rrs[-1].end
if resm_end > vmrr.end:
- raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+ raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
vmrr.update_start(resm_end)
for resmrr in resume_rrs:
@@ -902,171 +1045,6 @@
nodes.sort(comparenodes)
return nodes
- def find_preemptable_leases(self, mustpreempt, startTime, endTime):
- def comparepreemptability(rrX, rrY):
- if rrX.lease.submit_time > rrY.lease.submit_time:
- return constants.BETTER
- elif rrX.lease.submit_time < rrY.lease.submit_time:
- return constants.WORSE
- else:
- return constants.EQUAL
-
- def preemptedEnough(amountToPreempt):
- for node in amountToPreempt:
- if not amountToPreempt[node].is_zero_or_less():
- return False
- return True
-
- # Get allocations at the specified time
- atstart = set()
- atmiddle = set()
- nodes = set(mustpreempt.keys())
-
- reservationsAtStart = self.slottable.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtStart.sort(comparepreemptability)
- reservationsAtMiddle.sort(comparepreemptability)
-
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
-
- # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
- for r in reservationsAtStart:
- # The following will really only come into play when we have
- # multiple VMs per node
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- # Don't need to preempt if we've already preempted all
- # the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atstart.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
- if len(reservationsAtMiddle)>0:
- changepoints = set()
- for r in reservationsAtMiddle:
- changepoints.add(r.start)
- changepoints = list(changepoints)
- changepoints.sort()
-
- for cp in changepoints:
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
- reservations = [r for r in reservationsAtMiddle
- if r.start <= cp and cp < r.end]
- for r in reservations:
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atmiddle.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
- self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-
- leases = [r.lease for r in atstart|atmiddle]
-
- return leases
-
- # TODO: Should be moved to LeaseScheduler
- def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
- self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
- leases = []
- vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
- leases = set([rr.lease for rr in vmrrs])
- leases = [l for l in leases if isinstance(l, BestEffortLease) and l.get_state() in (Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY) and not l in checkedleases]
- for lease in leases:
- self.logger.debug("Found lease %i" % l.id)
- l.print_contents()
- # Earliest time can't be earlier than time when images will be
- # available in node
- earliest = max(nexttime, lease.imagesavail)
- self.__slideback(lease, earliest)
- checkedleases.append(l)
- #for l in leases:
- # vmrr, susprr = l.getLastVMRR()
- # self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
-
- def __slideback(self, lease, earliest):
- vmrr = lease.get_last_vmrr()
- # Save original start and end time of the vmrr
- old_start = vmrr.start
- old_end = vmrr.end
- nodes = vmrr.nodes.values()
- if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
- originalstart = vmrr.pre_rrs[0].start
- else:
- originalstart = vmrr.start
- cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
- cp = [earliest] + cp
- newstart = None
- for p in cp:
- self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
- self.slottable.availabilitywindow.printContents()
- if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
- (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
- if end == originalstart and set(nodes) <= set(canfit.keys()):
- self.logger.debug("Can slide back to %s" % p)
- newstart = p
- break
- if newstart == None:
- # Can't slide back. Leave as is.
- pass
- else:
- diff = originalstart - newstart
- if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
- resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
- for resmrr in resmrrs:
- resmrr_old_start = resmrr.start
- resmrr_old_end = resmrr.end
- resmrr.start -= diff
- resmrr.end -= diff
- self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
- vmrr.update_start(vmrr.start - diff)
-
- # If the lease was going to be suspended, check to see if
- # we don't need to suspend any more.
- remdur = lease.duration.get_remaining_duration()
- if vmrr.is_suspending() and vmrr.end - newstart >= remdur:
- vmrr.update_end(vmrr.start + remdur)
- for susprr in vmrr.post_rrs:
- self.slottable.removeReservation(susprr)
- vmrr.post_rrs = []
- else:
- vmrr.update_end(vmrr.end - diff)
-
- if not vmrr.is_suspending():
- # If the VM was set to shutdown, we need to slideback the shutdown RRs
- for rr in vmrr.post_rrs:
- rr_old_start = rr.start
- rr_old_end = rr.end
- rr.start -= diff
- rr.end -= diff
- self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
-
- self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
- self.logger.vdebug("New lease descriptor (after slideback):")
- lease.print_contents()
-
-
-
#-------------------------------------------------------------------#
# #
# SLOT TABLE EVENT HANDLERS #
@@ -1089,16 +1067,21 @@
# appropriate inside the resourcepool module
for (vnode, pnode) in rr.nodes.items():
l.diskimagemap[vnode] = pnode
- except Exception, e:
- self.logger.error("ERROR when starting VMs.")
+ except EnactmentError, exc:
+ self.logger.error("Enactment error when starting VMs.")
+ # Right now, this is a non-recoverable error, so we just
+ # propagate it upwards to the lease scheduler
+ # In the future, it may be possible to react to these
+ # kind of errors.
raise
+
elif lease_state == Lease.STATE_RESUMED_READY:
l.set_state(Lease.STATE_ACTIVE)
rr.state = ResourceReservation.STATE_ACTIVE
# No enactment to do here, since all the suspend/resume actions are
# handled during the suspend/resume RRs
else:
- raise CriticalSchedException, "Lease is an inconsistent state (tried to start VM when state is %s)" % Lease.state_str[lease_state]
+ raise InconsistentLeaseStateError(l, doing = "starting a VM")
l.print_contents()
self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
@@ -1136,7 +1119,16 @@
self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
l.print_contents()
rr.state = ResourceReservation.STATE_ACTIVE
- self.resourcepool.stop_vms(l, rr)
+ try:
+ self.resourcepool.stop_vms(l, rr)
+ except EnactmentError, exc:
+ self.logger.error("Enactment error when shutting down VMs.")
+ # Right now, this is a non-recoverable error, so we just
+ # propagate it upwards to the lease scheduler
+ # In the future, it may be possible to react to these
+ # kind of errors.
+ raise
+
l.print_contents()
self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
@@ -1155,7 +1147,17 @@
self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
l.print_contents()
rr.state = ResourceReservation.STATE_ACTIVE
- self.resourcepool.suspend_vms(l, rr)
+
+ try:
+ self.resourcepool.suspend_vms(l, rr)
+ except EnactmentError, exc:
+ self.logger.error("Enactment error when suspending VMs.")
+ # Right now, this is a non-recoverable error, so we just
+ # propagate it upwards to the lease scheduler
+ # In the future, it may be possible to react to these
+ # kind of errors.
+ raise
+
for vnode in rr.vnodes:
pnode = rr.vmrr.nodes[vnode]
l.memimagemap[vnode] = pnode
@@ -1183,7 +1185,17 @@
def _handle_start_resume(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
l.print_contents()
- self.resourcepool.resume_vms(l, rr)
+
+ try:
+ self.resourcepool.resume_vms(l, rr)
+ except EnactmentError, exc:
+ self.logger.error("Enactment error when resuming VMs.")
+ # Right now, this is a non-recoverable error, so we just
+ # propagate it upwards to the lease scheduler
+ # In the future, it may be possible to react to these
+ # kind of errors.
+ raise
+
rr.state = ResourceReservation.STATE_ACTIVE
if rr.is_first():
l.set_state(Lease.STATE_RESUMING)
Modified: trunk/tests/base_config_simulator.conf
===================================================================
--- trunk/tests/base_config_simulator.conf 2009-02-03 22:57:47 UTC (rev 562)
+++ trunk/tests/base_config_simulator.conf 2009-02-05 01:16:39 UTC (rev 563)
@@ -3,6 +3,7 @@
mode: simulated
lease-preparation: unmanaged
datafile: /var/tmp/haizea/results.dat
+lease-failure-handling: exit
[simulation]
clock: simulated