[haizea-commit] r608 - in branches/TP2.0/src/haizea: common core core/scheduler core/scheduler/preparation_schedulers
haizea-commit at mailman.cs.uchicago.edu
Fri Jul 17 10:02:16 CDT 2009
Author: borja
Date: 2009-07-17 10:02:10 -0500 (Fri, 17 Jul 2009)
New Revision: 608
Modified:
branches/TP2.0/src/haizea/common/constants.py
branches/TP2.0/src/haizea/core/configfile.py
branches/TP2.0/src/haizea/core/manager.py
branches/TP2.0/src/haizea/core/scheduler/__init__.py
branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
branches/TP2.0/src/haizea/core/scheduler/mapper.py
branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
Log:
Migration is working again. It is now separated into two steps: migrating the software environment (usually a disk image) and migrating the memory state files.
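In outline, the two steps are chained so that the memory-state transfer is
scheduled to begin where the software-environment transfer ends. A condensed
sketch of that logic, distilled from the lease_scheduler.py hunk below (not
additional code in this commit):

    # Step 1: the preparation scheduler migrates the software environment
    migr_rrs = preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
    if len(migr_rrs) > 0:
        end_migr = migr_rrs[-1].end   # step 2 starts when step 1 ends
    else:
        end_migr = nexttime           # nothing to transfer in step 1
    # Step 2: the VM scheduler migrates the memory state files
    migr_rrs += vm_scheduler.schedule_migration(lease, vmrr, end_migr)
    # All migration RRs are prepended, in order, to the VM reservation
    migr_rrs.reverse()
    for migr_rr in migr_rrs:
        vmrr.pre_rrs.insert(0, migr_rr)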
Modified: branches/TP2.0/src/haizea/common/constants.py
===================================================================
--- branches/TP2.0/src/haizea/common/constants.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/common/constants.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -42,9 +42,9 @@
SUSPRES_EXCLUSION_LOCAL="local"
SUSPRES_EXCLUSION_GLOBAL="global"
-MIGRATE_NONE="nothing"
-MIGRATE_MEM="mem"
-MIGRATE_MEMDISK="mem+disk"
+MIGRATE_NO="no"
+MIGRATE_YES="yes"
+MIGRATE_YES_NOTRANSFER="yes-notransfer"
TRANSFER_UNICAST="unicast"
TRANSFER_MULTICAST="multicast"
Modified: branches/TP2.0/src/haizea/core/configfile.py
===================================================================
--- branches/TP2.0/src/haizea/core/configfile.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/configfile.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -341,31 +341,20 @@
Option(name = "migration",
getter = "migration",
- type = OPTTYPE_BOOLEAN,
- required = True,
- doc = """
- Specifies whether leases can be migrated from one
- physical node to another. Valid values are "True" or "False"
- """),
-
- Option(name = "what-to-migrate",
- getter = "what-to-migrate",
type = OPTTYPE_STRING,
required = False,
- required_if = [(("scheduling","migration"),True)],
- default = constants.MIGRATE_NONE,
- valid = [constants.MIGRATE_NONE,
- constants.MIGRATE_MEM,
- constants.MIGRATE_MEMDISK],
+ default = constants.MIGRATE_NO,
+ valid = [constants.MIGRATE_NO,
+ constants.MIGRATE_YES,
+ constants.MIGRATE_YES_NOTRANSFER],
doc = """
- Specifies what data has to be moved around when
- migrating a lease. Valid values are:
-
- - nothing: migration can be performed without transferring any
- files.
- - mem: only the memory must be transferred
- - mem+disk: both the memory and the VM disk image must be
- transferred
+ Specifies whether leases can be migrated from one
+ physical node to another. Valid values are:
+
+ - no
+ - yes
+ - yes-notransfer: migration can be performed without
+ transferring any files.
"""),
Option(name = "non-schedulable-interval",
Modified: branches/TP2.0/src/haizea/core/manager.py
===================================================================
--- branches/TP2.0/src/haizea/core/manager.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/manager.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -405,16 +405,27 @@
self.__unexpected_exception(exc)
- def process_reservations(self, time):
+ def process_starting_reservations(self, time):
"""Process reservations starting/stopping at specified time"""
# The scheduler takes care of this.
try:
- self.scheduler.process_reservations(time)
+ self.scheduler.process_starting_reservations(time)
except UnrecoverableError, exc:
self.__unrecoverable_error(exc)
except Exception, exc:
self.__unexpected_exception(exc)
+
+ def process_ending_reservations(self, time):
+ """Process reservations starting/stopping at specified time"""
+
+ # The scheduler takes care of this.
+ try:
+ self.scheduler.process_ending_reservations(time)
+ except UnrecoverableError, exc:
+ self.__unrecoverable_error(exc)
+ except Exception, exc:
+ self.__unexpected_exception(exc)
def notify_event(self, lease_id, event):
"""Notifies an asynchronous event to Haizea.
@@ -630,12 +641,15 @@
# Process reservations starting/stopping at the current time and
# check if there are any new requests.
- self.manager.process_reservations(self.time)
+ self.manager.process_ending_reservations(self.time)
+ self.manager.process_starting_reservations(self.time)
self.manager.process_requests(self.time)
# Since processing requests may have resulted in new reservations
# starting now, we process reservations again.
- self.manager.process_reservations(self.time)
+ self.manager.process_starting_reservations(self.time)
+ # And one final call to deal with nil-duration reservations
+ self.manager.process_ending_reservations(self.time)
# Print a status message
if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
@@ -808,7 +822,8 @@
self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
# Wake up the resource manager
- self.manager.process_reservations(self.lastwakeup)
+ self.manager.process_ending_reservations(self.lastwakeup)
+ self.manager.process_starting_reservations(self.lastwakeup)
# TODO: Compute nextschedulable here, before processing requests
self.manager.process_requests(self.nextschedulable)
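The per-wakeup ordering is now explicit: reservations ending at an instant
are processed before reservations starting at it. A schematic of the
sequence the simulated-clock hunk above establishes (illustrative only, not
Haizea code):

    def tick(manager, time):
        manager.process_ending_reservations(time)    # 1. close RRs ending now
        manager.process_starting_reservations(time)  # 2. start RRs due now
        manager.process_requests(time)               # 3. may add RRs starting now
        manager.process_starting_reservations(time)  # 4. start those new RRs
        manager.process_ending_reservations(time)    # 5. catch nil-duration RRs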
Modified: branches/TP2.0/src/haizea/core/scheduler/__init__.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/__init__.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/__init__.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -16,6 +16,8 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+from haizea.core.scheduler.slottable import ResourceReservation
+import haizea.common.constants as constants
import sys
class SchedException(Exception):
@@ -89,4 +91,10 @@
def __init__(self, time, type):
self.time = time
- self.type = type
\ No newline at end of file
+ self.type = type
+
+class MigrationResourceReservation(ResourceReservation):
+ def __init__(self, lease, start, end, res, vmrr, transfers):
+ ResourceReservation.__init__(self, lease, start, end, res)
+ self.vmrr = vmrr
+ self.transfers = transfers
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/lease_scheduler.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -31,7 +31,7 @@
import haizea.common.constants as constants
from haizea.common.utils import round_datetime, get_config, get_accounting, get_clock
from haizea.core.leases import Lease
-from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException
+from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
from haizea.core.scheduler.slottable import ResourceReservation
from haizea.core.scheduler.vm_scheduler import VMResourceReservation
from operator import attrgetter
@@ -169,7 +169,7 @@
self.__process_queue(nexttime)
- def process_reservations(self, nowtime):
+ def process_starting_reservations(self, nowtime):
"""Processes starting/ending reservations
This method checks the slottable to see if there are any reservations that are
@@ -182,6 +182,53 @@
# Find starting/ending reservations
starting = self.slottable.get_reservations_starting_at(nowtime)
starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
+
+ # Process starting reservations
+ for rr in starting:
+ lease = rr.lease
+ # Call the appropriate handler, and catch exceptions and errors.
+ try:
+ self.handlers[type(rr)].on_start(lease, rr)
+
+ # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
+ # state. This is usually indicative of a programming error, but not necessarily
+ # one that affects all leases, so we just fail this lease. Note that Haizea can also
+ # be configured to stop immediately when a lease fails.
+ except InconsistentLeaseStateError, exc:
+ self.fail_lease(lease, exc)
+ # An EnactmentError is raised when the handler had to perform an enactment action
+ # (e.g., stopping a VM), and that enactment action failed. This is currently treated
+ # as a non-recoverable error for the lease, and the lease is failed.
+ except EnactmentError, exc:
+ self.fail_lease(lease, exc)
+
+ # Other exceptions are not expected, and generally indicate a programming error.
+ # Thus, they are propagated upwards to the Manager where they will make
+ # Haizea crash and burn.
+
+
+ # TODO: Move up to manager
+ # Each time we process reservations, we report resource utilization to the accounting
+ # module. This utilization information shows what portion of the physical resources
+ # is used by each type of reservation (e.g., 70% are running a VM, 5% are doing suspensions,
+ # etc.) See the get_utilization module for details on how this data is stored.
+ # Currently we only collect utilization from the VM Scheduler (in the future,
+ # information will also be gathered from the preparation scheduler).
+ #util = self.vm_scheduler.get_utilization(nowtime)
+ #get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
+
+
+ def process_ending_reservations(self, nowtime):
+ """Processes starting/ending reservations
+
+ This method checks the slottable to see if there are any reservations that are
+ starting or ending at "nowtime". If so, the appropriate handler is called.
+
+ Arguments:
+ nowtime -- Time at which to check for starting/ending reservations.
+ """
+
+ # Find starting/ending reservations
ending = self.slottable.get_reservations_ending_at(nowtime)
ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
@@ -228,42 +275,7 @@
# Thus, they are propagated upwards to the Manager where they will make
# Haizea crash and burn.
-
- # Process starting reservations
- for rr in starting:
- lease = rr.lease
- # Call the appropriate handler, and catch exceptions and errors.
- try:
- self.handlers[type(rr)].on_start(lease, rr)
-
-
- # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
- # state. This is usually indicative of a programming error, but not necessarily
- # one that affects all leases, so we just fail this lease. Note that Haizea can also
- # be configured to stop immediately when a lease fails.
- except InconsistentLeaseStateError, exc:
- self.fail_lease(lease, exc)
- # An EnactmentError is raised when the handler had to perform an enactment action
- # (e.g., stopping a VM), and that enactment action failed. This is currently treated
- # as a non-recoverable error for the lease, and the lease is failed.
- except EnactmentError, exc:
- self.fail_lease(lease, exc)
- # Other exceptions are not expected, and generally indicate a programming error.
- # Thus, they are propagated upwards to the Manager where they will make
- # Haizea crash and burn.
-
-
- # Each time we process reservations, we report resource utilization to the accounting
- # module. This utilization information shows what portion of the physical resources
- # is used by each type of reservation (e.g., 70% are running a VM, 5% are doing suspensions,
- # etc.) See the get_utilization module for details on how this data is stored.
- # Currently we only collect utilization from the VM Scheduler (in the future,
- # information will also be gathered from the preparation scheduler).
- util = self.vm_scheduler.get_utilization(nowtime)
- get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)
-
-
def get_lease_by_id(self, lease_id):
"""Gets a lease with the given ID
@@ -382,7 +394,11 @@
self.vm_scheduler.cancel_vm(vmrr)
l.remove_vmrr(vmrr)
# TODO: Clean up (transfers, etc.)
- l.state = Lease.STATE_PENDING
+ if l.state in (Lease.STATE_READY, Lease.STATE_SCHEDULED):
+ l.state = Lease.STATE_PENDING
+ elif l.state == Lease.STATE_SUSPENDED_SCHEDULED:
+ l.state = Lease.STATE_SUSPENDED_PENDING
+
self.__schedule_lease(l, nexttime)
def is_queue_empty(self):
@@ -446,19 +462,26 @@
"""
lease_state = lease.get_state()
-
+ migration = get_config().get("migration")
+
# Determine earliest start time in each node
if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
# Figure out earliest start times based on
# image schedule and reusable images
earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
- elif lease_state == Lease.STATE_SUSPENDED_QUEUED or lease_state == Lease.STATE_SUSPENDED_SCHEDULED:
- # No need to transfer images from repository
- # (only intra-node transfer)
- earliest = self.preparation_scheduler.find_earliest_migration_times(lease, nexttime)
- migr_time = self.preparation_scheduler.estimate_migration_time(lease)
- for pnode in earliest:
- earliest[pnode].time += migr_time
+ elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
+ # Migration
+
+ node_ids = self.slottable.nodes.keys()
+ earliest = {}
+ if migration == constants.MIGRATE_NO:
+ for node in node_ids:
+ earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
+ else:
+ prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)
+ vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
+ for node in node_ids:
+ earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
else:
raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
@@ -472,10 +495,17 @@
# Schedule deployment
is_ready = False
preparation_rrs = []
- if lease_state == Lease.STATE_SUSPENDED_QUEUED:
- #self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
- self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
- else:
+ if lease_state == Lease.STATE_SUSPENDED_QUEUED and migration != constants.MIGRATE_NO:
+ migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
+ if len(migr_rrs) > 0:
+ end_migr = migr_rrs[-1].end
+ else:
+ end_migr = nexttime
+ migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
+ migr_rrs.reverse()
+ for migr_rr in migr_rrs:
+ vmrr.pre_rrs.insert(0, migr_rr)
+ elif lease_state != Lease.STATE_SUSPENDED_QUEUED:
preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
# At this point, the lease is feasible.
Modified: branches/TP2.0/src/haizea/core/scheduler/mapper.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/mapper.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/mapper.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -39,11 +39,13 @@
def __init__(self, slottable, policy):
Mapper.__init__(self, slottable, policy)
- def map(self, lease, requested_resources, start, end, strictend):
+ def map(self, lease, requested_resources, start, end, strictend, onlynodes=None):
aw = self.slottable.get_availability_window(start)
nodes = aw.get_nodes_at(start)
-
+ if onlynodes != None:
+ nodes = list(set(nodes) & onlynodes)
+
pnodes = self.policy.sort_hosts(nodes, start, lease)
vnodes = self.__sort_vnodes(requested_resources)
vnodes.reverse()
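The new onlynodes argument restricts the mapper's candidate hosts before
policy sorting. Its consumer in this commit is the VM scheduler, which pins
a suspended lease to its previous hosts when migration is disabled (see the
vm_scheduler.py hunk below). A sketch of the call pattern; the mapper's
return values here are assumed from how vm_scheduler.py unpacks them:

    if mustresume and get_config().get("migration") == constants.MIGRATE_NO:
        # Migration disabled: the lease can only resume on the physical
        # nodes it was suspended on
        onlynodes = set(lease.get_last_vmrr().nodes.values())
    else:
        onlynodes = None   # no restriction; consider every host
    mapping, actualend, preemptions = self.mapper.map(lease,
                                                      requested_resources,
                                                      start, end,
                                                      strictend = False,
                                                      onlynodes = onlynodes)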
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/imagetransfer.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -19,10 +19,12 @@
import haizea.common.constants as constants
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
from haizea.core.scheduler.slottable import ResourceReservation
-from haizea.core.leases import Lease, Capacity
+from haizea.core.scheduler import MigrationResourceReservation
+from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
from haizea.common.utils import estimate_transfer_time, get_config
from haizea.core.scheduler.slottable import ResourceTuple
+from mx.DateTime import TimeDelta
import copy
import bisect
@@ -58,11 +60,91 @@
on_end = ImageTransferPreparationScheduler._handle_end_migrate)
def schedule(self, lease, vmrr, earliest):
+ if type(lease.software) == UnmanagedSoftwareEnvironment:
+ return [], True
if lease.get_type() == Lease.ADVANCE_RESERVATION:
return self.__schedule_deadline(lease, vmrr, earliest)
elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
return self.__schedule_asap(lease, vmrr, earliest)
+ def schedule_migration(self, lease, vmrr, nexttime):
+ if type(lease.software) == UnmanagedSoftwareEnvironment:
+ return []
+
+ # This code is the same as the one in vm_scheduler
+ # Should be factored out
+ last_vmrr = lease.get_last_vmrr()
+ vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
+
+ mustmigrate = False
+ for vnode in vnode_migrations:
+ if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
+ mustmigrate = True
+ break
+
+ if not mustmigrate:
+ return []
+
+ if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
+ start = nexttime
+ end = nexttime
+ res = {}
+ migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
+ migr_rr.state = ResourceReservation.STATE_SCHEDULED
+ return [migr_rr]
+
+ # Figure out what migrations can be done simultaneously
+ migrations = []
+ while len(vnode_migrations) > 0:
+ pnodes = set()
+ migration = {}
+ for vnode in vnode_migrations:
+ origin = vnode_migrations[vnode][0]
+ dest = vnode_migrations[vnode][1]
+ if not origin in pnodes and not dest in pnodes:
+ migration[vnode] = vnode_migrations[vnode]
+ pnodes.add(origin)
+ pnodes.add(dest)
+ for vnode in migration:
+ del vnode_migrations[vnode]
+ migrations.append(migration)
+
+ # Create migration RRs
+ start = max(last_vmrr.post_rrs[-1].end, nexttime)
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ migr_rrs = []
+ for m in migrations:
+ mb_to_migrate = lease.software.image_size * len(m.keys())
+ migr_time = estimate_transfer_time(mb_to_migrate, bandwidth)
+ end = start + migr_time
+ res = {}
+ for (origin,dest) in m.values():
+ resorigin = Capacity([constants.RES_NETOUT])
+ resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
+ resdest = Capacity([constants.RES_NETIN])
+ resdest.set_quantity(constants.RES_NETIN, bandwidth)
+ res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
+ res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
+ migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+ migr_rr.state = ResourceReservation.STATE_SCHEDULED
+ migr_rrs.append(migr_rr)
+ start = end
+
+ return migr_rrs
+
+ def estimate_migration_time(self, lease):
+ migration = get_config().get("migration")
+ if migration == constants.MIGRATE_YES:
+ vmrr = lease.get_last_vmrr()
+ images_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
+ for (vnode,pnode) in vmrr.nodes.items():
+ images_in_pnode[pnode] += lease.software.image_size
+ max_to_transfer = max(images_in_pnode.values())
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ return estimate_transfer_time(max_to_transfer, bandwidth)
+ elif migration == constants.MIGRATE_YES_NOTRANSFER:
+ return TimeDelta(seconds=0)
+
def find_earliest_starting_times(self, lease, nexttime):
node_ids = [node.id for node in self.resourcepool.get_nodes()]
config = get_config()
@@ -70,6 +152,12 @@
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
+ if type(lease.software) == UnmanagedSoftwareEnvironment:
+ earliest = {}
+ for node in node_ids:
+ earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
+ return earliest
+
# Figure out earliest times assuming we have to transfer the images
transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
if mechanism == constants.TRANSFER_UNICAST:
@@ -386,14 +474,30 @@
sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
sched.logger.info("Completed image transfer for lease %i" % (lease.id))
- @staticmethod
- def _handle_start_migrate(sched, lease, rr):
- pass
+ def _handle_start_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+ l.print_contents()
+ rr.state = ResourceReservation.STATE_ACTIVE
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+ self.logger.info("Migrating lease %i..." % (l.id))
- @staticmethod
- def _handle_end_migrate(sched, lease, rr):
- pass
+ def _handle_end_migrate(self, l, rr):
+ self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+ l.print_contents()
+
+ for vnode in rr.transfers:
+ origin = rr.transfers[vnode][0]
+ dest = rr.transfers[vnode][1]
+
+ self.resourcepool.remove_diskimage(origin, l.id, vnode)
+ self.resourcepool.add_diskimage(dest, l.software.image_id, l.software.image_size, l.id, vnode)
+ rr.state = ResourceReservation.STATE_DONE
+ l.print_contents()
+ self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+ self.logger.info("Migrated lease %i..." % (l.id))
+
def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
@@ -442,11 +546,7 @@
self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
pnode.print_files()
-
-class DiskImageMigrationResourceReservation(ResourceReservation):
- pass
-
class FileTransferResourceReservation(ResourceReservation):
def __init__(self, lease, res, start=None, end=None):
ResourceReservation.__init__(self, lease, start, end, res)
@@ -484,4 +584,11 @@
self.transfer_start = None
self.piggybacking_on = None
+class DiskImageMigrationResourceReservation(MigrationResourceReservation):
+ def __init__(self, lease, start, end, res, vmrr, transfers):
+ MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
+ def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+ self.logger.log(loglevel, "Type : DISK IMAGE MIGRATION")
+ self.logger.log(loglevel, "Transfers : %s" % self.transfers)
+ ResourceReservation.print_contents(self, loglevel)
\ No newline at end of file
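A worked example of the new estimate_migration_time, with hypothetical
numbers: three vnodes, two of them co-located, each backed by a 1024 MB disk
image. Transfers leaving different physical nodes can overlap, so the most
loaded origin node bounds the estimate:

    # vnode -> pnode placement from the lease's last VM reservation
    nodes = {1: "pnode-A", 2: "pnode-A", 3: "pnode-B"}
    image_size = 1024   # MB per vnode (hypothetical)
    images_in_pnode = dict([(pnode, 0) for pnode in set(nodes.values())])
    for (vnode, pnode) in nodes.items():
        images_in_pnode[pnode] += image_size
    # pnode-A must send 2048 MB, pnode-B only 1024 MB
    max_to_transfer = max(images_in_pnode.values())   # 2048 MB
    # estimate_transfer_time(2048, bandwidth) then yields the estimate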
Modified: branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/preparation_schedulers/unmanaged.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -20,6 +20,7 @@
from haizea.core.scheduler import EarliestStartingTime
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
import haizea.common.constants as constants
+from mx.DateTime import TimeDelta
class UnmanagedPreparationScheduler(PreparationScheduler):
def __init__(self, slottable, resourcepool, deployment_enact):
@@ -38,6 +39,12 @@
earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
return earliest
+ def estimate_migration_time(self, lease):
+ return TimeDelta(seconds=0)
+
+ def schedule_migration(self, lease, vmrr, nexttime):
+ return []
+
def cancel_preparation(self, lease):
self.cleanup(lease)
Modified: branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py
===================================================================
--- branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-16 16:55:07 UTC (rev 607)
+++ branches/TP2.0/src/haizea/core/scheduler/vm_scheduler.py 2009-07-17 15:02:10 UTC (rev 608)
@@ -20,7 +20,7 @@
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_policy
from haizea.core.leases import Lease, Capacity
from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
-from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError
+from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
from haizea.core.scheduler.mapper import GreedyMapper
from operator import attrgetter, itemgetter
from mx.DateTime import TimeDelta
@@ -62,7 +62,7 @@
on_start = VMScheduler._handle_start_resume,
on_end = VMScheduler._handle_end_resume)
- self.handlers[MigrationResourceReservation] = ReservationEventHandler(
+ self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
sched = self,
on_start = VMScheduler._handle_start_migrate,
on_end = VMScheduler._handle_end_migrate)
@@ -129,8 +129,6 @@
if allow_reservation_in_future == None:
allow_reservation_in_future = self.can_reserve_in_future()
- canmigrate = get_config().get("migration")
-
#
# STEP 1: FIGURE OUT THE MINIMUM DURATION
#
@@ -145,6 +143,12 @@
# Find the changepoints, and the nodes we can use at each changepoint
# Nodes may not be available at a changepoint because images
# cannot be transferred at that time.
+ if mustresume and get_config().get("migration") == constants.MIGRATE_NO:
+ vmrr = lease.get_last_vmrr()
+ onlynodes = set(vmrr.nodes.values())
+ else:
+ onlynodes = None
+
if not mustresume:
cps = [(node, e.time) for node, e in earliest.items()]
cps.sort(key=itemgetter(1))
@@ -154,27 +158,14 @@
for node, time in cps:
nodes.append(node)
if time != curcp:
- changepoints.append([time, nodes[:]])
+ changepoints.append([time, set(nodes)])
curcp = time
else:
- changepoints[-1][1] = nodes[:]
+ changepoints[-1][1] = set(nodes)
else:
- if not canmigrate:
- vmrr = lease.get_last_vmrr()
- curnodes = set(vmrr.nodes.values())
- else:
- curnodes=None
- # If we have to resume this lease, make sure that
- # we have enough time to transfer the images.
- migratetime = self.__estimate_migration_time(lease)
- earliesttransfer = get_clock().get_time() + migratetime
-
- for n in earliest:
- earliest[n][0] = max(earliest[n][0], earliesttransfer)
-
- changepoints = list(set([x[0] for x in earliest.values()]))
+ changepoints = list(set([x.time for x in earliest.values()]))
changepoints.sort()
- changepoints = [(x, curnodes) for x in changepoints]
+ changepoints = [(x, onlynodes) for x in changepoints]
# If we can make reservations in the future,
# we also consider future changepoints
@@ -186,7 +177,10 @@
# Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
# included in futurecp.
futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
- futurecp = [(p,None) for p in futurecp]
+ if not mustresume:
+ futurecp = [(p,None) for p in futurecp]
+ else:
+ futurecp = [(p,onlynodes) for p in futurecp]
else:
futurecp = []
@@ -278,7 +272,18 @@
return vmrr, preemptions
def estimate_migration_time(self, lease):
- return self.__estimate_migration_time(lease)
+ migration = get_config().get("migration")
+ if migration == constants.MIGRATE_YES:
+ vmrr = lease.get_last_vmrr()
+ mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
+ for (vnode,pnode) in vmrr.nodes.items():
+ mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
+ mem_in_pnode[pnode] += mem
+ max_mem_to_transfer = max(mem_in_pnode.values())
+ bandwidth = self.resourcepool.info.get_migration_bandwidth()
+ return estimate_transfer_time(max_mem_to_transfer, bandwidth)
+ elif migration == constants.MIGRATE_YES_NOTRANSFER:
+ return TimeDelta(seconds=0)
def schedule_migration(self, lease, vmrr, nexttime):
last_vmrr = lease.get_last_vmrr()
@@ -291,8 +296,16 @@
break
if not mustmigrate:
- return
+ return []
+ if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
+ start = nexttime
+ end = nexttime
+ res = {}
+ migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
+ migr_rr.state = ResourceReservation.STATE_SCHEDULED
+ return [migr_rr]
+
# Figure out what migrations can be done simultaneously
migrations = []
while len(vnode_migrations) > 0:
@@ -311,10 +324,12 @@
# Create migration RRs
start = max(last_vmrr.post_rrs[-1].end, nexttime)
- migr_time = self.__estimate_migration_time(lease)
bandwidth = self.resourcepool.info.get_migration_bandwidth()
migr_rrs = []
for m in migrations:
+ vnodes_to_migrate = m.keys()
+ max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
+ migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
end = start + migr_time
res = {}
for (origin,dest) in m.values():
@@ -324,15 +339,13 @@
resdest.set_quantity(constants.RES_NETIN, bandwidth)
res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
- migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+ migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
migr_rr.state = ResourceReservation.STATE_SCHEDULED
migr_rrs.append(migr_rr)
start = end
+
+ return migr_rrs
- migr_rrs.reverse()
- for migr_rr in migr_rrs:
- vmrr.pre_rrs.insert(0, migr_rr)
-
def cancel_vm(self, vmrr):
if vmrr.backfill_reservation == True:
@@ -405,9 +418,11 @@
requested_resources,
start,
end,
- strictend = False)
+ strictend = False,
+ onlynodes = onlynodes)
if mapping != None:
+ # TODO: Take into account case where suspension is disabled.
if actualend < end:
actualduration = actualend - start
if actualduration >= min_duration:
@@ -669,21 +684,6 @@
else:
return self.__estimate_suspend_resume_time(lease, rate)
- def __estimate_migration_time(self, lease):
- whattomigrate = get_config().get("what-to-migrate")
- if whattomigrate == constants.MIGRATE_NONE:
- # TODO: At this point, giving an RR a duration of 0 seconds
- # will produce unexpected results. So, we need to give
- # migrations a symbolic duration of one second,
- # even when we are assuming that migrations are instantaneous
- return TimeDelta(seconds=1)
- else:
- bandwidth = self.resourcepool.info.get_migration_bandwidth()
- if whattomigrate == constants.MIGRATE_MEM:
- mbtotransfer = lease.requested_resources.get_quantity(constants.RES_MEM)
- elif whattomigrate == constants.MIGRATE_MEMDISK:
- mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
- return estimate_transfer_time(mbtotransfer, bandwidth)
# TODO: Take into account other things like boot overhead, migration overhead, etc.
def __compute_scheduling_threshold(self, lease):
@@ -897,11 +897,6 @@
origin = rr.transfers[vnode][0]
dest = rr.transfers[vnode][1]
- # Commenting for now
- # Has to be moved to preparation scheduler migrate handler
- #self.resourcepool.remove_diskimage(origin, l.id, vnode)
- #self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
-
# Update RAM files
self.resourcepool.remove_ramfile(origin, l.id, vnode)
self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))
@@ -913,8 +908,6 @@
-
-
class VMResourceReservation(ResourceReservation):
def __init__(self, lease, start, end, nodes, res, backfill_reservation):
ResourceReservation.__init__(self, lease, start, end, res)
@@ -978,9 +971,6 @@
for susprr in self.post_rrs:
self.logger.log(loglevel, "--")
susprr.print_contents(loglevel)
-
- def is_preemptible(self):
- return self.lease.preemptible
def xmlrpc_marshall(self):
rr = ResourceReservation.xmlrpc_marshall(self)
@@ -1004,14 +994,8 @@
return (self == self.vmrr.post_rrs[0])
def is_last(self):
- return (self == self.vmrr.post_rrs[-1])
+ return (self == self.vmrr.post_rrs[-1])
- # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
- # has wider implications (with a non-trivial handling). For now, we leave them
- # as non-preemptible, since the probability of preempting a suspension RR is slim.
- def is_preemptible(self):
- return False
-
def xmlrpc_marshall(self):
rr = ResourceReservation.xmlrpc_marshall(self)
rr["type"] = "SUSP"
@@ -1035,13 +1019,7 @@
def is_last(self):
resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
return (self == resm_rrs[-1])
-
- # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
- # has wider implications (with a non-trivial handling). For now, we leave them
- # as non-preemptible, since the probability of preempting a resumption RR is slim.
- def is_preemptible(self):
- return False
-
+
def xmlrpc_marshall(self):
rr = ResourceReservation.xmlrpc_marshall(self)
rr["type"] = "RESM"
@@ -1056,26 +1034,17 @@
def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
self.logger.log(loglevel, "Type : SHUTDOWN")
ResourceReservation.print_contents(self, loglevel)
-
- def is_preemptible(self):
- return True
-
+
def xmlrpc_marshall(self):
rr = ResourceReservation.xmlrpc_marshall(self)
rr["type"] = "SHTD"
return rr
-class MigrationResourceReservation(ResourceReservation):
+class MemImageMigrationResourceReservation(MigrationResourceReservation):
def __init__(self, lease, start, end, res, vmrr, transfers):
- ResourceReservation.__init__(self, lease, start, end, res)
- self.vmrr = vmrr
- self.transfers = transfers
-
+ MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
+
def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
- self.logger.log(loglevel, "Type : MIGRATE")
+ self.logger.log(loglevel, "Type : MEM IMAGE MIGRATION")
self.logger.log(loglevel, "Transfers : %s" % self.transfers)
- ResourceReservation.print_contents(self, loglevel)
-
- def is_preemptible(self):
- return False
-
+ ResourceReservation.print_contents(self, loglevel)
\ No newline at end of file
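Both schedule_migration implementations build migration rounds with the same
greedy loop (the comment in imagetransfer.py already notes it should be
factored out): a physical node may appear in a round only once, as origin or
destination, so the transfers within a round can proceed in parallel. A
self-contained sketch with hypothetical placements:

    vnode_migrations = {1: ("A", "B"), 2: ("A", "C"), 3: ("D", "C")}
    migrations = []
    while len(vnode_migrations) > 0:
        pnodes = set()
        migration = {}
        for vnode in vnode_migrations:
            origin, dest = vnode_migrations[vnode]
            if not origin in pnodes and not dest in pnodes:
                migration[vnode] = vnode_migrations[vnode]
                pnodes.add(origin)
                pnodes.add(dest)
        for vnode in migration:
            del vnode_migrations[vnode]
        migrations.append(migration)
    # Rounds: [{1: ("A","B"), 3: ("D","C")}, {2: ("A","C")}]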