[haizea-commit] r438 - in trunk: etc src/haizea/cli src/haizea/common src/haizea/resourcemanager

haizea-commit at mailman.cs.uchicago.edu
Wed Jul 16 11:15:16 CDT 2008


Author: borja
Date: 2008-07-16 11:15:16 -0500 (Wed, 16 Jul 2008)
New Revision: 438

Modified:
   trunk/etc/sample.conf
   trunk/src/haizea/cli/commands.py
   trunk/src/haizea/common/config.py
   trunk/src/haizea/common/constants.py
   trunk/src/haizea/common/utils.py
   trunk/src/haizea/resourcemanager/resourcepool.py
   trunk/src/haizea/resourcemanager/rm.py
   trunk/src/haizea/resourcemanager/scheduler.py
   trunk/src/haizea/resourcemanager/stats.py
Log:
- Cleaned up stats.py
- Divided haizea.resourcemanager.stats.Stats into StatsData and StatsCollection. The former contains only the data (and will soon include convenient get methods); the latter is in charge of collecting that data throughout Haizea's run.
- Modified the stats module so that all collected data is saved to a single file (containing a pickled StatsData object) instead of multiple files in a directory. This was done to simplify the upcoming analysis modules, since there is now a common object (StatsData) that both the scheduler and the analysis modules can use; a loading sketch follows below.
- Updated the other modules accordingly
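
As a rough illustration of the intended use (not part of this commit), an analysis module could load the pickled StatsData object along these lines. load_stats is a hypothetical helper, the path matches the datafile option in sample.conf, and haizea is assumed to be on the Python path so StatsData can be unpickled:

    from pickle import load

    def load_stats(datafile):
        # datafile is the path given by the "datafile" config option
        f = open(datafile, "rb")
        data = load(f)              # a StatsData instance
        f.close()
        return data

    stats = load_stats("/var/tmp/haizea/results.dat")
    print(stats.counters)           # final value of each counter
    print(stats.counter_lists)      # per-counter (time, lease_id, value, avg) entries
    print(stats.nodes)              # per-node (duration, activity) entries
    print(stats.leases)             # completed lease descriptors, keyed by lease id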

Modified: trunk/etc/sample.conf
===================================================================
--- trunk/etc/sample.conf	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/etc/sample.conf	2008-07-16 16:15:16 UTC (rev 438)
@@ -51,12 +51,12 @@
 #         from a repository node before the lease can start.
 lease-deployment: unmanaged
 
-# Option: datadir
+# Option: datafile
 # Required: No
-# Description: This is the directory where statistics on
+# Description: This is the file where statistics on
 # the scheduler's run will be saved to (waiting time of leases,
 # utilization data, etc.). If omitted, no data will be saved.
-datadir: /var/tmp/haizea/results
+datafile: /var/tmp/haizea/results.dat
 
 
 

Modified: trunk/src/haizea/cli/commands.py
===================================================================
--- trunk/src/haizea/cli/commands.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/cli/commands.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -58,7 +58,6 @@
         profile = c.getProfile()
         tracefile = c.getTracefile()
         injfile = c.getInjectfile()
-        datadir = c.getDataDir()
         name = gen_traceinj_name(tracefile, injfile)
         configfile = etcdir + "/%s_%s.conf" % (profile, name)
         fc = open(configfile, "w")
@@ -95,9 +94,9 @@
         profile = c.getProfile()
         tracefile = c.getTracefile()
         injfile = c.getInjectfile()
-        datadir = c.getDataDir()
+        datafile = c.getDataFile()
         name = gen_traceinj_name(tracefile, injfile)
-        if not opt.onlymissing or not os.path.exists(datadir):
+        if not opt.onlymissing or not os.path.exists(datafile):
             configfile = etcdir + "/%s_%s.conf" % (profile, name)
             templatedata.append((profile, name, configfile))
 

Modified: trunk/src/haizea/common/config.py
===================================================================
--- trunk/src/haizea/common/config.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/common/config.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -117,8 +117,8 @@
         else:
             return self.config.get(constants.GENERAL_SEC, constants.LEASE_DEPLOYMENT_OPT)
 
-    def getDataDir(self):
-        return self.config.get(constants.GENERAL_SEC, constants.DATADIR_OPT)
+    def getDataFile(self):
+        return self.config.get(constants.GENERAL_SEC, constants.DATAFILE_OPT)
 
     #
     # SIMULATION OPTIONS
@@ -381,8 +381,10 @@
                     # Add datadir option
                     datadirname = genDataDirName(profile, tracefile, injectfile)
                     basedatadir = self.config.get(constants.MULTI_SEC, constants.BASEDATADIR_OPT)
-                    datadir = basedatadir + "/" + datadirname
-                    profileconfig.set(constants.GENERAL_SEC, constants.DATADIR_OPT, datadir)
+                    # TODO: Change this so there will be a single directory with all the
+                    # data files, instead of multiple directories
+                    datafile = basedatadir + "/" + datadirname + "/haizea.dat"
+                    profileconfig.set(constants.GENERAL_SEC, constants.DATAFILE_OPT, datafile)
                     
                     # Set profile option (only used internally)
                     profileconfig.set(constants.GENERAL_SEC, constants.PROFILE_OPT, profile)

Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/common/constants.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -236,12 +236,6 @@
 WORSE = 1
 
 
-# Data filenames
-COUNTERSFILE="data.dat"
-LEASESFILE="leases.dat"
-DOINGFILE="doing.dat"
-
-
 # Types of final tables in report generation
 TABLE_FINALVALUE="final-value"
 TABLE_FINALTIME="final-time"

Modified: trunk/src/haizea/common/utils.py
===================================================================
--- trunk/src/haizea/common/utils.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/common/utils.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -58,8 +58,8 @@
     caller = inspect.stack()[1][3]
     raise NotImplementedError(caller + ' must be implemented in subclass')
 
-def pickle(data, dir, file):
-    f = open (dir + "/" + file, "w")
+def pickle(data, file):
+    f = open (file, "w")
     dump(data, f, protocol = HIGHEST_PROTOCOL)
     f.close()
 

Modified: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/resourcemanager/resourcepool.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -181,7 +181,7 @@
                     
         self.getNode(nod_id).printFiles()
         
-        self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+        self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
     
     def verifyFileTransfer(self):
         pass
@@ -218,8 +218,15 @@
     #def resumeDone(self, lease, rr):
     #    pass
 
+    def poll_unscheduled_vm_end(self):
+        pass
+
+    # TODO
+    # The following should be implemented to handle asynchronous
+    # notifications of a VM ending
+    #def notify_vm_done(self, lease, rr):
+    #    pass
     
-    
     def getNodes(self):
         return self.nodes
 
@@ -249,7 +256,7 @@
         img.addMapping(lease_id, vnode)
         self.getNode(pnode).addFile(img)
         self.getNode(pnode).printFiles()
-        self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+        self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
         return img
         
     def checkImage(self, pnode, lease_id, vnode, imagefile):
@@ -275,7 +282,7 @@
                 img.addMapping(lease_id, vnode)
                 node.addFile(img)
                 node.printFiles()
-                self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+                self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
     
     def isInPool(self, pnode, imagefile, time):
         return self.getNode(pnode).isInPool(imagefile, after=time)
@@ -309,7 +316,7 @@
 
         node.printFiles()
         
-        self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+        self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
         
     def addRAMFileToNode(self, pnode, lease_id, vnode, size):
         node = self.getNode(pnode)
@@ -318,7 +325,7 @@
         f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
         node.addFile(f)        
         node.printFiles()
-        self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+        self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
 
     def removeRAMFileFromNode(self, pnode, lease_id, vnode):
         node = self.getNode(pnode)
@@ -326,7 +333,7 @@
         node.printFiles()
         node.removeRAMFile(lease_id, vnode)
         node.printFiles()
-        self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+        self.rm.stats.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
         
     def getMaxDiskUsage(self):
         return max([n.getTotalFileSize() for n in self.nodes])
@@ -343,6 +350,8 @@
         # enactment-specific information
         self.enactment_info = None
         # Kludgy way of keeping track of utilization
+        # TODO: Compute this information based on the lease reservations,
+        # either on the fly or when Haizea stops running.
         self.transfer_doing = constants.DOING_IDLE
         self.vm_doing = constants.DOING_IDLE
         

Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/resourcemanager/rm.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -102,7 +102,7 @@
             self.frontends = [OpenNebulaFrontend(self)]
 
         # Statistics collection 
-        self.stats = stats.Stats(self, self.config.getDataDir())
+        self.stats = stats.StatsCollection(self, self.config.getDataFile())
 
         
     def start(self):
@@ -110,14 +110,14 @@
         self.logger.status("Starting resource manager", constants.RM)
 
         # Create counters to keep track of interesting data.
-        self.stats.createCounter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
-        self.stats.createCounter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
-        self.stats.createCounter(constants.COUNTER_CPUUTILIZATION, constants.AVERAGE_TIMEWEIGHTED)
+        self.stats.create_counter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
+        self.stats.create_counter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
+        self.stats.create_counter(constants.COUNTER_CPUUTILIZATION, constants.AVERAGE_TIMEWEIGHTED)
         
         # Start the clock
         self.clock.run()
@@ -133,16 +133,16 @@
         # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
         #       do with leases that are still running.
 
-        self.logger.status("  Completed best-effort leases: %i" % self.stats.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
-        self.logger.status("  Accepted AR leases: %i" % self.stats.counters[constants.COUNTER_ARACCEPTED], constants.RM)
-        self.logger.status("  Rejected AR leases: %i" % self.stats.counters[constants.COUNTER_ARREJECTED], constants.RM)
+        self.logger.status("  Completed best-effort leases: %i" % self.stats.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
+        self.logger.status("  Accepted AR leases: %i" % self.stats.data.counters[constants.COUNTER_ARACCEPTED], constants.RM)
+        self.logger.status("  Rejected AR leases: %i" % self.stats.data.counters[constants.COUNTER_ARREJECTED], constants.RM)
         
         # In debug mode, dump the lease descriptors.
         for lease in self.scheduler.completedleases.entries.values():
             lease.print_contents()
             
         # Write all collected data to disk
-        self.stats.dumpStatsToDisk()
+        self.stats.save_to_disk()
         
     def process_requests(self, nexttime):
         """Process any new requests in the request frontend
@@ -381,11 +381,11 @@
     def __print_status(self):
         """Prints status summary."""
         self.logger.status("STATUS ---Begin---", constants.RM)
-        self.logger.status("STATUS Completed best-effort leases: %i" % self.rm.stats.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
-        self.logger.status("STATUS Queue size: %i" % self.rm.stats.counters[constants.COUNTER_QUEUESIZE], constants.RM)
+        self.logger.status("STATUS Completed best-effort leases: %i" % self.rm.stats.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED], constants.RM)
+        self.logger.status("STATUS Queue size: %i" % self.rm.stats.data.counters[constants.COUNTER_QUEUESIZE], constants.RM)
         self.logger.status("STATUS Best-effort reservations: %i" % self.rm.scheduler.numbesteffortres, constants.RM)
-        self.logger.status("STATUS Accepted AR leases: %i" % self.rm.stats.counters[constants.COUNTER_ARACCEPTED], constants.RM)
-        self.logger.status("STATUS Rejected AR leases: %i" % self.rm.stats.counters[constants.COUNTER_ARREJECTED], constants.RM)
+        self.logger.status("STATUS Accepted AR leases: %i" % self.rm.stats.data.counters[constants.COUNTER_ARACCEPTED], constants.RM)
+        self.logger.status("STATUS Rejected AR leases: %i" % self.rm.stats.data.counters[constants.COUNTER_ARREJECTED], constants.RM)
         self.logger.status("STATUS ----End----", constants.RM)        
     
     def __get_next_time(self):
@@ -588,7 +588,7 @@
 
 if __name__ == "__main__":
     from haizea.common.config import RMConfig
-    CONFIGFILE = "../../../etc/sample_opennebula.conf"
+    CONFIGFILE = "../../../etc/sample.conf"
     CONFIG = RMConfig.fromFile(CONFIGFILE)
     RM = ResourceManager(CONFIG)
     RM.start()
\ No newline at end of file
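
The counters registered above with constants.AVERAGE_TIMEWEIGHTED (queue size and CPU utilization) are reduced to a time-weighted average by StatsCollection.add_timeweighted_average when the run stops (see the stats.py diff below). A rough standalone sketch of that computation, with an illustrative function name and times already normalized to seconds since the start of the run:

    def timeweighted_average(samples):
        # samples: (time, value) pairs sorted by time; each value holds
        # until the time of the next sample.
        accum = 0
        prev_time = None
        prev_value = None
        for time, value in samples:
            if prev_time is not None:
                accum += prev_value * (time - prev_time)
            prev_time = time
            prev_value = value
        return accum / float(prev_time) if prev_time else 0

    # e.g. a queue holding 2 leases for 30s and then 5 leases for 10s
    # averages to (2*30 + 5*10) / 40 = 2.75 over the 40-second run.
    print(timeweighted_average([(0, 2), (30, 5), (40, 5)]))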

Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -140,7 +140,7 @@
                 self.handlers[type(rr)].on_start(self, l, rr)
 
         util = self.slottable.getUtilization(nowtime)
-        self.rm.stats.appendStat(constants.COUNTER_CPUUTILIZATION, util)
+        self.rm.stats.append_stat(constants.COUNTER_CPUUTILIZATION, util)
         # TODO: Should be moved to rm.py
         self.rm.stats.tick()
         
@@ -151,7 +151,7 @@
     
     def enqueue(self, lease_req):
         """Queues a best-effort lease request"""
-        self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
+        self.rm.stats.incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
         self.queue.enqueue(lease_req)
         self.rm.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested), constants.SCHED)
 
@@ -206,7 +206,7 @@
         try:
             self.__schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
             self.scheduledleases.add(lease_req)
-            self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+            self.rm.stats.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
             accepted = True
         except SchedException, msg:
             # If our first try avoided preemption, try again
@@ -218,13 +218,13 @@
                     self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
                     self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
                     self.scheduledleases.add(lease_req)
-                    self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
+                    self.rm.stats.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
                     accepted = True
                 except SchedException, msg:
-                    self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+                    self.rm.stats.incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
                     self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
             else:
-                self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+                self.rm.stats.incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
                 self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
         if accepted:
             self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
@@ -274,7 +274,7 @@
                     self.rm.logger.debug("  ResReq  : %s" % lease_req.requested_resources, constants.SCHED)
                     self.__schedule_besteffort_lease(lease_req, nexttime)
                     self.scheduledleases.add(lease_req)
-                    self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
+                    self.rm.stats.decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
                 except SchedException, msg:
                     # Put back on queue
                     newqueue.enqueue(lease_req)
@@ -361,10 +361,10 @@
         try:
             self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
             self.scheduledleases.add(lease_req)
-            self.rm.stats.incrCounter(constants.COUNTER_IMACCEPTED, lease_req.id)
+            self.rm.stats.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
             self.rm.logger.info("Immediate lease request #%i has been accepted." % lease_req.id, constants.SCHED)
         except SchedException, msg:
-            self.rm.stats.incrCounter(constants.COUNTER_IMREJECTED, lease_req.id)
+            self.rm.stats.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
             self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
         
         
@@ -407,7 +407,7 @@
             req.vmimagemap = {}
             self.scheduledleases.remove(req)
             self.queue.enqueue_in_order(req)
-            self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
+            self.rm.stats.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
         else:
             susptype = self.rm.config.getSuspensionType()
             timebeforesuspend = time - vmrr.start
@@ -440,7 +440,7 @@
                 req.vmimagemap = {}
                 self.scheduledleases.remove(req)
                 self.queue.enqueue_in_order(req)
-                self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
+                self.rm.stats.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
         self.rm.logger.edebug("Lease after preemption:", constants.SCHED)
         req.print_contents()
         
@@ -491,7 +491,7 @@
             # No enactment to do here, since all the suspend/resume actions are
             # handled during the suspend/resume RRs
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RUN)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RUN, l.id)
         self.rm.logger.debug("LEASE-%i End of handleStartVM" % l.id, constants.SCHED)
         self.rm.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()), constants.SCHED)
 
@@ -511,14 +511,14 @@
             for vnode, pnode in l.vmimagemap.items():
                 self.rm.resourcepool.removeImage(pnode, l.id, vnode)
             if isinstance(l, ds.BestEffortLease):
-                self.rm.stats.incrCounter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+                self.rm.stats.incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
        
         if isinstance(l, ds.BestEffortLease):
             if rr.backfill_reservation == True:
                 self.numbesteffortres -= 1
         self.rm.logger.edebug("LEASE-%i After:" % l.id, constants.SCHED)
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
         self.rm.logger.debug("LEASE-%i End of handleEndVM" % l.id, constants.SCHED)
         self.rm.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()), constants.SCHED)
 
@@ -531,7 +531,7 @@
             self.rm.resourcepool.addRAMFileToNode(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
             l.memimagemap[vnode] = pnode
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
         self.rm.logger.debug("LEASE-%i End of handleStartSuspend" % l.id, constants.SCHED)
         self.rm.logger.info("Suspending lease %i..." % (l.id), constants.SCHED)
 
@@ -544,9 +544,9 @@
         l.state = constants.LEASE_STATE_SUSPENDED
         self.scheduledleases.remove(l)
         self.queue.enqueue_in_order(l)
-        self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, l.id)
+        self.rm.stats.incr_counter(constants.COUNTER_QUEUESIZE, l.id)
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
         self.rm.logger.debug("LEASE-%i End of handleEndSuspend" % l.id, constants.SCHED)
         self.rm.logger.info("Lease %i suspended." % (l.id), constants.SCHED)
 
@@ -556,7 +556,7 @@
         self.rm.resourcepool.resumeVMs(l, rr)
         rr.state = constants.RES_STATE_ACTIVE
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME, l.id)
         self.rm.logger.debug("LEASE-%i End of handleStartResume" % l.id, constants.SCHED)
         self.rm.logger.info("Resuming lease %i..." % (l.id), constants.SCHED)
 
@@ -569,7 +569,7 @@
         for vnode, pnode in rr.nodes.items():
             self.rm.resourcepool.removeRAMFileFromNode(pnode, l.id, vnode)
         l.print_contents()
-        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
+        self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
         self.rm.logger.debug("LEASE-%i End of handleEndResume" % l.id, constants.SCHED)
         self.rm.logger.info("Resumed lease %i" % (l.id), constants.SCHED)
 
@@ -579,7 +579,7 @@
 
 
     def __enqueue_in_order(self, req):
-        self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
+        self.rm.stats.incr_counter(constants.COUNTER_QUEUESIZE, req.id)
         self.queue.enqueue_in_order(req)
 
     
@@ -589,10 +589,10 @@
         return self.numbesteffortres < self.maxres
     
                         
-    def updateNodeVMState(self, nodes, state):
+    def updateNodeVMState(self, nodes, state, lease_id):
         for n in nodes:
             self.rm.resourcepool.getNode(n).vm_doing = state
 
-    def updateNodeTransferState(self, nodes, state):
+    def updateNodeTransferState(self, nodes, state, lease_id):
         for n in nodes:
             self.rm.resourcepool.getNode(n).transfer_doing = state

Modified: trunk/src/haizea/resourcemanager/stats.py
===================================================================
--- trunk/src/haizea/resourcemanager/stats.py	2008-07-16 15:00:19 UTC (rev 437)
+++ trunk/src/haizea/resourcemanager/stats.py	2008-07-16 16:15:16 UTC (rev 438)
@@ -19,62 +19,68 @@
 import os
 import os.path
 import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
 from haizea.common.utils import pickle
 from errno import EEXIST
 
-class Stats(object):
-    def __init__(self, rm, datadir):
-        self.rm = rm
-        self.datadir = datadir
-    
+class StatsData(object):
+    def __init__(self):
         # Counters
-        self.counters={}
-        self.counterLists={}
-        self.counterAvgType={}
+        self.counters = {}
+        self.counter_lists = {}
+        self.counter_avg_type = {}
     
         # What are the nodes doing?
-        self.doing = []        
-        self.nodes=dict([(i+1,[]) for i in range(self.rm.resourcepool.getNumNodes())])
- 
+        self.nodes = {}      
         
-    def createCounter(self, counterID, avgtype, initial=0):
-        self.counters[counterID] = initial
-        self.counterLists[counterID] = []
-        self.counterAvgType[counterID] = avgtype
+        # Lease data
+        self.leases = {}
 
-    def incrCounter(self, counterID, lease_id = None):
+class StatsCollection(object):
+    def __init__(self, rm, datafile):
+        self.data = StatsData()
+        self.rm = rm
+        self.datafile = datafile   
+        self.starttime = None
+
+    def create_counter(self, counter_id, avgtype, initial=0):
+        self.data.counters[counter_id] = initial
+        self.data.counter_lists[counter_id] = []
+        self.data.counter_avg_type[counter_id] = avgtype
+
+    def incr_counter(self, counter_id, lease_id = None):
         time = self.rm.clock.get_time()
-        self.appendStat(counterID, self.counters[counterID] + 1, lease_id, time)
+        self.append_stat(counter_id, self.data.counters[counter_id] + 1, lease_id, time)
 
-    def decrCounter(self, counterID, lease_id = None):
+    def decr_counter(self, counter_id, lease_id = None):
         time = self.rm.clock.get_time()
-        self.appendStat(counterID, self.counters[counterID] - 1, lease_id, time)
+        self.append_stat(counter_id, self.data.counters[counter_id] - 1, lease_id, time)
         
-    def appendStat(self, counterID, value, lease_id = None, time = None):
+    def append_stat(self, counter_id, value, lease_id = None, time = None):
         if time == None:
             time = self.rm.clock.get_time()
-        if len(self.counterLists[counterID]) > 0:
-            prevtime = self.counterLists[counterID][-1][0]
+        if len(self.data.counter_lists[counter_id]) > 0:
+            prevtime = self.data.counter_lists[counter_id][-1][0]
         else:
             prevtime = None
-        self.counters[counterID] = value
+        self.data.counters[counter_id] = value
         if time == prevtime:
-            self.counterLists[counterID][-1][2] = value
+            self.data.counter_lists[counter_id][-1][2] = value
         else:
-            self.counterLists[counterID].append([time, lease_id, value])
+            self.data.counter_lists[counter_id].append([time, lease_id, value])
+
         
     def start(self, time):
         self.starttime = time
         
         # Start the counters
-        for counterID in self.counters:
-            initial = self.counters[counterID]
-            self.appendStat(counterID, initial, time = time)
+        for counter_id in self.data.counters:
+            initial = self.data.counters[counter_id]
+            self.append_stat(counter_id, initial, time = time)
         
         # Start the doing
-        for n in self.nodes:
-            self.nodes[n].append((time, constants.DOING_IDLE))
+        numnodes = self.rm.resourcepool.getNumNodes()
+        for n in range(numnodes):
+            self.data.nodes[n+1] = [(time, constants.DOING_IDLE)]
 
     def tick(self):
         time = self.rm.clock.get_time()
@@ -82,75 +88,75 @@
         for node in self.rm.resourcepool.nodes:
             nodenum = node.nod_id
             doing = node.getState()
-            (lasttime, lastdoing) = self.nodes[nodenum][-1]
+            (lasttime, lastdoing) = self.data.nodes[nodenum][-1]
             if doing == lastdoing:
                 # No need to update
                 pass
             else:
                 if lasttime == time:
-                        self.nodes[nodenum][-1] = (time, doing)
+                    self.data.nodes[nodenum][-1] = (time, doing)
                 else:
-                    self.nodes[nodenum].append((time, doing))
+                    self.data.nodes[nodenum].append((time, doing))
         
     def stop(self):
         time = self.rm.clock.get_time()
 
         # Stop the counters
-        for counterID in self.counters:
-            self.appendStat(counterID, self.counters[counterID], time=time)
+        for counter_id in self.data.counters:
+            self.append_stat(counter_id, self.data.counters[counter_id], time=time)
         
         # Add the averages
-        for counterID in self.counters:
-            l = self.normalizeTimes(self.counterLists[counterID])
-            avgtype = self.counterAvgType[counterID]
+        for counter_id in self.data.counters:
+            l = self.normalize_times(self.data.counter_lists[counter_id])
+            avgtype = self.data.counter_avg_type[counter_id]
             if avgtype == constants.AVERAGE_NONE:
-                self.counterLists[counterID] = self.addNoAverage(l)
+                self.data.counter_lists[counter_id] = self.add_no_average(l)
             elif avgtype == constants.AVERAGE_NORMAL:
-                self.counterLists[counterID] = self.addAverage(l)
+                self.data.counter_lists[counter_id] = self.add_average(l)
             elif avgtype == constants.AVERAGE_TIMEWEIGHTED:
-                self.counterLists[counterID] = self.addTimeWeightedAverage(l)
+                self.data.counter_lists[counter_id] = self.add_timeweighted_average(l)
         
         # Stop the doing
         for node in self.rm.resourcepool.nodes:
             nodenum = node.nod_id
             doing = node.vm_doing
-            (lasttime, lastdoing) = self.nodes[nodenum][-1]
+            (lasttime, lastdoing) = self.data.nodes[nodenum][-1]
             if time != lasttime:
-                self.nodes[nodenum].append((time, doing))
+                self.data.nodes[nodenum].append((time, doing))
+                
+        self.normalize_doing()
             
-    def normalizeTimes(self, data):
+    def normalize_times(self, data):
         return [((v[0] - self.starttime).seconds, v[1], v[2]) for v in data]
         
-    def addNoAverage(self, data):
-        return [(v[0], v[1], v[2],None) for v in data]
+    def add_no_average(self, data):
+        return [(v[0], v[1], v[2], None) for v in data]
     
-    def addTimeWeightedAverage(self, data):
+    def add_timeweighted_average(self, data):
         accum = 0
-        prevTime = None
-        prevValue = None
-        startVM = None
+        prev_time = None
+        prev_value = None
         stats = []
         for v in data:
             time = v[0]
             lease_id = v[1]
             value = v[2]
-            if prevTime != None:
-                timediff = time - prevTime
-                weightedValue = prevValue*timediff
-                accum += weightedValue
+            if prev_time != None:
+                timediff = time - prev_time
+                weighted_value = prev_value*timediff
+                accum += weighted_value
                 avg = accum/time
             else:
                 avg = value
             stats.append((time, lease_id, value, avg))
-            prevTime = time
-            prevValue = value
+            prev_time = time
+            prev_value = value
         
         return stats        
     
-    def addAverage(self, data):
-        accum=0
-        count=0
-        startVM = None
+    def add_average(self, data):
+        accum = 0
+        count = 0
         stats = []
         for v in data:
             value = v[2]
@@ -161,45 +167,40 @@
         
         return stats          
     
-    def getNodesDoing(self):
-        starttime = self.rm.clock.get_start_time()
-        nodes=dict([(i+1,[]) for i in range(self.rm.resourcepool.getNumNodes())])
-        for n in self.nodes:
+    def normalize_doing(self):
+        nodes = dict([(i+1, []) for i in range(self.rm.resourcepool.getNumNodes())])
+        for n in self.data.nodes:
             nodes[n] = []
             prevtime = None
             prevdoing = None
-            for (time, doing) in self.nodes[n]:
+            for (time, doing) in self.data.nodes[n]:
                 if prevtime != None:
                     difftime = (time-prevtime).seconds
                     nodes[n].append((difftime, prevdoing))
                 prevtime = time
                 prevdoing = doing
-        return nodes
+        self.data.nodes = nodes
     
-    def dumpStatsToDisk(self):
+    def save_to_disk(self):
         try:
-            if not os.path.exists(self.datadir):
-                os.makedirs(self.datadir)
+            dirname = os.path.dirname(self.datafile)
+            if not os.path.exists(dirname):
+                os.makedirs(dirname)
         except OSError, e:
             if e.errno != EEXIST:
                 raise e
     
-        # Save counters
-        pickle(self.counterLists, self.datadir, constants.COUNTERSFILE)
-        
-        # Save lease data
-        leases = ds.LeaseTable(None)
-        leases.entries = self.rm.scheduler.completedleases.entries
+        # Add lease data
+        leases = self.rm.scheduler.completedleases.entries
         # Remove some data that won't be necessary in the reporting tools
-        for l in leases.entries.values():
+        for l in leases.values():
             l.clear_rrs()
             l.scheduler = None
             l.logger = None
-        pickle(leases, self.datadir, constants.LEASESFILE)
-        
-        # Save utilization data
-        doing = self.getNodesDoing()
-        pickle(doing, self.datadir, constants.DOINGFILE)
+            self.data.leases[l.id] = l
 
+        # Save data
+        pickle(self.data, self.datafile)
+
                 
             


