[haizea-commit] r564 - in trunk/src/haizea: common resourcemanager resourcemanager/scheduler resourcemanager/scheduler/preparation_schedulers
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Fri Feb 6 18:15:33 CST 2009
Author: borja
Date: 2009-02-06 18:15:26 -0600 (Fri, 06 Feb 2009)
New Revision: 564
Modified:
trunk/src/haizea/common/constants.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
Log:
Cleaned up code and added more comments, primarily in LeaseScheduler
Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/common/constants.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -99,7 +99,6 @@
COUNTER_BESTEFFORTCOMPLETED="Best-effort completed"
COUNTER_QUEUESIZE="Queue size"
COUNTER_DISKUSAGE="Disk usage"
-COUNTER_CPUUTILIZATION="VM CPU utilization"
COUNTER_UTILIZATION="Resource utilization"
AVERAGE_NONE=0
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/resourcemanager/rm.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -289,7 +289,6 @@
self.accounting.create_counter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
self.accounting.create_counter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
self.accounting.create_counter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
- self.accounting.create_counter(constants.COUNTER_CPUUTILIZATION, constants.AVERAGE_TIMEWEIGHTED)
self.accounting.create_counter(constants.COUNTER_UTILIZATION, constants.AVERAGE_NONE)
if self.daemon:
@@ -392,8 +391,16 @@
self.logger.error("Message: %s" % exc)
def panic(self):
+ treatment = self.config.get("lease-failure-handling")
+
+ # Stop the resource manager
+ self.stop()
+
#self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- exit(1)
+ if treatment == constants.ONFAILURE_EXIT:
+ exit(1)
+ elif treatment == constants.ONFAILURE_EXIT_RAISE:
+ raise
def print_stats(self, loglevel, verbose=False):
@@ -440,7 +447,8 @@
# ending VM.
def notify_event(self, lease_id, event):
try:
- self.scheduler.notify_event(lease_id, event)
+ lease = self.scheduler.get_lease_by_id(lease_id)
+ self.scheduler.notify_event(lease, event)
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception when notifying an event for lease %i. Dumping state..." % lease_id )
@@ -451,9 +459,10 @@
"""Cancels a lease.
Arguments:
- lease -- Lease to cancel
+ lease_id -- ID of lease to cancel
"""
try:
+ lease = self.scheduler.get_lease_by_id(lease_id)
self.scheduler.cancel_lease(lease_id)
except Exception, msg:
# Exit if something goes horribly wrong
@@ -826,16 +835,3 @@
self.rm.stop()
sys.exit()
-if __name__ == "__main__":
- from haizea.resourcemanager.configfile import HaizeaConfig
- from haizea.common.config import ConfigException
- CONFIGFILE = "../../../etc/sample_trace.conf"
- try:
- CONFIG = HaizeaConfig.from_file(CONFIGFILE)
- except ConfigException, msg:
- print >> sys.stderr, "Error in configuration file:"
- print >> sys.stderr, msg
- exit(1)
- from haizea.resourcemanager.rm import ResourceManager
- RM = ResourceManager(CONFIG)
- RM.start()
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -17,11 +17,15 @@
# -------------------------------------------------------------------------- #
-"""This module provides the main classes for Haizea's scheduler, particularly
-the Scheduler class. The deployment scheduling code (everything that has to be
-done to prepare a lease) happens in the modules inside the
-haizea.resourcemanager.deployment package.
+"""This module provides the main classes for Haizea's lease scheduler, particularly
+the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
+the code that decides what physical hosts a VM should be mapped to), which is
+located in the VMScheduler class (in the vm_scheduler module). Lease preparation
+code (e.g., image transfer scheduling) is located in the preparation_schedulers
+package.
+This module also includes a Queue class and a LeaseTable class, which are used
+by the lease scheduler.
"""
import haizea.common.constants as constants
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
@@ -39,31 +43,59 @@
"""The Haizea Lease Scheduler
Public methods:
+ request_lease -- Entry point of leases into the scheduler
schedule -- The scheduling function
process_reservations -- Processes starting/ending reservations at a given time
- enqueue -- Queues a best-effort request
+ cancel_lease -- Cancels a lease
+ fail_lease -- Marks a lease as failed, and does any necessary cleaning up
is_queue_empty -- Is the queue empty?
exists_scheduled_leases -- Are there any leases scheduled?
Private methods:
- __schedule_ar_lease -- Schedules an AR lease
- __schedule_besteffort_lease -- Schedules a best-effort lease
- __preempt -- Preempts a lease
- __reevaluate_schedule -- Reevaluate the schedule (used after resources become
- unexpectedly unavailable)
- _handle_* -- Reservation event handlers
+ __process_queue -- Processes queue and, if possible, schedules leases
+ __schedule_lease -- Schedules a lease
+ __preempt_lease -- Preempts a lease
+ __enqueue -- Puts a lease at the end of the queue
+ __enqueue_in_order -- Queues a lease in order (currently, time of submission)
+ _handle_end_rr -- Code that has to be run when a reservation ends
+ _handle_end_lease -- Code that has to be run at the end of a lease
"""
def __init__(self, vm_scheduler, preparation_scheduler, slottable):
+ """Constructor
+
+ The constructor does little more than create the lease scheduler's
+ attributes. However, it does expect (in the arguments) a fully-constructed
+ VMScheduler, PreparationScheduler, and SlotTable (these are currently
+ constructed in the ResourceManager's constructor).
+
+ Arguments:
+ vm_scheduler -- VM scheduler
+ preparation_scheduler -- Preparation scheduler
+ slottable -- Slottable
+ """
+
+ # Logger
+ self.logger = logging.getLogger("LSCHED")
+
+ # Assign schedulers and slottable
self.vm_scheduler = vm_scheduler
self.preparation_scheduler = preparation_scheduler
self.slottable = slottable
- self.logger = logging.getLogger("LSCHED")
+ # Create other data structures
self.queue = Queue(self)
self.leases = LeaseTable(self)
self.completedleases = LeaseTable(self)
+ # Handlers are callback functions that get called whenever a type of
+ # resource reservation starts or ends. Each scheduler publishes the
+ # handlers it supports through its "handlers" attributes. For example,
+ # the VMScheduler provides _handle_start_vm and _handle_end_vm that
+ # must be called when a VMResourceReservation start or end is encountered
+ # in the slot table.
+ #
+ # Handlers are called from the process_reservations method of this class
self.handlers = {}
for (type, handler) in self.vm_scheduler.handlers.items():
self.handlers[type] = handler
@@ -71,150 +103,244 @@
for (type, handler) in self.preparation_scheduler.handlers.items():
self.handlers[type] = handler
- backfilling = get_config().get("backfilling")
- if backfilling == constants.BACKFILLING_OFF:
- self.maxres = 0
- elif backfilling == constants.BACKFILLING_AGGRESSIVE:
- self.maxres = 1
- elif backfilling == constants.BACKFILLING_CONSERVATIVE:
- self.maxres = 1000000 # Arbitrarily large
- elif backfilling == constants.BACKFILLING_INTERMEDIATE:
- self.maxres = get_config().get("backfilling-reservations")
- self.numbesteffortres = 0
+ def request_lease(self, lease):
+ """Requests a lease. This is the entry point of leases into the scheduler.
- def schedule(self, nexttime):
+ Request a lease. At this point, it is simply marked as "Pending" and,
+ next time the scheduling function is called, the fate of the
+ lease will be determined (right now, AR+IM leases get scheduled
+ right away, and best-effort leases get placed on a queue)
+
+ Arguments:
+ lease -- Lease object. Its state must be STATE_NEW.
+ """
+ self.logger.info("Lease #%i has been requested and is pending." % lease.id)
+ lease.print_contents()
+ lease.set_state(Lease.STATE_PENDING)
+ self.leases.add(lease)
+
+
+ def schedule(self, nexttime):
+ """ The main scheduling function
+
+ The scheduling function looks at all pending requests and schedules them.
+ Note that most of the actual scheduling code is contained in the
+ __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
+
+ Arguments:
+ nexttime -- The next time at which the scheduler can allocate resources.
+ """
+
+ # Get pending leases
pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
- # Queue best-effort requests
+ # Queue best-effort leases
for lease in be_leases:
self.__enqueue(lease)
lease.set_state(Lease.STATE_QUEUED)
-
- # Process immediate requests
+ self.logger.info("Queued best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
+
+ # Schedule immediate leases
for lease in im_leases:
- self.__process_im_request(lease, nexttime)
+ self.logger.info("Scheduling immediate lease #%i (%i nodes)" % (lease.id, lease.numnodes))
+ lease.print_contents()
+
+ try:
+ self.__schedule_lease(lease, nexttime=nexttime)
+ self.logger.info("Immediate lease #%i has been accepted." % lease.id)
+ get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
+ lease.print_contents()
+ except NotSchedulableException, exc:
+ get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
+ self.logger.info("Immediate lease request #%i has been rejected: %s" % (lease.id, exc.message))
+ lease.set_state(Lease.STATE_REJECTED)
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
- # Process AR requests
+ # Schedule AR requests
for lease in ar_leases:
- self.__process_ar_request(lease, nexttime)
+ self.logger.info("Scheduling AR lease #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
+ lease.print_contents()
- # Process best-effort requests
+ try:
+ self.__schedule_lease(lease, nexttime)
+ self.logger.info("AR lease #%i has been accepted." % lease.id)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
+ lease.print_contents()
+ except NotSchedulableException, exc:
+ get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
+ self.logger.info("AR lease request #%i has been rejected: %s" % (lease.id, exc.message))
+ lease.set_state(Lease.STATE_REJECTED)
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
+
+ # Process queue (i.e., traverse queue in search of leases that can be scheduled)
self.__process_queue(nexttime)
def process_reservations(self, nowtime):
+ """Processes starting/ending reservations
+
+ This method checks the slottable to see if there are any reservations that are
+ starting or ending at "nowtime". If so, the appropriate handler is called.
+
+ Arguments:
+ nowtime -- Time at which to check for starting/ending reservations.
+ """
+
+ # Find starting/ending reservations
starting = self.slottable.get_reservations_starting_at(nowtime)
starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
ending = self.slottable.get_reservations_ending_at(nowtime)
ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
+ # Process ending reservations
for rr in ending:
lease = rr.lease
- self._handle_end_rr(lease, rr)
+ self._handle_end_rr(rr)
+
+ # Call the appropriate handler, and catch exceptions and errors.
try:
self.handlers[type(rr)].on_end(lease, rr)
+
+ # A RescheduleLeaseException indicates that the lease has to be rescheduled
except RescheduleLeaseException, msg:
+ # Currently, the only leases that get rescheduled are best-effort leases,
+ # once they've been suspended.
if isinstance(rr.lease, BestEffortLease):
if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
+ # Put back in the queue, in the same order it arrived
self.__enqueue_in_order(lease)
lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
else:
raise InconsistentLeaseStateError(l, doing = "rescheduling best-effort lease")
+
+ # A NormalEndLeaseException indicates that the end of this reservation marks
+ # the normal end of the lease.
except NormalEndLeaseException, msg:
self._handle_end_lease(lease)
+
+ # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
+ # state. This is usually indicative of a programming error, but not necessarily
+ # one that affects all leases, so we just fail this lease. Note that Haizea can also
+ # be configured to stop immediately when a lease fails.
except InconsistentLeaseStateError, exc:
self.fail_lease(lease, exc)
+
+ # An EnactmentError is raised when the handler had to perform an enactment action
+ # (e.g., stopping a VM), and that enactment action failed. This is currently treated
+ # as a non-recoverable error for the lease, and the lease is failed.
except EnactmentError, exc:
self.fail_lease(lease, exc)
- # Everything else gets propagated upwards to ResourceManager
- # and will make Haizea crash and burn
+ # Other exceptions are not expected, and generally indicate a programming error.
+ # Thus, they are propagated upwards to the ResourceManager where they will make
+ # Haizea crash and burn.
+
+ # Process starting reservations
for rr in starting:
lease = rr.lease
+ # Call the appropriate handler, and catch exceptions and errors.
try:
self.handlers[type(rr)].on_start(lease, rr)
+
+
+ # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
+ # state. This is usually indicative of a programming error, but not necessarily
+ # one that affects all leases, so we just fail this lease. Note that Haizea can also
+ # be configured to stop immediately when a lease fails.
except InconsistentLeaseStateError, exc:
self.fail_lease(lease, exc)
+ # An EnactmentError is raised when the handler had to perform an enactment action
+ # (e.g., stopping a VM), and that enactment action failed. This is currently treated
+ # as a non-recoverable error for the lease, and the lease is failed.
except EnactmentError, exc:
self.fail_lease(lease, exc)
- # Everything else gets propagated upwards to ResourceManager
- # and will make Haizea crash and burn
+
+ # Other exceptions are not expected, and generally indicate a programming error.
+ # Thus, they are propagated upwards to the ResourceManager where they will make
+ # Haizea crash and burn.
- # TODO: Should be in VMScheduler
- util = self.__get_utilization(nowtime)
- if not util.has_key(VMResourceReservation):
- cpuutil = 0.0
- else:
- cpuutil = util[VMResourceReservation]
- get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)
+ # Each time we process reservations, we report resource utilization to the accounting
+ # module. This utilization information shows what portion of the physical resources
+ # is used by each type of reservation (e.g., 70% are running a VM, 5% are doing suspensions,
+ # etc.) See the get_utilization method for details on how this data is stored.
+ # Currently we only collect utilization from the VM Scheduler (in the future,
+ # information will also be gathered from the preparation scheduler).
+ util = self.vm_scheduler.get_utilization(nowtime)
get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
- def request_lease(self, lease):
+
+ def get_lease_by_id(self, lease_id):
+ """Gets a lease with the given ID
+
+ This method is useful for UIs (like the CLI) that operate on the lease ID.
+ If no lease with a given ID is found, None is returned.
+
+ Arguments:
+ lease_id -- The ID of the lease
"""
- Request a lease. At this point, it is simply marked as "Pending" and,
- next time the scheduling function is called, the fate of the
- lease will be determined (right now, AR+IM leases get scheduled
- right away, and best-effort leases get placed on a queue)
- """
- lease.set_state(Lease.STATE_PENDING)
- self.leases.add(lease)
+ if not self.leases.has_lease(lease_id):
+ return None
+ else:
+ return self.leases.get_lease(lease_id)
- def is_queue_empty(self):
- """Return True is the queue is empty, False otherwise"""
- return self.queue.is_empty()
-
- def exists_scheduled_leases(self):
- """Return True if there are any leases scheduled in the future"""
- return not self.slottable.is_empty()
-
- def cancel_lease(self, lease_id):
+ def cancel_lease(self, lease):
"""Cancels a lease.
Arguments:
- lease_id -- ID of lease to cancel
+ lease -- Lease to cancel
"""
time = get_clock().get_time()
self.logger.info("Cancelling lease %i..." % lease_id)
- if self.leases.has_lease(lease_id):
- # The lease is either running, or scheduled to run
- lease = self.leases.get_lease(lease_id)
- lease_state = lease.get_state()
+ lease_state = lease.get_state()
+
+ if lease_state == Lease.STATE_PENDING:
+ # If a lease is pending, we just need to change its state and
+ # remove it from the lease table. Since this is done at the
+ # end of this method, we do nothing here.
+ pass
+
+ elif lease_state == Lease.STATE_ACTIVE:
+ # If a lease is active, that means we have to shut down its VMs to cancel it.
+ self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
+ rr = lease.get_active_reservations(time)[0]
+ self.vm_scheduler._handle_unscheduled_end_vm(lease, rr, enact=True)
+
+ elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]:
+ # If a lease is scheduled or ready, we just need to cancel all future reservations
+ # for that lease
+ self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
+ rrs = lease.get_scheduled_reservations()
+ for r in rrs:
+ lease.remove_rr(r)
+ self.slottable.removeReservation(r)
- if lease_state == Lease.STATE_ACTIVE:
- self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
- rr = lease.get_active_reservations(time)[0]
- if isinstance(rr, VMResourceReservation):
- self._handle_unscheduled_end_vm(lease, rr, enact=True)
- # TODO: Handle cancelations in middle of suspensions and
- # resumptions
- elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
- self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
- rrs = lease.get_scheduled_reservations()
- for r in rrs:
- lease.remove_rr(r)
- self.slottable.removeReservation(r)
- lease.set_state(Lease.STATE_CANCELLED)
- self.completedleases.add(lease)
- self.leases.remove(lease)
- else:
- raise InconsistentLeaseStateError(l, doing = "cancelling the VM")
+ elif lease_state == [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]:
+ # If a lease is in the queue, waiting to be scheduled, cancelling
+ # just requires removing it from the queue
- elif self.queue.has_lease(lease_id):
- # The lease is in the queue, waiting to be scheduled.
- # Cancelling is as simple as removing it from the queue
self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
l = self.queue.get_lease(lease_id)
self.queue.remove_lease(lease)
+ else:
+ # Cancelling in any of the other states is currently unsupported
+ raise InconsistentLeaseStateError(l, doing = "cancelling the VM")
+ # Change state, and remove from lease table
+ lease.set_state(Lease.STATE_CANCELLED)
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
+
def fail_lease(self, lease, exc=None):
"""Transitions a lease to a failed state, and does any necessary cleaning up
@@ -226,22 +352,37 @@
treatment = get_config().get("lease-failure-handling")
if treatment == constants.ONFAILURE_CANCEL:
+ # In this case, a lease failure is handled by cancelling the lease,
+ # but allowing Haizea to continue to run normally.
rrs = lease.get_scheduled_reservations()
for r in rrs:
self.slottable.removeReservation(r)
lease.set_state(Lease.STATE_FAILED)
self.completedleases.add(lease)
self.leases.remove(lease)
- elif treatment == constants.ONFAILURE_EXIT:
+ elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE:
+ # In this case, a lease failure makes Haizea exit. This is useful when debugging,
+ # so we can immediately know about any errors.
raise UnrecoverableError(exc)
- def notify_event(self, lease_id, event):
+ def notify_event(self, lease, event):
+ """Notifies an event that affects a lease.
+
+ This is the entry point of asynchronous events into the scheduler. Currently,
+ the only supported event is the premature end of a VM (i.e., before its
+ scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4,
+ since that version will support sending asynchronous events to Haizea.
+
+ Arguments:
+ lease -- Lease the event refers to
+ event -- Event type
+ """
time = get_clock().get_time()
if event == constants.EVENT_END_VM:
- lease = self.leases.get_lease(lease_id)
vmrr = lease.get_last_vmrr()
- self._handle_end_rr(lease, vmrr)
+ self._handle_end_rr(vmrr)
+ # TODO: Exception handling
self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr, enact=False)
self._handle_end_lease(lease)
nexttime = get_clock().get_next_schedulable_time()
@@ -249,130 +390,71 @@
# reservations that we can slide back.
self.__reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
+
+ def is_queue_empty(self):
+ """Return True if the queue is empty, False otherwise"""
+ return self.queue.is_empty()
+
- def __process_ar_request(self, lease, nexttime):
- self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
- self.logger.debug(" Start : %s" % lease.start)
- self.logger.debug(" Duration: %s" % lease.duration)
- self.logger.debug(" ResReq : %s" % lease.requested_resources)
-
- accepted = False
- try:
- self.__schedule_ar_lease(lease, avoidpreempt=True, nexttime=nexttime)
- self.leases.add(lease)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
- accepted = True
- except NotSchedulableException, exc:
- # Our first try avoided preemption, try again
- # without avoiding preemption.
- # TODO: Roll this into the exact slot fitting algorithm
- try:
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
- self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease.id)
- self.__schedule_ar_lease(lease, nexttime, avoidpreempt=False)
- self.leases.add(lease)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
- accepted = True
- except NotSchedulableException, exc:
- get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, exc.message))
+ def exists_scheduled_leases(self):
+ """Return True if there are any leases scheduled in the future"""
+ return not self.slottable.is_empty()
- if accepted:
- self.logger.info("AR lease request #%i has been accepted." % lease.id)
- else:
- self.logger.info("AR lease request #%i has been rejected." % lease.id)
- lease.set_state(Lease.STATE_REJECTED)
- self.completedleases.add(lease)
- self.leases.remove(lease)
+
+ def __process_queue(self, nexttime):
+ """ Traverses the queue in search of leases that can be scheduled.
+ This method processes the queue in order, but takes into account that
+ it may be possible to schedule leases in the future (using a
+ backfilling algorithm)
- def __process_queue(self, nexttime):
+ TODO: Refine the backfilling algorithm, both here and in the VMScheduler.
+ Currently, only aggressive backfilling is supported, and somewhat crudely
+ (still better than no backfilling at all, though)
+
+ Arguments:
+ nexttime -- The next time at which the scheduler can allocate resources.
+ """
+
done = False
newqueue = Queue(self)
while not done and not self.is_queue_empty():
- if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
- self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
+ if not self.vm_scheduler.can_reserve_besteffort_in_future() and self.slottable.isFull(nexttime):
+ self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
done = True
else:
lease = self.queue.dequeue()
try:
self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
- self.logger.debug(" Duration: %s" % lease.duration)
- self.logger.debug(" ResReq : %s" % lease.requested_resources)
- self.__schedule_besteffort_lease(lease, nexttime)
- self.leases.add(lease)
+ lease.print_contents()
+ self.__schedule_lease(lease, nexttime)
get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease.id)
except NotSchedulableException, msg:
# Put back on queue
newqueue.enqueue(lease)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
- if not self.is_backfilling():
+ if not self.vm_scheduler.is_backfilling():
done = True
for lease in self.queue:
newqueue.enqueue(lease)
self.queue = newqueue
-
-
- def __process_im_request(self, lease, nexttime):
- self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease.id, lease.numnodes))
- self.logger.debug(" Duration: %s" % lease.duration)
- self.logger.debug(" ResReq : %s" % lease.requested_resources)
-
- try:
- self.__schedule_immediate_lease(lease, nexttime=nexttime)
- self.leases.add(lease)
- get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
- self.logger.info("Immediate lease request #%i has been accepted." % lease.id)
- except NotSchedulableException, msg:
- get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
-
- def __schedule_ar_lease(self, lease, nexttime, avoidpreempt=True):
- (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-
- if len(preemptions) > 0:
- leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
- self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
- for l in leases:
- self.__preempt(l, preemption_time=vmrr.start)
- # Schedule deployment overhead
- deploy_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+ def __schedule_lease(self, lease, nexttime):
+ """ Schedules a lease.
- # Commit reservation to slot table
- # (we don't do this until the very end because the deployment overhead
- # scheduling could still throw an exception)
- for rr in deploy_rrs:
- lease.append_deployrr(rr)
-
- for rr in deploy_rrs:
- self.slottable.addReservation(rr)
-
- lease.append_vmrr(vmrr)
- self.slottable.addReservation(vmrr)
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- lease.set_state(Lease.STATE_SCHEDULED)
-
- if is_ready:
- lease.set_state(Lease.STATE_READY)
-
-
- def __schedule_besteffort_lease(self, lease, nexttime):
- # Schedule the VMs
- canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
-
+ Arguments:
+ lease -- Lease to schedule.
+ nexttime -- The next time at which the scheduler can allocate resources.
+ """
+
lease_state = lease.get_state()
# Determine earliest start time in each node
- if lease_state == Lease.STATE_QUEUED:
+ if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
# Figure out earliest start times based on
# image schedule and reusable images
earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
@@ -383,8 +465,18 @@
else:
raise InconsistentLeaseStateError(l, doing = "scheduling a best-effort lease")
- (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
-
+ if isinstance(lease, BestEffortLease):
+ (vmrr, preemptions) = self.vm_scheduler.fit_asap(lease, nexttime, earliest)
+ elif isinstance(lease, ARLease):
+ (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True)
+ elif isinstance(lease, ImmediateLease):
+ (vmrr, preemptions) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future=False)
+
+ if len(preemptions) > 0:
+ self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
+ for l in preemptions:
+ self.__preempt_lease(l, preemption_time=vmrr.start)
+
# Schedule deployment
is_ready = False
deploy_rrs = []
@@ -420,11 +512,8 @@
# Post-VM RRs (if any)
for rr in vmrr.post_rrs:
self.slottable.addReservation(rr)
-
- if in_future:
- self.numbesteffortres += 1
-
- if lease_state == Lease.STATE_QUEUED:
+
+ if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
lease.set_state(Lease.STATE_SCHEDULED)
if is_ready:
lease.set_state(Lease.STATE_READY)
@@ -433,23 +522,15 @@
lease.print_contents()
-
- def __schedule_immediate_lease(self, req, nexttime):
- (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
- # Schedule deployment
- self.preparation_scheduler.schedule(req, vmrr, nexttime)
-
- req.append_rr(vmrr)
- self.slottable.addReservation(vmrr)
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- req.print_contents()
+ def __preempt_lease(self, lease, preemption_time):
+ """ Preempts a lease.
- def __preempt(self, lease, preemption_time):
+ Arguments:
+ lease -- Lease to preempt.
+ preemption_time -- Time at which lease must be preempted
+ """
self.logger.info("Preempting lease #%i..." % (lease.id))
self.logger.vdebug("Lease before preemption:")
@@ -478,28 +559,45 @@
if must_cancel_and_requeue:
self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
- if vmrr.backfill_reservation == True:
- self.numbesteffortres -= 1
- # If there are any post RRs, remove them
- for rr in vmrr.post_rrs:
- self.slottable.removeReservation(rr)
+ self.vm_scheduler.cancel_vm(vmrr)
lease.remove_vmrr(vmrr)
- self.slottable.removeReservation(vmrr)
- for vnode, pnode in lease.diskimagemap.items():
- self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
- self.preparation_scheduler.cancel_deployment(lease)
- lease.diskimagemap = {}
+ self.preparation_scheduler.cancel_preparation(lease)
lease.set_state(Lease.STATE_QUEUED)
self.__enqueue_in_order(lease)
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
else:
self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
- self.vm_scheduler.preempt(vmrr, preemption_time)
+ self.vm_scheduler.preempt_vm(vmrr, preemption_time)
self.logger.vdebug("Lease after preemption:")
lease.print_contents()
+
def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+ """ Reevaluate the schedule after a lease ends prematurely
+
+ After a lease ends prematurely, resources may become available. If so,
+ any lease that was scheduled under the assumption that its earliest
+ starting time was after the end of the lease that ended prematurely
+ can now be started earlier than expected.
+
+ TODO: Refine the backfilling algorithm, both here and in the VMScheduler.
+ Currently, only aggressive backfilling is supported, and somewhat crudely
+ (still better than no backfilling at all, though). In particular, it might
+ be a good idea to just do away with the "slideback" algorithm and simply
+ keep better track of what leases have been scheduled in the future, and
+ just reschedule them (almost) as if they had been submitted again.
+
+ Arguments:
+ endinglease -- The lease that ended prematurely and prompted a schedule reevaluation
+ nodes -- Physical nodes where schedule will be reevaluated
+ nexttime -- The next time at which the scheduler can allocate resources.
+ checkedleases -- What leases have been already checked for rescheduling (regardless
+ of whether we were actually able to reschedule them). This method
+ uses a recursive algorithm, so the value of this argument has to be
+ initially [] (the empty list)
+ """
+
self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
leases = []
vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
@@ -517,40 +615,42 @@
# vmrr, susprr = l.getLastVMRR()
# self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
- # TODO: Should be in VMScheduler
- def __get_utilization(self, time):
- total = self.slottable.get_total_capacity()
- util = {}
- reservations = self.slottable.getReservationsAt(time)
- for r in reservations:
- for node in r.resources_in_pnode:
- if isinstance(r, VMResourceReservation):
- use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
- util[type(r)] = use + util.setdefault(type(r),0.0)
- elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
- use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
- util[type(r)] = use + util.setdefault(type(r),0.0)
- util[None] = total - sum(util.values())
- for k in util:
- util[k] /= total
-
- return util
-
+
def __enqueue(self, lease):
- """Queues a best-effort lease request"""
+ """Queues a best-effort lease request
+
+ Arguments:
+ lease -- Lease to be queued
+ """
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.queue.enqueue(lease)
- self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
+
def __enqueue_in_order(self, lease):
+ """Queues a lease in order (currently, time of submission)
+
+ Arguments:
+ lease -- Lease to be queued
+ """
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.queue.enqueue_in_order(lease)
- def _handle_end_rr(self, l, rr):
+ def _handle_end_rr(self, rr):
+ """Performs actions that have to be done each time a reservation ends.
+
+ Arguments:
+ rr -- Reservation that ended
+ """
self.slottable.removeReservation(rr)
+
def _handle_end_lease(self, l):
+ """Performs actions that have to be done each time a lease ends.
+
+ Arguments:
+ l -- Lease that has ended
+ """
l.set_state(Lease.STATE_DONE)
l.duration.actual = l.duration.accumulated
l.end = round_datetime(get_clock().get_time())
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -63,9 +63,12 @@
elif isinstance(lease, BestEffortLease):
return self.schedule_for_besteffort(lease, vmrr, nexttime)
- def cancel_deployment(self, lease):
+ def cancel_preparation(self, lease):
if isinstance(lease, BestEffortLease):
self.__remove_from_fifo_transfers(lease.id)
+ self.cleanup(lease)
+ lease.diskimagemap = {}
+
def is_ready(self, lease, vmrr):
return False
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -41,4 +41,4 @@
def cleanup(self, lease):
for vnode, pnode in lease.diskimagemap.items():
- self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-02-05 01:16:39 UTC (rev 563)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-02-07 00:15:26 UTC (rev 564)
@@ -207,10 +207,12 @@
vmrr.state = ResourceReservation.STATE_SCHEDULED
self.__schedule_shutdown(vmrr)
+
+ preemptions = self.__find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
return vmrr, preemptions
- def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = False):
+ def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = None):
lease_id = lease.id
remaining_duration = lease.duration.get_remaining_duration()
numnodes = lease.numnodes
@@ -224,6 +226,9 @@
else:
suspendable = True
+ if allow_reservation_in_future == None:
+ allow_reservation_in_future = self.can_reserve_besteffort_in_future()
+
canmigrate = get_config().get("migration")
#
@@ -392,6 +397,8 @@
vmrr.end = vmrr.start + remaining_duration + shutdown_time
self.__schedule_shutdown(vmrr)
+ if reservation:
+ self.numbesteffortres += 1
susp_str = res_str = ""
@@ -401,7 +408,7 @@
susp_str = " (suspending)"
self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
- return vmrr, reservation
+ return vmrr, []
# TODO: This has to be tied in with the preparation scheduler
def schedule_migration(self, lease, vmrr, nexttime):
@@ -457,7 +464,24 @@
for migr_rr in migr_rrs:
vmrr.pre_rrs.insert(0, migr_rr)
- def preempt(self, vmrr, t):
+ def cancel_vm(self, vmrr):
+
+ if vmrr.backfill_reservation == True:
+ self.numbesteffortres -= 1
+
+ # If there are any pre-RRs that are scheduled, remove them
+ for rr in vmrr.pre_rrs:
+ if rr.state == ResourceReservation.STATE_SCHEDULED:
+ self.slottable.removeReservation(rr)
+
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+
+ self.slottable.removeReservation(vmrr)
+
+
+ def preempt_vm(self, vmrr, t):
# Save original start and end time of the vmrr
old_start = vmrr.start
old_end = vmrr.end
@@ -466,90 +490,6 @@
for susprr in vmrr.post_rrs:
self.slottable.addReservation(susprr)
- def find_preemptable_leases(self, mustpreempt, startTime, endTime):
- def comparepreemptability(rrX, rrY):
- if rrX.lease.submit_time > rrY.lease.submit_time:
- return constants.BETTER
- elif rrX.lease.submit_time < rrY.lease.submit_time:
- return constants.WORSE
- else:
- return constants.EQUAL
-
- def preemptedEnough(amountToPreempt):
- for node in amountToPreempt:
- if not amountToPreempt[node].is_zero_or_less():
- return False
- return True
-
- # Get allocations at the specified time
- atstart = set()
- atmiddle = set()
- nodes = set(mustpreempt.keys())
-
- reservationsAtStart = self.slottable.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtStart.sort(comparepreemptability)
- reservationsAtMiddle.sort(comparepreemptability)
-
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
-
- # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
- for r in reservationsAtStart:
- # The following will really only come into play when we have
- # multiple VMs per node
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- # Don't need to preempt if we've already preempted all
- # the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atstart.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
- if len(reservationsAtMiddle)>0:
- changepoints = set()
- for r in reservationsAtMiddle:
- changepoints.add(r.start)
- changepoints = list(changepoints)
- changepoints.sort()
-
- for cp in changepoints:
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
- reservations = [r for r in reservationsAtMiddle
- if r.start <= cp and cp < r.end]
- for r in reservations:
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atmiddle.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
- self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-
- leases = [r.lease for r in atstart|atmiddle]
-
- return leases
-
-
def slideback(self, lease, earliest):
vmrr = lease.get_last_vmrr()
# Save original start and end time of the vmrr
@@ -611,6 +551,24 @@
self.logger.vdebug("New lease descriptor (after slideback):")
lease.print_contents()
+ def get_utilization(self, time):
+ total = self.slottable.get_total_capacity()
+ util = {}
+ reservations = self.slottable.getReservationsAt(time)
+ for r in reservations:
+ for node in r.resources_in_pnode:
+ if isinstance(r, VMResourceReservation):
+ use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+ util[type(r)] = use + util.setdefault(type(r),0.0)
+ elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
+ use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+ util[type(r)] = use + util.setdefault(type(r),0.0)
+ util[None] = total - sum(util.values())
+ for k in util:
+ util[k] /= total
+
+ return util
+
def can_suspend_at(self, lease, t):
# TODO: Make more general, should determine vmrr based on current time
vmrr = lease.get_last_vmrr()
@@ -950,6 +908,90 @@
threshold = safe_duration + (min_duration * factor)
return threshold
+ def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
+ def comparepreemptability(rrX, rrY):
+ if rrX.lease.submit_time > rrY.lease.submit_time:
+ return constants.BETTER
+ elif rrX.lease.submit_time < rrY.lease.submit_time:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+
+ def preemptedEnough(amountToPreempt):
+ for node in amountToPreempt:
+ if not amountToPreempt[node].is_zero_or_less():
+ return False
+ return True
+
+ # Get allocations at the specified time
+ atstart = set()
+ atmiddle = set()
+ nodes = set(mustpreempt.keys())
+
+ reservationsAtStart = self.slottable.getReservationsAt(startTime)
+ reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtStart.sort(comparepreemptability)
+ reservationsAtMiddle.sort(comparepreemptability)
+
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+
+ # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+ for r in reservationsAtStart:
+ # The following will really only come into play when we have
+ # multiple VMs per node
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ # Don't need to preempt if we've already preempted all
+ # the needed resources in node n
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atstart.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+ if len(reservationsAtMiddle)>0:
+ changepoints = set()
+ for r in reservationsAtMiddle:
+ changepoints.add(r.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+
+ for cp in changepoints:
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+ reservations = [r for r in reservationsAtMiddle
+ if r.start <= cp and cp < r.end]
+ for r in reservations:
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atmiddle.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+ self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+
+ leases = [r.lease for r in atstart|atmiddle]
+
+ return leases
+
+
def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
# TODO2: Choose appropriate prioritizing function based on a
# config file, instead of hardcoding it)
More information about the Haizea-commit
mailing list