[haizea-commit] r605 - in branches/TP2.0/src/haizea/core: . scheduler scheduler/preparation_schedulers

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Tue Jul 14 12:09:08 CDT 2009


Author: borja
Date: 2009-07-14 12:09:07 -0500 (Tue, 14 Jul 2009)
New Revision: 605

Modified:
   branches/TP2.0/src/haizea/core/leases.py
   branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
   branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
   branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
   branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
Log:
- Image transfers are working again, although that code still needs considerable cleaning up.
- Got rid of memimagemap and diskimagemap from the Lease class, since they didn't actually do anything useful.

Modified: branches/TP2.0/src/haizea/core/leases.py
===================================================================
--- branches/TP2.0/src/haizea/core/leases.py	2009-07-14 15:27:35 UTC (rev 604)
+++ branches/TP2.0/src/haizea/core/leases.py	2009-07-14 17:09:07 UTC (rev 605)
@@ -106,8 +106,6 @@
         # (keep track of the lease's state, resource reservations, etc.)
         self.state = LeaseStateMachine()
         self.numnodes = len(requested_resources)
-        self.diskimagemap = {}
-        self.memimagemap = {}
         self.preparation_rrs = []
         self.vm_rrs = []
 
@@ -139,8 +137,6 @@
         self.logger.log(loglevel, "State          : %s" % Lease.state_str[self.get_state()])
         self.logger.log(loglevel, "Resource req   : %s" % self.requested_resources)
         self.logger.log(loglevel, "Software       : %s" % self.software)
-        self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
-        self.logger.log(loglevel, "Mem image map  : %s" % pretty_nodemap(self.memimagemap))
         self.print_rrs(loglevel)
         self.logger.log(loglevel, "--------------------------------------------------")
 

Modified: branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py	2009-07-14 15:27:35 UTC (rev 604)
+++ branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py	2009-07-14 17:09:07 UTC (rev 605)
@@ -474,6 +474,7 @@
         is_ready = False
         preparation_rrs = []
         if lease_state == Lease.STATE_SUSPENDED_QUEUED:
+            #self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
             self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
         else:
             preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)

Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py	2009-07-14 15:27:35 UTC (rev 604)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py	2009-07-14 17:09:07 UTC (rev 605)
@@ -19,7 +19,7 @@
 import haizea.common.constants as constants
 from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
 from haizea.core.scheduler.slottable import ResourceReservation
-from haizea.core.leases import Lease
+from haizea.core.leases import Lease, Capacity
 from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException
 from haizea.common.utils import estimate_transfer_time, get_config
 from haizea.core.scheduler.slottable import ResourceTuple
@@ -53,26 +53,30 @@
         self.handlers ={}
         self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
                                 sched    = self,
-                                on_start = ImageTransferPreparationScheduler.handle_start_filetransfer,
-                                on_end   = ImageTransferPreparationScheduler.handle_end_filetransfer)
+                                on_start = ImageTransferPreparationScheduler._handle_start_filetransfer,
+                                on_end   = ImageTransferPreparationScheduler._handle_end_filetransfer)
 
+        self.handlers[DiskImageMigrationResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = ImageTransferPreparationScheduler._handle_start_migrate,
+                                on_end   = ImageTransferPreparationScheduler._handle_end_migrate)
+
     def schedule(self, lease, vmrr, nexttime):
-        if isinstance(lease, ARLease):
-            return self.schedule_for_ar(lease, vmrr, nexttime)
-        elif isinstance(lease, BestEffortLease):
-            return self.schedule_for_besteffort(lease, vmrr, nexttime)
+        if lease.get_type() == Lease.ADVANCE_RESERVATION:
+            return self.__schedule_deadline(lease, vmrr, nexttime)
+        elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
+            return self.__schedule_asap(lease, vmrr, nexttime)
             
     def cancel_preparation(self, lease):
         if isinstance(lease, BestEffortLease):
             self.__remove_from_fifo_transfers(lease.id)
         self.cleanup(lease)
-        lease.diskimagemap = {}
 
         
     def is_ready(self, lease, vmrr):
         return False        
         
-    def schedule_for_ar(self, lease, vmrr, nexttime):
+    def __schedule_deadline(self, lease, vmrr, nexttime):
         config = get_config()
         mechanism = config.get("transfer-mechanism")
         reusealg = config.get("diskimage-reuse")
@@ -89,7 +93,7 @@
         end = lease.start.requested + lease.duration.requested
         for (vnode, pnode) in nodeassignment.items():
             lease_id = lease.id
-            self.logger.debug("Scheduling image transfer of '%s' from vnode %i to physnode %i" % (lease.diskimage_id, vnode, pnode))
+            self.logger.debug("Scheduling image transfer of '%s' from vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
 
             if reusealg == constants.REUSE_IMAGECACHES:
                 if self.resourcepool.exists_reusable_image(pnode, lease.diskimage_id, start):
@@ -123,7 +127,7 @@
                 
         return [filetransfer], is_ready
 
-    def schedule_for_besteffort(self, lease, vmrr, nexttime):
+    def __schedule_asap(self, lease, vmrr, nexttime):
         config = get_config()
         mechanism = config.get("transfer-mechanism")
         reusealg = config.get("diskimage-reuse")
@@ -154,17 +158,8 @@
 
         if len(musttransfer)>0:
             transferRRs = self.schedule_imagetransfer_fifo(lease, musttransfer, nexttime)
-            endtransfer = transferRRs[-1].end
-            lease.imagesavail = endtransfer
-
-        if len(piggybacking) > 0: 
-            endtimes = [t.end for t in piggybacking]
-            if len(musttransfer) > 0:
-                endtimes.append(endtransfer)
-            lease.imagesavail = max(endtimes)
             
         if len(musttransfer)==0 and len(piggybacking)==0:
-            lease.imagesavail = nexttime
             is_ready = True
             
         return transferRRs, is_ready
@@ -238,18 +233,18 @@
         newtransfers = transfermap.keys()
         
         res = {}
-        resimgnode = ResourceTuple.create_empty()
-        resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
-        resnode = ResourceTuple.create_empty()
-        resnode.set_by_type(constants.RES_NETIN, bandwidth)
-        res[self.edf_node.nod_id] = resimgnode
+        resimgnode = Capacity([constants.RES_NETOUT])
+        resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
+        resnode = Capacity([constants.RES_NETIN])
+        resnode.set_quantity(constants.RES_NETIN, bandwidth)
+        res[self.edf_node.nod_id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
         for n in vnodes.values():
-            res[n] = resnode
+            res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
         
         newtransfer = FileTransferResourceReservation(req, res)
         newtransfer.deadline = req.start.requested
         newtransfer.state = ResourceReservation.STATE_SCHEDULED
-        newtransfer.file = req.diskimage_id
+        newtransfer.file = req.software.image_id
         for vnode, pnode in vnodes.items():
             newtransfer.piggyback(req.id, vnode, pnode)
         newtransfers.append(newtransfer)
@@ -338,19 +333,20 @@
             # Time to transfer is imagesize / bandwidth, regardless of 
             # number of nodes
             res = {}
-            resimgnode = ResourceTuple.create_empty()
-            resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
-            resnode = ResourceTuple.create_empty()
-            resnode.set_by_type(constants.RES_NETIN, bandwidth)
-            res[self.fifo_node.nod_id] = resimgnode
+            resimgnode = Capacity([constants.RES_NETOUT])
+            resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
+            resnode = Capacity([constants.RES_NETIN])
+            resnode.set_quantity(constants.RES_NETIN, bandwidth)
+            res[self.fifo_node.nod_id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
             for n in reqtransfers.values():
-                res[n] = resnode
+                res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
+             
             newtransfer = FileTransferResourceReservation(req, res)
             newtransfer.start = startTime
             newtransfer.end = startTime+imgTransferTime
             newtransfer.deadline = None
             newtransfer.state = ResourceReservation.STATE_SCHEDULED
-            newtransfer.file = req.diskimage_id
+            newtransfer.file = req.software.image_id
             for vnode in reqtransfers:
                 physnode = reqtransfers[vnode]
                 newtransfer.piggyback(req.id, vnode, physnode)
@@ -366,7 +362,7 @@
         if forceTransferTime != None:
             return forceTransferTime
         else:      
-            return estimate_transfer_time(lease.diskimage_size, bandwidth)    
+            return estimate_transfer_time(lease.software.image_size, bandwidth)    
     
     def get_next_fifo_transfer_time(self, nexttime):
         transfers = [t for t in self.transfers_fifo if t.state != ResourceReservation.STATE_DONE]
@@ -395,7 +391,7 @@
             self.transfers_fifo.remove(t)
 
     @staticmethod
-    def handle_start_filetransfer(sched, lease, rr):
+    def _handle_start_filetransfer(sched, lease, rr):
         sched.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id)
         lease.print_contents()
         lease_state = lease.get_state()
@@ -411,7 +407,7 @@
         sched.logger.info("Starting image transfer for lease %i" % (lease.id))
 
     @staticmethod
-    def handle_end_filetransfer(sched, lease, rr):
+    def _handle_end_filetransfer(sched, lease, rr):
         sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
         lease.print_contents()
         lease_state = lease.get_state()
@@ -420,12 +416,7 @@
             rr.state = ResourceReservation.STATE_DONE
             for physnode in rr.transfers:
                 vnodes = rr.transfers[physnode]
-                
-#                # Update VM Image maps
-#                for lease_id, v in vnodes:
-#                    lease = sched.leases.get_lease(lease_id)
-#                    lease.diskimagemap[v] = physnode
-#                    
+ 
 #                # Find out timeout of image. It will be the latest end time of all the
 #                # leases being used by that image.
 #                leases = [l for (l, v) in vnodes]
@@ -437,13 +428,21 @@
 #                        maxend=end
                 maxend = None
                 # TODO: ENACTMENT: Verify the image was transferred correctly
-                sched.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
+                sched.add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
         else:
             raise InconsistentLeaseStateError(l, doing = "ending a file transfer")
 
         lease.print_contents()
         sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
         sched.logger.info("Completed image transfer for lease %i" % (lease.id))
+
+    @staticmethod
+    def _handle_start_migrate(sched, lease, rr):
+        pass
+
+    @staticmethod
+    def _handle_end_migrate(sched, lease, rr):
+        pass
         
     def add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
         self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
@@ -495,9 +494,14 @@
         pnode.print_files()
         
     def cleanup(self, lease):
-        for vnode, pnode in lease.diskimagemap.items():
-            self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+        pass
+        # TODO: should get values from VMRR
+        #for vnode, pnode in lease.diskimagemap.items():
+        #    self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
 
+class DiskImageMigrationResourceReservation(ResourceReservation):
+    pass
+
 class FileTransferResourceReservation(ResourceReservation):
     def __init__(self, lease, res, start=None, end=None):
         ResourceReservation.__init__(self, lease, start, end, res)

Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py	2009-07-14 15:27:35 UTC (rev 604)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py	2009-07-14 17:09:07 UTC (rev 605)
@@ -38,8 +38,9 @@
             
     def cancel_preparation(self, lease):
         self.cleanup(lease)
-        lease.diskimagemap = {}
 
     def cleanup(self, lease):
-        for vnode, pnode in lease.diskimagemap.items():
-            self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file
+        pass
+        # TODO: should get values from VMRR
+        #for vnode, pnode in lease.diskimagemap.items():
+        #    self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file

Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-14 15:27:35 UTC (rev 604)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py	2009-07-14 17:09:07 UTC (rev 605)
@@ -736,10 +736,6 @@
             
             try:
                 self.resourcepool.start_vms(l, rr)
-                # The next two lines have to be moved somewhere more
-                # appropriate inside the resourcepool module
-                for (vnode, pnode) in rr.nodes.items():
-                    l.diskimagemap[vnode] = pnode
             except EnactmentError, exc:
                 self.logger.error("Enactment error when starting VMs.")
                 # Right now, this is a non-recoverable error, so we just
@@ -829,10 +825,7 @@
             # In the future, it may be possible to react to these
             # kind of errors.
             raise            
-        
-        for vnode in rr.vnodes:
-            pnode = rr.vmrr.nodes[vnode]
-            l.memimagemap[vnode] = pnode
+
         if rr.is_first():
             l.set_state(Lease.STATE_SUSPENDING)
             l.print_contents()
@@ -906,16 +899,13 @@
             dest = rr.transfers[vnode][1]
             
             # Commenting for now
-            # Update VM image mappings
+            # Has to be moved to preparation scheduler migrate handler
             #self.resourcepool.remove_diskimage(origin, l.id, vnode)
             #self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
-            #l.diskimagemap[vnode] = dest
 
-            # Commenting for now
-            # Update RAM file mappings
-            #self.resourcepool.remove_ramfile(origin, l.id, vnode)
-            #self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
-            #l.memimagemap[vnode] = dest
+            # Update RAM files
+            self.resourcepool.remove_ramfile(origin, l.id, vnode)
+            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
         
         rr.state = ResourceReservation.STATE_DONE
         l.print_contents()



More information about the Haizea-commit mailing list