[haizea-commit] r491 - trunk/src/haizea/resourcemanager
haizea-commit at mailman.cs.uchicago.edu
Thu Sep 11 11:47:36 CDT 2008
Author: borja
Date: 2008-09-11 11:47:36 -0500 (Thu, 11 Sep 2008)
New Revision: 491
Modified:
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Log:
Clean-up of the scheduling code:
- Moved more scheduling code out of the slot table and into the Scheduler class.
- Factored out the resumption/suspension scheduling code to make it easier to support more complex suspend/resume scenarios.
- Adapted the code to the new lease data structure topology.
- All leases (except completed ones) are now kept in a single "leases" table.
Note that lease preemption is broken at this point; it should be fixed soon.
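
The following minimal, standalone sketch illustrates the new request flow that this commit introduces: request_lease() only marks a lease as pending in the single "leases" table, and the next schedule() pass decides its fate (best-effort requests are queued, AR/immediate requests are scheduled right away). The class and method names below are simplified stand-ins for illustration, not the actual Haizea code.

    class Lease(object):
        STATE_PENDING, STATE_QUEUED, STATE_SCHEDULED = range(3)

        def __init__(self, lease_id, kind):
            self.id = lease_id
            self.kind = kind      # "AR", "BE" or "IM" in this sketch
            self.state = None

    class LeaseTable(object):
        def __init__(self):
            self.entries = {}

        def add(self, lease):
            self.entries[lease.id] = lease

        def get_leases_by_state(self, state):
            return [l for l in self.entries.values() if l.state == state]

    class MiniScheduler(object):
        def __init__(self):
            self.leases = LeaseTable()   # single table for all non-completed leases
            self.queue = []

        def request_lease(self, lease):
            # Requests are only marked "pending" here; their fate is decided
            # on the next scheduling pass.
            lease.state = Lease.STATE_PENDING
            self.leases.add(lease)

        def schedule(self):
            for lease in self.leases.get_leases_by_state(Lease.STATE_PENDING):
                if lease.kind == "BE":
                    # Best-effort requests are placed on the queue.
                    lease.state = Lease.STATE_QUEUED
                    self.queue.append(lease)
                else:
                    # AR and immediate requests are scheduled right away
                    # (slot fitting omitted in this sketch).
                    lease.state = Lease.STATE_SCHEDULED

    if __name__ == "__main__":
        sched = MiniScheduler()
        ar, be = Lease(1, "AR"), Lease(2, "BE")
        sched.request_lease(ar)
        sched.request_lease(be)
        sched.schedule()
        assert ar.state == Lease.STATE_SCHEDULED
        assert be.state == Lease.STATE_QUEUED and be in sched.queue
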
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-09-11 16:41:18 UTC (rev 490)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-09-11 16:47:36 UTC (rev 491)
@@ -32,13 +32,15 @@
import haizea.resourcemanager.datastruct as ds
import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, estimate_transfer_time
from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeployment
from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
-from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation
+from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation
from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
import logging
@@ -85,9 +87,8 @@
self.resourcepool = ResourcePool(self)
self.slottable = SlotTable(self)
self.queue = ds.Queue(self)
- self.scheduledleases = ds.LeaseTable(self)
+ self.leases = ds.LeaseTable(self)
self.completedleases = ds.LeaseTable(self)
- self.pending_leases = []
for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
self.slottable.add_node(n)
@@ -126,11 +127,16 @@
self.numbesteffortres = 0
- def schedule(self, nexttime):
- ar_leases = [req for req in self.pending_leases if isinstance(req, ARLease)]
- im_leases = [req for req in self.pending_leases if isinstance(req, ImmediateLease)]
- self.pending_leases = []
+ def schedule(self, nexttime):
+ pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
+ ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
+ im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
+ be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
+ # Queue best-effort requests
+ for lease in be_leases:
+ self.enqueue(lease)
+
# Process immediate requests
for lease_req in im_leases:
self.__process_im_request(lease_req, nexttime)
@@ -144,18 +150,14 @@
def process_reservations(self, nowtime):
- starting = [l for l in self.scheduledleases.entries.values() if l.has_starting_reservations(nowtime)]
- ending = [l for l in self.scheduledleases.entries.values() if l.has_ending_reservations(nowtime)]
- for l in ending:
- rrs = l.get_ending_reservations(nowtime)
- for rr in rrs:
- self._handle_end_rr(l, rr)
- self.handlers[type(rr)].on_end(self, l, rr)
+ starting = self.slottable.get_reservations_starting_at(nowtime)
+ ending = self.slottable.get_reservations_ending_at(nowtime)
+ for rr in ending:
+ self._handle_end_rr(rr.lease, rr)
+ self.handlers[type(rr)].on_end(self, rr.lease, rr)
- for l in starting:
- rrs = l.get_starting_reservations(nowtime)
- for rr in rrs:
- self.handlers[type(rr)].on_start(self, l, rr)
+ for rr in starting:
+ self.handlers[type(rr)].on_start(self, rr.lease, rr)
util = self.slottable.getUtilization(nowtime)
self.rm.accounting.append_stat(constants.COUNTER_CPUUTILIZATION, util)
@@ -170,18 +172,19 @@
def enqueue(self, lease_req):
"""Queues a best-effort lease request"""
self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ lease_req.state = Lease.STATE_QUEUED
self.queue.enqueue(lease_req)
self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
- def add_pending_lease(self, lease_req):
+ def request_lease(self, lease):
"""
- Adds a pending lease request, to be scheduled as soon as
- the scheduling function is called. Unlike best-effort leases,
- if one these leases can't be scheduled immediately, it is
- rejected (instead of being placed on a queue, in case resources
- become available later on).
+ Request a lease. At this point, it is simply marked as "Pending" and,
+ next time the scheduling function is called, the fate of the
+ lease will be determined (right now, AR+IM leases get scheduled
+ right away, and best-effort leases get placed on a queue)
"""
- self.pending_leases.append(lease_req)
+ lease.state = Lease.STATE_PENDING
+ self.leases.add(lease)
def is_queue_empty(self):
"""Return True is the queue is empty, False otherwise"""
@@ -190,7 +193,7 @@
def exists_scheduled_leases(self):
"""Return True if there are any leases scheduled in the future"""
- return not self.scheduledleases.is_empty()
+ return not self.slottable.is_empty()
def cancel_lease(self, lease_id):
"""Cancels a lease.
@@ -201,26 +204,26 @@
time = self.rm.clock.get_time()
self.logger.info("Cancelling lease %i..." % lease_id)
- if self.scheduledleases.has_lease(lease_id):
+ if self.leases.has_lease(lease_id):
# The lease is either running, or scheduled to run
- lease = self.scheduledleases.get_lease(lease_id)
+ lease = self.leases.get_lease(lease_id)
- if lease.state == constants.LEASE_STATE_ACTIVE:
+ if lease.state == Lease.STATE_ACTIVE:
self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
rr = lease.get_active_reservations(time)[0]
if isinstance(rr, VMResourceReservation):
self._handle_unscheduled_end_vm(lease, rr, enact=True)
# TODO: Handle cancelations in middle of suspensions and
# resumptions
- elif lease.state in [constants.LEASE_STATE_SCHEDULED, constants.LEASE_STATE_DEPLOYED]:
+ elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
rrs = lease.get_scheduled_reservations()
for r in rrs:
lease.remove_rr(r)
self.slottable.removeReservation(r)
- lease.state = constants.LEASE_STATE_DONE
+ lease.state = Lease.STATE_CANCELLED
self.completedleases.add(lease)
- self.scheduledleases.remove(lease)
+ self.leases.remove(lease)
elif self.queue.has_lease(lease_id):
# The lease is in the queue, waiting to be scheduled.
# Cancelling is as simple as removing it from the queue
@@ -248,7 +251,7 @@
def notify_event(self, lease_id, event):
time = self.rm.clock.get_time()
if event == constants.EVENT_END_VM:
- lease = self.scheduledleases.get_lease(lease_id)
+ lease = self.leases.get_lease(lease_id)
rr = lease.get_active_reservations(time)[0]
self._handle_unscheduled_end_vm(lease, rr, enact=False)
@@ -262,7 +265,7 @@
accepted = False
try:
self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
- self.scheduledleases.add(lease_req)
+ self.leases.add(lease_req)
self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
@@ -273,7 +276,7 @@
self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
- self.scheduledleases.add(lease_req)
+ self.leases.add(lease_req)
self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
@@ -285,34 +288,7 @@
else:
self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
-
- def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
- start = lease_req.start.requested
- end = lease_req.start.requested + lease_req.duration.requested
- try:
- (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-
- if len(preemptions) > 0:
- leases = self.__find_preemptable_leases(preemptions, start, end)
- self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
- for lease in leases:
- self.preempt(lease, time=start)
-
- # Create VM resource reservations
- vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, constants.ONCOMPLETE_ENDLEASE, False)
- vmrr.state = constants.RES_STATE_SCHEDULED
-
- # Schedule deployment overhead
- self.deployment.schedule(lease_req, vmrr, nexttime)
-
- # Commit reservation to slot table
- # (we don't do this until the very end because the deployment overhead
- # scheduling could still throw an exception)
- lease_req.append_rr(vmrr)
- self.slottable.addReservation(vmrr)
- except SlotFittingException, msg:
- raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
-
+
def __process_queue(self, nexttime):
done = False
newqueue = ds.Queue(self)
@@ -327,7 +303,7 @@
self.logger.debug(" Duration: %s" % lease_req.duration)
self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
self.__schedule_besteffort_lease(lease_req, nexttime)
- self.scheduledleases.add(lease_req)
+ self.leases.add(lease_req)
self.rm.accounting.decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
except SchedException, msg:
# Put back on queue
@@ -341,92 +317,104 @@
newqueue.enqueue(lease)
self.queue = newqueue
+
+
+ def __process_im_request(self, lease_req, nexttime):
+ self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
+ self.logger.debug(" Duration: %s" % lease_req.duration)
+ self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+ try:
+ self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+ self.leases.add(lease_req)
+ self.rm.accounting.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
+ self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
+ except SchedException, msg:
+ self.rm.accounting.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+
+
+ def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+ start = lease_req.start.requested
+ end = lease_req.start.requested + lease_req.duration.requested
+ try:
+ (nodeassignment, res, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+
+ if len(preemptions) > 0:
+ leases = self.__find_preemptable_leases(preemptions, start, end)
+ self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
+ for lease in leases:
+ self.__preempt(lease, preemption_time=start)
- def __schedule_besteffort_lease(self, req, nexttime):
- # Determine earliest start time in each node
- if req.state == constants.LEASE_STATE_PENDING:
- # Figure out earliest start times based on
- # image schedule and reusable images
- earliest = self.deployment.find_earliest_starting_times(req, nexttime)
- elif req.state == constants.LEASE_STATE_SUSPENDED:
- # No need to transfer images from repository
- # (only intra-node transfer)
- earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(req.numnodes)])
+ # Create VM resource reservations
+ vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, False)
+ vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+ # Schedule deployment overhead
+ self.deployment.schedule(lease_req, vmrr, nexttime)
- susptype = self.rm.config.get("suspension")
- if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and req.numnodes == 1):
- cansuspend = False
- else:
- cansuspend = True
+ # Commit reservation to slot table
+ # (we don't do this until the very end because the deployment overhead
+ # scheduling could still throw an exception)
+ lease_req.append_vmrr(vmrr)
+ self.slottable.addReservation(vmrr)
+ except SlotFittingException, msg:
+ raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
- canmigrate = self.rm.config.get("migration")
+
+ def __schedule_besteffort_lease(self, lease, nexttime):
try:
- mustresume = (req.state == constants.LEASE_STATE_SUSPENDED)
- canreserve = self.canReserveBestEffort()
- (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
+ # Schedule the VMs
+ canreserve = self.__can_reserve_besteffort_in_future()
+ (vmrr, in_future) = self.__fit_asap(lease, nexttime, allow_reservation_in_future = canreserve)
# Schedule deployment
- if req.state != constants.LEASE_STATE_SUSPENDED:
- self.deployment.schedule(req, vmrr, nexttime)
+ if lease.state != Lease.STATE_SUSPENDED:
+ self.deployment.schedule(lease, vmrr, nexttime)
+ else:
+ # TODO: schedule migrations
+ pass
+
+ # At this point, the lease is feasible.
+ # Commit changes by adding RRs to lease and to slot table
- # TODO: The following would be more correctly handled in the RR handle functions.
- # We need to have an explicit MigrationResourceReservation before doing that.
- if req.state == constants.LEASE_STATE_SUSPENDED:
- # Update VM image mappings, since we might be resuming
- # in different nodes.
- for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.remove_diskimage(pnode, req.id, vnode)
- req.vmimagemap = vmrr.nodes
- for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.add_diskimage(pnode, req.diskimage_id, req.diskimage_size, req.id, vnode)
+ # Add resource reservations to lease
+ # TODO: deployment
+ # TODO: migrations
+ lease.append_vmrr(vmrr)
+
+
+ # Add resource reservations to slottable
+
+ # TODO: deployment
+
+ # TODO: migrations
+
+ # Resumptions (if any)
+ for resmrr in vmrr.resm_rrs:
+ self.slottable.addReservation(resmrr)
- # Update RAM file mappings
- for vnode, pnode in req.memimagemap.items():
- self.resourcepool.remove_ramfile(pnode, req.id, vnode)
- for vnode, pnode in vmrr.nodes.items():
- self.resourcepool.add_ramfile(pnode, req.id, vnode, req.requested_resources.get_by_type(constants.RES_MEM))
- req.memimagemap[vnode] = pnode
-
- # Add resource reservations
- if resmrr != None:
- req.append_rr(resmrr)
- self.slottable.addReservation(resmrr)
- req.append_rr(vmrr)
+ # VM
self.slottable.addReservation(vmrr)
- if susprr != None:
- req.append_rr(susprr)
+
+ # Suspensions (if any)
+ for susprr in vmrr.susp_rrs:
self.slottable.addReservation(susprr)
- if reservation:
+ if in_future:
self.numbesteffortres += 1
- req.print_contents()
-
- except SlotFittingException, msg:
+ lease.print_contents()
+
+ except SchedException, msg:
raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
- def __process_im_request(self, lease_req, nexttime):
- self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
- try:
- self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
- self.scheduledleases.add(lease_req)
- self.rm.accounting.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
- self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
- except SchedException, msg:
- self.rm.accounting.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-
def __schedule_immediate_lease(self, req, nexttime):
- # Determine earliest start time in each node
- earliest = self.deployment.find_earliest_starting_times(req, nexttime)
try:
- (resmrr, vmrr, susprr, reservation) = self.__fit_besteffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
+ (resmrr, vmrr, susprr, reservation) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
# Schedule deployment
self.deployment.schedule(req, vmrr, nexttime)
@@ -564,130 +552,39 @@
return nodeassignment, res, preemptions
+ def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
+ lease_id = lease.id
+ remaining_duration = lease.duration.get_remaining_duration()
+ numnodes = lease.numnodes
+ requested_resources = lease.requested_resources
+ preemptible = lease.preemptible
+ mustresume = (lease.state == Lease.STATE_SUSPENDED)
+ susptype = self.rm.config.get("suspension")
+ if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
+ suspendable = False
+ else:
+ suspendable = True
- def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
- def comparepreemptability(rrX, rrY):
- if rrX.lease.submit_time > rrY.lease.submit_time:
- return constants.BETTER
- elif rrX.lease.submit_time < rrY.lease.submit_time:
- return constants.WORSE
- else:
- return constants.EQUAL
-
- def preemptedEnough(amountToPreempt):
- for node in amountToPreempt:
- if not amountToPreempt[node].is_zero_or_less():
- return False
- return True
-
- # Get allocations at the specified time
- atstart = set()
- atmiddle = set()
- nodes = set(mustpreempt.keys())
-
- reservationsAtStart = self.slottable.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtMiddle = self.slottable.getReservationsStartingBetween(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtStart.sort(comparepreemptability)
- reservationsAtMiddle.sort(comparepreemptability)
-
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+ # Determine earliest start time in each node
+ if lease.state == Lease.STATE_QUEUED:
+ # Figure out earliest start times based on
+ # image schedule and reusable images
+ earliest = self.deployment.find_earliest_starting_times(lease, nexttime)
+ elif lease.state == Lease.STATE_SUSPENDED:
+ # No need to transfer images from repository
+ # (only intra-node transfer)
+ earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
- # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
- for r in reservationsAtStart:
- # The following will really only come into play when we have
- # multiple VMs per node
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- # Don't need to preempt if we've already preempted all
- # the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atstart.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
- if len(reservationsAtMiddle)>0:
- changepoints = set()
- for r in reservationsAtMiddle:
- changepoints.add(r.start)
- changepoints = list(changepoints)
- changepoints.sort()
-
- for cp in changepoints:
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
- reservations = [r for r in reservationsAtMiddle
- if r.start <= cp and cp < r.end]
- for r in reservations:
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atmiddle.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
- self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-
- leases = [r.lease for r in atstart|atmiddle]
-
- return leases
+ canmigrate = self.rm.config.get("migration")
- def __fit_besteffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
- lease_id = lease.id
- remdur = lease.duration.get_remaining_duration()
- numnodes = lease.numnodes
- resreq = lease.requested_resources
- preemptible = lease.preemptible
- suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
- migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
-
#
- # STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
+ # STEP 1: FIGURE OUT THE MINIMUM DURATION
#
- curnodes=None
- # If we can't migrate, we have to stay in the
- # nodes where the lease is currently deployed
- if mustresume and not canmigrate:
- vmrr, susprr = lease.get_last_vmrr()
- curnodes = set(vmrr.nodes.values())
- suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=False)
-
- if mustresume and canmigrate:
- # If we have to resume this lease, make sure that
- # we have enough time to transfer the images.
- migratetime = lease.estimate_migration_time(migration_bandwidth)
- earliesttransfer = self.rm.clock.get_time() + migratetime
+ min_duration = self.__compute_scheduling_threshold(lease)
- for n in earliest:
- earliest[n][0] = max(earliest[n][0], earliesttransfer)
- suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
-
- if mustresume:
- resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
- # Must allocate time for resumption too
- remdur += resumetime
- else:
- suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
-
#
# STEP 2: FIND THE CHANGEPOINTS
#
@@ -709,15 +606,28 @@
else:
changepoints[-1][1] = nodes[:]
else:
+ if not canmigrate:
+ vmrr, susprr = lease.get_last_vmrr()
+ curnodes = set(vmrr.nodes.values())
+ else:
+ curnodes=None
+ # If we have to resume this lease, make sure that
+ # we have enough time to transfer the images.
+ migratetime = self.__estimate_migration_time(lease)
+ earliesttransfer = self.rm.clock.get_time() + migratetime
+
+ for n in earliest:
+ earliest[n][0] = max(earliest[n][0], earliesttransfer)
+
changepoints = list(set([x[0] for x in earliest.values()]))
changepoints.sort()
changepoints = [(x, curnodes) for x in changepoints]
- # If we can make reservations for best-effort leases,
+ # If we can make reservations in the future,
# we also consider future changepoints
# (otherwise, we only allow the VMs to start "now", accounting
# for the fact that vm images will have to be deployed)
- if canreserve:
+ if allow_reservation_in_future:
futurecp = self.slottable.findChangePointsAfter(changepoints[-1][0])
futurecp = [(p,None) for p in futurecp]
else:
@@ -728,28 +638,48 @@
#
# STEP 3: SLOT FITTING
#
+
+ # If resuming, we also have to allocate enough for the resumption
+ if mustresume:
+ duration = remaining_duration + self.__estimate_resume_time(lease)
+ else:
+ duration = remaining_duration
+
# First, assuming we can't make reservations in the future
- start, end, canfit, mustsuspend = self.__find_fit_at_points(changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold)
-
- if not canreserve:
- if start == None:
+ start, end, canfit = self.__find_fit_at_points(
+ changepoints,
+ numnodes,
+ requested_resources,
+ duration,
+ suspendable,
+ min_duration)
+
+ if start == None:
+ if not allow_reservation_in_future:
# We did not find a suitable starting time. This can happen
# if we're unable to make future reservations
- raise SlotFittingException, "Could not find enough resources for this request"
- elif mustsuspend and not suspendable:
- raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
-
- if start != None and mustsuspend and not suspendable:
- start = None # No satisfactory start time
+ raise SchedException, "Could not find enough resources for this request"
+ else:
+ mustsuspend = (end - start) < duration
+ if mustsuspend and not suspendable:
+ if not allow_reservation_in_future:
+ raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+ else:
+ start = None # No satisfactory start time
# If we haven't been able to fit the lease, check if we can
# reserve it in the future
- if start == None and canreserve:
- start, end, canfit, mustsuspend = self.__find_fit_at_points(futurecp, numnodes, resreq, remdur, suspendable, suspendthreshold)
+ if start == None and allow_reservation_in_future:
+ start, end, canfit = self.__find_fit_at_points(
+ futurecp,
+ numnodes,
+ requested_resources,
+ duration,
+ suspendable,
+ min_duration
+ )
- if mustsuspend and not suspendable:
- raise SlotFittingException, "Scheduling this lease would require preempting it, which is not allowed"
if start in [p[0] for p in futurecp]:
reservation = True
@@ -768,7 +698,7 @@
if mustresume:
# If we're resuming, we prefer resuming in the nodes we're already
# deployed in, to minimize the number of transfers.
- vmrr, susprr = lease.get_last_vmrr()
+ vmrr = lease.get_last_vmrr()
nodes = set(vmrr.nodes.values())
availnodes = set(physnodes)
deplnodes = availnodes.intersection(nodes)
@@ -776,14 +706,6 @@
physnodes = list(deplnodes) + list(notdeplnodes)
else:
physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
- # Adjust times in case the lease has to be suspended/resumed
- if mustsuspend:
- suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
- end -= suspendtime
-
- if mustresume:
- start += resumetime
# Map to physical nodes
mappings = {}
@@ -795,45 +717,22 @@
canfit[n] -= 1
mappings[vmnode] = n
if res.has_key(n):
- res[n].incr(resreq)
+ res[n].incr(requested_resources)
else:
- res[n] = ds.ResourceTuple.copy(resreq)
+ res[n] = ds.ResourceTuple.copy(requested_resources)
vmnode += 1
break
+ vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, reservation)
+ vmrr.state = ResourceReservation.STATE_SCHEDULED
- #
- # STEP 5: CREATE RESOURCE RESERVATIONS
- #
-
if mustresume:
- resmres = {}
- for n in mappings.values():
- r = ds.ResourceTuple.create_empty()
- r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
- r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
- resmres[n] = r
- resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
- resmrr.state = constants.RES_STATE_SCHEDULED
- else:
- resmrr = None
+ self.__schedule_resumption(vmrr, start)
+
+ mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
if mustsuspend:
- suspres = {}
- for n in mappings.values():
- r = ds.ResourceTuple.create_empty()
- r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
- r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
- suspres[n] = r
- susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
- susprr.state = constants.RES_STATE_SCHEDULED
- oncomplete = constants.ONCOMPLETE_SUSPEND
- else:
- susprr = None
- oncomplete = constants.ONCOMPLETE_ENDLEASE
-
- vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, oncomplete, reservation)
- vmrr.state = constants.RES_STATE_SCHEDULED
+ self.__schedule_suspension(vmrr, end)
susp_str = res_str = ""
if mustresume:
@@ -842,51 +741,142 @@
susp_str = " (suspending)"
self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
- return resmrr, vmrr, susprr, reservation
+ return vmrr, reservation
- def __find_fit_at_points(self, changepoints, numnodes, resreq, remdur, suspendable, suspendthreshold):
+ def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
start = None
end = None
canfit = None
- mustsuspend = None
availabilitywindow = self.slottable.availabilitywindow
for p in changepoints:
- availabilitywindow.initWindow(p[0], resreq, p[1], canpreempt = False)
+ availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
availabilitywindow.printContents()
if availabilitywindow.fitAtStart() >= numnodes:
start=p[0]
- maxend = start + remdur
+ maxend = start + duration
end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
if end < maxend:
- mustsuspend=True
self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
- if suspendable:
- # It the lease is suspendable...
- if suspendthreshold != None:
- if end-start > suspendthreshold:
- break
- else:
- self.logger.debug("This starting time does not meet the suspend threshold (%s < %s)" % (end-start, suspendthreshold))
- start = None
+ if not suspendable:
+ pass
+ # If we can't suspend, this fit is no good, and we have to keep looking
+ else:
+ # If we can suspend, we still have to check if the lease will
+ # be able to run for the specified minimum duration
+ if end-start > min_duration:
+ break # We found a fit; stop looking
else:
- pass
- else:
- # Keep looking
- pass
+ self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
+ # Set start back to None, to indicate that we haven't
+ # found a satisfactory start time
+ start = None
else:
- mustsuspend=False
# We've found a satisfactory starting time
break
- return start, end, canfit, mustsuspend
+ return start, end, canfit
+ def __schedule_resumption(self, vmrr, resume_at):
+ resumetime = self.__estimate_resume_time(vmrr.lease)
+ vmrr.update_start(resume_at + resumetime)
+
+ mappings = vmrr.nodes
+ resmres = {}
+ for n in mappings.values():
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
+ resmres[n] = r
+ resmrr = ds.ResumptionResourceReservation(vmrr.lease, resume_at, resume_at + resumetime, resmres, vmrr)
+ resmrr.state = ResourceReservation.STATE_SCHEDULED
+
+ vmrr.resm_rrs.append(resmrr)
+
+ def __schedule_suspension(self, vmrr, suspend_by):
+ suspendtime = self.__estimate_suspend_time(vmrr.lease)
+ vmrr.update_end(suspend_by - suspendtime)
+
+ mappings = vmrr.nodes
+ suspres = {}
+ for n in mappings.values():
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
+ suspres[n] = r
+
+ susprr = ds.SuspensionResourceReservation(vmrr.lease, suspend_by - suspendtime, suspend_by, suspres, vmrr)
+ susprr.state = ResourceReservation.STATE_SCHEDULED
+
+ vmrr.susp_rrs.append(susprr)
+
+ def __estimate_suspend_resume_time(self, lease):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ rate = self.resourcepool.info.get_suspendresume_rate()
+ time = float(lease.requested_resources.get_by_type(constants.RES_MEM)) / rate
+ time = round_datetime_delta(TimeDelta(seconds = time))
+ return time
+
+ def __estimate_suspend_time(self, lease):
+ return self.__estimate_suspend_resume_time(lease)
+
+ def __estimate_resume_time(self, lease):
+ return self.__estimate_suspend_resume_time(lease)
+
+
+ def __estimate_migration_time(self, lease):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ whattomigrate = config.get("what-to-migrate")
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ if whattomigrate == constants.MIGRATE_NONE:
+ return TimeDelta(seconds=0)
+ else:
+ if whattomigrate == constants.MIGRATE_MEM:
+ mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
+ elif whattomigrate == constants.MIGRATE_MEMDISK:
+ mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
+ return estimate_transfer_time(mbtotransfer, bandwidth)
+
+ # TODO: Take into account other things like boot overhead, migration overhead, etc.
+ def __compute_scheduling_threshold(self, lease):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ threshold = config.get("force-scheduling-threshold")
+ if threshold != None:
+ # If there is a hard-coded threshold, use that
+ return threshold
+ else:
+ factor = config.get("scheduling-threshold-factor")
+ susp_overhead = self.__estimate_suspend_time(lease)
+ safe_duration = susp_overhead
+
+ if lease.state == Lease.STATE_SUSPENDED:
+ resm_overhead = self.__estimate_resume_time(lease)
+ safe_duration += resm_overhead
+
+ # TODO: Incorporate other overheads into the minimum duration
+ min_duration = safe_duration
+
+ # At the very least, we want to allocate enough time for the
+ # safe duration (otherwise, we'll end up with incorrect schedules,
+ # where a lease is scheduled to suspend, but isn't even allocated
+ # enough time to suspend).
+ # The factor is assumed to be non-negative. i.e., a factor of 0
+ # means we only allocate enough time for potential suspend/resume
+ # operations, while a factor of 1 means the lease will get as much
+ # running time as spend on the runtime overheads involved in setting
+ # it up
+ threshold = safe_duration + (min_duration * factor)
+ return threshold
+
def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
# TODO2: Choose appropriate prioritizing function based on a
# config file, instead of hardcoding it)
@@ -981,85 +971,224 @@
# Order nodes
nodes.sort(comparenodes)
return nodes
+
+ def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
+ def comparepreemptability(rrX, rrY):
+ if rrX.lease.submit_time > rrY.lease.submit_time:
+ return constants.BETTER
+ elif rrX.lease.submit_time < rrY.lease.submit_time:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+
+ def preemptedEnough(amountToPreempt):
+ for node in amountToPreempt:
+ if not amountToPreempt[node].is_zero_or_less():
+ return False
+ return True
- def preempt(self, req, time):
- self.logger.info("Preempting lease #%i..." % (req.id))
+ # Get allocations at the specified time
+ atstart = set()
+ atmiddle = set()
+ nodes = set(mustpreempt.keys())
+
+ reservationsAtStart = self.slottable.getReservationsAt(startTime)
+ reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtStart.sort(comparepreemptability)
+ reservationsAtMiddle.sort(comparepreemptability)
+
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+
+ # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+ for r in reservationsAtStart:
+ # The following will really only come into play when we have
+ # multiple VMs per node
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ # Don't need to preempt if we've already preempted all
+ # the needed resources in node n
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atstart.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+ if len(reservationsAtMiddle)>0:
+ changepoints = set()
+ for r in reservationsAtMiddle:
+ changepoints.add(r.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+
+ for cp in changepoints:
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
+ reservations = [r for r in reservationsAtMiddle
+ if r.start <= cp and cp < r.end]
+ for r in reservations:
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atmiddle.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+ self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+
+ leases = [r.lease for r in atstart|atmiddle]
+
+ return leases
+
+ def __preempt(self, lease, preemption_time):
+ self.logger.info("Preempting lease #%i..." % (lease.id))
self.logger.vdebug("Lease before preemption:")
- req.print_contents()
- vmrr, susprr = req.get_last_vmrr()
+ lease.print_contents()
+ vmrr = lease.get_last_vmrr()
suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
- if vmrr.state == constants.RES_STATE_SCHEDULED and vmrr.start >= time:
- self.logger.info("... lease #%i has been cancelled and requeued." % req.id)
+ if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
+ self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
self.logger.debug("Lease was set to start in the middle of the preempting lease.")
- req.state = constants.LEASE_STATE_PENDING
+ lease.state = Lease.STATE_PENDING
if vmrr.backfill_reservation == True:
self.numbesteffortres -= 1
- req.remove_rr(vmrr)
+ lease.remove_rr(vmrr)
self.slottable.removeReservation(vmrr)
- if susprr != None:
- req.remove_rr(susprr)
- self.slottable.removeReservation(susprr)
- for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.remove_diskimage(pnode, req.id, vnode)
- self.deployment.cancel_deployment(req)
- req.vmimagemap = {}
- self.scheduledleases.remove(req)
- self.queue.enqueue_in_order(req)
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
+ # if susprr != None:
+ # lease.remove_rr(susprr)
+ # self.slottable.removeReservation(susprr)
+ for vnode, pnode in lease.vmimagemap.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+ self.deployment.cancel_deployment(lease)
+ lease.vmimagemap = {}
+ # TODO: Change state back to queued
+ self.queue.enqueue_in_order(lease)
+ self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
else:
susptype = self.rm.config.get("suspension")
- timebeforesuspend = time - vmrr.start
+ timebeforesuspend = preemption_time - vmrr.start
# TODO: Determine if it is in fact the initial VMRR or not. Right now
# we conservatively overestimate
canmigrate = self.rm.config.get("migration")
- suspendthreshold = req.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
+ suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
# We can't suspend if we're under the suspend threshold
suspendable = timebeforesuspend >= suspendthreshold
- if suspendable and (susptype == constants.SUSPENSION_ALL or (req.numnodes == 1 and susptype == constants.SUSPENSION_SERIAL)):
- self.logger.info("... lease #%i will be suspended at %s." % (req.id, time))
- self.slottable.suspend(req, time)
+ if suspendable and (susptype == constants.SUSPENSION_ALL or (lease.numnodes == 1 and susptype == constants.SUSPENSION_SERIAL)):
+ self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
+ # Careful: VMRR update,etc. will have to be done here
+ self.__schedule_suspension(lease, preemption_time)
else:
- self.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % req.id)
- req.state = constants.LEASE_STATE_PENDING
+ self.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % lease.id)
+ lease.state = Lease.STATE_PENDING
if vmrr.backfill_reservation == True:
self.numbesteffortres -= 1
- req.remove_rr(vmrr)
+ lease.remove_rr(vmrr)
self.slottable.removeReservation(vmrr)
- if susprr != None:
- req.remove_rr(susprr)
- self.slottable.removeReservation(susprr)
- if req.state == constants.LEASE_STATE_SUSPENDED:
- resmrr = req.prev_rr(vmrr)
- req.remove_rr(resmrr)
+ #if susprr != None:
+ # lease.remove_rr(susprr)
+ # self.slottable.removeReservation(susprr)
+ if lease.state == Lease.STATE_SUSPENDED:
+ resmrr = lease.prev_rr(vmrr)
+ lease.remove_rr(resmrr)
self.slottable.removeReservation(resmrr)
- for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.remove_diskimage(pnode, req.id, vnode)
- self.deployment.cancel_deployment(req)
- req.vmimagemap = {}
- self.scheduledleases.remove(req)
- self.queue.enqueue_in_order(req)
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
+ for vnode, pnode in lease.vmimagemap.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+ self.deployment.cancel_deployment(lease)
+ lease.vmimagemap = {}
+ # TODO: Change state back to queued
+ self.queue.enqueue_in_order(lease)
+ self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.logger.vdebug("Lease after preemption:")
- req.print_contents()
+ lease.print_contents()
- def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+ def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
- leases = self.scheduledleases.getNextLeasesScheduledInNodes(nexttime, nodes)
+ leases = []
+ # TODO: "getNextLeasesScheduledInNodes" has to be moved to the slot table
+ #leases = self.scheduledleases.getNextLeasesScheduledInNodes(nexttime, nodes)
leases = [l for l in leases if isinstance(l, ds.BestEffortLease) and not l in checkedleases]
- for l in leases:
+ for lease in leases:
self.logger.debug("Found lease %i" % l.id)
l.print_contents()
# Earliest time can't be earlier than time when images will be
# available in node
- earliest = max(nexttime, l.imagesavail)
- self.slottable.slideback(l, earliest)
+ earliest = max(nexttime, lease.imagesavail)
+ self.__slideback(lease, earliest)
checkedleases.append(l)
#for l in leases:
# vmrr, susprr = l.getLastVMRR()
# self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
-
+ def __slideback(self, lease, earliest):
+ pass
+# (vmrr, susprr) = lease.get_last_vmrr()
+# vmrrnew = copy.copy(vmrr)
+# nodes = vmrrnew.nodes.values()
+# if lease.state == Lease.LEASE_STATE_SUSPENDED:
+# resmrr = lease.prev_rr(vmrr)
+# originalstart = resmrr.start
+# else:
+# resmrr = None
+# originalstart = vmrrnew.start
+# cp = self.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+# cp = [earliest] + cp
+# newstart = None
+# for p in cp:
+# self.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+# self.availabilitywindow.printContents()
+# if self.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+# (end, canfit) = self.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+# if end == originalstart and set(nodes) <= set(canfit.keys()):
+# self.logger.debug("Can slide back to %s" % p)
+# newstart = p
+# break
+# if newstart == None:
+# # Can't slide back. Leave as is.
+# pass
+# else:
+# diff = originalstart - newstart
+# if resmrr != None:
+# resmrrnew = copy.copy(resmrr)
+# resmrrnew.start -= diff
+# resmrrnew.end -= diff
+# self.updateReservationWithKeyChange(resmrr, resmrrnew)
+# vmrrnew.start -= diff
+#
+# # If the lease was going to be suspended, check to see if
+# # we don't need to suspend any more.
+# remdur = lease.duration.get_remaining_duration()
+# if susprr != None and vmrrnew.end - newstart >= remdur:
+# vmrrnew.end = vmrrnew.start + remdur
+# #vmrrnew.oncomplete = constants.ONCOMPLETE_ENDLEASE
+# lease.remove_rr(susprr)
+# self.removeReservation(susprr)
+# else:
+# vmrrnew.end -= diff
+# # ONLY for simulation
+# if vmrrnew.prematureend != None:
+# vmrrnew.prematureend -= diff
+# self.updateReservationWithKeyChange(vmrr, vmrrnew)
+# self.dirty()
+# self.logger.vdebug("New lease descriptor (after slideback):")
+# lease.print_contents()
+
+
#-------------------------------------------------------------------#
# #
@@ -1070,9 +1199,9 @@
def _handle_start_vm(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
l.print_contents()
- if l.state == constants.LEASE_STATE_DEPLOYED:
- l.state = constants.LEASE_STATE_ACTIVE
- rr.state = constants.RES_STATE_ACTIVE
+ if l.state == Lease.STATE_READY:
+ l.state = Lease.STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
now_time = self.rm.clock.get_time()
l.start.actual = now_time
@@ -1082,13 +1211,13 @@
# The next two lines have to be moved somewhere more
# appropriate inside the resourcepool module
for (vnode, pnode) in rr.nodes.items():
- l.vmimagemap[vnode] = pnode
+ l.diskimagemap[vnode] = pnode
except Exception, e:
self.logger.error("ERROR when starting VMs.")
raise
- elif l.state == constants.LEASE_STATE_SUSPENDED:
- l.state = constants.LEASE_STATE_ACTIVE
- rr.state = constants.RES_STATE_ACTIVE
+ elif l.state == Lease.STATE_RESUMED_READY:
+ l.state = Lease.STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
# No enactment to do here, since all the suspend/resume actions are
# handled during the suspend/resume RRs
l.print_contents()
@@ -1106,14 +1235,14 @@
now_time = self.rm.clock.get_time()
diff = now_time - rr.start
l.duration.accumulate_duration(diff)
- rr.state = constants.RES_STATE_DONE
- if rr.oncomplete == constants.ONCOMPLETE_ENDLEASE:
+ rr.state = ResourceReservation.STATE_DONE
+ if not rr.is_suspending():
self.resourcepool.stop_vms(l, rr)
- l.state = constants.LEASE_STATE_DONE
+ l.state = Lease.STATE_DONE
l.duration.actual = l.duration.accumulated
l.end = now_time
self.completedleases.add(l)
- self.scheduledleases.remove(l)
+ self.leases.remove(l)
self.deployment.cleanup(l, rr)
if isinstance(l, ds.BestEffortLease):
self.rm.accounting.incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
@@ -1130,30 +1259,29 @@
def _handle_unscheduled_end_vm(self, l, rr, enact=False):
self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
self._handle_end_rr(l, rr)
- if rr.oncomplete == constants.ONCOMPLETE_SUSPEND:
+ if rr.is_suspending():
rrs = l.next_rrs(rr)
for r in rrs:
l.remove_rr(r)
self.slottable.removeReservation(r)
- rr.oncomplete = constants.ONCOMPLETE_ENDLEASE
rr.end = self.rm.clock.get_time()
self._handle_end_vm(l, rr, enact=enact)
nexttime = self.rm.clock.get_next_schedulable_time()
if self.is_backfilling():
# We need to reevaluate the schedule to see if there are any future
# reservations that we can slide back.
- self.reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
+ self.__reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
def _handle_start_suspend(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
l.print_contents()
- rr.state = constants.RES_STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
self.resourcepool.suspend_vms(l, rr)
- for vnode, pnode in rr.nodes.items():
+ for vnode, pnode in rr.vmrr.nodes.items():
self.resourcepool.add_ramfile(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
l.memimagemap[vnode] = pnode
l.print_contents()
- self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
+ self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
self.logger.info("Suspending lease %i..." % (l.id))
@@ -1162,13 +1290,12 @@
l.print_contents()
# TODO: React to incomplete suspend
self.resourcepool.verify_suspend(l, rr)
- rr.state = constants.RES_STATE_DONE
- l.state = constants.LEASE_STATE_SUSPENDED
- self.scheduledleases.remove(l)
+ rr.state = ResourceReservation.STATE_DONE
+ l.state = Lease.STATE_SUSPENDED
self.queue.enqueue_in_order(l)
self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, l.id)
l.print_contents()
- self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
+ self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_IDLE, l.id)
self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
self.logger.info("Lease %i suspended." % (l.id))
@@ -1176,9 +1303,9 @@
self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
l.print_contents()
self.resourcepool.resume_vms(l, rr)
- rr.state = constants.RES_STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
l.print_contents()
- self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME, l.id)
+ self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_VM_RESUME, l.id)
self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
self.logger.info("Resuming lease %i..." % (l.id))
@@ -1187,14 +1314,46 @@
l.print_contents()
# TODO: React to incomplete resume
self.resourcepool.verify_resume(l, rr)
- rr.state = constants.RES_STATE_DONE
- for vnode, pnode in rr.nodes.items():
+ rr.state = ResourceReservation.STATE_DONE
+ for vnode, pnode in rr.vmrr.nodes.items():
self.resourcepool.remove_ramfile(pnode, l.id, vnode)
l.print_contents()
- self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
+ self.updateNodeVMState(rr.vmrr.nodes.values(), constants.DOING_IDLE, l.id)
self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
self.logger.info("Resumed lease %i" % (l.id))
+ def _handle_start_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+ l.print_contents()
+
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+ self.logger.info("Migrating lease %i..." % (l.id))
+
+ def _handle_end_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+ l.print_contents()
+
+# if lease.state == Lease.STATE_SUSPENDED:
+# # Update VM image mappings, since we might be resuming
+# # in different nodes.
+# for vnode, pnode in lease.vmimagemap.items():
+# self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+# lease.vmimagemap = vmrr.nodes
+# for vnode, pnode in lease.vmimagemap.items():
+# self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
+#
+# # Update RAM file mappings
+# for vnode, pnode in lease.memimagemap.items():
+# self.resourcepool.remove_ramfile(pnode, lease.id, vnode)
+# for vnode, pnode in vmrr.nodes.items():
+# self.resourcepool.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
+# lease.memimagemap[vnode] = pnode
+
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+ self.logger.info("Migrated lease %i..." % (l.id))
+
def _handle_end_rr(self, l, rr):
self.slottable.removeReservation(rr)
@@ -1207,7 +1366,7 @@
- def canReserveBestEffort(self):
+ def __can_reserve_besteffort_in_future(self):
return self.numbesteffortres < self.maxres
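
The factored-out suspension scheduling follows a simple pattern: estimate how long it takes to write the VM's memory out at the configured suspend/resume rate, pull the end of the VM reservation back by that amount, and attach a suspension reservation that finishes by the requested time. The standalone sketch below (hypothetical names, not the committed Scheduler code) illustrates the idea.

    from datetime import datetime, timedelta

    class VMReservation(object):
        def __init__(self, start, end, memory_mb):
            self.start = start
            self.end = end
            self.memory_mb = memory_mb
            self.susp_rrs = []    # suspension RRs now hang off the VM reservation

    class SuspensionReservation(object):
        def __init__(self, start, end):
            self.start = start
            self.end = end

    def estimate_suspend_time(memory_mb, rate_mb_per_sec):
        # Time needed to write the VM's memory image to disk at the given rate.
        return timedelta(seconds=float(memory_mb) / rate_mb_per_sec)

    def schedule_suspension(vmrr, suspend_by, rate_mb_per_sec):
        # Pull the end of the VM reservation back by the suspension overhead,
        # then append a suspension reservation that finishes by 'suspend_by'.
        suspend_time = estimate_suspend_time(vmrr.memory_mb, rate_mb_per_sec)
        vmrr.end = suspend_by - suspend_time
        vmrr.susp_rrs.append(SuspensionReservation(vmrr.end, suspend_by))

    if __name__ == "__main__":
        vmrr = VMReservation(datetime(2008, 9, 11, 12, 0),
                             datetime(2008, 9, 11, 13, 0), memory_mb=1024)
        schedule_suspension(vmrr, suspend_by=vmrr.end, rate_mb_per_sec=32.0)
        assert vmrr.end == datetime(2008, 9, 11, 12, 59, 28)
        assert vmrr.susp_rrs[0].end - vmrr.susp_rrs[0].start == timedelta(seconds=32)
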
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-09-11 16:41:18 UTC (rev 490)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-09-11 16:47:36 UTC (rev 491)
@@ -20,7 +20,6 @@
from operator import attrgetter, itemgetter
import haizea.common.constants as constants
import haizea.resourcemanager.datastruct as ds
-from haizea.common.utils import roundDateTimeDelta
import bisect
import copy
import logging
@@ -96,6 +95,9 @@
def add_node(self, resourcepoolnode):
self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
+ def is_empty(self):
+ return (len(self.reservationsByStart) == 0)
+
def dirty(self):
# You're a dirty, dirty slot table and you should be
# ashamed of having outdated caches!
@@ -154,14 +156,28 @@
res = bystart & byend
return list(res)
- def getReservationsStartingBetween(self, start, end):
+ def get_reservations_starting_between(self, start, end):
startitem = KeyValueWrapper(start, None)
enditem = KeyValueWrapper(end, None)
startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+ endpos = bisect.bisect_right(self.reservationsByStart, enditem)
+ res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+ return res
+
+ def get_reservations_ending_between(self, start, end):
+ startitem = KeyValueWrapper(start, None)
+ enditem = KeyValueWrapper(end, None)
+ startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
res = [x.value for x in self.reservationsByStart[startpos:endpos]]
return res
+ def get_reservations_starting_at(self, time):
+ return self.get_reservations_starting_between(time, time)
+
+ def get_reservations_ending_at(self, time):
+ return self.get_reservations_ending_between(time, time)
+
# ONLY for simulation
def getNextPrematureEnd(self, after):
# Inefficient, but ok since this query seldom happens
@@ -181,7 +197,7 @@
def getReservationsWithChangePointsAfter(self, after):
item = KeyValueWrapper(after, None)
startpos = bisect.bisect_right(self.reservationsByStart, item)
- bystart = set([x.value for x in self.reservationsByStart[startpos:]])
+ bystart = set([x.value for x in self.reservationsByStart[:startpos]])
endpos = bisect.bisect_right(self.reservationsByEnd, item)
byend = set([x.value for x in self.reservationsByEnd[endpos:]])
res = bystart | byend
@@ -267,94 +283,7 @@
if p != None:
self.changepointcache.pop()
return p
-
-
- def suspend(self, lease, time):
- suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
- (vmrr, susprr) = lease.get_last_vmrr()
- vmrrnew = copy.copy(vmrr)
-
- suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
- vmrrnew.end = time - suspendtime
-
- vmrrnew.oncomplete = constants.ONCOMPLETE_SUSPEND
-
- self.updateReservationWithKeyChange(vmrr, vmrrnew)
-
- if susprr != None:
- lease.remove_rr(susprr)
- self.removeReservation(susprr)
-
- mappings = vmrr.nodes
- suspres = {}
- for n in mappings.values():
- r = ds.ResourceTuple.create_empty()
- r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
- r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
- suspres[n] = r
-
- newsusprr = ds.SuspensionResourceReservation(lease, time - suspendtime, time, suspres, mappings)
- newsusprr.state = constants.RES_STATE_SCHEDULED
- lease.append_rr(newsusprr)
- self.addReservation(newsusprr)
-
-
- def slideback(self, lease, earliest):
- (vmrr, susprr) = lease.get_last_vmrr()
- vmrrnew = copy.copy(vmrr)
- nodes = vmrrnew.nodes.values()
- if lease.state == constants.LEASE_STATE_SUSPENDED:
- resmrr = lease.prev_rr(vmrr)
- originalstart = resmrr.start
- else:
- resmrr = None
- originalstart = vmrrnew.start
- cp = self.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
- cp = [earliest] + cp
- newstart = None
- for p in cp:
- self.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
- self.availabilitywindow.printContents()
- if self.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
- (end, canfit) = self.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
- if end == originalstart and set(nodes) <= set(canfit.keys()):
- self.logger.debug("Can slide back to %s" % p)
- newstart = p
- break
- if newstart == None:
- # Can't slide back. Leave as is.
- pass
- else:
- diff = originalstart - newstart
- if resmrr != None:
- resmrrnew = copy.copy(resmrr)
- resmrrnew.start -= diff
- resmrrnew.end -= diff
- self.updateReservationWithKeyChange(resmrr, resmrrnew)
- vmrrnew.start -= diff
-
- # If the lease was going to be suspended, check to see if
- # we don't need to suspend any more.
- remdur = lease.duration.get_remaining_duration()
- if susprr != None and vmrrnew.end - newstart >= remdur:
- vmrrnew.end = vmrrnew.start + remdur
- vmrrnew.oncomplete = constants.ONCOMPLETE_ENDLEASE
- lease.remove_rr(susprr)
- self.removeReservation(susprr)
- else:
- vmrrnew.end -= diff
- # ONLY for simulation
- if vmrrnew.prematureend != None:
- vmrrnew.prematureend -= diff
- self.updateReservationWithKeyChange(vmrr, vmrrnew)
- self.dirty()
- self.logger.vdebug("New lease descriptor (after slideback):")
- lease.print_contents()
-
-
-
-
def isFull(self, time):
nodes = self.getAvailability(time)
avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])