[haizea-commit] r565 - trunk/src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Thu Feb 12 18:03:46 CST 2009


Author: borja
Date: 2009-02-12 18:03:42 -0600 (Thu, 12 Feb 2009)
New Revision: 565

Modified:
   trunk/src/haizea/resourcemanager/rm.py
Log:
Cleaned up exception handling in rm.py

Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py	2009-02-07 00:15:26 UTC (rev 564)
+++ trunk/src/haizea/resourcemanager/rm.py	2009-02-13 00:03:42 UTC (rev 565)
@@ -295,13 +295,14 @@
             self.daemonize()
         if self.rpc_server:
             self.rpc_server.start()
+            
         # Start the clock
         self.clock.run()
         
     def stop(self):
-        """Stops the resource manager"""
+        """Stops the resource manager gracefully and exits"""
         
-        self.logger.status("Stopping resource manager")
+        self.logger.status("Stopping resource manager gracefully...")
         
         # Stop collecting data (this finalizes counters)
         self.accounting.stop()
@@ -309,12 +310,10 @@
         # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
         #       do with leases that are still running.
 
-        self.logger.status("  Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
-        self.logger.status("  Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
-        self.logger.status("  Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
+        self.print_status()
         
         # In debug mode, dump the lease descriptors.
-        for lease in self.scheduler.completedleases.entries.values():
+        for lease in self.scheduler.completed_leases.entries.values():
             lease.print_contents()
             
         # Write all collected data to disk
@@ -323,7 +322,7 @@
         # Stop RPC server
         if self.rpc_server != None:
             self.rpc_server.stop()
-        
+                    
     def process_requests(self, nexttime):
         """Process any new requests in the request frontend
         
@@ -347,23 +346,18 @@
             requests += frontend.get_accumulated_requests()
         requests.sort(key=operator.attrgetter("submit_time"))
         
-        for req in requests:
-            self.scheduler.request_lease(req)
-        
-        # Run the scheduling function.
+        # Request leases and run the scheduling function.
         try:
+            self.logger.vdebug("Requesting leases")
+            for req in requests:
+                self.scheduler.request_lease(req)
+
+            self.logger.vdebug("Running scheduling function")
             self.scheduler.schedule(nexttime)
         except UnrecoverableError, exc:
-            self.logger.error("Unrecoverable error has happened in scheduling function.")
-            self.logger.error("Original exception:")
-            self.print_exception(exc.exc, exc.get_traceback())
-            self.logger.error("Unrecoverable error traceback:")
-            self.print_exception(exc, sys.exc_traceback)
-            self.panic()
+            self.__unrecoverable_error(exc)
         except Exception, exc:
-            self.logger.error("Unexpected exception has happened in scheduling function.")
-            self.print_exception(exc, sys.exc_traceback)
-            self.panic()
+            self.__unexpected_exception(exc)
 
 
     def process_reservations(self, time):
@@ -373,87 +367,24 @@
         try:
             self.scheduler.process_reservations(time)
         except UnrecoverableError, exc:
-            self.logger.error("Unrecoverable error has happened when processing reservations.")
-            self.logger.error("Original exception:")
-            self.print_exception(exc.exc, exc.get_traceback())
-            self.logger.error("Unrecoverable error traceback:")
-            self.print_exception(exc, sys.exc_traceback)
-            self.panic()
+            self.__unrecoverable_error(exc)
         except Exception, exc:
-            self.logger.error("Unexpected exception has happened when processing reservations.")
-            self.print_exception(exc, sys.exc_traceback)
-            self.panic()
-            
-    def print_exception(self, exc, exc_traceback):
-        tb = traceback.format_tb(exc_traceback)
-        for line in tb:
-            self.logger.error(line)
-        self.logger.error("Message: %s" % exc)
-    
-    def panic(self):
-        treatment = self.config.get("lease-failure-handling")
-
-        # Stop the resource manager
-        self.stop()
-
-        #self.print_stats(logging.getLevelName("ERROR"), verbose=True)
-        if treatment == constants.ONFAILURE_EXIT:
-            exit(1)
-        elif treatment == constants.ONFAILURE_EXIT_RAISE:
-            raise
+            self.__unexpected_exception(exc)
+         
+    def notify_event(self, lease_id, event):
+        """Notifies an asynchronous event to Haizea.
         
-        
-    def print_stats(self, loglevel, verbose=False):
-        """Print some basic statistics in the log
-        
         Arguments:
-        loglevel -- log level at which to print stats
-        verbose -- if True, will print the lease descriptor of all the scheduled
-                   and queued leases.
+        lease_id -- ID of lease that is affected by event
+        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
         """
-        
-        # Print clock stats and the next changepoint in slot table
-        self.clock.print_stats(loglevel)
-        self.logger.log(loglevel, "Next change point (in slot table): %s" % self.get_next_changepoint())
-
-        # Print descriptors of scheduled leases
-        scheduled = self.scheduler.leases.entries.keys()
-        self.logger.log(loglevel, "Scheduled requests: %i" % len(scheduled))
-        if verbose and len(scheduled)>0:
-            self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
-            for k in scheduled:
-                lease = self.scheduler.leases.get_lease(k)
-                lease.print_contents(loglevel=loglevel)
-            self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
-
-        # Print queue size and descriptors of queued leases
-        self.logger.log(loglevel, "Queue size: %i" % self.scheduler.queue.length())
-        if verbose and self.scheduler.queue.length()>0:
-            self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
-            for lease in self.scheduler.queue:
-                lease.print_contents(loglevel=loglevel)
-            self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
-
-    def get_next_changepoint(self):
-        """Return next changepoint in the slot table"""
-        return self.scheduler.slottable.peekNextChangePoint(self.clock.get_time())
-   
-    def exists_leases_in_rm(self):
-        """Return True if there are any leases still "in the system" """
-        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
-    
-    # TODO: Add more events. This is pending on actually getting interesting
-    # events in OpenNebula 1.2. For now, the only event is a prematurely
-    # ending VM.
-    def notify_event(self, lease_id, event):
         try:
             lease = self.scheduler.get_lease_by_id(lease_id)
             self.scheduler.notify_event(lease, event)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            self.logger.error("Exception when notifying an event for lease %i. Dumping state..." % lease_id )
-            self.print_stats(logging.getLevelName("ERROR"), verbose=True)
-            raise            
+        except UnrecoverableError, exc:
+            self.__unrecoverable_error(exc)
+        except Exception, exc:
+            self.__unexpected_exception(exc)
         
     def cancel_lease(self, lease_id):
         """Cancels a lease.
@@ -464,13 +395,93 @@
         try:
             lease = self.scheduler.get_lease_by_id(lease_id)
             self.scheduler.cancel_lease(lease_id)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            self.logger.error("Exception when canceling lease %i. Dumping state..." % lease_id)
-            self.print_stats(logging.getLevelName("ERROR"), verbose=True)
-            raise          
+        except UnrecoverableError, exc:
+            self.__unrecoverable_error(exc)
+        except Exception, exc:
+            self.__unexpected_exception(exc)
+            
+    def get_next_changepoint(self):
+        """Return next changepoint in the slot table"""
+        return self.scheduler.slottable.peekNextChangePoint(self.clock.get_time())
+   
+    def exists_leases_in_rm(self):
+        """Return True if there are any leases still "in the system" """
+        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
 
+    def print_status(self):
+        """Prints status summary."""
+        
+        leases = self.scheduler.leases.get_leases()
+        completed_leases = self.scheduler.completed_leases.get_leases()
+        self.logger.status("--- Haizea status summary ---")
+        self.logger.status("Number of leases (not including completed): %i" % len(leases))
+        self.logger.status("Completed leases: %i" % len(completed_leases))
+        self.logger.status("Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
+        self.logger.status("Queue size: %i" % self.accounting.data.counters[constants.COUNTER_QUEUESIZE])
+        self.logger.status("Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
+        self.logger.status("Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
+        self.logger.status("Accepted IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMACCEPTED])
+        self.logger.status("Rejected IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMREJECTED])
+        self.logger.status("---- End summary ----")        
+
+    def __unrecoverable_error(self, exc):
+        """Handles an UnrecoverableError caught by one of the entry points.
+        
+        Logs the wrapped original exception and the UnrecoverableError's own traceback, then makes Haizea panic.
+        """
+        self.logger.error("An unrecoverable error has happened.")
+        self.logger.error("Original exception:")
+        self.__print_exception(exc.exc, exc.get_traceback())
+        self.logger.error("Unrecoverable error traceback:")
+        self.__print_exception(exc, sys.exc_traceback)
+        self.__panic()
+
+    def __unexpected_exception(self, exc):
+        """Handles an unexpected exception (anything other than an UnrecoverableError).
+        
+        This method logs the exception's traceback and message, then makes Haizea panic.
+        """
+        self.logger.error("An unexpected exception has happened.")
+        self.__print_exception(exc, sys.exc_traceback)
+        self.__panic()
             
+    def __print_exception(self, exc, exc_traceback):
+        """Prints an exception's traceback to the log."""
+        tb = traceback.format_tb(exc_traceback)
+        for line in tb:
+            self.logger.error(line)
+        self.logger.error("Message: %s" % exc)
+
+    
+    def __panic(self):
+        """Stops the RPC server, dumps state to the log, then exits or re-raises per "lease-failure-handling"."""
+        
+        self.logger.status("Panicking...")
+
+        # Stop RPC server
+        if self.rpc_server is not None:
+            self.rpc_server.stop()
+
+        # Dump state
+        self.print_status()
+        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
+
+        # Print lease descriptors
+        leases = self.scheduler.leases.get_leases()
+        if leases:
+            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
+            for lease in leases:
+                lease.print_contents()
+            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
+
+        # Exit (sys.exit, not the site builtin) or re-raise the handled exception; other values fall through and return
+        treatment = self.config.get("lease-failure-handling")
+        if treatment == constants.ONFAILURE_EXIT:
+            sys.exit(1)
+        elif treatment == constants.ONFAILURE_EXIT_RAISE:
+            raise
+
+            
 class Clock(object):
     """Base class for the resource manager's clock.
     
@@ -506,15 +517,7 @@
         the main loop of the resource manager."""
         return abstract()     
     
-    def print_stats(self, loglevel):
-        """Print some basic statistics about the clock on the log
         
-        Arguments:
-        loglevel -- log level at which statistics should be printed.
-        """
-        return abstract()
-    
-        
 class SimulatedClock(Clock):
     """Simulates the passage of time... really fast.
     
@@ -583,7 +586,7 @@
             
             # Print a status message
             if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
-                self.__print_status()
+                self.rm.print_status()
                 prevstatustime = self.time
                 
             # Skip to next point in time.
@@ -593,19 +596,6 @@
         self.logger.status("Stopping simulated clock")
         self.rm.stop()
         
-    def print_stats(self, loglevel):
-        """See docstring in base Clock class."""
-        pass
-        
-    def __print_status(self):
-        """Prints status summary."""
-        self.logger.status("STATUS ---Begin---")
-        self.logger.status("STATUS Completed best-effort leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
-        self.logger.status("STATUS Queue size: %i" % self.rm.accounting.data.counters[constants.COUNTER_QUEUESIZE])
-        self.logger.status("STATUS Best-effort reservations: %i" % self.rm.scheduler.numbesteffortres)
-        self.logger.status("STATUS Accepted AR leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_ARACCEPTED])
-        self.logger.status("STATUS Rejected AR leases: %i" % self.rm.accounting.data.counters[constants.COUNTER_ARREJECTED])
-        self.logger.status("STATUS ----End----")        
     
     def __get_next_time(self):
         """Determines what is the next point in time to skip to.
@@ -673,8 +663,6 @@
         # If we didn't arrive at a new time, and we're not done, we've fallen into
         # an infinite loop. This is A Bad Thing(tm).
         if newtime == prevtime and done != True:
-            self.logger.error("Simulated clock has fallen into an infinite loop. Dumping state..." )
-            self.rm.print_stats(logging.getLevelName("ERROR"), verbose=True)
             raise Exception, "Simulated clock has fallen into an infinite loop."
         
         return newtime, done
@@ -818,20 +806,15 @@
         # Stop the resource manager
         self.logger.status("Stopping real clock")
         self.rm.stop()
-          
-    def print_stats(self, loglevel):
-        """See docstring in base Clock class."""
-        pass
     
     def signalhandler_gracefulstop(self, signum, frame):
         """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
+        
         sigstr = ""
         if signum == signal.SIGTERM:
             sigstr = " (SIGTERM)"
         elif signum == signal.SIGINT:
             sigstr = " (SIGINT)"
         self.logger.status("Received signal %i%s" %(signum, sigstr))
-        self.logger.status("Stopping gracefully...")
         self.rm.stop()
-        sys.exit()
 



More information about the Haizea-commit mailing list