[haizea-commit] r432 - in trunk/src/haizea: common resourcemanager resourcemanager/enact/opennebula resourcemanager/frontends
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Fri Jul 11 08:11:11 CDT 2008
Author: borja
Date: 2008-07-11 08:11:11 -0500 (Fri, 11 Jul 2008)
New Revision: 432
Modified:
trunk/src/haizea/common/config.py
trunk/src/haizea/common/constants.py
trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
trunk/src/haizea/resourcemanager/frontends/opennebula.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler.py
Log:
- Immediate lease scheduling
- Signal handling to gracefully stop on SIGTERM or SIGINT
- Added "onevm" option to config file (location of onevm command)
Modified: trunk/src/haizea/common/config.py
===================================================================
--- trunk/src/haizea/common/config.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/common/config.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -183,6 +183,9 @@
def getONEDB(self):
return self.config.get(constants.OPENNEBULA_SEC, constants.DB_OPT)
+ def getONEvm(self):
+ return self.config.get(constants.OPENNEBULA_SEC, constants.ONEVM_OPT)
+
def getONESuspendResumeRate(self):
return self.config.getint(constants.OPENNEBULA_SEC, constants.ESTIMATESUSPENDRATE_OPT)
Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/common/constants.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -147,6 +147,7 @@
OPENNEBULA_SEC = "opennebula"
DB_OPT = "db"
+ONEVM_OPT = "onevm"
ESTIMATESUSPENDRATE_OPT = "suspendresume-rate-estimate"
DEPLOY_IMAGETRANSFER_SEC = "deploy-imagetransfer"
@@ -299,9 +300,11 @@
ENACT_PACKAGE="haizea.resourcemanager.enact"
-COUNTER_ARACCEPTED="accepted"
-COUNTER_ARREJECTED="rejected"
-COUNTER_BESTEFFORTCOMPLETED="besteffort-completed"
+COUNTER_ARACCEPTED="ar_accepted"
+COUNTER_ARREJECTED="ar_rejected"
+COUNTER_IMACCEPTED="im_accepted"
+COUNTER_IMREJECTED="im_rejected"
+COUNTER_BESTEFFORTCOMPLETED="besteffort_completed"
COUNTER_QUEUESIZE="queuesize"
COUNTER_DISKUSAGE="diskusage"
COUNTER_CPUUTILIZATION="cpuutilization"
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -24,7 +24,7 @@
class VMEnactment(VMEnactmentBase):
def __init__(self, resourcepool):
VMEnactmentBase.__init__(self, resourcepool)
- self.onevm = "/home/borja/bin/onevm"
+ self.onevm = self.resourcepool.rm.config.getONEvm()
self.conn = sqlite.connect(self.resourcepool.rm.config.getONEDB())
self.conn.row_factory = sqlite.Row
@@ -116,7 +116,7 @@
onevm = cur.fetchone()
state = onevm["state"]
if state == 3:
- self.logger.debug("Suspend of L%iV%i correct." % (action.lease_haizea_id, vnode), constants.ONE)
+ self.logger.debug("Resume of L%iV%i correct." % (action.lease_haizea_id, vnode), constants.ONE)
else:
self.logger.warning("ONE did not complete resume of L%i%V%i on time. State is %i" % (action.leaseHaizeaID, vnode, state), constants.ONE)
result = 1
Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -18,19 +18,20 @@
import haizea.common.constants as constants
from haizea.resourcemanager.frontends.base import RequestFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ResourceTuple
+from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
from haizea.common.utils import UNIX2DateTime
from pysqlite2 import dbapi2 as sqlite
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
from haizea.common.utils import roundDateTime
+import operator
HAIZEA_PARAM = "HAIZEA"
-HAIZEA_START = "start"
+HAIZEA_START = "START"
HAIZEA_START_NOW = "now"
HAIZEA_START_BESTEFFORT = "best_effort"
-HAIZEA_DURATION = "duration"
+HAIZEA_DURATION = "DURATION"
HAIZEA_DURATION_UNLIMITED = "unlimited"
-HAIZEA_PREEMPTIBLE = "preemptible"
+HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
HAIZEA_PREEMPTIBLE_YES = "yes"
HAIZEA_PREEMPTIBLE_NO = "no"
@@ -61,16 +62,24 @@
attrs = dict([(r["name"], r["value"]) for r in template])
self.processed.append(req["oid"])
requests.append(self.ONEreq2lease(req, attrs))
+ requests.sort(key=operator.attrgetter("submit_time"))
return requests
def existsMoreRequests(self):
return True
def ONEreq2lease(self, req, attrs):
- haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
+ # If there is no HAIZEA parameter, the default is to treat the
+ # request as an immediate request with unlimited duration
+ if not attrs.has_key(HAIZEA_PARAM):
+ haizea_param = {HAIZEA_START: HAIZEA_START_NOW,
+ HAIZEA_DURATION: HAIZEA_DURATION_UNLIMITED,
+ HAIZEA_PREEMPTIBLE: HAIZEA_PREEMPTIBLE_NO}
+ else:
+ haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
start = haizea_param[HAIZEA_START]
if start == HAIZEA_START_NOW:
- pass
+ return self.create_immediate_lease(req, attrs, haizea_param)
elif start == HAIZEA_START_BESTEFFORT:
return self.create_besteffort_lease(req, attrs, haizea_param)
else:
@@ -87,7 +96,7 @@
numnodes = 1
resreq = ResourceTuple.create_empty()
resreq.set_by_type(constants.RES_CPU, float(attrs[ONE_CPU]))
- resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY]))
+ resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY])/1000)
duration = haizea_param[HAIZEA_DURATION]
if duration == HAIZEA_DURATION_UNLIMITED:
@@ -137,4 +146,18 @@
leasereq.vnode_enactment_info[1] = int(req["oid"])
leasereq.set_scheduler(self.rm.scheduler)
return leasereq
+
+ def create_immediate_lease(self, req, attrs, haizea_param):
+ tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
+
+ leasereq = ImmediateLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
+ leasereq.state = constants.LEASE_STATE_PENDING
+ # Enactment info should be changed to the "array id" when groups
+ # are implemented in OpenNebula
+ leasereq.enactment_info = int(req["oid"])
+ # Only one node for now
+ leasereq.vnode_enactment_info = {}
+ leasereq.vnode_enactment_info[1] = int(req["oid"])
+ leasereq.set_scheduler(self.rm.scheduler)
+ return leasereq
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/rm.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -35,13 +35,15 @@
import haizea.common.constants as constants
from haizea.resourcemanager.frontends.tracefile import TracefileFrontend
from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease
+from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease
from haizea.resourcemanager.resourcepool import ResourcePool
from haizea.resourcemanager.scheduler import Scheduler
from haizea.resourcemanager.log import Logger
from haizea.common.utils import abstract, roundDateTime
import operator
+import signal
+import sys
from time import sleep
from mx.DateTime import now, TimeDelta
@@ -110,6 +112,8 @@
# Create counters to keep track of interesting data.
self.stats.createCounter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
self.stats.createCounter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
+ self.stats.createCounter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
+ self.stats.createCounter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
self.stats.createCounter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
self.stats.createCounter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
self.stats.createCounter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
@@ -126,6 +130,9 @@
# Stop collecting data (this finalizes counters)
self.stats.stop()
+ # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
+ # do with leases that are still running.
+
self.logger.status(" Completed best-effort leases: %i" % self.stats.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
self.logger.status(" Accepted AR leases: %i" % self.stats.counters[constants.COUNTER_ARACCEPTED], constants.RM)
self.logger.status(" Rejected AR leases: %i" % self.stats.counters[constants.COUNTER_ARREJECTED], constants.RM)
@@ -162,14 +169,19 @@
ar_leases = [req for req in requests if isinstance(req, ARLease)]
be_leases = [req for req in requests if isinstance(req, BestEffortLease)]
+ im_leases = [req for req in requests if isinstance(req, ImmediateLease)]
# Queue best-effort
for req in be_leases:
self.scheduler.enqueue(req)
+
+ # Add AR leases and immediate leases
+ for req in ar_leases + im_leases:
+ self.scheduler.add_pending_lease(req)
- # Process AR leases, and run the scheduling function.
+ # Run the scheduling function.
try:
- self.scheduler.schedule(ar_leases, nexttime)
+ self.scheduler.schedule(nexttime)
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception in scheduling function. Dumping state..." , constants.RM)
@@ -228,6 +240,7 @@
"""Return True if there are any leases still "in the system" """
return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
+ # TODO: Replace this with a more general event handling system
def notify_end_vm(self, lease, rr):
"""Notifies the resource manager that a VM has ended prematurely.
@@ -511,14 +524,15 @@
event). Otherwise, set it to (now + quantum)
- Sleep until next-wake-up-time
- The clock never stops.
- TODO: Add signal capturing so we can use a signal to gracefully
- stop the clock.
-
+ The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
+ foreground) or a SIGTERM signal is received.
"""
self.rm.logger.status("Starting simulated clock", constants.CLOCK)
self.rm.stats.start(self.get_start_time())
+ signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
+ signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
+
done = False
# Main loop
while not done:
@@ -555,20 +569,26 @@
sleep((nextwakeup - now()).seconds)
else:
self.lastwakeup = nextwakeup
-
- # Stop
- self.rm.printStatus()
- self.rm.logger.status("Stopping clock", constants.CLOCK)
- self.rm.stop()
def print_stats(self, loglevel):
"""See docstring in base Clock class."""
pass
-
+
+ def signalhandler_gracefulstop(self, signum, frame):
+ """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
+ sigstr = ""
+ if signum == signal.SIGTERM:
+ sigstr = " (SIGTERM)"
+ elif signum == signal.SIGINT:
+ sigstr = " (SIGINT)"
+ self.rm.logger.status("Received signal %i%s" %(signum, sigstr), constants.CLOCK)
+ self.rm.logger.status("Stopping gracefully...", constants.CLOCK)
+ self.rm.stop()
+ sys.exit()
if __name__ == "__main__":
from haizea.common.config import RMConfig
- CONFIGFILE = "../../../etc/sample.conf"
+ CONFIGFILE = "../../../etc/sample_opennebula.conf"
CONFIG = RMConfig.fromFile(CONFIGFILE)
RM = ResourceManager(CONFIG)
RM.start()
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-07-11 13:11:11 UTC (rev 432)
@@ -36,7 +36,9 @@
from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeployment
from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
+from haizea.resourcemanager.datastruct import ARLease, ImmediateLease
+
class SchedException(Exception):
"""A simple exception class used for scheduling exceptions"""
pass
@@ -79,6 +81,7 @@
self.scheduledleases = ds.LeaseTable(self)
self.completedleases = ds.LeaseTable(self)
self.rejectedleases = ds.LeaseTable(self)
+ self.pending_leases = []
deploy_type = self.rm.config.get_lease_deployment_type()
if deploy_type == constants.DEPLOYMENT_UNMANAGED:
@@ -105,78 +108,22 @@
self.maxres = self.rm.config.getMaxReservations()
self.numbesteffortres = 0
- def schedule(self, requests, nexttime):
- if self.rm.config.getNodeSelectionPolicy() == constants.NODESELECTION_AVOIDPREEMPT:
- avoidpreempt = True
- else:
- avoidpreempt = False
+ def schedule(self, nexttime):
+ ar_leases = [req for req in self.pending_leases if isinstance(req, ARLease)]
+ im_leases = [req for req in self.pending_leases if isinstance(req, ImmediateLease)]
+ self.pending_leases = []
+ # Process immediate requests
+ for lease_req in im_leases:
+ self.__process_im_request(lease_req, nexttime)
+
# Process AR requests
- for lease_req in requests:
- self.rm.logger.debug("LEASE-%i Processing request (AR)" % lease_req.id, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Start %s" % (lease_req.id, lease_req.start), constants.SCHED)
- self.rm.logger.debug("LEASE-%i Duration %s" % (lease_req.id, lease_req.duration), constants.SCHED)
- self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
- self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
+ for lease_req in ar_leases:
+ self.__process_ar_request(lease_req, nexttime)
- accepted = False
- try:
- self.schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
- self.scheduledleases.add(lease_req)
- self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
- accepted = True
- except SchedException, msg:
- # If our first try avoided preemption, try again
- # without avoiding preemption.
- # TODO: Roll this into the exact slot fitting algorithm
- if avoidpreempt:
- try:
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
- self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
- self.schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
- self.scheduledleases.add(lease_req)
- self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
- accepted = True
- except SchedException, msg:
- self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
- else:
- self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
- if accepted:
- self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
- else:
- self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.id, constants.SCHED)
-
-
- done = False
- newqueue = ds.Queue(self)
- while not done and not self.is_queue_empty():
- if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
- self.rm.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.", constants.SCHED)
- done = True
- else:
- lease_req = self.queue.dequeue()
- try:
- self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Processing request (BEST-EFFORT)" % lease_req.id, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Duration: %s" % (lease_req.id, lease_req.duration), constants.SCHED)
- self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
- self.schedule_besteffort_lease(lease_req, nexttime)
- self.scheduledleases.add(lease_req)
- self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
- except SchedException, msg:
- # Put back on queue
- newqueue.enqueue(lease_req)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
- self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.id, constants.SCHED)
- if not self.rm.config.isBackfilling():
- done = True
+ # Process best-effort requests
+ self.__process_queue(nexttime)
- for lease in self.queue:
- newqueue.enqueue(lease)
-
- self.queue = newqueue
def process_reservations(self, nowtime):
starting = [l for l in self.scheduledleases.entries.values() if l.has_starting_reservations(nowtime)]
@@ -202,13 +149,21 @@
handler = ReservationEventHandler(on_start=on_start, on_end=on_end)
self.handlers[type] = handler
-
def enqueue(self, lease_req):
"""Queues a best-effort lease request"""
self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
self.queue.enqueue(lease_req)
self.rm.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested), constants.SCHED)
+ def add_pending_lease(self, lease_req):
+ """
+ Adds a pending lease request, to be scheduled as soon as
+ the scheduling function is called. Unlike best-effort leases,
+ if one of these leases can't be scheduled immediately, it is
+ rejected (instead of being placed on a queue, in case resources
+ become available later on).
+ """
+ self.pending_leases.append(lease_req)
def is_queue_empty(self):
"""Return True is the queue is empty, False otherwise"""
@@ -236,8 +191,48 @@
# reservations that we can slide back.
self.reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
+ def __process_ar_request(self, lease_req, nexttime):
+ self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
+ self.rm.logger.debug(" Start : %s" % lease_req.start, constants.SCHED)
+ self.rm.logger.debug(" Duration: %s" % lease_req.duration, constants.SCHED)
+ self.rm.logger.debug(" ResReq : %s" % lease_req.requested_resources, constants.SCHED)
+
+ if self.rm.config.getNodeSelectionPolicy() == constants.NODESELECTION_AVOIDPREEMPT:
+ avoidpreempt = True
+ else:
+ avoidpreempt = False
+
+ accepted = False
+ try:
+ self.__schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
+ self.scheduledleases.add(lease_req)
+ self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ accepted = True
+ except SchedException, msg:
+ # If our first try avoided preemption, try again
+ # without avoiding preemption.
+ # TODO: Roll this into the exact slot fitting algorithm
+ if avoidpreempt:
+ try:
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
+ self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
+ self.scheduledleases.add(lease_req)
+ self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ accepted = True
+ except SchedException, msg:
+ self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ else:
+ self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ if accepted:
+ self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
+ else:
+ self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.id, constants.SCHED)
+
- def schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+ def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
start = lease_req.start.requested
end = lease_req.start.requested + lease_req.duration.requested
try:
@@ -248,7 +243,6 @@
self.rm.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id), constants.SCHED)
for lease in leases:
self.preempt(lease, time=start)
-
# Add VM resource reservations
vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, constants.ONCOMPLETE_ENDLEASE, False)
@@ -265,7 +259,37 @@
except SlotFittingException, msg:
raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
- def schedule_besteffort_lease(self, req, nexttime):
+ def __process_queue(self, nexttime):
+ done = False
+ newqueue = ds.Queue(self)
+ while not done and not self.is_queue_empty():
+ if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
+ self.rm.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.", constants.SCHED)
+ done = True
+ else:
+ lease_req = self.queue.dequeue()
+ try:
+ self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id, constants.SCHED)
+ self.rm.logger.debug(" Duration: %s" % lease_req.duration, constants.SCHED)
+ self.rm.logger.debug(" ResReq : %s" % lease_req.requested_resources, constants.SCHED)
+ self.__schedule_besteffort_lease(lease_req, nexttime)
+ self.scheduledleases.add(lease_req)
+ self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ except SchedException, msg:
+ # Put back on queue
+ newqueue.enqueue(lease_req)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.id, constants.SCHED)
+ if not self.rm.config.isBackfilling():
+ done = True
+
+ for lease in self.queue:
+ newqueue.enqueue(lease)
+
+ self.queue = newqueue
+
+
+ def __schedule_besteffort_lease(self, req, nexttime):
# Determine earliest start time in each node
if req.state == constants.LEASE_STATE_PENDING:
# Figure out earliest start times based on
@@ -327,8 +351,24 @@
except SlotFittingException, msg:
raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
+
- def schedule_immediate_lease(self, req, nexttime):
+ def __process_im_request(self, lease_req, nexttime):
+ self.rm.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes), constants.SCHED)
+ self.rm.logger.debug(" Duration: %s" % lease_req.duration, constants.SCHED)
+ self.rm.logger.debug(" ResReq : %s" % lease_req.requested_resources, constants.SCHED)
+
+ try:
+ self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+ self.scheduledleases.add(lease_req)
+ self.rm.stats.incrCounter(constants.COUNTER_IMACCEPTED, lease_req.id)
+ self.rm.logger.info("Immediate lease request #%i has been accepted." % lease_req.id, constants.SCHED)
+ except SchedException, msg:
+ self.rm.stats.incrCounter(constants.COUNTER_IMREJECTED, lease_req.id)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+
+
+ def __schedule_immediate_lease(self, req, nexttime):
# Determine earliest start time in each node
earliest = self.deployment.find_earliest_starting_times(req, nexttime)
try:
More information about the Haizea-commit mailing list