[haizea-commit] r554 - in trunk/src/haizea: common resourcemanager resourcemanager/enact resourcemanager/frontends resourcemanager/scheduler resourcemanager/scheduler/preparation_schedulers traces
haizea-commit at mailman.cs.uchicago.edu
Tue Jan 6 05:56:35 CST 2009
Author: borja
Date: 2009-01-06 05:56:27 -0600 (Tue, 06 Jan 2009)
New Revision: 554
Added:
trunk/src/haizea/resourcemanager/leases.py
trunk/src/haizea/resourcemanager/scheduler/
trunk/src/haizea/resourcemanager/scheduler/__init__.py
trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/
trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
trunk/src/haizea/resourcemanager/scheduler/slottable.py
trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
Removed:
trunk/src/haizea/resourcemanager/datastruct.py
trunk/src/haizea/resourcemanager/deployment/
trunk/src/haizea/resourcemanager/resourcepool.py
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Modified:
trunk/src/haizea/common/constants.py
trunk/src/haizea/resourcemanager/accounting.py
trunk/src/haizea/resourcemanager/configfile.py
trunk/src/haizea/resourcemanager/enact/__init__.py
trunk/src/haizea/resourcemanager/enact/opennebula.py
trunk/src/haizea/resourcemanager/enact/simulated.py
trunk/src/haizea/resourcemanager/frontends/opennebula.py
trunk/src/haizea/resourcemanager/frontends/rpc.py
trunk/src/haizea/resourcemanager/frontends/tracefile.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
trunk/src/haizea/traces/readers.py
Log:
Started refactoring scheduling code into more manageable modules + classes. Note that this leaves some parts of the scheduler broken for the time being.
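For readers skimming the diff, the import changes collected below give a quick map of the new layout: the lease classes move from datastruct.py into a dedicated leases module, while ResourceTuple and Node now live under the new scheduler package. A minimal before/after sketch, assembled only from import lines that appear in this revision's diff:

    # Pre-r554 layout (imports removed in this revision):
    #   from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
    #   from haizea.resourcemanager.resourcepool import Node

    # Layout after this revision (imports added below):
    from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
    from haizea.resourcemanager.scheduler.slottable import ResourceTuple
    from haizea.resourcemanager.scheduler.resourcepool import Node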
Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/common/constants.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -65,9 +65,9 @@
RUNTIMEOVERHEAD_ALL="all"
RUNTIMEOVERHEAD_BE="besteffort"
-DEPLOYMENT_UNMANAGED = "unmanaged"
-DEPLOYMENT_PREDEPLOY = "predeployed-images"
-DEPLOYMENT_TRANSFER = "imagetransfer"
+PREPARATION_UNMANAGED = "unmanaged"
+PREPARATION_PREDEPLOY = "predeployed-images"
+PREPARATION_TRANSFER = "imagetransfer"
CLOCK_SIMULATED = "simulated"
CLOCK_REAL = "real"
Modified: trunk/src/haizea/resourcemanager/accounting.py
===================================================================
--- trunk/src/haizea/resourcemanager/accounting.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/accounting.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -19,7 +19,6 @@
import os
import os.path
import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
from haizea.common.utils import pickle, get_config, get_clock
from errno import EEXIST
Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/configfile.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -86,9 +86,9 @@
getter = "lease-preparation",
type = OPTTYPE_STRING,
required = False,
- default = constants.DEPLOYMENT_UNMANAGED,
- valid = [constants.DEPLOYMENT_UNMANAGED,
- constants.DEPLOYMENT_TRANSFER],
+ default = constants.PREPARATION_UNMANAGED,
+ valid = [constants.PREPARATION_UNMANAGED,
+ constants.PREPARATION_TRANSFER],
doc = """
Sets how the scheduler will handle the
preparation overhead of leases. Valid values are:
Deleted: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/datastruct.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -1,724 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-"""This module provides the fundamental data structures (besides the slot table,
-which is in a module of its own) used by Haizea. The module provides four types
-of structures:
-
-* Lease data structures
- * Lease: Base class for leases
- * ARLease: Advance reservation lease
- * BestEffortLease: Best-effort lease
- * ImmediateLease: Immediate lease
-* Resource reservation (RR) structures:
- * ResourceReservationBase: Base class for RRs in the slot table
- * VMResourceReservation: RR representing one or more VMs
- * SuspensionResourceReservation: RR representing a lease suspension
- * ResumptionResourceReservation: RR representing a lease resumption
-* Lease containers
- * Queue: Your run-of-the-mill queue
- * LeaseTable: Provides easy access to leases in the system
-* Miscellaneous structures
- * ResourceTuple: A tuple representing a resource usage or requirement
- * Timestamp: A wrapper around requested/scheduled/actual timestamps
- * Duration: A wrapper around requested/accumulated/actual durations
-"""
-
-from haizea.common.constants import RES_MEM, MIGRATE_NONE, MIGRATE_MEM, MIGRATE_MEMDISK, LOGLEVEL_VDEBUG
-from haizea.common.utils import StateMachine, round_datetime_delta, get_lease_id, pretty_nodemap, estimate_transfer_time, xmlrpc_marshall_singlevalue
-
-from operator import attrgetter
-from mx.DateTime import TimeDelta
-from math import floor
-
-import logging
-
-
-#-------------------------------------------------------------------#
-# #
-# LEASE DATA STRUCTURES #
-# #
-#-------------------------------------------------------------------#
-
-
-class Lease(object):
- # Lease states
- STATE_NEW = 0
- STATE_PENDING = 1
- STATE_REJECTED = 2
- STATE_SCHEDULED = 3
- STATE_QUEUED = 4
- STATE_CANCELLED = 5
- STATE_PREPARING = 6
- STATE_READY = 7
- STATE_ACTIVE = 8
- STATE_SUSPENDING = 9
- STATE_SUSPENDED = 10
- STATE_MIGRATING = 11
- STATE_RESUMING = 12
- STATE_RESUMED_READY = 13
- STATE_DONE = 14
- STATE_FAIL = 15
-
- state_str = {STATE_NEW : "New",
- STATE_PENDING : "Pending",
- STATE_REJECTED : "Rejected",
- STATE_SCHEDULED : "Scheduled",
- STATE_QUEUED : "Queued",
- STATE_CANCELLED : "Cancelled",
- STATE_PREPARING : "Preparing",
- STATE_READY : "Ready",
- STATE_ACTIVE : "Active",
- STATE_SUSPENDING : "Suspending",
- STATE_SUSPENDED : "Suspended",
- STATE_MIGRATING : "Migrating",
- STATE_RESUMING : "Resuming",
- STATE_RESUMED_READY: "Resumed-Ready",
- STATE_DONE : "Done",
- STATE_FAIL : "Fail"}
-
- def __init__(self, submit_time, start, duration, diskimage_id,
- diskimage_size, numnodes, requested_resources, preemptible):
- # Lease ID (read only)
- self.id = get_lease_id()
-
- # Request attributes (read only)
- self.submit_time = submit_time
- self.start = start
- self.duration = duration
- self.end = None
- self.diskimage_id = diskimage_id
- self.diskimage_size = diskimage_size
- # TODO: The following assumes homogeneous nodes. Should be modified
- # to account for heterogeneous nodes.
- self.numnodes = numnodes
- self.requested_resources = requested_resources
- self.preemptible = preemptible
-
- # Bookkeeping attributes
- # (keep track of the lease's state, resource reservations, etc.)
- self.state = Lease.STATE_NEW
- self.diskimagemap = {}
- self.memimagemap = {}
- self.deployment_rrs = []
- self.vm_rrs = []
-
- # Enactment information. Should only be manipulated by enactment module
- self.enactment_info = None
- self.vnode_enactment_info = dict([(n+1, None) for n in range(numnodes)])
-
- self.logger = logging.getLogger("LEASES")
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Lease ID : %i" % self.id)
- self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
- self.logger.log(loglevel, "Duration : %s" % self.duration)
- self.logger.log(loglevel, "State : %s" % Lease.state_str[self.state])
- self.logger.log(loglevel, "Disk image : %s" % self.diskimage_id)
- self.logger.log(loglevel, "Disk image size: %s" % self.diskimage_size)
- self.logger.log(loglevel, "Num nodes : %s" % self.numnodes)
- self.logger.log(loglevel, "Resource req : %s" % self.requested_resources)
- self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
- self.logger.log(loglevel, "Mem image map : %s" % pretty_nodemap(self.memimagemap))
-
- def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
- if len(self.deployment_rrs) > 0:
- self.logger.log(loglevel, "DEPLOYMENT RESOURCE RESERVATIONS")
- self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
- for r in self.deployment_rrs:
- r.print_contents(loglevel)
- self.logger.log(loglevel, "##")
- self.logger.log(loglevel, "VM RESOURCE RESERVATIONS")
- self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~")
- for r in self.vm_rrs:
- r.print_contents(loglevel)
- self.logger.log(loglevel, "##")
-
- def get_endtime(self):
- vmrr = self.get_last_vmrr()
- return vmrr.end
-
- def append_vmrr(self, vmrr):
- self.vm_rrs.append(vmrr)
-
- def append_deployrr(self, vmrr):
- self.deployment_rrs.append(vmrr)
-
- def get_last_vmrr(self):
- return self.vm_rrs[-1]
-
- def update_vmrr(self, rrold, rrnew):
- self.vm_rrs[self.vm_rrs.index(rrold)] = rrnew
-
- def remove_vmrr(self, vmrr):
- if not vmrr in self.vm_rrs:
- raise Exception, "Tried to remove an VM RR not contained in this lease"
- else:
- self.vm_rrs.remove(vmrr)
-
- def clear_rrs(self):
- self.deployment_rrs = []
- self.vm_rrs = []
-
- def add_boot_overhead(self, t):
- self.duration.incr(t)
-
- def add_runtime_overhead(self, percent):
- self.duration.incr_by_percent(percent)
-
- def xmlrpc_marshall(self):
- # Convert to something we can send through XMLRPC
- l = {}
- l["id"] = self.id
- l["submit_time"] = xmlrpc_marshall_singlevalue(self.submit_time)
- l["start_req"] = xmlrpc_marshall_singlevalue(self.start.requested)
- l["start_sched"] = xmlrpc_marshall_singlevalue(self.start.scheduled)
- l["start_actual"] = xmlrpc_marshall_singlevalue(self.start.actual)
- l["duration_req"] = xmlrpc_marshall_singlevalue(self.duration.requested)
- l["duration_acc"] = xmlrpc_marshall_singlevalue(self.duration.accumulated)
- l["duration_actual"] = xmlrpc_marshall_singlevalue(self.duration.actual)
- l["end"] = xmlrpc_marshall_singlevalue(self.end)
- l["diskimage_id"] = self.diskimage_id
- l["diskimage_size"] = self.diskimage_size
- l["numnodes"] = self.numnodes
- l["resources"] = `self.requested_resources`
- l["preemptible"] = self.preemptible
- l["state"] = self.state
- l["vm_rrs"] = [vmrr.xmlrpc_marshall() for vmrr in self.vm_rrs]
-
- return l
-
-class LeaseStateMachine(StateMachine):
- initial_state = Lease.STATE_NEW
- transitions = {Lease.STATE_NEW: [(Lease.STATE_PENDING, "")],
- Lease.STATE_PENDING: [(Lease.STATE_SCHEDULED, ""),
- (Lease.STATE_QUEUED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_REJECTED, "")],
- Lease.STATE_SCHEDULED: [(Lease.STATE_PREPARING, ""),
- (Lease.STATE_READY, ""),
- (Lease.STATE_MIGRATING, ""),
- (Lease.STATE_RESUMING, ""),
- (Lease.STATE_CANCELLED, "")],
- Lease.STATE_QUEUED: [(Lease.STATE_SCHEDULED, ""),
- (Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, "")],
- Lease.STATE_PREPARING: [(Lease.STATE_READY, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_READY: [(Lease.STATE_ACTIVE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_ACTIVE: [(Lease.STATE_SUSPENDING, ""),
- (Lease.STATE_DONE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_SUSPENDING: [(Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_SUSPENDED: [(Lease.STATE_QUEUED, ""),
- (Lease.STATE_MIGRATING, ""),
- (Lease.STATE_RESUMING, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_MIGRATING: [(Lease.STATE_SUSPENDED, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_RESUMING: [(Lease.STATE_RESUMED_READY, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
- Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE, ""),
- (Lease.STATE_CANCELLED, ""),
- (Lease.STATE_FAIL, "")],
-
- # Final states
- Lease.STATE_DONE: [],
- Lease.STATE_CANCELLED: [],
- Lease.STATE_FAIL: [],
- Lease.STATE_REJECTED: [],
- }
-
- def __init__(self):
- StateMachine.__init__(initial_state, transitions)
-
-class ARLease(Lease):
- def __init__(self, submit_time, start, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible,
- # AR-specific parameters:
- realdur = None):
- start = Timestamp(start)
- duration = Duration(duration)
- if realdur != duration.requested:
- duration.known = realdur # ONLY for simulation
- Lease.__init__(self, submit_time, start, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible)
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "__________________________________________________")
- Lease.print_contents(self, loglevel)
- self.logger.log(loglevel, "Type : AR")
- self.logger.log(loglevel, "Start time : %s" % self.start)
- self.print_rrs(loglevel)
- self.logger.log(loglevel, "--------------------------------------------------")
-
- def xmlrpc_marshall(self):
- l = Lease.xmlrpc_marshall(self)
- l["type"] = "AR"
- return l
-
-
-class BestEffortLease(Lease):
- def __init__(self, submit_time, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible,
- # BE-specific parameters:
- realdur = None):
- start = Timestamp(None) # i.e., start on a best-effort basis
- duration = Duration(duration)
- if realdur != duration.requested:
- duration.known = realdur # ONLY for simulation
- # When the images will be available
- self.imagesavail = None
- Lease.__init__(self, submit_time, start, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible)
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "__________________________________________________")
- Lease.print_contents(self, loglevel)
- self.logger.log(loglevel, "Type : BEST-EFFORT")
- self.logger.log(loglevel, "Images Avail @ : %s" % self.imagesavail)
- self.print_rrs(loglevel)
- self.logger.log(loglevel, "--------------------------------------------------")
-
- def get_waiting_time(self):
- return self.start.actual - self.submit_time
-
- def get_slowdown(self, bound=10):
- time_on_dedicated = self.duration.original
- time_on_loaded = self.end - self.submit_time
- bound = TimeDelta(seconds=bound)
- if time_on_dedicated < bound:
- time_on_dedicated = bound
- return time_on_loaded / time_on_dedicated
-
- def xmlrpc_marshall(self):
- l = Lease.xmlrpc_marshall(self)
- l["type"] = "BE"
- return l
-
-
-class ImmediateLease(Lease):
- def __init__(self, submit_time, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible,
- # Immediate-specific parameters:
- realdur = None):
- start = Timestamp(None) # i.e., start on a best-effort basis
- duration = Duration(duration)
- duration.known = realdur # ONLY for simulation
- Lease.__init__(self, submit_time, start, duration, diskimage_id,
- diskimage_size, numnodes, resreq, preemptible)
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "__________________________________________________")
- Lease.print_contents(self, loglevel)
- self.logger.log(loglevel, "Type : IMMEDIATE")
- self.print_rrs(loglevel)
- self.logger.log(loglevel, "--------------------------------------------------")
-
- def xmlrpc_marshall(self):
- l = Lease.xmlrpc_marshall(self)
- l["type"] = "IM"
- return l
-
-
-#-------------------------------------------------------------------#
-# #
-# RESOURCE RESERVATION #
-# DATA STRUCTURES #
-# #
-#-------------------------------------------------------------------#
-
-
-class ResourceReservation(object):
-
- # Resource reservation states
- STATE_SCHEDULED = 0
- STATE_ACTIVE = 1
- STATE_DONE = 2
-
- state_str = {STATE_SCHEDULED : "Scheduled",
- STATE_ACTIVE : "Active",
- STATE_DONE : "Done"}
-
- def __init__(self, lease, start, end, res):
- self.lease = lease
- self.start = start
- self.end = end
- self.state = None
- self.resources_in_pnode = res
- self.logger = logging.getLogger("LEASES")
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Start : %s" % self.start)
- self.logger.log(loglevel, "End : %s" % self.end)
- self.logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state])
- self.logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
-
- def xmlrpc_marshall(self):
- # Convert to something we can send through XMLRPC
- rr = {}
- rr["start"] = xmlrpc_marshall_singlevalue(self.start)
- rr["end"] = xmlrpc_marshall_singlevalue(self.end)
- rr["state"] = self.state
- return rr
-
-class VMResourceReservation(ResourceReservation):
- def __init__(self, lease, start, end, nodes, res, backfill_reservation):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.nodes = nodes # { vnode -> pnode }
- self.backfill_reservation = backfill_reservation
- self.pre_rrs = []
- self.post_rrs = []
-
- # ONLY for simulation
- self.__update_prematureend()
-
- def update_start(self, time):
- self.start = time
- # ONLY for simulation
- self.__update_prematureend()
-
- def update_end(self, time):
- self.end = time
- # ONLY for simulation
- self.__update_prematureend()
-
- # ONLY for simulation
- def __update_prematureend(self):
- if self.lease.duration.known != None:
- remdur = self.lease.duration.get_remaining_known_duration()
- rrdur = self.end - self.start
- if remdur < rrdur:
- self.prematureend = self.start + remdur
- else:
- self.prematureend = None
- else:
- self.prematureend = None
-
- def get_final_end(self):
- if len(self.post_rrs) == 0:
- return self.end
- else:
- return self.post_rrs[-1].end
-
- def is_suspending(self):
- return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
-
- def is_shutting_down(self):
- return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- for resmrr in self.pre_rrs:
- resmrr.print_contents(loglevel)
- self.logger.log(loglevel, "--")
- self.logger.log(loglevel, "Type : VM")
- self.logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes))
- if self.prematureend != None:
- self.logger.log(loglevel, "Premature end : %s" % self.prematureend)
- ResourceReservation.print_contents(self, loglevel)
- for susprr in self.post_rrs:
- self.logger.log(loglevel, "--")
- susprr.print_contents(loglevel)
-
- def is_preemptible(self):
- return self.lease.preemptible
-
- def xmlrpc_marshall(self):
- rr = ResourceReservation.xmlrpc_marshall(self)
- rr["type"] = "VM"
- rr["nodes"] = self.nodes.items()
- return rr
-
-
-class SuspensionResourceReservation(ResourceReservation):
- def __init__(self, lease, start, end, res, vnodes, vmrr):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.vmrr = vmrr
- self.vnodes = vnodes
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Type : SUSPEND")
- self.logger.log(loglevel, "Vnodes : %s" % self.vnodes)
- ResourceReservation.print_contents(self, loglevel)
-
- def is_first(self):
- return (self == self.vmrr.post_rrs[0])
-
- def is_last(self):
- return (self == self.vmrr.post_rrs[-1])
-
- # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
- # has wider implications (with a non-trivial handling). For now, we leave them
- # as non-preemptible, since the probability of preempting a suspension RR is slim.
- def is_preemptible(self):
- return False
-
- def xmlrpc_marshall(self):
- rr = ResourceReservation.xmlrpc_marshall(self)
- rr["type"] = "SUSP"
- return rr
-
-class ResumptionResourceReservation(ResourceReservation):
- def __init__(self, lease, start, end, res, vnodes, vmrr):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.vmrr = vmrr
- self.vnodes = vnodes
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Type : RESUME")
- self.logger.log(loglevel, "Vnodes : %s" % self.vnodes)
- ResourceReservation.print_contents(self, loglevel)
-
- def is_first(self):
- resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
- return (self == resm_rrs[0])
-
- def is_last(self):
- resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
- return (self == resm_rrs[-1])
-
- # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
- # has wider implications (with a non-trivial handling). For now, we leave them
- # as non-preemptible, since the probability of preempting a resumption RR is slim.
- def is_preemptible(self):
- return False
-
- def xmlrpc_marshall(self):
- rr = ResourceReservation.xmlrpc_marshall(self)
- rr["type"] = "RESM"
- return rr
-
-class ShutdownResourceReservation(ResourceReservation):
- def __init__(self, lease, start, end, res, vnodes, vmrr):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.vmrr = vmrr
- self.vnodes = vnodes
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Type : SHUTDOWN")
- ResourceReservation.print_contents(self, loglevel)
-
- def is_preemptible(self):
- return True
-
- def xmlrpc_marshall(self):
- rr = ResourceReservation.xmlrpc_marshall(self)
- rr["type"] = "SHTD"
- return rr
-
-class MigrationResourceReservation(ResourceReservation):
- def __init__(self, lease, start, end, res, vmrr, transfers):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.vmrr = vmrr
- self.transfers = transfers
-
- def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Type : MIGRATE")
- self.logger.log(loglevel, "Transfers : %s" % self.transfers)
- ResourceReservation.print_contents(self, loglevel)
-
- def is_preemptible(self):
- return False
-
-#-------------------------------------------------------------------#
-# #
-# LEASE CONTAINERS #
-# #
-#-------------------------------------------------------------------#
-
-class Queue(object):
- def __init__(self, scheduler):
- self.scheduler = scheduler
- self.__q = []
-
- def is_empty(self):
- return len(self.__q)==0
-
- def enqueue(self, r):
- self.__q.append(r)
-
- def dequeue(self):
- return self.__q.pop(0)
-
- def enqueue_in_order(self, r):
- self.__q.append(r)
- self.__q.sort(key=attrgetter("submit_time"))
-
- def length(self):
- return len(self.__q)
-
- def has_lease(self, lease_id):
- return (1 == len([l for l in self.__q if l.id == lease_id]))
-
- def get_lease(self, lease_id):
- return [l for l in self.__q if l.id == lease_id][0]
-
- def remove_lease(self, lease):
- self.__q.remove(lease)
-
- def __iter__(self):
- return iter(self.__q)
-
-class LeaseTable(object):
- def __init__(self, scheduler):
- self.scheduler = scheduler
- self.entries = {}
-
- def has_lease(self, lease_id):
- return self.entries.has_key(lease_id)
-
- def get_lease(self, lease_id):
- return self.entries[lease_id]
-
- def is_empty(self):
- return len(self.entries)==0
-
- def remove(self, lease):
- del self.entries[lease.id]
-
- def add(self, lease):
- self.entries[lease.id] = lease
-
- def get_leases(self, type=None):
- if type==None:
- return self.entries.values()
- else:
- return [e for e in self.entries.values() if isinstance(e, type)]
-
- def get_leases_by_state(self, state):
- return [e for e in self.entries.values() if e.state == state]
-
-#-------------------------------------------------------------------#
-# #
-# MISCELLANEOUS DATA STRUCTURES CONTAINERS #
-# #
-#-------------------------------------------------------------------#
-
-class ResourceTuple(object):
- def __init__(self, res):
- self._res = res
-
- @classmethod
- def from_list(cls, l):
- return cls(l[:])
-
- @classmethod
- def copy(cls, rt):
- return cls(rt._res[:])
-
- @classmethod
- def set_resource_types(cls, resourcetypes):
- cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
- cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
- cls.tuplelength = len(resourcetypes)
-
- @classmethod
- def create_empty(cls):
- return cls([0 for x in range(cls.tuplelength)])
-
- def fits_in(self, res2):
- fits = True
- for i in xrange(len(self._res)):
- if self._res[i] > res2._res[i]:
- fits = False
- break
- return fits
-
- def get_num_fits_in(self, res2):
- canfit = 10000 # Arbitrarily large
- for i in xrange(len(self._res)):
- if self._res[i] != 0:
- f = res2._res[i] / self._res[i]
- if f < canfit:
- canfit = f
- return int(floor(canfit))
-
- def decr(self, res2):
- for slottype in xrange(len(self._res)):
- self._res[slottype] -= res2._res[slottype]
-
- def incr(self, res2):
- for slottype in xrange(len(self._res)):
- self._res[slottype] += res2._res[slottype]
-
- def get_by_type(self, resourcetype):
- return self._res[self.type2pos[resourcetype]]
-
- def set_by_type(self, resourcetype, value):
- self._res[self.type2pos[resourcetype]] = value
-
- def is_zero_or_less(self):
- return sum([v for v in self._res]) <= 0
-
- def __repr__(self):
- r=""
- for i, x in enumerate(self._res):
- r += "%s:%.2f " % (self.descriptions[i], x)
- return r
-
-class Timestamp(object):
- def __init__(self, requested):
- self.requested = requested
- self.scheduled = None
- self.actual = None
-
- def __repr__(self):
- return "REQ: %s | SCH: %s | ACT: %s" % (self.requested, self.scheduled, self.actual)
-
-class Duration(object):
- def __init__(self, requested, known=None):
- self.original = requested
- self.requested = requested
- self.accumulated = TimeDelta()
- self.actual = None
- # The following is ONLY used in simulation
- self.known = known
-
- def incr(self, t):
- self.requested += t
- if self.known != None:
- self.known += t
-
- def incr_by_percent(self, pct):
- factor = 1 + float(pct)/100
- self.requested = round_datetime_delta(self.requested * factor)
- if self.known != None:
- self.requested = round_datetime_delta(self.known * factor)
-
- def accumulate_duration(self, t):
- self.accumulated += t
-
- def get_remaining_duration(self):
- return self.requested - self.accumulated
-
- # ONLY in simulation
- def get_remaining_known_duration(self):
- return self.known - self.accumulated
-
- def __repr__(self):
- return "REQ: %s | ACC: %s | ACT: %s | KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
-
Modified: trunk/src/haizea/resourcemanager/enact/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/__init__.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/__init__.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -17,14 +17,14 @@
# -------------------------------------------------------------------------- #
from haizea.common.utils import abstract
-import haizea.resourcemanager.datastruct as ds
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
class ResourcePoolInfo(object):
def __init__(self):
# Initialize the resource types in the ResourceTuple class
# TODO: Do this in a less kludgy way
resourcetypes = self.get_resource_types()
- ds.ResourceTuple.set_resource_types(resourcetypes)
+ ResourceTuple.set_resource_types(resourcetypes)
def get_nodes(self):
Modified: trunk/src/haizea/resourcemanager/enact/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/opennebula.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -16,11 +16,10 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.resourcepool import Node
+from haizea.resourcemanager.scheduler.resourcepool import Node
from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
from haizea.common.utils import get_config
import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
from pysqlite2 import dbapi2 as sqlite
import logging
import commands
Modified: trunk/src/haizea/resourcemanager/enact/simulated.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/simulated.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -16,11 +16,11 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.resourcepool import Node
+from haizea.resourcemanager.scheduler.resourcepool import Node
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
import haizea.common.constants as constants
from haizea.common.utils import get_config
-import haizea.resourcemanager.datastruct as ds
import logging
class SimulatedResourcePoolInfo(ResourcePoolInfo):
@@ -50,7 +50,7 @@
def parse_resources_string(self, resources):
resources = resources.split(";")
desc2type = dict([(x[2], x[0]) for x in self.get_resource_types()])
- capacity=ds.ResourceTuple.create_empty()
+ capacity = ResourceTuple.create_empty()
for r in resources:
resourcename = r.split(",")[0]
resourcecapacity = r.split(",")[1]
@@ -107,7 +107,7 @@
# Image repository nodes
numnodes = config.get("simul.nodes")
- imgcapacity = ds.ResourceTuple.create_empty()
+ imgcapacity = ResourceTuple.create_empty()
imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
self.fifo_node = Node(numnodes+1, "FIFOnode", imgcapacity)
Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -18,7 +18,8 @@
import haizea.common.constants as constants
from haizea.resourcemanager.frontends import RequestFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
from haizea.common.utils import UNIX2DateTime, round_datetime, get_config, get_clock
from pysqlite2 import dbapi2 as sqlite
Modified: trunk/src/haizea/resourcemanager/frontends/rpc.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/rpc.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/rpc.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -16,7 +16,8 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
from haizea.resourcemanager.frontends import RequestFrontend
from haizea.common.utils import round_datetime, get_config, get_clock
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
Modified: trunk/src/haizea/resourcemanager/frontends/tracefile.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/tracefile.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/tracefile.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -20,7 +20,7 @@
from haizea.common.utils import get_clock
from haizea.resourcemanager.frontends import RequestFrontend
import haizea.traces.readers as tracereaders
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease
+from haizea.resourcemanager.leases import ARLease, BestEffortLease
import operator
import logging
Copied: trunk/src/haizea/resourcemanager/leases.py (from rev 553, trunk/src/haizea/resourcemanager/datastruct.py)
===================================================================
--- trunk/src/haizea/resourcemanager/leases.py (rev 0)
+++ trunk/src/haizea/resourcemanager/leases.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,378 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+"""This module provides the lease data structures, and a couple of auxiliary
+data structures.
+
+* Lease data structures
+ * Lease: Base class for leases
+ * ARLease: Advance reservation lease
+ * BestEffortLease: Best-effort lease
+ * ImmediateLease: Immediate lease
+* Miscellaneous structures
+ * Timestamp: A wrapper around requested/scheduled/actual timestamps
+ * Duration: A wrapper around requested/accumulated/actual durations
+"""
+
+from haizea.common.constants import RES_MEM, MIGRATE_NONE, MIGRATE_MEM, MIGRATE_MEMDISK, LOGLEVEL_VDEBUG
+from haizea.common.utils import StateMachine, round_datetime_delta, get_lease_id, pretty_nodemap, estimate_transfer_time, xmlrpc_marshall_singlevalue
+
+from operator import attrgetter
+from mx.DateTime import TimeDelta
+from math import floor
+
+import logging
+
+
+#-------------------------------------------------------------------#
+# #
+# LEASE DATA STRUCTURES #
+# #
+#-------------------------------------------------------------------#
+
+
+class Lease(object):
+ # Lease states
+ STATE_NEW = 0
+ STATE_PENDING = 1
+ STATE_REJECTED = 2
+ STATE_SCHEDULED = 3
+ STATE_QUEUED = 4
+ STATE_CANCELLED = 5
+ STATE_PREPARING = 6
+ STATE_READY = 7
+ STATE_ACTIVE = 8
+ STATE_SUSPENDING = 9
+ STATE_SUSPENDED = 10
+ STATE_MIGRATING = 11
+ STATE_RESUMING = 12
+ STATE_RESUMED_READY = 13
+ STATE_DONE = 14
+ STATE_FAIL = 15
+
+ state_str = {STATE_NEW : "New",
+ STATE_PENDING : "Pending",
+ STATE_REJECTED : "Rejected",
+ STATE_SCHEDULED : "Scheduled",
+ STATE_QUEUED : "Queued",
+ STATE_CANCELLED : "Cancelled",
+ STATE_PREPARING : "Preparing",
+ STATE_READY : "Ready",
+ STATE_ACTIVE : "Active",
+ STATE_SUSPENDING : "Suspending",
+ STATE_SUSPENDED : "Suspended",
+ STATE_MIGRATING : "Migrating",
+ STATE_RESUMING : "Resuming",
+ STATE_RESUMED_READY: "Resumed-Ready",
+ STATE_DONE : "Done",
+ STATE_FAIL : "Fail"}
+
+ def __init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, requested_resources, preemptible):
+ # Lease ID (read only)
+ self.id = get_lease_id()
+
+ # Request attributes (read only)
+ self.submit_time = submit_time
+ self.start = start
+ self.duration = duration
+ self.end = None
+ self.diskimage_id = diskimage_id
+ self.diskimage_size = diskimage_size
+ # TODO: The following assumes homogeneous nodes. Should be modified
+ # to account for heterogeneous nodes.
+ self.numnodes = numnodes
+ self.requested_resources = requested_resources
+ self.preemptible = preemptible
+
+ # Bookkeeping attributes
+ # (keep track of the lease's state, resource reservations, etc.)
+ self.state = Lease.STATE_NEW
+ self.diskimagemap = {}
+ self.memimagemap = {}
+ self.deployment_rrs = []
+ self.vm_rrs = []
+
+ # Enactment information. Should only be manipulated by enactment module
+ self.enactment_info = None
+ self.vnode_enactment_info = dict([(n+1, None) for n in range(numnodes)])
+
+ self.logger = logging.getLogger("LEASES")
+
+ def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Lease ID : %i" % self.id)
+ self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
+ self.logger.log(loglevel, "Duration : %s" % self.duration)
+ self.logger.log(loglevel, "State : %s" % Lease.state_str[self.state])
+ self.logger.log(loglevel, "Disk image : %s" % self.diskimage_id)
+ self.logger.log(loglevel, "Disk image size: %s" % self.diskimage_size)
+ self.logger.log(loglevel, "Num nodes : %s" % self.numnodes)
+ self.logger.log(loglevel, "Resource req : %s" % self.requested_resources)
+ self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
+ self.logger.log(loglevel, "Mem image map : %s" % pretty_nodemap(self.memimagemap))
+
+ def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
+ if len(self.deployment_rrs) > 0:
+ self.logger.log(loglevel, "DEPLOYMENT RESOURCE RESERVATIONS")
+ self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
+ for r in self.deployment_rrs:
+ r.print_contents(loglevel)
+ self.logger.log(loglevel, "##")
+ self.logger.log(loglevel, "VM RESOURCE RESERVATIONS")
+ self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~")
+ for r in self.vm_rrs:
+ r.print_contents(loglevel)
+ self.logger.log(loglevel, "##")
+
+ def get_endtime(self):
+ vmrr = self.get_last_vmrr()
+ return vmrr.end
+
+ def append_vmrr(self, vmrr):
+ self.vm_rrs.append(vmrr)
+
+ def append_deployrr(self, vmrr):
+ self.deployment_rrs.append(vmrr)
+
+ def get_last_vmrr(self):
+ return self.vm_rrs[-1]
+
+ def update_vmrr(self, rrold, rrnew):
+ self.vm_rrs[self.vm_rrs.index(rrold)] = rrnew
+
+ def remove_vmrr(self, vmrr):
+ if not vmrr in self.vm_rrs:
+ raise Exception, "Tried to remove an VM RR not contained in this lease"
+ else:
+ self.vm_rrs.remove(vmrr)
+
+ def clear_rrs(self):
+ self.deployment_rrs = []
+ self.vm_rrs = []
+
+ def add_boot_overhead(self, t):
+ self.duration.incr(t)
+
+ def add_runtime_overhead(self, percent):
+ self.duration.incr_by_percent(percent)
+
+ def xmlrpc_marshall(self):
+ # Convert to something we can send through XMLRPC
+ l = {}
+ l["id"] = self.id
+ l["submit_time"] = xmlrpc_marshall_singlevalue(self.submit_time)
+ l["start_req"] = xmlrpc_marshall_singlevalue(self.start.requested)
+ l["start_sched"] = xmlrpc_marshall_singlevalue(self.start.scheduled)
+ l["start_actual"] = xmlrpc_marshall_singlevalue(self.start.actual)
+ l["duration_req"] = xmlrpc_marshall_singlevalue(self.duration.requested)
+ l["duration_acc"] = xmlrpc_marshall_singlevalue(self.duration.accumulated)
+ l["duration_actual"] = xmlrpc_marshall_singlevalue(self.duration.actual)
+ l["end"] = xmlrpc_marshall_singlevalue(self.end)
+ l["diskimage_id"] = self.diskimage_id
+ l["diskimage_size"] = self.diskimage_size
+ l["numnodes"] = self.numnodes
+ l["resources"] = `self.requested_resources`
+ l["preemptible"] = self.preemptible
+ l["state"] = self.state
+ l["vm_rrs"] = [vmrr.xmlrpc_marshall() for vmrr in self.vm_rrs]
+
+ return l
+
+class LeaseStateMachine(StateMachine):
+ initial_state = Lease.STATE_NEW
+ transitions = {Lease.STATE_NEW: [(Lease.STATE_PENDING, "")],
+ Lease.STATE_PENDING: [(Lease.STATE_SCHEDULED, ""),
+ (Lease.STATE_QUEUED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_REJECTED, "")],
+ Lease.STATE_SCHEDULED: [(Lease.STATE_PREPARING, ""),
+ (Lease.STATE_READY, ""),
+ (Lease.STATE_MIGRATING, ""),
+ (Lease.STATE_RESUMING, ""),
+ (Lease.STATE_CANCELLED, "")],
+ Lease.STATE_QUEUED: [(Lease.STATE_SCHEDULED, ""),
+ (Lease.STATE_SUSPENDED, ""),
+ (Lease.STATE_CANCELLED, "")],
+ Lease.STATE_PREPARING: [(Lease.STATE_READY, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_READY: [(Lease.STATE_ACTIVE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_ACTIVE: [(Lease.STATE_SUSPENDING, ""),
+ (Lease.STATE_DONE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_SUSPENDING: [(Lease.STATE_SUSPENDED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_SUSPENDED: [(Lease.STATE_QUEUED, ""),
+ (Lease.STATE_MIGRATING, ""),
+ (Lease.STATE_RESUMING, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_MIGRATING: [(Lease.STATE_SUSPENDED, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_RESUMING: [(Lease.STATE_RESUMED_READY, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+ Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE, ""),
+ (Lease.STATE_CANCELLED, ""),
+ (Lease.STATE_FAIL, "")],
+
+ # Final states
+ Lease.STATE_DONE: [],
+ Lease.STATE_CANCELLED: [],
+ Lease.STATE_FAIL: [],
+ Lease.STATE_REJECTED: [],
+ }
+
+ def __init__(self):
+ StateMachine.__init__(initial_state, transitions)
+
+class ARLease(Lease):
+ def __init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # AR-specific parameters:
+ realdur = None):
+ start = Timestamp(start)
+ duration = Duration(duration)
+ if realdur != duration.requested:
+ duration.known = realdur # ONLY for simulation
+ Lease.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
+
+ def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "__________________________________________________")
+ Lease.print_contents(self, loglevel)
+ self.logger.log(loglevel, "Type : AR")
+ self.logger.log(loglevel, "Start time : %s" % self.start)
+ self.print_rrs(loglevel)
+ self.logger.log(loglevel, "--------------------------------------------------")
+
+ def xmlrpc_marshall(self):
+ l = Lease.xmlrpc_marshall(self)
+ l["type"] = "AR"
+ return l
+
+
+class BestEffortLease(Lease):
+ def __init__(self, submit_time, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # BE-specific parameters:
+ realdur = None):
+ start = Timestamp(None) # i.e., start on a best-effort basis
+ duration = Duration(duration)
+ if realdur != duration.requested:
+ duration.known = realdur # ONLY for simulation
+ # When the images will be available
+ self.imagesavail = None
+ Lease.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
+
+ def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "__________________________________________________")
+ Lease.print_contents(self, loglevel)
+ self.logger.log(loglevel, "Type : BEST-EFFORT")
+ self.logger.log(loglevel, "Images Avail @ : %s" % self.imagesavail)
+ self.print_rrs(loglevel)
+ self.logger.log(loglevel, "--------------------------------------------------")
+
+ def get_waiting_time(self):
+ return self.start.actual - self.submit_time
+
+ def get_slowdown(self, bound=10):
+ time_on_dedicated = self.duration.original
+ time_on_loaded = self.end - self.submit_time
+ bound = TimeDelta(seconds=bound)
+ if time_on_dedicated < bound:
+ time_on_dedicated = bound
+ return time_on_loaded / time_on_dedicated
+
+ def xmlrpc_marshall(self):
+ l = Lease.xmlrpc_marshall(self)
+ l["type"] = "BE"
+ return l
+
+
+class ImmediateLease(Lease):
+ def __init__(self, submit_time, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible,
+ # Immediate-specific parameters:
+ realdur = None):
+ start = Timestamp(None) # i.e., start on a best-effort basis
+ duration = Duration(duration)
+ duration.known = realdur # ONLY for simulation
+ Lease.__init__(self, submit_time, start, duration, diskimage_id,
+ diskimage_size, numnodes, resreq, preemptible)
+
+ def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "__________________________________________________")
+ Lease.print_contents(self, loglevel)
+ self.logger.log(loglevel, "Type : IMMEDIATE")
+ self.print_rrs(loglevel)
+ self.logger.log(loglevel, "--------------------------------------------------")
+
+ def xmlrpc_marshall(self):
+ l = Lease.xmlrpc_marshall(self)
+ l["type"] = "IM"
+ return l
+
+class Timestamp(object):
+ def __init__(self, requested):
+ self.requested = requested
+ self.scheduled = None
+ self.actual = None
+
+ def __repr__(self):
+ return "REQ: %s | SCH: %s | ACT: %s" % (self.requested, self.scheduled, self.actual)
+
+class Duration(object):
+ def __init__(self, requested, known=None):
+ self.original = requested
+ self.requested = requested
+ self.accumulated = TimeDelta()
+ self.actual = None
+ # The following is ONLY used in simulation
+ self.known = known
+
+ def incr(self, t):
+ self.requested += t
+ if self.known != None:
+ self.known += t
+
+ def incr_by_percent(self, pct):
+ factor = 1 + float(pct)/100
+ self.requested = round_datetime_delta(self.requested * factor)
+ if self.known != None:
+ self.requested = round_datetime_delta(self.known * factor)
+
+ def accumulate_duration(self, t):
+ self.accumulated += t
+
+ def get_remaining_duration(self):
+ return self.requested - self.accumulated
+
+ # ONLY in simulation
+ def get_remaining_known_duration(self):
+ return self.known - self.accumulated
+
+ def __repr__(self):
+ return "REQ: %s | ACC: %s | ACT: %s | KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
+
Property changes on: trunk/src/haizea/resourcemanager/leases.py
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
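As a quick orientation to the classes just copied into leases.py, here is a short usage sketch of the Duration bookkeeping defined above (not part of the commit; the values are illustrative, and it assumes mx.DateTime is installed, as the module requires):

    from mx.DateTime import TimeDelta
    from haizea.resourcemanager.leases import Duration

    d = Duration(requested=TimeDelta(hours=1))   # lease asked for one hour
    d.accumulate_duration(TimeDelta(minutes=20)) # 20 minutes have already run
    remaining = d.get_remaining_duration()       # TimeDelta of 40 minutes left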
Deleted: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/resourcepool.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -1,425 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.common.utils import vnodemapstr, get_accounting
-import haizea.common.constants as constants
-import haizea.resourcemanager.enact.actions as actions
-import logging
-
-class FailedEnactmentException(Exception):
- pass
-
-class ResourcePool(object):
- def __init__(self, info_enact, vm_enact, deploy_enact):
- self.logger = logging.getLogger("RPOOL")
-
- self.info = info_enact
- self.vm = vm_enact
- # TODO: Ideally, deployment enactment shouldn't be here, specially since
- # it already "hangs" below the deployment modules. For now,
- # it does no harm, though.
- self.deployment = deploy_enact
-
- self.nodes = self.info.get_nodes()
-
- def start_vms(self, lease, rr):
- start_action = actions.VMEnactmentStartAction()
- start_action.from_rr(rr)
-
- for (vnode, pnode) in rr.nodes.items():
- node = self.get_node(pnode)
- diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
- start_action.vnodes[vnode].pnode = node.enactment_info
- start_action.vnodes[vnode].diskimage = diskimage.filename
- start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
-
- try:
- self.vm.start(start_action)
- except Exception, msg:
- self.logger.error("Enactment of start VM failed: %s" % msg)
- raise FailedEnactmentException()
-
- def stop_vms(self, lease, rr):
- stop_action = actions.VMEnactmentStopAction()
- stop_action.from_rr(rr)
- try:
- self.vm.stop(stop_action)
- except Exception, msg:
- self.logger.error("Enactment of end VM failed: %s" % msg)
- raise FailedEnactmentException()
-
- def suspend_vms(self, lease, rr):
- # Add memory image files
- for vnode in rr.vnodes:
- pnode = rr.vmrr.nodes[vnode]
- self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
-
- # Enact suspend
- suspend_action = actions.VMEnactmentSuspendAction()
- suspend_action.from_rr(rr)
- try:
- self.vm.suspend(suspend_action)
- except Exception, msg:
- self.logger.error("Enactment of suspend VM failed: %s" % msg)
- raise FailedEnactmentException()
-
- def verify_suspend(self, lease, rr):
- verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
- verify_suspend_action.from_rr(rr)
- self.vm.verify_suspend(verify_suspend_action)
-
- def resume_vms(self, lease, rr):
- # Remove memory image files
- for vnode in rr.vnodes:
- pnode = rr.vmrr.nodes[vnode]
- self.remove_ramfile(pnode, lease.id, vnode)
-
- # Enact resume
- resume_action = actions.VMEnactmentResumeAction()
- resume_action.from_rr(rr)
- try:
- self.vm.resume(resume_action)
- except Exception, msg:
- self.logger.error("Enactment of resume VM failed: %s" % msg)
- raise FailedEnactmentException()
-
- def verify_resume(self, lease, rr):
- verify_resume_action = actions.VMEnactmentConfirmResumeAction()
- verify_resume_action.from_rr(rr)
- self.vm.verify_resume(verify_resume_action)
-
- def get_nodes(self):
- return self.nodes
-
- # An auxiliary node is a host whose resources are going to be scheduled, but
- # where no VMs are actually going to run. For example, a disk image repository node.
- def get_aux_nodes(self):
- # TODO: We're only asking the deployment enactment module for auxiliary nodes.
- # There might be a scenario where the info enactment module also reports
- # auxiliary nodes.
- return self.deployment.get_aux_nodes()
-
- def get_num_nodes(self):
- return len(self.nodes)
-
- def get_node(self, nod_id):
- return self.nodes[nod_id-1]
-
- def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
- self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
-
- self.logger.vdebug("Files BEFORE:")
- self.get_node(pnode).print_files()
-
- imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
- img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
- self.get_node(pnode).add_file(img)
-
- self.logger.vdebug("Files AFTER:")
- self.get_node(pnode).print_files()
-
- get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
- return img
-
- def remove_diskimage(self, pnode, lease, vnode):
- node = self.get_node(pnode)
- node.print_files()
-
- self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
- node.remove_diskimage(lease, vnode)
-
- node.print_files()
-
- get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-
- def add_ramfile(self, pnode, lease_id, vnode, size):
- node = self.get_node(pnode)
- self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
- node.print_files()
- f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
- node.add_file(f)
- node.print_files()
- get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-
- def remove_ramfile(self, pnode, lease_id, vnode):
- node = self.get_node(pnode)
- self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
- node.print_files()
- node.remove_ramfile(lease_id, vnode)
- node.print_files()
- get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-
- def get_max_disk_usage(self):
- return max([n.get_disk_usage() for n in self.nodes])
-
-class Node(object):
- def __init__(self, nod_id, hostname, capacity):
- self.logger = logging.getLogger("RESOURCEPOOL")
- self.nod_id = nod_id
- self.hostname = hostname
- self.capacity = capacity
- self.files = []
-
- # enactment-specific information
- self.enactment_info = None
-
- def get_capacity(self):
- return self.capacity
-
- def add_file(self, f):
- self.files.append(f)
-
- def get_diskimage(self, lease_id, vnode, diskimage_id):
- image = [f for f in self.files if isinstance(f, DiskImageFile) and
- f.diskimage_id == diskimage_id and
- f.lease_id == lease_id and
- f.vnode == vnode]
- if len(image) == 0:
- return None
- elif len(image) == 1:
- return image[0]
- elif len(image) > 1:
- self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
- return image[0]
-
- def remove_diskimage(self, lease_id, vnode):
- image = [f for f in self.files if isinstance(f, DiskImageFile) and
- f.lease_id == lease_id and
- f.vnode == vnode]
- if len(image) > 0:
- image = image[0]
- self.files.remove(image)
-
- def remove_ramfile(self, lease_id, vnode):
- ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
- if len(ramfile) > 0:
- ramfile = ramfile[0]
- self.files.remove(ramfile)
-
-
- def get_disk_usage(self):
- return sum([f.filesize for f in self.files])
-
-
- def get_diskimages(self):
- return [f for f in self.files if isinstance(f, DiskImageFile)]
-
- def print_files(self):
- images = ""
- if len(self.files) > 0:
- images = ", ".join([str(img) for img in self.files])
- self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
-
- def xmlrpc_marshall(self):
- # Convert to something we can send through XMLRPC
- h = {}
- h["id"] = self.nod_id
- h["hostname"] = self.hostname
- h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
- h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
-
- return h
-
-
-
-class File(object):
- def __init__(self, filename, filesize):
- self.filename = filename
- self.filesize = filesize
-
-class DiskImageFile(File):
- def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
- File.__init__(self, filename, filesize)
- self.lease_id = lease_id
- self.vnode = vnode
- self.diskimage_id = diskimage_id
-
- def __str__(self):
- return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
-
-
-class RAMImageFile(File):
- def __init__(self, filename, filesize, lease_id, vnode):
- File.__init__(self, filename, filesize)
- self.lease_id = lease_id
- self.vnode = vnode
-
- def __str__(self):
- return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
-
-class ResourcePoolWithReusableImages(ResourcePool):
- def __init__(self, info_enact, vm_enact, deploy_enact):
- ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
-
- self.nodes = [NodeWithReusableImages.from_node(n) for n in self.nodes]
-
- def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
- self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
-
- self.logger.vdebug("Files BEFORE:")
- self.get_node(pnode).print_files()
-
- imagefile = "reusable-%s" % diskimage_id
- img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
- for (lease_id, vnode) in mappings:
- img.add_mapping(lease_id, vnode)
-
- self.get_node(pnode).add_reusable_image(img)
-
- self.logger.vdebug("Files AFTER:")
- self.get_node(pnode).print_files()
-
- get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
- return img
-
- def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
- self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
-
- def remove_diskimage(self, pnode_id, lease, vnode):
- ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
- self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
- for img in self.get_node(pnode_id).get_reusable_images():
- if (lease, vnode) in img.mappings:
- img.mappings.remove((lease, vnode))
- self.get_node(pnode_id).print_files()
- # Keep image around, even if it isn't going to be used
- # by any VMs. It might be reused later on.
- # It will be purged if space has to be made available
- # for other images
-
- def get_nodes_with_reusable_image(self, diskimage_id, after = None):
- return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
-
- def exists_reusable_image(self, pnode_id, diskimage_id, after):
- return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
-
-
-class NodeWithReusableImages(Node):
- def __init__(self, nod_id, hostname, capacity):
- Node.__init__(self, nod_id, hostname, capacity)
- self.reusable_images = []
-
- @classmethod
- def from_node(cls, n):
- node = cls(n.nod_id, n.hostname, n.capacity)
- node.enactment_info = n.enactment_info
- return node
-
- def add_reusable_image(self, f):
- self.reusable_images.append(f)
-
- def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
- for f in self.reusable_images:
- if f.diskimage_id == diskimage_id:
- f.add_mapping(lease_id, vnode)
- f.update_timeout(timeout)
- break # Ugh
- self.print_files()
-
- def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
- images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
- if after != None:
- images = [i for i in images if i.timeout >= after]
- if lease_id != None and vnode != None:
- images = [i for i in images if i.has_mapping(lease_id, vnode)]
- if len(images)>0:
- return images[0]
- else:
- return None
-
- def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
- entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
- if entry == None:
- return False
- else:
- return True
-
- def get_reusable_images(self):
- return self.reusable_images
-
- def get_reusable_images_size(self):
- return sum([f.filesize for f in self.reusable_images])
-
- def purge_oldest_unused_image(self):
- unused = [img for img in self.reusable_images if not img.has_mappings()]
- if len(unused) == 0:
- return 0
- else:
- i = iter(unused)
- oldest = i.next()
- for img in i:
- if img.timeout < oldest.timeout:
- oldest = img
- self.reusable_images.remove(oldest)
- return 1
-
- def purge_downto(self, target):
- done = False
- while not done:
- removed = self.purge_oldest_unused_image()
- if removed==0:
- done = True
- success = False
- elif removed == 1:
- if self.get_reusable_images_size() <= target:
- done = True
- success = True
- return success
-
- def print_files(self):
- Node.print_files(self)
- images = ""
- if len(self.reusable_images) > 0:
- images = ", ".join([str(img) for img in self.reusable_images])
- self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
-
-class ReusableDiskImageFile(File):
- def __init__(self, filename, filesize, diskimage_id, timeout):
- File.__init__(self, filename, filesize)
- self.diskimage_id = diskimage_id
- self.mappings = set([])
- self.timeout = timeout
-
- def add_mapping(self, lease_id, vnode):
- self.mappings.add((lease_id, vnode))
-
- def has_mapping(self, lease_id, vnode):
- return (lease_id, vnode) in self.mappings
-
- def has_mappings(self):
- return len(self.mappings) > 0
-
- def update_timeout(self, timeout):
- if timeout > self.timeout:
- self.timeout = timeout
-
- def is_expired(self, curTime):
- if self.timeout == None:
- return False
- elif self.timeout > curTime:
- return True
- else:
- return False
-
- def __str__(self):
- if self.timeout == None:
- timeout = "NOTIMEOUT"
- else:
- timeout = self.timeout
- return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
-
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/rm.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -34,17 +34,18 @@
import haizea.resourcemanager.accounting as accounting
import haizea.common.constants as constants
import haizea.resourcemanager.enact as enact
-from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeploymentScheduler
-from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeploymentScheduler
+from haizea.resourcemanager.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
+from haizea.resourcemanager.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
from haizea.resourcemanager.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
from haizea.resourcemanager.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
from haizea.resourcemanager.frontends.tracefile import TracefileFrontend
from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
from haizea.resourcemanager.frontends.rpc import RPCFrontend
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceTuple
-from haizea.resourcemanager.scheduler import Scheduler
-from haizea.resourcemanager.slottable import SlotTable
-from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.lease_scheduler import LeaseScheduler
+from haizea.resourcemanager.scheduler.vm_scheduler import VMScheduler
+from haizea.resourcemanager.scheduler.slottable import SlotTable
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.resourcemanager.rpcserver import RPCServer
from haizea.common.utils import abstract, round_datetime, Singleton
@@ -125,8 +126,8 @@
deploy_enact = SimulatedDeploymentEnactment()
# Resource pool
- deploy_type = self.config.get("lease-preparation")
- if deploy_type == constants.DEPLOYMENT_TRANSFER:
+ preparation_type = self.config.get("lease-preparation")
+ if preparation_type == constants.PREPARATION_TRANSFER:
if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
else:
@@ -136,23 +137,21 @@
# Slot table
slottable = SlotTable()
+ for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
+ slottable.add_node(n)
- # Deployment scheduler
-
- if deploy_type == constants.DEPLOYMENT_UNMANAGED:
- deployment_scheduler = UnmanagedDeploymentScheduler(slottable, resourcepool, deploy_enact)
- elif deploy_type == constants.DEPLOYMENT_TRANSFER:
- deployment_scheduler = ImageTransferDeploymentScheduler(slottable, resourcepool, deploy_enact)
+ # Preparation scheduler
+ if preparation_type == constants.PREPARATION_UNMANAGED:
+ preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
+ elif preparation_type == constants.PREPARATION_TRANSFER:
+ preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
- # Scheduler
- self.scheduler = Scheduler(slottable, resourcepool, deployment_scheduler)
+ # VM Scheduler
+ vm_scheduler = VMScheduler(slottable, resourcepool)
+
+ # Lease Scheduler
+ self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
- # TODO: Having the slot table contained in the deployment scheduler, and also
- # in the "main" scheduler (which itself contains the same slot table) is far
- # from ideal, although this is mostly a consequence of the Scheduler class
- # being in need of some serious refactoring. This will be fixed (see Scheduler
- # class comments for more details)
-
# Lease request frontends
if clock == constants.CLOCK_SIMULATED:
# In pure simulation, we can only use the tracefile frontend
Added: trunk/src/haizea/resourcemanager/scheduler/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/__init__.py (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/__init__.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,68 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+
+class SchedException(Exception):
+ """A simple exception class used for scheduling exceptions"""
+ pass
+
+class NotSchedulableException(Exception):
+ """A simple exception class used when a lease cannot be scheduled
+
+ This exception must be raised when a lease cannot be scheduled
+ (this is not necessarily an error condition, but the scheduler will
+ have to react to it)
+ """
+ pass
+
+class CriticalSchedException(Exception):
+ """A simple exception class used for critical scheduling exceptions
+
+ This exception must be raised when a non-recoverable error happens
+ (e.g., when there are unexplained inconsistencies in the schedule,
+ typically resulting from a code error)
+ """
+ pass
+
+class RescheduleLeaseException(Exception):
+ pass
+
+class CancelLeaseException(Exception):
+ pass
+
+class NormalEndLeaseException(Exception):
+ pass
+
+
+class ReservationEventHandler(object):
+ """A wrapper for reservation event handlers.
+
+ Reservations (in the slot table) can start and they can end. This class
+ provides a convenient wrapper around the event handlers for these two
+ events (the lease and VM schedulers look up these handlers by reservation type)
+ """
+ def __init__(self, sched, on_start, on_end):
+ self.sched = sched
+ self.on_start_method = on_start
+ self.on_end_method = on_end
+
+ def on_start(self, lease, rr):
+ self.on_start_method(self.sched, lease, rr)
+
+ def on_end(self, lease, rr):
+ self.on_end_method(self.sched, lease, rr)
\ No newline at end of file
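
For reference, the handler wrapper above is used by registering one ReservationEventHandler per reservation class and then dispatching by the reservation's type, as lease_scheduler.py does below. A minimal, self-contained sketch (the scheduler and reservation classes here are made-up placeholders, not classes from this commit):

    from haizea.resourcemanager.scheduler import ReservationEventHandler

    class DummyRR(object):
        """A made-up reservation type, standing in for e.g. a VM reservation."""
        pass

    class DummyScheduler(object):
        def __init__(self):
            # One handler per reservation type; the wrapped methods are unbound,
            # and the wrapper supplies the scheduler instance when dispatching.
            self.handlers = {
                DummyRR: ReservationEventHandler(
                              sched    = self,
                              on_start = DummyScheduler._handle_start_dummy,
                              on_end   = DummyScheduler._handle_end_dummy)
            }

        def _handle_start_dummy(self, lease, rr):
            print "dummy reservation started"

        def _handle_end_dummy(self, lease, rr):
            print "dummy reservation ended"

    sched = DummyScheduler()
    rr = DummyRR()
    sched.handlers[type(rr)].on_start(None, rr)   # handler looked up by reservation type
    sched.handlers[type(rr)].on_end(None, rr)
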
Copied: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py (from rev 551, trunk/src/haizea/resourcemanager/scheduler.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,574 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+
+"""This module provides the main classes for Haizea's scheduler, particularly
+the Scheduler class. The deployment scheduling code (everything that has to be
+done to prepare a lease) happens in the modules inside the
+haizea.resourcemanager.deployment package.
+
+This module provides the following classes:
+
+* SchedException: A scheduling exception
+* ReservationEventHandler: A simple wrapper class
+* Scheduler: Do I really need to spell this one out for you?
+"""
+import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler import SchedException, CriticalSchedException, RescheduleLeaseException, NormalEndLeaseException
+from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException, ResourceReservation
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation, SuspensionResourceReservation, ResumptionResourceReservation, ShutdownResourceReservation
+from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
+
+import logging
+
+class LeaseScheduler(object):
+ """The Haizea Lease Scheduler
+
+ Public methods:
+ schedule -- The scheduling function
+ process_reservations -- Processes starting/ending reservations at a given time
+ request_lease -- Adds a lease request (it is scheduled on the next pass)
+ enqueue -- Queues a best-effort request
+ is_queue_empty -- Is the queue empty?
+ exists_scheduled_leases -- Are there any leases scheduled?
+ cancel_lease -- Cancels a lease
+ fail_lease -- Transitions a lease to a failed state
+ notify_event -- Notifies the scheduler of an external event (e.g., a premature VM end)
+
+ Private methods:
+ __schedule_ar_lease -- Schedules an AR lease
+ __schedule_besteffort_lease -- Schedules a best-effort lease
+ __schedule_immediate_lease -- Schedules an immediate lease
+ __preempt -- Preempts a lease
+ _handle_* -- Reservation event handlers
+
+ """
+ def __init__(self, vm_scheduler, preparation_scheduler, slottable):
+ self.vm_scheduler = vm_scheduler
+ self.preparation_scheduler = preparation_scheduler
+ self.slottable = slottable
+ self.logger = logging.getLogger("LSCHED")
+
+ self.queue = Queue(self)
+ self.leases = LeaseTable(self)
+ self.completedleases = LeaseTable(self)
+
+ self.handlers = {}
+ for (type, handler) in self.vm_scheduler.handlers.items():
+ self.handlers[type] = handler
+
+ for (type, handler) in self.preparation_scheduler.handlers.items():
+ self.handlers[type] = handler
+
+ backfilling = get_config().get("backfilling")
+ if backfilling == constants.BACKFILLING_OFF:
+ self.maxres = 0
+ elif backfilling == constants.BACKFILLING_AGGRESSIVE:
+ self.maxres = 1
+ elif backfilling == constants.BACKFILLING_CONSERVATIVE:
+ self.maxres = 1000000 # Arbitrarily large
+ elif backfilling == constants.BACKFILLING_INTERMEDIATE:
+ self.maxres = get_config().get("backfilling-reservations")
+
+ self.numbesteffortres = 0
+
+ def schedule(self, nexttime):
+ pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
+ ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
+ im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
+ be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
+
+ # Queue best-effort requests
+ for lease in be_leases:
+ self.enqueue(lease)
+
+ # Process immediate requests
+ for lease_req in im_leases:
+ self.__process_im_request(lease_req, nexttime)
+
+ # Process AR requests
+ for lease_req in ar_leases:
+ self.__process_ar_request(lease_req, nexttime)
+
+ # Process best-effort requests
+ self.__process_queue(nexttime)
+
+
+ def process_reservations(self, nowtime):
+ starting = self.slottable.get_reservations_starting_at(nowtime)
+ starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
+ ending = self.slottable.get_reservations_ending_at(nowtime)
+ ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
+
+ for rr in ending:
+ lease = rr.lease
+ self._handle_end_rr(lease, rr)
+ try:
+ self.handlers[type(rr)].on_end(lease, rr)
+ except RescheduleLeaseException, msg:
+ if isinstance(rr.lease, BestEffortLease):
+ self.__enqueue_in_order(lease)
+ except NormalEndLeaseException, msg:
+ self._handle_end_lease(lease)
+
+ for rr in starting:
+ self.handlers[type(rr)].on_start(rr.lease, rr)
+
+
+ # TODO: Should be in VMScheduler
+ util = self.__get_utilization(nowtime)
+ if not util.has_key(VMResourceReservation):
+ cpuutil = 0.0
+ else:
+ cpuutil = util[VMResourceReservation]
+ get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)
+ get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
+
+
+ def enqueue(self, lease_req):
+ """Queues a best-effort lease request"""
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ lease_req.state = Lease.STATE_QUEUED
+ self.queue.enqueue(lease_req)
+ self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
+
+ def request_lease(self, lease):
+ """
+ Request a lease. At this point, it is simply marked as "Pending" and,
+ next time the scheduling function is called, the fate of the
+ lease will be determined (right now, AR+IM leases get scheduled
+ right away, and best-effort leases get placed on a queue)
+ """
+ lease.state = Lease.STATE_PENDING
+ self.leases.add(lease)
+
+ def is_queue_empty(self):
+ """Return True is the queue is empty, False otherwise"""
+ return self.queue.is_empty()
+
+
+ def exists_scheduled_leases(self):
+ """Return True if there are any leases scheduled in the future"""
+ return not self.slottable.is_empty()
+
+ def cancel_lease(self, lease_id):
+ """Cancels a lease.
+
+ Arguments:
+ lease_id -- ID of lease to cancel
+ """
+ time = get_clock().get_time()
+
+ self.logger.info("Cancelling lease %i..." % lease_id)
+ if self.leases.has_lease(lease_id):
+ # The lease is either running, or scheduled to run
+ lease = self.leases.get_lease(lease_id)
+
+ if lease.state == Lease.STATE_ACTIVE:
+ self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
+ rr = lease.get_active_reservations(time)[0]
+ if isinstance(rr, VMResourceReservation):
+ self.vm_scheduler._handle_unscheduled_end_vm(lease, rr, enact=True)
+ # TODO: Handle cancelations in middle of suspensions and
+ # resumptions
+ elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
+ self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
+ rrs = lease.get_scheduled_reservations()
+ for r in rrs:
+ lease.remove_rr(r)
+ self.slottable.removeReservation(r)
+ lease.state = Lease.STATE_CANCELLED
+ self.completedleases.add(lease)
+ self.leases.remove(lease)
+ elif self.queue.has_lease(lease_id):
+ # The lease is in the queue, waiting to be scheduled.
+ # Cancelling is as simple as removing it from the queue
+ self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
+ l = self.queue.get_lease(lease_id)
+ self.queue.remove_lease(l)
+
+ def fail_lease(self, lease_id):
+ """Transitions a lease to a failed state, and does any necessary cleaning up
+
+ TODO: For now, just use the cancelling algorithm
+
+ Arguments:
+ lease -- Lease to fail
+ """
+ try:
+ self.cancel_lease(lease_id)
+ except Exception, msg:
+ # Exit if something goes horribly wrong
+ raise CriticalSchedException()
+
+ def notify_event(self, lease_id, event):
+ time = get_clock().get_time()
+ if event == constants.EVENT_END_VM:
+ lease = self.leases.get_lease(lease_id)
+ vmrr = lease.get_last_vmrr()
+ self._handle_end_rr(lease, vmrr)
+ self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr, enact=False)
+ self._handle_end_lease(lease)
+ nexttime = get_clock().get_next_schedulable_time()
+ # We need to reevaluate the schedule to see if there are any future
+ # reservations that we can slide back.
+ self.vm_scheduler.reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
+
+
+
+
+
+ def __process_ar_request(self, lease_req, nexttime):
+ self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested))
+ self.logger.debug(" Start : %s" % lease_req.start)
+ self.logger.debug(" Duration: %s" % lease_req.duration)
+ self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+
+ accepted = False
+ try:
+ self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
+ self.leases.add(lease_req)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ accepted = True
+ except SchedException, msg:
+ # Our first try avoided preemption, try again
+ # without avoiding preemption.
+ # TODO: Roll this into the exact slot fitting algorithm
+ try:
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+ self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
+ self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
+ self.leases.add(lease_req)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ accepted = True
+ except SchedException, msg:
+ get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+
+ if accepted:
+ self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
+ else:
+ self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
+ lease_req.state = Lease.STATE_REJECTED
+ self.completedleases.add(lease_req)
+ self.leases.remove(lease_req)
+
+
+ def __process_queue(self, nexttime):
+ done = False
+ newqueue = Queue(self)
+ while not done and not self.is_queue_empty():
+ if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
+ self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
+ done = True
+ else:
+ lease_req = self.queue.dequeue()
+ try:
+ self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id)
+ self.logger.debug(" Duration: %s" % lease_req.duration)
+ self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+ self.__schedule_besteffort_lease(lease_req, nexttime)
+ self.leases.add(lease_req)
+ get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ except SchedException, msg:
+ # Put back on queue
+ newqueue.enqueue(lease_req)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+ self.logger.info("Lease %i could not be scheduled at this time." % lease_req.id)
+ if not self.is_backfilling():
+ done = True
+
+ for lease in self.queue:
+ newqueue.enqueue(lease)
+
+ self.queue = newqueue
+
+
+ def __process_im_request(self, lease_req, nexttime):
+ self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
+ self.logger.debug(" Duration: %s" % lease_req.duration)
+ self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
+
+ try:
+ self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+ self.leases.add(lease_req)
+ get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
+ self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
+ except SchedException, msg:
+ get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
+ self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+
+
+ def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+ try:
+ (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+
+ if len(preemptions) > 0:
+ leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
+ self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
+ for lease in leases:
+ self.__preempt(lease, preemption_time=vmrr.start)
+
+ # Schedule deployment overhead
+ self.preparation_scheduler.schedule(lease_req, vmrr, nexttime)
+
+ # Commit reservation to slot table
+ # (we don't do this until the very end because the deployment overhead
+ # scheduling could still throw an exception)
+ lease_req.append_vmrr(vmrr)
+ self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+ except Exception, msg:
+ raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
+
+
+ def __schedule_besteffort_lease(self, lease, nexttime):
+ try:
+ # Schedule the VMs
+ canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
+
+ # Determine earliest start time in each node
+ if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
+ # Figure out earliest start times based on
+ # image schedule and reusable images
+ earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
+ elif lease.state == Lease.STATE_SUSPENDED:
+ # No need to transfer images from repository
+ # (only intra-node transfer)
+ earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
+
+ (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
+
+ # Schedule deployment
+ if lease.state != Lease.STATE_SUSPENDED:
+ self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+ else:
+ self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
+
+ # At this point, the lease is feasible.
+ # Commit changes by adding RRs to lease and to slot table
+
+ # Add VMRR to lease
+ lease.append_vmrr(vmrr)
+
+
+ # Add resource reservations to slottable
+
+ # TODO: deployment RRs should be added here, not in the preparation scheduler
+
+ # Pre-VM RRs (if any)
+ for rr in vmrr.pre_rrs:
+ self.slottable.addReservation(rr)
+
+ # VM
+ self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+
+ if in_future:
+ self.numbesteffortres += 1
+
+ lease.print_contents()
+
+ except SchedException, msg:
+ raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
+
+
+ def __schedule_immediate_lease(self, req, nexttime):
+ try:
+ earliest = self.preparation_scheduler.find_earliest_starting_times(req, nexttime)
+ (vmrr, in_future) = self.vm_scheduler.fit_asap(req, nexttime, earliest, allow_reservation_in_future=False)
+ # Schedule deployment
+ self.preparation_scheduler.schedule(req, vmrr, nexttime)
+
+ req.append_rr(vmrr)
+ self.slottable.addReservation(vmrr)
+
+ # Post-VM RRs (if any)
+ for rr in vmrr.post_rrs:
+ self.slottable.addReservation(rr)
+
+ req.print_contents()
+ except SlotFittingException, msg:
+ raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+
+
+ def __preempt(self, lease, preemption_time):
+
+ self.logger.info("Preempting lease #%i..." % (lease.id))
+ self.logger.vdebug("Lease before preemption:")
+ lease.print_contents()
+ vmrr = lease.get_last_vmrr()
+
+ if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
+ self.logger.debug("Lease was set to start in the middle of the preempting lease.")
+ must_cancel_and_requeue = True
+ else:
+ susptype = get_config().get("suspension")
+ if susptype == constants.SUSPENSION_NONE:
+ must_cancel_and_requeue = True
+ else:
+ time_until_suspend = preemption_time - vmrr.start
+ min_duration = self.__compute_scheduling_threshold(lease)
+ can_suspend = time_until_suspend >= min_duration
+ if not can_suspend:
+ self.logger.debug("Suspending the lease does not meet scheduling threshold.")
+ must_cancel_and_requeue = True
+ else:
+ if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
+ self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
+ must_cancel_and_requeue = True
+ else:
+ self.logger.debug("Lease can be suspended")
+ must_cancel_and_requeue = False
+
+ if must_cancel_and_requeue:
+ self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
+ if vmrr.backfill_reservation == True:
+ self.numbesteffortres -= 1
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ lease.remove_vmrr(vmrr)
+ self.slottable.removeReservation(vmrr)
+ for vnode, pnode in lease.diskimagemap.items():
+ self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+ self.preparation_scheduler.cancel_deployment(lease)
+ lease.diskimagemap = {}
+ lease.state = Lease.STATE_QUEUED
+ self.__enqueue_in_order(lease)
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ else:
+ self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
+ # Save original start and end time of the vmrr
+ old_start = vmrr.start
+ old_end = vmrr.end
+ self.__schedule_suspension(vmrr, preemption_time)
+ self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+ for susprr in vmrr.post_rrs:
+ self.slottable.addReservation(susprr)
+
+
+ self.logger.vdebug("Lease after preemption:")
+ lease.print_contents()
+
+ # TODO: Should be in VMScheduler
+ def __get_utilization(self, time):
+ total = self.slottable.get_total_capacity()
+ util = {}
+ reservations = self.slottable.getReservationsAt(time)
+ for r in reservations:
+ for node in r.resources_in_pnode:
+ if isinstance(r, VMResourceReservation):
+ use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+ util[type(r)] = use + util.setdefault(type(r),0.0)
+ elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
+ use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+ util[type(r)] = use + util.setdefault(type(r),0.0)
+ util[None] = total - sum(util.values())
+ for k in util:
+ util[k] /= total
+
+ return util
+
+ def __enqueue_in_order(self, lease):
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ self.queue.enqueue_in_order(lease)
+
+ def _handle_end_rr(self, l, rr):
+ self.slottable.removeReservation(rr)
+
+ def _handle_end_lease(self, l):
+ l.state = Lease.STATE_DONE
+ l.duration.actual = l.duration.accumulated
+ l.end = round_datetime(get_clock().get_time())
+ self.completedleases.add(l)
+ self.leases.remove(l)
+ if isinstance(l, BestEffortLease):
+ get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+
+
+class Queue(object):
+ def __init__(self, scheduler):
+ self.scheduler = scheduler
+ self.__q = []
+
+ def is_empty(self):
+ return len(self.__q)==0
+
+ def enqueue(self, r):
+ self.__q.append(r)
+
+ def dequeue(self):
+ return self.__q.pop(0)
+
+ def enqueue_in_order(self, r):
+ self.__q.append(r)
+ self.__q.sort(key=attrgetter("submit_time"))
+
+ def length(self):
+ return len(self.__q)
+
+ def has_lease(self, lease_id):
+ return (1 == len([l for l in self.__q if l.id == lease_id]))
+
+ def get_lease(self, lease_id):
+ return [l for l in self.__q if l.id == lease_id][0]
+
+ def remove_lease(self, lease):
+ self.__q.remove(lease)
+
+ def __iter__(self):
+ return iter(self.__q)
+
+class LeaseTable(object):
+ def __init__(self, scheduler):
+ self.scheduler = scheduler
+ self.entries = {}
+
+ def has_lease(self, lease_id):
+ return self.entries.has_key(lease_id)
+
+ def get_lease(self, lease_id):
+ return self.entries[lease_id]
+
+ def is_empty(self):
+ return len(self.entries)==0
+
+ def remove(self, lease):
+ del self.entries[lease.id]
+
+ def add(self, lease):
+ self.entries[lease.id] = lease
+
+ def get_leases(self, type=None):
+ if type==None:
+ return self.entries.values()
+ else:
+ return [e for e in self.entries.values() if isinstance(e, type)]
+
+ def get_leases_by_state(self, state):
+ return [e for e in self.entries.values() if e.state == state]
+
\ No newline at end of file
Property changes on: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
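
Taken together with the rm.py changes above, LeaseScheduler's public surface is small: request_lease only marks a lease as pending, the next call to schedule() decides its fate (AR and immediate leases are placed on the slot table right away, best-effort leases are queued), and process_reservations() fires the registered on_start/on_end handlers as reservations begin and end. A rough driving pass, in the spirit of what the clock does (lease construction is elided, since the new leases.py module is not shown in this diff, and the exact ordering of the calls is an assumption):

    from haizea.common.utils import get_clock

    def scheduling_pass(lease_scheduler, new_leases):
        # Hand new requests to the scheduler; they simply become "pending".
        for lease in new_leases:
            lease_scheduler.request_lease(lease)

        # Fire on_start/on_end handlers for reservations starting or ending now.
        now = get_clock().get_time()
        lease_scheduler.process_reservations(now)

        # Schedule pending and queued leases from the next schedulable time onwards.
        nexttime = get_clock().get_next_schedulable_time()
        lease_scheduler.schedule(nexttime)

        # The clock can stop once nothing is queued or scheduled.
        return lease_scheduler.is_queue_empty() and not lease_scheduler.exists_scheduled_leases()
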
Copied: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers (from rev 546, trunk/src/haizea/resourcemanager/deployment)
Property changes on: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/__init__.py 2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -18,7 +18,7 @@
import logging
-class DeploymentScheduler(object):
+class PreparationScheduler(object):
def __init__(self, slottable, resourcepool, deployment_enact):
self.slottable = slottable
self.resourcepool = resourcepool
@@ -26,5 +26,5 @@
self.logger = logging.getLogger("DEPLOY")
-class DeploymentSchedException(Exception):
+class PreparationSchedException(Exception):
pass
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -17,15 +17,15 @@
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
-from haizea.resourcemanager.deployment import DeploymentScheduler, DeploymentSchedException
-from haizea.resourcemanager.datastruct import ResourceReservation, Lease, ARLease, BestEffortLease
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler, PreparationSchedException
+from haizea.resourcemanager.scheduler.slottable import ResourceReservation
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
from haizea.resourcemanager.scheduler import ReservationEventHandler
from haizea.common.utils import estimate_transfer_time, get_config
import copy
-class ImageTransferDeploymentScheduler(DeploymentScheduler):
+class ImageTransferPreparationScheduler(PreparationScheduler):
def __init__(self, slottable, resourcepool, deployment_enact):
- DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
+ PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
@@ -50,8 +50,9 @@
self.handlers ={}
self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
- on_start = ImageTransferDeploymentScheduler.handle_start_filetransfer,
- on_end = ImageTransferDeploymentScheduler.handle_end_filetransfer)
+ sched = self,
+ on_start = ImageTransferPreparationScheduler.handle_start_filetransfer,
+ on_end = ImageTransferPreparationScheduler.handle_end_filetransfer)
def schedule(self, lease, vmrr, nexttime):
if isinstance(lease, ARLease):
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/unmanaged.py 2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -16,13 +16,13 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.datastruct import Lease
-from haizea.resourcemanager.deployment import DeploymentScheduler
+from haizea.resourcemanager.leases import Lease
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler
import haizea.common.constants as constants
-class UnmanagedDeploymentScheduler(DeploymentScheduler):
+class UnmanagedPreparationScheduler(PreparationScheduler):
def __init__(self, slottable, resourcepool, deployment_enact):
- DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
+ PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
self.handlers = {}
# Add dummy disk images
Copied: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py (from rev 546, trunk/src/haizea/resourcemanager/resourcepool.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/resourcepool.py (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/resourcepool.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,425 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import vnodemapstr, get_accounting
+import haizea.common.constants as constants
+import haizea.resourcemanager.enact.actions as actions
+import logging
+
+class FailedEnactmentException(Exception):
+ pass
+
+class ResourcePool(object):
+ def __init__(self, info_enact, vm_enact, deploy_enact):
+ self.logger = logging.getLogger("RPOOL")
+
+ self.info = info_enact
+ self.vm = vm_enact
+ # TODO: Ideally, deployment enactment shouldn't be here, especially since
+ # it already "hangs" below the deployment modules. For now,
+ # it does no harm, though.
+ self.deployment = deploy_enact
+
+ self.nodes = self.info.get_nodes()
+
+ def start_vms(self, lease, rr):
+ start_action = actions.VMEnactmentStartAction()
+ start_action.from_rr(rr)
+
+ for (vnode, pnode) in rr.nodes.items():
+ node = self.get_node(pnode)
+ diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+ start_action.vnodes[vnode].pnode = node.enactment_info
+ start_action.vnodes[vnode].diskimage = diskimage.filename
+ start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
+
+ try:
+ self.vm.start(start_action)
+ except Exception, msg:
+ self.logger.error("Enactment of start VM failed: %s" % msg)
+ raise FailedEnactmentException()
+
+ def stop_vms(self, lease, rr):
+ stop_action = actions.VMEnactmentStopAction()
+ stop_action.from_rr(rr)
+ try:
+ self.vm.stop(stop_action)
+ except Exception, msg:
+ self.logger.error("Enactment of end VM failed: %s" % msg)
+ raise FailedEnactmentException()
+
+ def suspend_vms(self, lease, rr):
+ # Add memory image files
+ for vnode in rr.vnodes:
+ pnode = rr.vmrr.nodes[vnode]
+ self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
+
+ # Enact suspend
+ suspend_action = actions.VMEnactmentSuspendAction()
+ suspend_action.from_rr(rr)
+ try:
+ self.vm.suspend(suspend_action)
+ except Exception, msg:
+ self.logger.error("Enactment of suspend VM failed: %s" % msg)
+ raise FailedEnactmentException()
+
+ def verify_suspend(self, lease, rr):
+ verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
+ verify_suspend_action.from_rr(rr)
+ self.vm.verify_suspend(verify_suspend_action)
+
+ def resume_vms(self, lease, rr):
+ # Remove memory image files
+ for vnode in rr.vnodes:
+ pnode = rr.vmrr.nodes[vnode]
+ self.remove_ramfile(pnode, lease.id, vnode)
+
+ # Enact resume
+ resume_action = actions.VMEnactmentResumeAction()
+ resume_action.from_rr(rr)
+ try:
+ self.vm.resume(resume_action)
+ except Exception, msg:
+ self.logger.error("Enactment of resume VM failed: %s" % msg)
+ raise FailedEnactmentException()
+
+ def verify_resume(self, lease, rr):
+ verify_resume_action = actions.VMEnactmentConfirmResumeAction()
+ verify_resume_action.from_rr(rr)
+ self.vm.verify_resume(verify_resume_action)
+
+ def get_nodes(self):
+ return self.nodes
+
+ # An auxiliary node is a host whose resources are going to be scheduled, but
+ # where no VMs are actually going to run. For example, a disk image repository node.
+ def get_aux_nodes(self):
+ # TODO: We're only asking the deployment enactment module for auxiliary nodes.
+ # There might be a scenario where the info enactment module also reports
+ # auxiliary nodes.
+ return self.deployment.get_aux_nodes()
+
+ def get_num_nodes(self):
+ return len(self.nodes)
+
+ def get_node(self, nod_id):
+ return self.nodes[nod_id-1]
+
+ def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
+ self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
+
+ self.logger.vdebug("Files BEFORE:")
+ self.get_node(pnode).print_files()
+
+ imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
+ img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
+ self.get_node(pnode).add_file(img)
+
+ self.logger.vdebug("Files AFTER:")
+ self.get_node(pnode).print_files()
+
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ return img
+
+ def remove_diskimage(self, pnode, lease, vnode):
+ node = self.get_node(pnode)
+ node.print_files()
+
+ self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
+ node.remove_diskimage(lease, vnode)
+
+ node.print_files()
+
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+
+ def add_ramfile(self, pnode, lease_id, vnode, size):
+ node = self.get_node(pnode)
+ self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
+ node.print_files()
+ f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
+ node.add_file(f)
+ node.print_files()
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+
+ def remove_ramfile(self, pnode, lease_id, vnode):
+ node = self.get_node(pnode)
+ self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
+ node.print_files()
+ node.remove_ramfile(lease_id, vnode)
+ node.print_files()
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+
+ def get_max_disk_usage(self):
+ return max([n.get_disk_usage() for n in self.nodes])
+
+class Node(object):
+ def __init__(self, nod_id, hostname, capacity):
+ self.logger = logging.getLogger("RESOURCEPOOL")
+ self.nod_id = nod_id
+ self.hostname = hostname
+ self.capacity = capacity
+ self.files = []
+
+ # enactment-specific information
+ self.enactment_info = None
+
+ def get_capacity(self):
+ return self.capacity
+
+ def add_file(self, f):
+ self.files.append(f)
+
+ def get_diskimage(self, lease_id, vnode, diskimage_id):
+ image = [f for f in self.files if isinstance(f, DiskImageFile) and
+ f.diskimage_id == diskimage_id and
+ f.lease_id == lease_id and
+ f.vnode == vnode]
+ if len(image) == 0:
+ return None
+ elif len(image) == 1:
+ return image[0]
+ elif len(image) > 1:
+ self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
+ return image[0]
+
+ def remove_diskimage(self, lease_id, vnode):
+ image = [f for f in self.files if isinstance(f, DiskImageFile) and
+ f.lease_id == lease_id and
+ f.vnode == vnode]
+ if len(image) > 0:
+ image = image[0]
+ self.files.remove(image)
+
+ def remove_ramfile(self, lease_id, vnode):
+ ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
+ if len(ramfile) > 0:
+ ramfile = ramfile[0]
+ self.files.remove(ramfile)
+
+
+ def get_disk_usage(self):
+ return sum([f.filesize for f in self.files])
+
+
+ def get_diskimages(self):
+ return [f for f in self.files if isinstance(f, DiskImageFile)]
+
+ def print_files(self):
+ images = ""
+ if len(self.files) > 0:
+ images = ", ".join([str(img) for img in self.files])
+ self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
+
+ def xmlrpc_marshall(self):
+ # Convert to something we can send through XMLRPC
+ h = {}
+ h["id"] = self.nod_id
+ h["hostname"] = self.hostname
+ h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
+ h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
+
+ return h
+
+
+
+class File(object):
+ def __init__(self, filename, filesize):
+ self.filename = filename
+ self.filesize = filesize
+
+class DiskImageFile(File):
+ def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
+ File.__init__(self, filename, filesize)
+ self.lease_id = lease_id
+ self.vnode = vnode
+ self.diskimage_id = diskimage_id
+
+ def __str__(self):
+ return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
+
+
+class RAMImageFile(File):
+ def __init__(self, filename, filesize, lease_id, vnode):
+ File.__init__(self, filename, filesize)
+ self.lease_id = lease_id
+ self.vnode = vnode
+
+ def __str__(self):
+ return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
+
+class ResourcePoolWithReusableImages(ResourcePool):
+ def __init__(self, info_enact, vm_enact, deploy_enact):
+ ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
+
+ self.nodes = [NodeWithReusableImages.from_node(n) for n in self.nodes]
+
+ def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
+ self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
+
+ self.logger.vdebug("Files BEFORE:")
+ self.get_node(pnode).print_files()
+
+ imagefile = "reusable-%s" % diskimage_id
+ img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
+ for (lease_id, vnode) in mappings:
+ img.add_mapping(lease_id, vnode)
+
+ self.get_node(pnode).add_reusable_image(img)
+
+ self.logger.vdebug("Files AFTER:")
+ self.get_node(pnode).print_files()
+
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ return img
+
+ def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
+ self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
+
+ def remove_diskimage(self, pnode_id, lease, vnode):
+ ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
+ self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
+ for img in self.get_node(pnode_id).get_reusable_images():
+ if (lease, vnode) in img.mappings:
+ img.mappings.remove((lease, vnode))
+ self.get_node(pnode_id).print_files()
+ # Keep image around, even if it isn't going to be used
+ # by any VMs. It might be reused later on.
+ # It will be purged if space has to be made available
+ # for other images
+
+ def get_nodes_with_reusable_image(self, diskimage_id, after = None):
+ return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
+
+ def exists_reusable_image(self, pnode_id, diskimage_id, after):
+ return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
+
+
+class NodeWithReusableImages(Node):
+ def __init__(self, nod_id, hostname, capacity):
+ Node.__init__(self, nod_id, hostname, capacity)
+ self.reusable_images = []
+
+ @classmethod
+ def from_node(cls, n):
+ node = cls(n.nod_id, n.hostname, n.capacity)
+ node.enactment_info = n.enactment_info
+ return node
+
+ def add_reusable_image(self, f):
+ self.reusable_images.append(f)
+
+ def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
+ for f in self.reusable_images:
+ if f.diskimage_id == diskimage_id:
+ f.add_mapping(lease_id, vnode)
+ f.update_timeout(timeout)
+ break # Ugh
+ self.print_files()
+
+ def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
+ images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
+ if after != None:
+ images = [i for i in images if i.timeout >= after]
+ if lease_id != None and vnode != None:
+ images = [i for i in images if i.has_mapping(lease_id, vnode)]
+ if len(images)>0:
+ return images[0]
+ else:
+ return None
+
+ def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
+ entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
+ if entry == None:
+ return False
+ else:
+ return True
+
+ def get_reusable_images(self):
+ return self.reusable_images
+
+ def get_reusable_images_size(self):
+ return sum([f.filesize for f in self.reusable_images])
+
+ def purge_oldest_unused_image(self):
+ unused = [img for img in self.reusable_images if not img.has_mappings()]
+ if len(unused) == 0:
+ return 0
+ else:
+ i = iter(unused)
+ oldest = i.next()
+ for img in i:
+ if img.timeout < oldest.timeout:
+ oldest = img
+ self.reusable_images.remove(oldest)
+ return 1
+
+ def purge_downto(self, target):
+ done = False
+ while not done:
+ removed = self.purge_oldest_unused_image()
+ if removed==0:
+ done = True
+ success = False
+ elif removed == 1:
+ if self.get_reusable_images_size() <= target:
+ done = True
+ success = True
+ return success
+
+ def print_files(self):
+ Node.print_files(self)
+ images = ""
+ if len(self.reusable_images) > 0:
+ images = ", ".join([str(img) for img in self.reusable_images])
+ self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
+
+class ReusableDiskImageFile(File):
+ def __init__(self, filename, filesize, diskimage_id, timeout):
+ File.__init__(self, filename, filesize)
+ self.diskimage_id = diskimage_id
+ self.mappings = set([])
+ self.timeout = timeout
+
+ def add_mapping(self, lease_id, vnode):
+ self.mappings.add((lease_id, vnode))
+
+ def has_mapping(self, lease_id, vnode):
+ return (lease_id, vnode) in self.mappings
+
+ def has_mappings(self):
+ return len(self.mappings) > 0
+
+ def update_timeout(self, timeout):
+ if timeout > self.timeout:
+ self.timeout = timeout
+
+ def is_expired(self, curTime):
+ if self.timeout == None:
+ return False
+ elif self.timeout < curTime:
+ return True
+ else:
+ return False
+
+ def __str__(self):
+ if self.timeout == None:
+ timeout = "NOTIMEOUT"
+ else:
+ timeout = self.timeout
+ return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
+
Property changes on: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
___________________________________________________________________
Name: svn:mergeinfo
+
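
The per-node file bookkeeping that ResourcePool (and the reusable-images subclass) builds on can be exercised in isolation; the sizes, lease/vnode numbers and image identifiers below are invented for illustration:

    from haizea.resourcemanager.scheduler.resourcepool import Node, DiskImageFile, RAMImageFile

    node = Node(nod_id=1, hostname="example-host", capacity=None)   # capacity unused here

    # A per-lease ("tainted") copy of disk image "foo" for vnode 1 of lease 42, 1024 MB
    node.add_file(DiskImageFile("foo-L42V1.img", 1024, lease_id=42, vnode=1, diskimage_id="foo"))
    # The memory image saved when that VM is suspended, 512 MB
    node.add_file(RAMImageFile("RAM_L42V1", 512, lease_id=42, vnode=1))

    print node.get_disk_usage()              # 1536 (MB)
    print node.get_diskimage(42, 1, "foo")   # (DISK L42v1 foo foo-L42V1.img)

    node.remove_ramfile(42, 1)
    node.remove_diskimage(42, 1)
    print node.get_disk_usage()              # 0
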
Copied: trunk/src/haizea/resourcemanager/scheduler/slottable.py (from rev 549, trunk/src/haizea/resourcemanager/slottable.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/slottable.py (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/slottable.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,613 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+from mx.DateTime import ISO, TimeDelta
+from operator import attrgetter, itemgetter
+import haizea.common.constants as constants
+from math import ceil, floor
+import bisect
+import copy
+import logging
+
+class SlotFittingException(Exception):
+ pass
+
+class CriticalSlotFittingException(Exception):
+ pass
+
+
+class Node(object):
+ def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
+ self.capacity = ResourceTuple.copy(capacity)
+ if capacitywithpreemption == None:
+ self.capacitywithpreemption = None
+ else:
+ self.capacitywithpreemption = ResourceTuple.copy(capacitywithpreemption)
+ self.resourcepoolnode = resourcepoolnode
+
+ @classmethod
+ def from_resourcepool_node(cls, node):
+ capacity = node.get_capacity()
+ return cls(capacity, capacity, node)
+
+class NodeList(object):
+ def __init__(self):
+ self.nodelist = []
+
+ def add(self, node):
+ self.nodelist.append(node)
+
+ def __getitem__(self, n):
+ return self.nodelist[n-1]
+
+ def copy(self):
+ nodelist = NodeList()
+ for n in self.nodelist:
+ nodelist.add(Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode))
+ return nodelist
+
+ def toDict(self):
+ nodelist = self.copy()
+ return dict([(i+1, v) for i, v in enumerate(nodelist.nodelist)])
+
+class KeyValueWrapper(object):
+ def __init__(self, key, value):
+ self.key = key
+ self.value = value
+
+ def __cmp__(self, other):
+ return cmp(self.key, other.key)
+
+class ResourceReservation(object):
+
+ # Resource reservation states
+ STATE_SCHEDULED = 0
+ STATE_ACTIVE = 1
+ STATE_DONE = 2
+
+ state_str = {STATE_SCHEDULED : "Scheduled",
+ STATE_ACTIVE : "Active",
+ STATE_DONE : "Done"}
+
+ def __init__(self, lease, start, end, res):
+ self.lease = lease
+ self.start = start
+ self.end = end
+ self.state = None
+ self.resources_in_pnode = res
+ self.logger = logging.getLogger("LEASES")
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Start : %s" % self.start)
+ self.logger.log(loglevel, "End : %s" % self.end)
+ self.logger.log(loglevel, "State : %s" % ResourceReservation.state_str[self.state])
+ self.logger.log(loglevel, "Resources : \n %s" % "\n ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()]))
+
+ def xmlrpc_marshall(self):
+ # Convert to something we can send through XMLRPC
+ rr = {}
+ rr["start"] = xmlrpc_marshall_singlevalue(self.start)
+ rr["end"] = xmlrpc_marshall_singlevalue(self.end)
+ rr["state"] = self.state
+ return rr
+
+class ResourceTuple(object):
+ def __init__(self, res):
+ self._res = res
+
+ @classmethod
+ def from_list(cls, l):
+ return cls(l[:])
+
+ @classmethod
+ def copy(cls, rt):
+ return cls(rt._res[:])
+
+ @classmethod
+ def set_resource_types(cls, resourcetypes):
+ cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
+ cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
+ cls.tuplelength = len(resourcetypes)
+
+ @classmethod
+ def create_empty(cls):
+ return cls([0 for x in range(cls.tuplelength)])
+
+ def fits_in(self, res2):
+ fits = True
+ for i in xrange(len(self._res)):
+ if self._res[i] > res2._res[i]:
+ fits = False
+ break
+ return fits
+
+ def get_num_fits_in(self, res2):
+ canfit = 10000 # Arbitrarily large
+ for i in xrange(len(self._res)):
+ if self._res[i] != 0:
+ f = res2._res[i] / self._res[i]
+ if f < canfit:
+ canfit = f
+ return int(floor(canfit))
+
+ def decr(self, res2):
+ for slottype in xrange(len(self._res)):
+ self._res[slottype] -= res2._res[slottype]
+
+ def incr(self, res2):
+ for slottype in xrange(len(self._res)):
+ self._res[slottype] += res2._res[slottype]
+
+ def get_by_type(self, resourcetype):
+ return self._res[self.type2pos[resourcetype]]
+
+ def set_by_type(self, resourcetype, value):
+ self._res[self.type2pos[resourcetype]] = value
+
+ def is_zero_or_less(self):
+ return sum([v for v in self._res]) <= 0
+
+ def __repr__(self):
+ r=""
+ for i, x in enumerate(self._res):
+ r += "%s:%.2f " % (self.descriptions[i], x)
+ return r
+
+
+class SlotTable(object):
+ def __init__(self):
+ self.logger = logging.getLogger("SLOT")
+ self.nodes = NodeList()
+ self.reservations = []
+ self.reservationsByStart = []
+ self.reservationsByEnd = []
+ self.availabilitycache = {}
+ self.changepointcache = None
+
+ self.availabilitywindow = AvailabilityWindow(self)
+
+ def add_node(self, resourcepoolnode):
+ self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
+
+ def is_empty(self):
+ return (len(self.reservationsByStart) == 0)
+
+ def dirty(self):
+ # You're a dirty, dirty slot table and you should be
+ # ashamed of having outdated caches!
+ self.availabilitycache = {}
+ self.changepointcache = None
+
+ def getAvailabilityCacheMiss(self, time):
+ allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
+ onlynodes = None
+ nodes = {}
+ reservations = self.getReservationsAt(time)
+ # Find how much resources are available on each node
+ canpreempt = True
+ for r in reservations:
+ for node in r.resources_in_pnode:
+ if onlynodes == None or (onlynodes != None and node in onlynodes):
+ if not nodes.has_key(node):
+ n = self.nodes[node]
+ if canpreempt:
+ nodes[node] = Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode)
+ else:
+ nodes[node] = Node(n.capacity, None, n.resourcepoolnode)
+ nodes[node].capacity.decr(r.resources_in_pnode[node])
+ if canpreempt and not r.is_preemptible:
+ nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
+
+ # For the remaining nodes, use a reference to the original node, not a copy
+ if onlynodes == None:
+ missing = allnodes - set(nodes.keys())
+ else:
+ missing = onlynodes - set(nodes.keys())
+
+ for node in missing:
+ nodes[node] = self.nodes[node]
+
+ self.availabilitycache[time] = nodes
+
+ def getAvailability(self, time, resreq=None, onlynodes=None, canpreempt=False):
+ if not self.availabilitycache.has_key(time):
+ self.getAvailabilityCacheMiss(time)
+ # Cache miss
+
+ nodes = self.availabilitycache[time]
+
+ if onlynodes != None:
+ onlynodes = set(onlynodes)
+ nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
+
+ # Keep only those nodes with enough resources
+ if resreq != None:
+ newnodes = {}
+ for n, node in nodes.items():
+ if not resreq.fits_in(node.capacity) or (canpreempt and not resreq.fits_in(node.capacitywithpreemption)):
+ pass
+ else:
+ newnodes[n]=node
+ nodes = newnodes
+
+ return nodes
+
+ def get_total_capacity(self, restype = constants.RES_CPU):
+ return sum([n.capacity.get_by_type(restype) for n in self.nodes.nodelist])
+
+ def getReservationsAt(self, time):
+ item = KeyValueWrapper(time, None)
+ startpos = bisect.bisect_right(self.reservationsByStart, item)
+ bystart = set([x.value for x in self.reservationsByStart[:startpos]])
+ endpos = bisect.bisect_right(self.reservationsByEnd, item)
+ byend = set([x.value for x in self.reservationsByEnd[endpos:]])
+ res = bystart & byend
+ return list(res)
+
+ def get_reservations_starting_between(self, start, end):
+ startitem = KeyValueWrapper(start, None)
+ enditem = KeyValueWrapper(end, None)
+ startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+ endpos = bisect.bisect_right(self.reservationsByStart, enditem)
+ res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+ return res
+
+ def get_reservations_starting_after(self, start):
+ startitem = KeyValueWrapper(start, None)
+ startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+ res = [x.value for x in self.reservationsByStart[startpos:]]
+ return res
+
+ def get_reservations_ending_after(self, end):
+ startitem = KeyValueWrapper(end, None)
+ startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
+ res = [x.value for x in self.reservationsByEnd[startpos:]]
+ return res
+
+ def get_reservations_ending_between(self, start, end):
+ startitem = KeyValueWrapper(start, None)
+ enditem = KeyValueWrapper(end, None)
+ startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
+ endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
+ res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
+ return res
+
+ def get_reservations_starting_at(self, time):
+ return self.get_reservations_starting_between(time, time)
+
+ def get_reservations_ending_at(self, time):
+ return self.get_reservations_ending_between(time, time)
+
+ # ONLY for simulation
+ def getNextPrematureEnd(self, after):
+ from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+ # Inefficient, but ok since this query seldom happens
+ res = [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after]
+ if len(res) > 0:
+ prematureends = [r.prematureend for r in res]
+ prematureends.sort()
+ return prematureends[0]
+ else:
+ return None
+
+ # ONLY for simulation
+ def getPrematurelyEndingRes(self, t):
+ from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+ return [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend == t]
+
+
+ def get_reservations_starting_or_ending_after(self, after):
+ item = KeyValueWrapper(after, None)
+ startpos = bisect.bisect_right(self.reservationsByStart, item)
+ bystart = set([x.value for x in self.reservationsByStart[startpos:]])
+ endpos = bisect.bisect_right(self.reservationsByEnd, item)
+ byend = set([x.value for x in self.reservationsByEnd[endpos:]])
+ res = bystart | byend
+ return list(res)
+
+ def addReservation(self, rr):
+ startitem = KeyValueWrapper(rr.start, rr)
+ enditem = KeyValueWrapper(rr.end, rr)
+ bisect.insort(self.reservationsByStart, startitem)
+ bisect.insort(self.reservationsByEnd, enditem)
+ self.dirty()
+
+ # If the slot table keys (start/end time) are not modified,
+ # just remove and reinsert.
+ def updateReservation(self, rr):
+ # TODO: Might be more efficient to resort lists
+ self.removeReservation(rr)
+ self.addReservation(rr)
+ self.dirty()
+
+ # If the slot table keys (start and/or end time) are modified,
+ # provide the old keys (so we can remove the reservation using them)
+ # along with the updated reservation.
+ def update_reservation_with_key_change(self, rr, old_start, old_end):
+ # TODO: Might be more efficient to resort lists
+ self.removeReservation(rr, old_start, old_end)
+ self.addReservation(rr)
+ self.dirty()
+
+
+ def getIndexOfReservation(self, rlist, rr, key):
+ item = KeyValueWrapper(key, None)
+ pos = bisect.bisect_left(rlist, item)
+ found = False
+ while not found:
+ if rlist[pos].value == rr:
+ found = True
+ else:
+ pos += 1
+ return pos
+
+ def removeReservation(self, rr, start=None, end=None):
+ if start == None:
+ start = rr.start
+ if end == None:
+ end = rr.end
+ posstart = self.getIndexOfReservation(self.reservationsByStart, rr, start)
+ posend = self.getIndexOfReservation(self.reservationsByEnd, rr, end)
+ self.reservationsByStart.pop(posstart)
+ self.reservationsByEnd.pop(posend)
+ self.dirty()
+
+
+ def findChangePointsAfter(self, after, until=None, nodes=None):
+ changepoints = set()
+ res = self.get_reservations_starting_or_ending_after(after)
+ for rr in res:
+ if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
+ if rr.start > after:
+ changepoints.add(rr.start)
+ if rr.end > after:
+ changepoints.add(rr.end)
+ changepoints = list(changepoints)
+ if until != None:
+ changepoints = [c for c in changepoints if c < until]
+ changepoints.sort()
+ return changepoints
+
+ def peekNextChangePoint(self, time):
+ if self.changepointcache == None:
+ # Cache is empty
+ changepoints = self.findChangePointsAfter(time)
+ changepoints.reverse()
+ self.changepointcache = changepoints
+ if len(self.changepointcache) == 0:
+ return None
+ else:
+ return self.changepointcache[-1]
+
+ def getNextChangePoint(self, time):
+ p = self.peekNextChangePoint(time)
+ if p != None:
+ self.changepointcache.pop()
+ return p
+
+ def isFull(self, time):
+ nodes = self.getAvailability(time)
+ avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])
+ return (avail == 0)
+
+ def get_next_reservations_in_nodes(self, time, nodes, rr_type=None, immediately_next = False):
+ nodes = set(nodes)
+ rrs_in_nodes = []
+ earliest_end_time = {}
+ rrs = self.get_reservations_starting_after(time)
+ if rr_type != None:
+ rrs = [rr for rr in rrs if isinstance(rr, rr_type)]
+
+ # Filter the RRs by nodes
+ for rr in rrs:
+ rr_nodes = set(rr.resources_in_pnode.keys())
+ if len(nodes & rr_nodes) > 0:
+ rrs_in_nodes.append(rr)
+ end = rr.end
+ for n in rr_nodes:
+ if not earliest_end_time.has_key(n):
+ earliest_end_time[n] = end
+ else:
+ if end < earliest_end_time[n]:
+ earliest_end_time[n] = end
+
+ if immediately_next:
+ # We only want to include the ones that are immediately
+ # next.
+ rr_nodes_excl = set()
+ for n in nodes:
+ if earliest_end_time.has_key(n):
+ end = earliest_end_time[n]
+ rrs = [rr for rr in rrs_in_nodes if n in rr.resources_in_pnode.keys() and rr.start < end]
+ rr_nodes_excl.update(rrs)
+ rrs_in_nodes = list(rr_nodes_excl)
+
+ return rrs_in_nodes
+
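A minimal sketch of how the schedulers below drive the slot table (lease, req, and the times t1 < t2 are assumed to be defined; in practice the times are mx.DateTime values):

    table = SlotTable()
    table.add_node(pool_node)          # pool_node: hypothetical resource pool node

    rr = ResourceReservation(lease, t1, t2, {1: req})   # req reserved on node 1
    rr.state = ResourceReservation.STATE_SCHEDULED
    table.addReservation(rr)

    table.getReservationsAt(t1)             # -> [rr]
    table.get_reservations_starting_at(t1)  # -> [rr]
    table.findChangePointsAfter(t1)         # -> [t2]
    table.removeReservation(rr)             # also invalidates the caches via dirty()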
+class AvailEntry(object):
+ def __init__(self, time, avail, availpreempt, resreq):
+ self.time = time
+ self.avail = avail
+ self.availpreempt = availpreempt
+
+ if avail == None and availpreempt == None:
+ self.canfit = 0
+ self.canfitpreempt = 0
+ else:
+ self.canfit = resreq.get_num_fits_in(avail)
+ if availpreempt == None:
+ self.canfitpreempt = 0
+ else:
+ self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
+
+ def getCanfit(self, canpreempt):
+ if canpreempt:
+ return self.canfitpreempt
+ else:
+ return self.canfit
+
+
+class AvailabilityWindow(object):
+ def __init__(self, slottable):
+ self.slottable = slottable
+ self.logger = logging.getLogger("SLOTTABLE.WIN")
+ self.time = None
+ self.resreq = None
+ self.onlynodes = None
+ self.avail = None
+
+ # Create avail structure
+ def initWindow(self, time, resreq, onlynodes = None, canpreempt=False):
+ self.time = time
+ self.resreq = resreq
+ self.onlynodes = onlynodes
+ self.avail = {}
+
+ # Availability at initial time
+ availatstart = self.slottable.getAvailability(self.time, self.resreq, self.onlynodes, canpreempt)
+ for node in availatstart:
+ capacity = availatstart[node].capacity
+ if canpreempt:
+ capacitywithpreemption = availatstart[node].capacitywithpreemption
+ else:
+ capacitywithpreemption = None
+ self.avail[node] = [AvailEntry(self.time, capacity, capacitywithpreemption, self.resreq)]
+
+ # Determine the availability at the subsequent change points
+ nodes = set(availatstart.keys())
+ res = self.slottable.get_reservations_starting_after(self.time)
+ changepoints = set()
+ for rr in res:
+ if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
+ changepoints.add(rr.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+ for p in changepoints:
+ availatpoint = self.slottable.getAvailability(p, self.resreq, nodes, canpreempt)
+ newnodes = set(availatpoint.keys())
+
+ # Add entries for nodes that have no resources available
+ # (for, at least, one VM)
+ fullnodes = nodes - newnodes
+ for node in fullnodes:
+ self.avail[node].append(AvailEntry(p, None, None, None))
+ nodes.remove(node)
+
+ # For the rest, we are only interested in whether the available
+ # resources decrease within the window
+ for node in newnodes:
+ capacity = availatpoint[node].capacity
+ fits = self.resreq.get_num_fits_in(capacity)
+ if canpreempt:
+ capacitywithpreemption = availatpoint[node].capacitywithpreemption
+ fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
+ prevavail = self.avail[node][-1]
+ if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
+ self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
+ elif canpreempt and (prevavail.getCanfit(canpreempt=False) > fits or prevavail.getCanfit(canpreempt=True) > fitswithpreemption):
+ self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
+
+
+ def fitAtStart(self, nodes = None, canpreempt = False):
+ if nodes != None:
+ avail = [v for (k, v) in self.avail.items() if k in nodes]
+ else:
+ avail = self.avail.values()
+ if canpreempt:
+ return sum([e[0].canfitpreempt for e in avail])
+ else:
+ return sum([e[0].canfit for e in avail])
+
+ # TODO: Also return the amount of resources that would have to be
+ # preempted in each physnode
+ def findPhysNodesForVMs(self, numnodes, maxend, strictend=False, canpreempt=False):
+ # Returns the physical nodes that can run all VMs, and the
+ # time at which the VMs must end
+ canfit = dict([(n, v[0].getCanfit(canpreempt)) for (n, v) in self.avail.items()])
+ entries = []
+ for n in self.avail.keys():
+ entries += [(n, e) for e in self.avail[n][1:]]
+ getTime = lambda x: x[1].time
+ entries.sort(key=getTime)
+ if strictend:
+ end = None
+ else:
+ end = maxend
+ for e in entries:
+ physnode = e[0]
+ entry = e[1]
+
+ if entry.time >= maxend:
+ # Can run to its maximum duration
+ break
+ else:
+ diff = canfit[physnode] - entry.getCanfit(canpreempt)
+ totalcanfit = sum([n for n in canfit.values()]) - diff
+ if totalcanfit < numnodes and not strictend:
+ # Not enough resources. Must end here
+ end = entry.time
+ break
+ else:
+ # Update canfit
+ canfit[physnode] = entry.getCanfit(canpreempt)
+
+ # Filter out nodes where we can't fit any vms
+ canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
+
+ return end, canfit
+
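The availability window is the main query interface used by the VM scheduler further below; a sketch of the typical call sequence (start, resreq, numnodes and duration are assumed to be defined):

    win = table.availabilitywindow    # table: a SlotTable, as in the sketch above
    win.initWindow(start, resreq, canpreempt=False)
    if win.fitAtStart() >= numnodes:
        # findPhysNodesForVMs returns the time by which the VMs must end
        # (possibly earlier than the requested maximum) and a dict mapping
        # each usable physical node to how many VMs fit on it.
        end, canfit = win.findPhysNodesForVMs(numnodes, start + duration)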
+
+ def printContents(self, nodes = None, withpreemption = False):
+ if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
+ if nodes == None:
+ physnodes = self.avail.keys()
+ else:
+ physnodes = [k for k in self.avail.keys() if k in nodes]
+ physnodes.sort()
+ if withpreemption:
+ p = "(with preemption)"
+ else:
+ p = "(without preemption)"
+ self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
+ for n in physnodes:
+ contents = "Node %i --- " % n
+ for x in self.avail[n]:
+ contents += "[ %s " % x.time
+ contents += "{ "
+ if x.avail == None and x.availpreempt == None:
+ contents += "END "
+ else:
+ if withpreemption:
+ res = x.availpreempt
+ canfit = x.canfitpreempt
+ else:
+ res = x.avail
+ canfit = x.canfit
+ contents += "%s" % res
+ contents += "} (Fits: %i) ] " % canfit
+ self.logger.vdebug(contents)
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
Property changes on: trunk/src/haizea/resourcemanager/scheduler/slottable.py
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Added: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,1378 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_accounting, get_clock
+from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceReservation, ResourceTuple
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException
+from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
+
+import logging
+
+
+class VMScheduler(object):
+ """The Haizea VM Scheduler
+
+ """
+
+ def __init__(self, slottable, resourcepool):
+ self.slottable = slottable
+ self.resourcepool = resourcepool
+ self.logger = logging.getLogger("VMSCHED")
+
+ self.handlers = {}
+ self.handlers[VMResourceReservation] = ReservationEventHandler(
+ sched = self,
+ on_start = VMScheduler._handle_start_vm,
+ on_end = VMScheduler._handle_end_vm)
+
+ self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
+ sched = self,
+ on_start = VMScheduler._handle_start_shutdown,
+ on_end = VMScheduler._handle_end_shutdown)
+
+ self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
+ sched = self,
+ on_start = VMScheduler._handle_start_suspend,
+ on_end = VMScheduler._handle_end_suspend)
+
+ self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
+ sched = self,
+ on_start = VMScheduler._handle_start_resume,
+ on_end = VMScheduler._handle_end_resume)
+
+ self.handlers[MigrationResourceReservation] = ReservationEventHandler(
+ sched = self,
+ on_start = VMScheduler._handle_start_migrate,
+ on_end = VMScheduler._handle_end_migrate)
+
+ backfilling = get_config().get("backfilling")
+ if backfilling == constants.BACKFILLING_OFF:
+ self.maxres = 0
+ elif backfilling == constants.BACKFILLING_AGGRESSIVE:
+ self.maxres = 1
+ elif backfilling == constants.BACKFILLING_CONSERVATIVE:
+ self.maxres = 1000000 # Arbitrarily large
+ elif backfilling == constants.BACKFILLING_INTERMEDIATE:
+ self.maxres = get_config().get("backfilling-reservations")
+
+ self.numbesteffortres = 0
+
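For example, backfilling = intermediate with backfilling-reservations = 4 gives maxres = 4, so at most four best-effort leases may hold reservations in the future at any one time (enforced through can_reserve_besteffort_in_future and numbesteffortres below); aggressive backfilling corresponds to maxres = 1, and conservative backfilling to an effectively unbounded value.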
+ def fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
+ lease_id = leasereq.id
+ start = leasereq.start.requested
+ end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
+ diskImageID = leasereq.diskimage_id
+ numnodes = leasereq.numnodes
+ resreq = leasereq.requested_resources
+
+ availabilitywindow = self.slottable.availabilitywindow
+
+ availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
+ availabilitywindow.printContents(withpreemption = False)
+ availabilitywindow.printContents(withpreemption = True)
+
+ mustpreempt = False
+ unfeasiblewithoutpreemption = False
+
+ fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
+ if fitatstart < numnodes:
+ if not canpreempt:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ unfeasiblewithoutpreemption = True
+ feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
+ fitatend = sum([n for n in canfitnopreempt.values()])
+ if fitatend < numnodes:
+ if not canpreempt:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ unfeasiblewithoutpreemption = True
+
+ canfitpreempt = {}   # empty if preemption is not considered
+ if canpreempt:
+ fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
+ if fitatstart < numnodes:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
+ fitatend = sum([n for n in canfitpreempt.values()])
+ if fitatend < numnodes:
+ raise SlotFittingException, "Not enough resources in specified interval"
+ else:
+ if unfeasiblewithoutpreemption:
+ mustpreempt = True
+ else:
+ mustpreempt = False
+
+ # At this point we know if the lease is feasible, and if
+ # will require preemption.
+ if not mustpreempt:
+ self.logger.debug("The VM reservations for this lease are feasible without preemption.")
+ else:
+ self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
+
+ # merge canfitnopreempt and canfitpreempt
+ canfit = {}
+ for node in canfitnopreempt:
+ vnodes = canfitnopreempt[node]
+ canfit[node] = [vnodes, vnodes]
+ for node in canfitpreempt:
+ vnodes = canfitpreempt[node]
+ if canfit.has_key(node):
+ canfit[node][1] = vnodes
+ else:
+ canfit[node] = [0, vnodes]
+
+ orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
+
+ self.logger.debug("Node ordering: %s" % orderednodes)
+
+ # vnode -> pnode
+ nodeassignment = {}
+
+ # pnode -> resourcetuple
+ res = {}
+
+ # physnode -> how many vnodes
+ preemptions = {}
+
+ vnode = 1
+ if avoidpreempt:
+ # First pass, without preemption
+ for physnode in orderednodes:
+ canfitinnode = canfit[physnode][0]
+ for i in range(1, canfitinnode+1):
+ nodeassignment[vnode] = physnode
+ if res.has_key(physnode):
+ res[physnode].incr(resreq)
+ else:
+ res[physnode] = ResourceTuple.copy(resreq)
+ canfit[physnode][0] -= 1
+ canfit[physnode][1] -= 1
+ vnode += 1
+ if vnode > numnodes:
+ break
+ if vnode > numnodes:
+ break
+
+ # Second pass, with preemption
+ if mustpreempt or not avoidpreempt:
+ for physnode in orderednodes:
+ canfitinnode = canfit[physnode][1]
+ for i in range(1, canfitinnode+1):
+ nodeassignment[vnode] = physnode
+ if res.has_key(physnode):
+ res[physnode].incr(resreq)
+ else:
+ res[physnode] = ResourceTuple.copy(resreq)
+ canfit[physnode][1] -= 1
+ vnode += 1
+ # Check if this will actually result in a preemption
+ if canfit[physnode][0] == 0:
+ if preemptions.has_key(physnode):
+ preemptions[physnode].incr(resreq)
+ else:
+ preemptions[physnode] = ResourceTuple.copy(resreq)
+ else:
+ canfit[physnode][0] -= 1
+ if vnode > numnodes:
+ break
+ if vnode > numnodes:
+ break
+
+ if vnode <= numnodes:
+ raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
+
+ # Create VM resource reservations
+ vmrr = VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
+ vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+ self.__schedule_shutdown(vmrr)
+
+ return vmrr, preemptions
+
+ def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = False):
+ lease_id = lease.id
+ remaining_duration = lease.duration.get_remaining_duration()
+ numnodes = lease.numnodes
+ requested_resources = lease.requested_resources
+ preemptible = lease.preemptible
+ mustresume = (lease.state == Lease.STATE_SUSPENDED)
+ shutdown_time = self.__estimate_shutdown_time(lease)
+ susptype = get_config().get("suspension")
+ if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
+ suspendable = False
+ else:
+ suspendable = True
+
+ canmigrate = get_config().get("migration")
+
+ #
+ # STEP 1: FIGURE OUT THE MINIMUM DURATION
+ #
+
+ min_duration = self.__compute_scheduling_threshold(lease)
+
+
+ #
+ # STEP 2: FIND THE CHANGEPOINTS
+ #
+
+ # Find the changepoints, and the nodes we can use at each changepoint
+ # Nodes may not be available at a changepoint because images
+ # cannot be transferred at that time.
+ if not mustresume:
+ cps = [(node, e[0]) for node, e in earliest.items()]
+ cps.sort(key=itemgetter(1))
+ curcp = None
+ changepoints = []
+ nodes = []
+ for node, time in cps:
+ nodes.append(node)
+ if time != curcp:
+ changepoints.append([time, nodes[:]])
+ curcp = time
+ else:
+ changepoints[-1][1] = nodes[:]
+ else:
+ if not canmigrate:
+ vmrr = lease.get_last_vmrr()
+ curnodes = set(vmrr.nodes.values())
+ else:
+ curnodes=None
+ # If we have to resume this lease, make sure that
+ # we have enough time to transfer the images.
+ migratetime = self.__estimate_migration_time(lease)
+ earliesttransfer = get_clock().get_time() + migratetime
+
+ for n in earliest:
+ earliest[n][0] = max(earliest[n][0], earliesttransfer)
+
+ changepoints = list(set([x[0] for x in earliest.values()]))
+ changepoints.sort()
+ changepoints = [(x, curnodes) for x in changepoints]
+
+ # If we can make reservations in the future,
+ # we also consider future changepoints
+ # (otherwise, we only allow the VMs to start "now", accounting
+ # for the fact that vm images will have to be deployed)
+ if allow_reservation_in_future:
+ res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
+ futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
+ # Corner case: sometimes we're right in the middle of a ShutdownResourceReservation, so it won't be
+ # included in futurecp.
+ futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
+ futurecp = [(p,None) for p in futurecp]
+ else:
+ futurecp = []
+
+
+
+ #
+ # STEP 3: SLOT FITTING
+ #
+
+ # If resuming, we also have to allocate enough for the resumption
+ if mustresume:
+ duration = remaining_duration + self.__estimate_resume_time(lease)
+ else:
+ duration = remaining_duration
+
+ duration += shutdown_time
+
+ # First, assuming we can't make reservations in the future
+ start, end, canfit = self.__find_fit_at_points(
+ changepoints,
+ numnodes,
+ requested_resources,
+ duration,
+ suspendable,
+ min_duration)
+
+ if start == None:
+ if not allow_reservation_in_future:
+ # We did not find a suitable starting time. This can happen
+ # if we're unable to make future reservations
+ raise SchedException, "Could not find enough resources for this request"
+ else:
+ mustsuspend = (end - start) < duration
+ if mustsuspend and not suspendable:
+ if not allow_reservation_in_future:
+ raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+ else:
+ start = None # No satisfactory start time
+
+ # If we haven't been able to fit the lease, check if we can
+ # reserve it in the future
+ if start == None and allow_reservation_in_future:
+ start, end, canfit = self.__find_fit_at_points(
+ futurecp,
+ numnodes,
+ requested_resources,
+ duration,
+ suspendable,
+ min_duration
+ )
+
+
+ if start in [p[0] for p in futurecp]:
+ reservation = True
+ else:
+ reservation = False
+
+
+ #
+ # STEP 4: FINAL SLOT FITTING
+ #
+ # At this point, we know the lease fits, but we have to map it to
+ # specific physical nodes.
+
+ # Sort physical nodes
+ physnodes = canfit.keys()
+ if mustresume:
+ # If we're resuming, we prefer resuming in the nodes we're already
+ # deployed in, to minimize the number of transfers.
+ vmrr = lease.get_last_vmrr()
+ nodes = set(vmrr.nodes.values())
+ availnodes = set(physnodes)
+ deplnodes = availnodes.intersection(nodes)
+ notdeplnodes = availnodes.difference(nodes)
+ physnodes = list(deplnodes) + list(notdeplnodes)
+ else:
+ physnodes.sort() # Arbitrary, prioritize nodes, as in exact
+
+ # Map to physical nodes
+ mappings = {}
+ res = {}
+ vmnode = 1
+ while vmnode <= numnodes:
+ for n in physnodes:
+ if canfit[n]>0:
+ canfit[n] -= 1
+ mappings[vmnode] = n
+ if res.has_key(n):
+ res[n].incr(requested_resources)
+ else:
+ res[n] = ResourceTuple.copy(requested_resources)
+ vmnode += 1
+ break
+
+
+ vmrr = VMResourceReservation(lease, start, end, mappings, res, reservation)
+ vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+ if mustresume:
+ self.__schedule_resumption(vmrr, start)
+
+ mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
+ if mustsuspend:
+ self.__schedule_suspension(vmrr, end)
+ else:
+ # Compensate for any overestimation
+ if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
+ vmrr.end = vmrr.start + remaining_duration + shutdown_time
+ self.__schedule_shutdown(vmrr)
+
+
+
+ susp_str = res_str = ""
+ if mustresume:
+ res_str = " (resuming)"
+ if mustsuspend:
+ susp_str = " (suspending)"
+ self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+
+ return vmrr, reservation
+
+ # TODO: This has to be tied in with the preparation scheduler
+ def schedule_migration(self, lease, vmrr, nexttime):
+ last_vmrr = lease.get_last_vmrr()
+ vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
+
+ mustmigrate = False
+ for vnode in vnode_migrations:
+ if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
+ mustmigrate = True
+ break
+
+ if not mustmigrate:
+ return
+
+ # Figure out what migrations can be done simultaneously
+ migrations = []
+ while len(vnode_migrations) > 0:
+ pnodes = set()
+ migration = {}
+ for vnode in vnode_migrations:
+ origin = vnode_migrations[vnode][0]
+ dest = vnode_migrations[vnode][1]
+ if not origin in pnodes and not dest in pnodes:
+ migration[vnode] = vnode_migrations[vnode]
+ pnodes.add(origin)
+ pnodes.add(dest)
+ for vnode in migration:
+ del vnode_migrations[vnode]
+ migrations.append(migration)
+
+ # Create migration RRs
+ start = last_vmrr.post_rrs[-1].end
+ migr_time = self.__estimate_migration_time(lease)
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ migr_rrs = []
+ for m in migrations:
+ end = start + migr_time
+ res = {}
+ for (origin,dest) in m.values():
+ resorigin = ResourceTuple.create_empty()
+ resorigin.set_by_type(constants.RES_NETOUT, bandwidth)
+ resdest = ResourceTuple.create_empty()
+ resdest.set_by_type(constants.RES_NETIN, bandwidth)
+ res[origin] = resorigin
+ res[dest] = resdest
+ migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+ migr_rr.state = ResourceReservation.STATE_SCHEDULED
+ migr_rrs.append(migr_rr)
+ start = end
+
+ migr_rrs.reverse()
+ for migr_rr in migr_rrs:
+ vmrr.pre_rrs.insert(0, migr_rr)
+
+
+
+ def can_reserve_besteffort_in_future(self):
+ return self.numbesteffortres < self.maxres
+
+ def is_backfilling(self):
+ return self.maxres > 0
+
+
+ def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
+ start = None
+ end = None
+ canfit = None
+ availabilitywindow = self.slottable.availabilitywindow
+
+
+ for p in changepoints:
+ availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
+ availabilitywindow.printContents()
+
+ if availabilitywindow.fitAtStart() >= numnodes:
+ start=p[0]
+ maxend = start + duration
+ end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
+
+ self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
+
+ if end < maxend:
+ self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
+
+ if not suspendable:
+ # If we can't suspend, this fit is no good, and we have to keep looking
+ pass
+ else:
+ # If we can suspend, we still have to check if the lease will
+ # be able to run for the specified minimum duration
+ if end-start > min_duration:
+ break # We found a fit; stop looking
+ else:
+ self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
+ # Set start back to None, to indicate that we haven't
+ # found a satisfactory start time
+ start = None
+ else:
+ # We've found a satisfactory starting time
+ break
+
+ return start, end, canfit
+
+ def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
+ times = [] # (start, end, {pnode -> vnodes})
+ enactment_overhead = get_config().get("enactment-overhead")
+
+ if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
+ # Global exclusion (which represents, e.g., reading/writing the memory image files
+ # from a global file system) meaning no two suspensions/resumptions can happen at
+ # the same time in the entire resource pool.
+
+ t = time
+ t_prev = None
+
+ for (vnode,pnode) in vmrr.nodes.items():
+ if override == None:
+ mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+ op_time = self.__compute_suspend_resume_time(mem, rate)
+ else:
+ op_time = override
+
+ op_time += enactment_overhead
+
+ t_prev = t
+
+ if direction == constants.DIRECTION_FORWARD:
+ t += op_time
+ times.append((t_prev, t, {pnode:[vnode]}))
+ elif direction == constants.DIRECTION_BACKWARD:
+ t -= op_time
+ times.append((t, t_prev, {pnode:[vnode]}))
+
+ elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
+ # Local exclusion (which represents, e.g., reading the memory image files
+ # from a local file system) means no two resumptions can happen at the same
+ # time in the same physical node.
+ pervnode_times = [] # (start, end, vnode)
+ vnodes_in_pnode = {}
+ for (vnode,pnode) in vmrr.nodes.items():
+ vnodes_in_pnode.setdefault(pnode, []).append(vnode)
+ for pnode in vnodes_in_pnode:
+ t = time
+ t_prev = None
+ for vnode in vnodes_in_pnode[pnode]:
+ if override == None:
+ mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+ op_time = self.__compute_suspend_resume_time(mem, rate)
+ else:
+ op_time = override
+
+ t_prev = t
+
+ if direction == constants.DIRECTION_FORWARD:
+ t += op_time
+ pervnode_times.append((t_prev, t, vnode))
+ elif direction == constants.DIRECTION_BACKWARD:
+ t -= op_time
+ pervnode_times.append((t, t_prev, vnode))
+
+ # Consolidate suspend/resume operations happening at the same time
+ uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
+ for (start, end) in uniq_times:
+ vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
+ node_mappings = {}
+ for vnode in vnodes:
+ pnode = vmrr.nodes[vnode]
+ node_mappings.setdefault(pnode, []).append(vnode)
+ times.append([start,end,node_mappings])
+
+ # Add the enactment overhead
+ for t in times:
+ num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
+ overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
+ if direction == constants.DIRECTION_FORWARD:
+ t[1] += overhead
+ elif direction == constants.DIRECTION_BACKWARD:
+ t[0] -= overhead
+
+ # Fix overlaps
+ if direction == constants.DIRECTION_FORWARD:
+ times.sort(key=itemgetter(0))
+ elif direction == constants.DIRECTION_BACKWARD:
+ times.sort(key=itemgetter(1))
+ times.reverse()
+
+ prev_start = None
+ prev_end = None
+ for t in times:
+ if prev_start != None:
+ start = t[0]
+ end = t[1]
+ if direction == constants.DIRECTION_FORWARD:
+ if start < prev_end:
+ diff = prev_end - start
+ t[0] += diff
+ t[1] += diff
+ elif direction == constants.DIRECTION_BACKWARD:
+ if end > prev_start:
+ diff = end - prev_start
+ t[0] -= diff
+ t[1] -= diff
+ prev_start = t[0]
+ prev_end = t[1]
+
+ return times
+
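A worked example of the computation above (illustrative values): a two-vnode lease maps vnode 1 to pnode A and vnode 2 to pnode B, each suspend operation takes op seconds, and suspensions are computed backward from time t. Under global exclusion the operations are serialized across the whole pool, yielding roughly [(t - op, t, {A: [1]}), (t - 2*op, t - op, {B: [2]})]; under local exclusion each pnode handles its own vnodes, so both operations occupy [t - op, t] on their respective pnodes, with the per-entry enactment overhead and the overlap fix-up applied afterwards.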
+ def __schedule_shutdown(self, vmrr):
+ config = get_config()
+ shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
+
+ start = vmrr.end - shutdown_time
+ end = vmrr.end
+
+ shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
+ shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
+
+ vmrr.update_end(start)
+
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
+
+ vmrr.post_rrs.append(shutdown_rr)
+
+ def __schedule_suspension(self, vmrr, suspend_by):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ susp_exclusion = config.get("suspendresume-exclusion")
+ override = get_config().get("override-suspend-time")
+ rate = config.get("suspend-rate")
+
+ if suspend_by < vmrr.start or suspend_by > vmrr.end:
+ raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
+
+ times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
+ suspend_rrs = []
+ for (start, end, node_mappings) in times:
+ suspres = {}
+ all_vnodes = []
+ for (pnode,vnodes) in node_mappings.items():
+ num_vnodes = len(vnodes)
+ r = ResourceTuple.create_empty()
+ mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+ r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+ r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+ suspres[pnode] = r
+ all_vnodes += vnodes
+ susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
+ susprr.state = ResourceReservation.STATE_SCHEDULED
+ suspend_rrs.append(susprr)
+
+ suspend_rrs.sort(key=attrgetter("start"))
+
+ susp_start = suspend_rrs[0].start
+ if susp_start < vmrr.start:
+ raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
+
+ vmrr.update_end(susp_start)
+
+ # If there are any post RRs, remove them
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
+
+ for susprr in suspend_rrs:
+ vmrr.post_rrs.append(susprr)
+
+ def __schedule_resumption(self, vmrr, resume_at):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ resm_exclusion = config.get("suspendresume-exclusion")
+ override = get_config().get("override-resume-time")
+ rate = config.get("resume-rate")
+
+ if resume_at < vmrr.start or resume_at > vmrr.end:
+ raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+
+ times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
+ resume_rrs = []
+ for (start, end, node_mappings) in times:
+ resmres = {}
+ all_vnodes = []
+ for (pnode,vnodes) in node_mappings.items():
+ num_vnodes = len(vnodes)
+ r = ResourceTuple.create_empty()
+ mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+ r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+ r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+ resmres[pnode] = r
+ all_vnodes += vnodes
+ resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
+ resmrr.state = ResourceReservation.STATE_SCHEDULED
+ resume_rrs.append(resmrr)
+
+ resume_rrs.sort(key=attrgetter("start"))
+
+ resm_end = resume_rrs[-1].end
+ if resm_end > vmrr.end:
+ raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+
+ vmrr.update_start(resm_end)
+ for resmrr in resume_rrs:
+ vmrr.pre_rrs.append(resmrr)
+
+ def __compute_suspend_resume_time(self, mem, rate):
+ time = float(mem) / rate
+ time = round_datetime_delta(TimeDelta(seconds = time))
+ return time
+
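For instance (illustrative values): a vnode with 1024 MB of memory and a suspend-rate of 32 MB/s yields 1024 / 32 = 32 seconds per vnode, rounded into a TimeDelta by round_datetime_delta. The estimate below then multiplies the per-vnode cost (plus the enactment overhead) by the number of vnodes, which is exact under global exclusion and a deliberate overestimate under local exclusion.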
+ def __estimate_suspend_resume_time(self, lease, rate):
+ susp_exclusion = get_config().get("suspendresume-exclusion")
+ enactment_overhead = get_config().get("enactment-overhead")
+ mem = lease.requested_resources.get_by_type(constants.RES_MEM)
+ if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
+ return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
+ elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
+ # Overestimating
+ return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
+
+ def __estimate_shutdown_time(self, lease):
+ enactment_overhead = get_config().get("enactment-overhead").seconds
+ return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
+
+ def __estimate_suspend_time(self, lease):
+ rate = get_config().get("suspend-rate")
+ override = get_config().get("override-suspend-time")
+ if override != None:
+ return override
+ else:
+ return self.__estimate_suspend_resume_time(lease, rate)
+
+ def __estimate_resume_time(self, lease):
+ rate = get_config().get("resume-rate")
+ override = get_config().get("override-resume-time")
+ if override != None:
+ return override
+ else:
+ return self.__estimate_suspend_resume_time(lease, rate)
+
+
+ def __estimate_migration_time(self, lease):
+ whattomigrate = get_config().get("what-to-migrate")
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ if whattomigrate == constants.MIGRATE_NONE:
+ return TimeDelta(seconds=0)
+ else:
+ if whattomigrate == constants.MIGRATE_MEM:
+ mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
+ elif whattomigrate == constants.MIGRATE_MEMDISK:
+ mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
+ return estimate_transfer_time(mbtotransfer, bandwidth)
+
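For example (illustrative values, and assuming estimate_transfer_time simply divides the amount to transfer by the bandwidth): with what-to-migrate set to memory only, a lease requesting 1024 MB of memory over a 100 MB/s migration bandwidth needs roughly 10 seconds; if the disk image is migrated as well, its size is added to the amount before dividing.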
+ # TODO: Take into account other things like boot overhead, migration overhead, etc.
+ def __compute_scheduling_threshold(self, lease):
+ from haizea.resourcemanager.rm import ResourceManager
+ config = ResourceManager.get_singleton().config
+ threshold = config.get("force-scheduling-threshold")
+ if threshold != None:
+ # If there is a hard-coded threshold, use that
+ return threshold
+ else:
+ factor = config.get("scheduling-threshold-factor")
+ susp_overhead = self.__estimate_suspend_time(lease)
+ safe_duration = susp_overhead
+
+ if lease.state == Lease.STATE_SUSPENDED:
+ resm_overhead = self.__estimate_resume_time(lease)
+ safe_duration += resm_overhead
+
+ # TODO: Incorporate other overheads into the minimum duration
+ min_duration = safe_duration
+
+ # At the very least, we want to allocate enough time for the
+ # safe duration (otherwise, we'll end up with incorrect schedules,
+ # where a lease is scheduled to suspend, but isn't even allocated
+ # enough time to suspend).
+ # The factor is assumed to be non-negative. i.e., a factor of 0
+ # means we only allocate enough time for potential suspend/resume
+ # operations, while a factor of 1 means the lease will get as much
+ # running time as is spent on the runtime overheads involved in setting
+ # it up
+ threshold = safe_duration + (min_duration * factor)
+ return threshold
+
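As a worked example (illustrative values): if suspending the lease is estimated at 30 seconds and the lease is not currently suspended, then safe_duration = min_duration = 30 s, so a scheduling-threshold-factor of 1 gives a threshold of 30 + 30 * 1 = 60 seconds, i.e. the lease is only scheduled at points where it can run for at least a minute before having to suspend; a factor of 0 degenerates to just the time needed for the suspend itself.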
+ def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
+ # TODO2: Choose appropriate prioritizing function based on a
+ # config file, instead of hardcoding it
+ #
+ # TODO3: Basing decisions only on CPU allocations. This is ok for now,
+ # since the memory allocation is proportional to the CPU allocation.
+ # Later on we need to come up with some sort of weighed average.
+
+ nodes = canfit.keys()
+
+ # TODO: The deployment module should just provide a list of nodes
+ # it prefers
+ nodeswithimg=[]
+ #self.lease_deployment_type = get_config().get("lease-preparation")
+ #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+ # reusealg = get_config().get("diskimage-reuse")
+ # if reusealg==constants.REUSE_IMAGECACHES:
+ # nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
+
+ # Compares node x and node y.
+ # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
+ def comparenodes(x, y):
+ hasimgX = x in nodeswithimg
+ hasimgY = y in nodeswithimg
+
+ # First comparison: a node with no preemptible VMs is preferable
+ # to one with preemptible VMs (i.e. we want to avoid preempting)
+ canfitnopreemptionX = canfit[x][0]
+ canfitpreemptionX = canfit[x][1]
+ hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
+
+ canfitnopreemptionY = canfit[y][0]
+ canfitpreemptionY = canfit[y][1]
+ hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
+
+ # TODO: Factor out common code
+ if avoidpreempt:
+ if hasPreemptibleX and not hasPreemptibleY:
+ return constants.WORSE
+ elif not hasPreemptibleX and hasPreemptibleY:
+ return constants.BETTER
+ elif not hasPreemptibleX and not hasPreemptibleY:
+ if hasimgX and not hasimgY:
+ return constants.BETTER
+ elif not hasimgX and hasimgY:
+ return constants.WORSE
+ else:
+ if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
+ elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
+ else: return constants.EQUAL
+ elif hasPreemptibleX and hasPreemptibleY:
+ # If both have (some) preemptible resources, we prefer those
+ # that involve fewer preemptions
+ preemptX = canfitpreemptionX - canfitnopreemptionX
+ preemptY = canfitpreemptionY - canfitnopreemptionY
+ if preemptX < preemptY:
+ return constants.BETTER
+ elif preemptX > preemptY:
+ return constants.WORSE
+ else:
+ if hasimgX and not hasimgY: return constants.BETTER
+ elif not hasimgX and hasimgY: return constants.WORSE
+ else: return constants.EQUAL
+ elif not avoidpreempt:
+ # First criteria: Can we reuse image?
+ if hasimgX and not hasimgY:
+ return constants.BETTER
+ elif not hasimgX and hasimgY:
+ return constants.WORSE
+ else:
+ # Now we just want to avoid preemption
+ if hasPreemptibleX and not hasPreemptibleY:
+ return constants.WORSE
+ elif not hasPreemptibleX and hasPreemptibleY:
+ return constants.BETTER
+ elif hasPreemptibleX and hasPreemptibleY:
+ # If both have (some) preemptible resources, we prefer those
+ # that involve fewer preemptions
+ preemptX = canfitpreemptionX - canfitnopreemptionX
+ preemptY = canfitpreemptionY - canfitnopreemptionY
+ if preemptX < preemptY:
+ return constants.BETTER
+ elif preemptX > preemptY:
+ return constants.WORSE
+ else:
+ if hasimgX and not hasimgY: return constants.BETTER
+ elif not hasimgX and hasimgY: return constants.WORSE
+ else: return constants.EQUAL
+ else:
+ return constants.EQUAL
+
+ # Order nodes
+ nodes.sort(comparenodes)
+ return nodes
+
+ def find_preemptable_leases(self, mustpreempt, startTime, endTime):
+ def comparepreemptability(rrX, rrY):
+ if rrX.lease.submit_time > rrY.lease.submit_time:
+ return constants.BETTER
+ elif rrX.lease.submit_time < rrY.lease.submit_time:
+ return constants.WORSE
+ else:
+ return constants.EQUAL
+
+ def preemptedEnough(amountToPreempt):
+ for node in amountToPreempt:
+ if not amountToPreempt[node].is_zero_or_less():
+ return False
+ return True
+
+ # Get allocations at the specified time
+ atstart = set()
+ atmiddle = set()
+ nodes = set(mustpreempt.keys())
+
+ reservationsAtStart = self.slottable.getReservationsAt(startTime)
+ reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+ reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
+ and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+
+ reservationsAtStart.sort(comparepreemptability)
+ reservationsAtMiddle.sort(comparepreemptability)
+
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+
+ # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+ for r in reservationsAtStart:
+ # The following will really only come into play when we have
+ # multiple VMs per node
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ # Don't need to preempt if we've already preempted all
+ # the needed resources in node n
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atstart.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+ if len(reservationsAtMiddle)>0:
+ changepoints = set()
+ for r in reservationsAtMiddle:
+ changepoints.add(r.start)
+ changepoints = list(changepoints)
+ changepoints.sort()
+
+ for cp in changepoints:
+ amountToPreempt = {}
+ for n in mustpreempt:
+ amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+ reservations = [r for r in reservationsAtMiddle
+ if r.start <= cp and cp < r.end]
+ for r in reservations:
+ mustpreemptres = False
+ for n in r.resources_in_pnode.keys():
+ if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+ amountToPreempt[n].decr(r.resources_in_pnode[n])
+ mustpreemptres = True
+ if mustpreemptres:
+ atmiddle.add(r)
+ if preemptedEnough(amountToPreempt):
+ break
+
+ self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+ self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+
+ leases = [r.lease for r in atstart|atmiddle]
+
+ return leases
+
+ # TODO: Should be moved to LeaseScheduler
+ def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+ self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
+ leases = []
+ vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
+ leases = set([rr.lease for rr in vmrrs])
+ leases = [l for l in leases if isinstance(l, BestEffortLease) and l.state in (Lease.STATE_SUSPENDED,Lease.STATE_READY) and not l in checkedleases]
+ for lease in leases:
+ self.logger.debug("Found lease %i" % l.id)
+ l.print_contents()
+ # Earliest time can't be earlier than time when images will be
+ # available in node
+ earliest = max(nexttime, lease.imagesavail)
+ self.__slideback(lease, earliest)
+ checkedleases.append(lease)
+ #for l in leases:
+ # vmrr, susprr = l.getLastVMRR()
+ # self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
+
+ def __slideback(self, lease, earliest):
+ vmrr = lease.get_last_vmrr()
+ # Save original start and end time of the vmrr
+ old_start = vmrr.start
+ old_end = vmrr.end
+ nodes = vmrr.nodes.values()
+ if lease.state == Lease.STATE_SUSPENDED:
+ originalstart = vmrr.pre_rrs[0].start
+ else:
+ originalstart = vmrr.start
+ cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+ cp = [earliest] + cp
+ newstart = None
+ for p in cp:
+ self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+ self.slottable.availabilitywindow.printContents()
+ if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+ (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+ if end == originalstart and set(nodes) <= set(canfit.keys()):
+ self.logger.debug("Can slide back to %s" % p)
+ newstart = p
+ break
+ if newstart == None:
+ # Can't slide back. Leave as is.
+ pass
+ else:
+ diff = originalstart - newstart
+ if lease.state == Lease.STATE_SUSPENDED:
+ resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+ for resmrr in resmrrs:
+ resmrr_old_start = resmrr.start
+ resmrr_old_end = resmrr.end
+ resmrr.start -= diff
+ resmrr.end -= diff
+ self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
+ vmrr.update_start(vmrr.start - diff)
+
+ # If the lease was going to be suspended, check to see if
+ # we don't need to suspend any more.
+ remdur = lease.duration.get_remaining_duration()
+ if vmrr.is_suspending() and vmrr.end - newstart >= remdur:
+ vmrr.update_end(vmrr.start + remdur)
+ for susprr in vmrr.post_rrs:
+ self.slottable.removeReservation(susprr)
+ vmrr.post_rrs = []
+ else:
+ vmrr.update_end(vmrr.end - diff)
+
+ if not vmrr.is_suspending():
+ # If the VM was set to shutdown, we need to slideback the shutdown RRs
+ for rr in vmrr.post_rrs:
+ rr_old_start = rr.start
+ rr_old_end = rr.end
+ rr.start -= diff
+ rr.end -= diff
+ self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
+
+ self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+ self.logger.vdebug("New lease descriptor (after slideback):")
+ lease.print_contents()
+
+
+
+ #-------------------------------------------------------------------#
+ # #
+ # SLOT TABLE EVENT HANDLERS #
+ # #
+ #-------------------------------------------------------------------#
+
+ def _handle_start_vm(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
+ l.print_contents()
+ if l.state == Lease.STATE_READY:
+ l.state = Lease.STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
+ now_time = get_clock().get_time()
+ l.start.actual = now_time
+
+ try:
+ self.resourcepool.start_vms(l, rr)
+ # The next two lines have to be moved somewhere more
+ # appropriate inside the resourcepool module
+ for (vnode, pnode) in rr.nodes.items():
+ l.diskimagemap[vnode] = pnode
+ except Exception, e:
+ self.logger.error("ERROR when starting VMs.")
+ raise
+ elif l.state == Lease.STATE_RESUMED_READY:
+ l.state = Lease.STATE_ACTIVE
+ rr.state = ResourceReservation.STATE_ACTIVE
+ # No enactment to do here, since all the suspend/resume actions are
+ # handled during the suspend/resume RRs
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
+ self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
+
+
+ def _handle_end_vm(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
+ self.logger.vdebug("LEASE-%i Before:" % l.id)
+ l.print_contents()
+ now_time = round_datetime(get_clock().get_time())
+ diff = now_time - rr.start
+ l.duration.accumulate_duration(diff)
+ rr.state = ResourceReservation.STATE_DONE
+
+ if isinstance(l, BestEffortLease):
+ if rr.backfill_reservation == True:
+ self.numbesteffortres -= 1
+
+ self.logger.vdebug("LEASE-%i After:" % l.id)
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
+ self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
+
+ def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
+ self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
+ for rr in vmrr.post_rrs:
+ self.slottable.removeReservation(rr)
+ vmrr.post_rrs = []
+ # TODO: slideback shutdown RRs
+ vmrr.end = get_clock().get_time()
+ self._handle_end_vm(l, vmrr)
+
+ def _handle_start_shutdown(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_ACTIVE
+ self.resourcepool.stop_vms(l, rr)
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
+
+ def _handle_end_shutdown(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_DONE
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
+ self.logger.info("Lease %i shutdown." % (l.id))
+ raise NormalEndLeaseException
+
+
+
+ def _handle_start_suspend(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_ACTIVE
+ self.resourcepool.suspend_vms(l, rr)
+ for vnode in rr.vnodes:
+ pnode = rr.vmrr.nodes[vnode]
+ l.memimagemap[vnode] = pnode
+ if rr.is_first():
+ l.state = Lease.STATE_SUSPENDING
+ l.print_contents()
+ self.logger.info("Suspending lease %i..." % (l.id))
+ self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
+
+ def _handle_end_suspend(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
+ l.print_contents()
+ # TODO: React to incomplete suspend
+ self.resourcepool.verify_suspend(l, rr)
+ rr.state = ResourceReservation.STATE_DONE
+ if rr.is_last():
+ l.state = Lease.STATE_SUSPENDED
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
+ self.logger.info("Lease %i suspended." % (l.id))
+
+ if l.state == Lease.STATE_SUSPENDED:
+ raise RescheduleLeaseException
+
+ def _handle_start_resume(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
+ l.print_contents()
+ self.resourcepool.resume_vms(l, rr)
+ rr.state = ResourceReservation.STATE_ACTIVE
+ if rr.is_first():
+ l.state = Lease.STATE_RESUMING
+ l.print_contents()
+ self.logger.info("Resuming lease %i..." % (l.id))
+ self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
+
+ def _handle_end_resume(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
+ l.print_contents()
+ # TODO: React to incomplete resume
+ self.resourcepool.verify_resume(l, rr)
+ rr.state = ResourceReservation.STATE_DONE
+ if rr.is_last():
+ l.state = Lease.STATE_RESUMED_READY
+ self.logger.info("Resumed lease %i" % (l.id))
+ for vnode, pnode in rr.vmrr.nodes.items():
+ self.resourcepool.remove_ramfile(pnode, l.id, vnode)
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
+
+ def _handle_start_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_ACTIVE
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+ self.logger.info("Migrating lease %i..." % (l.id))
+
+ def _handle_end_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+ l.print_contents()
+
+ for vnode in rr.transfers:
+ origin = rr.transfers[vnode][0]
+ dest = rr.transfers[vnode][1]
+
+ # Update VM image mappings
+ self.resourcepool.remove_diskimage(origin, l.id, vnode)
+ self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
+ l.diskimagemap[vnode] = dest
+
+ # Update RAM file mappings
+ self.resourcepool.remove_ramfile(origin, l.id, vnode)
+ self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
+ l.memimagemap[vnode] = dest
+
+ rr.state = ResourceReservation.STATE_DONE
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+        self.logger.info("Migrated lease %i." % (l.id))
+
+
+
+
+
+class VMResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, nodes, res, backfill_reservation):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.nodes = nodes # { vnode -> pnode }
+ self.backfill_reservation = backfill_reservation
+ self.pre_rrs = []
+ self.post_rrs = []
+
+ # ONLY for simulation
+ self.__update_prematureend()
+
+ def update_start(self, time):
+ self.start = time
+ # ONLY for simulation
+ self.__update_prematureend()
+
+ def update_end(self, time):
+ self.end = time
+ # ONLY for simulation
+ self.__update_prematureend()
+
+ # ONLY for simulation
+ def __update_prematureend(self):
+ if self.lease.duration.known != None:
+ remdur = self.lease.duration.get_remaining_known_duration()
+ rrdur = self.end - self.start
+ if remdur < rrdur:
+ self.prematureend = self.start + remdur
+ else:
+ self.prematureend = None
+ else:
+ self.prematureend = None
+
+ def get_final_end(self):
+ if len(self.post_rrs) == 0:
+ return self.end
+ else:
+ return self.post_rrs[-1].end
+
+ def is_suspending(self):
+ return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
+
+ def is_shutting_down(self):
+ return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ for resmrr in self.pre_rrs:
+ resmrr.print_contents(loglevel)
+ self.logger.log(loglevel, "--")
+ self.logger.log(loglevel, "Type : VM")
+ self.logger.log(loglevel, "Nodes : %s" % pretty_nodemap(self.nodes))
+ if self.prematureend != None:
+ self.logger.log(loglevel, "Premature end : %s" % self.prematureend)
+ ResourceReservation.print_contents(self, loglevel)
+ for susprr in self.post_rrs:
+ self.logger.log(loglevel, "--")
+ susprr.print_contents(loglevel)
+
+ def is_preemptible(self):
+ return self.lease.preemptible
+
+ def xmlrpc_marshall(self):
+ rr = ResourceReservation.xmlrpc_marshall(self)
+ rr["type"] = "VM"
+ rr["nodes"] = self.nodes.items()
+ return rr
+
+
+class SuspensionResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vnodes, vmrr):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.vnodes = vnodes
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : SUSPEND")
+ self.logger.log(loglevel, "Vnodes : %s" % self.vnodes)
+ ResourceReservation.print_contents(self, loglevel)
+
+ def is_first(self):
+ return (self == self.vmrr.post_rrs[0])
+
+ def is_last(self):
+ return (self == self.vmrr.post_rrs[-1])
+
+    # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
+    # has wider implications and non-trivial handling. For now, we leave them
+    # as non-preemptible, since the probability of preempting a suspension RR is slim.
+ def is_preemptible(self):
+ return False
+
+ def xmlrpc_marshall(self):
+ rr = ResourceReservation.xmlrpc_marshall(self)
+ rr["type"] = "SUSP"
+ return rr
+
+class ResumptionResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vnodes, vmrr):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.vnodes = vnodes
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : RESUME")
+ self.logger.log(loglevel, "Vnodes : %s" % self.vnodes)
+ ResourceReservation.print_contents(self, loglevel)
+
+ def is_first(self):
+ resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+ return (self == resm_rrs[0])
+
+ def is_last(self):
+ resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+ return (self == resm_rrs[-1])
+
+    # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
+    # has wider implications and non-trivial handling. For now, we leave them
+    # as non-preemptible, since the probability of preempting a resumption RR is slim.
+ def is_preemptible(self):
+ return False
+
+ def xmlrpc_marshall(self):
+ rr = ResourceReservation.xmlrpc_marshall(self)
+ rr["type"] = "RESM"
+ return rr
+
+class ShutdownResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vnodes, vmrr):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.vnodes = vnodes
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : SHUTDOWN")
+ ResourceReservation.print_contents(self, loglevel)
+
+ def is_preemptible(self):
+ return True
+
+ def xmlrpc_marshall(self):
+ rr = ResourceReservation.xmlrpc_marshall(self)
+ rr["type"] = "SHTD"
+ return rr
+
+class MigrationResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vmrr, transfers):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.transfers = transfers
+
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : MIGRATE")
+ self.logger.log(loglevel, "Transfers : %s" % self.transfers)
+ ResourceReservation.print_contents(self, loglevel)
+
+ def is_preemptible(self):
+ return False
+
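For reference, a minimal self-contained sketch of how a VM reservation's post_rrs list drives get_final_end(), is_suspending() and is_shutting_down() in the class above. The names below (RR, SuspRR, ShutRR, VMRR) are simplified, hypothetical stand-ins, not the actual Haizea classes:

    # Illustrative stand-ins only; not the classes defined in the diff above.
    class RR(object):
        def __init__(self, start, end):
            self.start = start
            self.end = end

    class SuspRR(RR):
        pass

    class ShutRR(RR):
        pass

    class VMRR(RR):
        def __init__(self, start, end):
            RR.__init__(self, start, end)
            self.pre_rrs = []    # resumptions/migrations scheduled before the VMs run
            self.post_rrs = []   # suspensions or a shutdown scheduled after the VMs run

        def get_final_end(self):
            # The VM step really ends when the last post-VM reservation ends
            return self.post_rrs[-1].end if self.post_rrs else self.end

        def is_suspending(self):
            return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspRR)

        def is_shutting_down(self):
            return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutRR)

    vmrr = VMRR(start=0, end=100)
    vmrr.post_rrs.append(SuspRR(start=100, end=110))   # a scheduled suspension
    print(vmrr.is_suspending())     # True
    print(vmrr.is_shutting_down())  # False
    print(vmrr.get_final_end())     # 110

Appending a shutdown-like reservation instead would flip is_shutting_down() to True, and get_final_end() would still return the end of the last post-VM reservation.
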
Deleted: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -1,1676 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-
-"""This module provides the main classes for Haizea's scheduler, particularly
-the Scheduler class. The deployment scheduling code (everything that has to be
-done to prepare a lease) happens in the modules inside the
-haizea.resourcemanager.deployment package.
-
-This module provides the following classes:
-
-* SchedException: A scheduling exception
-* ReservationEventHandler: A simple wrapper class
-* Scheduler: Do I really need to spell this one out for you?
-
-TODO: The Scheduler class is in need of some serious refactoring. The likely outcome is
-that it will be divided into two classes: LeaseScheduler, which handles top-level
-lease constructs and doesn't interact with the slot table, and VMScheduler, which
-actually schedules the VMs. The slot table would be contained in VMScheduler and
-in the lease preparation scheduler. In turn, these two would be contained in
-LeaseScheduler.
-"""
-
-import haizea.resourcemanager.datastruct as ds
-import haizea.common.constants as constants
-from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
-from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation, MigrationResourceReservation, ShutdownResourceReservation
-from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from operator import attrgetter, itemgetter
-from mx.DateTime import TimeDelta
-
-import logging
-
-class SchedException(Exception):
- """A simple exception class used for scheduling exceptions"""
- pass
-
-class NotSchedulableException(Exception):
- """A simple exception class used when a lease cannot be scheduled
-
- This exception must be raised when a lease cannot be scheduled
- (this is not necessarily an error condition, but the scheduler will
- have to react to it)
- """
- pass
-
-class CriticalSchedException(Exception):
- """A simple exception class used for critical scheduling exceptions
-
- This exception must be raised when a non-recoverable error happens
- (e.g., when there are unexplained inconsistencies in the schedule,
- typically resulting from a code error)
- """
- pass
-
-
-class ReservationEventHandler(object):
- """A wrapper for reservation event handlers.
-
- Reservations (in the slot table) can start and they can end. This class
- provides a convenient wrapper around the event handlers for these two
- events (see Scheduler.__register_handler for details on event handlers)
- """
- def __init__(self, on_start, on_end):
- self.on_start = on_start
- self.on_end = on_end
-
-class Scheduler(object):
- """The Haizea Scheduler
-
- Public methods:
- schedule -- The scheduling function
- process_reservations -- Processes starting/ending reservations at a given time
- enqueue -- Queues a best-effort request
- is_queue_empty -- Is the queue empty?
- exists_scheduled_leases -- Are there any leases scheduled?
-
- Private methods:
- __schedule_ar_lease -- Schedules an AR lease
- __schedule_besteffort_lease -- Schedules a best-effort lease
- __preempt -- Preempts a lease
- __reevaluate_schedule -- Reevaluate the schedule (used after resources become
- unexpectedly unavailable)
- _handle_* -- Reservation event handlers
-
- """
- def __init__(self, slottable, resourcepool, deployment_scheduler):
- self.slottable = slottable
- self.resourcepool = resourcepool
- self.deployment_scheduler = deployment_scheduler
- self.logger = logging.getLogger("SCHED")
-
- self.queue = ds.Queue(self)
- self.leases = ds.LeaseTable(self)
- self.completedleases = ds.LeaseTable(self)
-
- for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
- self.slottable.add_node(n)
-
- self.handlers = {}
-
- self.register_handler(type = ds.VMResourceReservation,
- on_start = Scheduler._handle_start_vm,
- on_end = Scheduler._handle_end_vm)
-
- self.register_handler(type = ds.ShutdownResourceReservation,
- on_start = Scheduler._handle_start_shutdown,
- on_end = Scheduler._handle_end_shutdown)
-
- self.register_handler(type = ds.SuspensionResourceReservation,
- on_start = Scheduler._handle_start_suspend,
- on_end = Scheduler._handle_end_suspend)
-
- self.register_handler(type = ds.ResumptionResourceReservation,
- on_start = Scheduler._handle_start_resume,
- on_end = Scheduler._handle_end_resume)
-
- self.register_handler(type = ds.MigrationResourceReservation,
- on_start = Scheduler._handle_start_migrate,
- on_end = Scheduler._handle_end_migrate)
-
- for (type, handler) in self.deployment_scheduler.handlers.items():
- self.handlers[type] = handler
-
- backfilling = get_config().get("backfilling")
- if backfilling == constants.BACKFILLING_OFF:
- self.maxres = 0
- elif backfilling == constants.BACKFILLING_AGGRESSIVE:
- self.maxres = 1
- elif backfilling == constants.BACKFILLING_CONSERVATIVE:
- self.maxres = 1000000 # Arbitrarily large
- elif backfilling == constants.BACKFILLING_INTERMEDIATE:
- self.maxres = get_config().get("backfilling-reservations")
-
- self.numbesteffortres = 0
-
- def schedule(self, nexttime):
- pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
- ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
- im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
- be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
-
- # Queue best-effort requests
- for lease in be_leases:
- self.enqueue(lease)
-
- # Process immediate requests
- for lease_req in im_leases:
- self.__process_im_request(lease_req, nexttime)
-
- # Process AR requests
- for lease_req in ar_leases:
- self.__process_ar_request(lease_req, nexttime)
-
- # Process best-effort requests
- self.__process_queue(nexttime)
-
-
- def process_reservations(self, nowtime):
- starting = self.slottable.get_reservations_starting_at(nowtime)
- starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
- ending = self.slottable.get_reservations_ending_at(nowtime)
- ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
- for rr in ending:
- self._handle_end_rr(rr.lease, rr)
- self.handlers[type(rr)].on_end(self, rr.lease, rr)
-
- for rr in starting:
- self.handlers[type(rr)].on_start(self, rr.lease, rr)
-
- util = self.slottable.get_utilization(nowtime)
- if not util.has_key(VMResourceReservation):
- cpuutil = 0.0
- else:
- cpuutil = util[VMResourceReservation]
- get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)
- get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
-
- def register_handler(self, type, on_start, on_end):
- handler = ReservationEventHandler(on_start=on_start, on_end=on_end)
- self.handlers[type] = handler
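
As an illustration of the register_handler()/process_reservations() pattern above, here is a minimal stand-alone sketch. ReservationEventHandler mirrors the wrapper defined earlier in this file, while MiniScheduler and FakeVMRR are simplified, hypothetical stand-ins rather than the real Haizea classes:

    class ReservationEventHandler(object):
        def __init__(self, on_start, on_end):
            self.on_start = on_start
            self.on_end = on_end

    class FakeVMRR(object):            # hypothetical reservation type
        def __init__(self, lease):
            self.lease = lease

    class MiniScheduler(object):
        def __init__(self):
            self.handlers = {}

        def register_handler(self, type, on_start, on_end):
            self.handlers[type] = ReservationEventHandler(on_start, on_end)

        def _handle_start_vm(self, lease, rr):
            print("starting VMs for lease %s" % lease)

        def _handle_end_vm(self, lease, rr):
            print("stopping VMs for lease %s" % lease)

        def process_reservations(self, starting, ending):
            # Ending reservations are handled before starting ones,
            # mirroring the order used in the method above.
            for rr in ending:
                self.handlers[type(rr)].on_end(self, rr.lease, rr)
            for rr in starting:
                self.handlers[type(rr)].on_start(self, rr.lease, rr)

    sched = MiniScheduler()
    sched.register_handler(type = FakeVMRR,
                           on_start = MiniScheduler._handle_start_vm,
                           on_end = MiniScheduler._handle_end_vm)
    sched.process_reservations(starting = [FakeVMRR(lease=42)], ending = [])

Running it prints "starting VMs for lease 42", showing how reservations are dispatched to their on_start/on_end handlers purely by reservation type.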
-
- def enqueue(self, lease_req):
- """Queues a best-effort lease request"""
- get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
- lease_req.state = Lease.STATE_QUEUED
- self.queue.enqueue(lease_req)
- self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
-
- def request_lease(self, lease):
- """
- Request a lease. At this point, it is simply marked as "Pending" and,
- next time the scheduling function is called, the fate of the
- lease will be determined (right now, AR+IM leases get scheduled
- right away, and best-effort leases get placed on a queue)
- """
- lease.state = Lease.STATE_PENDING
- self.leases.add(lease)
-
- def is_queue_empty(self):
-        """Return True if the queue is empty, False otherwise"""
- return self.queue.is_empty()
-
-
- def exists_scheduled_leases(self):
- """Return True if there are any leases scheduled in the future"""
- return not self.slottable.is_empty()
-
- def cancel_lease(self, lease_id):
- """Cancels a lease.
-
- Arguments:
- lease_id -- ID of lease to cancel
- """
- time = get_clock().get_time()
-
- self.logger.info("Cancelling lease %i..." % lease_id)
- if self.leases.has_lease(lease_id):
- # The lease is either running, or scheduled to run
- lease = self.leases.get_lease(lease_id)
-
- if lease.state == Lease.STATE_ACTIVE:
- self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
- rr = lease.get_active_reservations(time)[0]
- if isinstance(rr, VMResourceReservation):
- self._handle_unscheduled_end_vm(lease, rr, enact=True)
- # TODO: Handle cancelations in middle of suspensions and
- # resumptions
- elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
- self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
- rrs = lease.get_scheduled_reservations()
- for r in rrs:
- lease.remove_rr(r)
- self.slottable.removeReservation(r)
- lease.state = Lease.STATE_CANCELLED
- self.completedleases.add(lease)
- self.leases.remove(lease)
- elif self.queue.has_lease(lease_id):
- # The lease is in the queue, waiting to be scheduled.
- # Cancelling is as simple as removing it from the queue
- self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
- l = self.queue.get_lease(lease_id)
-            self.queue.remove_lease(l)
-
- def fail_lease(self, lease_id):
- """Transitions a lease to a failed state, and does any necessary cleaning up
-
- TODO: For now, just use the cancelling algorithm
-
- Arguments:
- lease -- Lease to fail
- """
-        try:
-            self.cancel_lease(lease_id)
- except Exception, msg:
- # Exit if something goes horribly wrong
- raise CriticalSchedException()
-
- def notify_event(self, lease_id, event):
- time = get_clock().get_time()
- if event == constants.EVENT_END_VM:
- lease = self.leases.get_lease(lease_id)
- vmrr = lease.get_last_vmrr()
- self._handle_unscheduled_end_vm(lease, vmrr, enact=False)
-
-
- def __process_ar_request(self, lease_req, nexttime):
- self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested))
- self.logger.debug(" Start : %s" % lease_req.start)
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
-
- accepted = False
- try:
- self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
- accepted = True
- except SchedException, msg:
- # Our first try avoided preemption, try again
- # without avoiding preemption.
- # TODO: Roll this into the exact slot fitting algorithm
- try:
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
- self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
- self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
- accepted = True
- except SchedException, msg:
- get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-
- if accepted:
- self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
- else:
- self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
- lease_req.state = Lease.STATE_REJECTED
- self.completedleases.add(lease_req)
- self.leases.remove(lease_req)
-
-
- def __process_queue(self, nexttime):
- done = False
- newqueue = ds.Queue(self)
- while not done and not self.is_queue_empty():
- if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
- self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
- done = True
- else:
- lease_req = self.queue.dequeue()
- try:
- self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id)
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
- self.__schedule_besteffort_lease(lease_req, nexttime)
- self.leases.add(lease_req)
- get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
- except SchedException, msg:
- # Put back on queue
- newqueue.enqueue(lease_req)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
- self.logger.info("Lease %i could not be scheduled at this time." % lease_req.id)
- if not self.is_backfilling():
- done = True
-
- for lease in self.queue:
- newqueue.enqueue(lease)
-
- self.queue = newqueue
-
-
- def __process_im_request(self, lease_req, nexttime):
- self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
- self.logger.debug(" Duration: %s" % lease_req.duration)
- self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
-
- try:
- self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
- self.leases.add(lease_req)
- get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
- self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
- except SchedException, msg:
- get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
- self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-
-
- def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
- try:
- (vmrr, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-
- if len(preemptions) > 0:
- leases = self.__find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
- self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
- for lease in leases:
- self.__preempt(lease, preemption_time=vmrr.start)
-
- # Schedule deployment overhead
- self.deployment_scheduler.schedule(lease_req, vmrr, nexttime)
-
- # Commit reservation to slot table
- # (we don't do this until the very end because the deployment overhead
- # scheduling could still throw an exception)
- lease_req.append_vmrr(vmrr)
- self.slottable.addReservation(vmrr)
-
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
- except Exception, msg:
- raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
-
-
- def __schedule_besteffort_lease(self, lease, nexttime):
- try:
- # Schedule the VMs
- canreserve = self.__can_reserve_besteffort_in_future()
- (vmrr, in_future) = self.__fit_asap(lease, nexttime, allow_reservation_in_future = canreserve)
-
- # Schedule deployment
- if lease.state != Lease.STATE_SUSPENDED:
- self.deployment_scheduler.schedule(lease, vmrr, nexttime)
- else:
- self.__schedule_migration(lease, vmrr, nexttime)
-
- # At this point, the lease is feasible.
- # Commit changes by adding RRs to lease and to slot table
-
- # Add VMRR to lease
- lease.append_vmrr(vmrr)
-
-
- # Add resource reservations to slottable
-
- # TODO: deployment RRs should be added here, not in the preparation scheduler
-
- # Pre-VM RRs (if any)
- for rr in vmrr.pre_rrs:
- self.slottable.addReservation(rr)
-
- # VM
- self.slottable.addReservation(vmrr)
-
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- if in_future:
- self.numbesteffortres += 1
-
- lease.print_contents()
-
- except SchedException, msg:
- raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
-
-
-
-
- def __schedule_immediate_lease(self, req, nexttime):
- try:
- (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
- # Schedule deployment
- self.deployment_scheduler.schedule(req, vmrr, nexttime)
-
- req.append_rr(vmrr)
- self.slottable.addReservation(vmrr)
-
- # Post-VM RRs (if any)
- for rr in vmrr.post_rrs:
- self.slottable.addReservation(rr)
-
- req.print_contents()
- except SlotFittingException, msg:
- raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
-
- def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
- lease_id = leasereq.id
- start = leasereq.start.requested
- end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
- diskImageID = leasereq.diskimage_id
- numnodes = leasereq.numnodes
- resreq = leasereq.requested_resources
-
- availabilitywindow = self.slottable.availabilitywindow
-
- availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
- availabilitywindow.printContents(withpreemption = False)
- availabilitywindow.printContents(withpreemption = True)
-
- mustpreempt = False
- unfeasiblewithoutpreemption = False
-
- fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
- if fitatstart < numnodes:
- if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- unfeasiblewithoutpreemption = True
- feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
- fitatend = sum([n for n in canfitnopreempt.values()])
- if fitatend < numnodes:
- if not canpreempt:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- unfeasiblewithoutpreemption = True
-
- canfitpreempt = None
- if canpreempt:
- fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
- if fitatstart < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
- feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
- fitatend = sum([n for n in canfitpreempt.values()])
- if fitatend < numnodes:
- raise SlotFittingException, "Not enough resources in specified interval"
- else:
- if unfeasiblewithoutpreemption:
- mustpreempt = True
- else:
- mustpreempt = False
-
-        # At this point we know if the lease is feasible, and whether
-        # it will require preemption.
- if not mustpreempt:
- self.logger.debug("The VM reservations for this lease are feasible without preemption.")
- else:
- self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
-
- # merge canfitnopreempt and canfitpreempt
- canfit = {}
- for node in canfitnopreempt:
- vnodes = canfitnopreempt[node]
- canfit[node] = [vnodes, vnodes]
- for node in canfitpreempt:
- vnodes = canfitpreempt[node]
- if canfit.has_key(node):
- canfit[node][1] = vnodes
- else:
- canfit[node] = [0, vnodes]
-
- orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
-
- self.logger.debug("Node ordering: %s" % orderednodes)
-
- # vnode -> pnode
- nodeassignment = {}
-
- # pnode -> resourcetuple
- res = {}
-
- # physnode -> how many vnodes
- preemptions = {}
-
- vnode = 1
- if avoidpreempt:
- # First pass, without preemption
- for physnode in orderednodes:
- canfitinnode = canfit[physnode][0]
- for i in range(1, canfitinnode+1):
- nodeassignment[vnode] = physnode
- if res.has_key(physnode):
- res[physnode].incr(resreq)
- else:
- res[physnode] = ds.ResourceTuple.copy(resreq)
- canfit[physnode][0] -= 1
- canfit[physnode][1] -= 1
- vnode += 1
- if vnode > numnodes:
- break
- if vnode > numnodes:
- break
-
- # Second pass, with preemption
- if mustpreempt or not avoidpreempt:
- for physnode in orderednodes:
- canfitinnode = canfit[physnode][1]
- for i in range(1, canfitinnode+1):
- nodeassignment[vnode] = physnode
- if res.has_key(physnode):
- res[physnode].incr(resreq)
- else:
- res[physnode] = ds.ResourceTuple.copy(resreq)
- canfit[physnode][1] -= 1
- vnode += 1
- # Check if this will actually result in a preemption
- if canfit[physnode][0] == 0:
- if preemptions.has_key(physnode):
- preemptions[physnode].incr(resreq)
- else:
- preemptions[physnode] = ds.ResourceTuple.copy(resreq)
- else:
- canfit[physnode][0] -= 1
- if vnode > numnodes:
- break
- if vnode > numnodes:
- break
-
- if vnode <= numnodes:
- raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
-
- # Create VM resource reservations
- vmrr = ds.VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
- vmrr.state = ResourceReservation.STATE_SCHEDULED
-
- self.__schedule_shutdown(vmrr)
-
- return vmrr, preemptions
-
- def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
- lease_id = lease.id
- remaining_duration = lease.duration.get_remaining_duration()
- numnodes = lease.numnodes
- requested_resources = lease.requested_resources
- preemptible = lease.preemptible
- mustresume = (lease.state == Lease.STATE_SUSPENDED)
- shutdown_time = self.__estimate_shutdown_time(lease)
- susptype = get_config().get("suspension")
- if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
- suspendable = False
- else:
- suspendable = True
-
- # Determine earliest start time in each node
- if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
- # Figure out earliest start times based on
- # image schedule and reusable images
- earliest = self.deployment_scheduler.find_earliest_starting_times(lease, nexttime)
- elif lease.state == Lease.STATE_SUSPENDED:
- # No need to transfer images from repository
- # (only intra-node transfer)
- earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
-
-
- canmigrate = get_config().get("migration")
-
- #
- # STEP 1: FIGURE OUT THE MINIMUM DURATION
- #
-
- min_duration = self.__compute_scheduling_threshold(lease)
-
-
- #
- # STEP 2: FIND THE CHANGEPOINTS
- #
-
- # Find the changepoints, and the nodes we can use at each changepoint
- # Nodes may not be available at a changepoint because images
- # cannot be transferred at that time.
- if not mustresume:
- cps = [(node, e[0]) for node, e in earliest.items()]
- cps.sort(key=itemgetter(1))
- curcp = None
- changepoints = []
- nodes = []
- for node, time in cps:
- nodes.append(node)
- if time != curcp:
- changepoints.append([time, nodes[:]])
- curcp = time
- else:
- changepoints[-1][1] = nodes[:]
- else:
- if not canmigrate:
- vmrr = lease.get_last_vmrr()
- curnodes = set(vmrr.nodes.values())
- else:
- curnodes=None
- # If we have to resume this lease, make sure that
- # we have enough time to transfer the images.
- migratetime = self.__estimate_migration_time(lease)
- earliesttransfer = get_clock().get_time() + migratetime
-
- for n in earliest:
- earliest[n][0] = max(earliest[n][0], earliesttransfer)
-
- changepoints = list(set([x[0] for x in earliest.values()]))
- changepoints.sort()
- changepoints = [(x, curnodes) for x in changepoints]
-
- # If we can make reservations in the future,
- # we also consider future changepoints
- # (otherwise, we only allow the VMs to start "now", accounting
- # for the fact that vm images will have to be deployed)
- if allow_reservation_in_future:
- res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
- futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
- # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
- # included in futurecp.
- futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
- futurecp = [(p,None) for p in futurecp]
- else:
- futurecp = []
-
-
-
- #
- # STEP 3: SLOT FITTING
- #
-
- # If resuming, we also have to allocate enough for the resumption
- if mustresume:
- duration = remaining_duration + self.__estimate_resume_time(lease)
- else:
- duration = remaining_duration
-
- duration += shutdown_time
-
- # First, assuming we can't make reservations in the future
- start, end, canfit = self.__find_fit_at_points(
- changepoints,
- numnodes,
- requested_resources,
- duration,
- suspendable,
- min_duration)
-
- if start == None:
- if not allow_reservation_in_future:
- # We did not find a suitable starting time. This can happen
- # if we're unable to make future reservations
- raise SchedException, "Could not find enough resources for this request"
- else:
- mustsuspend = (end - start) < duration
- if mustsuspend and not suspendable:
- if not allow_reservation_in_future:
-                        raise SchedException, "Scheduling this lease would require suspending it, which is not allowed"
- else:
- start = None # No satisfactory start time
-
- # If we haven't been able to fit the lease, check if we can
- # reserve it in the future
- if start == None and allow_reservation_in_future:
- start, end, canfit = self.__find_fit_at_points(
- futurecp,
- numnodes,
- requested_resources,
- duration,
- suspendable,
- min_duration
- )
-
-
- if start in [p[0] for p in futurecp]:
- reservation = True
- else:
- reservation = False
-
-
- #
- # STEP 4: FINAL SLOT FITTING
- #
- # At this point, we know the lease fits, but we have to map it to
- # specific physical nodes.
-
- # Sort physical nodes
- physnodes = canfit.keys()
- if mustresume:
- # If we're resuming, we prefer resuming in the nodes we're already
- # deployed in, to minimize the number of transfers.
- vmrr = lease.get_last_vmrr()
- nodes = set(vmrr.nodes.values())
- availnodes = set(physnodes)
- deplnodes = availnodes.intersection(nodes)
- notdeplnodes = availnodes.difference(nodes)
- physnodes = list(deplnodes) + list(notdeplnodes)
- else:
- physnodes.sort() # Arbitrary, prioritize nodes, as in exact
-
- # Map to physical nodes
- mappings = {}
- res = {}
- vmnode = 1
- while vmnode <= numnodes:
- for n in physnodes:
- if canfit[n]>0:
- canfit[n] -= 1
- mappings[vmnode] = n
- if res.has_key(n):
- res[n].incr(requested_resources)
- else:
- res[n] = ds.ResourceTuple.copy(requested_resources)
- vmnode += 1
- break
-
-
- vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, reservation)
- vmrr.state = ResourceReservation.STATE_SCHEDULED
-
- if mustresume:
- self.__schedule_resumption(vmrr, start)
-
- mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
- if mustsuspend:
- self.__schedule_suspension(vmrr, end)
- else:
- # Compensate for any overestimation
- if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
- vmrr.end = vmrr.start + remaining_duration + shutdown_time
- self.__schedule_shutdown(vmrr)
-
-
-
- susp_str = res_str = ""
- if mustresume:
- res_str = " (resuming)"
- if mustsuspend:
- susp_str = " (suspending)"
- self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
-
- return vmrr, reservation
-
- def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
- start = None
- end = None
- canfit = None
- availabilitywindow = self.slottable.availabilitywindow
-
-
- for p in changepoints:
- availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
- availabilitywindow.printContents()
-
- if availabilitywindow.fitAtStart() >= numnodes:
- start=p[0]
- maxend = start + duration
- end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-
- self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-
- if end < maxend:
- self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-
- if not suspendable:
- pass
- # If we can't suspend, this fit is no good, and we have to keep looking
- else:
- # If we can suspend, we still have to check if the lease will
- # be able to run for the specified minimum duration
- if end-start > min_duration:
- break # We found a fit; stop looking
- else:
- self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
- # Set start back to None, to indicate that we haven't
- # found a satisfactory start time
- start = None
- else:
- # We've found a satisfactory starting time
- break
-
- return start, end, canfit
-
- def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
- times = [] # (start, end, {pnode -> vnodes})
- enactment_overhead = get_config().get("enactment-overhead")
-
- if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
-            # Global exclusion (which represents, e.g., reading/writing the memory image files
-            # from a global file system) means that no two suspensions/resumptions can happen at
-            # the same time in the entire resource pool.
-
- t = time
- t_prev = None
-
- for (vnode,pnode) in vmrr.nodes.items():
- if override == None:
- mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
- op_time = self.__compute_suspend_resume_time(mem, rate)
- else:
- op_time = override
-
- op_time += enactment_overhead
-
- t_prev = t
-
- if direction == constants.DIRECTION_FORWARD:
- t += op_time
- times.append((t_prev, t, {pnode:[vnode]}))
- elif direction == constants.DIRECTION_BACKWARD:
- t -= op_time
- times.append((t, t_prev, {pnode:[vnode]}))
-
- elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
- # Local exclusion (which represents, e.g., reading the memory image files
- # from a local file system) means no two resumptions can happen at the same
- # time in the same physical node.
- pervnode_times = [] # (start, end, vnode)
- vnodes_in_pnode = {}
- for (vnode,pnode) in vmrr.nodes.items():
- vnodes_in_pnode.setdefault(pnode, []).append(vnode)
- for pnode in vnodes_in_pnode:
- t = time
- t_prev = None
- for vnode in vnodes_in_pnode[pnode]:
- if override == None:
- mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
- op_time = self.__compute_suspend_resume_time(mem, rate)
- else:
- op_time = override
-
- t_prev = t
-
- if direction == constants.DIRECTION_FORWARD:
- t += op_time
- pervnode_times.append((t_prev, t, vnode))
- elif direction == constants.DIRECTION_BACKWARD:
- t -= op_time
- pervnode_times.append((t, t_prev, vnode))
-
- # Consolidate suspend/resume operations happening at the same time
- uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
- for (start, end) in uniq_times:
- vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
- node_mappings = {}
- for vnode in vnodes:
- pnode = vmrr.nodes[vnode]
- node_mappings.setdefault(pnode, []).append(vnode)
- times.append([start,end,node_mappings])
-
- # Add the enactment overhead
- for t in times:
- num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
- overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
- if direction == constants.DIRECTION_FORWARD:
- t[1] += overhead
- elif direction == constants.DIRECTION_BACKWARD:
- t[0] -= overhead
-
- # Fix overlaps
- if direction == constants.DIRECTION_FORWARD:
- times.sort(key=itemgetter(0))
- elif direction == constants.DIRECTION_BACKWARD:
- times.sort(key=itemgetter(1))
- times.reverse()
-
- prev_start = None
- prev_end = None
- for t in times:
- if prev_start != None:
- start = t[0]
- end = t[1]
- if direction == constants.DIRECTION_FORWARD:
- if start < prev_end:
- diff = prev_end - start
- t[0] += diff
- t[1] += diff
- elif direction == constants.DIRECTION_BACKWARD:
- if end > prev_start:
- diff = end - prev_start
- t[0] -= diff
- t[1] -= diff
- prev_start = t[0]
- prev_end = t[1]
-
- return times
-
- def __schedule_shutdown(self, vmrr):
- config = get_config()
- shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
-
- start = vmrr.end - shutdown_time
- end = vmrr.end
-
- shutdown_rr = ds.ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
- shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
-
- vmrr.update_end(start)
-
- # If there are any post RRs, remove them
- for rr in vmrr.post_rrs:
- self.slottable.removeReservation(rr)
- vmrr.post_rrs = []
-
- vmrr.post_rrs.append(shutdown_rr)
-
- def __schedule_suspension(self, vmrr, suspend_by):
- from haizea.resourcemanager.rm import ResourceManager
- config = ResourceManager.get_singleton().config
- susp_exclusion = config.get("suspendresume-exclusion")
- override = get_config().get("override-suspend-time")
- rate = config.get("suspend-rate")
-
- if suspend_by < vmrr.start or suspend_by > vmrr.end:
- raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
-
- times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
- suspend_rrs = []
- for (start, end, node_mappings) in times:
- suspres = {}
- all_vnodes = []
- for (pnode,vnodes) in node_mappings.items():
- num_vnodes = len(vnodes)
- r = ds.ResourceTuple.create_empty()
- mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
- r.set_by_type(constants.RES_MEM, mem * num_vnodes)
- r.set_by_type(constants.RES_DISK, mem * num_vnodes)
- suspres[pnode] = r
- all_vnodes += vnodes
- susprr = ds.SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
- susprr.state = ResourceReservation.STATE_SCHEDULED
- suspend_rrs.append(susprr)
-
- suspend_rrs.sort(key=attrgetter("start"))
-
- susp_start = suspend_rrs[0].start
- if susp_start < vmrr.start:
- raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
-
- vmrr.update_end(susp_start)
-
- # If there are any post RRs, remove them
- for rr in vmrr.post_rrs:
- self.slottable.removeReservation(rr)
- vmrr.post_rrs = []
-
- for susprr in suspend_rrs:
- vmrr.post_rrs.append(susprr)
-
- def __schedule_resumption(self, vmrr, resume_at):
- from haizea.resourcemanager.rm import ResourceManager
- config = ResourceManager.get_singleton().config
- resm_exclusion = config.get("suspendresume-exclusion")
- override = get_config().get("override-resume-time")
- rate = config.get("resume-rate")
-
- if resume_at < vmrr.start or resume_at > vmrr.end:
- raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
-
- times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
- resume_rrs = []
- for (start, end, node_mappings) in times:
- resmres = {}
- all_vnodes = []
- for (pnode,vnodes) in node_mappings.items():
- num_vnodes = len(vnodes)
- r = ds.ResourceTuple.create_empty()
- mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
- r.set_by_type(constants.RES_MEM, mem * num_vnodes)
- r.set_by_type(constants.RES_DISK, mem * num_vnodes)
- resmres[pnode] = r
- all_vnodes += vnodes
- resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
- resmrr.state = ResourceReservation.STATE_SCHEDULED
- resume_rrs.append(resmrr)
-
- resume_rrs.sort(key=attrgetter("start"))
-
- resm_end = resume_rrs[-1].end
- if resm_end > vmrr.end:
- raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
-
- vmrr.update_start(resm_end)
- for resmrr in resume_rrs:
- vmrr.pre_rrs.append(resmrr)
-
-
- # TODO: This has to be tied in with the preparation scheduler
- def __schedule_migration(self, lease, vmrr, nexttime):
- last_vmrr = lease.get_last_vmrr()
- vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
-
- mustmigrate = False
- for vnode in vnode_migrations:
- if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
- mustmigrate = True
- break
-
- if not mustmigrate:
- return
-
- # Figure out what migrations can be done simultaneously
- migrations = []
- while len(vnode_migrations) > 0:
- pnodes = set()
- migration = {}
- for vnode in vnode_migrations:
- origin = vnode_migrations[vnode][0]
- dest = vnode_migrations[vnode][1]
- if not origin in pnodes and not dest in pnodes:
- migration[vnode] = vnode_migrations[vnode]
- pnodes.add(origin)
- pnodes.add(dest)
- for vnode in migration:
- del vnode_migrations[vnode]
- migrations.append(migration)
-
- # Create migration RRs
- start = last_vmrr.post_rrs[-1].end
- migr_time = self.__estimate_migration_time(lease)
- bandwidth = self.resourcepool.info.get_migration_bandwidth()
- migr_rrs = []
- for m in migrations:
- end = start + migr_time
- res = {}
- for (origin,dest) in m.values():
- resorigin = ds.ResourceTuple.create_empty()
- resorigin.set_by_type(constants.RES_NETOUT, bandwidth)
- resdest = ds.ResourceTuple.create_empty()
- resdest.set_by_type(constants.RES_NETIN, bandwidth)
- res[origin] = resorigin
- res[dest] = resdest
- migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
- migr_rr.state = ResourceReservation.STATE_SCHEDULED
- migr_rrs.append(migr_rr)
- start = end
-
- migr_rrs.reverse()
- for migr_rr in migr_rrs:
- vmrr.pre_rrs.insert(0, migr_rr)
-
- def __compute_suspend_resume_time(self, mem, rate):
- time = float(mem) / rate
- time = round_datetime_delta(TimeDelta(seconds = time))
- return time
-
- def __estimate_suspend_resume_time(self, lease, rate):
- susp_exclusion = get_config().get("suspendresume-exclusion")
- enactment_overhead = get_config().get("enactment-overhead")
- mem = lease.requested_resources.get_by_type(constants.RES_MEM)
- if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
- return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
- elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
- # Overestimating
- return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
-
- def __estimate_shutdown_time(self, lease):
- enactment_overhead = get_config().get("enactment-overhead").seconds
- return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
-
- def __estimate_suspend_time(self, lease):
- rate = get_config().get("suspend-rate")
- override = get_config().get("override-suspend-time")
- if override != None:
- return override
- else:
- return self.__estimate_suspend_resume_time(lease, rate)
-
- def __estimate_resume_time(self, lease):
- rate = get_config().get("resume-rate")
- override = get_config().get("override-resume-time")
- if override != None:
- return override
- else:
- return self.__estimate_suspend_resume_time(lease, rate)
-
-
- def __estimate_migration_time(self, lease):
- whattomigrate = get_config().get("what-to-migrate")
- bandwidth = self.resourcepool.info.get_migration_bandwidth()
- if whattomigrate == constants.MIGRATE_NONE:
- return TimeDelta(seconds=0)
- else:
- if whattomigrate == constants.MIGRATE_MEM:
- mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
- elif whattomigrate == constants.MIGRATE_MEMDISK:
- mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
- return estimate_transfer_time(mbtotransfer, bandwidth)
-
- # TODO: Take into account other things like boot overhead, migration overhead, etc.
- def __compute_scheduling_threshold(self, lease):
- from haizea.resourcemanager.rm import ResourceManager
- config = ResourceManager.get_singleton().config
- threshold = config.get("force-scheduling-threshold")
- if threshold != None:
- # If there is a hard-coded threshold, use that
- return threshold
- else:
- factor = config.get("scheduling-threshold-factor")
- susp_overhead = self.__estimate_suspend_time(lease)
- safe_duration = susp_overhead
-
- if lease.state == Lease.STATE_SUSPENDED:
- resm_overhead = self.__estimate_resume_time(lease)
- safe_duration += resm_overhead
-
- # TODO: Incorporate other overheads into the minimum duration
- min_duration = safe_duration
-
- # At the very least, we want to allocate enough time for the
- # safe duration (otherwise, we'll end up with incorrect schedules,
- # where a lease is scheduled to suspend, but isn't even allocated
- # enough time to suspend).
-            # The factor is assumed to be non-negative; i.e., a factor of 0
-            # means we only allocate enough time for potential suspend/resume
-            # operations, while a factor of 1 means the lease will get as much
-            # running time as is spent on the runtime overheads involved in
-            # setting it up.
- threshold = safe_duration + (min_duration * factor)
- return threshold
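
A worked example with made-up numbers: if the estimated suspension overhead is 30 seconds and the lease is not suspended (so safe_duration = min_duration = 30 s), then a scheduling-threshold-factor of 2 yields threshold = 30 + (30 * 2) = 90 seconds, while a factor of 0 collapses the threshold to the 30-second safe duration alone.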
-
- def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
- # TODO2: Choose appropriate prioritizing function based on a
-        # config file, instead of hardcoding it.
- #
- # TODO3: Basing decisions only on CPU allocations. This is ok for now,
- # since the memory allocation is proportional to the CPU allocation.
-        # Later on we need to come up with some sort of weighted average.
-
- nodes = canfit.keys()
-
- # TODO: The deployment module should just provide a list of nodes
- # it prefers
- nodeswithimg=[]
- #self.lease_deployment_type = get_config().get("lease-preparation")
- #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- # reusealg = get_config().get("diskimage-reuse")
- # if reusealg==constants.REUSE_IMAGECACHES:
- # nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
-
- # Compares node x and node y.
- # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
- def comparenodes(x, y):
- hasimgX = x in nodeswithimg
- hasimgY = y in nodeswithimg
-
-            # First comparison: A node with no preemptible VMs is preferable
- # to one with preemptible VMs (i.e. we want to avoid preempting)
- canfitnopreemptionX = canfit[x][0]
- canfitpreemptionX = canfit[x][1]
- hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
-
- canfitnopreemptionY = canfit[y][0]
- canfitpreemptionY = canfit[y][1]
- hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
-
- # TODO: Factor out common code
- if avoidpreempt:
- if hasPreemptibleX and not hasPreemptibleY:
- return constants.WORSE
- elif not hasPreemptibleX and hasPreemptibleY:
- return constants.BETTER
- elif not hasPreemptibleX and not hasPreemptibleY:
- if hasimgX and not hasimgY:
- return constants.BETTER
- elif not hasimgX and hasimgY:
- return constants.WORSE
- else:
- if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
- elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
- else: return constants.EQUAL
- elif hasPreemptibleX and hasPreemptibleY:
- # If both have (some) preemptible resources, we prefer those
-                    # that involve fewer preemptions
- preemptX = canfitpreemptionX - canfitnopreemptionX
- preemptY = canfitpreemptionY - canfitnopreemptionY
- if preemptX < preemptY:
- return constants.BETTER
- elif preemptX > preemptY:
- return constants.WORSE
- else:
- if hasimgX and not hasimgY: return constants.BETTER
- elif not hasimgX and hasimgY: return constants.WORSE
- else: return constants.EQUAL
- elif not avoidpreempt:
- # First criteria: Can we reuse image?
- if hasimgX and not hasimgY:
- return constants.BETTER
- elif not hasimgX and hasimgY:
- return constants.WORSE
- else:
- # Now we just want to avoid preemption
- if hasPreemptibleX and not hasPreemptibleY:
- return constants.WORSE
- elif not hasPreemptibleX and hasPreemptibleY:
- return constants.BETTER
- elif hasPreemptibleX and hasPreemptibleY:
- # If both have (some) preemptible resources, we prefer those
-                        # that involve fewer preemptions
- preemptX = canfitpreemptionX - canfitnopreemptionX
- preemptY = canfitpreemptionY - canfitnopreemptionY
- if preemptX < preemptY:
- return constants.BETTER
- elif preemptX > preemptY:
- return constants.WORSE
- else:
- if hasimgX and not hasimgY: return constants.BETTER
- elif not hasimgX and hasimgY: return constants.WORSE
- else: return constants.EQUAL
- else:
- return constants.EQUAL
-
- # Order nodes
- nodes.sort(comparenodes)
- return nodes
-
- def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
- def comparepreemptability(rrX, rrY):
- if rrX.lease.submit_time > rrY.lease.submit_time:
- return constants.BETTER
- elif rrX.lease.submit_time < rrY.lease.submit_time:
- return constants.WORSE
- else:
- return constants.EQUAL
-
- def preemptedEnough(amountToPreempt):
- for node in amountToPreempt:
- if not amountToPreempt[node].is_zero_or_less():
- return False
- return True
-
- # Get allocations at the specified time
- atstart = set()
- atmiddle = set()
- nodes = set(mustpreempt.keys())
-
- reservationsAtStart = self.slottable.getReservationsAt(startTime)
- reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
- reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
- and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-
- reservationsAtStart.sort(comparepreemptability)
- reservationsAtMiddle.sort(comparepreemptability)
-
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-
- # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
- for r in reservationsAtStart:
- # The following will really only come into play when we have
- # multiple VMs per node
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- # Don't need to preempt if we've already preempted all
- # the needed resources in node n
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atstart.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
- if len(reservationsAtMiddle)>0:
- changepoints = set()
- for r in reservationsAtMiddle:
- changepoints.add(r.start)
- changepoints = list(changepoints)
- changepoints.sort()
-
- for cp in changepoints:
- amountToPreempt = {}
- for n in mustpreempt:
- amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
- reservations = [r for r in reservationsAtMiddle
- if r.start <= cp and cp < r.end]
- for r in reservations:
- mustpreemptres = False
- for n in r.resources_in_pnode.keys():
- if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
- amountToPreempt[n].decr(r.resources_in_pnode[n])
- mustpreemptres = True
- if mustpreemptres:
- atmiddle.add(r)
- if preemptedEnough(amountToPreempt):
- break
-
- self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
- self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-
- leases = [r.lease for r in atstart|atmiddle]
-
- return leases
-
- def __preempt(self, lease, preemption_time):
-
- self.logger.info("Preempting lease #%i..." % (lease.id))
- self.logger.vdebug("Lease before preemption:")
- lease.print_contents()
- vmrr = lease.get_last_vmrr()
-
- if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
- self.logger.debug("Lease was set to start in the middle of the preempting lease.")
- must_cancel_and_requeue = True
- else:
- susptype = get_config().get("suspension")
- if susptype == constants.SUSPENSION_NONE:
- must_cancel_and_requeue = True
- else:
- time_until_suspend = preemption_time - vmrr.start
- min_duration = self.__compute_scheduling_threshold(lease)
- can_suspend = time_until_suspend >= min_duration
- if not can_suspend:
- self.logger.debug("Suspending the lease does not meet scheduling threshold.")
- must_cancel_and_requeue = True
- else:
- if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
- self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
- must_cancel_and_requeue = True
- else:
- self.logger.debug("Lease can be suspended")
- must_cancel_and_requeue = False
-
- if must_cancel_and_requeue:
- self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
- if vmrr.backfill_reservation == True:
- self.numbesteffortres -= 1
- # If there are any post RRs, remove them
- for rr in vmrr.post_rrs:
- self.slottable.removeReservation(rr)
- lease.remove_vmrr(vmrr)
- self.slottable.removeReservation(vmrr)
- for vnode, pnode in lease.diskimagemap.items():
- self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
- self.deployment_scheduler.cancel_deployment(lease)
- lease.diskimagemap = {}
- lease.state = Lease.STATE_QUEUED
- self.__enqueue_in_order(lease)
- get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
- else:
- self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
- # Save original start and end time of the vmrr
- old_start = vmrr.start
- old_end = vmrr.end
- self.__schedule_suspension(vmrr, preemption_time)
- self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
- for susprr in vmrr.post_rrs:
- self.slottable.addReservation(susprr)
-
-
- self.logger.vdebug("Lease after preemption:")
- lease.print_contents()
-
- def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
- self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime))
- leases = []
- vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
- leases = set([rr.lease for rr in vmrrs])
- leases = [l for l in leases if isinstance(l, ds.BestEffortLease) and l.state in (Lease.STATE_SUSPENDED,Lease.STATE_READY) and not l in checkedleases]
- for lease in leases:
-            self.logger.debug("Found lease %i" % lease.id)
-            lease.print_contents()
- # Earliest time can't be earlier than time when images will be
- # available in node
- earliest = max(nexttime, lease.imagesavail)
- self.__slideback(lease, earliest)
-            checkedleases.append(lease)
- #for l in leases:
- # vmrr, susprr = l.getLastVMRR()
- # self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
-
- def __slideback(self, lease, earliest):
- vmrr = lease.get_last_vmrr()
- # Save original start and end time of the vmrr
- old_start = vmrr.start
- old_end = vmrr.end
- nodes = vmrr.nodes.values()
- if lease.state == Lease.STATE_SUSPENDED:
- originalstart = vmrr.pre_rrs[0].start
- else:
- originalstart = vmrr.start
- cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
- cp = [earliest] + cp
- newstart = None
- for p in cp:
- self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
- self.slottable.availabilitywindow.printContents()
- if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
- (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
- if end == originalstart and set(nodes) <= set(canfit.keys()):
- self.logger.debug("Can slide back to %s" % p)
- newstart = p
- break
- if newstart == None:
- # Can't slide back. Leave as is.
- pass
- else:
- diff = originalstart - newstart
- if lease.state == Lease.STATE_SUSPENDED:
- resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ds.ResumptionResourceReservation)]
- for resmrr in resmrrs:
- resmrr_old_start = resmrr.start
- resmrr_old_end = resmrr.end
- resmrr.start -= diff
- resmrr.end -= diff
- self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
- vmrr.update_start(vmrr.start - diff)
-
- # If the lease was going to be suspended, check whether the
- # suspension is still needed (the lease may now fit in full).
- remdur = lease.duration.get_remaining_duration()
- if vmrr.is_suspending() and vmrr.end - newstart >= remdur:
- vmrr.update_end(vmrr.start + remdur)
- for susprr in vmrr.post_rrs:
- self.slottable.removeReservation(susprr)
- vmrr.post_rrs = []
- else:
- vmrr.update_end(vmrr.end - diff)
-
- if not vmrr.is_suspending():
- # If the VM was set to shutdown, we need to slideback the shutdown RRs
- for rr in vmrr.post_rrs:
- rr_old_start = rr.start
- rr_old_end = rr.end
- rr.start -= diff
- rr.end -= diff
- self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
-
- self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
- self.logger.vdebug("New lease descriptor (after slideback):")
- lease.print_contents()
-
-
-
- #-------------------------------------------------------------------#
- # #
- # SLOT TABLE EVENT HANDLERS #
- # #
- #-------------------------------------------------------------------#
-
- def _handle_start_vm(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
- l.print_contents()
- if l.state == Lease.STATE_READY:
- l.state = Lease.STATE_ACTIVE
- rr.state = ResourceReservation.STATE_ACTIVE
- now_time = get_clock().get_time()
- l.start.actual = now_time
-
- try:
- self.deployment_scheduler.check(l, rr)
- self.resourcepool.start_vms(l, rr)
- # The next two lines have to be moved somewhere more
- # appropriate inside the resourcepool module
- for (vnode, pnode) in rr.nodes.items():
- l.diskimagemap[vnode] = pnode
- except Exception, e:
- self.logger.error("ERROR when starting VMs.")
- raise
- elif l.state == Lease.STATE_RESUMED_READY:
- l.state = Lease.STATE_ACTIVE
- rr.state = ResourceReservation.STATE_ACTIVE
- # No enactment to do here, since all the suspend/resume actions are
- # handled during the suspend/resume RRs
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
- self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
-
-
- def _handle_end_vm(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
- self.logger.vdebug("LEASE-%i Before:" % l.id)
- l.print_contents()
- now_time = round_datetime(get_clock().get_time())
- diff = now_time - rr.start
- l.duration.accumulate_duration(diff)
- rr.state = ResourceReservation.STATE_DONE
-
- if isinstance(l, ds.BestEffortLease):
- if rr.backfill_reservation:
- self.numbesteffortres -= 1
-
- self.logger.vdebug("LEASE-%i After:" % l.id)
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
- self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
-
- def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
- self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
- self._handle_end_rr(l, vmrr)
- for rr in vmrr.post_rrs:
- self.slottable.removeReservation(rr)
- vmrr.post_rrs = []
- # TODO: slideback shutdown RRs
- vmrr.end = get_clock().get_time()
- self._handle_end_vm(l, vmrr)
- self._handle_end_lease(l)
- nexttime = get_clock().get_next_schedulable_time()
- if self.is_backfilling():
- # We need to reevaluate the schedule to see if there are any future
- # reservations that we can slide back.
- self.__reevaluate_schedule(l, vmrr.nodes.values(), nexttime, [])
-
- def _handle_start_shutdown(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
- l.print_contents()
- rr.state = ResourceReservation.STATE_ACTIVE
- self.resourcepool.stop_vms(l, rr)
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
-
- def _handle_end_shutdown(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
- l.print_contents()
- rr.state = ResourceReservation.STATE_DONE
- self._handle_end_lease(l)
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
- self.logger.info("Lease %i shutdown." % (l.id))
-
- def _handle_end_lease(self, l):
- l.state = Lease.STATE_DONE
- l.duration.actual = l.duration.accumulated
- l.end = round_datetime(get_clock().get_time())
- self.completedleases.add(l)
- self.leases.remove(l)
- if isinstance(l, ds.BestEffortLease):
- get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
-
-
- def _handle_start_suspend(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
- l.print_contents()
- rr.state = ResourceReservation.STATE_ACTIVE
- self.resourcepool.suspend_vms(l, rr)
- for vnode in rr.vnodes:
- pnode = rr.vmrr.nodes[vnode]
- l.memimagemap[vnode] = pnode
- if rr.is_first():
- l.state = Lease.STATE_SUSPENDING
- l.print_contents()
- self.logger.info("Suspending lease %i..." % (l.id))
- self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
-
- def _handle_end_suspend(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
- l.print_contents()
- # TODO: React to incomplete suspend
- self.resourcepool.verify_suspend(l, rr)
- rr.state = ResourceReservation.STATE_DONE
- if rr.is_last():
- l.state = Lease.STATE_SUSPENDED
- self.__enqueue_in_order(l)
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
- self.logger.info("Lease %i suspended." % (l.id))
-
- def _handle_start_resume(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
- l.print_contents()
- self.resourcepool.resume_vms(l, rr)
- rr.state = ResourceReservation.STATE_ACTIVE
- if rr.is_first():
- l.state = Lease.STATE_RESUMING
- l.print_contents()
- self.logger.info("Resuming lease %i..." % (l.id))
- self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
-
- def _handle_end_resume(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
- l.print_contents()
- # TODO: React to incomplete resume
- self.resourcepool.verify_resume(l, rr)
- rr.state = ResourceReservation.STATE_DONE
- if rr.is_last():
- l.state = Lease.STATE_RESUMED_READY
- self.logger.info("Resumed lease %i" % (l.id))
- for vnode, pnode in rr.vmrr.nodes.items():
- self.resourcepool.remove_ramfile(pnode, l.id, vnode)
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
-
- def _handle_start_migrate(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
- l.print_contents()
- rr.state = ResourceReservation.STATE_ACTIVE
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
- self.logger.info("Migrating lease %i..." % (l.id))
-
- def _handle_end_migrate(self, l, rr):
- self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
- l.print_contents()
-
- for vnode in rr.transfers:
- origin = rr.transfers[vnode][0]
- dest = rr.transfers[vnode][1]
-
- # Update VM image mappings
- self.resourcepool.remove_diskimage(origin, l.id, vnode)
- self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
- l.diskimagemap[vnode] = dest
-
- # Update RAM file mappings
- self.resourcepool.remove_ramfile(origin, l.id, vnode)
- self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
- l.memimagemap[vnode] = dest
-
- rr.state = ResourceReservation.STATE_DONE
- l.print_contents()
- self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
- self.logger.info("Migrated lease %i..." % (l.id))
-
- def _handle_end_rr(self, l, rr):
- self.slottable.removeReservation(rr)
-
- def __enqueue_in_order(self, lease):
- get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
- self.queue.enqueue_in_order(lease)
-
- def __can_reserve_besteffort_in_future(self):
- return self.numbesteffortres < self.maxres
-
- def is_backfilling(self):
- return self.maxres > 0
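
The suspend-vs-requeue decision at the top of the deleted scheduler code above boils down to a small policy check. Below is a minimal, standalone sketch of that logic; the function name, the plain integer durations and the demo constant values are illustrative assumptions, not part of the Haizea API.

# Standalone sketch of the preemption policy shown above (illustrative names,
# integer minutes instead of mx.DateTime values).

SUSPENSION_NONE = "none"
SUSPENSION_SERIAL = "serial"
SUSPENSION_ALL = "all"

def preemption_action(has_started, vmrr_start, preemption_time, numnodes,
                      susptype, scheduling_threshold):
    """Return 'requeue' or 'suspend' for a preempted best-effort lease."""
    # A lease that would only start in the middle of the preempting lease
    # is simply cancelled and requeued.
    if not has_started and vmrr_start >= preemption_time:
        return "requeue"
    # Suspension disabled by configuration.
    if susptype == SUSPENSION_NONE:
        return "requeue"
    # The lease must run at least the scheduling threshold before being
    # suspended, otherwise the suspend/resume overhead is not worth it.
    if preemption_time - vmrr_start < scheduling_threshold:
        return "requeue"
    # Under the "serial" policy only single-node leases can be suspended.
    if numnodes > 1 and susptype == SUSPENSION_SERIAL:
        return "requeue"
    return "suspend"

if __name__ == "__main__":
    print(preemption_action(True, 0, 45, 1, SUSPENSION_ALL, 30))     # suspend
    print(preemption_action(True, 0, 10, 1, SUSPENSION_ALL, 30))     # requeue
    print(preemption_action(True, 0, 45, 4, SUSPENSION_SERIAL, 30))  # requeue
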
Deleted: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/slottable.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -1,530 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from mx.DateTime import ISO, TimeDelta
-from operator import attrgetter, itemgetter
-import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
-import bisect
-import copy
-import logging
-
-class SlotFittingException(Exception):
- pass
-
-class CriticalSlotFittingException(Exception):
- pass
-
-
-class Node(object):
- def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
- self.capacity = ds.ResourceTuple.copy(capacity)
- if capacitywithpreemption == None:
- self.capacitywithpreemption = None
- else:
- self.capacitywithpreemption = ds.ResourceTuple.copy(capacitywithpreemption)
- self.resourcepoolnode = resourcepoolnode
-
- @classmethod
- def from_resourcepool_node(cls, node):
- capacity = node.get_capacity()
- return cls(capacity, capacity, node)
-
-class NodeList(object):
- def __init__(self):
- self.nodelist = []
-
- def add(self, node):
- self.nodelist.append(node)
-
- def __getitem__(self, n):
- return self.nodelist[n-1]
-
- def copy(self):
- nodelist = NodeList()
- for n in self.nodelist:
- nodelist.add(Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode))
- return nodelist
-
- def toDict(self):
- nodelist = self.copy()
- return dict([(i+1, v) for i, v in enumerate(nodelist)])
-
-class KeyValueWrapper(object):
- def __init__(self, key, value):
- self.key = key
- self.value = value
-
- def __cmp__(self, other):
- return cmp(self.key, other.key)
-
-class SlotTable(object):
- def __init__(self):
- self.logger = logging.getLogger("SLOT")
- self.nodes = NodeList()
- self.reservations = []
- self.reservationsByStart = []
- self.reservationsByEnd = []
- self.availabilitycache = {}
- self.changepointcache = None
-
- self.availabilitywindow = AvailabilityWindow(self)
-
- def add_node(self, resourcepoolnode):
- self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
-
- def is_empty(self):
- return (len(self.reservationsByStart) == 0)
-
- def dirty(self):
- # You're a dirty, dirty slot table and you should be
- # ashamed of having outdated caches!
- self.availabilitycache = {}
- self.changepointcache = None
-
- def getAvailabilityCacheMiss(self, time):
- allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
- onlynodes = None
- nodes = {}
- reservations = self.getReservationsAt(time)
- # Find how much of each resource is available on each node
- canpreempt = True
- for r in reservations:
- for node in r.resources_in_pnode:
- if onlynodes == None or (onlynodes != None and node in onlynodes):
- if not nodes.has_key(node):
- n = self.nodes[node]
- if canpreempt:
- nodes[node] = Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode)
- else:
- nodes[node] = Node(n.capacity, None, n.resourcepoolnode)
- nodes[node].capacity.decr(r.resources_in_pnode[node])
- if canpreempt and not r.is_preemptible:
- nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
-
- # For the remaining nodes, use a reference to the original node, not a copy
- if onlynodes == None:
- missing = allnodes - set(nodes.keys())
- else:
- missing = onlynodes - set(nodes.keys())
-
- for node in missing:
- nodes[node] = self.nodes[node]
-
- self.availabilitycache[time] = nodes
-
- def getAvailability(self, time, resreq=None, onlynodes=None, canpreempt=False):
- if not self.availabilitycache.has_key(time):
- self.getAvailabilityCacheMiss(time)
- # Cache miss
-
- nodes = self.availabilitycache[time]
-
- if onlynodes != None:
- onlynodes = set(onlynodes)
- nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
-
- # Keep only those nodes with enough resources
- if resreq != None:
- newnodes = {}
- for n, node in nodes.items():
- if not resreq.fits_in(node.capacity) or (canpreempt and not resreq.fits_in(node.capacitywithpreemption)):
- pass
- else:
- newnodes[n]=node
- nodes = newnodes
-
- return nodes
-
- def get_utilization(self, time):
- total = sum([n.capacity.get_by_type(constants.RES_CPU) for n in self.nodes.nodelist])
- util = {}
- reservations = self.getReservationsAt(time)
- for r in reservations:
- for node in r.resources_in_pnode:
- if isinstance(r, ds.VMResourceReservation):
- use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
- util[type(r)] = use + util.setdefault(type(r),0.0)
- elif isinstance(r, ds.SuspensionResourceReservation) or isinstance(r, ds.ResumptionResourceReservation) or isinstance(r, ds.ShutdownResourceReservation):
- use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
- util[type(r)] = use + util.setdefault(type(r),0.0)
- util[None] = total - sum(util.values())
- for k in util:
- util[k] /= total
-
- return util
-
- def getReservationsAt(self, time):
- item = KeyValueWrapper(time, None)
- startpos = bisect.bisect_right(self.reservationsByStart, item)
- bystart = set([x.value for x in self.reservationsByStart[:startpos]])
- endpos = bisect.bisect_right(self.reservationsByEnd, item)
- byend = set([x.value for x in self.reservationsByEnd[endpos:]])
- res = bystart & byend
- return list(res)
-
- def get_reservations_starting_between(self, start, end):
- startitem = KeyValueWrapper(start, None)
- enditem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByStart, startitem)
- endpos = bisect.bisect_right(self.reservationsByStart, enditem)
- res = [x.value for x in self.reservationsByStart[startpos:endpos]]
- return res
-
- def get_reservations_starting_after(self, start):
- startitem = KeyValueWrapper(start, None)
- startpos = bisect.bisect_left(self.reservationsByStart, startitem)
- res = [x.value for x in self.reservationsByStart[startpos:]]
- return res
-
- def get_reservations_ending_after(self, end):
- startitem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
- res = [x.value for x in self.reservationsByEnd[startpos:]]
- return res
-
- def get_reservations_ending_between(self, start, end):
- startitem = KeyValueWrapper(start, None)
- enditem = KeyValueWrapper(end, None)
- startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
- endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
- res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
- return res
-
- def get_reservations_starting_at(self, time):
- return self.get_reservations_starting_between(time, time)
-
- def get_reservations_ending_at(self, time):
- return self.get_reservations_ending_between(time, time)
-
- # ONLY for simulation
- def getNextPrematureEnd(self, after):
- # Inefficient, but ok since this query seldom happens
- res = [i.value for i in self.reservationsByEnd if isinstance(i.value, ds.VMResourceReservation) and i.value.prematureend > after]
- if len(res) > 0:
- prematureends = [r.prematureend for r in res]
- prematureends.sort()
- return prematureends[0]
- else:
- return None
-
- # ONLY for simulation
- def getPrematurelyEndingRes(self, t):
- return [i.value for i in self.reservationsByEnd if isinstance(i.value, ds.VMResourceReservation) and i.value.prematureend == t]
-
-
- def get_reservations_starting_or_ending_after(self, after):
- item = KeyValueWrapper(after, None)
- startpos = bisect.bisect_right(self.reservationsByStart, item)
- bystart = set([x.value for x in self.reservationsByStart[:startpos]])
- endpos = bisect.bisect_right(self.reservationsByEnd, item)
- byend = set([x.value for x in self.reservationsByEnd[endpos:]])
- res = bystart | byend
- return list(res)
-
- def addReservation(self, rr):
- startitem = KeyValueWrapper(rr.start, rr)
- enditem = KeyValueWrapper(rr.end, rr)
- bisect.insort(self.reservationsByStart, startitem)
- bisect.insort(self.reservationsByEnd, enditem)
- self.dirty()
-
- # If the slot table keys (start / end time) are not modified,
- # just remove and reinsert.
- def updateReservation(self, rr):
- # TODO: Might be more efficient to resort lists
- self.removeReservation(rr)
- self.addReservation(rr)
- self.dirty()
-
- # If the slot table keys are modified (start and/or end time),
- # provide the old keys (so we can remove the reservation using
- # them) and the updated reservation.
- def update_reservation_with_key_change(self, rr, old_start, old_end):
- # TODO: Might be more efficient to resort lists
- self.removeReservation(rr, old_start, old_end)
- self.addReservation(rr)
- self.dirty()
-
-
- def getIndexOfReservation(self, rlist, rr, key):
- item = KeyValueWrapper(key, None)
- pos = bisect.bisect_left(rlist, item)
- found = False
- while not found:
- if rlist[pos].value == rr:
- found = True
- else:
- pos += 1
- return pos
-
- def removeReservation(self, rr, start=None, end=None):
- if start == None:
- start = rr.start
- if end == None:
- end = rr.end
- posstart = self.getIndexOfReservation(self.reservationsByStart, rr, start)
- posend = self.getIndexOfReservation(self.reservationsByEnd, rr, end)
- self.reservationsByStart.pop(posstart)
- self.reservationsByEnd.pop(posend)
- self.dirty()
-
-
- def findChangePointsAfter(self, after, until=None, nodes=None):
- changepoints = set()
- res = self.get_reservations_starting_or_ending_after(after)
- for rr in res:
- if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
- if rr.start > after:
- changepoints.add(rr.start)
- if rr.end > after:
- changepoints.add(rr.end)
- changepoints = list(changepoints)
- if until != None:
- changepoints = [c for c in changepoints if c < until]
- changepoints.sort()
- return changepoints
-
- def peekNextChangePoint(self, time):
- if self.changepointcache == None:
- # Cache is empty
- changepoints = self.findChangePointsAfter(time)
- changepoints.reverse()
- self.changepointcache = changepoints
- if len(self.changepointcache) == 0:
- return None
- else:
- return self.changepointcache[-1]
-
- def getNextChangePoint(self, time):
- p = self.peekNextChangePoint(time)
- if p != None:
- self.changepointcache.pop()
- return p
-
- def isFull(self, time):
- nodes = self.getAvailability(time)
- avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])
- return (avail == 0)
-
- def get_next_reservations_in_nodes(self, time, nodes, rr_type=None, immediately_next = False):
- nodes = set(nodes)
- rrs_in_nodes = []
- earliest_end_time = {}
- rrs = self.get_reservations_starting_after(time)
- if rr_type != None:
- rrs = [rr for rr in rrs if isinstance(rr, rr_type)]
-
- # Filter the RRs by nodes
- for rr in rrs:
- rr_nodes = set(rr.resources_in_pnode.keys())
- if len(nodes & rr_nodes) > 0:
- rrs_in_nodes.append(rr)
- end = rr.end
- for n in rr_nodes:
- if not earliest_end_time.has_key(n):
- earliest_end_time[n] = end
- else:
- if end < earliest_end_time[n]:
- earliest_end_time[n] = end
-
- if immediately_next:
- # We only want to include the ones that are immediately
- # next.
- rr_nodes_excl = set()
- for n in nodes:
- if earliest_end_time.has_key(n):
- end = earliest_end_time[n]
- rrs = [rr for rr in rrs_in_nodes if n in rr.resources_in_pnode.keys() and rr.start < end]
- rr_nodes_excl.update(rrs)
- rrs_in_nodes = list(rr_nodes_excl)
-
- return rrs_in_nodes
-
-class AvailEntry(object):
- def __init__(self, time, avail, availpreempt, resreq):
- self.time = time
- self.avail = avail
- self.availpreempt = availpreempt
-
- if avail == None and availpreempt == None:
- self.canfit = 0
- self.canfitpreempt = 0
- else:
- self.canfit = resreq.get_num_fits_in(avail)
- if availpreempt == None:
- self.canfitpreempt = 0
- else:
- self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
-
- def getCanfit(self, canpreempt):
- if canpreempt:
- return self.canfitpreempt
- else:
- return self.canfit
-
-
-class AvailabilityWindow(object):
- def __init__(self, slottable):
- self.slottable = slottable
- self.logger = logging.getLogger("SLOTTABLE.WIN")
- self.time = None
- self.resreq = None
- self.onlynodes = None
- self.avail = None
-
- # Create avail structure
- def initWindow(self, time, resreq, onlynodes = None, canpreempt=False):
- self.time = time
- self.resreq = resreq
- self.onlynodes = onlynodes
- self.avail = {}
-
- # Availability at initial time
- availatstart = self.slottable.getAvailability(self.time, self.resreq, self.onlynodes, canpreempt)
- for node in availatstart:
- capacity = availatstart[node].capacity
- if canpreempt:
- capacitywithpreemption = availatstart[node].capacitywithpreemption
- else:
- capacitywithpreemption = None
- self.avail[node] = [AvailEntry(self.time, capacity, capacitywithpreemption, self.resreq)]
-
- # Determine the availability at the subsequent change points
- nodes = set(availatstart.keys())
- res = self.slottable.get_reservations_starting_after(self.time)
- changepoints = set()
- for rr in res:
- if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
- changepoints.add(rr.start)
- changepoints = list(changepoints)
- changepoints.sort()
- for p in changepoints:
- availatpoint = self.slottable.getAvailability(p, self.resreq, nodes, canpreempt)
- newnodes = set(availatpoint.keys())
-
- # Add entries for nodes that have no resources available
- # (for at least one VM)
- fullnodes = nodes - newnodes
- for node in fullnodes:
- self.avail[node].append(AvailEntry(p, None, None, None))
- nodes.remove(node)
-
- # For the rest, we are only interested in nodes whose available
- # resources decrease within the window
- for node in newnodes:
- capacity = availatpoint[node].capacity
- fits = self.resreq.get_num_fits_in(capacity)
- if canpreempt:
- capacitywithpreemption = availatpoint[node].capacitywithpreemption
- fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
- prevavail = self.avail[node][-1]
- if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
- self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
- elif canpreempt and (prevavail.getCanfit(canpreempt=False) > fits or prevavail.getCanfit(canpreempt=True) > fitswithpreemption):
- self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
-
-
- def fitAtStart(self, nodes = None, canpreempt = False):
- if nodes != None:
- avail = [v for (k, v) in self.avail.items() if k in nodes]
- else:
- avail = self.avail.values()
- if canpreempt:
- return sum([e[0].canfitpreempt for e in avail])
- else:
- return sum([e[0].canfit for e in avail])
-
- # TODO: Also return the amount of resources that would have to be
- # preempted in each physnode
- def findPhysNodesForVMs(self, numnodes, maxend, strictend=False, canpreempt=False):
- # Returns the physical nodes that can run all VMs, and the
- # time at which the VMs must end
- canfit = dict([(n, v[0].getCanfit(canpreempt)) for (n, v) in self.avail.items()])
- entries = []
- for n in self.avail.keys():
- entries += [(n, e) for e in self.avail[n][1:]]
- getTime = lambda x: x[1].time
- entries.sort(key=getTime)
- if strictend:
- end = None
- else:
- end = maxend
- for e in entries:
- physnode = e[0]
- entry = e[1]
-
- if entry.time >= maxend:
- # Can run to its maximum duration
- break
- else:
- diff = canfit[physnode] - entry.getCanfit(canpreempt)
- totalcanfit = sum([n for n in canfit.values()]) - diff
- if totalcanfit < numnodes and not strictend:
- # Not enough resources. Must end here
- end = entry.time
- break
- else:
- # Update canfit
- canfit[physnode] = entry.getCanfit(canpreempt)
-
- # Filter out nodes where we can't fit any vms
- canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
-
- return end, canfit
-
-
- def printContents(self, nodes = None, withpreemption = False):
- if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
- if nodes == None:
- physnodes = self.avail.keys()
- else:
- physnodes = [k for k in self.avail.keys() if k in nodes]
- physnodes.sort()
- if withpreemption:
- p = "(with preemption)"
- else:
- p = "(without preemption)"
- self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
- for n in physnodes:
- contents = "Node %i --- " % n
- for x in self.avail[n]:
- contents += "[ %s " % x.time
- contents += "{ "
- if x.avail == None and x.availpreempt == None:
- contents += "END "
- else:
- if withpreemption:
- res = x.availpreempt
- canfit = x.canfitpreempt
- else:
- res = x.avail
- canfit = x.canfit
- contents += "%s" % res
- contents += "} (Fits: %i) ] " % canfit
- self.logger.vdebug(contents)
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
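
The SlotTable deleted above keeps every reservation in two sorted lists, one keyed by start time and one by end time, so that a query such as getReservationsAt(time) is answered with two binary searches and a set intersection. The sketch below reimplements just that indexing idea in isolation, using plain tuples where the original wraps keys in KeyValueWrapper; the class and method names here are illustrative only, not the Haizea classes themselves.

# Standalone sketch of the two-sorted-list reservation index (illustrative).
import bisect

class Interval(object):
    def __init__(self, name, start, end):
        self.name, self.start, self.end = name, start, end

class IntervalIndex(object):
    def __init__(self):
        self.by_start = []   # tuples (start, id(iv), iv), kept sorted by start
        self.by_end = []     # tuples (end, id(iv), iv), kept sorted by end

    def add(self, iv):
        bisect.insort(self.by_start, (iv.start, id(iv), iv))
        bisect.insort(self.by_end, (iv.end, id(iv), iv))

    def active_at(self, t):
        # start <= t: everything before bisect_right((t, inf)) in by_start
        startpos = bisect.bisect_right(self.by_start, (t, float("inf")))
        started = set(e[2] for e in self.by_start[:startpos])
        # end > t: everything from bisect_right((t, inf)) onward in by_end
        endpos = bisect.bisect_right(self.by_end, (t, float("inf")))
        not_ended = set(e[2] for e in self.by_end[endpos:])
        return started & not_ended

if __name__ == "__main__":
    idx = IntervalIndex()
    idx.add(Interval("A", 0, 10))
    idx.add(Interval("B", 5, 15))
    idx.add(Interval("C", 12, 20))
    print(sorted(iv.name for iv in idx.active_at(7)))   # ['A', 'B']
    print(sorted(iv.name for iv in idx.active_at(12)))  # ['B', 'C']
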
Modified: trunk/src/haizea/traces/readers.py
===================================================================
--- trunk/src/haizea/traces/readers.py 2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/traces/readers.py 2009-01-06 11:56:27 UTC (rev 554)
@@ -17,7 +17,8 @@
# -------------------------------------------------------------------------- #
from mx.DateTime import TimeDelta
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ResourceTuple
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
import haizea.common.constants as constants
import haizea.traces.formats as formats
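
For code outside this tree that still imports from the removed datastruct module, the hunk above implies a corresponding import change. A hypothetical sketch of how a downstream caller would adapt is shown below; the describe helper is invented for illustration, and only the import paths come from this changeset.

# Hypothetical downstream module; only the import paths below are taken from
# the readers.py hunk in this commit.
from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
from haizea.resourcemanager.scheduler.slottable import ResourceTuple

def describe(lease):
    # Invented helper: nothing about the classes changed in this commit,
    # only the modules they are imported from.
    kind = "best-effort" if isinstance(lease, BestEffortLease) else "AR/other"
    return "lease %s (%s)" % (lease.id, kind)
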