[haizea-commit] r487 - in trunk/src/haizea/resourcemanager: . deployment enact enact/opennebula enact/simulated
haizea-commit at mailman.cs.uchicago.edu
Thu Sep 4 12:28:38 CDT 2008
Author: borja
Date: 2008-09-04 12:28:38 -0500 (Thu, 04 Sep 2008)
New Revision: 487
Added:
trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py
trunk/src/haizea/resourcemanager/enact/simulated/deployment.py
Removed:
trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
trunk/src/haizea/resourcemanager/enact/simulated/storage.py
Modified:
trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
trunk/src/haizea/resourcemanager/enact/base.py
trunk/src/haizea/resourcemanager/enact/simulated/__init__.py
trunk/src/haizea/resourcemanager/enact/simulated/info.py
trunk/src/haizea/resourcemanager/enact/simulated/vm.py
trunk/src/haizea/resourcemanager/resourcepool.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Log:
- Cleaned up the image transfer and caching code.
- Renamed the "storage" enactment modules to the more appropriate "deployment" enactment modules, and factored some code out of the resourcepool and deployment modules into them (a brief caller-side sketch of the renamed interface follows below).
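For reference, the practical effect of the rename is that deployment-related lookups now go through the resource pool's deployment enactment module, rather than being split across the old "info" and "storage" modules. The following is a minimal caller-side sketch using only accessors that appear in this diff; the wrapper function and its arguments are illustrative, not part of the commit.

    def resolve_deployment_example(resourcepool, lease_id, vnode, diskimage_id):
        # Image-transfer scheduling now asks the deployment module for its
        # nodes and bandwidth (previously self.info.getFIFONode()/getEDFNode()):
        fifo_node = resourcepool.deployment.get_fifo_node()
        edf_node = resourcepool.deployment.get_edf_node()
        bandwidth = resourcepool.deployment.get_bandwidth()
        # Disk image paths are also resolved by the deployment module
        # (previously self.storage.resolve_to_file in resourcepool.py):
        return resourcepool.deployment.resolve_to_file(lease_id, vnode, diskimage_id)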
Modified: trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -30,24 +30,21 @@
# TODO: The following two should be merged into
# something like this:
# self.imageNode = self.info.getImageNode()
- self.FIFOnode = self.info.getFIFONode()
- self.EDFnode = self.info.getEDFNode()
+ self.fifo_node = self.resourcepool.deployment.get_fifo_node()
+ self.edf_node = self.resourcepool.deployment.get_edf_node()
- self.transfersEDF = []
- self.transfersFIFO = []
- self.completedTransfers = []
-
- self.lease_deployment_type = self.rm.config.get("lease-preparation")
- if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- self.reusealg = self.rm.config.get("diskimage-reuse")
- if self.reusealg == constants.REUSE_IMAGECACHES:
- self.maxcachesize = self.rm.config.get("diskimage-cache-size")
- else:
- self.maxcachesize = None
+ self.transfers_edf = []
+ self.transfers_fifo = []
+ self.completed_transfers = []
+
+ config = self.scheduler.rm.config
+ self.reusealg = config.get("diskimage-reuse")
+ if self.reusealg == constants.REUSE_IMAGECACHES:
+ self.maxcachesize = config.get("diskimage-cache-size")
else:
- self.reusealg = None
+ self.maxcachesize = None
- self.imagenode_bandwidth = self.info.get_bandwidth()
+ self.imagenode_bandwidth = self.resourcepool.deployment.get_bandwidth()
self.scheduler.register_handler(type = FileTransferResourceReservation,
on_start = ImageTransferDeployment.handle_start_filetransfer,
@@ -84,7 +81,7 @@
self.logger.debug("Scheduling image transfer of '%s' from vnode %i to physnode %i" % (lease.diskimage_id, vnode, pnode))
if reusealg == constants.REUSE_IMAGECACHES:
- if self.resourcepool.isInPool(pnode, lease.diskimage_id, start):
+ if self.resourcepool.exists_reusable_image(pnode, lease.diskimage_id, start):
self.logger.debug("No need to schedule an image transfer (reusing an image in pool)")
mustpool[vnode] = pnode
else:
@@ -108,18 +105,18 @@
transferRR = transferRRs[pnode]
transferRR.piggyback(lease_id, vnode, pnode, end)
else:
- filetransfer = self.scheduleImageTransferEDF(lease, {vnode:pnode}, nexttime)
+ filetransfer = self.schedule_imagetransfer_edf(lease, {vnode:pnode}, nexttime)
transferRRs[pnode] = filetransfer
lease.appendRR(filetransfer)
elif mechanism == constants.TRANSFER_MULTICAST:
- filetransfer = self.scheduleImageTransferEDF(lease, musttransfer, nexttime)
+ filetransfer = self.schedule_imagetransfer_edf(lease, musttransfer, nexttime)
lease.append_rr(filetransfer)
# No chance of scheduling exception at this point. It's safe
# to add entries to the pools
if reusealg == constants.REUSE_IMAGECACHES:
for (vnode, pnode) in mustpool.items():
- self.resourcepool.addToPool(pnode, lease.diskImageID, lease_id, vnode, start)
+ self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
def schedule_for_besteffort(self, lease, vmrr, nexttime):
config = self.scheduler.rm.config
@@ -136,7 +133,7 @@
if reqtransfer == constants.REQTRANSFER_COWPOOL:
# Add to pool
self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
- self.resourcepool.addToPool(pnode, lease.diskimage_id, lease.id, vnode, vmrr.end)
+ self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, vmrr.end)
elif reqtransfer == constants.REQTRANSFER_PIGGYBACK:
# We can piggyback on an existing transfer
transferRR = earliest[pnode][2]
@@ -148,7 +145,7 @@
musttransfer[vnode] = pnode
self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
if len(musttransfer)>0:
- transferRRs = self.scheduleImageTransferFIFO(lease, musttransfer, nexttime)
+ transferRRs = self.schedule_imagetransfer_fifo(lease, musttransfer, nexttime)
endtransfer = transferRRs[-1].end
lease.imagesavail = endtransfer
else:
@@ -168,16 +165,16 @@
def find_earliest_starting_times(self, lease_req, nexttime):
- nodIDs = [n.nod_id for n in self.resourcepool.getNodes()]
+ nodIDs = [n.nod_id for n in self.resourcepool.get_nodes()]
config = self.scheduler.rm.config
mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
# Figure out starting time assuming we have to transfer the image
- nextfifo = self.getNextFIFOTransferTime(nexttime)
+ nextfifo = self.get_next_fifo_transfer_time(nexttime)
- imgTransferTime=lease_req.estimate_image_transfer_time(self.resourcepool.imagenode_bandwidth)
+ imgTransferTime=lease_req.estimate_image_transfer_time(self.imagenode_bandwidth)
# Find worst-case earliest start time
if lease_req.numnodes == 1:
@@ -196,7 +193,7 @@
# Check if we can reuse images
if reusealg==constants.REUSE_IMAGECACHES:
- nodeswithimg = self.resourcepool.getNodesWithImgInPool(lease_req.diskimage_id)
+ nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease_req.diskimage_id)
for node in nodeswithimg:
earliest[node] = [nexttime, constants.REQTRANSFER_COWPOOL]
@@ -208,7 +205,7 @@
# TODO
if mechanism == constants.TRANSFER_MULTICAST:
# We can only piggyback on transfers that haven't started yet
- transfers = [t for t in self.transfersFIFO if t.state == constants.RES_STATE_SCHEDULED]
+ transfers = [t for t in self.transfers_fifo if t.state == constants.RES_STATE_SCHEDULED]
for t in transfers:
if t.file == lease_req.diskImageID:
startTime = t.end
@@ -219,19 +216,19 @@
return earliest
- def scheduleImageTransferEDF(self, req, vnodes, nexttime):
+ def schedule_imagetransfer_edf(self, req, vnodes, nexttime):
# Estimate image transfer time
- bandwidth = self.resourcepool.imagenode_bandwidth
+ bandwidth = self.resourcepool.deployment.get_bandwidth()
imgTransferTime=req.estimate_image_transfer_time(bandwidth)
# Determine start time
- activetransfers = [t for t in self.transfersEDF if t.state == constants.RES_STATE_ACTIVE]
+ activetransfers = [t for t in self.transfers_edf if t.state == constants.RES_STATE_ACTIVE]
if len(activetransfers) > 0:
startTime = activetransfers[-1].end
else:
startTime = nexttime
- transfermap = dict([(copy.copy(t), t) for t in self.transfersEDF if t.state == constants.RES_STATE_SCHEDULED])
+ transfermap = dict([(copy.copy(t), t) for t in self.transfers_edf if t.state == constants.RES_STATE_SCHEDULED])
newtransfers = transfermap.keys()
res = {}
@@ -239,7 +236,7 @@
resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
resnode = ds.ResourceTuple.create_empty()
resnode.set_by_type(constants.RES_NETIN, bandwidth)
- res[self.slottable.EDFnode] = resimgnode
+ res[self.edf_node.nod_id] = resimgnode
for n in vnodes.values():
res[n] = resnode
@@ -307,22 +304,22 @@
for t in newtransfers:
if t == newtransfer:
self.slottable.addReservation(t)
- self.transfersEDF.append(t)
+ self.transfers_edf.append(t)
else:
tOld = transfermap[t]
- self.transfersEDF.remove(tOld)
- self.transfersEDF.append(t)
+ self.transfers_edf.remove(tOld)
+ self.transfers_edf.append(t)
self.slottable.updateReservationWithKeyChange(tOld, t)
return newtransfer
- def scheduleImageTransferFIFO(self, req, reqtransfers, nexttime):
+ def schedule_imagetransfer_fifo(self, req, reqtransfers, nexttime):
# Estimate image transfer time
- bandwidth = self.resourcepool.imagenode_bandwidth
+ bandwidth = self.imagenode_bandwidth
imgTransferTime=req.estimate_image_transfer_time(bandwidth)
config = self.scheduler.rm.config
mechanism = config.get("transfer-mechanism")
- startTime = self.getNextFIFOTransferTime(nexttime)
+ startTime = self.get_next_fifo_transfer_time(nexttime)
newtransfers = []
@@ -338,7 +335,7 @@
resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
resnode = ds.ResourceTuple.create_empty()
resnode.set_by_type(constants.RES_NETIN, bandwidth)
- res[self.slottable.FIFOnode] = resimgnode
+ res[self.fifo_node.nod_id] = resimgnode
for n in reqtransfers.values():
res[n] = resnode
newtransfer = FileTransferResourceReservation(req, res)
@@ -353,12 +350,12 @@
self.slottable.addReservation(newtransfer)
newtransfers.append(newtransfer)
- self.transfersFIFO += newtransfers
+ self.transfers_fifo += newtransfers
return newtransfers
- def getNextFIFOTransferTime(self, nexttime):
- transfers = [t for t in self.transfersFIFO if t.state != constants.RES_STATE_DONE]
+ def get_next_fifo_transfer_time(self, nexttime):
+ transfers = [t for t in self.transfers_fifo if t.state != constants.RES_STATE_DONE]
if len(transfers) > 0:
startTime = transfers[-1].end
else:
@@ -366,7 +363,7 @@
return startTime
def __remove_from_fifo_transfers(self, lease_id):
- transfers = [t for t in self.transfersFIFO if t.state != constants.RES_STATE_DONE]
+ transfers = [t for t in self.transfers_fifo if t.state != constants.RES_STATE_DONE]
toremove = []
for t in transfers:
for pnode in t.transfers:
@@ -381,7 +378,7 @@
self.slottable.removeReservation(t)
toremove.append(t)
for t in toremove:
- self.transfersFIFO.remove(t)
+ self.transfers_fifo.remove(t)
@staticmethod
def handle_start_filetransfer(sched, lease, rr):
@@ -423,93 +420,63 @@
if maxend==None or end>maxend:
maxend=end
# TODO: ENACTMENT: Verify the image was transferred correctly
- self.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
- elif lease.state == constants.LEASE_STATE_SUSPENDED:
- pass
- # TODO: Migrating
+ sched.deployment.add_diskimages(physnode, rr.file, lease.diskimage_size, vnodes, timeout=maxend)
+
lease.print_contents()
sched.updateNodeTransferState(rr.transfers.keys(), constants.DOING_IDLE, lease.id)
sched.rm.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
sched.rm.logger.info("Completed image transfer for lease %i" % (lease.id))
- def add_diskimages(self, nod_id, imagefile, imagesize, vnodes, timeout=None):
- self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, nod_id))
- self.getNode(nod_id).printFiles()
+ def add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
+ self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
+ pnode = self.resourcepool.get_node(pnode_id)
+
if self.reusealg == constants.REUSE_NONE:
for (lease_id, vnode) in vnodes:
- img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(lease_id, vnode)
- self.getNode(nod_id).addFile(img)
+ self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
elif self.reusealg == constants.REUSE_IMAGECACHES:
# Sometimes we might find that the image is already deployed
# (although unused). In that case, don't add another copy to
# the pool. Just "reactivate" it.
- if self.getNode(nod_id).isInPool(imagefile):
+ if pnode.exists_reusable_image(diskimage_id):
for (lease_id, vnode) in vnodes:
- self.getNode(nod_id).addToPool(imagefile, lease_id, vnode, timeout)
+ pnode.add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
else:
- img = VMImageFile(imagefile, imagesize, masterimg=True)
- img.timeout = timeout
- for (lease_id, vnode) in vnodes:
- img.addMapping(lease_id, vnode)
- if self.maxcachesize != constants.CACHESIZE_UNLIMITED:
- poolsize = self.getNode(nod_id).getPoolSize()
- reqsize = poolsize + imagesize
+ if self.maxcachesize == constants.CACHESIZE_UNLIMITED:
+ can_add_to_cache = True
+ else:
+ # We may have to remove images from the cache
+ cachesize = pnode.get_reusable_images_size()
+ reqsize = cachesize + diskimage_size
if reqsize > self.maxcachesize:
- desiredsize = self.maxcachesize - imagesize
- self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (nod_id, reqsize, desiredsize))
- self.getNode(nod_id).printFiles()
- success = self.getNode(nod_id).purgePoolDownTo(self.maxcachesize)
+ # Have to shrink cache
+ desiredsize = self.maxcachesize - diskimage_size
+ self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (pnode_id, reqsize, desiredsize))
+ pnode.print_files()
+ success = pnode.purge_downto(self.maxcachesize)
if not success:
- self.logger.debug("Unable to add to pool. Creating tainted image instead.")
- # If unsuccessful, this just means we couldn't add the image
- # to the pool. We will have to create tainted images to be used
- # only by these leases
- for (lease_id, vnode) in vnodes:
- img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(lease_id, vnode)
- self.getNode(nod_id).addFile(img)
+ can_add_to_cache = False
else:
- self.getNode(nod_id).addFile(img)
+ can_add_to_cache = True
else:
- self.getNode(nod_id).addFile(img)
+ can_add_to_cache = True
+
+ if can_add_to_cache:
+ self.resourcepool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
else:
- self.getNode(nod_id).addFile(img)
+ # This just means we couldn't add the image
+ # to the pool. We will have to create disk images to be used
+ # only by these leases
+ self.logger.debug("Unable to add to pool. Must create individual disk images instead.")
+ for (lease_id, vnode) in vnodes:
+ self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
- self.getNode(nod_id).printFiles()
+ pnode.print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
-# def checkImage(self, pnode, lease_id, vnode, imagefile):
-# node = self.getNode(pnode)
-# if self.rm.config.get("lease-preparation") == constants.DEPLOYMENT_UNMANAGED:
-# self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
-# elif self.reusealg == constants.REUSE_NONE:
-# if not node.hasTaintedImage(lease_id, vnode, imagefile):
-# self.logger.debug("ERROR: Image for L%iV%i is not deployed on node %i" % (lease_id, vnode, pnode))
-# elif self.reusealg == constants.REUSE_IMAGECACHES:
-# poolentry = node.getPoolEntry(imagefile, lease_id=lease_id, vnode=vnode)
-# if poolentry == None:
-# # Not necessarily an error. Maybe the pool was full, and
-# # we had to fall back on creating a tainted image right
-# # when the image was transferred. We have to check this.
-# if not node.hasTaintedImage(lease_id, vnode, imagefile):
-# self.logger.error("ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease_id, vnode, pnode))
-# else:
-# # Create tainted image
-# self.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode))
-# node.printFiles()
-# img = VMImageFile(imagefile, poolentry.filesize, masterimg=False)
-# img.addMapping(lease_id, vnode)
-# node.addFile(img)
-# node.printFiles()
-# self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
-
-
def check(self, lease, vmrr):
- # Check that all the required disk images are available,
- # and determine what their physical filenames are.
+ # Check that all the required disk images are available.
# Note that it is the enactment module's responsibility to
# mark an image as correctly deployed. The check we do here
# is (1) to catch scheduling errors (i.e., the image transfer
@@ -520,29 +487,30 @@
# make a copy of the master image (which takes time), and should
# be scheduled.
- for (vnode, pnode) in vmrr.nodes.items():
- node = self.resourcepool.getNode(pnode)
+ for (vnode, pnode_id) in vmrr.nodes.items():
+ pnode = self.resourcepool.get_node(pnode_id)
- taintedImage = None
-
- taintedImage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+ diskimage = pnode.get_diskimage(lease.id, vnode, lease.diskimage_id)
if self.reusealg == constants.REUSE_NONE:
- if taintedImage == None:
+ if diskimage == None:
raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.id, vnode, pnode)
elif self.reusealg == constants.REUSE_IMAGECACHES:
- poolentry = node.getPoolEntry(lease.diskimage_id, lease_id=lease.id, vnode=vnode)
- if poolentry == None:
+ reusable_image = pnode.get_reusable_image(lease.diskimage_id, lease_id=lease.id, vnode=vnode)
+ if reusable_image == None:
# Not necessarily an error. Maybe the pool was full, and
# we had to fall back on creating a tainted image right
# when the image was transferred. We have to check this.
- if taintedImage == None:
- raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.id, vnode, pnode)
+ if diskimage == None:
+ raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.id, vnode, pnode_id)
else:
# Create tainted image
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
+ self.resourcepool.add_diskimage(pnode_id, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
# ENACTMENT
# self.storage.createCopyFromCache(pnode, lease.diskImageSize)
+ def cleanup(self, lease, vmrr):
+ for vnode, pnode in lease.vmimagemap.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
class FileTransferResourceReservation(ResourceReservationBase):
def __init__(self, lease, res, start=None, end=None):
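A note on the rewritten add_diskimages above: the cache-admission decision is now made explicitly through a can_add_to_cache flag, with per-lease disk images as the fallback. Below is a condensed sketch of that REUSE_IMAGECACHES branch, assuming the pool and node helpers introduced in this diff (get_reusable_images_size, purge_downto, add_reusable_image, add_diskimage); the standalone function is illustrative only.

    import haizea.common.constants as constants

    def admit_to_image_cache(pool, pnode_id, diskimage_id, diskimage_size,
                             vnodes, timeout, maxcachesize):
        # vnodes is a list of (lease_id, vnode) pairs, as in add_diskimages.
        pnode = pool.get_node(pnode_id)
        if maxcachesize == constants.CACHESIZE_UNLIMITED:
            can_add_to_cache = True
        elif pnode.get_reusable_images_size() + diskimage_size > maxcachesize:
            # Try to shrink the cache; purge_downto reports whether it could
            # bring the cached images under the limit.
            can_add_to_cache = pnode.purge_downto(maxcachesize)
        else:
            can_add_to_cache = True

        if can_add_to_cache:
            pool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
        else:
            # Cache is full and cannot be shrunk: fall back to one private
            # disk image per (lease, vnode).
            for (lease_id, vnode) in vnodes:
                pool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)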
Modified: trunk/src/haizea/resourcemanager/enact/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/base.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/base.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -26,11 +26,17 @@
resourcetypes = self.get_resource_types() #IGNORE:E1111
ds.ResourceTuple.set_resource_types(resourcetypes)
-
def get_nodes(self):
""" Returns the nodes in the resource pool. """
abstract()
+ def get_resource_types(self):
+ abstract()
+
+class DeploymentEnactmentBase(object):
+ def __init__(self, resourcepool):
+ self.resourcepool = resourcepool
+
def get_fifo_node(self):
""" Returns the image node for FIFO transfers
@@ -48,13 +54,6 @@
their respective algorithms)
"""
abstract()
-
- def get_resource_types(self):
- abstract()
-
-class StorageEnactmentBase(object):
- def __init__(self, resourcepool):
- self.resourcepool = resourcepool
class VMEnactmentBase(object):
def __init__(self, resourcepool):
Copied: trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py (from rev 486, trunk/src/haizea/resourcemanager/enact/opennebula/storage.py)
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py (rev 0)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -0,0 +1,27 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from haizea.resourcemanager.enact.base import DeploymentEnactmentBase
+
+class DeploymentEnactment(DeploymentEnactmentBase):
+ def __init__(self, resourcepool):
+ DeploymentEnactmentBase.__init__(self, resourcepool)
+ self.imagepath="/images/playground/borja"
+
+ def resolve_to_file(self, lease_id, vnode, diskImageID):
+ return "%s/%s/%s.img" % (self.imagepath, diskImageID, diskImageID)
\ No newline at end of file
Deleted: trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -1,27 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.resourcemanager.enact.base import StorageEnactmentBase
-
-class StorageEnactment(StorageEnactmentBase):
- def __init__(self, resourcepool):
- StorageEnactmentBase.__init__(self, resourcepool)
- self.imagepath="/images/playground/borja"
-
- def resolve_to_file(self, lease_id, vnode, diskImageID):
- return "%s/%s/%s.img" % (self.imagepath, diskImageID, diskImageID)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/__init__.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/simulated/__init__.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -18,8 +18,8 @@
from haizea.resourcemanager.enact.simulated.info import ResourcePoolInfo
from haizea.resourcemanager.enact.simulated.vm import VMEnactment
-from haizea.resourcemanager.enact.simulated.storage import StorageEnactment
+from haizea.resourcemanager.enact.simulated.deployment import DeploymentEnactment
info=ResourcePoolInfo
-storage=StorageEnactment
+deployment=DeploymentEnactment
vm=VMEnactment
\ No newline at end of file
Copied: trunk/src/haizea/resourcemanager/enact/simulated/deployment.py (from rev 486, trunk/src/haizea/resourcemanager/enact/simulated/storage.py)
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/deployment.py (rev 0)
+++ trunk/src/haizea/resourcemanager/enact/simulated/deployment.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -0,0 +1,59 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from haizea.resourcemanager.enact.base import DeploymentEnactmentBase
+from haizea.resourcemanager.resourcepool import Node
+import haizea.resourcemanager.datastruct as ds
+import haizea.common.constants as constants
+import logging
+
+baseCachePath="/vm/cache"
+baseWorkingPath="/vm/working"
+stagingPath="/vm/staging"
+
+class DeploymentEnactment(DeploymentEnactmentBase):
+ def __init__(self, resourcepool):
+ DeploymentEnactmentBase.__init__(self, resourcepool)
+ self.logger = logging.getLogger("ENACT.SIMUL.INFO")
+ config = self.resourcepool.rm.config
+
+ self.bandwidth = config.get("imagetransfer-bandwidth")
+
+ # Image repository nodes
+ numnodes = config.get("simul.nodes")
+
+ imgcapacity = ds.ResourceTuple.create_empty()
+ imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
+
+ self.fifo_node = Node(self.resourcepool, numnodes+1, "FIFOnode", imgcapacity)
+ self.edf_node = Node(self.resourcepool, numnodes+2, "EDFnode", imgcapacity)
+
+ def get_edf_node(self):
+ return self.edf_node
+
+ def get_fifo_node(self):
+ return self.fifo_node
+
+ def get_aux_nodes(self):
+ return [self.edf_node, self.fifo_node]
+
+ def get_bandwidth(self):
+ return self.bandwidth
+
+ def resolve_to_file(self, lease_id, vnode, diskimage_id):
+ return "%s/%s-L%iV%i" % (baseWorkingPath, diskimage_id, lease_id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -30,30 +30,16 @@
self.suspendresumerate = config.get("simul.suspendresume-rate")
numnodes = config.get("simul.nodes")
- self.bandwidth = config.get("imagetransfer-bandwidth")
capacity = self.parse_resources_string(config.get("simul.resources"))
self.nodes = [Node(self.resourcepool, i+1, "simul-%i" % (i+1), capacity) for i in range(numnodes)]
for n in self.nodes:
n.enactment_info = n.nod_id
-
- # Image repository nodes
- imgcapacity = ds.ResourceTuple.create_empty()
- imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
-
- self.FIFOnode = Node(self.resourcepool, numnodes+1, "FIFOnode", imgcapacity)
- self.EDFnode = Node(self.resourcepool, numnodes+2, "EDFnode", imgcapacity)
def get_nodes(self):
return self.nodes
- def get_edf_Node(self):
- return self.EDFnode
-
- def get_fifo_node(self):
- return self.FIFOnode
-
def get_resource_types(self):
return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
(constants.RES_MEM, constants.RESTYPE_INT, "Mem"),
@@ -73,6 +59,6 @@
def get_suspendresume_rate(self):
return self.suspendresumerate
-
- def get_bandwidth(self):
- return self.bandwidth
+
+ def get_migration_bandwidth(self):
+ return 100 # TODO: Get from config file
Deleted: trunk/src/haizea/resourcemanager/enact/simulated/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -1,30 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.resourcemanager.enact.base import StorageEnactmentBase
-
-baseCachePath="/vm/cache"
-baseWorkingPath="/vm/working"
-stagingPath="/vm/staging"
-
-class StorageEnactment(StorageEnactmentBase):
- def __init__(self, resourcepool):
- StorageEnactmentBase.__init__(self, resourcepool)
-
- def resolve_to_file(self, lease_id, vnode, diskImageID):
- return "%s/%s-L%iV%i" % (baseWorkingPath, diskImageID, lease_id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -32,6 +32,7 @@
image = action.vnodes[vnode].diskimage
cpu = action.vnodes[vnode].resources.get_by_type(constants.RES_CPU)
memory = action.vnodes[vnode].resources.get_by_type(constants.RES_MEM)
+ print (action.lease_haizea_id, vnode, pnode, image, cpu, memory)
self.logger.debug("Received request to start VM for L%iV%i on host %i, image=%s, cpu=%i, mem=%i"
% (action.lease_haizea_id, vnode, pnode, image, cpu, memory))
Modified: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -23,12 +23,13 @@
class ResourcePool(object):
def __init__(self, scheduler):
+ self.scheduler = scheduler
self.rm = scheduler.rm
- self.logger = logging.getLogger("RESOURCEPOOL")
+ self.logger = logging.getLogger("RPOOL")
self.info = None
self.vm = None
- self.storage = None
+ self.deployment = None
self.load_enactment_modules()
@@ -41,7 +42,7 @@
exec "import %s.%s as enact" % (constants.ENACT_PACKAGE, mode)
self.info = enact.info(self) #IGNORE:E0602
self.vm = enact.vm(self) #IGNORE:E0602
- self.storage = enact.storage(self) #IGNORE:E0602
+ self.deployment = enact.deployment(self) #IGNORE:E0602
except Exception, msg:
self.logger.error("Unable to load enactment modules for mode '%s'" % mode)
raise
@@ -50,9 +51,7 @@
def start_vms(self, lease, rr):
start_action = actions.VMEnactmentStartAction()
start_action.from_rr(rr)
-
- # TODO: Get tainted image
-
+
for (vnode, pnode) in rr.nodes.items():
node = self.get_node(pnode)
diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
@@ -64,7 +63,7 @@
self.vm.start(start_action)
except Exception, msg:
self.logger.error("Enactment of start VM failed: %s" % msg)
- self.rm.fail_lease(lease.id)
+ self.scheduler.fail_lease(lease.id)
def stop_vms(self, lease, rr):
stop_action = actions.VMEnactmentStopAction()
@@ -123,6 +122,14 @@
def get_nodes(self):
return self.nodes
+
+ # An auxiliary node is a host whose resources are going to be scheduled, but
+ # where no VMs are actually going to run. For example, a disk image repository node.
+ def get_aux_nodes(self):
+ # TODO: We're only asking the deployment enactment module for auxiliary nodes.
+ # There might be a scenario where the info enactment module also reports
+ # auxiliary nodes.
+ return self.deployment.get_aux_nodes()
def get_num_nodes(self):
return len(self.nodes)
@@ -132,11 +139,17 @@
def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
+
+ self.logger.vdebug("Files BEFORE:")
self.get_node(pnode).print_files()
- imagefile = self.storage.resolve_to_file(lease_id, vnode, diskimage_id)
+
+ imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
self.get_node(pnode).add_file(img)
+
+ self.logger.vdebug("Files AFTER:")
self.get_node(pnode).print_files()
+
self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
return img
@@ -234,7 +247,7 @@
images = ""
if len(self.files) > 0:
images = ", ".join([str(img) for img in self.files])
- self.logger.vdebug("Node %i has %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
+ self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
def get_state(self):
if self.vm_doing == constants.DOING_IDLE and self.transfer_doing == constants.DOING_TRANSFER:
@@ -279,26 +292,51 @@
def __str__(self):
return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
-class ResourcePoolWithReusableImages(object):
+class ResourcePoolWithReusableImages(ResourcePool):
def __init__(self, scheduler):
ResourcePool.__init__(self, scheduler)
+
+ self.nodes = [NodeWithReusableImages.from_node(n) for n in self.nodes]
- def remove_diskimage(self, pnode, lease, vnode):
- ResourcePool.remove_diskimage(self, pnode, lease, vnode)
- self.logger.debug("Removing pooled images for L%iV%i in node %i" % (lease, vnode, pnode))
- toremove = []
- for img in node.getPoolImages():
+ def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
+ self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
+
+ self.logger.vdebug("Files BEFORE:")
+ self.get_node(pnode).print_files()
+
+ imagefile = "reusable-%s" % diskimage_id
+ img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
+ for (lease_id, vnode) in mappings:
+ img.add_mapping(lease_id, vnode)
+
+ self.get_node(pnode).add_reusable_image(img)
+
+ self.logger.vdebug("Files AFTER:")
+ self.get_node(pnode).print_files()
+
+ self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ return img
+
+ def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
+ self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
+
+ def remove_diskimage(self, pnode_id, lease, vnode):
+ ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
+ self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
+ for img in self.get_node(pnode_id).get_reusable_images():
if (lease, vnode) in img.mappings:
img.mappings.remove((lease, vnode))
- node.printFiles()
+ self.get_node(pnode_id).print_files()
# Keep image around, even if it isn't going to be used
# by any VMs. It might be reused later on.
# It will be purged if space has to be made available
# for other images
- for img in toremove:
- node.files.remove(img)
- node.printFiles()
+
+ def get_nodes_with_reusable_image(self, diskimage_id, after = None):
+ return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
+ def exists_reusable_image(self, pnode_id, diskimage_id, after):
+ return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
class NodeWithReusableImages(Node):
@@ -306,16 +344,25 @@
Node.__init__(self, resourcepool, nod_id, hostname, capacity)
self.reusable_images = []
- def add_mapping_to_existing_reusable_image(self, imagefile, lease_id, vnode, timeout):
- for f in self.files:
- if f.filename == imagefile:
+ @classmethod
+ def from_node(cls, n):
+ node = cls(n.resourcepool, n.nod_id, n.hostname, n.capacity)
+ node.enactment_info = n.enactment_info
+ return node
+
+ def add_reusable_image(self, f):
+ self.reusable_images.append(f)
+
+ def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
+ for f in self.reusable_images:
+ if f.diskimage_id == diskimage_id:
f.add_mapping(lease_id, vnode)
f.update_timeout(timeout)
break # Ugh
self.print_files()
- def get_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
- images = [i for i in self.reusable_images if i.filename == imagefile]
+ def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
+ images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
if after != None:
images = [i for i in images if i.timeout >= after]
if lease_id != None and vnode != None:
@@ -364,12 +411,19 @@
success = True
return success
+ def print_files(self):
+ Node.print_files(self)
+ images = ""
+ if len(self.reusable_images) > 0:
+ images = ", ".join([str(img) for img in self.reusable_images])
+ self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
+
class ReusableDiskImageFile(File):
- def __init__(self, filename, filesize, diskimage_id):
+ def __init__(self, filename, filesize, diskimage_id, timeout):
File.__init__(self, filename, filesize)
self.diskimage_id = diskimage_id
self.mappings = set([])
- self.timeout = None
+ self.timeout = timeout
def add_mapping(self, lease_id, vnode):
self.mappings.add((lease_id, vnode))
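One usage note on the reworked reusable-image bookkeeping above: add_reusable_image now receives the full set of (lease, vnode) mappings and the timeout up front, and later leases attach to an already cached copy via add_mapping_to_existing_reusable_image. A hedged sketch follows; the lease, vnode, image name, and size values are examples only.

    def cache_transferred_image(pool, pnode, vmrr_end):
        # 'pool' is a ResourcePoolWithReusableImages; values are illustrative.
        mappings = [(42, 1), (42, 2)]   # (lease_id, vnode) pairs served by this copy
        pool.add_reusable_image(pnode, "ttylinux.img", 512, mappings, timeout=vmrr_end)
        # A later lease wanting the same image just gains a mapping on the cached copy:
        pool.add_mapping_to_existing_reusable_image(pnode, "ttylinux.img", 43, 1, vmrr_end)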
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/rm.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -333,7 +333,7 @@
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception when notifying an event for lease %i. Dumping state..." % lease_id )
- self.print_stats("ERROR", verbose=True)
+ self.print_stats(logging.getLevelName("ERROR"), verbose=True)
raise
def cancel_lease(self, lease_id):
@@ -347,7 +347,7 @@
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception when canceling lease %i. Dumping state..." % lease_id)
- self.print_stats("ERROR", verbose=True)
+ self.print_stats(logging.getLevelName("ERROR"), verbose=True)
raise
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -37,7 +37,7 @@
from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
from haizea.resourcemanager.datastruct import ARLease, ImmediateLease, VMResourceReservation
-from haizea.resourcemanager.resourcepool import ResourcePool
+from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
import logging
@@ -78,14 +78,17 @@
def __init__(self, rm):
self.rm = rm
self.logger = logging.getLogger("SCHED")
- self.resourcepool = ResourcePool(self)
+ if self.rm.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
+ self.resourcepool = ResourcePoolWithReusableImages(self)
+ else:
+ self.resourcepool = ResourcePool(self)
self.slottable = SlotTable(self)
self.queue = ds.Queue(self)
self.scheduledleases = ds.LeaseTable(self)
self.completedleases = ds.LeaseTable(self)
self.pending_leases = []
- for n in self.resourcepool.get_nodes():
+ for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
self.slottable.add_node(n)
self.handlers = {}
@@ -233,11 +236,12 @@
lease -- Lease to fail
"""
try:
+ raise
self.cancel_lease(lease_id)
except Exception, msg:
# Exit if something goes horribly wrong
self.logger.error("Exception when failing lease %i. Dumping state..." % lease_id)
- self.print_stats("ERROR", verbose=True)
+ self.rm.print_stats(logging.getLevelName("ERROR"), verbose=True)
raise
def notify_event(self, lease_id, event):
@@ -451,7 +455,7 @@
req.remove_rr(susprr)
self.slottable.removeReservation(susprr)
for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.removeImage(pnode, req.id, vnode)
+ self.resourcepool.remove_diskimage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
@@ -484,7 +488,7 @@
req.remove_rr(resmrr)
self.slottable.removeReservation(resmrr)
for vnode, pnode in req.vmimagemap.items():
- self.resourcepool.removeImage(pnode, req.id, vnode)
+ self.resourcepool.remove_diskimage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
@@ -527,6 +531,7 @@
l.start.actual = now_time
try:
+ self.deployment.check(l, rr)
self.resourcepool.start_vms(l, rr)
# The next two lines have to be moved somewhere more
# appropriate inside the resourcepool module
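Two scheduler changes above interact: the resource pool class is now selected from the diskimage-reuse option, and the deployment enactment module's auxiliary nodes (the image repository nodes in the simulated backend) are added to the slot table next to the regular hosts. A condensed sketch of that initialization path, using only names that appear in this diff; the wrapper function is illustrative.

    import haizea.common.constants as constants
    from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages

    def build_resourcepool(scheduler, slottable, config):
        if config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
            resourcepool = ResourcePoolWithReusableImages(scheduler)
        else:
            resourcepool = ResourcePool(scheduler)
        # Auxiliary nodes are schedulable (e.g. for NETOUT capacity) but never
        # run VMs, so they are registered alongside the physical hosts.
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
            slottable.add_node(n)
        return resourcepool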
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-09-03 17:35:19 UTC (rev 486)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-09-04 17:28:38 UTC (rev 487)
@@ -83,7 +83,7 @@
self.scheduler = scheduler
self.rm = scheduler.rm
self.resourcepool = scheduler.resourcepool
- self.logger = logging.getLogger("SLOTTABLE")
+ self.logger = logging.getLogger("SLOT")
self.nodes = NodeList()
self.reservations = []
self.reservationsByStart = []
@@ -485,6 +485,7 @@
resreq = lease.requested_resources
preemptible = lease.preemptible
suspendresumerate = self.resourcepool.info.get_suspendresume_rate()
+ migration_bandwidth = self.resourcepool.info.get_migration_bandwidth()
#
# STEP 1: TAKE INTO ACCOUNT VM RESUMPTION (IF ANY)
@@ -501,10 +502,7 @@
if mustresume and canmigrate:
# If we have to resume this lease, make sure that
# we have enough time to transfer the images.
- # TODO: Get bandwidth another way. Right now, the
- # image node bandwidth is the same as the bandwidt
- # in the other nodes, but this won't always be true.
- migratetime = lease.estimate_migration_time(self.rm.scheduler.resourcepool.info.get_bandwidth())
+ migratetime = lease.estimate_migration_time(migration_bandwidth)
earliesttransfer = self.rm.clock.get_time() + migratetime
for n in earliest: