[haizea-commit] r560 - in trunk/src/haizea: common resourcemanager resourcemanager/scheduler resourcemanager/scheduler/preparation_schedulers traces
haizea-commit at mailman.cs.uchicago.edu
Mon Feb 2 19:54:52 CST 2009
Author: borja
Date: 2009-02-02 19:54:47 -0600 (Mon, 02 Feb 2009)
New Revision: 560
Modified:
trunk/src/haizea/common/utils.py
trunk/src/haizea/resourcemanager/leases.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler/__init__.py
trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
trunk/src/haizea/traces/readers.py
Log:
Keep track of lease state using a state machine. Added more sanity checks that throw exceptions when a lease is in an inconsistent state.
Modified: trunk/src/haizea/common/utils.py
===================================================================
--- trunk/src/haizea/common/utils.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/common/utils.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -127,5 +127,28 @@
from haizea.resourcemanager.rm import ResourceManager
return ResourceManager.get_singleton().clock
+class InvalidStateMachineTransition(Exception):
+ pass
+
class StateMachine(object):
- pass
\ No newline at end of file
+ def __init__(self, initial_state, transitions, state_str = None):
+ self.state = initial_state
+ self.transitions = transitions
+ self.state_str = state_str
+
+ def change_state(self, new_state):
+ valid_next_states = [x[0] for x in self.transitions[self.state]]
+ if new_state in valid_next_states:
+ self.state = new_state
+ else:
+ raise InvalidStateMachineTransition, "Invalid transition. State is %s, wanted to change to %s, can only change to %s" % (self.get_state_str(self.state), self.get_state_str(new_state), [self.get_state_str(x) for x in valid_next_states])
+
+ def get_state(self):
+ return self.state
+
+ def get_state_str(self, state):
+ if self.state_str == None:
+ return "%s" % state
+ else:
+ return self.state_str[state]
+
\ No newline at end of file
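For reference, the new StateMachine helper can be driven on its own. The following minimal sketch (the two-state machine and its labels are made up for illustration; only StateMachine and InvalidStateMachineTransition come from utils.py) shows a valid transition followed by one that raises InvalidStateMachineTransition:

from haizea.common.utils import StateMachine, InvalidStateMachineTransition

# Hypothetical two-state machine: state 0 ("Off") may only go to 1 ("On").
transitions = {0: [(1, "")],
               1: []}
sm = StateMachine(initial_state = 0,
                  transitions   = transitions,
                  state_str     = {0: "Off", 1: "On"})

sm.change_state(1)                        # valid: Off -> On
print sm.get_state_str(sm.get_state())    # prints "On"

try:
    sm.change_state(0)                    # invalid: "On" has no outgoing transitions
except InvalidStateMachineTransition, msg:
    print msg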
Modified: trunk/src/haizea/resourcemanager/leases.py
===================================================================
--- trunk/src/haizea/resourcemanager/leases.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/leases.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -58,12 +58,14 @@
STATE_READY = 7
STATE_ACTIVE = 8
STATE_SUSPENDING = 9
- STATE_SUSPENDED = 10
- STATE_MIGRATING = 11
- STATE_RESUMING = 12
- STATE_RESUMED_READY = 13
- STATE_DONE = 14
- STATE_FAIL = 15
+ STATE_SUSPENDED_PENDING = 10
+ STATE_SUSPENDED_QUEUED = 11
+ STATE_SUSPENDED_SCHEDULED = 12
+ STATE_MIGRATING = 13
+ STATE_RESUMING = 14
+ STATE_RESUMED_READY = 15
+ STATE_DONE = 16
+ STATE_FAIL = 17
state_str = {STATE_NEW : "New",
STATE_PENDING : "Pending",
@@ -75,7 +77,9 @@
STATE_READY : "Ready",
STATE_ACTIVE : "Active",
STATE_SUSPENDING : "Suspending",
- STATE_SUSPENDED : "Suspended",
+ STATE_SUSPENDED_PENDING : "Suspended-Pending",
+ STATE_SUSPENDED_QUEUED : "Suspended-Queued",
+ STATE_SUSPENDED_SCHEDULED : "Suspended-Scheduled",
STATE_MIGRATING : "Migrating",
STATE_RESUMING : "Resuming",
STATE_RESUMED_READY: "Resumed-Ready",
@@ -102,7 +106,7 @@
# Bookkeeping attributes
# (keep track of the lease's state, resource reservations, etc.)
- self.state = Lease.STATE_NEW
+ self.state = LeaseStateMachine()
self.diskimagemap = {}
self.memimagemap = {}
self.deployment_rrs = []
@@ -114,11 +118,17 @@
self.logger = logging.getLogger("LEASES")
+ def get_state(self):
+ return self.state.get_state()
+
+ def set_state(self, state):
+ self.state.change_state(state)
+
def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
self.logger.log(loglevel, "Lease ID : %i" % self.id)
self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
self.logger.log(loglevel, "Duration : %s" % self.duration)
- self.logger.log(loglevel, "State : %s" % Lease.state_str[self.state])
+ self.logger.log(loglevel, "State : %s" % Lease.state_str[self.get_state()])
self.logger.log(loglevel, "Disk image : %s" % self.diskimage_id)
self.logger.log(loglevel, "Disk image size: %s" % self.diskimage_size)
self.logger.log(loglevel, "Num nodes : %s" % self.numnodes)
@@ -188,54 +198,71 @@
l["numnodes"] = self.numnodes
l["resources"] = `self.requested_resources`
l["preemptible"] = self.preemptible
- l["state"] = self.state
+ l["state"] = self.get_state()
l["vm_rrs"] = [vmrr.xmlrpc_marshall() for vmrr in self.vm_rrs]
return l
class LeaseStateMachine(StateMachine):
initial_state = Lease.STATE_NEW
- transitions = {Lease.STATE_NEW: [(Lease.STATE_PENDING, "")],
- Lease.STATE_PENDING: [(Lease.STATE_SCHEDULED, ""),
- (Lease.STATE_QUEUED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_REJECTED, "")],
- Lease.STATE_SCHEDULED: [(Lease.STATE_PREPARING, ""),
- (Lease.STATE_READY, ""),
- (Lease.STATE_MIGRATING, ""),
- (Lease.STATE_RESUMING, ""),
- (Lease.STATE_CANCELLED, "")],
- Lease.STATE_QUEUED: [(Lease.STATE_SCHEDULED, ""),
- (Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, "")],
- Lease.STATE_PREPARING: [(Lease.STATE_READY, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_READY: [(Lease.STATE_ACTIVE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_ACTIVE: [(Lease.STATE_SUSPENDING, ""),
- (Lease.STATE_DONE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_SUSPENDING: [(Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_SUSPENDED: [(Lease.STATE_QUEUED, ""),
- (Lease.STATE_MIGRATING, ""),
- (Lease.STATE_RESUMING, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_MIGRATING: [(Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_RESUMING: [(Lease.STATE_RESUMED_READY, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
+ transitions = {Lease.STATE_NEW: [(Lease.STATE_PENDING, "")],
+ Lease.STATE_PENDING: [(Lease.STATE_SCHEDULED, ""),
+ (Lease.STATE_QUEUED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_REJECTED, "")],
+
+ Lease.STATE_SCHEDULED: [(Lease.STATE_PREPARING, ""),
+ (Lease.STATE_READY, ""),
+ (Lease.STATE_CANCELLED, "")],
+
+ Lease.STATE_QUEUED: [(Lease.STATE_SCHEDULED, ""),
+ (Lease.STATE_CANCELLED, "")],
+
+ Lease.STATE_PREPARING: [(Lease.STATE_READY, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_READY: [(Lease.STATE_ACTIVE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_ACTIVE: [(Lease.STATE_SUSPENDING, ""),
+ (Lease.STATE_DONE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_SUSPENDING: [(Lease.STATE_SUSPENDED_PENDING, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_SUSPENDED_PENDING: [(Lease.STATE_SUSPENDED_QUEUED, ""),
+ (Lease.STATE_SUSPENDED_SCHEDULED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_SUSPENDED_QUEUED: [(Lease.STATE_SUSPENDED_QUEUED, ""),
+ (Lease.STATE_SUSPENDED_SCHEDULED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_SUSPENDED_SCHEDULED: [(Lease.STATE_MIGRATING, ""),
+ (Lease.STATE_RESUMING, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_MIGRATING: [(Lease.STATE_SUSPENDED_SCHEDULED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_RESUMING: [(Lease.STATE_RESUMED_READY, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
# Final states
Lease.STATE_DONE: [],
Lease.STATE_CANCELLED: [],
@@ -244,7 +271,7 @@
}
def __init__(self):
- StateMachine.__init__(initial_state, transitions)
+ StateMachine.__init__(self, LeaseStateMachine.initial_state, LeaseStateMachine.transitions, Lease.state_str)
class ARLease(Lease):
def __init__(self, submit_time, start, duration, diskimage_id,
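With this change a lease's state is wrapped in a LeaseStateMachine instead of a bare integer, so every change goes through the transition table above. A rough sketch of the intended call pattern, assuming lease is an already-constructed Lease subclass (for example a BestEffortLease built by the trace readers):

lease.set_state(Lease.STATE_PENDING)      # New -> Pending
lease.set_state(Lease.STATE_QUEUED)       # Pending -> Queued
lease.set_state(Lease.STATE_SCHEDULED)    # Queued -> Scheduled

# Scheduled may only move to Preparing, Ready or Cancelled, so jumping
# straight to Active raises InvalidStateMachineTransition instead of
# silently corrupting the bookkeeping.
lease.set_state(Lease.STATE_ACTIVE)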
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/rm.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -143,7 +143,7 @@
# Preparation scheduler
if preparation_type == constants.PREPARATION_UNMANAGED:
preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
- elif deploy_type == constants.PREPARATION_TRANSFER:
+ elif preparation_type == constants.PREPARATION_TRANSFER:
preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
# VM Scheduler
Modified: trunk/src/haizea/resourcemanager/scheduler/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/__init__.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/__init__.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -21,7 +21,7 @@
"""A simple exception class used for scheduling exceptions"""
pass
-class NotSchedulableException(Exception):
+class NotSchedulableException(SchedException):
"""A simple exception class used when a lease cannot be scheduled
This exception must be raised when a lease cannot be scheduled
@@ -30,7 +30,7 @@
"""
pass
-class CriticalSchedException(Exception):
+class CriticalSchedException(SchedException):
"""A simple exception class used for critical scheduling exceptions
This exception must be raised when a non-recoverable error happens
@@ -39,7 +39,7 @@
"""
pass
-class RescheduleLeaseException(Exception):
+class PreparationSchedException(SchedException):
pass
class CancelLeaseException(Exception):
@@ -48,7 +48,10 @@
class NormalEndLeaseException(Exception):
pass
+class RescheduleLeaseException(SchedException):
+ pass
+
class ReservationEventHandler(object):
"""A wrapper for reservation event handlers.
Modified: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -99,12 +99,12 @@
self.enqueue(lease)
# Process immediate requests
- for lease_req in im_leases:
- self.__process_im_request(lease_req, nexttime)
+ for lease in im_leases:
+ self.__process_im_request(lease, nexttime)
# Process AR requests
- for lease_req in ar_leases:
- self.__process_ar_request(lease_req, nexttime)
+ for lease in ar_leases:
+ self.__process_ar_request(lease, nexttime)
# Process best-effort requests
self.__process_queue(nexttime)
@@ -123,7 +123,11 @@
self.handlers[type(rr)].on_end(lease, rr)
except RescheduleLeaseException, msg:
if isinstance(rr.lease, BestEffortLease):
- self.__enqueue_in_order(lease)
+ if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
+ self.__enqueue_in_order(lease)
+ lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to reschedule best-effort lease when state is %s)" % lease_state
except NormalEndLeaseException, msg:
self._handle_end_lease(lease)
@@ -141,12 +145,12 @@
get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
- def enqueue(self, lease_req):
+ def enqueue(self, lease):
"""Queues a best-effort lease request"""
- get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
- lease_req.state = Lease.STATE_QUEUED
- self.queue.enqueue(lease_req)
- self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ lease.set_state(Lease.STATE_QUEUED)
+ self.queue.enqueue(lease)
+ self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
def request_lease(self, lease):
"""
@@ -155,7 +159,7 @@
lease will be determined (right now, AR+IM leases get scheduled
right away, and best-effort leases get placed on a queue)
"""
- lease.state = Lease.STATE_PENDING
+ lease.set_state(Lease.STATE_PENDING)
self.leases.add(lease)
def is_queue_empty(self):
@@ -180,22 +184,27 @@
# The lease is either running, or scheduled to run
lease = self.leases.get_lease(lease_id)
- if lease.state == Lease.STATE_ACTIVE:
+ lease_state = lease.get_state()
+
+ if lease_state == Lease.STATE_ACTIVE:
self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
rr = lease.get_active_reservations(time)[0]
if isinstance(rr, VMResourceReservation):
self._handle_unscheduled_end_vm(lease, rr, enact=True)
# TODO: Handle cancelations in middle of suspensions and
# resumptions
- elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
+ elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
rrs = lease.get_scheduled_reservations()
for r in rrs:
lease.remove_rr(r)
self.slottable.removeReservation(r)
- lease.state = Lease.STATE_CANCELLED
+ lease.set_state(Lease.STATE_CANCELLED)
self.completedleases.add(lease)
self.leases.remove(lease)
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to cancel lease when state is %s)" % lease_state
+
elif self.queue.has_lease(lease_id):
# The lease is in the queue, waiting to be scheduled.
# Cancelling is as simple as removing it from the queue
@@ -235,41 +244,41 @@
- def __process_ar_request(self, lease_req, nexttime):
- self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested))
- self.logger.debug(" Start : %s" % lease_req.start)
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+ def __process_ar_request(self, lease, nexttime):
+ self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
+ self.logger.debug(" Start : %s" % lease.start)
+ self.logger.debug(" Duration: %s" % lease.duration)
+ self.logger.debug(" ResReq : %s" % lease.requested_resources)
accepted = False
try:
- self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ self.__schedule_ar_lease(lease, avoidpreempt=True, nexttime=nexttime)
+ self.leases.add(lease)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
accepted = True
except SchedException, msg:
# Our first try avoided preemption, try again
# without avoiding preemption.
# TODO: Roll this into the exact slot fitting algorithm
try:
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
- self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
- self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+ self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease.id)
+ self.__schedule_ar_lease(lease, nexttime, avoidpreempt=False)
+ self.leases.add(lease)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
accepted = True
except SchedException, msg:
raise
- get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+ get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
if accepted:
- self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
+ self.logger.info("AR lease request #%i has been accepted." % lease.id)
else:
- self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
- lease_req.state = Lease.STATE_REJECTED
- self.completedleases.add(lease_req)
- self.leases.remove(lease_req)
+ self.logger.info("AR lease request #%i has been rejected." % lease.id)
+ lease.set_state(Lease.STATE_REJECTED)
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
def __process_queue(self, nexttime):
@@ -280,19 +289,19 @@
self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
done = True
else:
- lease_req = self.queue.dequeue()
+ lease = self.queue.dequeue()
try:
- self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id)
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
- self.__schedule_besteffort_lease(lease_req, nexttime)
- self.leases.add(lease_req)
- get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
+ self.logger.debug(" Duration: %s" % lease.duration)
+ self.logger.debug(" ResReq : %s" % lease.requested_resources)
+ self.__schedule_besteffort_lease(lease, nexttime)
+ self.leases.add(lease)
+ get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease.id)
except SchedException, msg:
# Put back on queue
- newqueue.enqueue(lease_req)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
- self.logger.info("Lease %i could not be scheduled at this time." % lease_req.id)
+ newqueue.enqueue(lease)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
+ self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
if not self.is_backfilling():
done = True
@@ -302,45 +311,52 @@
self.queue = newqueue
- def __process_im_request(self, lease_req, nexttime):
- self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+ def __process_im_request(self, lease, nexttime):
+ self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease.id, lease.numnodes))
+ self.logger.debug(" Duration: %s" % lease.duration)
+ self.logger.debug(" ResReq : %s" % lease.requested_resources)
try:
- self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
- self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
+ self.__schedule_immediate_lease(lease, nexttime=nexttime)
+ self.leases.add(lease)
+ get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
+ self.logger.info("Immediate lease request #%i has been accepted." % lease.id)
except SchedException, msg:
- get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+ get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease.id, msg))
- def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+ def __schedule_ar_lease(self, lease, nexttime, avoidpreempt=True):
try:
- (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+ (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
if len(preemptions) > 0:
leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
- self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
- for lease in leases:
- self.__preempt(lease, preemption_time=vmrr.start)
+ self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease.id))
+ for l in leases:
+ self.__preempt(l, preemption_time=vmrr.start)
# Schedule deployment overhead
- self.preparation_scheduler.schedule(lease_req, vmrr, nexttime)
+ self.preparation_scheduler.schedule(lease, vmrr, nexttime)
# Commit reservation to slot table
# (we don't do this until the very end because the deployment overhead
# scheduling could still throw an exception)
- lease_req.append_vmrr(vmrr)
+ lease.append_vmrr(vmrr)
self.slottable.addReservation(vmrr)
# Post-VM RRs (if any)
for rr in vmrr.post_rrs:
self.slottable.addReservation(rr)
+
+ lease.set_state(Lease.STATE_SCHEDULED)
+
+ if self.preparation_scheduler.is_ready(lease):
+ lease.set_state(Lease.STATE_READY)
+ except SchedException, msg:
+ raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
except Exception, msg:
- raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
+ raise
def __schedule_besteffort_lease(self, lease, nexttime):
@@ -348,23 +364,27 @@
# Schedule the VMs
canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
+ lease_state = lease.get_state()
+
# Determine earliest start time in each node
- if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
+ if lease_state == Lease.STATE_QUEUED:
# Figure out earliest start times based on
# image schedule and reusable images
earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
- elif lease.state == Lease.STATE_SUSPENDED:
+ elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
# No need to transfer images from repository
# (only intra-node transfer)
earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
-
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to schedule best-effort lease when state is %s)" % lease_state
+
(vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
# Schedule deployment
- if lease.state != Lease.STATE_SUSPENDED:
+ if lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
+ else:
self.preparation_scheduler.schedule(lease, vmrr, nexttime)
- else:
- self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
# At this point, the lease is feasible.
# Commit changes by adding RRs to lease and to slot table
@@ -391,6 +411,14 @@
if in_future:
self.numbesteffortres += 1
+ if lease_state == Lease.STATE_QUEUED:
+ lease.set_state(Lease.STATE_SCHEDULED)
+ if self.preparation_scheduler.is_ready(lease):
+ lease.set_state(Lease.STATE_READY)
+ elif lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
+
+
lease.print_contents()
except SchedException, msg:
@@ -455,7 +483,7 @@
self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
self.preparation_scheduler.cancel_deployment(lease)
lease.diskimagemap = {}
- lease.state = Lease.STATE_QUEUED
+ lease.set_state(Lease.STATE_QUEUED)
self.__enqueue_in_order(lease)
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
else:
@@ -488,11 +516,12 @@
get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.queue.enqueue_in_order(lease)
+
def _handle_end_rr(self, l, rr):
self.slottable.removeReservation(rr)
def _handle_end_lease(self, l):
- l.state = Lease.STATE_DONE
+ l.set_state(Lease.STATE_DONE)
l.duration.actual = l.duration.accumulated
l.end = round_datetime(get_clock().get_time())
self.completedleases.add(l)
@@ -561,5 +590,5 @@
return [e for e in self.entries.values() if isinstance(e, type)]
def get_leases_by_state(self, state):
- return [e for e in self.entries.values() if e.state == state]
+ return [e for e in self.entries.values() if e.get_state() == state]
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -17,6 +17,7 @@
# -------------------------------------------------------------------------- #
import logging
+from haizea.common.utils import abstract
class PreparationScheduler(object):
def __init__(self, slottable, resourcepool, deployment_enact):
@@ -25,6 +26,8 @@
self.deployment_enact = deployment_enact
self.logger = logging.getLogger("DEPLOY")
+ def is_ready(self, lease):
+ abstract()
class PreparationSchedException(Exception):
pass
\ No newline at end of file
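The new is_ready() hook is what lease_scheduler.py uses to decide whether a freshly scheduled lease can be marked Ready immediately. A subclass only has to say whether its preparation work is already done; a minimal sketch of a hypothetical subclass (the class name is made up, the base class is the one above):

from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler

class NoOpPreparationScheduler(PreparationScheduler):
    """Hypothetical scheduler with no preparation work to do."""

    def schedule(self, lease, vmrr, nexttime):
        pass   # nothing to set up

    def is_ready(self, lease):
        # UnmanagedPreparationScheduler answers True here;
        # ImageTransferPreparationScheduler answers False because the
        # disk image still has to be transferred first.
        return True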
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -22,12 +22,15 @@
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
from haizea.resourcemanager.scheduler import ReservationEventHandler
from haizea.common.utils import estimate_transfer_time, get_config
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
+from haizea.resourcemanager.scheduler import ReservationEventHandler, PreparationSchedException
+
import copy
class ImageTransferPreparationScheduler(PreparationScheduler):
def __init__(self, slottable, resourcepool, deployment_enact):
- DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
+ PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
# TODO: The following two should be merged into
# something like this:
@@ -51,8 +54,8 @@
self.handlers ={}
self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
sched = self,
- on_start = ImageTransferDeploymentScheduler.handle_start_filetransfer,
- on_end = ImageTransferDeploymentScheduler.handle_end_filetransfer)
+ on_start = ImageTransferPreparationScheduler.handle_start_filetransfer,
+ on_end = ImageTransferPreparationScheduler.handle_end_filetransfer)
def schedule(self, lease, vmrr, nexttime):
if isinstance(lease, ARLease):
@@ -64,14 +67,15 @@
if isinstance(lease, BestEffortLease):
self.__remove_from_fifo_transfers(lease.id)
+ def is_ready(self, lease):
+ return False
+
def schedule_for_ar(self, lease, vmrr, nexttime):
config = get_config()
mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
- lease.state = Lease.STATE_SCHEDULED
-
if avoidredundant:
pass # TODO
@@ -96,7 +100,7 @@
musttransfer[vnode] = pnode
if len(musttransfer) == 0:
- lease.state = Lease.STATE_READY
+ lease.set_state(Lease.STATE_READY)
else:
if mechanism == constants.TRANSFER_UNICAST:
# Dictionary of transfer RRs. Key is the physical node where
@@ -116,7 +120,7 @@
try:
filetransfer = self.schedule_imagetransfer_edf(lease, musttransfer, nexttime)
lease.append_deployrr(filetransfer)
- except DeploymentSchedException, msg:
+ except PreparationSchedException, msg:
raise
# No chance of scheduling exception at this point. It's safe
@@ -131,7 +135,7 @@
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
earliest = self.find_earliest_starting_times(lease, nexttime)
- lease.state = Lease.STATE_SCHEDULED
+
transferRRs = []
musttransfer = {}
piggybacking = []
@@ -158,20 +162,20 @@
else:
# TODO: Not strictly correct. Should mark the lease
# as deployed when piggybacked transfers have concluded
- lease.state = Lease.STATE_READY
+ lease.set_state(Lease.STATE_READY)
if len(piggybacking) > 0:
endtimes = [t.end for t in piggybacking]
if len(musttransfer) > 0:
endtimes.append(endtransfer)
lease.imagesavail = max(endtimes)
if len(musttransfer)==0 and len(piggybacking)==0:
- lease.state = Lease.STATE_READY
+ lease.set_state(Lease.STATE_READY)
lease.imagesavail = nexttime
for rr in transferRRs:
lease.append_deployrr(rr)
- def find_earliest_starting_times(self, lease_req, nexttime):
+ def find_earliest_starting_times(self, lease, nexttime):
nodIDs = [n.nod_id for n in self.resourcepool.get_nodes()]
config = get_config()
mechanism = config.get("transfer-mechanism")
@@ -181,10 +185,10 @@
# Figure out starting time assuming we have to transfer the image
nextfifo = self.get_next_fifo_transfer_time(nexttime)
- imgTransferTime=self.estimate_image_transfer_time(lease_req, self.imagenode_bandwidth)
+ imgTransferTime=self.estimate_image_transfer_time(lease, self.imagenode_bandwidth)
# Find worst-case earliest start time
- if lease_req.numnodes == 1:
+ if lease.numnodes == 1:
startTime = nextfifo + imgTransferTime
earliest = dict([(node, [startTime, constants.REQTRANSFER_YES]) for node in nodIDs])
else:
@@ -200,7 +204,7 @@
# Check if we can reuse images
if reusealg==constants.REUSE_IMAGECACHES:
- nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease_req.diskimage_id)
+ nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.diskimage_id)
for node in nodeswithimg:
earliest[node] = [nexttime, constants.REQTRANSFER_COWPOOL]
@@ -214,7 +218,7 @@
# We can only piggyback on transfers that haven't started yet
transfers = [t for t in self.transfers_fifo if t.state == ResourceReservation.STATE_SCHEDULED]
for t in transfers:
- if t.file == lease_req.diskimage_id:
+ if t.file == lease.diskimage_id:
startTime = t.end
if startTime > nexttime:
for n in earliest:
@@ -240,9 +244,9 @@
newtransfers = transfermap.keys()
res = {}
- resimgnode = ds.ResourceTuple.create_empty()
+ resimgnode = ResourceTuple.create_empty()
resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
- resnode = ds.ResourceTuple.create_empty()
+ resnode = ResourceTuple.create_empty()
resnode.set_by_type(constants.RES_NETIN, bandwidth)
res[self.edf_node.nod_id] = resimgnode
for n in vnodes.values():
@@ -292,7 +296,7 @@
startTime = t.end
if not fits:
- raise DeploymentSchedException, "Adding this lease results in an unfeasible image transfer schedule."
+ raise PreparationSchedException, "Adding this lease results in an unfeasible image transfer schedule."
# Push image transfers as close as possible to their deadlines.
feasibleEndTime=newtransfers[-1].deadline
@@ -341,9 +345,9 @@
# Time to transfer is imagesize / bandwidth, regardless of
# number of nodes
res = {}
- resimgnode = ds.ResourceTuple.create_empty()
+ resimgnode = ResourceTuple.create_empty()
resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
- resnode = ds.ResourceTuple.create_empty()
+ resnode = ResourceTuple.create_empty()
resnode.set_by_type(constants.RES_NETIN, bandwidth)
res[self.fifo_node.nod_id] = resimgnode
for n in reqtransfers.values():
@@ -403,10 +407,14 @@
def handle_start_filetransfer(sched, lease, rr):
sched.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id)
lease.print_contents()
- if lease.state == Lease.STATE_SCHEDULED or lease.state == Lease.STATE_READY:
- lease.state = Lease.STATE_PREPARING
+ lease_state = lease.get_state()
+ if lease_state == Lease.STATE_SCHEDULED or lease_state == Lease.STATE_READY:
+ lease.set_state(Lease.STATE_PREPARING)
rr.state = ResourceReservation.STATE_ACTIVE
# TODO: Enactment
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to start file transfer when state is %s)" % lease_state
+
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
sched.logger.info("Starting image transfer for lease %i" % (lease.id))
@@ -415,28 +423,32 @@
def handle_end_filetransfer(sched, lease, rr):
sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
lease.print_contents()
- if lease.state == Lease.STATE_PREPARING:
- lease.state = Lease.STATE_READY
+ lease_state = lease.get_state()
+ if lease_state == Lease.STATE_PREPARING:
+ lease.set_state(Lease.STATE_READY)
rr.state = ResourceReservation.STATE_DONE
for physnode in rr.transfers:
vnodes = rr.transfers[physnode]
- # Update VM Image maps
- for lease_id, v in vnodes:
- lease = sched.leases.get_lease(lease_id)
- lease.diskimagemap[v] = physnode
-
- # Find out timeout of image. It will be the latest end time of all the
- # leases being used by that image.
- leases = [l for (l, v) in vnodes]
- maxend=None
- for lease_id in leases:
- l = sched.leases.get_lease(lease_id)
- end = lease.get_endtime()
- if maxend==None or end>maxend:
- maxend=end
+# # Update VM Image maps
+# for lease_id, v in vnodes:
+# lease = sched.leases.get_lease(lease_id)
+# lease.diskimagemap[v] = physnode
+#
+# # Find out timeout of image. It will be the latest end time of all the
+# # leases being used by that image.
+# leases = [l for (l, v) in vnodes]
+# maxend=None
+# for lease_id in leases:
+# l = sched.leases.get_lease(lease_id)
+# end = lease.get_endtime()
+# if maxend==None or end>maxend:
+# maxend=end
+ maxend = None
# TODO: ENACTMENT: Verify the image was transferred correctly
- sched.deployment_scheduler.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
+ sched.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to end file transfer when state is %s)" % lease_state
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -27,11 +27,13 @@
# Add dummy disk images
def schedule(self, lease, vmrr, nexttime):
- lease.state = Lease.STATE_READY
for (vnode, pnode) in vmrr.nodes.items():
self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- def find_earliest_starting_times(self, lease_req, nexttime):
+ def is_ready(self, lease):
+ return True
+
+ def find_earliest_starting_times(self, lease, nexttime):
nod_ids = [n.nod_id for n in self.resourcepool.get_nodes()]
earliest = dict([(node, [nexttime, constants.REQTRANSFER_NO, None]) for node in nod_ids])
return earliest
Modified: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -22,7 +22,7 @@
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
from haizea.resourcemanager.scheduler.slottable import ResourceReservation, ResourceTuple
from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException
+from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, CriticalSchedException
from operator import attrgetter, itemgetter
from mx.DateTime import TimeDelta
@@ -216,7 +216,7 @@
numnodes = lease.numnodes
requested_resources = lease.requested_resources
preemptible = lease.preemptible
- mustresume = (lease.state == Lease.STATE_SUSPENDED)
+ mustresume = (lease.get_state() == Lease.STATE_SUSPENDED_QUEUED)
shutdown_time = self.__estimate_shutdown_time(lease)
susptype = get_config().get("suspension")
if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
@@ -788,7 +788,7 @@
susp_overhead = self.__estimate_suspend_time(lease)
safe_duration = susp_overhead
- if lease.state == Lease.STATE_SUSPENDED:
+ if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
resm_overhead = self.__estimate_resume_time(lease)
safe_duration += resm_overhead
@@ -991,7 +991,7 @@
leases = []
vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
leases = set([rr.lease for rr in vmrrs])
- leases = [l for l in leases if isinstance(l, BestEffortLease) and l.state in (Lease.STATE_SUSPENDED,Lease.STATE_READY) and not l in checkedleases]
+ leases = [l for l in leases if isinstance(l, BestEffortLease) and l.get_state() in (Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY) and not l in checkedleases]
for lease in leases:
self.logger.debug("Found lease %i" % l.id)
l.print_contents()
@@ -1010,7 +1010,7 @@
old_start = vmrr.start
old_end = vmrr.end
nodes = vmrr.nodes.values()
- if lease.state == Lease.STATE_SUSPENDED:
+ if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
originalstart = vmrr.pre_rrs[0].start
else:
originalstart = vmrr.start
@@ -1031,7 +1031,7 @@
pass
else:
diff = originalstart - newstart
- if lease.state == Lease.STATE_SUSPENDED:
+ if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
for resmrr in resmrrs:
resmrr_old_start = resmrr.start
@@ -1076,8 +1076,9 @@
def _handle_start_vm(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
l.print_contents()
- if l.state == Lease.STATE_READY:
- l.state = Lease.STATE_ACTIVE
+ lease_state = l.get_state()
+ if lease_state == Lease.STATE_READY:
+ l.set_state(Lease.STATE_ACTIVE)
rr.state = ResourceReservation.STATE_ACTIVE
now_time = get_clock().get_time()
l.start.actual = now_time
@@ -1091,11 +1092,14 @@
except Exception, e:
self.logger.error("ERROR when starting VMs.")
raise
- elif l.state == Lease.STATE_RESUMED_READY:
- l.state = Lease.STATE_ACTIVE
+ elif lease_state == Lease.STATE_RESUMED_READY:
+ l.set_state(Lease.STATE_ACTIVE)
rr.state = ResourceReservation.STATE_ACTIVE
# No enactment to do here, since all the suspend/resume actions are
# handled during the suspend/resume RRs
+ else:
+ raise CriticalSchedException, "Lease is in an inconsistent state (tried to start VM when state is %s)" % Lease.state_str[lease_state]
+
l.print_contents()
self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
@@ -1156,7 +1160,7 @@
pnode = rr.vmrr.nodes[vnode]
l.memimagemap[vnode] = pnode
if rr.is_first():
- l.state = Lease.STATE_SUSPENDING
+ l.set_state(Lease.STATE_SUSPENDING)
l.print_contents()
self.logger.info("Suspending lease %i..." % (l.id))
self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
@@ -1168,12 +1172,12 @@
self.resourcepool.verify_suspend(l, rr)
rr.state = ResourceReservation.STATE_DONE
if rr.is_last():
- l.state = Lease.STATE_SUSPENDED
+ l.set_state(Lease.STATE_SUSPENDED_PENDING)
l.print_contents()
self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
self.logger.info("Lease %i suspended." % (l.id))
- if l.state == Lease.STATE_SUSPENDED:
+ if l.get_state() == Lease.STATE_SUSPENDED_PENDING:
raise RescheduleLeaseException
def _handle_start_resume(self, l, rr):
@@ -1182,7 +1186,7 @@
self.resourcepool.resume_vms(l, rr)
rr.state = ResourceReservation.STATE_ACTIVE
if rr.is_first():
- l.state = Lease.STATE_RESUMING
+ l.set_state(Lease.STATE_RESUMING)
l.print_contents()
self.logger.info("Resuming lease %i..." % (l.id))
self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
@@ -1194,7 +1198,7 @@
self.resourcepool.verify_resume(l, rr)
rr.state = ResourceReservation.STATE_DONE
if rr.is_last():
- l.state = Lease.STATE_RESUMED_READY
+ l.set_state(Lease.STATE_RESUMED_READY)
self.logger.info("Resumed lease %i" % (l.id))
for vnode, pnode in rr.vmrr.nodes.items():
self.resourcepool.remove_ramfile(pnode, l.id, vnode)
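Together with the leases.py changes, the old single STATE_SUSPENDED is split into three sub-states that record where a suspended best-effort lease is in its life cycle. A rough sketch of the sequence (annotated with the component that performs each change; this is an illustration, not code from the commit):

# vm_scheduler._handle_end_suspend, once the last suspension RR finishes:
lease.set_state(Lease.STATE_SUSPENDED_PENDING)

# lease_scheduler, when it catches RescheduleLeaseException and re-queues:
lease.set_state(Lease.STATE_SUSPENDED_QUEUED)

# lease_scheduler.__schedule_besteffort_lease, once a new slot is found:
lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)

# From there the lease either migrates (STATE_MIGRATING, then back to
# STATE_SUSPENDED_SCHEDULED) or resumes (STATE_RESUMING ->
# STATE_RESUMED_READY -> STATE_ACTIVE).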
Modified: trunk/src/haizea/traces/readers.py
===================================================================
--- trunk/src/haizea/traces/readers.py 2009-01-30 00:07:03 UTC (rev 559)
+++ trunk/src/haizea/traces/readers.py 2009-02-03 01:54:47 UTC (rev 560)
@@ -57,7 +57,6 @@
realdur = maxdur
preemptible = True
req = BestEffortLease(tSubmit, maxdur, vmimage, vmimagesize, numnodes, resreq, preemptible, realdur)
- req.state = Lease.STATE_NEW
requests.append(req)
return requests
@@ -100,6 +99,5 @@
else:
preemptible = False
req = ARLease(tSubmit, tStart, duration, vmimage, vmimagesize, numnodes, resreq, preemptible, realduration)
- req.state = Lease.STATE_NEW
requests.append(req)
return requests