[haizea-commit] r565 - trunk/src/haizea/resourcemanager
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Thu Feb 12 18:03:46 CST 2009
Author: borja
Date: 2009-02-12 18:03:42 -0600 (Thu, 12 Feb 2009)
New Revision: 565
Modified:
trunk/src/haizea/resourcemanager/rm.py
Log:
Cleaned up exception handling in rm.py
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2009-02-07 00:15:26 UTC (rev 564)
+++ trunk/src/haizea/resourcemanager/rm.py 2009-02-13 00:03:42 UTC (rev 565)
@@ -295,13 +295,14 @@
self.daemonize()
if self.rpc_server:
self.rpc_server.start()
+
# Start the clock
self.clock.run()
def stop(self):
- """Stops the resource manager"""
+ """Stops the resource manager gracefully and exits"""
- self.logger.status("Stopping resource manager")
+ self.logger.status("Stopping resource manager gracefully...")
# Stop collecting data (this finalizes counters)
self.accounting.stop()
@@ -309,12 +310,10 @@
# TODO: When gracefully stopping mid-scheduling, we need to figure out what to
# do with leases that are still running.
- self.logger.status(" Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
- self.logger.status(" Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
- self.logger.status(" Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
+ self.print_status()
# In debug mode, dump the lease descriptors.
- for lease in self.scheduler.completedleases.entries.values():
+ for lease in self.scheduler.completed_leases.entries.values():
lease.print_contents()
# Write all collected data to disk
@@ -323,7 +322,7 @@
# Stop RPC server
if self.rpc_server != None:
self.rpc_server.stop()
-
+
def process_requests(self, nexttime):
"""Process any new requests in the request frontend
@@ -347,23 +346,18 @@
requests += frontend.get_accumulated_requests()
requests.sort(key=operator.attrgetter("submit_time"))
- for req in requests:
- self.scheduler.request_lease(req)
-
- # Run the scheduling function.
+ # Request leases and run the scheduling function.
try:
+ self.logger.vdebug("Requesting leases")
+ for req in requests:
+ self.scheduler.request_lease(req)
+
+ self.logger.vdebug("Running scheduling function")
self.scheduler.schedule(nexttime)
except UnrecoverableError, exc:
- self.logger.error("Unrecoverable error has happened in scheduling function.")
- self.logger.error("Original exception:")
- self.print_exception(exc.exc, exc.get_traceback())
- self.logger.error("Unrecoverable error traceback:")
- self.print_exception(exc, sys.exc_traceback)
- self.panic()
+ self.__unrecoverable_error(exc)
except Exception, exc:
- self.logger.error("Unexpected exception has happened in scheduling function.")
- self.print_exception(exc, sys.exc_traceback)
- self.panic()
+ self.__unexpected_exception(exc)
def process_reservations(self, time):
@@ -373,87 +367,24 @@
try:
self.scheduler.process_reservations(time)
except UnrecoverableError, exc:
- self.logger.error("Unrecoverable error has happened when processing reservations.")
- self.logger.error("Original exception:")
- self.print_exception(exc.exc, exc.get_traceback())
- self.logger.error("Unrecoverable error traceback:")
- self.print_exception(exc, sys.exc_traceback)
- self.panic()
+ self.__unrecoverable_error(exc)
except Exception, exc:
- self.logger.error("Unexpected exception has happened when processing reservations.")
- self.print_exception(exc, sys.exc_traceback)
- self.panic()
-
- def print_exception(self, exc, exc_traceback):
- tb = traceback.format_tb(exc_traceback)
- for line in tb:
- self.logger.error(line)
- self.logger.error("Message: %s" % exc)
-
- def panic(self):
- treatment = self.config.get("lease-failure-handling")
-
- # Stop the resource manager
- self.stop()
-
- #self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- if treatment == constants.ONFAILURE_EXIT:
- exit(1)
- elif treatment == constants.ONFAILURE_EXIT_RAISE:
- raise
+ self.__unexpected_exception(exc)
+
+ def notify_event(self, lease_id, event):
+ """Notifies Haizea of an asynchronous event.
-
- def print_stats(self, loglevel, verbose=False):
- """Print some basic statistics in the log
-
Arguments:
- loglevel -- log level at which to print stats
- verbose -- if True, will print the lease descriptor of all the scheduled
- and queued leases.
+ lease_id -- ID of lease that is affected by event
+ event -- Event (currently, only the constants.EVENT_END_VM event is supported)
"""
-
- # Print clock stats and the next changepoint in slot table
- self.clock.print_stats(loglevel)
- self.logger.log(loglevel, "Next change point (in slot table): %s" % self.get_next_changepoint())
-
- # Print descriptors of scheduled leases
- scheduled = self.scheduler.leases.entries.keys()
- self.logger.log(loglevel, "Scheduled requests: %i" % len(scheduled))
- if verbose and len(scheduled)>0:
- self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
- for k in scheduled:
- lease = self.scheduler.leases.get_lease(k)
- lease.print_contents(loglevel=loglevel)
- self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
-
- # Print queue size and descriptors of queued leases
- self.logger.log(loglevel, "Queue size: %i" % self.scheduler.queue.length())
- if verbose and self.scheduler.queue.length()>0:
- self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
- for lease in self.scheduler.queue:
- lease.print_contents(loglevel=loglevel)
- self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
-
- def get_next_changepoint(self):
- """Return next changepoint in the slot table"""
- return self.scheduler.slottable.peekNextChangePoint(self.clock.get_time())
-
- def exists_leases_in_rm(self):
- """Return True if there are any leases still "in the system" """
- return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
-
- # TODO: Add more events. This is pending on actually getting interesting
- # events in OpenNebula 1.2. For now, the only event is a prematurely
- # ending VM.
- def notify_event(self, lease_id, event):
try:
lease = self.scheduler.get_lease_by_id(lease_id)
self.scheduler.notify_event(lease, event)
- except Exception, msg:
- # Exit if something goes horribly wrong
- self.logger.error("Exception when notifying an event for lease %i. Dumping state..." % lease_id )
- self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- raise
+ except UnrecoverableError, exc:
+ self.__unrecoverable_error(exc)
+ except Exception, exc:
+ self.__unexpected_exception(exc)
def cancel_lease(self, lease_id):
"""Cancels a lease.
@@ -464,13 +395,93 @@
try:
lease = self.scheduler.get_lease_by_id(lease_id)
self.scheduler.cancel_lease(lease_id)
- except Exception, msg:
- # Exit if something goes horribly wrong
- self.logger.error("Exception when canceling lease %i. Dumping state..." % lease_id)
- self.print_stats(logging.getLevelName("ERROR"), verbose=True)
- raise
+ except UnrecoverableError, exc:
+ self.__unrecoverable_error(exc)
+ except Exception, exc:
+ self.__unexpected_exception(exc)
+
+ def get_next_changepoint(self):
+ """Return next changepoint in the slot table"""
+ return self.scheduler.slottable.peekNextChangePoint(self.clock.get_time())
+
+ def exists_leases_in_rm(self):
+ """Return True if there are any leases still "in the system" """
+ return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
+ def print_status(self):
+ """Prints status summary."""
+
+ leases = self.scheduler.leases.get_leases()
+ completed_leases = self.scheduler.completed_leases.get_leases()
+ self.logger.status("--- Haizea status summary ---")
+ self.logger.status("Number of leases (not including completed): %i" % len(leases))
+ self.logger.status("Completed leases: %i" % len(completed_leases))
+ self.logger.status("Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
+ self.logger.status("Queue size: %i" % self.accounting.data.counters[constants.COUNTER_QUEUESIZE])
+ self.logger.status("Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
+ self.logger.status("Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
+ self.logger.status("Accepted IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMACCEPTED])
+ self.logger.status("Rejected IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMREJECTED])
+ self.logger.status("---- End summary ----")
+
+ def __unrecoverable_error(self, exc):
+ """Handles an unrecoverable error.
+
+ This method prints information on the unrecoverable error and makes Haizea panic.
+ """
+ self.logger.error("An unrecoverable error has happened.")
+ self.logger.error("Original exception:")
+ self.print_exception(exc.exc, exc.get_traceback())
+ self.logger.error("Unrecoverable error traceback:")
+ self.__print_exception(exc, sys.exc_traceback)
+ self.__panic()
+
+ def __unexpected_exception(self, exc):
+ """Handles an unexpected exception.
+
+ This method prints information on the unexpected exception and makes Haizea panic.
+ """
+ self.logger.error("An unexpected exception has happened.")
+ self.__print_exception(exc, sys.exc_traceback)
+ self.__panic()
+ def __print_exception(self, exc, exc_traceback):
+ """Prints an exception's traceback to the log."""
+ tb = traceback.format_tb(exc_traceback)
+ for line in tb:
+ self.logger.error(line)
+ self.logger.error("Message: %s" % exc)
+
+
+ def __panic(self):
+ """Makes Haizea crash and burn in a panicked frenzy"""
+
+ self.logger.status("Panicking...")
+
+ # Stop RPC server
+ if self.rpc_server != None:
+ self.rpc_server.stop()
+
+ # Dump state
+ self.print_status()
+ self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
+
+ # Print lease descriptors
+ leases = self.scheduler.leases.get_leases()
+ if len(leases)>0:
+ self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
+ for lease in leases:
+ lease.print_contents()
+ self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
+
+ # Exit
+ treatment = self.config.get("lease-failure-handling")
+ if treatment == constants.ONFAILURE_EXIT:
+ exit(1)
+ elif treatment == constants.ONFAILURE_EXIT_RAISE:
+ raise
+
+
class Clock(object):
"""Base class for the resource manager's clock.
@@ -506,15 +517,7 @@
the main loop of the resource manager."""
return abstract()
- def print_stats(self, loglevel):
- """Print some basic statistics about the clock on the log
- Arguments:
- loglevel -- log level at which statistics should be printed.
- """
- return abstract()
-
-
class SimulatedClock(Clock):
"""Simulates the passage of time... really fast.
@@ -583,7 +586,7 @@
# Print a status message
if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
- self.__print_status()
+ self.rm.print_status()
prevstatustime = self.time
# Skip to next point in time.
@@ -593,19 +596,6 @@
self.logger.status("Stopping simulated clock")
self.rm.stop()
- def print_stats(self, loglevel):
- """See docstring in base Clock class."""
- pass
-
- def __print_status(self):
- """Prints status summary."""
- self.logger.status("STATUS ---Begin---")
- self.logger.status("STATUS Completed best-effort leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
- self.logger.status("STATUS Queue size: %i" % self.rm.accounting.data.counters[constants.COUNTER_QUEUESIZE])
- self.logger.status("STATUS Best-effort reservations: %i" % self.rm.scheduler.numbesteffortres)
- self.logger.status("STATUS Accepted AR leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_ARACCEPTED])
- self.logger.status("STATUS Rejected AR leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_ARREJECTED])
- self.logger.status("STATUS ----End----")
def __get_next_time(self):
"""Determines what is the next point in time to skip to.
@@ -673,8 +663,6 @@
# If we didn't arrive at a new time, and we're not done, we've fallen into
# an infinite loop. This is A Bad Thing(tm).
if newtime == prevtime and done != True:
- self.logger.error("Simulated clock has fallen into an infinite loop. Dumping state..." )
- self.rm.print_stats(logging.getLevelName("ERROR"), verbose=True)
raise Exception, "Simulated clock has fallen into an infinite loop."
return newtime, done
@@ -818,20 +806,15 @@
# Stop the resource manager
self.logger.status("Stopping real clock")
self.rm.stop()
-
- def print_stats(self, loglevel):
- """See docstring in base Clock class."""
- pass
def signalhandler_gracefulstop(self, signum, frame):
"""Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
+
sigstr = ""
if signum == signal.SIGTERM:
sigstr = " (SIGTERM)"
elif signum == signal.SIGINT:
sigstr = " (SIGINT)"
self.logger.status("Received signal %i%s" %(signum, sigstr))
- self.logger.status("Stopping gracefully...")
self.rm.stop()
- sys.exit()
More information about the Haizea-commit
mailing list