[haizea-commit] r486 - in trunk/src/haizea/resourcemanager: . deployment enact enact/opennebula enact/simulated
haizea-commit at mailman.cs.uchicago.edu
Wed Sep 3 12:35:20 CDT 2008
Author: borja
Date: 2008-09-03 12:35:19 -0500 (Wed, 03 Sep 2008)
New Revision: 486
Modified:
trunk/src/haizea/resourcemanager/accounting.py
trunk/src/haizea/resourcemanager/deployment/base.py
trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
trunk/src/haizea/resourcemanager/deployment/unmanaged.py
trunk/src/haizea/resourcemanager/enact/actions.py
trunk/src/haizea/resourcemanager/enact/base.py
trunk/src/haizea/resourcemanager/enact/opennebula/info.py
trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
trunk/src/haizea/resourcemanager/enact/simulated/info.py
trunk/src/haizea/resourcemanager/enact/simulated/storage.py
trunk/src/haizea/resourcemanager/enact/simulated/vm.py
trunk/src/haizea/resourcemanager/resourcepool.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/rpcserver.py
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Log:
- Moved ResourcePool inside the Scheduler instead of the ResourceManager.
- Started cleaning up the ResourcePool code (general cleanup, plus factoring code out into the deployment modules).
NOTE: Only unmanaged deployment works right now; image transfer scheduling is temporarily broken.
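To illustrate the first point: the ResourcePool is now constructed by the Scheduler rather than by the ResourceManager, and other components (accounting, the RPC server, the slot table) reach it through rm.scheduler.resourcepool. The following is a minimal, self-contained sketch of that wiring; the class and method names follow the diff below, but the constructor bodies and the stand-in node list are illustrative only, not the actual Haizea code.

class Node(object):
    def __init__(self, nod_id, capacity):
        self.nod_id = nod_id
        self.capacity = capacity

class ResourcePool(object):
    def __init__(self, scheduler):
        self.rm = scheduler.rm                       # the pool still keeps a handle to the RM
        self.nodes = [Node(1, 1024), Node(2, 1024)]  # stand-in for info.get_nodes()

    def get_nodes(self):
        return self.nodes

    def get_num_nodes(self):
        return len(self.nodes)

class SlotTable(object):
    def __init__(self, scheduler):
        self.resourcepool = scheduler.resourcepool   # was scheduler.rm.resourcepool
        self.nodes = []

    def add_node(self, resourcepool_node):           # new in this revision
        self.nodes.append(resourcepool_node)

class Scheduler(object):
    def __init__(self, rm):
        self.rm = rm
        self.resourcepool = ResourcePool(self)       # previously created by the RM
        self.slottable = SlotTable(self)
        for n in self.resourcepool.get_nodes():      # scheduler now feeds the slot table
            self.slottable.add_node(n)

class ResourceManager(object):
    def __init__(self):
        self.scheduler = Scheduler(self)             # no self.resourcepool anymore

rm = ResourceManager()
assert rm.scheduler.resourcepool.get_num_nodes() == 2    # the new access path used by accounting.py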
Modified: trunk/src/haizea/resourcemanager/accounting.py
===================================================================
--- trunk/src/haizea/resourcemanager/accounting.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/accounting.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -102,16 +102,16 @@
self.append_stat(counter_id, initial, time = time)
# Start the doing
- numnodes = self.rm.resourcepool.getNumNodes()
+ numnodes = self.rm.scheduler.resourcepool.get_num_nodes()
for n in range(numnodes):
self.data.nodes[n+1] = [(time, constants.DOING_IDLE)]
def tick(self):
time = self.rm.clock.get_time()
# Update the doing
- for node in self.rm.resourcepool.nodes:
+ for node in self.rm.scheduler.resourcepool.nodes:
nodenum = node.nod_id
- doing = node.getState()
+ doing = node.get_state()
(lasttime, lastdoing) = self.data.nodes[nodenum][-1]
if doing == lastdoing:
# No need to update
@@ -141,7 +141,7 @@
self.data.counter_lists[counter_id] = self.add_timeweighted_average(l)
# Stop the doing
- for node in self.rm.resourcepool.nodes:
+ for node in self.rm.scheduler.resourcepool.nodes:
nodenum = node.nod_id
doing = node.vm_doing
(lasttime, lastdoing) = self.data.nodes[nodenum][-1]
@@ -192,7 +192,7 @@
return stats
def normalize_doing(self):
- nodes = dict([(i+1, []) for i in range(self.rm.resourcepool.getNumNodes())])
+ nodes = dict([(i+1, []) for i in range(self.rm.scheduler.resourcepool.get_num_nodes())])
for n in self.data.nodes:
nodes[n] = []
prevtime = None
Modified: trunk/src/haizea/resourcemanager/deployment/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/base.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/deployment/base.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -22,7 +22,7 @@
def __init__(self, scheduler):
self.scheduler = scheduler
self.slottable = scheduler.slottable
- self.resourcepool = scheduler.rm.resourcepool
+ self.resourcepool = scheduler.resourcepool
self.logger = logging.getLogger("DEPLOY")
Modified: trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -26,10 +26,29 @@
class ImageTransferDeployment(DeploymentBase):
def __init__(self, scheduler):
DeploymentBase.__init__(self, scheduler)
+
+ # TODO: The following two should be merged into
+ # something like this:
+ # self.imageNode = self.info.getImageNode()
+ self.FIFOnode = self.info.getFIFONode()
+ self.EDFnode = self.info.getEDFNode()
+
self.transfersEDF = []
self.transfersFIFO = []
self.completedTransfers = []
+ self.lease_deployment_type = self.rm.config.get("lease-preparation")
+ if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+ self.reusealg = self.rm.config.get("diskimage-reuse")
+ if self.reusealg == constants.REUSE_IMAGECACHES:
+ self.maxcachesize = self.rm.config.get("diskimage-cache-size")
+ else:
+ self.maxcachesize = None
+ else:
+ self.reusealg = None
+
+ self.imagenode_bandwidth = self.info.get_bandwidth()
+
self.scheduler.register_handler(type = FileTransferResourceReservation,
on_start = ImageTransferDeployment.handle_start_filetransfer,
on_end = ImageTransferDeployment.handle_end_filetransfer)
@@ -404,7 +423,7 @@
if maxend==None or end>maxend:
maxend=end
# TODO: ENACTMENT: Verify the image was transferred correctly
- sched.rm.resourcepool.addImageToNode(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
+ self.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
elif lease.state == constants.LEASE_STATE_SUSPENDED:
pass
# TODO: Migrating
@@ -412,7 +431,119 @@
sched.updateNodeTransferState(rr.transfers.keys(), constants.DOING_IDLE, lease.id)
sched.rm.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
sched.rm.logger.info("Completed image transfer for lease %i" % (lease.id))
+
+ def add_diskimages(self, nod_id, imagefile, imagesize, vnodes, timeout=None):
+ self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, nod_id))
+ self.getNode(nod_id).printFiles()
+ if self.reusealg == constants.REUSE_NONE:
+ for (lease_id, vnode) in vnodes:
+ img = VMImageFile(imagefile, imagesize, masterimg=False)
+ img.addMapping(lease_id, vnode)
+ self.getNode(nod_id).addFile(img)
+ elif self.reusealg == constants.REUSE_IMAGECACHES:
+ # Sometimes we might find that the image is already deployed
+ # (although unused). In that case, don't add another copy to
+ # the pool. Just "reactivate" it.
+ if self.getNode(nod_id).isInPool(imagefile):
+ for (lease_id, vnode) in vnodes:
+ self.getNode(nod_id).addToPool(imagefile, lease_id, vnode, timeout)
+ else:
+ img = VMImageFile(imagefile, imagesize, masterimg=True)
+ img.timeout = timeout
+ for (lease_id, vnode) in vnodes:
+ img.addMapping(lease_id, vnode)
+ if self.maxcachesize != constants.CACHESIZE_UNLIMITED:
+ poolsize = self.getNode(nod_id).getPoolSize()
+ reqsize = poolsize + imagesize
+ if reqsize > self.maxcachesize:
+ desiredsize = self.maxcachesize - imagesize
+ self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (nod_id, reqsize, desiredsize))
+ self.getNode(nod_id).printFiles()
+ success = self.getNode(nod_id).purgePoolDownTo(self.maxcachesize)
+ if not success:
+ self.logger.debug("Unable to add to pool. Creating tainted image instead.")
+ # If unsuccessful, this just means we couldn't add the image
+ # to the pool. We will have to create tainted images to be used
+ # only by these leases
+ for (lease_id, vnode) in vnodes:
+ img = VMImageFile(imagefile, imagesize, masterimg=False)
+ img.addMapping(lease_id, vnode)
+ self.getNode(nod_id).addFile(img)
+ else:
+ self.getNode(nod_id).addFile(img)
+ else:
+ self.getNode(nod_id).addFile(img)
+ else:
+ self.getNode(nod_id).addFile(img)
+
+ self.getNode(nod_id).printFiles()
+
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+
+# def checkImage(self, pnode, lease_id, vnode, imagefile):
+# node = self.getNode(pnode)
+# if self.rm.config.get("lease-preparation") == constants.DEPLOYMENT_UNMANAGED:
+# self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
+# elif self.reusealg == constants.REUSE_NONE:
+# if not node.hasTaintedImage(lease_id, vnode, imagefile):
+# self.logger.debug("ERROR: Image for L%iV%i is not deployed on node %i" % (lease_id, vnode, pnode))
+# elif self.reusealg == constants.REUSE_IMAGECACHES:
+# poolentry = node.getPoolEntry(imagefile, lease_id=lease_id, vnode=vnode)
+# if poolentry == None:
+# # Not necessarily an error. Maybe the pool was full, and
+# # we had to fall back on creating a tainted image right
+# # when the image was transferred. We have to check this.
+# if not node.hasTaintedImage(lease_id, vnode, imagefile):
+# self.logger.error("ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease_id, vnode, pnode))
+# else:
+# # Create tainted image
+# self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
+# node.printFiles()
+# img = VMImageFile(imagefile, poolentry.filesize, masterimg=False)
+# img.addMapping(lease_id, vnode)
+# node.addFile(img)
+# node.printFiles()
+# self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+
+
+ def check(self, lease, vmrr):
+ # Check that all the required disk images are available,
+ # and determine what their physical filenames are.
+ # Note that it is the enactment module's responsibility to
+ # mark an image as correctly deployed. The check we do here
+ # is (1) to catch scheduling errors (i.e., the image transfer
+ # was not scheduled) and (2) to create disk images if
+ # we can reuse a reusable image in the node.
+ # TODO: However, we're assuming CoW, which means the enactment
+ # must support it too. If we can't assume CoW, we would have to
+ # make a copy of the master image (which takes time), and should
+ # be scheduled.
+
+ for (vnode, pnode) in vmrr.nodes.items():
+ node = self.resourcepool.getNode(pnode)
+
+ taintedImage = None
+
+ taintedImage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+ if self.reusealg == constants.REUSE_NONE:
+ if taintedImage == None:
+ raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.id, vnode, pnode)
+ elif self.reusealg == constants.REUSE_IMAGECACHES:
+ poolentry = node.getPoolEntry(lease.diskimage_id, lease_id=lease.id, vnode=vnode)
+ if poolentry == None:
+ # Not necessarily an error. Maybe the pool was full, and
+ # we had to fall back on creating a tainted image right
+ # when the image was transferred. We have to check this.
+ if taintedImage == None:
+ raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.id, vnode, pnode)
+ else:
+ # Create tainted image
+ taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
+ # ENACTMENT
+ # self.storage.createCopyFromCache(pnode, lease.diskImageSize)
+
+
class FileTransferResourceReservation(ResourceReservationBase):
def __init__(self, lease, res, start=None, end=None):
ResourceReservationBase.__init__(self, lease, start, end, res)
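The add_diskimages() method above moves the image-cache admission logic into the image-transfer deployment module. As a rough, self-contained sketch of that decision, with hypothetical class and method names rather than the Haizea API: reuse an image that is already pooled, otherwise try to purge unused pooled images to make room, and fall back to per-lease ("tainted") copies if the pool cannot be shrunk enough. The real pool entries also carry a timeout, and only pooled images that no lease maps to are candidates for purging.

CACHESIZE_UNLIMITED = -1

class ImagePool(object):
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.pooled = {}    # filename -> (size, set of (lease_id, vnode) mappings)
        self.tainted = []   # per-lease private copies: (filename, lease_id, vnode)

    def size(self):
        return sum(size for size, mappings in self.pooled.values())

    def purge_down_to(self, target):
        # Drop pooled images with no mappings until we are at or below target.
        for name in list(self.pooled):
            if self.size() <= target:
                return True
            size, mappings = self.pooled[name]
            if not mappings:
                del self.pooled[name]
        return self.size() <= target

    def admit(self, filename, size, mappings):
        # Returns 'reused', 'pooled' or 'tainted'.
        if filename in self.pooled:
            self.pooled[filename][1].update(mappings)    # reactivate the existing entry
            return "reused"
        if self.maxsize != CACHESIZE_UNLIMITED and self.size() + size > self.maxsize:
            if not self.purge_down_to(self.maxsize - size):
                # Could not make room: give each (lease, vnode) a private copy.
                self.tainted.extend([(filename, l, v) for (l, v) in mappings])
                return "tainted"
        self.pooled[filename] = (size, set(mappings))
        return "pooled"

pool = ImagePool(maxsize=2048)
assert pool.admit("foo.img", 1024, [(1, 1)]) == "pooled"
assert pool.admit("foo.img", 1024, [(2, 1)]) == "reused"
assert pool.admit("bar.img", 1536, [(3, 1)]) == "tainted"   # foo.img is still mapped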
Modified: trunk/src/haizea/resourcemanager/deployment/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/unmanaged.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/deployment/unmanaged.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -23,13 +23,37 @@
def __init__(self, scheduler):
DeploymentBase.__init__(self, scheduler)
+ # Add dummy disk images
def schedule(self, lease, vmrr, nexttime):
lease.state = constants.LEASE_STATE_DEPLOYED
+ for (vnode, pnode) in vmrr.nodes.items():
+ self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
def find_earliest_starting_times(self, lease_req, nexttime):
- nodIDs = [n.nod_id for n in self.resourcepool.getNodes()]
- earliest = dict([(node, [nexttime, constants.REQTRANSFER_NO, None]) for node in nodIDs])
+ nod_ids = [n.nod_id for n in self.resourcepool.get_nodes()]
+ earliest = dict([(node, [nexttime, constants.REQTRANSFER_NO, None]) for node in nod_ids])
return earliest
def cancel_deployment(self, lease):
- pass
\ No newline at end of file
+ pass
+
+ def check(self, lease, vmrr):
+ # Check that all the required disk images are available,
+ # and determine what their physical filenames are.
+ # Note that it is the enactment module's responsibility to
+ # mark an image as correctly deployed. The check we do here
+ # is (1) to catch scheduling errors (i.e., the image transfer
+ # was not scheduled).
+
+ for (vnode, pnode) in vmrr.nodes.items():
+ node = self.resourcepool.get_node(pnode)
+
+ diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+ if diskimage == None:
+ raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.id, vnode, pnode)
+
+ return True
+
+ def cleanup(self, lease, vmrr):
+ for vnode, pnode in lease.vmimagemap.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file
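Taken together, the two deployment modules in this commit implement the same small plugin surface on top of DeploymentBase. Only the base constructor appears in the hunk above, so the method list below is inferred from what unmanaged.py and imagetransfer.py override; the sketch is editorial, and abstract() stands in for haizea.common.utils.abstract.

def abstract():
    raise NotImplementedError("must be overridden by a deployment module")

class DeploymentBase(object):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.slottable = scheduler.slottable
        self.resourcepool = scheduler.resourcepool   # changed from scheduler.rm.resourcepool

    def schedule(self, lease, vmrr, nexttime):
        # Schedule whatever preparation the lease needs (e.g. image transfers).
        abstract()

    def find_earliest_starting_times(self, lease_req, nexttime):
        # Per-node earliest times at which the lease's images could be ready.
        abstract()

    def check(self, lease, vmrr):
        # Verify the required disk images are on the chosen nodes before the VMs start.
        abstract()

    def cleanup(self, lease, vmrr):
        # Remove per-lease disk images once the lease is done.
        abstract()

    def cancel_deployment(self, lease):
        abstract()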
Modified: trunk/src/haizea/resourcemanager/enact/actions.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/actions.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/actions.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -21,7 +21,7 @@
self.lease_haizea_id = None
self.lease_enactment_info = None
- def fromRR(self, rr):
+ def from_rr(self, rr):
self.lease_haizea_id = rr.lease.id
self.lease_enactment_info = rr.lease.enactment_info
@@ -37,8 +37,8 @@
EnactmentAction.__init__(self)
self.vnodes = {}
- def fromRR(self, rr):
- EnactmentAction.fromRR(self, rr)
+ def from_rr(self, rr):
+ EnactmentAction.from_rr(self, rr)
# TODO: This is very kludgy
if rr.lease.vnode_enactment_info == None:
self.vnodes = dict([(vnode+1, VNode(None)) for vnode in range(rr.lease.numnodes)])
Modified: trunk/src/haizea/resourcemanager/enact/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/base.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/base.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -23,15 +23,15 @@
def __init__(self, resourcepool):
self.resourcepool = resourcepool
- resourcetypes = self.getResourceTypes() #IGNORE:E1111
+ resourcetypes = self.get_resource_types() #IGNORE:E1111
ds.ResourceTuple.set_resource_types(resourcetypes)
- def getNodes(self):
+ def get_nodes(self):
""" Returns the nodes in the resource pool. """
abstract()
- def getFIFOnode(self):
+ def get_fifo_node(self):
""" Returns the image node for FIFO transfers
Note that this function will disappear as soon
@@ -40,7 +40,7 @@
"""
abstract()
- def getEDFnode(self):
+ def get_edf_node(self):
""" Returns the image node for EDF transfers
Note that this function will disappear as soon
@@ -49,7 +49,7 @@
"""
abstract()
- def getResourceTypes(self):
+ def get_resource_types(self):
abstract()
class StorageEnactmentBase(object):
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -67,23 +67,23 @@
self.FIFOnode = None
self.EDFnode = None
- def getNodes(self):
+ def get_nodes(self):
return self.nodes
- def getEDFNode(self):
+ def get_edf_node(self):
return self.EDFnode
- def getFIFONode(self):
+ def get_fifo_node(self):
return self.FIFOnode
- def getResourceTypes(self):
+ def get_resource_types(self):
return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
(constants.RES_MEM, constants.RESTYPE_INT, "Mem"),
(constants.RES_DISK, constants.RESTYPE_INT, "Disk"),
(constants.RES_NETIN, constants.RESTYPE_INT, "Net (in)"),
(constants.RES_NETOUT, constants.RESTYPE_INT, "Net (out)")]
- def getSuspendResumeRate(self):
+ def get_suspendresume_rate(self):
return self.suspendresumerate
def get_bandwidth(self):
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -23,5 +23,5 @@
StorageEnactmentBase.__init__(self, resourcepool)
self.imagepath="/images/playground/borja"
- def resolveToFile(self, lease_id, vnode, diskImageID):
+ def resolve_to_file(self, lease_id, vnode, diskImageID):
return "%s/%s/%s.img" % (self.imagepath, diskImageID, diskImageID)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -91,7 +91,7 @@
else:
raise Exception, "Error when running onevm resume (status=%i, output='%s')" % (status, output)
- def verifySuspend(self, action):
+ def verify_suspend(self, action):
# TODO: Do a single query
result = 0
for vnode in action.vnodes:
@@ -108,7 +108,7 @@
result = 1
return result
- def verifyResume(self, action):
+ def verify_resume(self, action):
# TODO: Do a single query
result = 0
for vnode in action.vnodes:
Modified: trunk/src/haizea/resourcemanager/enact/simulated/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -45,16 +45,16 @@
self.FIFOnode = Node(self.resourcepool, numnodes+1, "FIFOnode", imgcapacity)
self.EDFnode = Node(self.resourcepool, numnodes+2, "EDFnode", imgcapacity)
- def getNodes(self):
+ def get_nodes(self):
return self.nodes
- def getEDFNode(self):
+ def get_edf_Node(self):
return self.EDFnode
- def getFIFONode(self):
+ def get_fifo_node(self):
return self.FIFOnode
- def getResourceTypes(self):
+ def get_resource_types(self):
return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
(constants.RES_MEM, constants.RESTYPE_INT, "Mem"),
(constants.RES_DISK, constants.RESTYPE_INT, "Disk"),
@@ -63,7 +63,7 @@
def parse_resources_string(self, resources):
resources = resources.split(";")
- desc2type = dict([(x[2], x[0]) for x in self.getResourceTypes()])
+ desc2type = dict([(x[2], x[0]) for x in self.get_resource_types()])
capacity=ds.ResourceTuple.create_empty()
for r in resources:
resourcename = r.split(",")[0]
@@ -71,7 +71,7 @@
capacity.set_by_type(desc2type[resourcename], int(resourcecapacity))
return capacity
- def getSuspendResumeRate(self):
+ def get_suspendresume_rate(self):
return self.suspendresumerate
def get_bandwidth(self):
Modified: trunk/src/haizea/resourcemanager/enact/simulated/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -26,5 +26,5 @@
def __init__(self, resourcepool):
StorageEnactmentBase.__init__(self, resourcepool)
- def resolveToFile(self, lease_id, vnode, diskImageID):
+ def resolve_to_file(self, lease_id, vnode, diskImageID):
return "%s/%s-L%iV%i" % (baseWorkingPath, diskImageID, lease_id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -50,8 +50,8 @@
self.logger.debug("Received request to resume VM for L%iV%i"
% (action.lease_haizea_id, vnode))
- def verifySuspend(self, action):
+ def verify_suspend(self, action):
return 0
- def verifyResume(self, action):
+ def verify_resume(self, action):
return 0
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -22,36 +22,20 @@
import logging
class ResourcePool(object):
- def __init__(self, rm):
- self.rm = rm
+ def __init__(self, scheduler):
+ self.rm = scheduler.rm
self.logger = logging.getLogger("RESOURCEPOOL")
-
+
self.info = None
self.vm = None
self.storage = None
- self.loadEnactmentModules()
+ self.load_enactment_modules()
- self.nodes = self.info.getNodes()
- # TODO: The following two should be merged into
- # something like this:
- # self.imageNode = self.info.getImageNode()
- self.FIFOnode = self.info.getFIFONode()
- self.EDFnode = self.info.getEDFNode()
+ self.nodes = self.info.get_nodes()
- self.imagenode_bandwidth = self.info.get_bandwidth()
-
- self.lease_deployment_type = self.rm.config.get("lease-preparation")
- if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- self.reusealg = self.rm.config.get("diskimage-reuse")
- if self.reusealg == constants.REUSE_IMAGECACHES:
- self.maxcachesize = self.rm.config.get("diskimage-cache-size")
- else:
- self.maxcachesize = None
- else:
- self.reusealg = None
- def loadEnactmentModules(self):
+ def load_enactment_modules(self):
mode = self.rm.config.get("mode")
try:
exec "import %s.%s as enact" % (constants.ENACT_PACKAGE, mode)
@@ -63,156 +47,47 @@
raise
- def startVMs(self, lease, rr):
- startAction = actions.VMEnactmentStartAction()
- startAction.fromRR(rr)
+ def start_vms(self, lease, rr):
+ start_action = actions.VMEnactmentStartAction()
+ start_action.from_rr(rr)
- # Check that all the required tainted images are available,
- # and determine what their physical filenames are.
- # Note that it is the enactment module's responsibility to
- # mark an image as correctly deployed. The check we do here
- # is (1) to catch scheduling errors (i.e., the image transfer
- # was not scheduled) and (2) to create tainted images if
- # we can reuse a master image in the node's image pool.
- # TODO: However, we're assuming CoW, which means the enactment
- # must support it too. If we can't assume CoW, we would have to
- # make a copy of the master image (which takes time), and should
- # be scheduled.
+ # TODO: Get tainted image
+
for (vnode, pnode) in rr.nodes.items():
- node = self.getNode(pnode)
-
- taintedImage = None
-
- # TODO: Factor this out
- lease_deployment_type = self.rm.config.get("lease-preparation")
- if lease_deployment_type == constants.DEPLOYMENT_UNMANAGED:
- # If we assume predeployment, we mark that there is a new
- # tainted image, but there is no need to go to the enactment
- # module (we trust that the image is predeployed, or that
- # the VM enactment is taking care of this, e.g., by making
- # a copy right before the VM starts; this is a Bad Thing
- # but out of our control if we assume predeployment).
- # TODO: It might make sense to consider two cases:
- # "no image management at all" and "assume master image
- # is predeployed, but we still have to make a copy".
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- elif lease_deployment_type == constants.DEPLOYMENT_PREDEPLOY:
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- elif lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- taintedImage = node.getTaintedImage(lease.id, vnode, lease.diskimage_id)
- if self.reusealg == constants.REUSE_NONE:
- if taintedImage == None:
- raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.id, vnode, pnode)
- elif self.reusealg == constants.REUSE_IMAGECACHES:
- poolentry = node.getPoolEntry(lease.diskimage_id, lease_id=lease.id, vnode=vnode)
- if poolentry == None:
- # Not necessarily an error. Maybe the pool was full, and
- # we had to fall back on creating a tainted image right
- # when the image was transferred. We have to check this.
- if taintedImage == None:
- raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.id, vnode, pnode)
- else:
- # Create tainted image
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- # ENACTMENT
- # self.storage.createCopyFromCache(pnode, lease.diskImageSize)
- startAction.vnodes[vnode].pnode = node.enactment_info
- startAction.vnodes[vnode].diskimage = taintedImage.filename
- startAction.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
+ node = self.get_node(pnode)
+ diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+ start_action.vnodes[vnode].pnode = node.enactment_info
+ start_action.vnodes[vnode].diskimage = diskimage.filename
+ start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
try:
- self.vm.start(startAction)
+ self.vm.start(start_action)
except Exception, msg:
self.logger.error("Enactment of start VM failed: %s" % msg)
- self.rm.cancel_lease(lease.id)
+ self.rm.fail_lease(lease.id)
- def stopVMs(self, lease, rr):
- stopAction = actions.VMEnactmentStopAction()
- stopAction.fromRR(rr)
+ def stop_vms(self, lease, rr):
+ stop_action = actions.VMEnactmentStopAction()
+ stop_action.from_rr(rr)
try:
- self.vm.stop(stopAction)
+ self.vm.stop(stop_action)
except Exception, msg:
self.logger.error("Enactment of end VM failed: %s" % msg)
- self.rm.cancel_lease(lease)
-
- def transferFiles(self):
- pass
-
- def verifyImageTransfer(self):
- pass
-
- def createTaintedImage(self):
- pass
-
- def addImageToCache(self):
- pass
-
- # TODO: This has to be divided into the above three functions.
- def addImageToNode(self, nod_id, imagefile, imagesize, vnodes, timeout=None):
- self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, nod_id))
- self.getNode(nod_id).printFiles()
-
- if self.reusealg == constants.REUSE_NONE:
- for (lease_id, vnode) in vnodes:
- img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(lease_id, vnode)
- self.getNode(nod_id).addFile(img)
- elif self.reusealg == constants.REUSE_IMAGECACHES:
- # Sometimes we might find that the image is already deployed
- # (although unused). In that case, don't add another copy to
- # the pool. Just "reactivate" it.
- if self.getNode(nod_id).isInPool(imagefile):
- for (lease_id, vnode) in vnodes:
- self.getNode(nod_id).addToPool(imagefile, lease_id, vnode, timeout)
- else:
- img = VMImageFile(imagefile, imagesize, masterimg=True)
- img.timeout = timeout
- for (lease_id, vnode) in vnodes:
- img.addMapping(lease_id, vnode)
- if self.maxcachesize != constants.CACHESIZE_UNLIMITED:
- poolsize = self.getNode(nod_id).getPoolSize()
- reqsize = poolsize + imagesize
- if reqsize > self.maxcachesize:
- desiredsize = self.maxcachesize - imagesize
- self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (nod_id, reqsize, desiredsize))
- self.getNode(nod_id).printFiles()
- success = self.getNode(nod_id).purgePoolDownTo(self.maxcachesize)
- if not success:
- self.logger.debug("Unable to add to pool. Creating tainted image instead.")
- # If unsuccessful, this just means we couldn't add the image
- # to the pool. We will have to create tainted images to be used
- # only by these leases
- for (lease_id, vnode) in vnodes:
- img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(lease_id, vnode)
- self.getNode(nod_id).addFile(img)
- else:
- self.getNode(nod_id).addFile(img)
- else:
- self.getNode(nod_id).addFile(img)
- else:
- self.getNode(nod_id).addFile(img)
-
- self.getNode(nod_id).printFiles()
-
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
-
- def verifyFileTransfer(self):
- pass
-
- def suspendVMs(self, lease, rr):
- suspendAction = actions.VMEnactmentSuspendAction()
- suspendAction.fromRR(rr)
+ self.rm.fail_lease(lease)
+
+ def suspend_vms(self, lease, rr):
+ suspend_action = actions.VMEnactmentSuspendAction()
+ suspend_action.from_rr(rr)
try:
- self.vm.suspend(suspendAction)
+ self.vm.suspend(suspend_action)
except Exception, msg:
self.logger.error("Enactment of suspend VM failed: %s" % msg)
- self.rm.cancel_lease(lease)
+ self.rm.fail_lease(lease)
- def verifySuspend(self, lease, rr):
- verifySuspendAction = actions.VMEnactmentConfirmSuspendAction()
- verifySuspendAction.fromRR(rr)
- self.vm.verifySuspend(verifySuspendAction)
+ def verify_suspend(self, lease, rr):
+ verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
+ verify_suspend_action.from_rr(rr)
+ self.vm.verify_suspend(verify_suspend_action)
# TODO
# The following should be implemented to handle asynchronous
@@ -220,19 +95,19 @@
#def suspendDone(self, lease, rr):
# pass
- def resumeVMs(self, lease, rr):
- resumeAction = actions.VMEnactmentResumeAction()
- resumeAction.fromRR(rr)
+ def resume_vms(self, lease, rr):
+ resume_action = actions.VMEnactmentResumeAction()
+ resume_action.from_rr(rr)
try:
- self.vm.resume(resumeAction)
+ self.vm.resume(resume_action)
except Exception, msg:
self.logger.error("Enactment of resume VM failed: %s" % msg)
- self.rm.cancel_lease(lease)
+ self.rm.fail_lease(lease)
- def verifyResume(self, lease, rr):
- verifyResumeAction = actions.VMEnactmentConfirmResumeAction()
- verifyResumeAction.fromRR(rr)
- self.vm.verifyResume(verifyResumeAction)
+ def verify_resume(self, lease, rr):
+ verify_resume_action = actions.VMEnactmentConfirmResumeAction()
+ verify_resume_action.from_rr(rr)
+ self.vm.verify_resume(verify_resume_action)
# TODO
# The following should be implemented to handle asynchronous
@@ -240,168 +115,91 @@
#def resumeDone(self, lease, rr):
# pass
- def poll_unscheduled_vm_end(self):
- pass
-
# TODO
# The following should be implemented to handle asynchronous
# notifications of a VM ending
#def notify_vm_done(self, lease, rr):
# pass
- def getNodes(self):
+ def get_nodes(self):
return self.nodes
- def getNumNodes(self):
+ def get_num_nodes(self):
return len(self.nodes)
- def getNode(self, nod_id):
+ def get_node(self, nod_id):
return self.nodes[nod_id-1]
-
- def getNodesWithImg(self, imgURI):
- return [n.nod_id for n in self.nodes if n.isImgDeployed(imgURI)]
-
- def getNodesWithImgLater(self, imgURI, time):
- return [n.nod_id for n in self.nodes if n.isImgDeployedLater(imgURI, time)]
-
- def getFIFORepositoryNode(self):
- return self.FIFOnode
-
- def getEDFRepositoryNode(self):
- return self.EDFnode
- def addTaintedImageToNode(self, pnode, diskImageID, imagesize, lease_id, vnode):
- self.logger.debug("Adding tainted image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
- self.getNode(pnode).printFiles()
- imagefile = self.storage.resolveToFile(lease_id, vnode, diskImageID)
- img = VMImageFile(imagefile, imagesize, diskImageID=diskImageID, masterimg=False)
- img.addMapping(lease_id, vnode)
- self.getNode(pnode).addFile(img)
- self.getNode(pnode).printFiles()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+ def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
+ self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
+ self.get_node(pnode).print_files()
+ imagefile = self.storage.resolve_to_file(lease_id, vnode, diskimage_id)
+ img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
+ self.get_node(pnode).add_file(img)
+ self.get_node(pnode).print_files()
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
return img
-
- def checkImage(self, pnode, lease_id, vnode, imagefile):
- node = self.getNode(pnode)
- if self.rm.config.get("lease-preparation") == constants.DEPLOYMENT_UNMANAGED:
- self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
- elif self.reusealg == constants.REUSE_NONE:
- if not node.hasTaintedImage(lease_id, vnode, imagefile):
- self.logger.debug("ERROR: Image for L%iV%i is not deployed on node %i" % (lease_id, vnode, pnode))
- elif self.reusealg == constants.REUSE_IMAGECACHES:
- poolentry = node.getPoolEntry(imagefile, lease_id=lease_id, vnode=vnode)
- if poolentry == None:
- # Not necessarily an error. Maybe the pool was full, and
- # we had to fall back on creating a tainted image right
- # when the image was transferred. We have to check this.
- if not node.hasTaintedImage(lease_id, vnode, imagefile):
- self.logger.error("ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease_id, vnode, pnode))
- else:
- # Create tainted image
- self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
- node.printFiles()
- img = VMImageFile(imagefile, poolentry.filesize, masterimg=False)
- img.addMapping(lease_id, vnode)
- node.addFile(img)
- node.printFiles()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
-
- def isInPool(self, pnode, imagefile, time):
- return self.getNode(pnode).isInPool(imagefile, after=time)
-
- def getNodesWithImgInPool(self, imagefile, after = None):
- return [n.nod_id for n in self.nodes if n.isInPool(imagefile, after=after)]
-
- def addToPool(self, pnode, imagefile, lease_id, vnode, timeout):
- return self.getNode(pnode).addToPool(imagefile, lease_id, vnode, timeout)
-
- def removeImage(self, pnode, lease, vnode):
- node = self.getNode(pnode)
- node.printFiles()
- if self.reusealg == constants.REUSE_IMAGECACHES:
- self.logger.debug("Removing pooled images for L%iV%i in node %i" % (lease, vnode, pnode))
- toremove = []
- for img in node.getPoolImages():
- if (lease, vnode) in img.mappings:
- img.mappings.remove((lease, vnode))
- node.printFiles()
- # Keep image around, even if it isn't going to be used
- # by any VMs. It might be reused later on.
- # It will be purged if space has to be made available
- # for other images
- for img in toremove:
- node.files.remove(img)
- node.printFiles()
+
+ def remove_diskimage(self, pnode, lease, vnode):
+ node = self.get_node(pnode)
+ node.print_files()
- self.logger.debug("Removing tainted images for L%iV%i in node %i" % (lease, vnode, pnode))
- node.removeTainted(lease, vnode)
+ self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
+ node.remove_diskimage(lease, vnode)
- node.printFiles()
+ node.print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
- def addRAMFileToNode(self, pnode, lease_id, vnode, size):
- node = self.getNode(pnode)
+ def add_ramfile(self, pnode, lease_id, vnode, size):
+ node = self.get_node(pnode)
self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
- node.printFiles()
+ node.print_files()
f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
- node.addFile(f)
- node.printFiles()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+ node.add_file(f)
+ node.print_files()
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
- def removeRAMFileFromNode(self, pnode, lease_id, vnode):
- node = self.getNode(pnode)
+ def remove_ramfile(self, pnode, lease_id, vnode):
+ node = self.get_node(pnode)
self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
- node.printFiles()
- node.removeRAMFile(lease_id, vnode)
- node.printFiles()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
+ node.print_files()
+ node.remove_ramfile(lease_id, vnode)
+ node.print_files()
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
- def getMaxDiskUsage(self):
- return max([n.getTotalFileSize() for n in self.nodes])
+ def get_max_disk_usage(self):
+ return max([n.get_disk_usage() for n in self.nodes])
class Node(object):
def __init__(self, resourcepool, nod_id, hostname, capacity):
- self.resourcepool = resourcepool
self.logger = logging.getLogger("RESOURCEPOOL")
+ self.resourcepool = resourcepool
self.nod_id = nod_id
self.hostname = hostname
- self.files = []
- self.workingspacesize = 0
self.capacity = capacity
+ self.files = []
+
# enactment-specific information
self.enactment_info = None
+
# Kludgy way of keeping track of utilization
# TODO: Compute this information based on the lease reservations,
# either on the fly or when Haizea stops running.
self.transfer_doing = constants.DOING_IDLE
self.vm_doing = constants.DOING_IDLE
- def getCapacity(self):
+ def get_capacity(self):
return self.capacity
- def addFile(self, f):
+ def add_file(self, f):
self.files.append(f)
- if not (isinstance(f, VMImageFile) and f.masterimg==True):
- self.workingspacesize += f.filesize
- def removeTainted(self, lease, vnode):
- img = [f for f in self.files if isinstance(f, VMImageFile) and f.masterimg==False and f.hasMapping(lease, vnode)]
- if len(img) > 0:
- img = img[0]
- self.files.remove(img)
- self.workingspacesize -= img.filesize
-
- def removeRAMFile(self, lease, vnode):
- img = [f for f in self.files if isinstance(f, RAMImageFile) and f.id==lease and f.vnode==vnode]
- if len(img) > 0:
- img = img[0]
- self.files.remove(img)
- self.workingspacesize -= img.filesize
-
- def getTaintedImage(self, lease_id, vnode, imagefile):
- images = self.getTaintedImages()
- image = [i for i in images if i.filename == imagefile and i.hasMapping(lease_id, vnode)]
+ def get_diskimage(self, lease_id, vnode, diskimage_id):
+ image = [f for f in self.files if isinstance(f, DiskImageFile) and
+ f.diskimage_id == diskimage_id and
+ f.lease_id == lease_id and
+ f.vnode == vnode]
if len(image) == 0:
return None
elif len(image) == 1:
@@ -409,49 +207,139 @@
elif len(image) > 1:
self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
return image[0]
+
+ def remove_diskimage(self, lease_id, vnode):
+ image = [f for f in self.files if isinstance(f, DiskImageFile) and
+ f.lease_id == lease_id and
+ f.vnode == vnode]
+ if len(image) > 0:
+ image = image[0]
+ self.files.remove(image)
+
+ def remove_ramfile(self, lease_id, vnode):
+ ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
+ if len(ramfile) > 0:
+ ramfile = ramfile[0]
+ self.files.remove(ramfile)
+
- def addToPool(self, imagefile, lease_id, vnode, timeout):
+ def get_disk_usage(self):
+ return sum([f.filesize for f in self.files])
+
+
+ def get_diskimages(self):
+ return [f for f in self.files if isinstance(f, DiskImageFile)]
+
+ def print_files(self):
+ images = ""
+ if len(self.files) > 0:
+ images = ", ".join([str(img) for img in self.files])
+ self.logger.vdebug("Node %i has %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
+
+ def get_state(self):
+ if self.vm_doing == constants.DOING_IDLE and self.transfer_doing == constants.DOING_TRANSFER:
+ return constants.DOING_TRANSFER_NOVM
+ else:
+ return self.vm_doing
+
+ def xmlrpc_marshall(self):
+ # Convert to something we can send through XMLRPC
+ h = {}
+ h["id"] = self.nod_id
+ h["hostname"] = self.hostname
+ h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
+ h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
+
+ return h
+
+
+
+class File(object):
+ def __init__(self, filename, filesize):
+ self.filename = filename
+ self.filesize = filesize
+
+class DiskImageFile(File):
+ def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
+ File.__init__(self, filename, filesize)
+ self.lease_id = lease_id
+ self.vnode = vnode
+ self.diskimage_id = diskimage_id
+
+ def __str__(self):
+ return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
+
+
+class RAMImageFile(File):
+ def __init__(self, filename, filesize, lease_id, vnode):
+ File.__init__(self, filename, filesize)
+ self.lease_id = lease_id
+ self.vnode = vnode
+
+ def __str__(self):
+ return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
+
+class ResourcePoolWithReusableImages(object):
+ def __init__(self, scheduler):
+ ResourcePool.__init__(self, scheduler)
+
+ def remove_diskimage(self, pnode, lease, vnode):
+ ResourcePool.remove_diskimage(self, pnode, lease, vnode)
+ self.logger.debug("Removing pooled images for L%iV%i in node %i" % (lease, vnode, pnode))
+ toremove = []
+ for img in node.getPoolImages():
+ if (lease, vnode) in img.mappings:
+ img.mappings.remove((lease, vnode))
+ node.printFiles()
+ # Keep image around, even if it isn't going to be used
+ # by any VMs. It might be reused later on.
+ # It will be purged if space has to be made available
+ # for other images
+ for img in toremove:
+ node.files.remove(img)
+ node.printFiles()
+
+
+
+class NodeWithReusableImages(Node):
+ def __init__(self, resourcepool, nod_id, hostname, capacity):
+ Node.__init__(self, resourcepool, nod_id, hostname, capacity)
+ self.reusable_images = []
+
+ def add_mapping_to_existing_reusable_image(self, imagefile, lease_id, vnode, timeout):
for f in self.files:
if f.filename == imagefile:
- f.addMapping(lease_id, vnode)
- f.updateTimeout(timeout)
+ f.add_mapping(lease_id, vnode)
+ f.update_timeout(timeout)
break # Ugh
- self.printFiles()
+ self.print_files()
- def getPoolEntry(self, imagefile, after = None, lease_id=None, vnode=None):
- images = self.getPoolImages()
- images = [i for i in images if i.filename == imagefile]
+ def get_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
+ images = [i for i in self.reusable_images if i.filename == imagefile]
if after != None:
images = [i for i in images if i.timeout >= after]
if lease_id != None and vnode != None:
- images = [i for i in images if i.hasMapping(lease_id, vnode)]
+ images = [i for i in images if i.has_mapping(lease_id, vnode)]
if len(images)>0:
return images[0]
else:
return None
- def isInPool(self, imagefile, after = None, lease_id=None, vnode=None):
- entry = self.getPoolEntry(imagefile, after = after, lease_id=lease_id, vnode=vnode)
+ def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
+ entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
if entry == None:
return False
else:
return True
-
- def getTotalFileSize(self):
- return self.workingspacesize
- def getPoolImages(self):
- return [f for f in self.files if isinstance(f, VMImageFile) and f.masterimg==True]
+ def get_reusable_images(self):
+ return self.reusable_images
- def getTaintedImages(self):
- return [f for f in self.files if isinstance(f, VMImageFile) and f.masterimg==False]
-
- def getPoolSize(self):
- return sum([f.filesize for f in self.getPoolImages()])
+ def get_reusable_images_size(self):
+ return sum([f.filesize for f in self.reusable_images])
- def purgeOldestUnusedImage(self):
- pool = self.getPoolImages()
- unused = [img for img in pool if not img.hasMappings()]
+ def purge_oldest_unused_image(self):
+ unused = [img for img in self.reusable_images if not img.has_mappings()]
if len(unused) == 0:
return 0
else:
@@ -460,71 +348,43 @@
for img in i:
if img.timeout < oldest.timeout:
oldest = img
- self.files.remove(oldest)
+ self.reusable_images.remove(oldest)
return 1
- def purgePoolDownTo(self, target):
+ def purge_downto(self, target):
done = False
while not done:
- removed = self.purgeOldestUnusedImage()
+ removed = self.purge_oldest_unused_image()
if removed==0:
done = True
success = False
elif removed == 1:
- if self.getPoolSize() <= target:
+ if self.get_reusable_images_size() <= target:
done = True
success = True
return success
-
- def printFiles(self):
- images = ""
- if len(self.files) > 0:
- images = ", ".join([str(img) for img in self.files])
- self.logger.vdebug("Node %i has %iMB %s" % (self.nod_id, self.getTotalFileSize(), images))
- def getState(self):
- if self.vm_doing == constants.DOING_IDLE and self.transfer_doing == constants.DOING_TRANSFER:
- return constants.DOING_TRANSFER_NOVM
- else:
- return self.vm_doing
-
- def xmlrpc_marshall(self):
- # Convert to something we can send through XMLRPC
- h = {}
- h["id"] = self.nod_id
- h["hostname"] = self.hostname
- h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
- h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
-
- return h
-
-class File(object):
- def __init__(self, filename, filesize):
- self.filename = filename
- self.filesize = filesize
-
-class VMImageFile(File):
- def __init__(self, filename, filesize, diskImageID=None, masterimg = False):
+class ReusableDiskImageFile(File):
+ def __init__(self, filename, filesize, diskimage_id):
File.__init__(self, filename, filesize)
- self.diskImageID = diskImageID
+ self.diskimage_id = diskimage_id
self.mappings = set([])
- self.masterimg = masterimg
self.timeout = None
- def addMapping(self, lease_id, vnode):
+ def add_mapping(self, lease_id, vnode):
self.mappings.add((lease_id, vnode))
- def hasMapping(self, lease_id, vnode):
+ def has_mapping(self, lease_id, vnode):
return (lease_id, vnode) in self.mappings
- def hasMappings(self):
+ def has_mappings(self):
return len(self.mappings) > 0
- def updateTimeout(self, timeout):
+ def update_timeout(self, timeout):
if timeout > self.timeout:
self.timeout = timeout
- def isExpired(self, curTime):
+ def is_expired(self, curTime):
if self.timeout == None:
return False
elif self.timeout > curTime:
@@ -533,22 +393,9 @@
return False
def __str__(self):
- if self.masterimg == True:
- master="POOL"
- else:
- master="TAINTED"
if self.timeout == None:
timeout = "NOTIMEOUT"
else:
timeout = self.timeout
- return "(DISK " + self.filename + " " + vnodemapstr(self.mappings) + " " + master + " " + str(timeout) + ")"
+ return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
-class RAMImageFile(File):
- def __init__(self, filename, filesize, lease_id, vnode):
- File.__init__(self, filename, filesize)
- self.id = lease_id
- self.vnode = vnode
-
- def __str__(self):
- mappings = [(self.id, self.vnode)]
- return "(RAM " + self.filename + " " + vnodemapstr(mappings)+ ")"
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/rm.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -37,7 +37,6 @@
from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
from haizea.resourcemanager.frontends.rpc import RPCFrontend
from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease
-from haizea.resourcemanager.resourcepool import ResourcePool
from haizea.resourcemanager.scheduler import Scheduler
from haizea.resourcemanager.rpcserver import RPCServer
from haizea.common.utils import abstract, roundDateTime, Singleton
@@ -102,9 +101,6 @@
# RPC server
self.rpc_server = RPCServer(self)
-
- # Resource pool
- self.resourcepool = ResourcePool(self)
# Scheduler
self.scheduler = Scheduler(self)
@@ -273,7 +269,7 @@
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception in scheduling function. Dumping state..." )
- self.print_stats("ERROR", verbose=True)
+ self.print_stats(logging.getLevelName("ERROR"), verbose=True)
raise
def process_reservations(self, time):
@@ -354,7 +350,6 @@
self.print_stats("ERROR", verbose=True)
raise
-
class Clock(object):
"""Base class for the resource manager's clock.
Modified: trunk/src/haizea/resourcemanager/rpcserver.py
===================================================================
--- trunk/src/haizea/resourcemanager/rpcserver.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/rpcserver.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -66,7 +66,7 @@
return [l.xmlrpc_marshall() for l in self.rm.scheduler.queue]
def get_hosts(self):
- return [h.xmlrpc_marshall() for h in self.rm.resourcepool.nodes]
+ return [h.xmlrpc_marshall() for h in self.rm.scheduler.resourcepool.nodes]
def notify_event(self, lease_id, enactment_id, event):
pass
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -37,6 +37,7 @@
from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation
+from haizea.resourcemanager.resourcepool import ResourcePool
import logging
@@ -76,13 +77,17 @@
"""
def __init__(self, rm):
self.rm = rm
- self.logger = logging.getLogger("SCHEDULER")
+ self.logger = logging.getLogger("SCHED")
+ self.resourcepool = ResourcePool(self)
self.slottable = SlotTable(self)
self.queue = ds.Queue(self)
self.scheduledleases = ds.LeaseTable(self)
self.completedleases = ds.LeaseTable(self)
self.pending_leases = []
+ for n in self.resourcepool.get_nodes():
+ self.slottable.add_node(n)
+
self.handlers = {}
self.register_handler(type = ds.VMResourceReservation,
@@ -219,6 +224,21 @@
l = self.queue.get_lease(lease_id)
self.queue.remove_lease(lease)
+ def fail_lease(self, lease_id):
+ """Transitions a lease to a failed state, and does any necessary cleaning up
+
+ TODO: For now, just use the cancelling algorithm
+
+ Arguments:
+ lease -- Lease to fail
+ """
+ try:
+ self.cancel_lease(lease_id)
+ except Exception, msg:
+ # Exit if something goes horribly wrong
+ self.logger.error("Exception when failing lease %i. Dumping state..." % lease_id)
+ self.print_stats("ERROR", verbose=True)
+ raise
def notify_event(self, lease_id, event):
time = self.rm.clock.get_time()
@@ -351,16 +371,16 @@
# Update VM image mappings, since we might be resuming
# in different nodes.
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.id, vnode)
+ self.resourcepool.remove_diskimage(pnode, req.id, vnode)
req.vmimagemap = vmrr.nodes
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.addTaintedImageToNode(pnode, req.diskimage_id, req.diskimage_size, req.id, vnode)
+ self.resourcepool.add_diskimage(pnode, req.diskimage_id, req.diskimage_size, req.id, vnode)
# Update RAM file mappings
for vnode, pnode in req.memimagemap.items():
- self.rm.resourcepool.removeRAMFileFromNode(pnode, req.id, vnode)
+ self.resourcepool.remove_ramfile(pnode, req.id, vnode)
for vnode, pnode in vmrr.nodes.items():
- self.rm.resourcepool.addRAMFileToNode(pnode, req.id, vnode, req.requested_resources.get_by_type(constants.RES_MEM))
+ self.resourcepool.add_ramfile(pnode, req.id, vnode, req.requested_resources.get_by_type(constants.RES_MEM))
req.memimagemap[vnode] = pnode
# Add resource reservations
@@ -417,7 +437,7 @@
self.logger.vdebug("Lease before preemption:")
req.print_contents()
vmrr, susprr = req.get_last_vmrr()
- suspendresumerate = self.rm.resourcepool.info.getSuspendResumeRate()
+ suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
if vmrr.state == constants.RES_STATE_SCHEDULED and vmrr.start >= time:
self.logger.info("... lease #%i has been cancelled and requeued." % req.id)
@@ -431,7 +451,7 @@
req.remove_rr(susprr)
self.slottable.removeReservation(susprr)
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.id, vnode)
+ self.resourcepool.removeImage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
@@ -464,7 +484,7 @@
req.remove_rr(resmrr)
self.slottable.removeReservation(resmrr)
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.id, vnode)
+ self.resourcepool.removeImage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
@@ -507,7 +527,7 @@
l.start.actual = now_time
try:
- self.rm.resourcepool.startVMs(l, rr)
+ self.resourcepool.start_vms(l, rr)
# The next two lines have to be moved somewhere more
# appropriate inside the resourcepool module
for (vnode, pnode) in rr.nodes.items():
@@ -537,14 +557,13 @@
l.duration.accumulate_duration(diff)
rr.state = constants.RES_STATE_DONE
if rr.oncomplete == constants.ONCOMPLETE_ENDLEASE:
- self.rm.resourcepool.stopVMs(l, rr)
+ self.resourcepool.stop_vms(l, rr)
l.state = constants.LEASE_STATE_DONE
l.duration.actual = l.duration.accumulated
l.end = now_time
self.completedleases.add(l)
self.scheduledleases.remove(l)
- for vnode, pnode in l.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, l.id, vnode)
+ self.deployment.cleanup(l, rr)
if isinstance(l, ds.BestEffortLease):
self.rm.accounting.incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
@@ -578,9 +597,9 @@
self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
l.print_contents()
rr.state = constants.RES_STATE_ACTIVE
- self.rm.resourcepool.suspendVMs(l, rr)
+ self.resourcepool.suspend_vms(l, rr)
for vnode, pnode in rr.nodes.items():
- self.rm.resourcepool.addRAMFileToNode(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
+ self.resourcepool.add_ramfile(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
l.memimagemap[vnode] = pnode
l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND, l.id)
@@ -591,7 +610,7 @@
self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
l.print_contents()
# TODO: React to incomplete suspend
- self.rm.resourcepool.verifySuspend(l, rr)
+ self.resourcepool.verify_suspend(l, rr)
rr.state = constants.RES_STATE_DONE
l.state = constants.LEASE_STATE_SUSPENDED
self.scheduledleases.remove(l)
@@ -605,7 +624,7 @@
def _handle_start_resume(self, l, rr):
self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
l.print_contents()
- self.rm.resourcepool.resumeVMs(l, rr)
+ self.resourcepool.resume_vms(l, rr)
rr.state = constants.RES_STATE_ACTIVE
l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME, l.id)
@@ -616,10 +635,10 @@
self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
l.print_contents()
# TODO: React to incomplete resume
- self.rm.resourcepool.verifyResume(l, rr)
+ self.resourcepool.verify_resume(l, rr)
rr.state = constants.RES_STATE_DONE
for vnode, pnode in rr.nodes.items():
- self.rm.resourcepool.removeRAMFileFromNode(pnode, l.id, vnode)
+ self.resourcepool.remove_ramfile(pnode, l.id, vnode)
l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE, l.id)
self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
@@ -643,11 +662,11 @@
def updateNodeVMState(self, nodes, state, lease_id):
for n in nodes:
- self.rm.resourcepool.getNode(n).vm_doing = state
+ self.resourcepool.get_node(n).vm_doing = state
def updateNodeTransferState(self, nodes, state, lease_id):
for n in nodes:
- self.rm.resourcepool.getNode(n).transfer_doing = state
+ self.resourcepool.get_node(n).transfer_doing = state
def is_backfilling(self):
return self.maxres > 0
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-09-02 11:44:45 UTC (rev 485)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-09-03 17:35:19 UTC (rev 486)
@@ -39,8 +39,8 @@
self.resourcepoolnode = resourcepoolnode
@classmethod
- def fromResourcePoolNode(cls, node):
- capacity = node.getCapacity()
+ def from_resourcepool_node(cls, node):
+ capacity = node.get_capacity()
return cls(capacity, capacity, node)
class NodeList(object):
@@ -82,8 +82,8 @@
def __init__(self, scheduler):
self.scheduler = scheduler
self.rm = scheduler.rm
+ self.resourcepool = scheduler.resourcepool
self.logger = logging.getLogger("SLOTTABLE")
- self.resourcepool = scheduler.rm.resourcepool
self.nodes = NodeList()
self.reservations = []
self.reservationsByStart = []
@@ -91,23 +91,11 @@
self.availabilitycache = {}
self.changepointcache = None
- # Create nodes
- for n in self.resourcepool.getNodes():
- self.nodes.add(Node.fromResourcePoolNode(n))
-
- # Create image nodes
- FIFOnode = self.resourcepool.getFIFORepositoryNode()
- EDFnode = self.resourcepool.getEDFRepositoryNode()
-
- if FIFOnode != None and EDFnode != None:
- self.nodes.add(Node.fromResourcePoolNode(FIFOnode))
- self.nodes.add(Node.fromResourcePoolNode(EDFnode))
-
- self.FIFOnode = FIFOnode.nod_id
- self.EDFnode = EDFnode.nod_id
-
self.availabilitywindow = AvailabilityWindow(self)
+ def add_node(self, resourcepoolnode):
+ self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
+
def dirty(self):
# You're a dirty, dirty slot table and you should be
# ashamed of having outdated caches!
@@ -496,7 +484,7 @@
numnodes = lease.numnodes
resreq = lease.requested_resources
preemptible = lease.preemptible
- suspendresumerate = self.resourcepool.info.getSuspendResumeRate()
+ suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
#
# STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
@@ -516,7 +504,7 @@
# TODO: Get bandwidth another way. Right now, the
# image node bandwidth is the same as the bandwidth
# in the other nodes, but this won't always be true.
- migratetime = lease.estimate_migration_time(self.rm.resourcepool.imagenode_bandwidth)
+ migratetime = lease.estimate_migration_time(self.rm.scheduler.resourcepool.info.get_bandwidth())
earliesttransfer = self.rm.clock.get_time() + migratetime
for n in earliest:
@@ -729,7 +717,7 @@
return start, end, canfit, mustsuspend
def suspend(self, lease, time):
- suspendresumerate = self.resourcepool.info.getSuspendResumeRate()
+ suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
(vmrr, susprr) = lease.get_last_vmrr()
vmrrnew = copy.copy(vmrr)
@@ -825,11 +813,11 @@
# TODO: The deployment module should just provide a list of nodes
# it prefers
nodeswithimg=[]
- self.lease_deployment_type = self.rm.config.get("lease-preparation")
- if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- reusealg = self.rm.config.get("diskimage-reuse")
- if reusealg==constants.REUSE_IMAGECACHES:
- nodeswithimg = self.rm.resourcepool.getNodesWithImgInPool(diskImageID, start)
+ #self.lease_deployment_type = self.rm.config.get("lease-preparation")
+ #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+ # reusealg = self.rm.config.get("diskimage-reuse")
+ # if reusealg==constants.REUSE_IMAGECACHES:
+ # nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
# Compares node x and node y.
# Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)