[haizea-commit] r590 - branches/TP2.0/src/haizea/core

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Wed Jun 17 12:50:42 CDT 2009


Author: borja
Date: 2009-06-17 12:50:41 -0500 (Wed, 17 Jun 2009)
New Revision: 590

Removed:
   branches/TP2.0/src/haizea/core/haizea.py
Log:
Deleting old rm.py and haizea.py files


Deleted: branches/TP2.0/src/haizea/core/haizea.py
===================================================================
--- branches/TP2.0/src/haizea/core/haizea.py	2009-06-17 17:49:22 UTC (rev 589)
+++ branches/TP2.0/src/haizea/core/haizea.py	2009-06-17 17:50:41 UTC (rev 590)
@@ -1,846 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago                                 #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
-# Complutense de Madrid (dsa-research.org)                                   #
-#                                                                            #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
-# not use this file except in compliance with the License. You may obtain    #
-# a copy of the License at                                                   #
-#                                                                            #
-# http://www.apache.org/licenses/LICENSE-2.0                                 #
-#                                                                            #
-# Unless required by applicable law or agreed to in writing, software        #
-# distributed under the License is distributed on an "AS IS" BASIS,          #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
-# See the License for the specific language governing permissions and        #
-# limitations under the License.                                             #
-# -------------------------------------------------------------------------- #
-
-"""The haizea module is the root of Haizea. If you want to
-see where the ball starts rolling, look at the following two functions:
-
-* haizea.Haizea.__init__()
-* haizea.Haizea.start()
-
-This module provides the following classes:
-
-* Haizea: The resource manager itself. Pretty much everything else
-  is contained in this class.
-* Clock: A base class for the resource manager's clock.
-* SimulatedClock: A clock for simulations.
-* RealClock: A clock that advances in realtime.
-"""
-
-import haizea.core.accounting as accounting
-import haizea.common.constants as constants
-from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
-from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
-from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
-from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
-from haizea.core.frontends.tracefile import TracefileFrontend
-from haizea.core.frontends.opennebula import OpenNebulaFrontend
-from haizea.core.frontends.rpc import RPCFrontend
-from haizea.core.leases import BestEffortLease
-from haizea.core.scheduler import UnrecoverableError
-from haizea.core.scheduler.lease_scheduler import LeaseScheduler
-from haizea.core.scheduler.vm_scheduler import VMScheduler
-from haizea.core.scheduler.slottable import SlotTable
-from haizea.core.scheduler.policy import Policy
-from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from haizea.core.rpcserver import RPCServer
-from haizea.common.utils import abstract, round_datetime, Singleton
-
-import operator
-import logging
-import signal
-import sys, os
-import traceback
-from time import sleep
-from math import ceil
-from mx.DateTime import now, TimeDelta
-
-DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
-DAEMON_STDERR = "/var/tmp/haizea.err"
-DEFAULT_LOGFILE = "/var/tmp/haizea.log"
-
-class Haizea(Singleton):
-    """Haizea
-    
-    This class is the root of Haizea. Pretty much everything else (scheduler,
-    enactment modules, etc.) is contained in this class. The Haizea
-    class is meant to be a singleton.
-    
-    """
-    
-    def __init__(self, config, daemon=False, pidfile=None):
-        """Initializes the resource manager.
-        
-        Argument:
-        config -- a populated instance of haizea.common.config.RMConfig
-        daemon -- True if Haizea must run as a daemon, False if it must
-                  run in the foreground
-        pidfile -- When running as a daemon, file to save pid to
-        """
-        self.config = config
-        
-        # Create the haizea components
-        
-        mode = config.get("mode")
-        
-        self.daemon = daemon
-        self.pidfile = pidfile
-
-        if mode == "simulated":
-            self.init_simulated_mode()
-        elif mode == "opennebula":
-            self.init_opennebula_mode()
-            
-        # Statistics collection 
-        self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
-        
-        self.logger = logging.getLogger("RM")
-
-    def init_simulated_mode(self):
-        """Initializes the resource manager in simulated mode
-        
-        """
-
-        # Simulated-time simulations always run in the foreground
-        clock = self.config.get("clock")
-        if clock == constants.CLOCK_SIMULATED:
-            self.daemon = False
-        
-        self.init_logging()
-                
-        if clock == constants.CLOCK_SIMULATED:
-            starttime = self.config.get("starttime")
-            self.clock = SimulatedClock(self, starttime)
-            self.rpc_server = None
-        elif clock == constants.CLOCK_REAL:
-            wakeup_interval = self.config.get("wakeup-interval")
-            non_sched = self.config.get("non-schedulable-interval")
-            self.clock = RealClock(self, wakeup_interval, non_sched)
-            self.rpc_server = RPCServer(self)
-                    
-        # Enactment modules
-        info_enact = SimulatedResourcePoolInfo()
-        vm_enact = SimulatedVMEnactment()
-        deploy_enact = SimulatedDeploymentEnactment()
-                
-        # Resource pool
-        preparation_type = self.config.get("lease-preparation")
-        if preparation_type == constants.PREPARATION_TRANSFER:
-            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
-                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
-            else:
-                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
-        else:
-            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
-    
-        # Slot table
-        slottable = SlotTable()
-        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
-            slottable.add_node(n)
-        
-        # Preparation scheduler
-        if preparation_type == constants.PREPARATION_UNMANAGED:
-            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
-        elif preparation_type == constants.PREPARATION_TRANSFER:
-            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)    
-    
-        # VM Scheduler
-        vm_scheduler = VMScheduler(slottable, resourcepool)
-    
-        # Lease Scheduler
-        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
-        
-        # Policy engine
-        self.policy = Policy()
-        
-        # Lease request frontends
-        if clock == constants.CLOCK_SIMULATED:
-            # In pure simulation, we can only use the tracefile frontend
-            self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
-        elif clock == constants.CLOCK_REAL:
-            # In simulation with a real clock, only the RPC frontend can be used
-            self.frontends = [RPCFrontend(self)] 
-        
-    def init_opennebula_mode(self):
-        """Initializes the resource manager in OpenNebula mode
-        
-        """
-        self.init_logging()
-
-        # The clock
-        wakeup_interval = self.config.get("wakeup-interval")
-        non_sched = self.config.get("non-schedulable-interval")
-        dry_run = self.config.get("dry-run")
-        fastforward = dry_run
-        self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
-        
-        # RPC server
-        if dry_run:
-            # No need for an RPC server when doing a dry run
-            self.rpc_server = None
-        else:
-            self.rpc_server = RPCServer(self)
-        
-        # Enactment modules
-        info_enact = OpenNebulaResourcePoolInfo()
-        vm_enact = OpenNebulaVMEnactment()
-        # No deployment in OpenNebula. Using dummy one for now.
-        deploy_enact = OpenNebulaDummyDeploymentEnactment()
-            
-        # Resource pool
-        resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
-
-        # Slot table
-        slottable = SlotTable()
-        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
-            slottable.add_node(n)
-
-
-        # Deployment module
-        preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
-
-        # VM Scheduler
-        vm_scheduler = VMScheduler(slottable, resourcepool)
-    
-        # Lease Scheduler
-        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
-
-        # Lease request frontends
-        self.frontends = [OpenNebulaFrontend(self)]        
-        
-        
-    def init_logging(self):
-        """Initializes logging
-        
-        """
-
-        from haizea.core.log import HaizeaLogger
-        logger = logging.getLogger("")
-        if self.daemon:
-            handler = logging.FileHandler(self.config.get("logfile"))
-        else:
-            handler = logging.StreamHandler()
-        formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
-        handler.setFormatter(formatter)
-        logger.addHandler(handler)
-        level = logging.getLevelName(self.config.get("loglevel"))
-        logger.setLevel(level)
-        logging.setLoggerClass(HaizeaLogger)
-
-        
-    def daemonize(self):
-        """Daemonizes the Haizea process.
-        
-        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
-        
-        """
-        # First fork
-        try:
-            pid = os.fork()
-            if pid > 0: 
-                # Exit first parent
-                sys.exit(0) 
-        except OSError, e:
-            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
-            sys.exit(1)
-    
-        # Decouple from parent environment.
-        os.chdir(".")
-        os.umask(0)
-        os.setsid()
-    
-        # Second fork
-        try:
-            pid = os.fork()
-            if pid > 0: 
-                # Exit second parent.
-                sys.exit(0) 
-        except OSError, e:
-            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
-            sys.exit(2)
-            
-        # Open file descriptors and print start message
-        si = file(DAEMON_STDIN, 'r')
-        so = file(DAEMON_STDOUT, 'a+')
-        se = file(DAEMON_STDERR, 'a+', 0)
-        pid = os.getpid()
-        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
-        sys.stderr.flush()
-        file(self.pidfile,'w+').write("%i\n" % pid)
-        
-        # Redirect standard file descriptors.
-        os.close(sys.stdin.fileno())
-        os.close(sys.stdout.fileno())
-        os.close(sys.stderr.fileno())
-        os.dup2(si.fileno(), sys.stdin.fileno())
-        os.dup2(so.fileno(), sys.stdout.fileno())
-        os.dup2(se.fileno(), sys.stderr.fileno())
-
-    def start(self):
-        """Starts the resource manager"""
-        self.logger.info("Starting resource manager")
-
-        # Create counters to keep track of interesting data.
-        self.accounting.create_counter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
-        self.accounting.create_counter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
-        self.accounting.create_counter(constants.COUNTER_UTILIZATION, constants.AVERAGE_NONE)
-        
-        if self.daemon:
-            self.daemonize()
-        if self.rpc_server:
-            self.rpc_server.start()
-            
-        # Start the clock
-        try:
-            self.clock.run()
-        except UnrecoverableError, exc:
-            self.__unrecoverable_error(exc)
-        except Exception, exc:
-            self.__unexpected_exception(exc)
-
-    def stop(self):
-        """Stops the resource manager by stopping the clock"""
-        self.clock.stop()
-        
-    def graceful_stop(self):
-        """Stops the resource manager gracefully and exits"""
-        
-        self.logger.status("Stopping resource manager gracefully...")
-        
-        # Stop collecting data (this finalizes counters)
-        self.accounting.stop()
-        
-        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
-        #       do with leases that are still running.
-
-        self.print_status()
-        
-        # In debug mode, dump the lease descriptors.
-        for lease in self.scheduler.completed_leases.entries.values():
-            lease.print_contents()
-            
-        # Write all collected data to disk
-        self.accounting.save_to_disk()
-        
-        # Stop RPC server
-        if self.rpc_server != None:
-            self.rpc_server.stop()
-                    
-    def process_requests(self, nexttime):
-        """Process any new requests in the request frontend
-        
-        Checks the request frontend to see if there are any new requests that
-        have to be processed. AR leases are sent directly to the schedule.
-        Best-effort leases are queued.
-        
-        Arguments:
-        nexttime -- The next time at which the scheduler can allocate resources.
-                    This is meant to be provided by the clock simply as a sanity
-                    measure when running in real time (to avoid scheduling something
-                    "now" to actually have "now" be in the past once the scheduling
-                    function returns. i.e., nexttime has nothing to do with whether 
-                    there are resources available at that time or not.
-        
-        """        
-        
-        # Get requests from frontend
-        requests = []
-        for frontend in self.frontends:
-            requests += frontend.get_accumulated_requests()
-        requests.sort(key=operator.attrgetter("submit_time"))
-        
-        # Request leases and run the scheduling function.
-        try:
-            self.logger.vdebug("Requesting leases")
-            for req in requests:
-                self.scheduler.request_lease(req)
-
-            self.logger.vdebug("Running scheduling function")
-            self.scheduler.schedule(nexttime)
-        except UnrecoverableError, exc:
-            self.__unrecoverable_error(exc)
-        except Exception, exc:
-            self.__unexpected_exception(exc)
-
-
-    def process_reservations(self, time):
-        """Process reservations starting/stopping at specified time"""
-        
-        # The scheduler takes care of this.
-        try:
-            self.scheduler.process_reservations(time)
-        except UnrecoverableError, exc:
-            self.__unrecoverable_error(exc)
-        except Exception, exc:
-            self.__unexpected_exception(exc)
-         
-    def notify_event(self, lease_id, event):
-        """Notifies an asynchronous event to Haizea.
-        
-        Arguments:
-        lease_id -- ID of lease that is affected by event
-        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
-        """
-        try:
-            lease = self.scheduler.get_lease_by_id(lease_id)
-            self.scheduler.notify_event(lease, event)
-        except UnrecoverableError, exc:
-            self.__unrecoverable_error(exc)
-        except Exception, exc:
-            self.__unexpected_exception(exc)
-        
-    def cancel_lease(self, lease_id):
-        """Cancels a lease.
-        
-        Arguments:
-        lease_id -- ID of lease to cancel
-        """    
-        try:
-            lease = self.scheduler.get_lease_by_id(lease_id)
-            self.scheduler.cancel_lease(lease)
-        except UnrecoverableError, exc:
-            self.__unrecoverable_error(exc)
-        except Exception, exc:
-            self.__unexpected_exception(exc)
-            
-    def get_next_changepoint(self):
-        """Return next changepoint in the slot table"""
-        return self.scheduler.slottable.peekNextChangePoint(self.clock.get_time())
-   
-    def exists_leases_in_rm(self):
-        """Return True if there are any leases still "in the system" """
-        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
-
-    def print_status(self):
-        """Prints status summary."""
-        
-        leases = self.scheduler.leases.get_leases()
-        completed_leases = self.scheduler.completed_leases.get_leases()
-        self.logger.status("--- Haizea status summary ---")
-        self.logger.status("Number of leases (not including completed): %i" % len(leases))
-        self.logger.status("Completed leases: %i" % len(completed_leases))
-        self.logger.status("Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
-        self.logger.status("Queue size: %i" % self.accounting.data.counters[constants.COUNTER_QUEUESIZE])
-        self.logger.status("Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
-        self.logger.status("Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
-        self.logger.status("Accepted IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMACCEPTED])
-        self.logger.status("Rejected IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMREJECTED])
-        self.logger.status("---- End summary ----")        
-
-    def __unrecoverable_error(self, exc):
-        """Handles an unrecoverable error.
-        
-        This method prints information on the unrecoverable error and makes Haizea panic.
-        """
-        self.logger.error("An unrecoverable error has happened.")
-        self.logger.error("Original exception:")
-        self.__print_exception(exc.exc, exc.get_traceback())
-        self.logger.error("Unrecoverable error traceback:")
-        self.__print_exception(exc, sys.exc_traceback)
-        self.__panic()
-
-    def __unexpected_exception(self, exc):
-        """Handles an unrecoverable error.
-        
-        This method prints information on the unrecoverable error and makes Haizea panic.
-        """
-        self.logger.error("An unexpected exception has happened.")
-        self.__print_exception(exc, sys.exc_traceback)
-        self.__panic()
-            
-    def __print_exception(self, exc, exc_traceback):
-        """Prints an exception's traceback to the log."""
-        tb = traceback.format_tb(exc_traceback)
-        for line in tb:
-            self.logger.error(line)
-        self.logger.error("Message: %s" % exc)
-
-    
-    def __panic(self):
-        """Makes Haizea crash and burn in a panicked frenzy"""
-        
-        self.logger.status("Panicking...")
-
-        # Stop RPC server
-        if self.rpc_server != None:
-            self.rpc_server.stop()
-
-        # Dump state
-        self.print_status()
-        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
-
-        # Print lease descriptors
-        leases = self.scheduler.leases.get_leases()
-        if len(leases)>0:
-            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
-            for lease in leases:
-                lease.print_contents()
-            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")        
-
-        # Exit
-        treatment = self.config.get("lease-failure-handling")
-        if treatment == constants.ONFAILURE_EXIT_RAISE:
-            raise
-        else:
-            exit(1)
-
-            
-class Clock(object):
-    """Base class for the resource manager's clock.
-    
-    The clock is in charge of periodically waking the resource manager so it
-    will process new requests and handle existing reservations. This is a
-    base class defining abstract methods.
-    
-    """
-    def __init__(self, haizea):
-        self.haizea = haizea
-        self.done = False
-    
-    def get_time(self): 
-        """Return the current time"""
-        return abstract()
-
-    def get_start_time(self): 
-        """Return the time at which the clock started ticking"""
-        return abstract()
-    
-    def get_next_schedulable_time(self): 
-        """Return the next time at which resources could be scheduled.
-        
-        The "next schedulable time" server sanity measure when running 
-        in real time (to avoid scheduling something "now" to actually 
-        have "now" be in the past once the scheduling function returns. 
-        i.e., the "next schedulable time" has nothing to do with whether 
-        there are resources available at that time or not.
-        """
-        return abstract()
-    
-    def run(self):
-        """Start and run the clock. This function is, in effect,
-        the main loop of the resource manager."""
-        return abstract()     
-
-    def stop(self):
-        """Stop the clock.
-        
-        Stopping the clock makes Haizea exit.
-        """
-        self.done = True    
-    
-        
-class SimulatedClock(Clock):
-    """Simulates the passage of time... really fast.
-    
-    The simulated clock steps through time to produce an ideal schedule.
-    See the run() function for a description of how time is incremented
-    exactly in the simulated clock.
-    
-    """
-    
-    def __init__(self, haizea, starttime):
-        """Initialize the simulated clock, starting at the provided starttime"""
-        Clock.__init__(self, haizea)
-        self.starttime = starttime
-        self.time = starttime
-        self.logger = logging.getLogger("CLOCK")
-        self.statusinterval = self.haizea.config.get("status-message-interval")
-       
-    def get_time(self):
-        """See docstring in base Clock class."""
-        return self.time
-    
-    def get_start_time(self):
-        """See docstring in base Clock class."""
-        return self.starttime
-
-    def get_next_schedulable_time(self):
-        """See docstring in base Clock class."""
-        return self.time    
-    
-    def run(self):
-        """Runs the simulated clock through time.
-        
-        The clock starts at the provided start time. At each point in time,
-        it wakes up the resource manager and then skips to the next time
-        where "something" is happening (see __get_next_time for a more
-        rigorous description of this).
-        
-        The clock stops when there is nothing left to do (no pending or 
-        queue requests, and no future reservations)
-        
-        The simulated clock can only work in conjunction with the
-        tracefile request frontend.
-        """
-        self.logger.status("Starting simulated clock")
-        self.haizea.accounting.start(self.get_start_time())
-        prevstatustime = self.time
-        
-        # Main loop
-        while not self.done:
-            # Check to see if there are any leases which are ending prematurely.
-            # Note that this is unique to simulation.
-            prematureends = self.haizea.scheduler.slottable.getPrematurelyEndingRes(self.time)
-            
-            # Notify the resource manager about the premature ends
-            for rr in prematureends:
-                self.haizea.notify_event(rr.lease.id, constants.EVENT_END_VM)
-                
-            # Process reservations starting/stopping at the current time and
-            # check if there are any new requests.
-            self.haizea.process_reservations(self.time)
-            self.haizea.process_requests(self.time)
-            
-            # Since processing requests may have resulted in new reservations
-            # starting now, we process reservations again.
-            self.haizea.process_reservations(self.time)
-            
-            # Print a status message
-            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
-                self.haizea.print_status()
-                prevstatustime = self.time
-                
-            # Skip to next point in time.
-            self.time, self.done = self.__get_next_time()
-                    
-        self.logger.status("Simulated clock has stopped")
-
-        # Stop the resource manager
-        self.haizea.graceful_stop()
-        
-    
-    def __get_next_time(self):
-        """Determines what is the next point in time to skip to.
-        
-        At a given point in time, the next time is the earliest of the following:
-        * The arrival of the next lease request
-        * The start or end of a reservation (a "changepoint" in the slot table)
-        * A premature end of a lease
-        """
-        
-        # Determine candidate next times
-        tracefrontend = self.__get_trace_frontend()
-        nextchangepoint = self.haizea.get_next_changepoint()
-        nextprematureend = self.haizea.scheduler.slottable.getNextPrematureEnd(self.time)
-        nextreqtime = tracefrontend.get_next_request_time()
-        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
-        self.logger.debug("Next request time: %s" % nextreqtime)
-        self.logger.debug("Next premature end: %s" % nextprematureend)
-        
-        # The previous time is now
-        prevtime = self.time
-        
-        # We initialize the next time to now too, to detect if
-        # we've been unable to determine what the next time is.
-        newtime = self.time
-        
-        # Find the earliest of the three, accounting for None values
-        if nextchangepoint != None and nextreqtime == None:
-            newtime = nextchangepoint
-        elif nextchangepoint == None and nextreqtime != None:
-            newtime = nextreqtime
-        elif nextchangepoint != None and nextreqtime != None:
-            newtime = min(nextchangepoint, nextreqtime)
-            
-        if nextprematureend != None:
-            newtime = min(nextprematureend, newtime)
-            
-        if nextchangepoint == newtime:
-            # Note that, above, we just "peeked" the next changepoint in the slottable.
-            # If it turns out we're skipping to that point in time, then we need to
-            # "get" it (this is because changepoints in the slottable are cached to
-            # minimize access to the slottable. This optimization turned out to
-            # be more trouble than it's worth and will probably be removed sometime
-            # soon.
-            newtime = self.haizea.scheduler.slottable.getNextChangePoint(newtime)
-            
-        # If there's no more leases in the system, and no more pending requests,
-        # then we're done.
-        if not self.haizea.exists_leases_in_rm() and not tracefrontend.exists_more_requests():
-            self.done = True
-        
-        # We can also be done if we've specified that we want to stop when
-        # the best-effort requests are all done or when they've all been submitted.
-        stopwhen = self.haizea.config.get("stop-when")
-        besteffort = self.haizea.scheduler.leases.get_leases(type = BestEffortLease)
-        pendingbesteffort = [r for r in tracefrontend.requests if isinstance(r, BestEffortLease)]
-        if stopwhen == constants.STOPWHEN_BEDONE:
-            if self.haizea.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
-                self.done = True
-        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
-            if len(pendingbesteffort) == 0:
-                self.done = True
-                
-        # If we didn't arrive at a new time, and we're not done, we've fallen into
-        # an infinite loop. This is A Bad Thing(tm).
-        if newtime == prevtime and self.done != True:
-            raise Exception, "Simulated clock has fallen into an infinite loop."
-        
-        return newtime, self.done
-
-    def __get_trace_frontend(self):
-        """Gets the tracefile frontend from the resource manager"""
-        frontends = self.haizea.frontends
-        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
-        if len(tracef) != 1:
-            raise Exception, "The simulated clock can only work with a tracefile request frontend."
-        else:
-            return tracef[0] 
-        
-        
-class RealClock(Clock):
-    """A realtime clock.
-    
-    The real clock wakes up periodically to, in turn, tell the resource manager
-    to wake up. The real clock can also be run in a "fastforward" mode for
-    debugging purposes (however, unlike the simulated clock, the clock will
-    always skip a fixed amount of time into the future).
-    """
-    def __init__(self, haizea, quantum, non_sched, fastforward = False):
-        """Initializes the real clock.
-        
-        Arguments:
-        haizea -- the resource manager
-        quantum -- interval between clock wakeups
-        fastforward -- if True, the clock won't actually sleep
-                       for the duration of the quantum."""
-        Clock.__init__(self, haizea)
-        self.fastforward = fastforward
-        if not self.fastforward:
-            self.lastwakeup = None
-        else:
-            self.lastwakeup = round_datetime(now())
-        self.logger = logging.getLogger("CLOCK")
-        self.starttime = self.get_time()
-        self.nextschedulable = None
-        self.nextperiodicwakeup = None
-        self.quantum = TimeDelta(seconds=quantum)
-        self.non_sched = TimeDelta(seconds=non_sched)
-               
-    def get_time(self):
-        """See docstring in base Clock class."""
-        if not self.fastforward:
-            return now()
-        else:
-            return self.lastwakeup
-    
-    def get_start_time(self):
-        """See docstring in base Clock class."""
-        return self.starttime
-
-    def get_next_schedulable_time(self):
-        """See docstring in base Clock class."""
-        return self.nextschedulable    
-    
-    def run(self):
-        """Runs the real clock through time.
-        
-        The clock starts when run() is called. In each iteration of the main loop
-        it will do the following:
-        - Wake up the resource manager
-        - Determine if there will be anything to do before the next
-          time the clock will wake up (after the quantum has passed). Note
-          that this information is readily available on the slot table.
-          If so, set next-wakeup-time to (now + time until slot table
-          event). Otherwise, set it to (now + quantum)
-        - Sleep until next-wake-up-time
-        
-        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
-        foreground) or a SIGTERM signal is received.
-        """
-        self.logger.status("Starting clock")
-        self.haizea.accounting.start(self.get_start_time())
-        
-        try:
-            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
-            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
-        except ValueError, exc:
-            # This means Haizea is not the main thread, which will happen
-            # when running it as part of a py.test. We simply ignore this
-            # to allow the test to continue.
-            pass
-        
-        # Main loop
-        while not self.done:
-            self.logger.status("Waking up to manage resources")
-            
-            # Save the waking time. We want to use a consistent time in the 
-            # resource manager operations (if we use now(), we'll get a different
-            # time every time)
-            if not self.fastforward:
-                self.lastwakeup = round_datetime(self.get_time())
-            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
-                
-            # Next schedulable time
-            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
-            
-            # Wake up the resource manager
-            self.haizea.process_reservations(self.lastwakeup)
-            # TODO: Compute nextschedulable here, before processing requests
-            self.haizea.process_requests(self.nextschedulable)
-            
-            # Next wakeup time
-            time_now = now()
-            if self.lastwakeup + self.quantum <= time_now:
-                quantums = (time_now - self.lastwakeup) / self.quantum
-                quantums = int(ceil(quantums)) * self.quantum
-                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
-            else:
-                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
-            
-            # Determine if there's anything to do before the next wakeup time
-            nextchangepoint = self.haizea.get_next_changepoint()
-            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
-                # We need to wake up earlier to handle a slot table event
-                nextwakeup = nextchangepoint
-                self.haizea.scheduler.slottable.getNextChangePoint(self.lastwakeup)
-                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
-            else:
-                # Nothing to do before waking up
-                nextwakeup = self.nextperiodicwakeup
-                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
-            
-            # The only exit condition from the real clock is if the stop_when_no_more_leases
-            # is set to True, and there's no more work left to do.
-            # TODO: This first if is a kludge. Other options should only interact with
-            # options through the configfile's get method. The "stop-when-no-more-leases"
-            # option is currently OpenNebula-specific (while the real clock isn't; it can
-            # be used by both the simulator and the OpenNebula mode). This has to be
-            # fixed.            
-            if self.haizea.config._options.has_key("stop-when-no-more-leases"):
-                stop_when_no_more_leases = self.haizea.config.get("stop-when-no-more-leases")
-                if stop_when_no_more_leases and not self.haizea.exists_leases_in_rm():
-                    self.done = True
-            
-            # Sleep
-            if not self.done:
-                if not self.fastforward:
-                    sleep((nextwakeup - now()).seconds)
-                else:
-                    self.lastwakeup = nextwakeup
-
-        self.logger.status("Real clock has stopped")
-
-        # Stop the resource manager
-        self.haizea.graceful_stop()
-    
-    def signalhandler_gracefulstop(self, signum, frame):
-        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
-        
-        sigstr = ""
-        if signum == signal.SIGTERM:
-            sigstr = " (SIGTERM)"
-        elif signum == signal.SIGINT:
-            sigstr = " (SIGINT)"
-        self.logger.status("Received signal %i%s" %(signum, sigstr))
-        self.done = True
-



More information about the Haizea-commit mailing list