[haizea-commit] r607 - in branches/TP2.0/src/haizea: common core core/enact core/scheduler core/scheduler/preparation_schedulers policies
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Thu Jul 16 11:55:12 CDT 2009
Author: borja
Date: 2009-07-16 11:55:07 -0500 (Thu, 16 Jul 2009)
New Revision: 607
Modified:
branches/TP2.0/src/haizea/common/constants.py
branches/TP2.0/src/haizea/core/enact/simulated.py
branches/TP2.0/src/haizea/core/leases.py
branches/TP2.0/src/haizea/core/manager.py
branches/TP2.0/src/haizea/core/scheduler/__init__.py
branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
branches/TP2.0/src/haizea/core/scheduler/mapper.py
branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/__init__.py
branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
branches/TP2.0/src/haizea/core/scheduler/resourcepool.py
branches/TP2.0/src/haizea/core/scheduler/slottable.py
branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
branches/TP2.0/src/haizea/policies/host_selection.py
Log:
- Cleaned up image transfer scheduling code (most notably removing the requirement for two separate image repositories, one for best-effort leases and another one for advance-reservation (AR) leases).
- Cleaned up the way the resource pool and the slot table handle nodes.
- Many other small fixes to get the image transfer scheduler working.
Modified: branches/TP2.0/src/haizea/common/constants.py
===================================================================
--- branches/TP2.0/src/haizea/common/constants.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/common/constants.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -67,12 +67,6 @@
CLOCK_SIMULATED = "simulated"
CLOCK_REAL = "real"
-# Transfer required in deployment
-REQTRANSFER_NO = 0
-REQTRANSFER_YES = 1
-REQTRANSFER_COWPOOL = 2
-REQTRANSFER_PIGGYBACK = 3
-
# Misc
BETTER = -1
EQUAL = 0
Modified: branches/TP2.0/src/haizea/core/enact/simulated.py
===================================================================
--- branches/TP2.0/src/haizea/core/enact/simulated.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/enact/simulated.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -16,6 +16,7 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+from haizea.core.leases import Capacity
from haizea.core.scheduler.resourcepool import ResourcePoolNode
from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
import haizea.common.constants as constants
@@ -50,9 +51,9 @@
nodes = site.nodes.get_all_nodes()
- self.nodes = [ResourcePoolNode(id, "simul-%i" % id, capacity) for (id, capacity) in nodes.items()]
- for n in self.nodes:
- n.enactment_info = n.nod_id
+ self.nodes = dict([(id, ResourcePoolNode(id, "simul-%i" % id, capacity)) for (id, capacity) in nodes.items()])
+ for node in self.nodes.values():
+ node.enactment_info = node.id
def get_nodes(self):
return self.nodes
@@ -107,25 +108,17 @@
self.bandwidth = config.get("imagetransfer-bandwidth")
- # Commenting for now
- #imgcapacity = ResourceTuple.create_empty()
- #imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
+ imgcapacity = Capacity([constants.RES_NETOUT])
+ imgcapacity.set_quantity(constants.RES_NETOUT, self.bandwidth)
- imgcapacity = None
-
# TODO: Determine node number based on site
- self.fifo_node = ResourcePoolNode(1000, "FIFOnode", imgcapacity)
- self.edf_node = ResourcePoolNode(1001, "EDFnode", imgcapacity)
+ self.imagenode = ResourcePoolNode(1000, "image_node", imgcapacity)
- def get_edf_node(self):
- return self.edf_node
-
- def get_fifo_node(self):
- return self.fifo_node
-
- # Commenting for now
+ def get_imagenode(self):
+ return self.imagenode
+
def get_aux_nodes(self):
- return [] #[self.edf_node, self.fifo_node]
+ return [self.imagenode]
def get_bandwidth(self):
return self.bandwidth
Modified: branches/TP2.0/src/haizea/core/leases.py
===================================================================
--- branches/TP2.0/src/haizea/core/leases.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/leases.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -168,6 +168,12 @@
def append_preparationrr(self, vmrr):
self.preparation_rrs.append(vmrr)
+
+ def remove_preparationrr(self, preparation_rr):
+ if not preparation_rr in self.preparation_rrs:
+ raise Exception, "Tried to remove a preparation RR not contained in this lease"
+ else:
+ self.preparation_rrs.remove(preparation_rr)
def get_last_vmrr(self):
return self.vm_rrs[-1]
Modified: branches/TP2.0/src/haizea/core/manager.py
===================================================================
--- branches/TP2.0/src/haizea/core/manager.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/manager.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -153,7 +153,7 @@
slottable = SlotTable(site.get_resource_types_with_max_instances())
for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
rt = slottable.create_resource_tuple_from_capacity(n.capacity)
- slottable.add_node(rt)
+ slottable.add_node(n.id, rt)
# Policy manager
admission = self.config.get("policy.admission")
Modified: branches/TP2.0/src/haizea/core/scheduler/__init__.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/__init__.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/__init__.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -81,4 +81,12 @@
self.on_start_method(self.sched, lease, rr)
def on_end(self, lease, rr):
- self.on_end_method(self.sched, lease, rr)
\ No newline at end of file
+ self.on_end_method(self.sched, lease, rr)
+
+class EarliestStartingTime(object):
+ EARLIEST_NOPREPARATION = 0
+ EARLIEST_MIGRATION = 1
+
+ def __init__(self, time, type):
+ self.time = time
+ self.type = type
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -381,7 +381,8 @@
vmrr = l.get_last_vmrr()
self.vm_scheduler.cancel_vm(vmrr)
l.remove_vmrr(vmrr)
- # TODO: This earliest is sure to change to something else
+ # TODO: Clean up (transfers, etc.)
+ l.state = Lease.STATE_PENDING
self.__schedule_lease(l, nexttime)
def is_queue_empty(self):
@@ -451,15 +452,13 @@
# Figure out earliest start times based on
# image schedule and reusable images
earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
- elif lease_state == Lease.STATE_READY or lease_state == Lease.STATE_SCHEDULED:
- # This is a lease that is being rescheduled.
- # TODO: The following is not really what has to be done
- earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
elif lease_state == Lease.STATE_SUSPENDED_QUEUED or lease_state == Lease.STATE_SUSPENDED_SCHEDULED:
# No need to transfer images from repository
# (only intra-node transfer)
- migr_time = self.vm_scheduler.estimate_migration_time(lease)
- earliest = dict([(node+1, [nexttime + migr_time, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
+ earliest = self.preparation_scheduler.find_earliest_migration_times(lease, nexttime)
+ migr_time = self.preparation_scheduler.estimate_migration_time(lease)
+ for pnode in earliest:
+ earliest[pnode].time += migr_time
else:
raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
@@ -477,7 +476,7 @@
#self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
else:
- preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+ preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
# At this point, the lease is feasible.
# Commit changes by adding RRs to lease and to slot table
@@ -688,4 +687,5 @@
def get_leases_by_state(self, state):
return [e for e in self.entries.values() if e.get_state() == state]
+
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/core/scheduler/mapper.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/mapper.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/mapper.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -39,13 +39,12 @@
def __init__(self, slottable, policy):
Mapper.__init__(self, slottable, policy)
- def map(self, lease, requested_resources, start, end, strictend, onlynodes = None):
- aw = self.slottable.get_availability_window(start, onlynodes)
+ def map(self, lease, requested_resources, start, end, strictend):
+ aw = self.slottable.get_availability_window(start)
+
+ nodes = aw.get_nodes_at(start)
- if onlynodes == None:
- onlynodes = aw.get_nodes_at(start)
-
- pnodes = self.policy.sort_hosts(onlynodes, start, lease)
+ pnodes = self.policy.sort_hosts(nodes, start, lease)
vnodes = self.__sort_vnodes(requested_resources)
vnodes.reverse()
leases = aw.get_leases_until(end)
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/__init__.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/__init__.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/__init__.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -27,4 +27,4 @@
self.logger = logging.getLogger("DEPLOY")
def cleanup(self, lease):
- abstract()
+ abstract()
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -20,25 +20,21 @@
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
from haizea.core.scheduler.slottable import ResourceReservation
from haizea.core.leases import Lease, Capacity
-from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException
+from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
from haizea.common.utils import estimate_transfer_time, get_config
from haizea.core.scheduler.slottable import ResourceTuple
-
import copy
+import bisect
+
class ImageTransferPreparationScheduler(PreparationScheduler):
def __init__(self, slottable, resourcepool, deployment_enact):
PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
- # TODO: The following two should be merged into
- # something like this:
- # self.image_node = self.deployment_enact.get_image_node()
- self.fifo_node = self.deployment_enact.get_fifo_node()
- self.edf_node = self.deployment_enact.get_edf_node()
+ self.imagenode = self.deployment_enact.get_imagenode()
- self.transfers_edf = []
- self.transfers_fifo = []
+ self.transfers = []
self.completed_transfers = []
config = get_config()
@@ -61,30 +57,74 @@
on_start = ImageTransferPreparationScheduler._handle_start_migrate,
on_end = ImageTransferPreparationScheduler._handle_end_migrate)
- def schedule(self, lease, vmrr, nexttime):
+ def schedule(self, lease, vmrr, earliest):
if lease.get_type() == Lease.ADVANCE_RESERVATION:
- return self.__schedule_deadline(lease, vmrr, nexttime)
+ return self.__schedule_deadline(lease, vmrr, earliest)
elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
- return self.__schedule_asap(lease, vmrr, nexttime)
+ return self.__schedule_asap(lease, vmrr, earliest)
+
+ def find_earliest_starting_times(self, lease, nexttime):
+ node_ids = [node.id for node in self.resourcepool.get_nodes()]
+ config = get_config()
+ mechanism = config.get("transfer-mechanism")
+ reusealg = config.get("diskimage-reuse")
+ avoidredundant = config.get("avoid-redundant-transfers")
+
+ # Figure out earliest times assuming we have to transfer the images
+ transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
+ if mechanism == constants.TRANSFER_UNICAST:
+ transfer_duration *= lease.numnodes
+ start = self.__get_next_transfer_slot(nexttime, transfer_duration)
+ earliest = {}
+ for node in node_ids:
+ earliest[node] = ImageTransferEarliestStartingTime(start + transfer_duration, ImageTransferEarliestStartingTime.EARLIEST_IMAGETRANSFER)
+ earliest[node].transfer_start = start
+
+ # Check if we can reuse images
+ if reusealg == constants.REUSE_IMAGECACHES:
+ nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.software.image_id)
+ for node in nodeswithimg:
+ earliest[node].time = nexttime
+ earliest[node].type = ImageTransferEarliestStartingTime.EARLIEST_REUSE
+
+
+ # Check if we can avoid redundant transfers
+ if avoidredundant:
+ if mechanism == constants.TRANSFER_UNICAST:
+ # Piggybacking not supported if unicasting
+ # each individual image
+ pass
+ if mechanism == constants.TRANSFER_MULTICAST:
+ # We can only piggyback on transfers that haven't started yet
+ transfers = [t for t in self.transfers if t.state == ResourceReservation.STATE_SCHEDULED]
+ for t in transfers:
+ if t.file == lease.software.image_id:
+ start = t.end
+ if start > nexttime:
+ for n in earliest:
+ if start < earliest[n].time:
+ earliest[n].time = start
+ earliest[n].type = ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK
+ earliest[n].piggybacking_on = t
+
+ return earliest
def cancel_preparation(self, lease):
- if isinstance(lease, BestEffortLease):
- self.__remove_from_fifo_transfers(lease.id)
- self.cleanup(lease)
-
+ toremove = self.__remove_transfers(lease)
+ for t in toremove:
+ t.lease.remove_preparationrr(t)
+ self.slottable.remove_reservation(t)
+ self.__remove_files(lease)
- def is_ready(self, lease, vmrr):
- return False
+ def cleanup(self, lease):
+ self.__remove_files(lease)
+
- def __schedule_deadline(self, lease, vmrr, nexttime):
+ def __schedule_deadline(self, lease, vmrr, earliest):
config = get_config()
- mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
is_ready = False
-
- if avoidredundant:
- pass # TODO
musttransfer = {}
mustpool = {}
@@ -93,7 +133,7 @@
end = lease.start.requested + lease.duration.requested
for (vnode, pnode) in nodeassignment.items():
lease_id = lease.id
- self.logger.debug("Scheduling image transfer of '%s' from vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
+ self.logger.debug("Scheduling image transfer of '%s' for vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
if reusealg == constants.REUSE_IMAGECACHES:
if self.resourcepool.exists_reusable_image(pnode, lease.diskimage_id, start):
@@ -109,14 +149,10 @@
if len(musttransfer) == 0:
is_ready = True
else:
- if mechanism == constants.TRANSFER_UNICAST:
- pass
- # TODO: Not supported
- elif mechanism == constants.TRANSFER_MULTICAST:
- try:
- filetransfer = self.schedule_imagetransfer_edf(lease, musttransfer, nexttime)
- except NotSchedulableException, exc:
- raise
+ try:
+ transfer_rrs = self.__schedule_imagetransfer_edf(lease, musttransfer, earliest)
+ except NotSchedulableException, exc:
+ raise
# No chance of scheduling exception at this point. It's safe
# to add entries to the pools
@@ -125,30 +161,30 @@
self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- return [filetransfer], is_ready
+ return transfer_rrs, is_ready
- def __schedule_asap(self, lease, vmrr, nexttime):
+
+ def __schedule_asap(self, lease, vmrr, earliest):
config = get_config()
- mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
- earliest = self.find_earliest_starting_times(lease, nexttime)
+
is_ready = False
- transferRRs = []
+ transfer_rrs = []
musttransfer = {}
piggybacking = []
for (vnode, pnode) in vmrr.nodes.items():
- reqtransfer = earliest[pnode][1]
- if reqtransfer == constants.REQTRANSFER_COWPOOL:
+ earliest_type = earliest[pnode].type
+ if earliest_type == ImageTransferEarliestStartingTime.EARLIEST_REUSE:
# Add to pool
self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, vmrr.end)
self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
- elif reqtransfer == constants.REQTRANSFER_PIGGYBACK:
+ elif earliest_type == ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK:
# We can piggyback on an existing transfer
- transferRR = earliest[pnode][2]
- transferRR.piggyback(lease.id, vnode, pnode)
+ transfer_rr = earliest[pnode].piggybacking_on
+ transfer_rr.piggyback(lease.id, vnode, pnode)
self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transferRR.lease.id))
piggybacking.append(transferRR)
else:
@@ -157,238 +193,149 @@
self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
if len(musttransfer)>0:
- transferRRs = self.schedule_imagetransfer_fifo(lease, musttransfer, nexttime)
+ transfer_rrs = self.__schedule_imagetransfer_fifo(lease, musttransfer, earliest)
if len(musttransfer)==0 and len(piggybacking)==0:
is_ready = True
- return transferRRs, is_ready
+ return transfer_rrs, is_ready
- def find_earliest_starting_times(self, lease, nexttime):
- nodIDs = [n.nod_id for n in self.resourcepool.get_nodes()]
- config = get_config()
- mechanism = config.get("transfer-mechanism")
- reusealg = config.get("diskimage-reuse")
- avoidredundant = config.get("avoid-redundant-transfers")
-
- # Figure out starting time assuming we have to transfer the image
- nextfifo = self.get_next_fifo_transfer_time(nexttime)
-
- imgTransferTime=self.estimate_image_transfer_time(lease, self.imagenode_bandwidth)
-
- # Find worst-case earliest start time
- if lease.numnodes == 1:
- startTime = nextfifo + imgTransferTime
- earliest = dict([(node, [startTime, constants.REQTRANSFER_YES]) for node in nodIDs])
- else:
- # Unlike the previous case, we may have to find a new start time
- # for all the nodes.
- if mechanism == constants.TRANSFER_UNICAST:
- pass
- # TODO: If transferring each image individually, this will
- # make determining what images can be reused more complicated.
- if mechanism == constants.TRANSFER_MULTICAST:
- startTime = nextfifo + imgTransferTime
- earliest = dict([(node, [startTime, constants.REQTRANSFER_YES]) for node in nodIDs]) # TODO: Take into account reusable images
-
- # Check if we can reuse images
- if reusealg==constants.REUSE_IMAGECACHES:
- nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.diskimage_id)
- for node in nodeswithimg:
- earliest[node] = [nexttime, constants.REQTRANSFER_COWPOOL]
-
-
- # Check if we can avoid redundant transfers
- if avoidredundant:
- if mechanism == constants.TRANSFER_UNICAST:
- pass
- # TODO
- if mechanism == constants.TRANSFER_MULTICAST:
- # We can only piggyback on transfers that haven't started yet
- transfers = [t for t in self.transfers_fifo if t.state == ResourceReservation.STATE_SCHEDULED]
- for t in transfers:
- if t.file == lease.diskimage_id:
- startTime = t.end
- if startTime > nexttime:
- for n in earliest:
- if startTime < earliest[n]:
- earliest[n] = [startTime, constants.REQTRANSFER_PIGGYBACK, t]
- return earliest
-
- def schedule_imagetransfer_edf(self, req, vnodes, nexttime):
+ def __schedule_imagetransfer_edf(self, lease, musttransfer, earliest):
# Estimate image transfer time
bandwidth = self.deployment_enact.get_bandwidth()
- imgTransferTime=self.estimate_image_transfer_time(req, bandwidth)
+ config = get_config()
+ mechanism = config.get("transfer-mechanism")
+ transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
+ if mechanism == constants.TRANSFER_UNICAST:
+ transfer_duration *= len(musttransfer)
# Determine start time
- activetransfers = [t for t in self.transfers_edf if t.state == ResourceReservation.STATE_ACTIVE]
- if len(activetransfers) > 0:
- startTime = activetransfers[-1].end
- else:
- startTime = nexttime
-
- # TODO: Only save a copy of start/end times, not the whole RR
- transfermap = dict([(copy.copy(t), t) for t in self.transfers_edf if t.state == ResourceReservation.STATE_SCHEDULED])
- newtransfers = transfermap.keys()
-
+ start = self.__get_last_transfer_slot(lease.start.requested, transfer_duration)
+
res = {}
resimgnode = Capacity([constants.RES_NETOUT])
resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
resnode = Capacity([constants.RES_NETIN])
resnode.set_quantity(constants.RES_NETIN, bandwidth)
- res[self.edf_node.nod_id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
- for n in vnodes.values():
- res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
+ res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
+ for pnode in musttransfer.values():
+ res[pnode] = self.slottable.create_resource_tuple_from_capacity(resnode)
- newtransfer = FileTransferResourceReservation(req, res)
- newtransfer.deadline = req.start.requested
+ newtransfer = FileTransferResourceReservation(lease, res)
+ newtransfer.deadline = lease.start.requested
newtransfer.state = ResourceReservation.STATE_SCHEDULED
- newtransfer.file = req.software.image_id
- for vnode, pnode in vnodes.items():
- newtransfer.piggyback(req.id, vnode, pnode)
- newtransfers.append(newtransfer)
-
- def comparedates(x, y):
- dx=x.deadline
- dy=y.deadline
- if dx>dy:
- return 1
- elif dx==dy:
- # If deadlines are equal, we break the tie by order of arrival
- # (currently, we just check if this is the new transfer)
- if x == newtransfer:
- return 1
- elif y == newtransfer:
- return -1
- else:
- return 0
- else:
- return -1
+ newtransfer.file = lease.software.image_id
+ newtransfer.start = start
+ newtransfer.end = start + transfer_duration
+ for vnode, pnode in musttransfer.items():
+ newtransfer.piggyback(lease, vnode, pnode)
- # Order transfers by deadline
- newtransfers.sort(comparedates)
-
- # Compute start times and make sure that deadlines are met
- fits = True
- for t in newtransfers:
- if t == newtransfer:
- duration = imgTransferTime
- else:
- duration = t.end - t.start
-
- t.start = startTime
- t.end = startTime + duration
- if t.end > t.deadline:
- fits = False
- break
- startTime = t.end
-
- if not fits:
- raise NotSchedulableException, "Adding this lease results in an unfeasible image transfer schedule."
-
- # Push image transfers as close as possible to their deadlines.
- feasibleEndTime=newtransfers[-1].deadline
- for t in reversed(newtransfers):
- if t == newtransfer:
- duration = imgTransferTime
- else:
- duration = t.end - t.start
-
- newEndTime=min([t.deadline, feasibleEndTime])
- t.end=newEndTime
- newStartTime=newEndTime-duration
- t.start=newStartTime
- feasibleEndTime=newStartTime
+ bisect.insort(self.transfers, newtransfer)
- # Make changes
- for new_t in newtransfers:
- if new_t == newtransfer:
- self.transfers_edf.append(new_t)
- else:
- t_original = transfermap[new_t]
- old_start = t_original.start
- old_end = t_original.end
- t_original.start = new_t.start
- t_original.end = new_t.end
- self.slottable.update_reservation_with_key_change(t_original, old_start, old_end)
-
- return newtransfer
+ return [newtransfer]
- def schedule_imagetransfer_fifo(self, req, reqtransfers, nexttime):
+ def __schedule_imagetransfer_fifo(self, lease, musttransfer, earliest):
# Estimate image transfer time
bandwidth = self.imagenode_bandwidth
- imgTransferTime=self.estimate_image_transfer_time(req, bandwidth)
config = get_config()
mechanism = config.get("transfer-mechanism")
- startTime = self.get_next_fifo_transfer_time(nexttime)
- newtransfers = []
+ # The starting time is the first available slot, which was
+ # included in the "earliest" dictionary.
+ pnodes = musttransfer.values()
+ start = earliest[pnodes[0]].transfer_start
+ transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
+ res = {}
+ resimgnode = Capacity([constants.RES_NETOUT])
+ resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
+ resnode = Capacity([constants.RES_NETIN])
+ resnode.set_quantity(constants.RES_NETIN, bandwidth)
+ res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
+ for n in musttransfer.values():
+ res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
+
+ newtransfer = FileTransferResourceReservation(lease, res)
+ newtransfer.start = start
if mechanism == constants.TRANSFER_UNICAST:
- pass
- # TODO: If transferring each image individually, this will
- # make determining what images can be reused more complicated.
+ newtransfer.end = start + (len(musttransfer) * transfer_duration)
if mechanism == constants.TRANSFER_MULTICAST:
- # Time to transfer is imagesize / bandwidth, regardless of
- # number of nodes
- res = {}
- resimgnode = Capacity([constants.RES_NETOUT])
- resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
- resnode = Capacity([constants.RES_NETIN])
- resnode.set_quantity(constants.RES_NETIN, bandwidth)
- res[self.fifo_node.nod_id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
- for n in reqtransfers.values():
- res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
-
- newtransfer = FileTransferResourceReservation(req, res)
- newtransfer.start = startTime
- newtransfer.end = startTime+imgTransferTime
- newtransfer.deadline = None
- newtransfer.state = ResourceReservation.STATE_SCHEDULED
- newtransfer.file = req.software.image_id
- for vnode in reqtransfers:
- physnode = reqtransfers[vnode]
- newtransfer.piggyback(req.id, vnode, physnode)
- newtransfers.append(newtransfer)
+ newtransfer.end = start + transfer_duration
+
+ newtransfer.deadline = None
+ newtransfer.state = ResourceReservation.STATE_SCHEDULED
+ newtransfer.file = lease.software.image_id
+ for vnode, pnode in musttransfer.items():
+ newtransfer.piggyback(lease, vnode, pnode)
- self.transfers_fifo += newtransfers
+ bisect.insort(self.transfers, newtransfer)
- return newtransfers
+ return [newtransfer]
- def estimate_image_transfer_time(self, lease, bandwidth):
+
+ def __estimate_image_transfer_time(self, lease, bandwidth):
config = get_config()
- forceTransferTime = config.get("force-imagetransfer-time")
- if forceTransferTime != None:
- return forceTransferTime
+ force_transfer_time = config.get("force-imagetransfer-time")
+ if force_transfer_time != None:
+ return force_transfer_time
else:
return estimate_transfer_time(lease.software.image_size, bandwidth)
- def get_next_fifo_transfer_time(self, nexttime):
- transfers = [t for t in self.transfers_fifo if t.state != ResourceReservation.STATE_DONE]
- if len(transfers) > 0:
- startTime = transfers[-1].end
+
+ def __get_next_transfer_slot(self, nexttime, required_duration):
+ # This can probably be optimized by using one of the many
+ # "list of holes" algorithms out there
+ if len(self.transfers) == 0:
+ return nexttime
+ elif nexttime + required_duration <= self.transfers[0].start:
+ return nexttime
else:
- startTime = nexttime
- return startTime
+ for i in xrange(len(self.transfers) - 1):
+ if self.transfers[i].end != self.transfers[i+1].start:
+ hole_duration = self.transfers[i+1].start - self.transfers[i].end
+ if hole_duration >= required_duration:
+ return self.transfers[i].end
+ return self.transfers[-1].end
+
+
+ def __get_last_transfer_slot(self, deadline, required_duration):
+ # This can probably be optimized by using one of the many
+ # "list of holes" algorithms out there
+ if len(self.transfers) == 0:
+ return deadline - required_duration
+ elif self.transfers[-1].end + required_duration <= deadline:
+ return deadline - required_duration
+ else:
+ for i in xrange(len(self.transfers) - 1, 0, -1):
+ if self.transfers[i].start != self.transfer[i-1].end:
+ hole_duration = self.transfer[i].start - self.transfers[i-1].end
+ if hole_duration >= required_duration:
+ return self.transfer[i].start - required_duration
+ return self.transfers[0].start - required_duration
- def __remove_from_fifo_transfers(self, lease_id):
- transfers = [t for t in self.transfers_fifo if t.state != ResourceReservation.STATE_DONE]
+ def __remove_transfers(self, lease):
+ print lease
toremove = []
- for t in transfers:
+ for t in self.transfers:
for pnode in t.transfers:
leases = [l for l, v in t.transfers[pnode]]
- if lease_id in leases:
- newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease_id]
+ print leases
+ if lease in leases:
+ newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease]
t.transfers[pnode] = newtransfers
# Check if the transfer has to be cancelled
a = sum([len(l) for l in t.transfers.values()])
if a == 0:
- t.lease.removeRR(t)
- self.slottable.removeReservation(t)
toremove.append(t)
for t in toremove:
- self.transfers_fifo.remove(t)
+ self.transfers.remove(t)
+
+ return toremove
+
+ def __remove_files(self, lease):
+ for vnode, pnode in lease.get_last_vmrr().nodes.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
@staticmethod
def _handle_start_filetransfer(sched, lease, rr):
@@ -402,6 +349,8 @@
else:
raise InconsistentLeaseStateError(l, doing = "starting a file transfer")
+ # TODO: Check for piggybacking
+
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
sched.logger.info("Starting image transfer for lease %i" % (lease.id))
@@ -428,10 +377,11 @@
# maxend=end
maxend = None
# TODO: ENACTMENT: Verify the image was transferred correctly
- sched.add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
+ sched._add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
else:
raise InconsistentLeaseStateError(l, doing = "ending a file transfer")
+ sched.transfers.remove(rr)
lease.print_contents()
sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
sched.logger.info("Completed image transfer for lease %i" % (lease.id))
@@ -444,14 +394,14 @@
def _handle_end_migrate(sched, lease, rr):
pass
- def add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
+ def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
pnode = self.resourcepool.get_node(pnode_id)
if self.reusealg == constants.REUSE_NONE:
- for (lease_id, vnode) in vnodes:
- self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
+ for (lease, vnode) in vnodes:
+ self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease.id, vnode)
elif self.reusealg == constants.REUSE_IMAGECACHES:
# Sometimes we might find that the image is already deployed
# (although unused). In that case, don't add another copy to
@@ -493,11 +443,6 @@
pnode.print_files()
- def cleanup(self, lease):
- pass
- # TODO: should get values from VMRR
- #for vnode, pnode in lease.diskimagemap.items():
- # self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
class DiskImageMigrationResourceReservation(ResourceReservation):
pass
@@ -524,4 +469,19 @@
self.transfers[physnode] = [(lease_id, vnode)]
def is_preemptible(self):
- return False
\ No newline at end of file
+ return False
+
+ def __cmp__(self, rr):
+ return cmp(self.start, rr.start)
+
+class ImageTransferEarliestStartingTime(EarliestStartingTime):
+ EARLIEST_IMAGETRANSFER = 2
+ EARLIEST_REUSE = 3
+ EARLIEST_PIGGYBACK = 4
+
+ def __init__(self, time, type):
+ EarliestStartingTime.__init__(self, time, type)
+ self.transfer_start = None
+ self.piggybacking_on = None
+
+
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -17,6 +17,7 @@
# -------------------------------------------------------------------------- #
from haizea.core.leases import Lease
+from haizea.core.scheduler import EarliestStartingTime
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
import haizea.common.constants as constants
@@ -25,22 +26,21 @@
PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
self.handlers = {}
- # Add dummy disk images
def schedule(self, lease, vmrr, nexttime):
- for (vnode, pnode) in vmrr.nodes.items():
- self.resourcepool.add_diskimage(pnode, "foobar", 100, lease.id, vnode)
+ # Nothing to do
return [], True
def find_earliest_starting_times(self, lease, nexttime):
- nod_ids = [n.nod_id for n in self.resourcepool.get_nodes()]
- earliest = dict([(node, [nexttime, constants.REQTRANSFER_NO, None]) for node in nod_ids])
+ # The earliest starting time is "nexttime" on all nodes.
+ node_ids = [node.id for node in self.resourcepool.get_nodes()]
+ earliest = {}
+ for node in node_ids:
+ earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
return earliest
def cancel_preparation(self, lease):
self.cleanup(lease)
def cleanup(self, lease):
- pass
- # TODO: should get values from VMRR
- #for vnode, pnode in lease.diskimagemap.items():
- # self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
\ No newline at end of file
+ # Nothing to clean up.
+ pass
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/core/scheduler/resourcepool.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/resourcepool.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/resourcepool.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -103,7 +103,7 @@
self.vm.verify_resume(verify_resume_action)
def get_nodes(self):
- return self.nodes
+ return self.nodes.values()
# An auxiliary node is a host whose resources are going to be scheduled, but
# where no VMs are actually going to run. For example, a disk image repository node.
@@ -116,8 +116,8 @@
def get_num_nodes(self):
return len(self.nodes)
- def get_node(self, nod_id):
- return self.nodes[nod_id-1]
+ def get_node(self, node_id):
+ return self.nodes[node_id]
def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
@@ -164,12 +164,12 @@
get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
def get_max_disk_usage(self):
- return max([n.get_disk_usage() for n in self.nodes])
+ return max([n.get_disk_usage() for n in self.nodes.values()])
class ResourcePoolNode(object):
- def __init__(self, nod_id, hostname, capacity):
+ def __init__(self, node_id, hostname, capacity):
self.logger = logging.getLogger("RESOURCEPOOL")
- self.nod_id = nod_id
+ self.id = node_id
self.hostname = hostname
self.capacity = capacity
self.files = []
@@ -222,12 +222,12 @@
images = ""
if len(self.files) > 0:
images = ", ".join([str(img) for img in self.files])
- self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
+ self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), images))
def xmlrpc_marshall(self):
# Convert to something we can send through XMLRPC
h = {}
- h["id"] = self.nod_id
+ h["id"] = self.id
h["hostname"] = self.hostname
h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
@@ -302,20 +302,20 @@
# for other images
def get_nodes_with_reusable_image(self, diskimage_id, after = None):
- return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
+ return [n.id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
def exists_reusable_image(self, pnode_id, diskimage_id, after):
return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
- def __init__(self, nod_id, hostname, capacity):
- Node.__init__(self, nod_id, hostname, capacity)
+ def __init__(self, node_id, hostname, capacity):
+ Node.__init__(self, node_id, hostname, capacity)
self.reusable_images = []
@classmethod
def from_node(cls, n):
- node = cls(n.nod_id, n.hostname, n.capacity)
+ node = cls(n.id, n.hostname, n.capacity)
node.enactment_info = n.enactment_info
return node
@@ -385,7 +385,7 @@
images = ""
if len(self.reusable_images) > 0:
images = ", ".join([str(img) for img in self.reusable_images])
- self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
+ self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
class ReusableDiskImageFile(File):
def __init__(self, filename, filesize, diskimage_id, timeout):
Modified: branches/TP2.0/src/haizea/core/scheduler/slottable.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/slottable.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/slottable.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -169,29 +169,7 @@
def __init__(self, capacity):
self.capacity = ResourceTuple.copy(capacity)
-class NodeList(object):
- def __init__(self):
- self.nodelist = []
-
- def add(self, node):
- self.nodelist.append(node)
- def __getitem__(self, n):
- return self.nodelist[n-1]
-
- def __iter__(self):
- return iter(self.nodelist)
-
- def copy(self):
- nodelist = NodeList()
- for n in self.nodelist:
- nodelist.add(Node(n.capacity))
- return nodelist
-
- def to_dict(self):
- nodelist = self.copy()
- return dict([(i+1, v) for i, v in enumerate(nodelist)])
-
class KeyValueWrapper(object):
def __init__(self, key, value):
self.key = key
@@ -201,10 +179,6 @@
return cmp(self.key, other.key)
-
-
-
-
class SlotTable(object):
"""Slot Table
@@ -224,7 +198,7 @@
def __init__(self, resource_types):
self.logger = logging.getLogger("SLOT")
- self.nodes = NodeList()
+ self.nodes = {}
self.resource_types = resource_types
self.reservations_by_start = []
self.reservations_by_end = []
@@ -242,8 +216,8 @@
self.rtuple_restype2pos[rt] = pos
pos = pos + 1
- def add_node(self, resourcetuple):
- self.nodes.add(Node(resourcetuple))
+ def add_node(self, node_id, resourcetuple):
+ self.nodes[node_id] = Node(resourcetuple)
def create_empty_resource_tuple(self):
return ResourceTuple(self, [0] * self.rtuple_len)
@@ -270,7 +244,7 @@
return (avail == 0)
def get_total_capacity(self, restype):
- return sum([n.capacity.get_by_type(restype) for n in self.nodes.nodelist])
+ return sum([n.capacity.get_by_type(restype) for n in self.nodes.values()])
def get_reservations_at(self, time):
item = KeyValueWrapper(time, None)
@@ -387,17 +361,13 @@
self.reservations_by_end.pop(posend)
self.__dirty()
- def get_availability(self, time, min_capacity=None, onlynodes=None):
+ def get_availability(self, time, min_capacity=None):
if not self.availabilitycache.has_key(time):
self.__get_availability_cache_miss(time)
# Cache miss
nodes = self.availabilitycache[time]
- if onlynodes != None:
- onlynodes = set(onlynodes)
- nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
-
# Keep only those nodes with enough resources
if min_capacity != None:
newnodes = {}
@@ -468,21 +438,9 @@
else:
return min(time1, time2)
- def get_availability_window(self, start, onlynodes = None):
- cache_miss = False
- if self.awcache == None:
- cache_miss = True
- else:
- if self.awcache_onlynodes != None:
- if onlynodes == None:
- cache_miss = True
- elif not set(onlynodes).issubset(set(self.awcache_onlynodes)):
- cache_miss = True
- if start < self.awcache_time:
- cache_miss = True
-
- if cache_miss:
- self.__get_aw_cache_miss(start, onlynodes)
+ def get_availability_window(self, start):
+ if self.awcache == None or start < self.awcache_time:
+ self.__get_aw_cache_miss(start)
return self.awcache
@@ -539,7 +497,7 @@
def __get_availability_cache_miss(self, time):
- allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
+ allnodes = set(self.nodes.keys())
nodes = {}
reservations = self.get_reservations_at(time)
@@ -558,17 +516,15 @@
self.availabilitycache[time] = nodes
- def __get_aw_cache_miss(self, time, onlynodes):
- self.awcache = AvailabilityWindow(self, time, onlynodes)
+ def __get_aw_cache_miss(self, time):
+ self.awcache = AvailabilityWindow(self, time)
self.awcache_time = time
- self.awcache_onlynodes = onlynodes
def __dirty(self):
# You're a dirty, dirty slot table and you should be
# ashamed of having outdated caches!
self.availabilitycache = {}
self.awcache_time = None
- self.awcache_onlynodes = None
self.awcache = None
@@ -646,21 +602,20 @@
resources will be available?"
"""
- def __init__(self, slottable, time, onlynodes = None):
+ def __init__(self, slottable, time):
self.slottable = slottable
self.logger = logging.getLogger("SLOTTABLE.WIN")
self.time = time
- self.onlynodes = onlynodes
self.leases = set()
-
- self.cp_list = [self.time] + self.slottable.get_changepoints_after(time, nodes=onlynodes)
+ self.cp_list = [self.time] + self.slottable.get_changepoints_after(time)
+
# Create initial changepoint hash table
self.changepoints = dict([(cp,ChangepointAvail()) for cp in self.cp_list])
for cp in self.changepoints.values():
- for node_id, node in enumerate(self.slottable.nodes):
- cp.add_node(node_id + 1, node.capacity)
+ for node_id, node in self.slottable.nodes.items():
+ cp.add_node(node_id, node.capacity)
rrs = self.slottable.get_reservations_after(time)
rrs.sort(key=attrgetter("start"))
Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -146,7 +146,7 @@
# Nodes may not be available at a changepoint because images
# cannot be transferred at that time.
if not mustresume:
- cps = [(node, e[0]) for node, e in earliest.items()]
+ cps = [(node, e.time) for node, e in earliest.items()]
cps.sort(key=itemgetter(1))
curcp = None
changepoints = []
@@ -405,8 +405,7 @@
requested_resources,
start,
end,
- strictend = False,
- onlynodes = onlynodes)
+ strictend = False)
if mapping != None:
if actualend < end:
Modified: branches/TP2.0/src/haizea/policies/host_selection.py
===================================================================
--- branches/TP2.0/src/haizea/policies/host_selection.py 2009-07-15 16:23:35 UTC (rev 606)
+++ branches/TP2.0/src/haizea/policies/host_selection.py 2009-07-16 16:55:07 UTC (rev 607)
@@ -13,7 +13,7 @@
HostSelectionPolicyBase.__init__(self, slottable)
def get_host_score(self, node, time, lease):
- aw = self.slottable.get_availability_window(time, onlynodes = [node])
+ aw = self.slottable.get_availability_window(time)
leases_in_node_horizon = 4
More information about the Haizea-commit mailing list