[haizea-commit] r432 - in trunk/src/haizea: common resourcemanager resourcemanager/enact/opennebula resourcemanager/frontends

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Fri Jul 11 08:11:11 CDT 2008


Author: borja
Date: 2008-07-11 08:11:11 -0500 (Fri, 11 Jul 2008)
New Revision: 432

Modified:
   trunk/src/haizea/common/config.py
   trunk/src/haizea/common/constants.py
   trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
   trunk/src/haizea/resourcemanager/frontends/opennebula.py
   trunk/src/haizea/resourcemanager/rm.py
   trunk/src/haizea/resourcemanager/scheduler.py
Log:
- Immediate lease scheduling
- Signal handling to gracefully stop on SIGTERM or SIGINT
- Added "onevm" option to config file (location of onevm command)

Modified: trunk/src/haizea/common/config.py
===================================================================
--- trunk/src/haizea/common/config.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/common/config.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -183,6 +183,9 @@
     def getONEDB(self):
         return self.config.get(constants.OPENNEBULA_SEC, constants.DB_OPT)
 
+    def getONEvm(self):
+        return self.config.get(constants.OPENNEBULA_SEC, constants.ONEVM_OPT)
+
     def getONESuspendResumeRate(self):
         return self.config.getint(constants.OPENNEBULA_SEC, constants.ESTIMATESUSPENDRATE_OPT)
 

Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/common/constants.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -147,6 +147,7 @@
 
 OPENNEBULA_SEC = "opennebula"
 DB_OPT = "db"
+ONEVM_OPT = "onevm"
 ESTIMATESUSPENDRATE_OPT = "suspendresume-rate-estimate"
 
 DEPLOY_IMAGETRANSFER_SEC = "deploy-imagetransfer"
@@ -299,9 +300,11 @@
 
 ENACT_PACKAGE="haizea.resourcemanager.enact"
 
-COUNTER_ARACCEPTED="accepted"
-COUNTER_ARREJECTED="rejected"
-COUNTER_BESTEFFORTCOMPLETED="besteffort-completed"
+COUNTER_ARACCEPTED="ar_accepted"
+COUNTER_ARREJECTED="ar_rejected"
+COUNTER_IMACCEPTED="im_accepted"
+COUNTER_IMREJECTED="im_rejected"
+COUNTER_BESTEFFORTCOMPLETED="besteffort_completed"
 COUNTER_QUEUESIZE="queuesize"
 COUNTER_DISKUSAGE="diskusage"
 COUNTER_CPUUTILIZATION="cpuutilization"

Modified: trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/vm.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/vm.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -24,7 +24,7 @@
 class VMEnactment(VMEnactmentBase):
     def __init__(self, resourcepool):
         VMEnactmentBase.__init__(self, resourcepool)
-        self.onevm = "/home/borja/bin/onevm"
+        self.onevm = self.resourcepool.rm.config.getONEvm()
         
         self.conn = sqlite.connect(self.resourcepool.rm.config.getONEDB())
         self.conn.row_factory = sqlite.Row
@@ -116,7 +116,7 @@
             onevm = cur.fetchone()        
             state = onevm["state"]
             if state == 3:
-                self.logger.debug("Suspend of L%iV%i correct." % (action.lease_haizea_id, vnode), constants.ONE)
+                self.logger.debug("Resume of L%iV%i correct." % (action.lease_haizea_id, vnode), constants.ONE)
             else:
                 self.logger.warning("ONE did not complete resume of L%i%V%i on time. State is %i" % (action.leaseHaizeaID, vnode, state), constants.ONE)
                 result = 1

Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -18,19 +18,20 @@
 
 import haizea.common.constants as constants
 from haizea.resourcemanager.frontends.base import RequestFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ResourceTuple
+from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
 from haizea.common.utils import UNIX2DateTime
 from pysqlite2 import dbapi2 as sqlite
 from mx.DateTime import DateTimeDelta, TimeDelta, ISO
 from haizea.common.utils import roundDateTime
+import operator
 
 HAIZEA_PARAM = "HAIZEA"
-HAIZEA_START = "start"
+HAIZEA_START = "START"
 HAIZEA_START_NOW = "now"
 HAIZEA_START_BESTEFFORT = "best_effort"
-HAIZEA_DURATION = "duration"
+HAIZEA_DURATION = "DURATION"
 HAIZEA_DURATION_UNLIMITED = "unlimited"
-HAIZEA_PREEMPTIBLE = "preemptible"
+HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
 HAIZEA_PREEMPTIBLE_YES = "yes"
 HAIZEA_PREEMPTIBLE_NO = "no"
 
@@ -61,16 +62,24 @@
             attrs = dict([(r["name"], r["value"]) for r in template])
             self.processed.append(req["oid"])
             requests.append(self.ONEreq2lease(req, attrs))
+        requests.sort(key=operator.attrgetter("submit_time"))
         return requests
 
     def existsMoreRequests(self):
         return True
     
     def ONEreq2lease(self, req, attrs):
-        haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
+        # If there is no HAIZEA parameter, the default is to treat the
+        # request as an immediate request with unlimited duration
+        if not attrs.has_key(HAIZEA_PARAM):
+            haizea_param = {HAIZEA_START: HAIZEA_START_NOW,
+                            HAIZEA_DURATION: HAIZEA_DURATION_UNLIMITED,
+                            HAIZEA_PREEMPTIBLE: HAIZEA_PREEMPTIBLE_NO}
+        else:
+            haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
         start = haizea_param[HAIZEA_START]
         if start == HAIZEA_START_NOW:
-            pass
+            return self.create_immediate_lease(req, attrs, haizea_param)
         elif start  == HAIZEA_START_BESTEFFORT:
             return self.create_besteffort_lease(req, attrs, haizea_param)
         else:
@@ -87,7 +96,7 @@
         numnodes = 1
         resreq = ResourceTuple.create_empty()
         resreq.set_by_type(constants.RES_CPU, float(attrs[ONE_CPU]))
-        resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY]))
+        resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY])/1000)
 
         duration = haizea_param[HAIZEA_DURATION]
         if duration == HAIZEA_DURATION_UNLIMITED:
@@ -137,4 +146,18 @@
         leasereq.vnode_enactment_info[1] = int(req["oid"])
         leasereq.set_scheduler(self.rm.scheduler)
         return leasereq
+
+    def create_immediate_lease(self, req, attrs, haizea_param):
+        tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
+ 
+        leasereq = ImmediateLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
+        leasereq.state = constants.LEASE_STATE_PENDING
+        # Enactment info should be changed to the "array id" when groups
+        # are implemented in OpenNebula
+        leasereq.enactment_info = int(req["oid"])
+        # Only one node for now
+        leasereq.vnode_enactment_info = {}
+        leasereq.vnode_enactment_info[1] = int(req["oid"])
+        leasereq.set_scheduler(self.rm.scheduler)
+        return leasereq
         
\ No newline at end of file

Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/rm.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -35,13 +35,15 @@
 import haizea.common.constants as constants
 from haizea.resourcemanager.frontends.tracefile import TracefileFrontend
 from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease 
+from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease 
 from haizea.resourcemanager.resourcepool import ResourcePool
 from haizea.resourcemanager.scheduler import Scheduler
 from haizea.resourcemanager.log import Logger
 from haizea.common.utils import abstract, roundDateTime
 
 import operator
+import signal
+import sys
 from time import sleep
 from mx.DateTime import now, TimeDelta
 
@@ -110,6 +112,8 @@
         # Create counters to keep track of interesting data.
         self.stats.createCounter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
         self.stats.createCounter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
+        self.stats.createCounter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
+        self.stats.createCounter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
         self.stats.createCounter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
         self.stats.createCounter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
         self.stats.createCounter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
@@ -126,6 +130,9 @@
         # Stop collecting data (this finalizes counters)
         self.stats.stop()
 
+        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
+        #       do with leases that are still running.
+
         self.logger.status("  Completed best-effort leases: %i" % self.stats.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
         self.logger.status("  Accepted AR leases: %i" % self.stats.counters[constants.COUNTER_ARACCEPTED], constants.RM)
         self.logger.status("  Rejected AR leases: %i" % self.stats.counters[constants.COUNTER_ARREJECTED], constants.RM)
@@ -162,14 +169,19 @@
                 
         ar_leases = [req for req in requests if isinstance(req, ARLease)]
         be_leases = [req for req in requests if isinstance(req, BestEffortLease)]
+        im_leases = [req for req in requests if isinstance(req, ImmediateLease)]
         
         # Queue best-effort
         for req in be_leases:
             self.scheduler.enqueue(req)
+            
+        # Add AR leases and immediate leases
+        for req in ar_leases + im_leases:
+            self.scheduler.add_pending_lease(req)
         
-        # Process AR leases, and run the scheduling function.
+        # Run the scheduling function.
         try:
-            self.scheduler.schedule(ar_leases, nexttime)
+            self.scheduler.schedule(nexttime)
         except Exception, msg:
             # Exit if something goes horribly wrong
             self.logger.error("Exception in scheduling function. Dumping state..." , constants.RM)
@@ -228,6 +240,7 @@
         """Return True if there are any leases still "in the system" """
         return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
     
+    # TODO: Replace this with a more general event handling system
     def notify_end_vm(self, lease, rr):
         """Notifies the resource manager that a VM has ended prematurely.
         
@@ -511,14 +524,15 @@
           event). Otherwise, set it to (now + quantum)
         - Sleep until next-wake-up-time
         
-        The clock never stops.
-        TODO: Add signal capturing so we can use a signal to gracefully
-              stop the clock.
-        
+        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
+        foreground) or a SIGTERM signal is received.
         """
         self.rm.logger.status("Starting simulated clock", constants.CLOCK)
         self.rm.stats.start(self.get_start_time())
         
+        signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
+        signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
+        
         done = False
         # Main loop
         while not done:
@@ -555,20 +569,26 @@
                 sleep((nextwakeup - now()).seconds)
             else:
                 self.lastwakeup = nextwakeup
-
-        # Stop
-        self.rm.printStatus()
-        self.rm.logger.status("Stopping clock", constants.CLOCK)
-        self.rm.stop()
           
     def print_stats(self, loglevel):
         """See docstring in base Clock class."""
         pass
-          
+    
+    def signalhandler_gracefulstop(self, signum, frame):
+        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
+        sigstr = ""
+        if signum == signal.SIGTERM:
+            sigstr = " (SIGTERM)"
+        elif signum == signal.SIGINT:
+            sigstr = " (SIGINT)"
+        self.rm.logger.status("Received signal %i%s" %(signum, sigstr), constants.CLOCK)
+        self.rm.logger.status("Stopping gracefully...", constants.CLOCK)
+        self.rm.stop()
+        sys.exit()
 
 if __name__ == "__main__":
     from haizea.common.config import RMConfig
-    CONFIGFILE = "../../../etc/sample.conf"
+    CONFIGFILE = "../../../etc/sample_opennebula.conf"
     CONFIG = RMConfig.fromFile(CONFIGFILE)
     RM = ResourceManager(CONFIG)
     RM.start()
\ No newline at end of file

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-07-11 13:09:08 UTC (rev 431)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-07-11 13:11:11 UTC (rev 432)
@@ -36,7 +36,9 @@
 from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeployment
 from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
 from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
+from haizea.resourcemanager.datastruct import ARLease, ImmediateLease 
 
+
 class SchedException(Exception):
     """A simple exception class used for scheduling exceptions"""
     pass
@@ -79,6 +81,7 @@
         self.scheduledleases = ds.LeaseTable(self)
         self.completedleases = ds.LeaseTable(self)
         self.rejectedleases = ds.LeaseTable(self)
+        self.pending_leases = []
 
         deploy_type = self.rm.config.get_lease_deployment_type()
         if deploy_type == constants.DEPLOYMENT_UNMANAGED:
@@ -105,78 +108,22 @@
         self.maxres = self.rm.config.getMaxReservations()
         self.numbesteffortres = 0
     
-    def schedule(self, requests, nexttime):        
-        if self.rm.config.getNodeSelectionPolicy() == constants.NODESELECTION_AVOIDPREEMPT:
-            avoidpreempt = True
-        else:
-            avoidpreempt = False
+    def schedule(self, nexttime):        
+        ar_leases = [req for req in self.pending_leases if isinstance(req, ARLease)]
+        im_leases = [req for req in self.pending_leases if isinstance(req, ImmediateLease)]
+        self.pending_leases = []
         
+        # Process immediate requests
+        for lease_req in im_leases:
+            self.__process_im_request(lease_req, nexttime)
+
         # Process AR requests
-        for lease_req in requests:
-            self.rm.logger.debug("LEASE-%i Processing request (AR)" % lease_req.id, constants.SCHED)
-            self.rm.logger.debug("LEASE-%i Start    %s" % (lease_req.id, lease_req.start), constants.SCHED)
-            self.rm.logger.debug("LEASE-%i Duration %s" % (lease_req.id, lease_req.duration), constants.SCHED)
-            self.rm.logger.debug("LEASE-%i ResReq   %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
-            self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
+        for lease_req in ar_leases:
+            self.__process_ar_request(lease_req, nexttime)
             
-            accepted = False
-            try:
-                self.schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
-                self.scheduledleases.add(lease_req)
-                self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
-                accepted = True
-            except SchedException, msg:
-                # If our first try avoided preemption, try again
-                # without avoiding preemption.
-                # TODO: Roll this into the exact slot fitting algorithm
-                if avoidpreempt:
-                    try:
-                        self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
-                        self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
-                        self.schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
-                        self.scheduledleases.add(lease_req)
-                        self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
-                        accepted = True
-                    except SchedException, msg:
-                        self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
-                        self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
-                else:
-                    self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
-                    self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
-            if accepted:
-                self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
-            else:
-                self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.id, constants.SCHED)
-                
-
-        done = False
-        newqueue = ds.Queue(self)
-        while not done and not self.is_queue_empty():
-            if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
-                self.rm.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.", constants.SCHED)
-                done = True
-            else:
-                lease_req = self.queue.dequeue()
-                try:
-                    self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id, constants.SCHED)
-                    self.rm.logger.debug("LEASE-%i Processing request (BEST-EFFORT)" % lease_req.id, constants.SCHED)
-                    self.rm.logger.debug("LEASE-%i Duration: %s" % (lease_req.id, lease_req.duration), constants.SCHED)
-                    self.rm.logger.debug("LEASE-%i ResReq  %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
-                    self.schedule_besteffort_lease(lease_req, nexttime)
-                    self.scheduledleases.add(lease_req)
-                    self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
-                except SchedException, msg:
-                    # Put back on queue
-                    newqueue.enqueue(lease_req)
-                    self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
-                    self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.id, constants.SCHED)
-                    if not self.rm.config.isBackfilling():
-                        done = True
+        # Process best-effort requests
+        self.__process_queue(nexttime)
         
-        for lease in self.queue:
-            newqueue.enqueue(lease)
-        
-        self.queue = newqueue 
     
     def process_reservations(self, nowtime):
         starting = [l for l in self.scheduledleases.entries.values() if l.has_starting_reservations(nowtime)]
@@ -202,13 +149,21 @@
         handler = ReservationEventHandler(on_start=on_start, on_end=on_end)
         self.handlers[type] = handler        
     
-    
     def enqueue(self, lease_req):
         """Queues a best-effort lease request"""
         self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
         self.queue.enqueue(lease_req)
         self.rm.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested), constants.SCHED)
 
+    def add_pending_lease(self, lease_req):
+        """
+        Adds a pending lease request, to be scheduled as soon as
+        the scheduling function is called. Unlike best-effort leases,
+        if one of these leases can't be scheduled immediately, it is
+        rejected (instead of being placed on a queue, in case resources
+        become available later on).
+        """
+        self.pending_leases.append(lease_req)
 
     def is_queue_empty(self):
         """Return True is the queue is empty, False otherwise"""
@@ -236,8 +191,48 @@
             # reservations that we can slide back.
             self.reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
     
+    def __process_ar_request(self, lease_req, nexttime):
+        self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
+        self.rm.logger.debug("  Start   : %s" % lease_req.start, constants.SCHED)
+        self.rm.logger.debug("  Duration: %s" % lease_req.duration, constants.SCHED)
+        self.rm.logger.debug("  ResReq  : %s" % lease_req.requested_resources, constants.SCHED)
+        
+        if self.rm.config.getNodeSelectionPolicy() == constants.NODESELECTION_AVOIDPREEMPT:
+            avoidpreempt = True
+        else:
+            avoidpreempt = False
+
+        accepted = False
+        try:
+            self.__schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
+            self.scheduledleases.add(lease_req)
+            self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+            accepted = True
+        except SchedException, msg:
+            # If our first try avoided preemption, try again
+            # without avoiding preemption.
+            # TODO: Roll this into the exact slot fitting algorithm
+            if avoidpreempt:
+                try:
+                    self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+                    self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
+                    self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
+                    self.scheduledleases.add(lease_req)
+                    self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+                    accepted = True
+                except SchedException, msg:
+                    self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+                    self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+            else:
+                self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+                self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+        if accepted:
+            self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
+        else:
+            self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.id, constants.SCHED)
+        
     
-    def schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+    def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
         start = lease_req.start.requested
         end = lease_req.start.requested + lease_req.duration.requested
         try:
@@ -248,7 +243,6 @@
                 self.rm.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id), constants.SCHED)
                 for lease in leases:
                     self.preempt(lease, time=start)
-            
 
             # Add VM resource reservations
             vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, constants.ONCOMPLETE_ENDLEASE, False)
@@ -265,7 +259,37 @@
         except SlotFittingException, msg:
             raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
 
-    def schedule_besteffort_lease(self, req, nexttime):
+    def __process_queue(self, nexttime):
+        done = False
+        newqueue = ds.Queue(self)
+        while not done and not self.is_queue_empty():
+            if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
+                self.rm.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.", constants.SCHED)
+                done = True
+            else:
+                lease_req = self.queue.dequeue()
+                try:
+                    self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id, constants.SCHED)
+                    self.rm.logger.debug("  Duration: %s" % lease_req.duration, constants.SCHED)
+                    self.rm.logger.debug("  ResReq  : %s" % lease_req.requested_resources, constants.SCHED)
+                    self.__schedule_besteffort_lease(lease_req, nexttime)
+                    self.scheduledleases.add(lease_req)
+                    self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
+                except SchedException, msg:
+                    # Put back on queue
+                    newqueue.enqueue(lease_req)
+                    self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+                    self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.id, constants.SCHED)
+                    if not self.rm.config.isBackfilling():
+                        done = True
+        
+        for lease in self.queue:
+            newqueue.enqueue(lease)
+        
+        self.queue = newqueue 
+        
+
+    def __schedule_besteffort_lease(self, req, nexttime):
         # Determine earliest start time in each node
         if req.state == constants.LEASE_STATE_PENDING:
             # Figure out earliest start times based on
@@ -327,8 +351,24 @@
             
         except SlotFittingException, msg:
             raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
+
         
-    def schedule_immediate_lease(self, req, nexttime):
+    def __process_im_request(self, lease_req, nexttime):
+        self.rm.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes), constants.SCHED)
+        self.rm.logger.debug("  Duration: %s" % lease_req.duration, constants.SCHED)
+        self.rm.logger.debug("  ResReq  : %s" % lease_req.requested_resources, constants.SCHED)
+        
+        try:
+            self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+            self.scheduledleases.add(lease_req)
+            self.rm.stats.incrCounter(constants.COUNTER_IMACCEPTED, lease_req.id)
+            self.rm.logger.info("Immediate lease request #%i has been accepted." % lease_req.id, constants.SCHED)
+        except SchedException, msg:
+            self.rm.stats.incrCounter(constants.COUNTER_IMREJECTED, lease_req.id)
+            self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+        
+        
+    def __schedule_immediate_lease(self, req, nexttime):
         # Determine earliest start time in each node
         earliest = self.deployment.find_earliest_starting_times(req, nexttime)
         try:



More information about the Haizea-commit mailing list