[haizea-commit] r539 - trunk/src/haizea/resourcemanager
haizea-commit at mailman.cs.uchicago.edu
Thu Nov 6 17:54:18 CST 2008
Author: borja
Date: 2008-11-06 17:54:13 -0600 (Thu, 06 Nov 2008)
New Revision: 539
Modified:
trunk/src/haizea/resourcemanager/configfile.py
trunk/src/haizea/resourcemanager/datastruct.py
trunk/src/haizea/resourcemanager/scheduler.py
Log:
Added ShutdownResourceReservation to explicitly schedule VM shutdowns.
Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py 2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/configfile.py 2008-11-06 23:54:13 UTC (rev 539)
@@ -286,6 +286,42 @@
The default should be good for most configurations, but
may need to be increased if you're dealing with exceptionally
high loads.
+ """),
+
+ Option(name = "shutdown-time",
+ getter = "shutdown-time",
+ type = OPTTYPE_TIMEDELTA,
+ required = False,
+ default = TimeDelta(seconds=0),
+ doc = """
+ The amount of time that will be allocated for a VM to shut down.
+ When running in OpenNebula mode, it is advisable to set this to
+ a few seconds, so that no other operation gets scheduled right
+ when a VM is shutting down. The most common scenario is that one
+ VM starts resuming just as another VM shuts down; since both
+ activities involve I/O, the shutdown can delay the resume
+ operation and throw off Haizea's estimate of how long the
+ resume will take.
+ """),
+
+ Option(name = "enactment-overhead",
+ getter = "enactment-overhead",
+ type = OPTTYPE_TIMEDELTA,
+ required = False,
+ default = TimeDelta(seconds=0),
+ doc = """
+ The amount of time required to send
+ an enactment command. This value will affect suspend/resume
+ estimations and, in OpenNebula mode, will force a pause
+ of this much time between suspend/resume enactment
+ commands. When suspending/resuming many VMs at the same time
+ (which is likely to happen if suspendresume-exclusion is set
+ to "local"), OpenNebula takes 1-2 seconds to process each
+ command. This is a small amount of time, but if 32 VMs are
+ being suspended at the same time, one on each physical node,
+ it can add up to 32-64 seconds, which has to be taken into
+ account when estimating when to start a suspend operation
+ that must be completed before another lease starts.
""")
]
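For reference, the two options above would be set in the Haizea
configuration file alongside the existing scheduling options. The
following is a minimal sketch only: the section name and the
seconds-based syntax for OPTTYPE_TIMEDELTA values are assumptions,
not confirmed by this diff.

    [scheduling]
    # Reserve 5 seconds at the tail of each VM reservation for shutdown,
    # so a resume on the same node does not overlap the shutdown I/O.
    shutdown-time: 5

    # Assume each enactment command takes ~2 seconds to process; with 32
    # simultaneous suspends (one per physical node) this adds up to about
    # 64 seconds, which the scheduler folds into its suspend estimate.
    enactment-overhead: 2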
Modified: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py 2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/datastruct.py 2008-11-06 23:54:13 UTC (rev 539)
@@ -211,7 +211,8 @@
realdur = None):
start = Timestamp(start)
duration = Duration(duration)
- duration.known = realdur # ONLY for simulation
+ if realdur != duration.requested:
+ duration.known = realdur # ONLY for simulation
Lease.__init__(self, submit_time, start, duration, diskimage_id,
diskimage_size, numnodes, resreq, preemptible)
@@ -236,7 +237,8 @@
realdur = None):
start = Timestamp(None) # i.e., start on a best-effort basis
duration = Duration(duration)
- duration.known = realdur # ONLY for simulation
+ if realdur != duration.requested:
+ duration.known = realdur # ONLY for simulation
# When the images will be available
self.imagesavail = None
Lease.__init__(self, submit_time, start, duration, diskimage_id,
@@ -368,6 +370,9 @@
def is_suspending(self):
return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
+ def is_shutting_down(self):
+ return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
+
def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
for resmrr in self.pre_rrs:
resmrr.print_contents(loglevel)
@@ -444,7 +449,25 @@
rr = ResourceReservation.xmlrpc_marshall(self)
rr["type"] = "RESM"
return rr
+
+class ShutdownResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vnodes, vmrr):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.vnodes = vnodes
+ def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : SHUTDOWN")
+ ResourceReservation.print_contents(self, loglevel)
+
+ def is_preemptible(self):
+ return True
+
+ def xmlrpc_marshall(self):
+ rr = ResourceReservation.xmlrpc_marshall(self)
+ rr["type"] = "SHTD"
+ return rr
+
class MigrationResourceReservation(ResourceReservation):
def __init__(self, lease, start, end, res, vmrr):
ResourceReservation.__init__(self, lease, start, end, res)
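The new is_shutting_down() predicate mirrors is_suspending(): both
inspect only the first post-VM reservation, so a VM reservation ends
either in a suspension chain or in a single shutdown RR, never both.
Below is a minimal sketch of that convention, using stub classes in
place of the real Haizea data structures:

    # Stand-in classes; not haizea.resourcemanager.datastruct itself.
    class SuspensionRR(object):
        pass

    class ShutdownRR(object):
        pass

    class VMRR(object):
        def __init__(self):
            self.post_rrs = []   # reservations scheduled after the VM

        def is_suspending(self):
            return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionRR)

        def is_shutting_down(self):
            return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownRR)

    vmrr = VMRR()
    vmrr.post_rrs.append(ShutdownRR())
    assert vmrr.is_shutting_down() and not vmrr.is_suspending()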
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-11-06 16:49:05 UTC (rev 538)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-11-06 23:54:13 UTC (rev 539)
@@ -119,6 +119,10 @@
on_start = Scheduler._handle_start_vm,
on_end = Scheduler._handle_end_vm)
+ self.register_handler(type = ds.ShutdownResourceReservation,
+ on_start = Scheduler._handle_start_shutdown,
+ on_end = Scheduler._handle_end_shutdown)
+
self.register_handler(type = ds.SuspensionResourceReservation,
on_start = Scheduler._handle_start_suspend,
on_end = Scheduler._handle_end_suspend)
@@ -299,6 +303,9 @@
self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
else:
self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
+ lease_req.state = Lease.STATE_REJECTED
+ self.completedleases.add(lease_req)
+ self.leases.remove(lease_req)
def __process_queue(self, nexttime):
@@ -347,21 +354,15 @@
def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
- start = lease_req.start.requested
- end = lease_req.start.requested + lease_req.duration.requested
try:
- (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+ (vmrr, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
if len(preemptions) > 0:
- leases = self.__find_preemptable_leases(preemptions, start, end)
+ leases = self.__find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
for lease in leases:
- self.__preempt(lease, preemption_time=start)
+ self.__preempt(lease, preemption_time=vmrr.start)
- # Create VM resource reservations
- vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, False)
- vmrr.state = ResourceReservation.STATE_SCHEDULED
-
# Schedule deployment overhead
self.deployment_scheduler.schedule(lease_req, vmrr, nexttime)
@@ -370,6 +371,10 @@
# scheduling could still throw an exception)
lease_req.append_vmrr(vmrr)
self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
except Exception, msg:
raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
@@ -402,7 +407,7 @@
# TODO: migrations
- # Post-VM RRs (if any)
+ # Pre-VM RRs (if any)
for rr in vmrr.pre_rrs:
self.slottable.addReservation(rr)
@@ -426,12 +431,16 @@
def __schedule_immediate_lease(self, req, nexttime):
try:
- (resmrr, vmrr, susprr, reservation) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
+ (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
# Schedule deployment
self.deployment_scheduler.schedule(req, vmrr, nexttime)
req.append_rr(vmrr)
self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
req.print_contents()
except SlotFittingException, msg:
@@ -440,7 +449,7 @@
def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
lease_id = leasereq.id
start = leasereq.start.requested
- end = leasereq.start.requested + leasereq.duration.requested
+ end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
diskImageID = leasereq.diskimage_id
numnodes = leasereq.numnodes
resreq = leasereq.requested_resources
@@ -560,10 +569,16 @@
break
if vnode <= numnodes:
- raise SchedException, "Availability window indicated that request but feasible, but could not fit it"
+ raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
- return nodeassignment, res, preemptions
+ # Create VM resource reservations
+ vmrr = ds.VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
+ vmrr.state = ResourceReservation.STATE_SCHEDULED
+ self.__schedule_shutdown(vmrr)
+
+ return vmrr, preemptions
+
def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
lease_id = lease.id
remaining_duration = lease.duration.get_remaining_duration()
@@ -571,6 +586,7 @@
requested_resources = lease.requested_resources
preemptible = lease.preemptible
mustresume = (lease.state == Lease.STATE_SUSPENDED)
+ shutdown_time = self.__estimate_shutdown_time(lease)
susptype = get_config().get("suspension")
if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
suspendable = False
@@ -657,6 +673,7 @@
else:
duration = remaining_duration
+ duration += shutdown_time
# First, assuming we can't make reservations in the future
start, end, canfit = self.__find_fit_at_points(
@@ -745,10 +762,13 @@
mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
if mustsuspend:
self.__schedule_suspension(vmrr, end)
+ else:
+ # Compensate for any overestimation
+ if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
+ vmrr.end = vmrr.start + remaining_duration + shutdown_time
+ self.__schedule_shutdown(vmrr)
- # Compensate for any overestimation
- if (vmrr.end - vmrr.start) > remaining_duration:
- vmrr.end = vmrr.start + remaining_duration
+
susp_str = res_str = ""
if mustresume:
@@ -849,39 +869,25 @@
return times
+ def __schedule_shutdown(self, vmrr):
+ config = get_config()
+ shutdown_time = config.get("shutdown-time")
- def __schedule_resumption(self, vmrr, resume_at):
- from haizea.resourcemanager.rm import ResourceManager
- config = ResourceManager.get_singleton().config
- resm_exclusion = config.get("suspendresume-exclusion")
- rate = self.resourcepool.info.get_suspendresume_rate()
+ start = vmrr.end - shutdown_time
+ end = vmrr.end
+
+ shutdown_rr = ds.ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
+ shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
+
+ vmrr.update_end(start)
+
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
- if resume_at < vmrr.start or resume_at > vmrr.end:
- raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+ vmrr.post_rrs.append(shutdown_rr)
- times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate)
- resume_rrs = []
- for (start, end, pnode, vnodes) in times:
- r = ds.ResourceTuple.create_empty()
- mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
- r.set_by_type(constants.RES_MEM, mem)
- r.set_by_type(constants.RES_DISK, mem)
- resmres = {pnode: r}
- resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, vnodes, vmrr)
- resmrr.state = ResourceReservation.STATE_SCHEDULED
- resume_rrs.append(resmrr)
-
- resume_rrs.sort(key=attrgetter("start"))
-
- resm_end = resume_rrs[-1].end
- if resm_end > vmrr.end:
- raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
-
- vmrr.update_start(resm_end)
- for resmrr in resume_rrs:
- vmrr.pre_rrs.append(resmrr)
-
-
def __schedule_suspension(self, vmrr, suspend_by):
from haizea.resourcemanager.rm import ResourceManager
config = ResourceManager.get_singleton().config
@@ -911,24 +917,55 @@
vmrr.update_end(susp_start)
- # If we're already suspending, remove previous susprrs
- if vmrr.is_suspending():
- for susprr in vmrr.post_rrs:
- self.slottable.removeReservation(susprr)
- vmrr.post_rrs = []
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
for susprr in suspend_rrs:
- vmrr.post_rrs.append(susprr)
+ vmrr.post_rrs.append(susprr)
+
+ def __schedule_resumption(self, vmrr, resume_at):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ resm_exclusion = config.get("suspendresume-exclusion")
+ rate = self.resourcepool.info.get_suspendresume_rate()
+ if resume_at < vmrr.start or resume_at > vmrr.end:
+ raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+
+ times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate)
+ resume_rrs = []
+ for (start, end, pnode, vnodes) in times:
+ r = ds.ResourceTuple.create_empty()
+ mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+ r.set_by_type(constants.RES_MEM, mem)
+ r.set_by_type(constants.RES_DISK, mem)
+ resmres = {pnode: r}
+ resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, vnodes, vmrr)
+ resmrr.state = ResourceReservation.STATE_SCHEDULED
+ resume_rrs.append(resmrr)
+
+ resume_rrs.sort(key=attrgetter("start"))
+
+ resm_end = resume_rrs[-1].end
+ if resm_end > vmrr.end:
+ raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+
+ vmrr.update_start(resm_end)
+ for resmrr in resume_rrs:
+ vmrr.pre_rrs.append(resmrr)
+
+
+
+
def __compute_suspend_resume_time(self, mem, rate):
time = float(mem) / rate
time = round_datetime_delta(TimeDelta(seconds = time))
return time
def __estimate_suspend_resume_time(self, lease):
- from haizea.resourcemanager.rm import ResourceManager
- config = ResourceManager.get_singleton().config
- susp_exclusion = config.get("suspendresume-exclusion")
+ susp_exclusion = get_config().get("suspendresume-exclusion")
rate = self.resourcepool.info.get_suspendresume_rate()
mem = lease.requested_resources.get_by_type(constants.RES_MEM)
if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
@@ -937,6 +974,10 @@
# Overestimating
return lease.numnodes * self.__compute_suspend_resume_time(mem, rate)
+ def __estimate_shutdown_time(self, lease):
+ # Always uses fixed value in configuration file
+ return get_config().get("shutdown-time")
+
def __estimate_suspend_time(self, lease):
return self.__estimate_suspend_resume_time(lease)
@@ -1201,9 +1242,9 @@
self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
if vmrr.backfill_reservation == True:
self.numbesteffortres -= 1
- if vmrr.is_suspending():
- for susprr in vmrr.post_rrs:
- self.slottable.removeReservation(susprr)
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
lease.remove_vmrr(vmrr)
self.slottable.removeReservation(vmrr)
for vnode, pnode in lease.diskimagemap.items():
@@ -1291,6 +1332,15 @@
vmrr.post_rrs = []
else:
vmrr.update_end(vmrr.end - diff)
+
+ if not vmrr.is_suspending():
+ # If the VM was set to shut down, we need to slide back the shutdown RRs
+ for rr in vmrr.post_rrs:
+ rr_old_start = rr.start
+ rr_old_end = rr.end
+ rr.start -= diff
+ rr.end -= diff
+ self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
self.logger.vdebug("New lease descriptor (after slideback):")
@@ -1332,10 +1382,8 @@
self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
- # TODO: Replace enact with a saner way of handling leases that have failed or
- # ended prematurely.
- # Possibly factor out the "clean up" code to a separate function
- def _handle_end_vm(self, l, rr, enact=True):
+
+ def _handle_end_vm(self, l, rr):
self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
self.logger.vdebug("LEASE-%i Before:" % l.id)
l.print_contents()
@@ -1343,19 +1391,11 @@
diff = now_time - rr.start
l.duration.accumulate_duration(diff)
rr.state = ResourceReservation.STATE_DONE
- if not rr.is_suspending():
- self.resourcepool.stop_vms(l, rr)
- l.state = Lease.STATE_DONE
- l.duration.actual = l.duration.accumulated
- l.end = now_time
- self.completedleases.add(l)
- self.leases.remove(l)
- if isinstance(l, ds.BestEffortLease):
- get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
if isinstance(l, ds.BestEffortLease):
if rr.backfill_reservation == True:
self.numbesteffortres -= 1
+
self.logger.vdebug("LEASE-%i After:" % l.id)
l.print_contents()
self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
@@ -1364,17 +1404,46 @@
def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
self._handle_end_rr(l, vmrr)
- if vmrr.is_suspending():
- for susprr in vmrr.post_rrs:
- self.slottable.removeReservation(susprr)
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
+ # TODO: slideback shutdown RRs
vmrr.end = get_clock().get_time()
- self._handle_end_vm(l, vmrr, enact=enact)
+ self._handle_end_vm(l, vmrr)
+ self._handle_end_lease(l)
nexttime = get_clock().get_next_schedulable_time()
if self.is_backfilling():
# We need to reevaluate the schedule to see if there are any future
# reservations that we can slide back.
self.__reevaluate_schedule(l, vmrr.nodes.values(), nexttime, [])
+ def _handle_start_shutdown(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_ACTIVE
+ self.resourcepool.stop_vms(l, rr)
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
+
+ def _handle_end_shutdown(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_DONE
+ self._handle_end_lease(l)
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
+ self.logger.info("Lease %i shutdown." % (l.id))
+
+ def _handle_end_lease(self, l):
+ l.state = Lease.STATE_DONE
+ l.duration.actual = l.duration.accumulated
+ l.end = round_datetime(get_clock().get_time())
+ self.completedleases.add(l)
+ self.leases.remove(l)
+ if isinstance(l, ds.BestEffortLease):
+ get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+
+
def _handle_start_suspend(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
l.print_contents()
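Taken together, the scheduler now carves the shutdown window out of the
tail of the VM reservation: __fit_exact extends the mapped interval by
the estimated shutdown time, and __schedule_shutdown then truncates the
VMRR and appends a shutdown RR covering the last shutdown-time seconds.
Below is a worked sketch of that arithmetic, with plain integers
standing in for Haizea's Timestamp/TimeDelta types:

    # Sketch of the shutdown-window arithmetic; values are illustrative.
    requested = 3600       # lease requests one hour of VM time
    shutdown_time = 5      # the "shutdown-time" config option

    vm_start = 0
    vm_end = vm_start + requested + shutdown_time  # as in __fit_exact

    # __schedule_shutdown: the shutdown RR occupies the final
    # shutdown_time seconds, and the VMRR is truncated to end there.
    shutdown_start = vm_end - shutdown_time
    shutdown_end = vm_end
    vm_end = shutdown_start                        # vmrr.update_end(start)

    assert vm_end - vm_start == requested
    assert shutdown_end - shutdown_start == shutdown_time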