[haizea-commit] r429 - in trunk/src/haizea: common resourcemanager resourcemanager/deployment resourcemanager/enact resourcemanager/enact/opennebula resourcemanager/enact/simulated resourcemanager/frontends
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Thu Jul 10 10:36:39 CDT 2008
Author: borja
Date: 2008-07-10 10:36:37 -0500 (Thu, 10 Jul 2008)
New Revision: 429
Modified:
trunk/src/haizea/common/utils.py
trunk/src/haizea/resourcemanager/datastruct.py
trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
trunk/src/haizea/resourcemanager/enact/actions.py
trunk/src/haizea/resourcemanager/enact/base.py
trunk/src/haizea/resourcemanager/enact/opennebula/info.py
trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
trunk/src/haizea/resourcemanager/enact/simulated/info.py
trunk/src/haizea/resourcemanager/enact/simulated/storage.py
trunk/src/haizea/resourcemanager/frontends/opennebula.py
trunk/src/haizea/resourcemanager/frontends/tracefile.py
trunk/src/haizea/resourcemanager/resourcepool.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
trunk/src/haizea/resourcemanager/stats.py
Log:
- Cleaned up datastruct module (docstrings still missing). Several other modules were updated as well, because much of the code depends on the datastruct classes.
- Added "preemptible" attribute to LeaseBase. Updated ResourceManager.__schedule_besteffort accordingly.
- Added ImmediateLease class.
Modified: trunk/src/haizea/common/utils.py
===================================================================
--- trunk/src/haizea/common/utils.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/common/utils.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -94,3 +94,24 @@
f = open (dir + "/" + file, "w")
dump(data, f, protocol = HIGHEST_PROTOCOL)
f.close()
+
+LEASE_ID = 1
+
+def get_lease_id():
+ global LEASE_ID
+ l = LEASE_ID
+ LEASE_ID += 1
+ return l
+
+def pretty_nodemap(nodes):
+ pnodes = list(set(nodes.values()))
+ normmap = [([y[0] for y in nodes.items() if y[1]==x], x) for x in pnodes]
+ for m in normmap: m[0].sort()
+ s = " ".join([", ".join(["V"+`y` for y in x[0]])+" -> P" + `x[1]` for x in normmap])
+ return s
+
+def estimate_transfer_time(size, bandwidth):
+ bandwidthMBs = float(bandwidth) / 8
+ seconds = size / bandwidthMBs
+ return roundDateTimeDelta(DateTime.TimeDelta(seconds = seconds))
+
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/datastruct.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -16,150 +16,62 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+"""This module provides the fundamental data structures (besides the slot table,
+which is in a module of its own) used by Haizea. The module provides four types
+of structures:
+
+* Lease data structures
+ * LeaseBase: Base class for leases
+ * ARLease: Advance reservation lease
+ * BestEffortLease: Best-effort lease
+ * ImmediateLease: Immediate lease
+* Resource reservation (RR) structures:
+ * ResourceReservationBase: Base class for RRs in the slot table
+ * VMResourceReservation: RR representing one or more VMs
+ * SuspensionResourceReservation: RR representing a lease suspension
+ * ResumptionResourceReservation: RR representing a lease resumption
+* Lease containers
+ * Queue: Your run-of-the-mill queue
+ * LeaseTable: Provides easy access to leases in the system
+* Miscellaneous structures
+ * ResourceTuple: A tuple representing a resource usage or requirement
+ * Timestamp: A wrapper around requested/scheduled/actual timestamps
+ * Duration: A wrapper around requested/accumulated/actual durations
+"""
+
from haizea.common.constants import state_str, rstate_str, DS, RES_STATE_SCHEDULED, RES_STATE_ACTIVE, RES_MEM, MIGRATE_NONE, MIGRATE_MEM, MIGRATE_MEMVM, TRANSFER_NONE
-from haizea.common.utils import roundDateTimeDelta
+from haizea.common.utils import roundDateTimeDelta, get_lease_id, pretty_nodemap, estimate_transfer_time
+
from operator import attrgetter
from mx.DateTime import TimeDelta
from math import floor
-leaseID = 1
-def getLeaseID():
- global leaseID
- l = leaseID
- leaseID += 1
- return l
-def resetLeaseID():
- global leaseID
- leaseID = 1
+#-------------------------------------------------------------------#
+# #
+# LEASE DATA STRUCTURES #
+# #
+#-------------------------------------------------------------------#
-def prettyNodemap(nodes):
- pnodes = list(set(nodes.values()))
- normmap = [([y[0] for y in nodes.items() if y[1]==x], x) for x in pnodes]
- for m in normmap: m[0].sort()
- s = " ".join([", ".join(["V"+`y` for y in x[0]])+" -> P" + `x[1]` for x in normmap])
- return s
-class ResourceTuple(object):
- def __init__(self, res):
- self.res = res
-
- @classmethod
- def fromList(cls, l):
- return cls(l[:])
-
- @classmethod
- def copy(cls, rt):
- return cls(rt.res[:])
-
- @classmethod
- def setResourceTypes(cls, resourcetypes):
- cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
- cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
- cls.tuplelength = len(resourcetypes)
-
- @classmethod
- def createEmpty(cls):
- return cls([0 for x in range(cls.tuplelength)])
-
- def fitsIn(self, res2):
- fits = True
- for i in xrange(len(self.res)):
- if self.res[i] > res2.res[i]:
- fits = False
- break
- return fits
-
- def getNumFitsIn(self, res2):
- canfit = 10000 # Arbitrarily large
- for i in xrange(len(self.res)):
- if self.res[i] != 0:
- f = res2.res[i] / self.res[i]
- if f < canfit:
- canfit = f
- return int(floor(canfit))
-
- def decr(self, res2):
- for slottype in xrange(len(self.res)):
- self.res[slottype] -= res2.res[slottype]
-
- def incr(self, res2):
- for slottype in xrange(len(self.res)):
- self.res[slottype] += res2.res[slottype]
-
- def getByType(self, resourcetype):
- return self.res[self.type2pos[resourcetype]]
-
- def setByType(self, resourcetype, value):
- self.res[self.type2pos[resourcetype]] = value
-
- def isZeroOrLess(self):
- return sum([v for v in self.res]) <= 0
-
- def __repr__(self):
- r=""
- for i, x in enumerate(self.res):
- r += "%s:%.2f " % (self.descriptions[i], x)
- return r
-
-class Timestamp(object):
- def __init__(self, requested):
- self.requested = requested
- self.scheduled = None
- self.actual = None
-
- def __repr__(self):
- return "REQ: %s | SCH: %s | ACT: %s" % (self.requested, self.scheduled, self.actual)
-
-class Duration(object):
- def __init__(self, requested, known=None):
- self.original = requested
- self.requested = requested
- self.accumulated = TimeDelta()
- self.actual = None
- # The following is ONLY used in simulation
- self.known = known
-
- def incr(self, t):
- self.requested += t
- if self.known != None:
- self.known += t
-
- def incr_by_percent(self, pct):
- factor = 1 + float(pct)/100
- self.requested = roundDateTimeDelta(self.requested * factor)
- if self.known != None:
- self.requested = roundDateTimeDelta(self.known * factor)
-
- def accumulateDuration(self, t):
- self.accumulated += t
-
- def getRemainingDuration(self):
- return self.requested - self.accumulated
-
- # ONLY in simulation
- def getRemainingKnownDuration(self):
- return self.known - self.accumulated
-
- def __repr__(self):
- return "REQ: %s | ACC: %s | ACT: %s | KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
-
class LeaseBase(object):
- def __init__(self, tSubmit, start, duration, diskImageID, diskImageSize, numnodes, resreq):
+ def __init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, requested_resources, preemptible):
# Lease ID (read only)
- self.leaseID = getLeaseID()
+ self.id = get_lease_id()
# Request attributes (read only)
- self.tSubmit = tSubmit
+ self.submit_time = submit_time
self.start = start
self.duration = duration
self.end = None
- self.diskImageID = diskImageID
- self.diskImageSize = diskImageSize
+ self.diskimage_id = diskimage_id
+ self.diskimage_size = diskimage_size
# TODO: The following assumes homogeneous nodes. Should be modified
# to account for heterogeneous nodes.
self.numnodes = numnodes
- self.resreq = resreq
+ self.requested_resources = requested_resources
+ self.preemptible = preemptible
# Bookkeeping attributes
# (keep track of the lease's state, resource reservations, etc.)
@@ -169,109 +81,113 @@
self.rr = []
# Enactment information. Should only be manipulated by enactment module
- self.enactmentInfo = None
- self.vnodeEnactmentInfo = None
+ self.enactment_info = None
+ self.vnode_enactment_info = None
-
- def setScheduler(self, scheduler):
+ # TODO: Remove the link to the scheduler, and pass all necessary information
+ # as parameters to methods.
+ def set_scheduler(self, scheduler):
self.scheduler = scheduler
self.logger = scheduler.rm.logger
- def printContents(self, loglevel="EXTREMEDEBUG"):
- self.logger.log(loglevel, "Lease ID : %i" % self.leaseID, DS)
- self.logger.log(loglevel, "Submission time: %s" % self.tSubmit, DS)
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ self.logger.log(loglevel, "Lease ID : %i" % self.id, DS)
+ self.logger.log(loglevel, "Submission time: %s" % self.submit_time, DS)
+ self.logger.log(loglevel, "Duration : %s" % self.duration, DS)
self.logger.log(loglevel, "State : %s" % state_str(self.state), DS)
- self.logger.log(loglevel, "VM image : %s" % self.diskImageID, DS)
- self.logger.log(loglevel, "VM image size : %s" % self.diskImageSize, DS)
+ self.logger.log(loglevel, "VM image : %s" % self.diskimage_id, DS)
+ self.logger.log(loglevel, "VM image size : %s" % self.diskimage_size, DS)
self.logger.log(loglevel, "Num nodes : %s" % self.numnodes, DS)
- self.logger.log(loglevel, "Resource req : %s" % self.resreq, DS)
- self.logger.log(loglevel, "VM image map : %s" % prettyNodemap(self.vmimagemap), DS)
- self.logger.log(loglevel, "Mem image map : %s" % prettyNodemap(self.memimagemap), DS)
+ self.logger.log(loglevel, "Resource req : %s" % self.requested_resources, DS)
+ self.logger.log(loglevel, "VM image map : %s" % pretty_nodemap(self.vmimagemap), DS)
+ self.logger.log(loglevel, "Mem image map : %s" % pretty_nodemap(self.memimagemap), DS)
- def printRR(self, loglevel="EXTREMEDEBUG"):
+ def print_rrs(self, loglevel="EXTREMEDEBUG"):
self.logger.log(loglevel, "RESOURCE RESERVATIONS", DS)
self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~", DS)
for r in self.rr:
- r.printContents(loglevel)
+ r.print_contents(loglevel)
self.logger.log(loglevel, "##", DS)
- def appendRR(self, rr):
- self.rr.append(rr)
- def hasStartingReservations(self, time):
- return len(self.getStartingReservations(time)) > 0
+ def has_starting_reservations(self, time):
+ return len(self.get_starting_reservations(time)) > 0
- def hasEndingReservations(self, time):
- return len(self.getEndingReservations(time)) > 0
+ def has_ending_reservations(self, time):
+ return len(self.get_ending_reservations(time)) > 0
- def getStartingReservations(self, time):
+ def get_starting_reservations(self, time):
return [r for r in self.rr if r.start <= time and r.state == RES_STATE_SCHEDULED]
- def getEndingReservations(self, time):
+ def get_ending_reservations(self, time):
return [r for r in self.rr if r.end <= time and r.state == RES_STATE_ACTIVE]
+
+ def get_endtime(self):
+ vmrr, resrr = self.get_last_vmrr()
+ return vmrr.end
+
- def getLastVMRR(self):
+ def append_rr(self, rr):
+ self.rr.append(rr)
+
+ def next_rrs(self, rr):
+ return self.rr[self.rr.index(rr)+1:]
+
+ def prev_rr(self, rr):
+ return self.rr[self.rr.index(rr)-1]
+
+ def get_last_vmrr(self):
if isinstance(self.rr[-1],VMResourceReservation):
return (self.rr[-1], None)
elif isinstance(self.rr[-1],SuspensionResourceReservation):
return (self.rr[-2], self.rr[-1])
-
- def getEnd(self):
- vmrr, resrr = self.getLastVMRR()
- return vmrr.end
-
- def nextRRs(self, rr):
- return self.rr[self.rr.index(rr)+1:]
- def prevRR(self, rr):
- return self.rr[self.rr.index(rr)-1]
-
- def replaceRR(self, rrold, rrnew):
+ def replace_rr(self, rrold, rrnew):
self.rr[self.rr.index(rrold)] = rrnew
- def removeRR(self, rr):
+ def remove_rr(self, rr):
if not rr in self.rr:
raise Exception, "Tried to remove an RR not contained in this lease"
else:
self.rr.remove(rr)
- def removeRRs(self):
+ def clear_rrs(self):
self.rr = []
- def addBootOverhead(self, t):
+
+ def add_boot_overhead(self, t):
self.duration.incr(t)
+
+ def add_runtime_overhead(self, percent):
+ self.duration.incr_by_percent(percent)
- def estimateSuspendResumeTime(self, rate):
- time = float(self.resreq.getByType(RES_MEM)) / rate
+
+ def estimate_suspend_resume_time(self, rate):
+ time = float(self.requested_resources.get_by_type(RES_MEM)) / rate
time = roundDateTimeDelta(TimeDelta(seconds = time))
return time
- # TODO: Should be in common package
- def estimateTransferTime(self, size):
- bandwidth = self.scheduler.rm.config.getBandwidth()
- bandwidthMBs = float(bandwidth) / 8
- seconds = size / bandwidthMBs
- return roundDateTimeDelta(TimeDelta(seconds = seconds))
-
- def estimateImageTransferTime(self):
+ # TODO: Factor out into deployment modules
+ def estimate_image_transfer_time(self, bandwidth):
forceTransferTime = self.scheduler.rm.config.getForceTransferTime()
if forceTransferTime != None:
return forceTransferTime
else:
- return self.estimateTransferTime(self.diskImageSize)
+ return estimate_transfer_time(self.diskimage_size, bandwidth)
- def estimateMigrationTime(self):
+ def estimate_migration_time(self, bandwidth):
whattomigrate = self.scheduler.rm.config.getMustMigrate()
if whattomigrate == MIGRATE_NONE:
return TimeDelta(seconds=0)
else:
if whattomigrate == MIGRATE_MEM:
- mbtotransfer = self.resreq.getByType(RES_MEM)
+ mbtotransfer = self.requested_resources.get_by_type(RES_MEM)
elif whattomigrate == MIGRATE_MEMVM:
- mbtotransfer = self.diskImageSize + self.resreq.getByType(RES_MEM)
- return self.estimateTransferTime(mbtotransfer)
+ mbtotransfer = self.diskimage_size + self.requested_resources.get_by_type(RES_MEM)
+ return estimate_transfer_time(mbtotransfer, bandwidth)
- def getSuspendThreshold(self, initial, suspendrate, migrating=False):
+ # TODO: This whole function has to be rethought
+ def get_suspend_threshold(self, initial, suspendrate, migrating=False, bandwidth=None):
threshold = self.scheduler.rm.config.getSuspendThreshold()
if threshold != None:
# If there is a hard-coded threshold, use that
@@ -288,89 +204,124 @@
# Overestimating, just in case (taking into account that the lease may be
# resumed, but also suspended again)
if migrating:
- threshold = self.estimateMigrationTime() + self.estimateSuspendResumeTime(suspendrate) * 2
+ threshold = self.estimate_suspend_resume_time(suspendrate) * 2
+ #threshold = self.estimate_migration_time(bandwidth) + self.estimate_suspend_resume_time(suspendrate) * 2
else:
- threshold = self.estimateSuspendResumeTime(suspendrate) * 2
+ threshold = self.estimate_suspend_resume_time(suspendrate) * 2
else:
#threshold = self.scheduler.rm.config.getBootOverhead() + deploytime + self.estimateSuspendResumeTime(suspendrate)
- threshold = self.scheduler.rm.config.getBootOverhead() + self.estimateSuspendResumeTime(suspendrate)
+ threshold = self.scheduler.rm.config.getBootOverhead() + self.estimate_suspend_resume_time(suspendrate)
factor = self.scheduler.rm.config.getSuspendThresholdFactor() + 1
return roundDateTimeDelta(threshold * factor)
- def addRuntimeOverhead(self, percent):
- self.duration.incr_by_percent(percent)
+
class ARLease(LeaseBase):
- def __init__(self, tSubmit, tStart, dur, diskImageID, diskImageSize, numnodes, resreq, realdur = None):
- start = Timestamp(tStart)
- duration = Duration(dur)
+ def __init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # AR-specific parameters:
+ realdur = None):
+ start = Timestamp(start)
+ duration = Duration(duration)
duration.known = realdur # ONLY for simulation
- LeaseBase.__init__(self, tSubmit, start, duration, diskImageID, diskImageSize, numnodes, resreq)
+ LeaseBase.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
- def printContents(self, loglevel="EXTREMEDEBUG"):
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
self.logger.log(loglevel, "__________________________________________________", DS)
- LeaseBase.printContents(self, loglevel)
+ LeaseBase.print_contents(self, loglevel)
self.logger.log(loglevel, "Type : AR", DS)
self.logger.log(loglevel, "Start time : %s" % self.start, DS)
- self.logger.log(loglevel, "Duration : %s" % self.duration, DS)
- self.printRR(loglevel)
+ self.print_rrs(loglevel)
self.logger.log(loglevel, "--------------------------------------------------", DS)
class BestEffortLease(LeaseBase):
- def __init__(self, tSubmit, reqdur, diskImageID, diskImageSize, numnodes, resreq, realdur = None):
- start = Timestamp(None) # i.e., start on a best-effort bais
- duration = Duration(reqdur)
+ def __init__(self, submit_time, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # BE-specific parameters:
+ realdur = None):
+ start = Timestamp(None) # i.e., start on a best-effort basis
+ duration = Duration(duration)
duration.known = realdur # ONLY for simulation
# When the images will be available
self.imagesavail = None
- LeaseBase.__init__(self, tSubmit, start, duration, diskImageID, diskImageSize, numnodes, resreq)
+ LeaseBase.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
- def printContents(self, loglevel="EXTREMEDEBUG"):
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
self.logger.log(loglevel, "__________________________________________________", DS)
- LeaseBase.printContents(self, loglevel)
+ LeaseBase.print_contents(self, loglevel)
self.logger.log(loglevel, "Type : BEST-EFFORT", DS)
- self.logger.log(loglevel, "Duration : %s" % self.duration, DS)
self.logger.log(loglevel, "Images Avail @ : %s" % self.imagesavail, DS)
- self.printRR(loglevel)
+ self.print_rrs(loglevel)
self.logger.log(loglevel, "--------------------------------------------------", DS)
- def getSlowdown(self, end, bound=10):
-# timeOnDedicated = self.timeOnDedicated
-# timeOnLoaded = end - self.tSubmit
-# bound = TimeDelta(seconds=bound)
-# if timeOnDedicated < bound:
-# timeOnDedicated = bound
-# return timeOnLoaded / timeOnDedicated
- return 42
+ def get_waiting_time(self):
+ return self.start.actual - self.submit_time
+
+ def get_slowdown(self, bound=10):
+ time_on_dedicated = self.duration.original
+ time_on_loaded = self.end - self.submit_time
+ bound = TimeDelta(seconds=bound)
+ if time_on_dedicated < bound:
+ time_on_dedicated = bound
+ return time_on_loaded / time_on_dedicated
+
+class ImmediateLease(LeaseBase):
+ def __init__(self, submit_time, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # Immediate-specific parameters:
+ realdur = None):
+ start = Timestamp(None) # i.e., start on a best-effort basis
+ duration = Duration(duration)
+ duration.known = realdur # ONLY for simulation
+ LeaseBase.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
+
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ self.logger.log(loglevel, "__________________________________________________", DS)
+ LeaseBase.print_contents(self, loglevel)
+ self.logger.log(loglevel, "Type : IMMEDIATE", DS)
+ self.print_rrs(loglevel)
+ self.logger.log(loglevel, "--------------------------------------------------", DS)
+
+
+#-------------------------------------------------------------------#
+# #
+# RESOURCE RESERVATION #
+# DATA STRUCTURES #
+# #
+#-------------------------------------------------------------------#
+
class ResourceReservationBase(object):
def __init__(self, lease, start, end, res):
+ self.lease = lease
self.start = start
self.end = end
- self.lease = lease
self.state = None
- self.res = res
+ self.resources_in_pnode = res
self.logger = lease.scheduler.rm.logger
- def printContents(self, loglevel="EXTREMEDEBUG"):
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
self.logger.log(loglevel, "Start : %s" % self.start, DS)
self.logger.log(loglevel, "End : %s" % self.end, DS)
self.logger.log(loglevel, "State : %s" % rstate_str(self.state), DS)
- self.logger.log(loglevel, "Resources : \n%s" % "\n".join(["N%i: %s" %(i, x) for i, x in self.res.items()]), DS)
+ self.logger.log(loglevel, "Resources : \n%s" % "\n".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]), DS)
class VMResourceReservation(ResourceReservationBase):
- def __init__(self, lease, start, end, nodes, res, oncomplete, backfillres):
+ def __init__(self, lease, start, end, nodes, res, oncomplete, backfill_reservation):
ResourceReservationBase.__init__(self, lease, start, end, res)
self.nodes = nodes
self.oncomplete = oncomplete
- self.backfillres = backfillres
+ self.backfill_reservation = backfill_reservation
# ONLY for simulation
if lease.duration.known != None:
- remdur = lease.duration.getRemainingKnownDuration()
+ remdur = lease.duration.get_remaining_known_duration()
rrdur = self.end - self.start
if remdur < rrdur:
self.prematureend = self.start + remdur
@@ -379,19 +330,16 @@
else:
self.prematureend = None
- def printContents(self, loglevel="EXTREMEDEBUG"):
- ResourceReservationBase.printContents(self, loglevel)
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ ResourceReservationBase.print_contents(self, loglevel)
if self.prematureend != None:
self.logger.log(loglevel, "Premature end : %s" % self.prematureend, DS)
self.logger.log(loglevel, "Type : VM", DS)
- self.logger.log(loglevel, "Nodes : %s" % prettyNodemap(self.nodes), DS)
+ self.logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes), DS)
self.logger.log(loglevel, "On Complete : %s" % self.oncomplete, DS)
- def isPreemptible(self):
- if isinstance(self.lease,BestEffortLease):
- return True
- elif isinstance(self.lease, ARLease):
- return False
+ def is_preemptible(self):
+ return self.lease.preemptible
class SuspensionResourceReservation(ResourceReservationBase):
@@ -399,12 +347,15 @@
ResourceReservationBase.__init__(self, lease, start, end, res)
self.nodes = nodes
- def printContents(self, loglevel="EXTREMEDEBUG"):
- ResourceReservationBase.printContents(self, loglevel)
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ ResourceReservationBase.print_contents(self, loglevel)
self.logger.log(loglevel, "Type : SUSPEND", DS)
- self.logger.log(loglevel, "Nodes : %s" % prettyNodemap(self.nodes), DS)
+ self.logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes), DS)
- def isPreemptible(self):
+ # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
+ # has wider implications (with a non-trivial handling). For now, we leave them
+ # as non-preemptible, since the probability of preempting a suspension RR is slim.
+ def is_preemptible(self):
return False
class ResumptionResourceReservation(ResourceReservationBase):
@@ -412,51 +363,66 @@
ResourceReservationBase.__init__(self, lease, start, end, res)
self.nodes = nodes
- def printContents(self, loglevel="EXTREMEDEBUG"):
- ResourceReservationBase.printContents(self, loglevel)
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ ResourceReservationBase.print_contents(self, loglevel)
self.logger.log(loglevel, "Type : RESUME", DS)
- self.logger.log(loglevel, "Nodes : %s" % prettyNodemap(self.nodes), DS)
+ self.logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes), DS)
- def isPreemptible(self):
+ # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
+ # has wider implications (with a non-trivial handling). For now, we leave them
+ # as non-preemptible, since the probability of preempting a resumption RR is slim.
+ def is_preemptible(self):
return False
+
+#-------------------------------------------------------------------#
+# #
+# LEASE CONTAINERS #
+# #
+#-------------------------------------------------------------------#
+
class Queue(object):
def __init__(self, scheduler):
self.scheduler = scheduler
- self.q = []
+ self.__q = []
- def isEmpty(self):
- return len(self.q)==0
+ def is_empty(self):
+ return len(self.__q)==0
def enqueue(self, r):
- self.q.append(r)
+ self.__q.append(r)
def dequeue(self):
- return self.q.pop(0)
+ return self.__q.pop(0)
- def enqueueInOrder(self, r):
- self.q.append(r)
- self.q.sort(key=attrgetter("leaseID"))
+ def enqueue_in_order(self, r):
+ self.__q.append(r)
+ self.__q.sort(key=attrgetter("submit_time"))
+ def length(self):
+ return len(self.__q)
+
+ def __iter__(self):
+ return iter(self.__q)
class LeaseTable(object):
def __init__(self, scheduler):
self.scheduler = scheduler
self.entries = {}
- def getLease(self, leaseID):
- return self.entries[leaseID]
+ def get_lease(self, lease_id):
+ return self.entries[lease_id]
- def isEmpty(self):
+ def is_empty(self):
return len(self.entries)==0
def remove(self, lease):
- del self.entries[lease.leaseID]
+ del self.entries[lease.id]
def add(self, lease):
- self.entries[lease.leaseID] = lease
+ self.entries[lease.id] = lease
- def getLeases(self, type=None):
+ def get_leases(self, type=None):
if type==None:
return self.entries.values()
else:
@@ -487,15 +453,114 @@
leases2.update(l)
return list(leases2)
- def getPercentSubmittedTime(self, percent, leasetype=None):
- leases = self.entries.values()
- leases.sort(key=attrgetter("tSubmit"))
- if leasetype != None:
- leases = [l for l in leases if isinstance(l, leasetype)]
- pos = int((len(leases) * (percent / 100.0)) - 1)
- firstsubmission = leases[0].tSubmit
- pctsubmission = leases[pos].tSubmit
- return (pctsubmission - firstsubmission).seconds
+#-------------------------------------------------------------------#
+# #
+# MISCELLANEOUS DATA STRUCTURES #
+# #
+#-------------------------------------------------------------------#
+
+class ResourceTuple(object):
+ def __init__(self, res):
+ self._res = res
+
+ @classmethod
+ def from_list(cls, l):
+ return cls(l[:])
- def getLastSubmissionTime(self, leasetype=None):
- return self.getPercentSubmittedTime(100, leasetype)
+ @classmethod
+ def copy(cls, rt):
+ return cls(rt._res[:])
+
+ @classmethod
+ def set_resource_types(cls, resourcetypes):
+ cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
+ cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
+ cls.tuplelength = len(resourcetypes)
+
+ @classmethod
+ def create_empty(cls):
+ return cls([0 for x in range(cls.tuplelength)])
+
+ def fits_in(self, res2):
+ fits = True
+ for i in xrange(len(self._res)):
+ if self._res[i] > res2._res[i]:
+ fits = False
+ break
+ return fits
+
+ def get_num_fits_in(self, res2):
+ canfit = 10000 # Arbitrarily large
+ for i in xrange(len(self._res)):
+ if self._res[i] != 0:
+ f = res2._res[i] / self._res[i]
+ if f < canfit:
+ canfit = f
+ return int(floor(canfit))
+
+ def decr(self, res2):
+ for slottype in xrange(len(self._res)):
+ self._res[slottype] -= res2._res[slottype]
+
+ def incr(self, res2):
+ for slottype in xrange(len(self._res)):
+ self._res[slottype] += res2._res[slottype]
+
+ def get_by_type(self, resourcetype):
+ return self._res[self.type2pos[resourcetype]]
+
+ def set_by_type(self, resourcetype, value):
+ self._res[self.type2pos[resourcetype]] = value
+
+ def is_zero_or_less(self):
+ return sum([v for v in self._res]) <= 0
+
+ def __repr__(self):
+ r=""
+ for i, x in enumerate(self._res):
+ r += "%s:%.2f " % (self.descriptions[i], x)
+ return r
+
+
+class Timestamp(object):
+ def __init__(self, requested):
+ self.requested = requested
+ self.scheduled = None
+ self.actual = None
+
+ def __repr__(self):
+ return "REQ: %s | SCH: %s | ACT: %s" % (self.requested, self.scheduled, self.actual)
+
+class Duration(object):
+ def __init__(self, requested, known=None):
+ self.original = requested
+ self.requested = requested
+ self.accumulated = TimeDelta()
+ self.actual = None
+ # The following is ONLY used in simulation
+ self.known = known
+
+ def incr(self, t):
+ self.requested += t
+ if self.known != None:
+ self.known += t
+
+ def incr_by_percent(self, pct):
+ factor = 1 + float(pct)/100
+ self.requested = roundDateTimeDelta(self.requested * factor)
+ if self.known != None:
+ self.requested = roundDateTimeDelta(self.known * factor)
+
+ def accumulate_duration(self, t):
+ self.accumulated += t
+
+ def get_remaining_duration(self):
+ return self.requested - self.accumulated
+
+ # ONLY in simulation
+ def get_remaining_known_duration(self):
+ return self.known - self.accumulated
+
+ def __repr__(self):
+ return "REQ: %s | ACC: %s | ACT: %s | KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
+
Modified: trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -42,7 +42,7 @@
def cancel_deployment(self, lease):
if isinstance(lease, BestEffortLease):
- self.__remove_from_fifo_transfers(lease.leaseID)
+ self.__remove_from_fifo_transfers(lease.id)
def schedule_for_ar(self, lease, vmrr, nexttime):
config = self.scheduler.rm.config
@@ -61,7 +61,7 @@
start = lease.start.requested
end = lease.start.requested + lease.duration.requested
for (vnode, pnode) in nodeassignment.items():
- leaseID = lease.leaseID
+ lease_id = lease.id
self.logger.debug("Scheduling image transfer of '%s' from vnode %i to physnode %i" % (lease.diskImageID, vnode, pnode), constants.SCHED)
if reusealg == constants.REUSE_IMAGECACHES:
@@ -87,7 +87,7 @@
# We've already scheduled a transfer to this node. Reuse it.
self.logger.debug("No need to schedule an image transfer (reusing an existing transfer)", constants.SCHED)
transferRR = transferRRs[pnode]
- transferRR.piggyback(leaseID, vnode, pnode, end)
+ transferRR.piggyback(lease_id, vnode, pnode, end)
else:
filetransfer = self.scheduleImageTransferEDF(lease, {vnode:pnode}, nexttime)
transferRRs[pnode] = filetransfer
@@ -100,7 +100,7 @@
# to add entries to the pools
if reusealg == constants.REUSE_IMAGECACHES:
for (vnode, pnode) in mustpool.items():
- self.resourcepool.addToPool(pnode, lease.diskImageID, leaseID, vnode, start)
+ self.resourcepool.addToPool(pnode, lease.diskImageID, lease_id, vnode, start)
def schedule_for_besteffort(self, lease, vmrr, nexttime):
config = self.scheduler.rm.config
@@ -117,12 +117,12 @@
if reqtransfer == constants.REQTRANSFER_COWPOOL:
# Add to pool
self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode), constants.SCHED)
- self.resourcepool.addToPool(pnode, lease.diskImageID, lease.leaseID, vnode, vmrr.end)
+ self.resourcepool.addToPool(pnode, lease.diskImageID, lease.id, vnode, vmrr.end)
elif reqtransfer == constants.REQTRANSFER_PIGGYBACK:
# We can piggyback on an existing transfer
transferRR = earliest[pnode][2]
- transferRR.piggyback(lease.leaseID, vnode, pnode)
- self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transferRR.lease.leaseID), constants.SCHED)
+ transferRR.piggyback(lease.id, vnode, pnode)
+ self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transferRR.lease.id), constants.SCHED)
piggybacking.append(transferRR)
else:
# Transfer
@@ -216,10 +216,10 @@
newtransfers = transfermap.keys()
res = {}
- resimgnode = ds.ResourceTuple.createEmpty()
- resimgnode.setByType(constants.RES_NETOUT, bandwidth)
- resnode = ds.ResourceTuple.createEmpty()
- resnode.setByType(constants.RES_NETIN, bandwidth)
+ resimgnode = ds.ResourceTuple.create_empty()
+ resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
+ resnode = ds.ResourceTuple.create_empty()
+ resnode.set_by_type(constants.RES_NETIN, bandwidth)
res[self.slottable.EDFnode] = resimgnode
for n in vnodes.values():
res[n] = resnode
@@ -229,7 +229,7 @@
newtransfer.state = constants.RES_STATE_SCHEDULED
newtransfer.file = req.diskImageID
for vnode, pnode in vnodes.items():
- newtransfer.piggyback(req.leaseID, vnode, pnode)
+ newtransfer.piggyback(req.id, vnode, pnode)
newtransfers.append(newtransfer)
def comparedates(x, y):
@@ -315,10 +315,10 @@
# Time to transfer is imagesize / bandwidth, regardless of
# number of nodes
res = {}
- resimgnode = ds.ResourceTuple.createEmpty()
- resimgnode.setByType(constants.RES_NETOUT, bandwidth)
- resnode = ds.ResourceTuple.createEmpty()
- resnode.setByType(constants.RES_NETIN, bandwidth)
+ resimgnode = ds.ResourceTuple.create_empty()
+ resimgnode.set_by_type(constants.RES_NETOUT, bandwidth)
+ resnode = ds.ResourceTuple.create_empty()
+ resnode.set_by_type(constants.RES_NETIN, bandwidth)
res[self.slottable.FIFOnode] = resimgnode
for n in reqtransfers.values():
res[n] = resnode
@@ -330,7 +330,7 @@
newtransfer.file = req.diskImageID
for vnode in reqtransfers:
physnode = reqtransfers[vnode]
- newtransfer.piggyback(req.leaseID, vnode, physnode)
+ newtransfer.piggyback(req.id, vnode, physnode)
self.slottable.addReservation(newtransfer)
newtransfers.append(newtransfer)
@@ -346,14 +346,14 @@
startTime = nexttime
return startTime
- def __remove_from_fifo_transfers(self, leaseID):
+ def __remove_from_fifo_transfers(self, lease_id):
transfers = [t for t in self.transfersFIFO if t.state != constants.RES_STATE_DONE]
toremove = []
for t in transfers:
for pnode in t.transfers:
leases = [l for l, v in t.transfers[pnode]]
- if leaseID in leases:
- newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=leaseID]
+ if lease_id in leases:
+ newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease_id]
t.transfers[pnode] = newtransfers
# Check if the transfer has to be cancelled
a = sum([len(l) for l in t.transfers.values()])
@@ -366,23 +366,23 @@
@staticmethod
def handle_start_filetransfer(sched, lease, rr):
- sched.rm.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.leaseID, constants.SCHED)
- lease.printContents()
+ sched.rm.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id, constants.SCHED)
+ lease.print_contents()
if lease.state == constants.LEASE_STATE_SCHEDULED or lease.state == constants.LEASE_STATE_DEPLOYED:
lease.state = constants.LEASE_STATE_DEPLOYING
rr.state = constants.RES_STATE_ACTIVE
# TODO: Enactment
elif lease.state == constants.LEASE_STATE_SUSPENDED:
pass # This shouldn't happen
- lease.printContents()
+ lease.print_contents()
sched.updateNodeTransferState(rr.transfers.keys(), constants.DOING_TRANSFER)
- sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.leaseID, constants.SCHED)
- sched.logger.info("Starting image transfer for lease %i" % (lease.leaseID), constants.SCHED)
+ sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id, constants.SCHED)
+ sched.logger.info("Starting image transfer for lease %i" % (lease.id), constants.SCHED)
@staticmethod
def handle_end_filetransfer(sched, lease, rr):
- sched.rm.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.leaseID, constants.SCHED)
- lease.printContents()
+ sched.rm.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id, constants.SCHED)
+ lease.print_contents()
if lease.state == constants.LEASE_STATE_DEPLOYING:
lease.state = constants.LEASE_STATE_DEPLOYED
rr.state = constants.RES_STATE_DONE
@@ -390,16 +390,16 @@
vnodes = rr.transfers[physnode]
# Update VM Image maps
- for leaseID, v in vnodes:
- lease = sched.scheduledleases.getLease(leaseID)
+ for lease_id, v in vnodes:
+ lease = sched.scheduledleases.getLease(lease_id)
lease.vmimagemap[v] = physnode
# Find out timeout of image. It will be the latest end time of all the
# leases being used by that image.
leases = [l for (l, v) in vnodes]
maxend=None
- for leaseID in leases:
- l = sched.scheduledleases.getLease(leaseID)
+ for lease_id in leases:
+ l = sched.scheduledleases.getLease(lease_id)
end = lease.getEnd()
if maxend==None or end>maxend:
maxend=end
@@ -408,31 +408,31 @@
elif lease.state == constants.LEASE_STATE_SUSPENDED:
pass
# TODO: Migrating
- lease.printContents()
+ lease.print_contents()
sched.updateNodeTransferState(rr.transfers.keys(), constants.DOING_IDLE)
- sched.rm.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.leaseID, constants.SCHED)
- sched.rm.logger.info("Completed image transfer for lease %i" % (lease.leaseID), constants.SCHED)
+ sched.rm.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id, constants.SCHED)
+ sched.rm.logger.info("Completed image transfer for lease %i" % (lease.id), constants.SCHED)
class FileTransferResourceReservation(ResourceReservationBase):
def __init__(self, lease, res, start=None, end=None):
ResourceReservationBase.__init__(self, lease, start, end, res)
self.deadline = None
self.file = None
- # Dictionary of physnode -> [ (leaseID, vnode)* ]
+ # Dictionary of physnode -> [ (lease_id, vnode)* ]
self.transfers = {}
- def printContents(self, loglevel="EXTREMEDEBUG"):
- ResourceReservationBase.printContents(self, loglevel)
+ def print_contents(self, loglevel="EXTREMEDEBUG"):
+ ResourceReservationBase.print_contents(self, loglevel)
self.logger.log(loglevel, "Type : FILE TRANSFER", constants.DS)
self.logger.log(loglevel, "Deadline : %s" % self.deadline, constants.DS)
self.logger.log(loglevel, "File : %s" % self.file, constants.DS)
self.logger.log(loglevel, "Transfers : %s" % self.transfers, constants.DS)
- def piggyback(self, leaseID, vnode, physnode):
+ def piggyback(self, lease_id, vnode, physnode):
if self.transfers.has_key(physnode):
- self.transfers[physnode].append((leaseID, vnode))
+ self.transfers[physnode].append((lease_id, vnode))
else:
- self.transfers[physnode] = [(leaseID, vnode)]
+ self.transfers[physnode] = [(lease_id, vnode)]
def isPreemptible(self):
return False
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/actions.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/actions.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/actions.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -18,18 +18,18 @@
class EnactmentAction(object):
def __init__(self):
- self.leaseHaizeaID = None
- self.leaseEnactmentInfo = None
+ self.lease_haizea_id = None
+ self.lease_enactment_info = None
def fromRR(self, rr):
- self.leaseHaizeaID = rr.lease.leaseID
- self.leaseEnactmentInfo = rr.lease.enactmentInfo
+ self.lease_haizea_id = rr.lease.id
+ self.lease_enactment_info = rr.lease.enactment_info
class VNode(object):
- def __init__(self, enactmentInfo):
- self.enactmentInfo = enactmentInfo
+ def __init__(self, enactment_info):
+ self.enactment_info = enactment_info
self.pnode = None
- self.res = None
+ self.resources = None
self.diskimage = None
class VMEnactmentAction(EnactmentAction):
@@ -40,10 +40,10 @@
def fromRR(self, rr):
EnactmentAction.fromRR(self, rr)
# TODO: This is very kludgy
- if rr.lease.vnodeEnactmentInfo == None:
+ if rr.lease.vnode_enactment_info == None:
self.vnodes = dict([(vnode+1, VNode(None)) for vnode in range(rr.lease.numnodes)])
else:
- self.vnodes = dict([(vnode, VNode(info)) for (vnode, info) in rr.lease.vnodeEnactmentInfo.items()])
+ self.vnodes = dict([(vnode, VNode(info)) for (vnode, info) in rr.lease.vnode_enactment_info.items()])
class VMEnactmentStartAction(VMEnactmentAction):
def __init__(self):
Modified: trunk/src/haizea/resourcemanager/enact/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/base.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/base.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -25,7 +25,7 @@
self.logger = resourcepool.rm.logger
resourcetypes = self.getResourceTypes() #IGNORE:E1111
- ds.ResourceTuple.setResourceTypes(resourcetypes)
+ ds.ResourceTuple.set_resource_types(resourcetypes)
def getNodes(self):
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -44,17 +44,17 @@
nod_id = i+1
enactID = int(host["hid"])
hostname = host["host_name"]
- capacity = ds.ResourceTuple.createEmpty()
- capacity.setByType(constants.RES_DISK, 80000) # OpenNebula currently doesn't provide this
- capacity.setByType(constants.RES_NETIN, 100) # OpenNebula currently doesn't provide this
- capacity.setByType(constants.RES_NETOUT, 100) # OpenNebula currently doesn't provide this
+ capacity = ds.ResourceTuple.create_empty()
+ capacity.set_by_type(constants.RES_DISK, 80000) # OpenNebula currently doesn't provide this
+ capacity.set_by_type(constants.RES_NETIN, 100) # OpenNebula currently doesn't provide this
+ capacity.set_by_type(constants.RES_NETOUT, 100) # OpenNebula currently doesn't provide this
cur.execute("select name, value from host_attributes where id=%i" % enactID)
attrs = cur.fetchall()
for attr in attrs:
name = attr["name"]
if oneattr2haizea.has_key(name):
- capacity.setByType(oneattr2haizea[name], int(attr["value"]))
- capacity.setByType(constants.RES_CPU, capacity.getByType(constants.RES_CPU) / 100.0)
+ capacity.set_by_type(oneattr2haizea[name], int(attr["value"]))
+ capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
node = Node(self.resourcepool, nod_id, hostname, capacity)
node.enactmentInfo = int(enactID)
self.nodes.append(node)
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/storage.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -23,5 +23,5 @@
StorageEnactmentBase.__init__(self, resourcepool)
self.imagepath="/images/playground/borja"
- def resolveToFile(self, leaseID, vnode, diskImageID):
+ def resolveToFile(self, lease_id, vnode, diskImageID):
return "%s/%s/%s.img" % (self.imagepath, diskImageID, diskImageID)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -42,8 +42,8 @@
vmid = action.vnodes[vnode].enactmentInfo
hostID = action.vnodes[vnode].pnode
image = action.vnodes[vnode].diskimage
- cpu = action.vnodes[vnode].res.getByType(constants.RES_CPU)
- memory = action.vnodes[vnode].res.getByType(constants.RES_MEM)
+ cpu = action.vnodes[vnode].resources.getByType(constants.RES_CPU)
+ memory = action.vnodes[vnode].resources.getByType(constants.RES_MEM)
self.logger.debug("Received request to start VM for L%iV%i on host %i, image=%s, cpu=%i, mem=%i"
% (action.leaseHaizeaID, vnode, hostID, image, cpu, memory), constants.ONE)
Modified: trunk/src/haizea/resourcemanager/enact/simulated/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -28,15 +28,15 @@
self.suspendresumerate = config.getSuspendResumeRate()
numnodes = config.getNumPhysicalNodes()
- bandwidth = config.getBandwidth()
+ self.bandwidth = config.getBandwidth()
capacity = self.parseResourcesString(config.getResourcesPerPhysNode())
self.nodes = [Node(self.resourcepool, i+1, "simul-%i" % (i+1), capacity) for i in range(numnodes)]
# Image repository nodes
- imgcapacity = ds.ResourceTuple.createEmpty()
- imgcapacity.setByType(constants.RES_NETOUT, bandwidth)
+ imgcapacity = ds.ResourceTuple.create_empty()
+ imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
self.FIFOnode = Node(self.resourcepool, numnodes+1, "FIFOnode", imgcapacity)
self.EDFnode = Node(self.resourcepool, numnodes+2, "EDFnode", imgcapacity)
@@ -59,11 +59,11 @@
def parseResourcesString(self, resources):
desc2type = dict([(x[2], x[0]) for x in self.getResourceTypes()])
- capacity=ds.ResourceTuple.createEmpty()
+ capacity=ds.ResourceTuple.create_empty()
for r in resources:
resourcename = r.split(",")[0]
resourcecapacity = r.split(",")[1]
- capacity.setByType(desc2type[resourcename], int(resourcecapacity))
+ capacity.set_by_type(desc2type[resourcename], int(resourcecapacity))
return capacity
def getSuspendResumeRate(self):
Modified: trunk/src/haizea/resourcemanager/enact/simulated/storage.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/enact/simulated/storage.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -26,5 +26,5 @@
def __init__(self, resourcepool):
StorageEnactmentBase.__init__(self, resourcepool)
- def resolveToFile(self, leaseID, vnode, diskImageID):
- return "%s/%s-L%iV%i" % (baseWorkingPath, diskImageID, leaseID, vnode)
\ No newline at end of file
+ def resolveToFile(self, lease_id, vnode, diskImageID):
+ return "%s/%s-L%iV%i" % (baseWorkingPath, diskImageID, lease_id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -85,9 +85,9 @@
vmimage = disk[ONE_DISK_SOURCE]
vmimagesize = 0
numnodes = 1
- resreq = ResourceTuple.createEmpty()
- resreq.setByType(constants.RES_CPU, float(attrs[ONE_CPU]))
- resreq.setByType(constants.RES_MEM, int(attrs[ONE_MEMORY]))
+ resreq = ResourceTuple.create_empty()
+ resreq.set_by_type(constants.RES_CPU, float(attrs[ONE_CPU]))
+ resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY]))
duration = haizea_param[HAIZEA_DURATION]
if duration == HAIZEA_DURATION_UNLIMITED:
@@ -110,7 +110,7 @@
# Only one node for now
leasereq.vnodeEnactmentInfo = {}
leasereq.vnodeEnactmentInfo[1] = int(req["oid"])
- leasereq.setScheduler(self.rm.scheduler)
+ leasereq.set_scheduler(self.rm.scheduler)
return leasereq
def create_ar_lease(self, req, attrs, haizea_param):
@@ -132,6 +132,6 @@
# Only one node for now
leasereq.vnodeEnactmentInfo = {}
leasereq.vnodeEnactmentInfo[1] = int(req["oid"])
- leasereq.setScheduler(self.rm.scheduler)
+ leasereq.set_scheduler(self.rm.scheduler)
return leasereq
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/frontends/tracefile.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/tracefile.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/frontends/tracefile.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -70,11 +70,11 @@
# Add boot + shutdown overhead
overhead = config.getBootOverhead()
for r in self.requests:
- r.addBootOverhead(overhead)
+ r.add_boot_overhead(overhead)
# Make the scheduler reachable from the lease request
for r in self.requests:
- r.setScheduler(rm.scheduler)
+ r.set_scheduler(rm.scheduler)
num_besteffort = len([x for x in self.requests if isinstance(x,BestEffortLease)])
num_ar = len([x for x in self.requests if isinstance(x,ARLease)])
@@ -87,8 +87,8 @@
# requests are in the trace up to the current time
# reported by the resource manager
time = self.rm.clock.get_time()
- nowreq = [r for r in self.requests if r.tSubmit <= time]
- self.requests = [r for r in self.requests if r.tSubmit > time]
+ nowreq = [r for r in self.requests if r.submit_time <= time]
+ self.requests = [r for r in self.requests if r.submit_time > time]
return nowreq
def existsPendingReq(self):
@@ -96,6 +96,6 @@
def getNextReqTime(self):
if self.existsPendingReq():
- return self.requests[0].tSubmit
+ return self.requests[0].submit_time
else:
return None
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/resourcepool.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -89,30 +89,30 @@
# TODO: It might make sense to consider two cases:
# "no image management at all" and "assume master image
# is predeployed, but we still have to make a copy".
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskImageID, lease.diskImageSize, lease.leaseID, vnode)
+ taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
elif lease_deployment_type == constants.DEPLOYMENT_PREDEPLOY:
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskImageID, lease.diskImageSize, lease.leaseID, vnode)
+ taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
elif lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- taintedImage = node.getTaintedImage(lease.leaseID, vnode, lease.diskImageID)
+ taintedImage = node.getTaintedImage(lease.id, vnode, lease.diskimage_id)
if self.reusealg == constants.REUSE_NONE:
if taintedImage == None:
- raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.leaseID, vnode, pnode)
+ raise Exception, "ERROR: No image for L%iV%i is on node %i" % (lease.id, vnode, pnode)
elif self.reusealg == constants.REUSE_IMAGECACHES:
- poolentry = node.getPoolEntry(lease.diskImageID, leaseID=lease.leaseID, vnode=vnode)
+ poolentry = node.getPoolEntry(lease.diskimage_id, lease_id=lease.id, vnode=vnode)
if poolentry == None:
# Not necessarily an error. Maybe the pool was full, and
# we had to fall back on creating a tainted image right
# when the image was transferred. We have to check this.
if taintedImage == None:
- raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.leaseID, vnode, pnode)
+ raise Exception, "ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease.id, vnode, pnode)
else:
# Create tainted image
- taintedImage = self.addTaintedImageToNode(pnode, lease.diskImageID, lease.diskImageSize, lease.leaseID, vnode)
+ taintedImage = self.addTaintedImageToNode(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
# ENACTMENT
# self.storage.createCopyFromCache(pnode, lease.diskImageSize)
startAction.vnodes[vnode].pnode = node.enactmentInfo
startAction.vnodes[vnode].diskimage = taintedImage.filename
- startAction.vnodes[vnode].res = rr.res[pnode]
+ startAction.vnodes[vnode].res = rr.resources_in_pnode[pnode]
self.vm.start(startAction)
@@ -139,22 +139,22 @@
self.getNode(nod_id).printFiles()
if self.reusealg == constants.REUSE_NONE:
- for (leaseID, vnode) in vnodes:
+ for (lease_id, vnode) in vnodes:
img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(leaseID, vnode)
+ img.addMapping(lease_id, vnode)
self.getNode(nod_id).addFile(img)
elif self.reusealg == constants.REUSE_IMAGECACHES:
# Sometimes we might find that the image is already deployed
# (although unused). In that case, don't add another copy to
# the pool. Just "reactivate" it.
if self.getNode(nod_id).isInPool(imagefile):
- for (leaseID, vnode) in vnodes:
- self.getNode(nod_id).addToPool(imagefile, leaseID, vnode, timeout)
+ for (lease_id, vnode) in vnodes:
+ self.getNode(nod_id).addToPool(imagefile, lease_id, vnode, timeout)
else:
img = VMImageFile(imagefile, imagesize, masterimg=True)
img.timeout = timeout
- for (leaseID, vnode) in vnodes:
- img.addMapping(leaseID, vnode)
+ for (lease_id, vnode) in vnodes:
+ img.addMapping(lease_id, vnode)
if self.maxcachesize != constants.CACHESIZE_UNLIMITED:
poolsize = self.getNode(nod_id).getPoolSize()
reqsize = poolsize + imagesize
@@ -168,9 +168,9 @@
# If unsuccessful, this just means we couldn't add the image
# to the pool. We will have to create tainted images to be used
# only by these leases
- for (leaseID, vnode) in vnodes:
+ for (lease_id, vnode) in vnodes:
img = VMImageFile(imagefile, imagesize, masterimg=False)
- img.addMapping(leaseID, vnode)
+ img.addMapping(lease_id, vnode)
self.getNode(nod_id).addFile(img)
else:
self.getNode(nod_id).addFile(img)
@@ -241,38 +241,38 @@
def getEDFRepositoryNode(self):
return self.EDFnode
- def addTaintedImageToNode(self, pnode, diskImageID, imagesize, leaseID, vnode):
- self.rm.logger.debug("Adding tainted image for L%iV%i in pnode=%i" % (leaseID, vnode, pnode), constants.ENACT)
+ def addTaintedImageToNode(self, pnode, diskImageID, imagesize, lease_id, vnode):
+ self.rm.logger.debug("Adding tainted image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode), constants.ENACT)
self.getNode(pnode).printFiles()
- imagefile = self.storage.resolveToFile(leaseID, vnode, diskImageID)
+ imagefile = self.storage.resolveToFile(lease_id, vnode, diskImageID)
img = VMImageFile(imagefile, imagesize, diskImageID=diskImageID, masterimg=False)
- img.addMapping(leaseID, vnode)
+ img.addMapping(lease_id, vnode)
self.getNode(pnode).addFile(img)
self.getNode(pnode).printFiles()
self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
return img
- def checkImage(self, pnode, leaseID, vnode, imagefile):
+ def checkImage(self, pnode, lease_id, vnode, imagefile):
node = self.getNode(pnode)
if self.rm.config.getTransferType() == constants.TRANSFER_NONE:
- self.rm.logger.debug("Adding tainted image for L%iV%i in node %i" % (leaseID, vnode, pnode), constants.ENACT)
+ self.rm.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode), constants.ENACT)
elif self.reusealg == constants.REUSE_NONE:
- if not node.hasTaintedImage(leaseID, vnode, imagefile):
- self.rm.logger.debug("ERROR: Image for L%iV%i is not deployed on node %i" % (leaseID, vnode, pnode), constants.ENACT)
+ if not node.hasTaintedImage(lease_id, vnode, imagefile):
+ self.rm.logger.debug("ERROR: Image for L%iV%i is not deployed on node %i" % (lease_id, vnode, pnode), constants.ENACT)
elif self.reusealg == constants.REUSE_IMAGECACHES:
- poolentry = node.getPoolEntry(imagefile, leaseID=leaseID, vnode=vnode)
+ poolentry = node.getPoolEntry(imagefile, lease_id=lease_id, vnode=vnode)
if poolentry == None:
# Not necessarily an error. Maybe the pool was full, and
# we had to fall back on creating a tainted image right
# when the image was transferred. We have to check this.
- if not node.hasTaintedImage(leaseID, vnode, imagefile):
- self.rm.logger.error("ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (leaseID, vnode, pnode), constants.ENACT)
+ if not node.hasTaintedImage(lease_id, vnode, imagefile):
+ self.rm.logger.error("ERROR: Image for L%iV%i is not in pool on node %i, and there is no tainted image" % (lease_id, vnode, pnode), constants.ENACT)
else:
# Create tainted image
- self.rm.logger.debug("Adding tainted image for L%iV%i in node %i" % (leaseID, vnode, pnode), constants.ENACT)
+ self.rm.logger.debug("Adding tainted image for L%iV%i in node %i" % (lease_id, vnode, pnode), constants.ENACT)
node.printFiles()
img = VMImageFile(imagefile, poolentry.filesize, masterimg=False)
- img.addMapping(leaseID, vnode)
+ img.addMapping(lease_id, vnode)
node.addFile(img)
node.printFiles()
self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
@@ -283,8 +283,8 @@
def getNodesWithImgInPool(self, imagefile, after = None):
return [n.nod_id for n in self.nodes if n.isInPool(imagefile, after=after)]
- def addToPool(self, pnode, imagefile, leaseID, vnode, timeout):
- return self.getNode(pnode).addToPool(imagefile, leaseID, vnode, timeout)
+ def addToPool(self, pnode, imagefile, lease_id, vnode, timeout):
+ return self.getNode(pnode).addToPool(imagefile, lease_id, vnode, timeout)
def removeImage(self, pnode, lease, vnode):
node = self.getNode(pnode)
@@ -311,20 +311,20 @@
self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
- def addRAMFileToNode(self, pnode, leaseID, vnode, size):
+ def addRAMFileToNode(self, pnode, lease_id, vnode, size):
node = self.getNode(pnode)
- self.rm.logger.debug("Adding RAM file for L%iV%i in node %i" % (leaseID, vnode, pnode), constants.ENACT)
+ self.rm.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode), constants.ENACT)
node.printFiles()
- f = RAMImageFile("RAM_L%iV%i" % (leaseID, vnode), size, leaseID, vnode)
+ f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
node.addFile(f)
node.printFiles()
self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
- def removeRAMFileFromNode(self, pnode, leaseID, vnode):
+ def removeRAMFileFromNode(self, pnode, lease_id, vnode):
node = self.getNode(pnode)
- self.rm.logger.debug("Removing RAM file for L%iV%i in node %i" % (leaseID, vnode, pnode), constants.ENACT)
+ self.rm.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode), constants.ENACT)
node.printFiles()
- node.removeRAMFile(leaseID, vnode)
+ node.removeRAMFile(lease_id, vnode)
node.printFiles()
self.rm.stats.appendStat(constants.COUNTER_DISKUSAGE, self.getMaxDiskUsage())
@@ -362,45 +362,45 @@
self.workingspacesize -= img.filesize
def removeRAMFile(self, lease, vnode):
- img = [f for f in self.files if isinstance(f, RAMImageFile) and f.leaseID==lease and f.vnode==vnode]
+ img = [f for f in self.files if isinstance(f, RAMImageFile) and f.id==lease and f.vnode==vnode]
if len(img) > 0:
img = img[0]
self.files.remove(img)
self.workingspacesize -= img.filesize
- def getTaintedImage(self, leaseID, vnode, imagefile):
+ def getTaintedImage(self, lease_id, vnode, imagefile):
images = self.getTaintedImages()
- image = [i for i in images if i.filename == imagefile and i.hasMapping(leaseID, vnode)]
+ image = [i for i in images if i.filename == imagefile and i.hasMapping(lease_id, vnode)]
if len(image) == 0:
return None
elif len(image) == 1:
return image[0]
elif len(image) > 1:
- self.logger.warning("More than one tainted image for L%iV%i on node %i" % (leaseID, vnode, self.nod_id), constants.ENACT)
+ self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id), constants.ENACT)
return image[0]
- def addToPool(self, imagefile, leaseID, vnode, timeout):
+ def addToPool(self, imagefile, lease_id, vnode, timeout):
for f in self.files:
if f.filename == imagefile:
- f.addMapping(leaseID, vnode)
+ f.addMapping(lease_id, vnode)
f.updateTimeout(timeout)
break # Ugh
self.printFiles()
- def getPoolEntry(self, imagefile, after = None, leaseID=None, vnode=None):
+ def getPoolEntry(self, imagefile, after = None, lease_id=None, vnode=None):
images = self.getPoolImages()
images = [i for i in images if i.filename == imagefile]
if after != None:
images = [i for i in images if i.timeout >= after]
- if leaseID != None and vnode != None:
- images = [i for i in images if i.hasMapping(leaseID, vnode)]
+ if lease_id != None and vnode != None:
+ images = [i for i in images if i.hasMapping(lease_id, vnode)]
if len(images)>0:
return images[0]
else:
return None
- def isInPool(self, imagefile, after = None, leaseID=None, vnode=None):
- entry = self.getPoolEntry(imagefile, after = after, leaseID=leaseID, vnode=vnode)
+ def isInPool(self, imagefile, after = None, lease_id=None, vnode=None):
+ entry = self.getPoolEntry(imagefile, after = after, lease_id=lease_id, vnode=vnode)
if entry == None:
return False
else:
@@ -471,11 +471,11 @@
self.masterimg = masterimg
self.timeout = None
- def addMapping(self, leaseID, vnode):
- self.mappings.add((leaseID, vnode))
+ def addMapping(self, lease_id, vnode):
+ self.mappings.add((lease_id, vnode))
- def hasMapping(self, leaseID, vnode):
- return (leaseID, vnode) in self.mappings
+ def hasMapping(self, lease_id, vnode):
+ return (lease_id, vnode) in self.mappings
def hasMappings(self):
return len(self.mappings) > 0
@@ -504,11 +504,11 @@
return "(DISK " + self.filename + " " + vnodemapstr(self.mappings) + " " + master + " " + str(timeout) + ")"
class RAMImageFile(File):
- def __init__(self, filename, filesize, leaseID, vnode):
+ def __init__(self, filename, filesize, lease_id, vnode):
File.__init__(self, filename, filesize)
- self.leaseID = leaseID
+ self.id = lease_id
self.vnode = vnode
def __str__(self):
- mappings = [(self.leaseID, self.vnode)]
+ mappings = [(self.id, self.vnode)]
return "(RAM " + self.filename + " " + vnodemapstr(mappings)+ ")"
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/rm.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -132,7 +132,7 @@
# In debug mode, dump the lease descriptors.
for lease in self.scheduler.completedleases.entries.values():
- lease.printContents()
+ lease.print_contents()
# Write all collected data to disk
self.stats.dumpStatsToDisk()
@@ -158,7 +158,7 @@
requests = []
for frontend in self.frontends:
requests += frontend.getAccumulatedRequests()
- requests.sort(key=operator.attrgetter("tSubmit"))
+ requests.sort(key=operator.attrgetter("submit_time"))
ar_leases = [req for req in requests if isinstance(req, ARLease)]
be_leases = [req for req in requests if isinstance(req, BestEffortLease)]
@@ -208,16 +208,16 @@
if verbose and len(scheduled)>0:
self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv", constants.RM)
for k in scheduled:
- lease = self.scheduler.scheduledleases.getLease(k)
- lease.printContents(loglevel=loglevel)
+ lease = self.scheduler.scheduledleases.get_lease(k)
+ lease.print_contents(loglevel=loglevel)
self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", constants.RM)
# Print queue size and descriptors of queued leases
- self.logger.log(loglevel, "Queue size: %i" % len(self.scheduler.queue.q), constants.RM)
- if verbose and len(self.scheduler.queue.q)>0:
+ self.logger.log(loglevel, "Queue size: %i" % self.scheduler.queue.length(), constants.RM)
+ if verbose and self.scheduler.queue.length()>0:
self.logger.log(loglevel, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv", constants.RM)
- for lease in self.scheduler.queue.q:
- lease.printContents(loglevel=loglevel)
+ for lease in self.scheduler.queue:
+ lease.print_contents(loglevel=loglevel)
self.logger.log(loglevel, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^", constants.RM)
def get_next_changepoint(self):
@@ -338,7 +338,7 @@
# Notify the resource manager about the premature ends
for rr in prematureends:
- self.rm.notifyEndVM(rr.lease, rr)
+ self.rm.notify_end_vm(rr.lease, rr)
# Process reservations starting/stopping at the current time and
# check if there are any new requests.
@@ -429,7 +429,7 @@
# We can also be done if we've specified that we want to stop when
# the best-effort requests are all done or when they've all been submitted.
stopwhen = self.rm.config.stopWhen()
- scheduledbesteffort = self.rm.scheduler.scheduledleases.getLeases(type = BestEffortLease)
+ scheduledbesteffort = self.rm.scheduler.scheduledleases.get_leases(type = BestEffortLease)
pendingbesteffort = [r for r in tracefrontend.requests if isinstance(r, BestEffortLease)]
if stopwhen == constants.STOPWHEN_BEDONE:
if self.rm.scheduler.isQueueEmpty() and len(scheduledbesteffort) + len(pendingbesteffort) == 0:
@@ -539,7 +539,7 @@
self.rm.process_requests(self.nextperiodicwakeup)
# Determine if there's anything to do before the next wakeup time
- nextchangepoint = self.rm.get_next_change_point()
+ nextchangepoint = self.rm.get_next_changepoint()
if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
# We need to wake up earlier to handle a slot table event
nextwakeup = nextchangepoint
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -113,17 +113,17 @@
# Process AR requests
for lease_req in requests:
- self.rm.logger.debug("LEASE-%i Processing request (AR)" % lease_req.leaseID, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Start %s" % (lease_req.leaseID, lease_req.start), constants.SCHED)
- self.rm.logger.debug("LEASE-%i Duration %s" % (lease_req.leaseID, lease_req.duration), constants.SCHED)
- self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.leaseID, lease_req.resreq), constants.SCHED)
- self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.leaseID, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Processing request (AR)" % lease_req.id, constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Start %s" % (lease_req.id, lease_req.start), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Duration %s" % (lease_req.id, lease_req.duration), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
+ self.rm.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested), constants.SCHED)
accepted = False
try:
self.schedule_ar_lease(lease_req, avoidpreempt=avoidpreempt, nexttime=nexttime)
self.scheduledleases.add(lease_req)
- self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.leaseID)
+ self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
# If our first try avoided preemption, try again
@@ -131,22 +131,22 @@
# TODO: Roll this into the exact slot fitting algorithm
if avoidpreempt:
try:
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.leaseID, msg), constants.SCHED)
- self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.leaseID, constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id, constants.SCHED)
self.schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
self.scheduledleases.add(lease_req)
- self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.leaseID)
+ self.rm.stats.incrCounter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
- self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.leaseID)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.leaseID, msg), constants.SCHED)
+ self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
else:
- self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.leaseID)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.leaseID, msg), constants.SCHED)
+ self.rm.stats.incrCounter(constants.COUNTER_ARREJECTED, lease_req.id)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
if accepted:
- self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.leaseID, constants.SCHED)
+ self.rm.logger.info("AR lease request #%i has been accepted." % lease_req.id, constants.SCHED)
else:
- self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.leaseID, constants.SCHED)
+ self.rm.logger.info("AR lease request #%i has been rejected." % lease_req.id, constants.SCHED)
done = False
@@ -158,35 +158,37 @@
else:
lease_req = self.queue.dequeue()
try:
- self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.leaseID, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Processing request (BEST-EFFORT)" % lease_req.leaseID, constants.SCHED)
- self.rm.logger.debug("LEASE-%i Duration: %s" % (lease_req.leaseID, lease_req.duration), constants.SCHED)
- self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.leaseID, lease_req.resreq), constants.SCHED)
+ self.rm.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id, constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Processing request (BEST-EFFORT)" % lease_req.id, constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Duration: %s" % (lease_req.id, lease_req.duration), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i ResReq %s" % (lease_req.id, lease_req.requested_resources), constants.SCHED)
self.schedule_besteffort_lease(lease_req, nexttime)
self.scheduledleases.add(lease_req)
- self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.leaseID)
+ self.rm.stats.decrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
except SchedException, msg:
# Put back on queue
newqueue.enqueue(lease_req)
- self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.leaseID, msg), constants.SCHED)
- self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.leaseID, constants.SCHED)
+ self.rm.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg), constants.SCHED)
+ self.rm.logger.info("Lease %i could not be scheduled at this time." % lease_req.id, constants.SCHED)
if not self.rm.config.isBackfilling():
done = True
-
- newqueue.q += self.queue.q
+
+ for lease in self.queue:
+ newqueue.enqueue(lease)
+
self.queue = newqueue
def process_reservations(self, nowtime):
- starting = [l for l in self.scheduledleases.entries.values() if l.hasStartingReservations(nowtime)]
- ending = [l for l in self.scheduledleases.entries.values() if l.hasEndingReservations(nowtime)]
+ starting = [l for l in self.scheduledleases.entries.values() if l.has_starting_reservations(nowtime)]
+ ending = [l for l in self.scheduledleases.entries.values() if l.has_ending_reservations(nowtime)]
for l in ending:
- rrs = l.getEndingReservations(nowtime)
+ rrs = l.get_ending_reservations(nowtime)
for rr in rrs:
self._handle_end_rr(l, rr)
self.handlers[type(rr)].on_end(self, l, rr)
for l in starting:
- rrs = l.getStartingReservations(nowtime)
+ rrs = l.get_starting_reservations(nowtime)
for rr in rrs:
self.handlers[type(rr)].on_start(self, l, rr)
@@ -203,36 +205,36 @@
def enqueue(self, lease_req):
"""Queues a best-effort lease request"""
- self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, lease_req.leaseID)
+ self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, lease_req.id)
self.queue.enqueue(lease_req)
- self.rm.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.leaseID, lease_req.numnodes, lease_req.duration.requested), constants.SCHED)
+ self.rm.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested), constants.SCHED)
def is_queue_empty(self):
"""Return True is the queue is empty, False otherwise"""
- return self.queue.isEmpty()
+ return self.queue.is_empty()
def exists_scheduled_leases(self):
"""Return True if there are any leases scheduled in the future"""
- return not self.scheduledleases.isEmpty()
+ return not self.scheduledleases.is_empty()
def notify_premature_end_vm(self, l, rr):
- self.rm.logger.info("LEASE-%i The VM has ended prematurely." % l.leaseID, constants.SCHED)
+ self.rm.logger.info("LEASE-%i The VM has ended prematurely." % l.id, constants.SCHED)
self._handle_end_rr(l, rr)
if rr.oncomplete == constants.ONCOMPLETE_SUSPEND:
- rrs = l.nextRRs(rr)
+ rrs = l.next_rrs(rr)
for r in rrs:
- l.removeRR(r)
+ l.remove_rr(r)
self.slottable.removeReservation(r)
rr.oncomplete = constants.ONCOMPLETE_ENDLEASE
rr.end = self.rm.clock.get_time()
self._handle_end_vm(l, rr)
- nexttime = self.rm.clock.getNextSchedulableTime()
+ nexttime = self.rm.clock.get_next_schedulable_time()
if self.rm.config.isBackfilling():
# We need to reevaluate the schedule to see if there are any future
# reservations that we can slide back.
- self.reevaluateSchedule(l, rr.nodes.values(), nexttime, [])
+ self.reevaluate_schedule(l, rr.nodes.values(), nexttime, [])
def schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
@@ -243,7 +245,7 @@
if len(preemptions) > 0:
leases = self.slottable.findLeasesToPreempt(preemptions, start, end)
- self.rm.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.leaseID for l in leases], lease_req.leaseID), constants.SCHED)
+ self.rm.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id), constants.SCHED)
for lease in leases:
self.preempt(lease, time=start)
@@ -251,7 +253,7 @@
# Add VM resource reservations
vmrr = ds.VMResourceReservation(lease_req, start, end, nodeassignment, res, constants.ONCOMPLETE_ENDLEASE, False)
vmrr.state = constants.RES_STATE_SCHEDULED
- lease_req.appendRR(vmrr)
+ lease_req.append_rr(vmrr)
# Schedule deployment overhead
self.deployment.schedule(lease_req, vmrr, nexttime)
@@ -275,130 +277,140 @@
earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(req.numnodes)])
susptype = self.rm.config.getSuspensionType()
- if susptype == constants.SUSPENSION_NONE:
- suspendable = False
- preemptible = True
- elif susptype == constants.SUSPENSION_ALL:
- suspendable = True
- preemptible = True
- elif susptype == constants.SUSPENSION_SERIAL:
- if req.numnodes == 1:
- suspendable = True
- preemptible = True
- else:
- suspendable = False
- preemptible = True
+ if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and req.numnodes == 1):
+ cansuspend = False
+ else:
+ cansuspend = True
+
canmigrate = self.rm.config.isMigrationAllowed()
try:
mustresume = (req.state == constants.LEASE_STATE_SUSPENDED)
canreserve = self.canReserveBestEffort()
- (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve, suspendable=suspendable, preemptible=preemptible, canmigrate=canmigrate, mustresume=mustresume)
+ (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve, suspendable=cansuspend, canmigrate=canmigrate, mustresume=mustresume)
- self.deployment.schedule(req, vmrr, nexttime)
+ # Schedule deployment
+ if req.state != constants.LEASE_STATE_SUSPENDED:
+ self.deployment.schedule(req, vmrr, nexttime)
- # Schedule image transfers
+ # TODO: The following would be more correctly handled in the RR handle functions.
+ # We need to have an explicit MigrationResourceReservation before doing that.
if req.state == constants.LEASE_STATE_SUSPENDED:
- # TODO: This would be more correctly handled in the RR handle functions.
# Update VM image mappings, since we might be resuming
# in different nodes.
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.leaseID, vnode)
+ self.rm.resourcepool.removeImage(pnode, req.id, vnode)
req.vmimagemap = vmrr.nodes
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.addTaintedImageToNode(pnode, req.diskImageID, req.diskImageSize, req.leaseID, vnode)
+ self.rm.resourcepool.addTaintedImageToNode(pnode, req.diskimage_id, req.diskimage_size, req.id, vnode)
+
# Update RAM file mappings
for vnode, pnode in req.memimagemap.items():
- self.rm.resourcepool.removeRAMFileFromNode(pnode, req.leaseID, vnode)
+ self.rm.resourcepool.removeRAMFileFromNode(pnode, req.id, vnode)
for vnode, pnode in vmrr.nodes.items():
- self.rm.resourcepool.addRAMFileToNode(pnode, req.leaseID, vnode, req.resreq.getByType(constants.RES_MEM))
+ self.rm.resourcepool.addRAMFileToNode(pnode, req.id, vnode, req.requested_resources.get_by_type(constants.RES_MEM))
req.memimagemap[vnode] = pnode
# Add resource reservations
if resmrr != None:
- req.appendRR(resmrr)
+ req.append_rr(resmrr)
self.slottable.addReservation(resmrr)
- req.appendRR(vmrr)
+ req.append_rr(vmrr)
self.slottable.addReservation(vmrr)
if susprr != None:
- req.appendRR(susprr)
+ req.append_rr(susprr)
self.slottable.addReservation(susprr)
if reservation:
self.numbesteffortres += 1
- req.printContents()
+ req.print_contents()
except SlotFittingException, msg:
raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
+ def schedule_immediate_lease(self, req, nexttime):
+ # Determine earliest start time in each node
+ earliest = self.deployment.find_earliest_starting_times(req, nexttime)
+ try:
+ (resmrr, vmrr, susprr, reservation) = self.slottable.fitBestEffort(req, earliest, canreserve=False, suspendable=False, canmigrate=False, mustresume=False)
+ # Schedule deployment
+ self.deployment.schedule(req, vmrr, nexttime)
+
+ req.append_rr(vmrr)
+ self.slottable.addReservation(vmrr)
+
+ req.print_contents()
+ except SlotFittingException, msg:
+ raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+
def preempt(self, req, time):
- self.rm.logger.info("Preempting lease #%i..." % (req.leaseID), constants.SCHED)
+ self.rm.logger.info("Preempting lease #%i..." % (req.id), constants.SCHED)
self.rm.logger.edebug("Lease before preemption:", constants.SCHED)
- req.printContents()
- vmrr, susprr = req.getLastVMRR()
+ req.print_contents()
+ vmrr, susprr = req.get_last_vmrr()
suspendresumerate = self.rm.resourcepool.info.getSuspendResumeRate()
if vmrr.state == constants.RES_STATE_SCHEDULED and vmrr.start >= time:
- self.rm.logger.info("... lease #%i has been cancelled and requeued." % req.leaseID, constants.SCHED)
+ self.rm.logger.info("... lease #%i has been cancelled and requeued." % req.id, constants.SCHED)
self.rm.logger.debug("Lease was set to start in the middle of the preempting lease.", constants.SCHED)
req.state = constants.LEASE_STATE_PENDING
- if vmrr.backfillres == True:
+ if vmrr.backfill_reservation == True:
self.numbesteffortres -= 1
- req.removeRR(vmrr)
+ req.remove_rr(vmrr)
self.slottable.removeReservation(vmrr)
if susprr != None:
- req.removeRR(susprr)
+ req.remove_rr(susprr)
self.slottable.removeReservation(susprr)
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.leaseID, vnode)
+ self.rm.resourcepool.removeImage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
- self.queue.enqueueInOrder(req)
- self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.leaseID)
+ self.queue.enqueue_in_order(req)
+ self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
else:
susptype = self.rm.config.getSuspensionType()
timebeforesuspend = time - vmrr.start
# TODO: Determine if it is in fact the initial VMRR or not. Right now
# we conservatively overestimate
canmigrate = self.rm.config.isMigrationAllowed()
- suspendthreshold = req.getSuspendThreshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
+ suspendthreshold = req.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
# We can't suspend if we're under the suspend threshold
suspendable = timebeforesuspend >= suspendthreshold
if suspendable and (susptype == constants.SUSPENSION_ALL or (req.numnodes == 1 and susptype == constants.SUSPENSION_SERIAL)):
- self.rm.logger.info("... lease #%i will be suspended at %s." % (req.leaseID, time), constants.SCHED)
+ self.rm.logger.info("... lease #%i will be suspended at %s." % (req.id, time), constants.SCHED)
self.slottable.suspend(req, time)
else:
- self.rm.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % req.leaseID, constants.SCHED)
+ self.rm.logger.info("... lease #%i has been cancelled and requeued (cannot be suspended)" % req.id, constants.SCHED)
req.state = constants.LEASE_STATE_PENDING
- if vmrr.backfillres == True:
+ if vmrr.backfill_reservation == True:
self.numbesteffortres -= 1
- req.removeRR(vmrr)
+ req.remove_rr(vmrr)
self.slottable.removeReservation(vmrr)
if susprr != None:
- req.removeRR(susprr)
+ req.remove_rr(susprr)
self.slottable.removeReservation(susprr)
if req.state == constants.LEASE_STATE_SUSPENDED:
- resmrr = req.prevRR(vmrr)
- req.removeRR(resmrr)
+ resmrr = req.prev_rr(vmrr)
+ req.remove_rr(resmrr)
self.slottable.removeReservation(resmrr)
for vnode, pnode in req.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, req.leaseID, vnode)
+ self.rm.resourcepool.removeImage(pnode, req.id, vnode)
self.deployment.cancel_deployment(req)
req.vmimagemap = {}
self.scheduledleases.remove(req)
- self.queue.enqueueInOrder(req)
- self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.leaseID)
+ self.queue.enqueue_in_order(req)
+ self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
self.rm.logger.edebug("Lease after preemption:", constants.SCHED)
- req.printContents()
+ req.print_contents()
- def reevaluateSchedule(self, endinglease, nodes, nexttime, checkedleases):
+ def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
self.rm.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime), constants.SCHED)
leases = self.scheduledleases.getNextLeasesScheduledInNodes(nexttime, nodes)
leases = [l for l in leases if isinstance(l, ds.BestEffortLease) and not l in checkedleases]
for l in leases:
- self.rm.logger.debug("Found lease %i" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("Found lease %i" % l.id, constants.SCHED)
+ l.print_contents()
# Earliest time can't be earlier than time when images will be
# available in node
earliest = max(nexttime, l.imagesavail)
@@ -417,8 +429,8 @@
#-------------------------------------------------------------------#
def _handle_start_vm(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleStartVM" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleStartVM" % l.id, constants.SCHED)
+ l.print_contents()
if l.state == constants.LEASE_STATE_DEPLOYED:
l.state = constants.LEASE_STATE_ACTIVE
rr.state = constants.RES_STATE_ACTIVE
@@ -438,17 +450,17 @@
rr.state = constants.RES_STATE_ACTIVE
# No enactment to do here, since all the suspend/resume actions are
# handled during the suspend/resume RRs
- l.printContents()
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RUN)
- self.rm.logger.debug("LEASE-%i End of handleStartVM" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Started VMs for lease %i on nodes %s" % (l.leaseID, rr.nodes.values()), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleStartVM" % l.id, constants.SCHED)
+ self.rm.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()), constants.SCHED)
def _handle_end_vm(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleEndVM" % l.leaseID, constants.SCHED)
- self.rm.logger.edebug("LEASE-%i Before:" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleEndVM" % l.id, constants.SCHED)
+ self.rm.logger.edebug("LEASE-%i Before:" % l.id, constants.SCHED)
+ l.print_contents()
diff = self.rm.clock.get_time() - rr.start
- l.duration.accumulateDuration(diff)
+ l.duration.accumulate_duration(diff)
rr.state = constants.RES_STATE_DONE
if rr.oncomplete == constants.ONCOMPLETE_ENDLEASE:
self.rm.resourcepool.stopVMs(l, rr)
@@ -457,69 +469,69 @@
self.completedleases.add(l)
self.scheduledleases.remove(l)
for vnode, pnode in l.vmimagemap.items():
- self.rm.resourcepool.removeImage(pnode, l.leaseID, vnode)
+ self.rm.resourcepool.removeImage(pnode, l.id, vnode)
if isinstance(l, ds.BestEffortLease):
- self.rm.stats.incrCounter(constants.COUNTER_BESTEFFORTCOMPLETED, l.leaseID)
+ self.rm.stats.incrCounter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
if isinstance(l, ds.BestEffortLease):
- if rr.backfillres == True:
+ if rr.backfill_reservation == True:
self.numbesteffortres -= 1
- self.rm.logger.edebug("LEASE-%i After:" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.edebug("LEASE-%i After:" % l.id, constants.SCHED)
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
- self.rm.logger.debug("LEASE-%i End of handleEndVM" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Stopped VMs for lease %i on nodes %s" % (l.leaseID, rr.nodes.values()), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleEndVM" % l.id, constants.SCHED)
+ self.rm.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()), constants.SCHED)
def _handle_start_suspend(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleStartSuspend" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id, constants.SCHED)
+ l.print_contents()
rr.state = constants.RES_STATE_ACTIVE
self.rm.resourcepool.suspendVMs(l, rr)
for vnode, pnode in rr.nodes.items():
- self.rm.resourcepool.addRAMFileToNode(pnode, l.leaseID, vnode, l.resreq.getByType(constants.RES_MEM))
+ self.rm.resourcepool.addRAMFileToNode(pnode, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
l.memimagemap[vnode] = pnode
- l.printContents()
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_SUSPEND)
- self.rm.logger.debug("LEASE-%i End of handleStartSuspend" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Suspending lease %i..." % (l.leaseID), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleStartSuspend" % l.id, constants.SCHED)
+ self.rm.logger.info("Suspending lease %i..." % (l.id), constants.SCHED)
def _handle_end_suspend(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleEndSuspend" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id, constants.SCHED)
+ l.print_contents()
# TODO: React to incomplete suspend
self.rm.resourcepool.verifySuspend(l, rr)
rr.state = constants.RES_STATE_DONE
l.state = constants.LEASE_STATE_SUSPENDED
self.scheduledleases.remove(l)
- self.queue.enqueueInOrder(l)
- self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, l.leaseID)
- l.printContents()
+ self.queue.enqueue_in_order(l)
+ self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, l.id)
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
- self.rm.logger.debug("LEASE-%i End of handleEndSuspend" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Lease %i suspended." % (l.leaseID), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleEndSuspend" % l.id, constants.SCHED)
+ self.rm.logger.info("Lease %i suspended." % (l.id), constants.SCHED)
def _handle_start_resume(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleStartResume" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleStartResume" % l.id, constants.SCHED)
+ l.print_contents()
self.rm.resourcepool.resumeVMs(l, rr)
rr.state = constants.RES_STATE_ACTIVE
- l.printContents()
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_VM_RESUME)
- self.rm.logger.debug("LEASE-%i End of handleStartResume" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Resuming lease %i..." % (l.leaseID), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleStartResume" % l.id, constants.SCHED)
+ self.rm.logger.info("Resuming lease %i..." % (l.id), constants.SCHED)
def _handle_end_resume(self, l, rr):
- self.rm.logger.debug("LEASE-%i Start of handleEndResume" % l.leaseID, constants.SCHED)
- l.printContents()
+ self.rm.logger.debug("LEASE-%i Start of handleEndResume" % l.id, constants.SCHED)
+ l.print_contents()
# TODO: React to incomplete resume
self.rm.resourcepool.verifyResume(l, rr)
rr.state = constants.RES_STATE_DONE
for vnode, pnode in rr.nodes.items():
- self.rm.resourcepool.removeRAMFileFromNode(pnode, l.leaseID, vnode)
- l.printContents()
+ self.rm.resourcepool.removeRAMFileFromNode(pnode, l.id, vnode)
+ l.print_contents()
self.updateNodeVMState(rr.nodes.values(), constants.DOING_IDLE)
- self.rm.logger.debug("LEASE-%i End of handleEndResume" % l.leaseID, constants.SCHED)
- self.rm.logger.info("Resumed lease %i" % (l.leaseID), constants.SCHED)
+ self.rm.logger.debug("LEASE-%i End of handleEndResume" % l.id, constants.SCHED)
+ self.rm.logger.info("Resumed lease %i" % (l.id), constants.SCHED)
def _handle_end_rr(self, l, rr):
self.slottable.removeReservation(rr)
@@ -527,8 +539,8 @@
def __enqueue_in_order(self, req):
- self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.leaseID)
- self.queue.enqueueInOrder(req)
+ self.rm.stats.incrCounter(constants.COUNTER_QUEUESIZE, req.id)
+ self.queue.enqueue_in_order(req)
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -117,10 +117,10 @@
reservations = self.getReservationsAt(time)
# Find how much resources are available on each node
for r in reservations:
- for node in r.res:
- nodes[node].capacity.decr(r.res[node])
- if not r.isPreemptible():
- nodes[node].capacitywithpreemption.decr(r.res[node])
+ for node in r.resources_in_pnode:
+ nodes[node].capacity.decr(r.resources_in_pnode[node])
+ if not r.is_preemptible():
+ nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
self.availabilitycache[time] = nodes
@@ -141,7 +141,7 @@
if resreq != None:
newnodes = []
for i, node in nodes:
- if not resreq.fitsIn(node.capacity) and not resreq.fitsIn(node.capacitywithpreemption):
+ if not resreq.fits_in(node.capacity) and not resreq.fits_in(node.capacitywithpreemption):
pass
else:
newnodes.append((i, node))
@@ -151,8 +151,8 @@
def getUtilization(self, time, restype=constants.RES_CPU):
nodes = self.getAvailability(time)
- total = sum([n.capacity.getByType(restype) for n in self.nodes.nodelist])
- avail = sum([n.capacity.getByType(restype) for n in nodes.values()])
+ total = sum([n.capacity.get_by_type(restype) for n in self.nodes.nodelist])
+ avail = sum([n.capacity.get_by_type(restype) for n in nodes.values()])
return 1.0 - (float(avail)/total)
def getReservationsAt(self, time):
@@ -218,7 +218,7 @@
# TODO: Might be more efficient to resort lists
self.removeReservation(rrold)
self.addReservation(rrnew)
- rrold.lease.replaceRR(rrold, rrnew)
+ rrold.lease.replace_rr(rrold, rrnew)
self.dirty()
@@ -249,7 +249,7 @@
changepoints = set()
res = self.getReservationsWithChangePointsAfter(after)
for rr in res:
- if nodes == None or (nodes != None and len(set(rr.res.keys()) & set(nodes)) > 0):
+ if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
if rr.start > after:
changepoints.add(rr.start)
if rr.end > after:
@@ -278,12 +278,12 @@
return p
def fitExact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
- leaseID = leasereq.leaseID
+ lease_id = leasereq.id
start = leasereq.start.requested
end = leasereq.start.requested + leasereq.duration.requested
- diskImageID = leasereq.diskImageID
+ diskImageID = leasereq.diskimage_id
numnodes = leasereq.numnodes
- resreq = leasereq.resreq
+ resreq = leasereq.requested_resources
self.availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
self.availabilitywindow.printContents(withpreemption = False)
@@ -399,16 +399,16 @@
def findLeasesToPreempt(self, mustpreempt, startTime, endTime):
def comparepreemptability(rrX, rrY):
- if rrX.lease.tSubmit > rrY.lease.tSubmit:
+ if rrX.lease.tSubmit > rrY.lease.submit_time:
return constants.BETTER
- elif rrX.lease.tSubmit < rrY.lease.tSubmit:
+ elif rrX.lease.tSubmit < rrY.lease.submit_time:
return constants.WORSE
else:
return constants.EQUAL
def preemptedEnough(amountToPreempt):
for node in amountToPreempt:
- if not amountToPreempt[node].isZeroOrLess():
+ if not amountToPreempt[node].is_zero_or_less():
return False
return True
@@ -418,12 +418,12 @@
nodes = set(mustpreempt.keys())
reservationsAtStart = self.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if r.isPreemptible()
- and len(set(r.res.keys()) & nodes)>0]
+ reservationsAtStart = [r for r in reservationsAtStart if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
reservationsAtMiddle = self.getReservationsStartingBetween(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if r.isPreemptible()
- and len(set(r.res.keys()) & nodes)>0]
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
reservationsAtStart.sort(comparepreemptability)
reservationsAtMiddle.sort(comparepreemptability)
@@ -437,11 +437,11 @@
# The following will really only come into play when we have
# multiple VMs per node
mustpreemptres = False
- for n in r.res.keys():
+ for n in r.resources_in_pnode.keys():
# Don't need to preempt if we've already preempted all
# the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].isZeroOrLess():
- amountToPreempt[n].decr(r.res[n])
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
mustpreemptres = True
if mustpreemptres:
atstart.add(r)
@@ -464,28 +464,29 @@
if r.start <= cp and cp < r.end]
for r in reservations:
mustpreemptres = False
- for n in r.res.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].isZeroOrLess():
- amountToPreempt[n].decr(r.res[n])
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
mustpreemptres = True
if mustpreemptres:
atmiddle.add(r)
if preemptedEnough(amountToPreempt):
break
- self.rm.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.leaseID for r in atstart], constants.ST)
- self.rm.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.leaseID for r in atmiddle], constants.ST)
+ self.rm.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart], constants.ST)
+ self.rm.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle], constants.ST)
leases = [r.lease for r in atstart|atmiddle]
return leases
- def fitBestEffort(self, lease, earliest, canreserve, suspendable, preemptible, canmigrate, mustresume):
- leaseID = lease.leaseID
- remdur = lease.duration.getRemainingDuration()
+ def fitBestEffort(self, lease, earliest, canreserve, suspendable, canmigrate, mustresume):
+ lease_id = lease.id
+ remdur = lease.duration.get_remaining_duration()
numnodes = lease.numnodes
- resreq = lease.resreq
+ resreq = lease.requested_resources
+ preemptible = lease.preemptible
suspendresumerate = self.resourcepool.info.getSuspendResumeRate()
#
@@ -496,26 +497,29 @@
# If we can't migrate, we have to stay in the
# nodes where the lease is currently deployed
if mustresume and not canmigrate:
- vmrr, susprr = lease.getLastVMRR()
+ vmrr, susprr = lease.get_last_vmrr()
curnodes = set(vmrr.nodes.values())
suspendthreshold = lease.getSuspendThreshold(initial=False, suspendrate=suspendresumerate, migrating=False)
if mustresume and canmigrate:
# If we have to resume this lease, make sure that
# we have enough time to transfer the images.
- migratetime = lease.estimateMigrationTime()
+ # TODO: Get bandwidth another way. Right now, the
+ # image node bandwidth is the same as the bandwidth
+ # in the other nodes, but this won't always be true.
+ migratetime = lease.estimate_migration_time(self.rm.resourcepool.imagenode_bandwidth)
earliesttransfer = self.rm.clock.get_time() + migratetime
for n in earliest:
earliest[n][0] = max(earliest[n][0], earliesttransfer)
- suspendthreshold = lease.getSuspendThreshold(initial=False, suspendrate=suspendresumerate, migrating=True)
+ suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=True)
if mustresume:
- resumetime = lease.estimateSuspendResumeTime(suspendresumerate)
+ resumetime = lease.estimate_suspend_resume_time(suspendresumerate)
# Must allocate time for resumption too
remdur += resumetime
else:
- suspendthreshold = lease.getSuspendThreshold(initial=True, suspendrate=suspendresumerate)
+ suspendthreshold = lease.get_suspend_threshold(initial=True, suspendrate=suspendresumerate)
#
@@ -598,7 +602,7 @@
if mustresume:
# If we're resuming, we prefer resuming in the nodes we're already
# deployed in, to minimize the number of transfers.
- vmrr, susprr = lease.getLastVMRR()
+ vmrr, susprr = lease.get_last_vmrr()
nodes = set(vmrr.nodes.values())
availnodes = set(physnodes)
deplnodes = availnodes.intersection(nodes)
@@ -637,9 +641,9 @@
if mustresume:
resmres = {}
for n in mappings.values():
- r = ds.ResourceTuple.createEmpty()
- r.setByType(constants.RES_MEM, resreq.getByType(constants.RES_MEM))
- r.setByType(constants.RES_DISK, resreq.getByType(constants.RES_DISK))
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
resmres[n] = r
resmrr = ds.ResumptionResourceReservation(lease, start-resumetime, start, resmres, mappings)
resmrr.state = constants.RES_STATE_SCHEDULED
@@ -648,9 +652,9 @@
if mustsuspend:
suspres = {}
for n in mappings.values():
- r = ds.ResourceTuple.createEmpty()
- r.setByType(constants.RES_MEM, resreq.getByType(constants.RES_MEM))
- r.setByType(constants.RES_DISK, resreq.getByType(constants.RES_DISK))
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, resreq.get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, resreq.get_by_type(constants.RES_DISK))
suspres[n] = r
susprr = ds.SuspensionResourceReservation(lease, end, end + suspendtime, suspres, mappings)
susprr.state = constants.RES_STATE_SCHEDULED
@@ -667,7 +671,7 @@
res_str = " (resuming)"
if mustsuspend:
susp_str = " (suspending)"
- self.rm.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.leaseID, mappings.values(), start, res_str, end, susp_str), constants.ST)
+ self.rm.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str), constants.ST)
return resmrr, vmrr, susprr, reservation
@@ -715,10 +719,10 @@
def suspend(self, lease, time):
suspendresumerate = self.resourcepool.info.getSuspendResumeRate()
- (vmrr, susprr) = lease.getLastVMRR()
+ (vmrr, susprr) = lease.get_last_vmrr()
vmrrnew = copy.copy(vmrr)
- suspendtime = lease.estimateSuspendResumeTime(suspendresumerate)
+ suspendtime = lease.estimate_suspend_resume_time(suspendresumerate)
vmrrnew.end = time - suspendtime
vmrrnew.oncomplete = constants.ONCOMPLETE_SUSPEND
@@ -726,29 +730,29 @@
self.updateReservationWithKeyChange(vmrr, vmrrnew)
if susprr != None:
- lease.removeRR(susprr)
+ lease.remove_rr(susprr)
self.removeReservation(susprr)
mappings = vmrr.nodes
suspres = {}
for n in mappings.values():
- r = ds.ResourceTuple.createEmpty()
- r.setByType(constants.RES_MEM, vmrr.res[n].getByType(constants.RES_MEM))
- r.setByType(constants.RES_DISK, vmrr.res[n].getByType(constants.RES_DISK))
+ r = ds.ResourceTuple.create_empty()
+ r.set_by_type(constants.RES_MEM, vmrr.resources_in_pnode[n].get_by_type(constants.RES_MEM))
+ r.set_by_type(constants.RES_DISK, vmrr.resources_in_pnode[n].get_by_type(constants.RES_DISK))
suspres[n] = r
newsusprr = ds.SuspensionResourceReservation(lease, time - suspendtime, time, suspres, mappings)
newsusprr.state = constants.RES_STATE_SCHEDULED
- lease.appendRR(newsusprr)
+ lease.append_rr(newsusprr)
self.addReservation(newsusprr)
def slideback(self, lease, earliest):
- (vmrr, susprr) = lease.getLastVMRR()
+ (vmrr, susprr) = lease.get_last_vmrr()
vmrrnew = copy.copy(vmrr)
nodes = vmrrnew.nodes.values()
if lease.state == constants.LEASE_STATE_SUSPENDED:
- resmrr = lease.prevRR(vmrr)
+ resmrr = lease.prev_rr(vmrr)
originalstart = resmrr.start
else:
resmrr = None
@@ -757,7 +761,7 @@
cp = [earliest] + cp
newstart = None
for p in cp:
- self.availabilitywindow.initWindow(p, lease.resreq, canpreempt=False)
+ self.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
self.availabilitywindow.printContents()
if self.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
(end, canfit) = self.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
@@ -779,11 +783,11 @@
# If the lease was going to be suspended, check to see if
# we don't need to suspend any more.
- remdur = lease.duration.getRemainingDuration()
+ remdur = lease.duration.get_remaining_duration()
if susprr != None and vmrrnew.end - newstart >= remdur:
vmrrnew.end = vmrrnew.start + remdur
vmrrnew.oncomplete = constants.ONCOMPLETE_ENDLEASE
- lease.removeRR(susprr)
+ lease.remove_rr(susprr)
self.removeReservation(susprr)
else:
vmrrnew.end -= diff
@@ -793,7 +797,7 @@
self.updateReservationWithKeyChange(vmrr, vmrrnew)
self.dirty()
self.rm.logger.edebug("New lease descriptor (after slideback):", constants.ST)
- lease.printContents()
+ lease.print_contents()
def prioritizenodes(self, canfit, diskImageID, start, canpreempt, avoidpreempt):
@@ -904,11 +908,11 @@
self.canfit = 0
self.canfitpreempt = 0
else:
- self.canfit = resreq.getNumFitsIn(avail)
+ self.canfit = resreq.get_num_fits_in(avail)
if availpreempt == None:
self.canfitpreempt = 0
else:
- self.canfitpreempt = resreq.getNumFitsIn(availpreempt)
+ self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
def getCanfit(self, canpreempt):
if canpreempt:
@@ -963,10 +967,10 @@
# Decrease in the window
for node in newnodes:
capacity = availatpoint[node].capacity
- fits = self.resreq.getNumFitsIn(capacity)
+ fits = self.resreq.get_num_fits_in(capacity)
if canpreempt:
capacitywithpreemption = availatpoint[node].capacitywithpreemption
- fitswithpreemption = self.resreq.getNumFitsIn(capacitywithpreemption)
+ fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
prevavail = self.avail[node][-1]
if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
Modified: trunk/src/haizea/resourcemanager/stats.py
===================================================================
--- trunk/src/haizea/resourcemanager/stats.py 2008-07-10 08:58:05 UTC (rev 428)
+++ trunk/src/haizea/resourcemanager/stats.py 2008-07-10 15:36:37 UTC (rev 429)
@@ -43,15 +43,15 @@
self.counterLists[counterID] = []
self.counterAvgType[counterID] = avgtype
- def incrCounter(self, counterID, leaseID = None):
+ def incrCounter(self, counterID, lease_id = None):
time = self.rm.clock.get_time()
- self.appendStat(counterID, self.counters[counterID] + 1, leaseID, time)
+ self.appendStat(counterID, self.counters[counterID] + 1, lease_id, time)
- def decrCounter(self, counterID, leaseID = None):
+ def decrCounter(self, counterID, lease_id = None):
time = self.rm.clock.get_time()
- self.appendStat(counterID, self.counters[counterID] - 1, leaseID, time)
+ self.appendStat(counterID, self.counters[counterID] - 1, lease_id, time)
- def appendStat(self, counterID, value, leaseID = None, time = None):
+ def appendStat(self, counterID, value, lease_id = None, time = None):
if time == None:
time = self.rm.clock.get_time()
if len(self.counterLists[counterID]) > 0:
@@ -62,7 +62,7 @@
if time == prevtime:
self.counterLists[counterID][-1][2] = value
else:
- self.counterLists[counterID].append([time, leaseID, value])
+ self.counterLists[counterID].append([time, lease_id, value])
def start(self, time):
self.starttime = time
@@ -132,7 +132,7 @@
stats = []
for v in data:
time = v[0]
- leaseID = v[1]
+ lease_id = v[1]
value = v[2]
if prevTime != None:
timediff = time - prevTime
@@ -141,7 +141,7 @@
avg = accum/time
else:
avg = value
- stats.append((time, leaseID, value, avg))
+ stats.append((time, lease_id, value, avg))
prevTime = time
prevValue = value
@@ -192,7 +192,7 @@
leases.entries = self.rm.scheduler.completedleases.entries
# Remove some data that won't be necessary in the reporting tools
for l in leases.entries.values():
- l.removeRRs()
+ l.clear_rrs()
l.scheduler = None
l.logger = None
pickle(leases, self.datadir, constants.LEASESFILE)
More information about the Haizea-commit
mailing list