[haizea-commit] r554 - in trunk/src/haizea: common resourcemanager resourcemanager/enact resourcemanager/frontends resourcemanager/scheduler resourcemanager/scheduler/preparation_schedulers traces

haizea-commit at mailman.cs.uchicago.edu
Tue Jan 6 05:56:35 CST 2009


Author: borja
Date: 2009-01-06 05:56:27 -0600 (Tue, 06 Jan 2009)
New Revision: 554

Added:
   trunk/src/haizea/resourcemanager/leases.py
   trunk/src/haizea/resourcemanager/scheduler/
   trunk/src/haizea/resourcemanager/scheduler/__init__.py
   trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/
   trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
   trunk/src/haizea/resourcemanager/scheduler/slottable.py
   trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
Removed:
   trunk/src/haizea/resourcemanager/datastruct.py
   trunk/src/haizea/resourcemanager/deployment/
   trunk/src/haizea/resourcemanager/resourcepool.py
   trunk/src/haizea/resourcemanager/scheduler.py
   trunk/src/haizea/resourcemanager/slottable.py
Modified:
   trunk/src/haizea/common/constants.py
   trunk/src/haizea/resourcemanager/accounting.py
   trunk/src/haizea/resourcemanager/configfile.py
   trunk/src/haizea/resourcemanager/enact/__init__.py
   trunk/src/haizea/resourcemanager/enact/opennebula.py
   trunk/src/haizea/resourcemanager/enact/simulated.py
   trunk/src/haizea/resourcemanager/frontends/opennebula.py
   trunk/src/haizea/resourcemanager/frontends/rpc.py
   trunk/src/haizea/resourcemanager/frontends/tracefile.py
   trunk/src/haizea/resourcemanager/rm.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
   trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
   trunk/src/haizea/traces/readers.py
Log:
Started refactoring the scheduling code into more manageable modules and classes. Note that this leaves some parts of the scheduler broken for the time being.
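
For code outside this tree that still imports from the removed haizea.resourcemanager.datastruct module, the frontends touched by this commit show the new locations: the lease classes now live in haizea.resourcemanager.leases, and ResourceTuple in haizea.resourcemanager.scheduler.slottable. A minimal before/after sketch (the importing module itself is illustrative):

    # Before (rev 553 and earlier):
    # from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple

    # After this commit (rev 554):
    from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
    from haizea.resourcemanager.scheduler.slottable import ResourceTuple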

Modified: trunk/src/haizea/common/constants.py
===================================================================
--- trunk/src/haizea/common/constants.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/common/constants.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -65,9 +65,9 @@
 RUNTIMEOVERHEAD_ALL="all"
 RUNTIMEOVERHEAD_BE="besteffort"
 
-DEPLOYMENT_UNMANAGED = "unmanaged"
-DEPLOYMENT_PREDEPLOY = "predeployed-images"
-DEPLOYMENT_TRANSFER = "imagetransfer"
+PREPARATION_UNMANAGED = "unmanaged"
+PREPARATION_PREDEPLOY = "predeployed-images"
+PREPARATION_TRANSFER = "imagetransfer"
 
 CLOCK_SIMULATED = "simulated"
 CLOCK_REAL = "real"

Modified: trunk/src/haizea/resourcemanager/accounting.py
===================================================================
--- trunk/src/haizea/resourcemanager/accounting.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/accounting.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -19,7 +19,6 @@
 import os
 import os.path
 import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
 from haizea.common.utils import pickle, get_config, get_clock
 from errno import EEXIST
 

Modified: trunk/src/haizea/resourcemanager/configfile.py
===================================================================
--- trunk/src/haizea/resourcemanager/configfile.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/configfile.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -86,9 +86,9 @@
             getter      = "lease-preparation",
             type        = OPTTYPE_STRING,
             required    = False,
-            default     = constants.DEPLOYMENT_UNMANAGED,
-            valid       = [constants.DEPLOYMENT_UNMANAGED,
-                           constants.DEPLOYMENT_TRANSFER],
+            default     = constants.PREPARATION_UNMANAGED,
+            valid       = [constants.PREPARATION_UNMANAGED,
+                           constants.PREPARATION_TRANSFER],
             doc         = """
             Sets how the scheduler will handle the
             preparation overhead of leases. Valid values are:

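The lease-preparation option keeps its name and its valid string values ("unmanaged", "imagetransfer"); only the constants backing them are renamed. A hedged sketch of how a caller might read it, assuming the config.get(<getter>) access pattern used elsewhere in this commit -- the dispatch below is illustrative, not Haizea's actual wiring:

    import haizea.common.constants as constants
    from haizea.common.utils import get_config

    preparation = get_config().get("lease-preparation")
    if preparation == constants.PREPARATION_UNMANAGED:
        pass  # handled by the "unmanaged" preparation scheduler
    elif preparation == constants.PREPARATION_TRANSFER:
        pass  # handled by the "imagetransfer" preparation scheduler
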
Deleted: trunk/src/haizea/resourcemanager/datastruct.py
===================================================================
--- trunk/src/haizea/resourcemanager/datastruct.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/datastruct.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -1,724 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago                                 #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
-# Complutense de Madrid (dsa-research.org)                                   #
-#                                                                            #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
-# not use this file except in compliance with the License. You may obtain    #
-# a copy of the License at                                                   #
-#                                                                            #
-# http://www.apache.org/licenses/LICENSE-2.0                                 #
-#                                                                            #
-# Unless required by applicable law or agreed to in writing, software        #
-# distributed under the License is distributed on an "AS IS" BASIS,          #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
-# See the License for the specific language governing permissions and        #
-# limitations under the License.                                             #
-# -------------------------------------------------------------------------- #
-
-"""This module provides the fundamental data structures (besides the slot table,
-which is in a module of its own) used by Haizea. The module provides four types
-of structures:
-
-* Lease data structures
-  * Lease: Base class for leases
-  * ARLease: Advance reservation lease
-  * BestEffortLease: Best-effort lease
-  * ImmediateLease: Immediate lease
-* Resource reservation (RR) structures:
-  * ResourceReservationBase: Base class for RRs in the slot table
-  * VMResourceReservation: RR representing one or more VMs
-  * SuspensionResourceReservation: RR representing a lease suspension
-  * ResumptionResourceReservation: RR representing a lease resumption
-* Lease containers
-  * Queue: Your run-of-the-mill queue
-  * LeaseTable: Provides easy access to leases in the system
-* Miscellaneous structures
-  * ResourceTuple: A tuple representing a resource usage or requirement
-  * Timestamp: A wrapper around requested/scheduled/actual timestamps
-  * Duration: A wrapper around requested/accumulated/actual durations
-"""
-
-from haizea.common.constants import RES_MEM, MIGRATE_NONE, MIGRATE_MEM, MIGRATE_MEMDISK, LOGLEVEL_VDEBUG
-from haizea.common.utils import StateMachine, round_datetime_delta, get_lease_id, pretty_nodemap, estimate_transfer_time, xmlrpc_marshall_singlevalue
-
-from operator import attrgetter
-from mx.DateTime import TimeDelta
-from math import floor
-
-import logging
-
-
-#-------------------------------------------------------------------#
-#                                                                   #
-#                     LEASE DATA STRUCTURES                         #
-#                                                                   #
-#-------------------------------------------------------------------#
-
-
-class Lease(object):
-    # Lease states
-    STATE_NEW = 0
-    STATE_PENDING = 1
-    STATE_REJECTED = 2
-    STATE_SCHEDULED = 3
-    STATE_QUEUED = 4
-    STATE_CANCELLED = 5
-    STATE_PREPARING = 6
-    STATE_READY = 7
-    STATE_ACTIVE = 8
-    STATE_SUSPENDING = 9
-    STATE_SUSPENDED = 10
-    STATE_MIGRATING = 11
-    STATE_RESUMING = 12
-    STATE_RESUMED_READY = 13
-    STATE_DONE = 14
-    STATE_FAIL = 15
-    
-    state_str = {STATE_NEW : "New",
-                 STATE_PENDING : "Pending",
-                 STATE_REJECTED : "Rejected",
-                 STATE_SCHEDULED : "Scheduled",
-                 STATE_QUEUED : "Queued",
-                 STATE_CANCELLED : "Cancelled",
-                 STATE_PREPARING : "Preparing",
-                 STATE_READY : "Ready",
-                 STATE_ACTIVE : "Active",
-                 STATE_SUSPENDING : "Suspending",
-                 STATE_SUSPENDED : "Suspended",
-                 STATE_MIGRATING : "Migrating",
-                 STATE_RESUMING : "Resuming",
-                 STATE_RESUMED_READY: "Resumed-Ready",
-                 STATE_DONE : "Done",
-                 STATE_FAIL : "Fail"}
-    
-    def __init__(self, submit_time, start, duration, diskimage_id, 
-                 diskimage_size, numnodes, requested_resources, preemptible):
-        # Lease ID (read only)
-        self.id = get_lease_id()
-        
-        # Request attributes (read only)
-        self.submit_time = submit_time
-        self.start = start
-        self.duration = duration
-        self.end = None
-        self.diskimage_id = diskimage_id
-        self.diskimage_size = diskimage_size
-        # TODO: The following assumes homogeneous nodes. Should be modified
-        # to account for heterogeneous nodes.
-        self.numnodes = numnodes
-        self.requested_resources = requested_resources
-        self.preemptible = preemptible
-
-        # Bookkeeping attributes
-        # (keep track of the lease's state, resource reservations, etc.)
-        self.state = Lease.STATE_NEW
-        self.diskimagemap = {}
-        self.memimagemap = {}
-        self.deployment_rrs = []
-        self.vm_rrs = []
-
-        # Enactment information. Should only be manipulated by enactment module
-        self.enactment_info = None
-        self.vnode_enactment_info = dict([(n+1, None) for n in range(numnodes)])
-        
-        self.logger = logging.getLogger("LEASES")
-        
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Lease ID       : %i" % self.id)
-        self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
-        self.logger.log(loglevel, "Duration       : %s" % self.duration)
-        self.logger.log(loglevel, "State          : %s" % Lease.state_str[self.state])
-        self.logger.log(loglevel, "Disk image     : %s" % self.diskimage_id)
-        self.logger.log(loglevel, "Disk image size: %s" % self.diskimage_size)
-        self.logger.log(loglevel, "Num nodes      : %s" % self.numnodes)
-        self.logger.log(loglevel, "Resource req   : %s" % self.requested_resources)
-        self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
-        self.logger.log(loglevel, "Mem image map  : %s" % pretty_nodemap(self.memimagemap))
-
-    def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
-        if len(self.deployment_rrs) > 0:
-            self.logger.log(loglevel, "DEPLOYMENT RESOURCE RESERVATIONS")
-            self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-            for r in self.deployment_rrs:
-                r.print_contents(loglevel)
-                self.logger.log(loglevel, "##")
-        self.logger.log(loglevel, "VM RESOURCE RESERVATIONS")
-        self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~")
-        for r in self.vm_rrs:
-            r.print_contents(loglevel)
-            self.logger.log(loglevel, "##")
-
-    def get_endtime(self):
-        vmrr = self.get_last_vmrr()
-        return vmrr.end
-
-    def append_vmrr(self, vmrr):
-        self.vm_rrs.append(vmrr)
-        
-    def append_deployrr(self, vmrr):
-        self.deployment_rrs.append(vmrr)
-
-    def get_last_vmrr(self):
-        return self.vm_rrs[-1]
-
-    def update_vmrr(self, rrold, rrnew):
-        self.vm_rrs[self.vm_rrs.index(rrold)] = rrnew
-    
-    def remove_vmrr(self, vmrr):
-        if not vmrr in self.vm_rrs:
-            raise Exception, "Tried to remove an VM RR not contained in this lease"
-        else:
-            self.vm_rrs.remove(vmrr)
-
-    def clear_rrs(self):
-        self.deployment_rrs = []
-        self.vm_rrs = []
-        
-    def add_boot_overhead(self, t):
-        self.duration.incr(t)        
-
-    def add_runtime_overhead(self, percent):
-        self.duration.incr_by_percent(percent)
-        
-    def xmlrpc_marshall(self):
-        # Convert to something we can send through XMLRPC
-        l = {}
-        l["id"] = self.id
-        l["submit_time"] = xmlrpc_marshall_singlevalue(self.submit_time)
-        l["start_req"] = xmlrpc_marshall_singlevalue(self.start.requested)
-        l["start_sched"] = xmlrpc_marshall_singlevalue(self.start.scheduled)
-        l["start_actual"] = xmlrpc_marshall_singlevalue(self.start.actual)
-        l["duration_req"] = xmlrpc_marshall_singlevalue(self.duration.requested)
-        l["duration_acc"] = xmlrpc_marshall_singlevalue(self.duration.accumulated)
-        l["duration_actual"] = xmlrpc_marshall_singlevalue(self.duration.actual)
-        l["end"] = xmlrpc_marshall_singlevalue(self.end)
-        l["diskimage_id"] = self.diskimage_id
-        l["diskimage_size"] = self.diskimage_size
-        l["numnodes"] = self.numnodes
-        l["resources"] = `self.requested_resources`
-        l["preemptible"] = self.preemptible
-        l["state"] = self.state
-        l["vm_rrs"] = [vmrr.xmlrpc_marshall() for vmrr in self.vm_rrs]
-                
-        return l
-        
-class LeaseStateMachine(StateMachine):
-    initial_state = Lease.STATE_NEW
-    transitions = {Lease.STATE_NEW:           [(Lease.STATE_PENDING,    "")],
-                   Lease.STATE_PENDING:       [(Lease.STATE_SCHEDULED,  ""),
-                                               (Lease.STATE_QUEUED,     ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_REJECTED,   "")],
-                   Lease.STATE_SCHEDULED:     [(Lease.STATE_PREPARING,  ""),
-                                               (Lease.STATE_READY,      ""),
-                                               (Lease.STATE_MIGRATING,  ""),
-                                               (Lease.STATE_RESUMING,   ""),
-                                               (Lease.STATE_CANCELLED,  "")],
-                   Lease.STATE_QUEUED:        [(Lease.STATE_SCHEDULED,  ""),
-                                               (Lease.STATE_SUSPENDED,  ""),
-                                               (Lease.STATE_CANCELLED,  "")],
-                   Lease.STATE_PREPARING:     [(Lease.STATE_READY,      ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_READY:         [(Lease.STATE_ACTIVE,     ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_ACTIVE:        [(Lease.STATE_SUSPENDING, ""),
-                                               (Lease.STATE_DONE,       ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_SUSPENDING:    [(Lease.STATE_SUSPENDED,  ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_SUSPENDED:     [(Lease.STATE_QUEUED,     ""),
-                                               (Lease.STATE_MIGRATING,  ""),
-                                               (Lease.STATE_RESUMING,   ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_MIGRATING:     [(Lease.STATE_SUSPENDED,  ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_RESUMING:      [(Lease.STATE_RESUMED_READY, ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE,     ""),
-                                               (Lease.STATE_CANCELLED,  ""),
-                                               (Lease.STATE_FAIL,       "")],
-                   
-                   # Final states
-                   Lease.STATE_DONE:          [],
-                   Lease.STATE_CANCELLED:     [],
-                   Lease.STATE_FAIL:          [],
-                   Lease.STATE_REJECTED:      [],
-                   }
-    
-    def __init__(self):
-        StateMachine.__init__(initial_state, transitions)
-        
-class ARLease(Lease):
-    def __init__(self, submit_time, start, duration, diskimage_id, 
-                 diskimage_size, numnodes, resreq, preemptible,
-                 # AR-specific parameters:
-                 realdur = None):
-        start = Timestamp(start)
-        duration = Duration(duration)
-        if realdur != duration.requested:
-            duration.known = realdur # ONLY for simulation
-        Lease.__init__(self, submit_time, start, duration, diskimage_id,
-                           diskimage_size, numnodes, resreq, preemptible)
-        
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "__________________________________________________")
-        Lease.print_contents(self, loglevel)
-        self.logger.log(loglevel, "Type           : AR")
-        self.logger.log(loglevel, "Start time     : %s" % self.start)
-        self.print_rrs(loglevel)
-        self.logger.log(loglevel, "--------------------------------------------------")
-    
-    def xmlrpc_marshall(self):
-        l = Lease.xmlrpc_marshall(self)
-        l["type"] = "AR"
-        return l
-
-        
-class BestEffortLease(Lease):
-    def __init__(self, submit_time, duration, diskimage_id, 
-                 diskimage_size, numnodes, resreq, preemptible,
-                 # BE-specific parameters:
-                 realdur = None):
-        start = Timestamp(None) # i.e., start on a best-effort basis
-        duration = Duration(duration)
-        if realdur != duration.requested:
-            duration.known = realdur # ONLY for simulation
-        # When the images will be available
-        self.imagesavail = None        
-        Lease.__init__(self, submit_time, start, duration, diskimage_id,
-                           diskimage_size, numnodes, resreq, preemptible)
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "__________________________________________________")
-        Lease.print_contents(self, loglevel)
-        self.logger.log(loglevel, "Type           : BEST-EFFORT")
-        self.logger.log(loglevel, "Images Avail @ : %s" % self.imagesavail)
-        self.print_rrs(loglevel)
-        self.logger.log(loglevel, "--------------------------------------------------")
-        
-    def get_waiting_time(self):
-        return self.start.actual - self.submit_time
-        
-    def get_slowdown(self, bound=10):
-        time_on_dedicated = self.duration.original
-        time_on_loaded = self.end - self.submit_time
-        bound = TimeDelta(seconds=bound)
-        if time_on_dedicated < bound:
-            time_on_dedicated = bound
-        return time_on_loaded / time_on_dedicated
-
-    def xmlrpc_marshall(self):
-        l = Lease.xmlrpc_marshall(self)
-        l["type"] = "BE"
-        return l
-
-
-class ImmediateLease(Lease):
-    def __init__(self, submit_time, duration, diskimage_id, 
-                 diskimage_size, numnodes, resreq, preemptible,
-                 # Immediate-specific parameters:
-                 realdur = None):
-        start = Timestamp(None) # i.e., start on a best-effort basis
-        duration = Duration(duration)
-        duration.known = realdur # ONLY for simulation
-        Lease.__init__(self, submit_time, start, duration, diskimage_id,
-                           diskimage_size, numnodes, resreq, preemptible)
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "__________________________________________________")
-        Lease.print_contents(self, loglevel)
-        self.logger.log(loglevel, "Type           : IMMEDIATE")
-        self.print_rrs(loglevel)
-        self.logger.log(loglevel, "--------------------------------------------------")
-
-    def xmlrpc_marshall(self):
-        l = Lease.xmlrpc_marshall(self)
-        l["type"] = "IM"
-        return l
-
-
-#-------------------------------------------------------------------#
-#                                                                   #
-#                        RESOURCE RESERVATION                       #
-#                          DATA STRUCTURES                          #
-#                                                                   #
-#-------------------------------------------------------------------#
-
-        
-class ResourceReservation(object):
-    
-    # Resource reservation states
-    STATE_SCHEDULED = 0
-    STATE_ACTIVE = 1
-    STATE_DONE = 2
-
-    state_str = {STATE_SCHEDULED : "Scheduled",
-                 STATE_ACTIVE : "Active",
-                 STATE_DONE : "Done"}
-    
-    def __init__(self, lease, start, end, res):
-        self.lease = lease
-        self.start = start
-        self.end = end
-        self.state = None
-        self.resources_in_pnode = res
-        self.logger = logging.getLogger("LEASES")
-                        
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Start          : %s" % self.start)
-        self.logger.log(loglevel, "End            : %s" % self.end)
-        self.logger.log(loglevel, "State          : %s" % ResourceReservation.state_str[self.state])
-        self.logger.log(loglevel, "Resources      : \n                         %s" % "\n                         ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()])) 
-                
-    def xmlrpc_marshall(self):
-        # Convert to something we can send through XMLRPC
-        rr = {}                
-        rr["start"] = xmlrpc_marshall_singlevalue(self.start)
-        rr["end"] = xmlrpc_marshall_singlevalue(self.end)
-        rr["state"] = self.state
-        return rr
-                
-class VMResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, nodes, res, backfill_reservation):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.nodes = nodes # { vnode -> pnode }
-        self.backfill_reservation = backfill_reservation
-        self.pre_rrs = []
-        self.post_rrs = []
-
-        # ONLY for simulation
-        self.__update_prematureend()
-
-    def update_start(self, time):
-        self.start = time
-        # ONLY for simulation
-        self.__update_prematureend()
-
-    def update_end(self, time):
-        self.end = time
-        # ONLY for simulation
-        self.__update_prematureend()
-        
-    # ONLY for simulation
-    def __update_prematureend(self):
-        if self.lease.duration.known != None:
-            remdur = self.lease.duration.get_remaining_known_duration()
-            rrdur = self.end - self.start
-            if remdur < rrdur:
-                self.prematureend = self.start + remdur
-            else:
-                self.prematureend = None
-        else:
-            self.prematureend = None 
-
-    def get_final_end(self):
-        if len(self.post_rrs) == 0:
-            return self.end
-        else:
-            return self.post_rrs[-1].end
-
-    def is_suspending(self):
-        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
-
-    def is_shutting_down(self):
-        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        for resmrr in self.pre_rrs:
-            resmrr.print_contents(loglevel)
-            self.logger.log(loglevel, "--")
-        self.logger.log(loglevel, "Type           : VM")
-        self.logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
-        if self.prematureend != None:
-            self.logger.log(loglevel, "Premature end  : %s" % self.prematureend)
-        ResourceReservation.print_contents(self, loglevel)
-        for susprr in self.post_rrs:
-            self.logger.log(loglevel, "--")
-            susprr.print_contents(loglevel)
-        
-    def is_preemptible(self):
-        return self.lease.preemptible
-
-    def xmlrpc_marshall(self):
-        rr = ResourceReservation.xmlrpc_marshall(self)
-        rr["type"] = "VM"
-        rr["nodes"] = self.nodes.items()
-        return rr
-
-        
-class SuspensionResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, res, vnodes, vmrr):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.vmrr = vmrr
-        self.vnodes = vnodes
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Type           : SUSPEND")
-        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
-        ResourceReservation.print_contents(self, loglevel)
-        
-    def is_first(self):
-        return (self == self.vmrr.post_rrs[0])
-
-    def is_last(self):
-        return (self == self.vmrr.post_rrs[-1])
-        
-    # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
-    # has wider implications (with a non-trivial handling). For now, we leave them 
-    # as non-preemptible, since the probability of preempting a suspension RR is slim.
-    def is_preemptible(self):
-        return False        
-        
-    def xmlrpc_marshall(self):
-        rr = ResourceReservation.xmlrpc_marshall(self)
-        rr["type"] = "SUSP"
-        return rr
-        
-class ResumptionResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, res, vnodes, vmrr):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.vmrr = vmrr
-        self.vnodes = vnodes
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Type           : RESUME")
-        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
-        ResourceReservation.print_contents(self, loglevel)
-
-    def is_first(self):
-        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
-        return (self == resm_rrs[0])
-
-    def is_last(self):
-        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
-        return (self == resm_rrs[-1])
-
-    # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
-    # has wider implications (with a non-trivial handling). For now, we leave them 
-    # as non-preemptible, since the probability of preempting a resumption RR is slim.
-    def is_preemptible(self):
-        return False        
-        
-    def xmlrpc_marshall(self):
-        rr = ResourceReservation.xmlrpc_marshall(self)
-        rr["type"] = "RESM"
-        return rr
-    
-class ShutdownResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, res, vnodes, vmrr):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.vmrr = vmrr
-        self.vnodes = vnodes
-
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Type           : SHUTDOWN")
-        ResourceReservation.print_contents(self, loglevel)
-        
-    def is_preemptible(self):
-        return True        
-        
-    def xmlrpc_marshall(self):
-        rr = ResourceReservation.xmlrpc_marshall(self)
-        rr["type"] = "SHTD"
-        return rr
-
-class MigrationResourceReservation(ResourceReservation):
-    def __init__(self, lease, start, end, res, vmrr, transfers):
-        ResourceReservation.__init__(self, lease, start, end, res)
-        self.vmrr = vmrr
-        self.transfers = transfers
-        
-    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
-        self.logger.log(loglevel, "Type           : MIGRATE")
-        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
-        ResourceReservation.print_contents(self, loglevel)        
-
-    def is_preemptible(self):
-        return False        
-
-#-------------------------------------------------------------------#
-#                                                                   #
-#                         LEASE CONTAINERS                          #
-#                                                                   #
-#-------------------------------------------------------------------#
-        
-class Queue(object):
-    def __init__(self, scheduler):
-        self.scheduler = scheduler
-        self.__q = []
-        
-    def is_empty(self):
-        return len(self.__q)==0
-    
-    def enqueue(self, r):
-        self.__q.append(r)
-    
-    def dequeue(self):
-        return self.__q.pop(0)
-    
-    def enqueue_in_order(self, r):
-        self.__q.append(r)
-        self.__q.sort(key=attrgetter("submit_time"))
-
-    def length(self):
-        return len(self.__q)
-    
-    def has_lease(self, lease_id):
-        return (1 == len([l for l in self.__q if l.id == lease_id]))
-    
-    def get_lease(self, lease_id):
-        return [l for l in self.__q if l.id == lease_id][0]
-    
-    def remove_lease(self, lease):
-        self.__q.remove(lease)
-    
-    def __iter__(self):
-        return iter(self.__q)
-        
-class LeaseTable(object):
-    def __init__(self, scheduler):
-        self.scheduler = scheduler
-        self.entries = {}
-        
-    def has_lease(self, lease_id):
-        return self.entries.has_key(lease_id)
-        
-    def get_lease(self, lease_id):
-        return self.entries[lease_id]
-    
-    def is_empty(self):
-        return len(self.entries)==0
-    
-    def remove(self, lease):
-        del self.entries[lease.id]
-        
-    def add(self, lease):
-        self.entries[lease.id] = lease
-        
-    def get_leases(self, type=None):
-        if type==None:
-            return self.entries.values()
-        else:
-            return [e for e in self.entries.values() if isinstance(e, type)]
-
-    def get_leases_by_state(self, state):
-        return [e for e in self.entries.values() if e.state == state]
-    
-#-------------------------------------------------------------------#
-#                                                                   #
-#             MISCELLANEOUS DATA STRUCTURES CONTAINERS              #
-#                                                                   #
-#-------------------------------------------------------------------#    
-    
-class ResourceTuple(object):
-    def __init__(self, res):
-        self._res = res
-        
-    @classmethod
-    def from_list(cls, l):
-        return cls(l[:])
-
-    @classmethod
-    def copy(cls, rt):
-        return cls(rt._res[:])
-    
-    @classmethod
-    def set_resource_types(cls, resourcetypes):
-        cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
-        cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
-        cls.tuplelength = len(resourcetypes)
-
-    @classmethod
-    def create_empty(cls):
-        return cls([0 for x in range(cls.tuplelength)])
-        
-    def fits_in(self, res2):
-        fits = True
-        for i in xrange(len(self._res)):
-            if self._res[i] > res2._res[i]:
-                fits = False
-                break
-        return fits
-    
-    def get_num_fits_in(self, res2):
-        canfit = 10000 # Arbitrarily large
-        for i in xrange(len(self._res)):
-            if self._res[i] != 0:
-                f = res2._res[i] / self._res[i]
-                if f < canfit:
-                    canfit = f
-        return int(floor(canfit))
-    
-    def decr(self, res2):
-        for slottype in xrange(len(self._res)):
-            self._res[slottype] -= res2._res[slottype]
-
-    def incr(self, res2):
-        for slottype in xrange(len(self._res)):
-            self._res[slottype] += res2._res[slottype]
-        
-    def get_by_type(self, resourcetype):
-        return self._res[self.type2pos[resourcetype]]
-
-    def set_by_type(self, resourcetype, value):
-        self._res[self.type2pos[resourcetype]] = value        
-        
-    def is_zero_or_less(self):
-        return sum([v for v in self._res]) <= 0
-    
-    def __repr__(self):
-        r=""
-        for i, x in enumerate(self._res):
-            r += "%s:%.2f " % (self.descriptions[i], x)
-        return r
-
-class Timestamp(object):
-    def __init__(self, requested):
-        self.requested = requested
-        self.scheduled = None
-        self.actual = None
-
-    def __repr__(self):
-        return "REQ: %s  |  SCH: %s  |  ACT: %s" % (self.requested, self.scheduled, self.actual)
-        
-class Duration(object):
-    def __init__(self, requested, known=None):
-        self.original = requested
-        self.requested = requested
-        self.accumulated = TimeDelta()
-        self.actual = None
-        # The following is ONLY used in simulation
-        self.known = known
-        
-    def incr(self, t):
-        self.requested += t
-        if self.known != None:
-            self.known += t
-            
-    def incr_by_percent(self, pct):
-        factor = 1 + float(pct)/100
-        self.requested = round_datetime_delta(self.requested * factor)
-        if self.known != None:
-            self.requested = round_datetime_delta(self.known * factor)
-        
-    def accumulate_duration(self, t):
-        self.accumulated += t
-            
-    def get_remaining_duration(self):
-        return self.requested - self.accumulated
-
-    # ONLY in simulation
-    def get_remaining_known_duration(self):
-        return self.known - self.accumulated
-            
-    def __repr__(self):
-        return "REQ: %s  |  ACC: %s  |  ACT: %s  |  KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
-    

Modified: trunk/src/haizea/resourcemanager/enact/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/__init__.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/__init__.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -17,14 +17,14 @@
 # -------------------------------------------------------------------------- #
 
 from haizea.common.utils import abstract
-import haizea.resourcemanager.datastruct as ds
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 
 class ResourcePoolInfo(object):
     def __init__(self):
         # Initialize the resource types in the ResourceTuple class
         # TODO: Do this in a less kludgy way
         resourcetypes = self.get_resource_types()
-        ds.ResourceTuple.set_resource_types(resourcetypes)
+        ResourceTuple.set_resource_types(resourcetypes)
 
 
     def get_nodes(self): 

Modified: trunk/src/haizea/resourcemanager/enact/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/opennebula.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -16,11 +16,10 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
-from haizea.resourcemanager.resourcepool import Node
+from haizea.resourcemanager.scheduler.resourcepool import Node
 from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
 from haizea.common.utils import get_config
 import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
 from pysqlite2 import dbapi2 as sqlite
 import logging
 import commands

Modified: trunk/src/haizea/resourcemanager/enact/simulated.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/enact/simulated.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -16,11 +16,11 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
-from haizea.resourcemanager.resourcepool import Node
+from haizea.resourcemanager.scheduler.resourcepool import Node
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 from haizea.resourcemanager.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
 import haizea.common.constants as constants
 from haizea.common.utils import get_config
-import haizea.resourcemanager.datastruct as ds
 import logging
 
 class SimulatedResourcePoolInfo(ResourcePoolInfo):
@@ -50,7 +50,7 @@
     def parse_resources_string(self, resources):
         resources = resources.split(";")
         desc2type = dict([(x[2], x[0]) for x in self.get_resource_types()])
-        capacity=ds.ResourceTuple.create_empty()
+        capacity = ResourceTuple.create_empty()
         for r in resources:
             resourcename = r.split(",")[0]
             resourcecapacity = r.split(",")[1]
@@ -107,7 +107,7 @@
         # Image repository nodes
         numnodes = config.get("simul.nodes")
         
-        imgcapacity = ds.ResourceTuple.create_empty()
+        imgcapacity = ResourceTuple.create_empty()
         imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
 
         self.fifo_node = Node(numnodes+1, "FIFOnode", imgcapacity)

Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -18,7 +18,8 @@
 
 import haizea.common.constants as constants
 from haizea.resourcemanager.frontends import RequestFrontend
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 from haizea.common.utils import UNIX2DateTime, round_datetime, get_config, get_clock
 
 from pysqlite2 import dbapi2 as sqlite

Modified: trunk/src/haizea/resourcemanager/frontends/rpc.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/rpc.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/rpc.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -16,7 +16,8 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 import haizea.common.constants as constants
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 from haizea.resourcemanager.frontends import RequestFrontend
 from haizea.common.utils import round_datetime, get_config, get_clock
 from mx.DateTime import DateTimeDelta, TimeDelta, ISO

Modified: trunk/src/haizea/resourcemanager/frontends/tracefile.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/tracefile.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/frontends/tracefile.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -20,7 +20,7 @@
 from haizea.common.utils import get_clock
 from haizea.resourcemanager.frontends import RequestFrontend
 import haizea.traces.readers as tracereaders
-from haizea.resourcemanager.datastruct import ARLease, BestEffortLease 
+from haizea.resourcemanager.leases import ARLease, BestEffortLease 
 import operator
 import logging
 

Copied: trunk/src/haizea/resourcemanager/leases.py (from rev 553, trunk/src/haizea/resourcemanager/datastruct.py)
===================================================================
--- trunk/src/haizea/resourcemanager/leases.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/leases.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,378 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+"""This module provides the lease data structures, and a couple of auxiliary
+data structures.
+
+* Lease data structures
+  * Lease: Base class for leases
+  * ARLease: Advance reservation lease
+  * BestEffortLease: Best-effort lease
+  * ImmediateLease: Immediate lease
+* Miscellaneous structures
+  * Timestamp: A wrapper around requested/scheduled/actual timestamps
+  * Duration: A wrapper around requested/accumulated/actual durations
+"""
+
+from haizea.common.constants import RES_MEM, MIGRATE_NONE, MIGRATE_MEM, MIGRATE_MEMDISK, LOGLEVEL_VDEBUG
+from haizea.common.utils import StateMachine, round_datetime_delta, get_lease_id, pretty_nodemap, estimate_transfer_time, xmlrpc_marshall_singlevalue
+
+from operator import attrgetter
+from mx.DateTime import TimeDelta
+from math import floor
+
+import logging
+
+
+#-------------------------------------------------------------------#
+#                                                                   #
+#                     LEASE DATA STRUCTURES                         #
+#                                                                   #
+#-------------------------------------------------------------------#
+
+
+class Lease(object):
+    # Lease states
+    STATE_NEW = 0
+    STATE_PENDING = 1
+    STATE_REJECTED = 2
+    STATE_SCHEDULED = 3
+    STATE_QUEUED = 4
+    STATE_CANCELLED = 5
+    STATE_PREPARING = 6
+    STATE_READY = 7
+    STATE_ACTIVE = 8
+    STATE_SUSPENDING = 9
+    STATE_SUSPENDED = 10
+    STATE_MIGRATING = 11
+    STATE_RESUMING = 12
+    STATE_RESUMED_READY = 13
+    STATE_DONE = 14
+    STATE_FAIL = 15
+    
+    state_str = {STATE_NEW : "New",
+                 STATE_PENDING : "Pending",
+                 STATE_REJECTED : "Rejected",
+                 STATE_SCHEDULED : "Scheduled",
+                 STATE_QUEUED : "Queued",
+                 STATE_CANCELLED : "Cancelled",
+                 STATE_PREPARING : "Preparing",
+                 STATE_READY : "Ready",
+                 STATE_ACTIVE : "Active",
+                 STATE_SUSPENDING : "Suspending",
+                 STATE_SUSPENDED : "Suspended",
+                 STATE_MIGRATING : "Migrating",
+                 STATE_RESUMING : "Resuming",
+                 STATE_RESUMED_READY: "Resumed-Ready",
+                 STATE_DONE : "Done",
+                 STATE_FAIL : "Fail"}
+    
+    def __init__(self, submit_time, start, duration, diskimage_id, 
+                 diskimage_size, numnodes, requested_resources, preemptible):
+        # Lease ID (read only)
+        self.id = get_lease_id()
+        
+        # Request attributes (read only)
+        self.submit_time = submit_time
+        self.start = start
+        self.duration = duration
+        self.end = None
+        self.diskimage_id = diskimage_id
+        self.diskimage_size = diskimage_size
+        # TODO: The following assumes homogeneous nodes. Should be modified
+        # to account for heterogeneous nodes.
+        self.numnodes = numnodes
+        self.requested_resources = requested_resources
+        self.preemptible = preemptible
+
+        # Bookkeeping attributes
+        # (keep track of the lease's state, resource reservations, etc.)
+        self.state = Lease.STATE_NEW
+        self.diskimagemap = {}
+        self.memimagemap = {}
+        self.deployment_rrs = []
+        self.vm_rrs = []
+
+        # Enactment information. Should only be manipulated by enactment module
+        self.enactment_info = None
+        self.vnode_enactment_info = dict([(n+1, None) for n in range(numnodes)])
+        
+        self.logger = logging.getLogger("LEASES")
+        
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Lease ID       : %i" % self.id)
+        self.logger.log(loglevel, "Submission time: %s" % self.submit_time)
+        self.logger.log(loglevel, "Duration       : %s" % self.duration)
+        self.logger.log(loglevel, "State          : %s" % Lease.state_str[self.state])
+        self.logger.log(loglevel, "Disk image     : %s" % self.diskimage_id)
+        self.logger.log(loglevel, "Disk image size: %s" % self.diskimage_size)
+        self.logger.log(loglevel, "Num nodes      : %s" % self.numnodes)
+        self.logger.log(loglevel, "Resource req   : %s" % self.requested_resources)
+        self.logger.log(loglevel, "Disk image map : %s" % pretty_nodemap(self.diskimagemap))
+        self.logger.log(loglevel, "Mem image map  : %s" % pretty_nodemap(self.memimagemap))
+
+    def print_rrs(self, loglevel=LOGLEVEL_VDEBUG):
+        if len(self.deployment_rrs) > 0:
+            self.logger.log(loglevel, "DEPLOYMENT RESOURCE RESERVATIONS")
+            self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
+            for r in self.deployment_rrs:
+                r.print_contents(loglevel)
+                self.logger.log(loglevel, "##")
+        self.logger.log(loglevel, "VM RESOURCE RESERVATIONS")
+        self.logger.log(loglevel, "~~~~~~~~~~~~~~~~~~~~~~~~")
+        for r in self.vm_rrs:
+            r.print_contents(loglevel)
+            self.logger.log(loglevel, "##")
+
+    def get_endtime(self):
+        vmrr = self.get_last_vmrr()
+        return vmrr.end
+
+    def append_vmrr(self, vmrr):
+        self.vm_rrs.append(vmrr)
+        
+    def append_deployrr(self, vmrr):
+        self.deployment_rrs.append(vmrr)
+
+    def get_last_vmrr(self):
+        return self.vm_rrs[-1]
+
+    def update_vmrr(self, rrold, rrnew):
+        self.vm_rrs[self.vm_rrs.index(rrold)] = rrnew
+    
+    def remove_vmrr(self, vmrr):
+        if not vmrr in self.vm_rrs:
+            raise Exception, "Tried to remove an VM RR not contained in this lease"
+        else:
+            self.vm_rrs.remove(vmrr)
+
+    def clear_rrs(self):
+        self.deployment_rrs = []
+        self.vm_rrs = []
+        
+    def add_boot_overhead(self, t):
+        self.duration.incr(t)        
+
+    def add_runtime_overhead(self, percent):
+        self.duration.incr_by_percent(percent)
+        
+    def xmlrpc_marshall(self):
+        # Convert to something we can send through XMLRPC
+        l = {}
+        l["id"] = self.id
+        l["submit_time"] = xmlrpc_marshall_singlevalue(self.submit_time)
+        l["start_req"] = xmlrpc_marshall_singlevalue(self.start.requested)
+        l["start_sched"] = xmlrpc_marshall_singlevalue(self.start.scheduled)
+        l["start_actual"] = xmlrpc_marshall_singlevalue(self.start.actual)
+        l["duration_req"] = xmlrpc_marshall_singlevalue(self.duration.requested)
+        l["duration_acc"] = xmlrpc_marshall_singlevalue(self.duration.accumulated)
+        l["duration_actual"] = xmlrpc_marshall_singlevalue(self.duration.actual)
+        l["end"] = xmlrpc_marshall_singlevalue(self.end)
+        l["diskimage_id"] = self.diskimage_id
+        l["diskimage_size"] = self.diskimage_size
+        l["numnodes"] = self.numnodes
+        l["resources"] = `self.requested_resources`
+        l["preemptible"] = self.preemptible
+        l["state"] = self.state
+        l["vm_rrs"] = [vmrr.xmlrpc_marshall() for vmrr in self.vm_rrs]
+                
+        return l
+        
+class LeaseStateMachine(StateMachine):
+    initial_state = Lease.STATE_NEW
+    transitions = {Lease.STATE_NEW:           [(Lease.STATE_PENDING,    "")],
+                   Lease.STATE_PENDING:       [(Lease.STATE_SCHEDULED,  ""),
+                                               (Lease.STATE_QUEUED,     ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_REJECTED,   "")],
+                   Lease.STATE_SCHEDULED:     [(Lease.STATE_PREPARING,  ""),
+                                               (Lease.STATE_READY,      ""),
+                                               (Lease.STATE_MIGRATING,  ""),
+                                               (Lease.STATE_RESUMING,   ""),
+                                               (Lease.STATE_CANCELLED,  "")],
+                   Lease.STATE_QUEUED:        [(Lease.STATE_SCHEDULED,  ""),
+                                               (Lease.STATE_SUSPENDED,  ""),
+                                               (Lease.STATE_CANCELLED,  "")],
+                   Lease.STATE_PREPARING:     [(Lease.STATE_READY,      ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_READY:         [(Lease.STATE_ACTIVE,     ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_ACTIVE:        [(Lease.STATE_SUSPENDING, ""),
+                                               (Lease.STATE_DONE,       ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_SUSPENDING:    [(Lease.STATE_SUSPENDED,  ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_SUSPENDED:     [(Lease.STATE_QUEUED,     ""),
+                                               (Lease.STATE_MIGRATING,  ""),
+                                               (Lease.STATE_RESUMING,   ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_MIGRATING:     [(Lease.STATE_SUSPENDED,  ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_RESUMING:      [(Lease.STATE_RESUMED_READY, ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   Lease.STATE_RESUMED_READY: [(Lease.STATE_ACTIVE,     ""),
+                                               (Lease.STATE_CANCELLED,  ""),
+                                               (Lease.STATE_FAIL,       "")],
+                   
+                   # Final states
+                   Lease.STATE_DONE:          [],
+                   Lease.STATE_CANCELLED:     [],
+                   Lease.STATE_FAIL:          [],
+                   Lease.STATE_REJECTED:      [],
+                   }
+    
+    def __init__(self):
+        StateMachine.__init__(initial_state, transitions)
+        
+class ARLease(Lease):
+    def __init__(self, submit_time, start, duration, diskimage_id, 
+                 diskimage_size, numnodes, resreq, preemptible,
+                 # AR-specific parameters:
+                 realdur = None):
+        start = Timestamp(start)
+        duration = Duration(duration)
+        if realdur != duration.requested:
+            duration.known = realdur # ONLY for simulation
+        Lease.__init__(self, submit_time, start, duration, diskimage_id,
+                           diskimage_size, numnodes, resreq, preemptible)
+        
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "__________________________________________________")
+        Lease.print_contents(self, loglevel)
+        self.logger.log(loglevel, "Type           : AR")
+        self.logger.log(loglevel, "Start time     : %s" % self.start)
+        self.print_rrs(loglevel)
+        self.logger.log(loglevel, "--------------------------------------------------")
+    
+    def xmlrpc_marshall(self):
+        l = Lease.xmlrpc_marshall(self)
+        l["type"] = "AR"
+        return l
+
+        
+class BestEffortLease(Lease):
+    def __init__(self, submit_time, duration, diskimage_id, 
+                 diskimage_size, numnodes, resreq, preemptible,
+                 # BE-specific parameters:
+                 realdur = None):
+        start = Timestamp(None) # i.e., start on a best-effort basis
+        duration = Duration(duration)
+        if realdur != duration.requested:
+            duration.known = realdur # ONLY for simulation
+        # When the images will be available
+        self.imagesavail = None        
+        Lease.__init__(self, submit_time, start, duration, diskimage_id,
+                           diskimage_size, numnodes, resreq, preemptible)
+
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "__________________________________________________")
+        Lease.print_contents(self, loglevel)
+        self.logger.log(loglevel, "Type           : BEST-EFFORT")
+        self.logger.log(loglevel, "Images Avail @ : %s" % self.imagesavail)
+        self.print_rrs(loglevel)
+        self.logger.log(loglevel, "--------------------------------------------------")
+        
+    def get_waiting_time(self):
+        return self.start.actual - self.submit_time
+        
+    def get_slowdown(self, bound=10):
+        time_on_dedicated = self.duration.original
+        time_on_loaded = self.end - self.submit_time
+        bound = TimeDelta(seconds=bound)
+        if time_on_dedicated < bound:
+            time_on_dedicated = bound
+        return time_on_loaded / time_on_dedicated
+
+    def xmlrpc_marshall(self):
+        l = Lease.xmlrpc_marshall(self)
+        l["type"] = "BE"
+        return l
+
+
+class ImmediateLease(Lease):
+    def __init__(self, submit_time, duration, diskimage_id, 
+                 diskimage_size, numnodes, resreq, preemptible,
+                 # Immediate-specific parameters:
+                 realdur = None):
+        start = Timestamp(None) # i.e., start on a best-effort basis
+        duration = Duration(duration)
+        duration.known = realdur # ONLY for simulation
+        Lease.__init__(self, submit_time, start, duration, diskimage_id,
+                           diskimage_size, numnodes, resreq, preemptible)
+
+    def print_contents(self, loglevel=LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "__________________________________________________")
+        Lease.print_contents(self, loglevel)
+        self.logger.log(loglevel, "Type           : IMMEDIATE")
+        self.print_rrs(loglevel)
+        self.logger.log(loglevel, "--------------------------------------------------")
+
+    def xmlrpc_marshall(self):
+        l = Lease.xmlrpc_marshall(self)
+        l["type"] = "IM"
+        return l
+
+class Timestamp(object):
+    def __init__(self, requested):
+        self.requested = requested
+        self.scheduled = None
+        self.actual = None
+
+    def __repr__(self):
+        return "REQ: %s  |  SCH: %s  |  ACT: %s" % (self.requested, self.scheduled, self.actual)
+        
+class Duration(object):
+    def __init__(self, requested, known=None):
+        self.original = requested
+        self.requested = requested
+        self.accumulated = TimeDelta()
+        self.actual = None
+        # The following is ONLY used in simulation
+        self.known = known
+        
+    def incr(self, t):
+        self.requested += t
+        if self.known != None:
+            self.known += t
+            
+    def incr_by_percent(self, pct):
+        factor = 1 + float(pct)/100
+        self.requested = round_datetime_delta(self.requested * factor)
+        if self.known != None:
+            self.known = round_datetime_delta(self.known * factor)
+        
+    def accumulate_duration(self, t):
+        self.accumulated += t
+            
+    def get_remaining_duration(self):
+        return self.requested - self.accumulated
+
+    # ONLY in simulation
+    def get_remaining_known_duration(self):
+        return self.known - self.accumulated
+            
+    def __repr__(self):
+        return "REQ: %s  |  ACC: %s  |  ACT: %s  |  KNW: %s" % (self.requested, self.accumulated, self.actual, self.known)
+    


Property changes on: trunk/src/haizea/resourcemanager/leases.py
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native
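
The get_slowdown method above computes the usual bounded-slowdown metric for
best-effort leases: time spent in the system (queue wait plus execution,
including any preemptions) divided by the time the lease would have taken on
a dedicated system, with very short requests clamped to a 10-second bound so
they do not inflate the average. A minimal standalone restatement, using
datetime.timedelta instead of mx.DateTime purely for illustration:

    from datetime import timedelta

    def bounded_slowdown(submit_time, end_time, requested_duration, bound_seconds=10):
        # Time the lease actually spent in the system (wait + run + preemptions)
        time_on_loaded = end_time - submit_time
        # Time it would have taken on a dedicated system (its originally requested duration)
        time_on_dedicated = requested_duration
        bound = timedelta(seconds=bound_seconds)
        if time_on_dedicated < bound:
            time_on_dedicated = bound   # clamp very short leases
        return time_on_loaded.total_seconds() / time_on_dedicated.total_seconds()

For example, a lease that requested one hour and completed 90 minutes after
submission has a slowdown of 1.5.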

Deleted: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/resourcepool.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -1,425 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago                                 #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
-# Complutense de Madrid (dsa-research.org)                                   #
-#                                                                            #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
-# not use this file except in compliance with the License. You may obtain    #
-# a copy of the License at                                                   #
-#                                                                            #
-# http://www.apache.org/licenses/LICENSE-2.0                                 #
-#                                                                            #
-# Unless required by applicable law or agreed to in writing, software        #
-# distributed under the License is distributed on an "AS IS" BASIS,          #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
-# See the License for the specific language governing permissions and        #
-# limitations under the License.                                             #
-# -------------------------------------------------------------------------- #
-
-from haizea.common.utils import vnodemapstr, get_accounting
-import haizea.common.constants as constants
-import haizea.resourcemanager.enact.actions as actions
-import logging 
-
-class FailedEnactmentException(Exception):
-    pass
-
-class ResourcePool(object):
-    def __init__(self, info_enact, vm_enact, deploy_enact):
-        self.logger = logging.getLogger("RPOOL")
-                
-        self.info = info_enact
-        self.vm = vm_enact
-        # TODO: Ideally, deployment enactment shouldn't be here, specially since
-        # it already "hangs" below the deployment modules. For now,
-        # it does no harm, though.
-        self.deployment = deploy_enact
-        
-        self.nodes = self.info.get_nodes()
-        
-    def start_vms(self, lease, rr):
-        start_action = actions.VMEnactmentStartAction()
-        start_action.from_rr(rr)
-                
-        for (vnode, pnode) in rr.nodes.items():
-            node = self.get_node(pnode)
-            diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
-            start_action.vnodes[vnode].pnode = node.enactment_info
-            start_action.vnodes[vnode].diskimage = diskimage.filename
-            start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
-
-        try:
-            self.vm.start(start_action)
-        except Exception, msg:
-            self.logger.error("Enactment of start VM failed: %s" % msg)
-            raise FailedEnactmentException()
-        
-    def stop_vms(self, lease, rr):
-        stop_action = actions.VMEnactmentStopAction()
-        stop_action.from_rr(rr)
-        try:
-            self.vm.stop(stop_action)
-        except Exception, msg:
-            self.logger.error("Enactment of end VM failed: %s" % msg)
-            raise FailedEnactmentException()
-         
-    def suspend_vms(self, lease, rr):
-        # Add memory image files
-        for vnode in rr.vnodes:
-            pnode = rr.vmrr.nodes[vnode]
-            self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
-
-        # Enact suspend
-        suspend_action = actions.VMEnactmentSuspendAction()
-        suspend_action.from_rr(rr)
-        try:
-            self.vm.suspend(suspend_action)
-        except Exception, msg:
-            self.logger.error("Enactment of suspend VM failed: %s" % msg)
-            raise FailedEnactmentException()
-    
-    def verify_suspend(self, lease, rr):
-        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
-        verify_suspend_action.from_rr(rr)
-        self.vm.verify_suspend(verify_suspend_action)
-    
-    def resume_vms(self, lease, rr):
-        # Remove memory image files
-        for vnode in rr.vnodes:
-            pnode = rr.vmrr.nodes[vnode]
-            self.remove_ramfile(pnode, lease.id, vnode)
-
-        # Enact resume
-        resume_action = actions.VMEnactmentResumeAction()
-        resume_action.from_rr(rr)
-        try:
-            self.vm.resume(resume_action)
-        except Exception, msg:
-            self.logger.error("Enactment of resume VM failed: %s" % msg)
-            raise FailedEnactmentException()
-    
-    def verify_resume(self, lease, rr):
-        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
-        verify_resume_action.from_rr(rr)
-        self.vm.verify_resume(verify_resume_action)    
-    
-    def get_nodes(self):
-        return self.nodes
-    
-    # An auxiliary node is a host whose resources are going to be scheduled, but
-    # where no VMs are actually going to run. For example, a disk image repository node.
-    def get_aux_nodes(self):
-        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
-        # There might be a scenario where the info enactment module also reports
-        # auxiliary nodes.
-        return self.deployment.get_aux_nodes()
-
-    def get_num_nodes(self):
-        return len(self.nodes)
-        
-    def get_node(self, nod_id):
-        return self.nodes[nod_id-1]
-        
-    def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
-        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
-        
-        self.logger.vdebug("Files BEFORE:")
-        self.get_node(pnode).print_files()
-        
-        imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
-        img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
-        self.get_node(pnode).add_file(img)
-
-        self.logger.vdebug("Files AFTER:")
-        self.get_node(pnode).print_files()
-        
-        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-        return img
-            
-    def remove_diskimage(self, pnode, lease, vnode):
-        node = self.get_node(pnode)
-        node.print_files()
-
-        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
-        node.remove_diskimage(lease, vnode)
-
-        node.print_files()
-        
-        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())    
-        
-    def add_ramfile(self, pnode, lease_id, vnode, size):
-        node = self.get_node(pnode)
-        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
-        node.print_files()
-        f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
-        node.add_file(f)        
-        node.print_files()
-        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-
-    def remove_ramfile(self, pnode, lease_id, vnode):
-        node = self.get_node(pnode)
-        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
-        node.print_files()
-        node.remove_ramfile(lease_id, vnode)
-        node.print_files()
-        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-        
-    def get_max_disk_usage(self):
-        return max([n.get_disk_usage() for n in self.nodes])
-    
-class Node(object):
-    def __init__(self, nod_id, hostname, capacity):
-        self.logger = logging.getLogger("RESOURCEPOOL")
-        self.nod_id = nod_id
-        self.hostname = hostname
-        self.capacity = capacity
-        self.files = []
-
-        # enactment-specific information
-        self.enactment_info = None
-        
-    def get_capacity(self):
-        return self.capacity
-           
-    def add_file(self, f):
-        self.files.append(f)
-        
-    def get_diskimage(self, lease_id, vnode, diskimage_id):
-        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
-                 f.diskimage_id == diskimage_id and 
-                 f.lease_id == lease_id and
-                 f.vnode == vnode]
-        if len(image) == 0:
-            return None
-        elif len(image) == 1:
-            return image[0]
-        elif len(image) > 1:
-            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
-            return image[0]
-
-    def remove_diskimage(self, lease_id, vnode):
-        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
-                 f.lease_id == lease_id and
-                 f.vnode == vnode]
-        if len(image) > 0:
-            image = image[0]
-            self.files.remove(image)
-            
-    def remove_ramfile(self, lease_id, vnode):
-        ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
-        if len(ramfile) > 0:
-            ramfile = ramfile[0]
-            self.files.remove(ramfile)
-                
-        
-    def get_disk_usage(self):
-        return sum([f.filesize for f in self.files])
-
-
-    def get_diskimages(self):
-        return [f for f in self.files if isinstance(f, DiskImageFile)]
-        
-    def print_files(self):
-        images = ""
-        if len(self.files) > 0:
-            images = ", ".join([str(img) for img in self.files])
-        self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
-
-    def xmlrpc_marshall(self):
-        # Convert to something we can send through XMLRPC
-        h = {}
-        h["id"] = self.nod_id
-        h["hostname"] = self.hostname
-        h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
-        h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
-                
-        return h
-        
-
-        
-class File(object):
-    def __init__(self, filename, filesize):
-        self.filename = filename
-        self.filesize = filesize
-        
-class DiskImageFile(File):
-    def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
-        File.__init__(self, filename, filesize)
-        self.lease_id = lease_id
-        self.vnode = vnode
-        self.diskimage_id = diskimage_id
-                
-    def __str__(self):
-        return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
-
-
-class RAMImageFile(File):
-    def __init__(self, filename, filesize, lease_id, vnode):
-        File.__init__(self, filename, filesize)
-        self.lease_id = lease_id
-        self.vnode = vnode
-                
-    def __str__(self):
-        return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
-    
-class ResourcePoolWithReusableImages(ResourcePool):
-    def __init__(self, info_enact, vm_enact, deploy_enact):
-        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
-        
-        self.nodes = [NodeWithReusableImages.from_node(n) for n in self.nodes]
-    
-    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
-        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
-        
-        self.logger.vdebug("Files BEFORE:")
-        self.get_node(pnode).print_files()
-        
-        imagefile = "reusable-%s" % diskimage_id
-        img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
-        for (lease_id, vnode) in mappings:
-            img.add_mapping(lease_id, vnode)
-
-        self.get_node(pnode).add_reusable_image(img)
-
-        self.logger.vdebug("Files AFTER:")
-        self.get_node(pnode).print_files()
-        
-        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
-        return img
-    
-    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
-        self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
-    
-    def remove_diskimage(self, pnode_id, lease, vnode):
-        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
-        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
-        for img in self.get_node(pnode_id).get_reusable_images():
-            if (lease, vnode) in img.mappings:
-                img.mappings.remove((lease, vnode))
-            self.get_node(pnode_id).print_files()
-            # Keep image around, even if it isn't going to be used
-            # by any VMs. It might be reused later on.
-            # It will be purged if space has to be made available
-            # for other images
-        
-    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
-        return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
-
-    def exists_reusable_image(self, pnode_id, diskimage_id, after):
-        return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
-    
-    
-class NodeWithReusableImages(Node):
-    def __init__(self, nod_id, hostname, capacity):
-        Node.__init__(self, nod_id, hostname, capacity)
-        self.reusable_images = []
-
-    @classmethod
-    def from_node(cls, n):
-        node = cls(n.nod_id, n.hostname, n.capacity)
-        node.enactment_info = n.enactment_info
-        return node
-    
-    def add_reusable_image(self, f):
-        self.reusable_images.append(f)
-
-    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
-        for f in self.reusable_images:
-            if f.diskimage_id == diskimage_id:
-                f.add_mapping(lease_id, vnode)
-                f.update_timeout(timeout)
-                break  # Ugh
-        self.print_files()
-            
-    def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
-        images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
-        if after != None:
-            images = [i for i in images if i.timeout >= after]
-        if lease_id != None and vnode != None:
-            images = [i for i in images if i.has_mapping(lease_id, vnode)]
-        if len(images)>0:
-            return images[0]
-        else:
-            return None
-        
-    def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
-        entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
-        if entry == None:
-            return False
-        else:
-            return True
-
-    def get_reusable_images(self):
-        return self.reusable_images
-
-    def get_reusable_images_size(self):
-        return sum([f.filesize for f in self.reusable_images])
-    
-    def purge_oldest_unused_image(self):
-        unused = [img for img in self.reusable_images if not img.has_mappings()]
-        if len(unused) == 0:
-            return 0
-        else:
-            i = iter(unused)
-            oldest = i.next()
-            for img in i:
-                if img.timeout < oldest.timeout:
-                    oldest = img
-            self.reusable_images.remove(oldest)
-            return 1
-    
-    def purge_downto(self, target):
-        done = False
-        while not done:
-            removed = self.purge_oldest_unused_image()
-            if removed==0:
-                done = True
-                success = False
-            elif removed == 1:
-                if self.get_reusable_images_size() <= target:
-                    done = True
-                    success = True
-        return success
-
-    def print_files(self):
-        Node.print_files(self)
-        images = ""
-        if len(self.reusable_images) > 0:
-            images = ", ".join([str(img) for img in self.reusable_images])
-        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
-
-class ReusableDiskImageFile(File):
-    def __init__(self, filename, filesize, diskimage_id, timeout):
-        File.__init__(self, filename, filesize)
-        self.diskimage_id = diskimage_id
-        self.mappings = set([])
-        self.timeout = timeout
-        
-    def add_mapping(self, lease_id, vnode):
-        self.mappings.add((lease_id, vnode))
-        
-    def has_mapping(self, lease_id, vnode):
-        return (lease_id, vnode) in self.mappings
-    
-    def has_mappings(self):
-        return len(self.mappings) > 0
-        
-    def update_timeout(self, timeout):
-        if timeout > self.timeout:
-            self.timeout = timeout
-        
-    def is_expired(self, curTime):
-        if self.timeout == None:
-            return False
-        elif self.timeout > curTime:
-            return True
-        else:
-            return False
-        
-    def __str__(self):
-        if self.timeout == None:
-            timeout = "NOTIMEOUT"
-        else:
-            timeout = self.timeout
-        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
-
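
The image cache eviction policy in NodeWithReusableImages.purge_downto above
boils down to: while the cached reusable images exceed the target size, evict
the unused image with the earliest timeout, and give up if only in-use images
remain. An illustrative sketch of that policy with simplified types (this is
not the Haizea code itself):

    def purge_downto(images, target_size):
        # images: list of (size_mb, timeout, in_use) tuples; timeouts assumed comparable
        images = list(images)
        while sum(size for size, _, _ in images) > target_size:
            unused = [img for img in images if not img[2]]
            if not unused:
                return None                 # cannot free enough space
            # evict the unused image with the earliest (oldest) timeout
            images.remove(min(unused, key=lambda img: img[1]))
        return images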

Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/rm.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -34,17 +34,18 @@
 import haizea.resourcemanager.accounting as accounting
 import haizea.common.constants as constants
 import haizea.resourcemanager.enact as enact
-from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeploymentScheduler
-from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeploymentScheduler
+from haizea.resourcemanager.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
+from haizea.resourcemanager.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
 from haizea.resourcemanager.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
 from haizea.resourcemanager.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
 from haizea.resourcemanager.frontends.tracefile import TracefileFrontend
 from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
 from haizea.resourcemanager.frontends.rpc import RPCFrontend
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceTuple
-from haizea.resourcemanager.scheduler import Scheduler
-from haizea.resourcemanager.slottable import SlotTable
-from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.leases import ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.lease_scheduler import LeaseScheduler
+from haizea.resourcemanager.scheduler.vm_scheduler import VMScheduler
+from haizea.resourcemanager.scheduler.slottable import SlotTable
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
 from haizea.resourcemanager.rpcserver import RPCServer
 from haizea.common.utils import abstract, round_datetime, Singleton
 
@@ -125,8 +126,8 @@
         deploy_enact = SimulatedDeploymentEnactment()
                 
         # Resource pool
-        deploy_type = self.config.get("lease-preparation")
-        if deploy_type == constants.DEPLOYMENT_TRANSFER:
+        preparation_type = self.config.get("lease-preparation")
+        if preparation_type == constants.PREPARATION_TRANSFER:
             if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
                 resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
             else:
@@ -136,23 +137,21 @@
     
         # Slot table
         slottable = SlotTable()
+        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
+            slottable.add_node(n)
         
-        # Deployment scheduler
-        
-        if deploy_type == constants.DEPLOYMENT_UNMANAGED:
-            deployment_scheduler = UnmanagedDeploymentScheduler(slottable, resourcepool, deploy_enact)
-        elif deploy_type == constants.DEPLOYMENT_TRANSFER:
-            deployment_scheduler = ImageTransferDeploymentScheduler(slottable, resourcepool, deploy_enact)    
+        # Preparation scheduler
+        if preparation_type == constants.PREPARATION_UNMANAGED:
+            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
+        elif preparation_type == constants.PREPARATION_TRANSFER:
+            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)    
     
-        # Scheduler
-        self.scheduler = Scheduler(slottable, resourcepool, deployment_scheduler)
+        # VM Scheduler
+        vm_scheduler = VMScheduler(slottable, resourcepool)
+    
+        # Lease Scheduler
+        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
         
-        # TODO: Having the slot table contained in the deployment scheduler, and also
-        # in the "main" scheduler (which itself contains the same slot table) is far
-        # from ideal, although this is mostly a consequence of the Scheduler class
-        # being in need of some serious refactoring. This will be fixed (see Scheduler
-        # class comments for more details)
-        
         # Lease request frontends
         if clock == constants.CLOCK_SIMULATED:
             # In pure simulation, we can only use the tracefile frontend
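
The hunk above shows the new composition in ResourceManager: the slot table is
now populated with the resource pool's nodes up front, and the old monolithic
Scheduler is split into a preparation scheduler, a VMScheduler, and a
LeaseScheduler that ties them together. A rough sketch of the wiring for the
simulated, unmanaged-preparation case, with the three enactment objects
assumed to have been created as above:

    from haizea.resourcemanager.scheduler.resourcepool import ResourcePool
    from haizea.resourcemanager.scheduler.slottable import SlotTable
    from haizea.resourcemanager.scheduler.vm_scheduler import VMScheduler
    from haizea.resourcemanager.scheduler.lease_scheduler import LeaseScheduler
    from haizea.resourcemanager.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler

    # info_enact, vm_enact and deploy_enact are assumed to exist already
    resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)

    slottable = SlotTable()
    for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
        slottable.add_node(n)

    preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
    vm_scheduler = VMScheduler(slottable, resourcepool)
    lease_scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)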

Added: trunk/src/haizea/resourcemanager/scheduler/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/__init__.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/__init__.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,68 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+
+class SchedException(Exception):
+    """A simple exception class used for scheduling exceptions"""
+    pass
+
+class NotSchedulableException(Exception):
+    """A simple exception class used when a lease cannot be scheduled
+    
+    This exception must be raised when a lease cannot be scheduled
+    (this is not necessarily an error condition, but the scheduler will
+    have to react to it)
+    """
+    pass
+
+class CriticalSchedException(Exception):
+    """A simple exception class used for critical scheduling exceptions
+    
+    This exception must be raised when a non-recoverable error happens
+    (e.g., when there are unexplained inconsistencies in the schedule,
+    typically resulting from a code error)
+    """
+    pass
+
+class RescheduleLeaseException(Exception):
+    pass
+
+class CancelLeaseException(Exception):
+    pass
+
+class NormalEndLeaseException(Exception):
+    pass
+
+
+class ReservationEventHandler(object):
+    """A wrapper for reservation event handlers.
+    
+    Reservations (in the slot table) can start and they can end. This class
+    provides a convenient wrapper around the event handlers for these two
+    events (see Scheduler.__register_handler for details on event handlers)
+    """
+    def __init__(self, sched, on_start, on_end):
+        self.sched = sched
+        self.on_start_method = on_start
+        self.on_end_method = on_end
+        
+    def on_start(self, lease, rr):
+        self.on_start_method(self.sched, lease, rr)
+        
+    def on_end(self, lease, rr):
+        self.on_end_method(self.sched, lease, rr)        
\ No newline at end of file
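
ReservationEventHandler now carries the scheduler instance itself (sched), so
each scheduler can register handlers keyed by its own reservation classes and
the lease scheduler can merge those dictionaries and dispatch without knowing
which component owns a reservation. A toy sketch of the expected pattern;
ToyScheduler and ToyReservation are made-up names, not part of Haizea:

    from haizea.resourcemanager.scheduler import ReservationEventHandler

    class ToyReservation(object):
        """Stands in for a ResourceReservation subclass."""
        pass

    class ToyScheduler(object):
        def __init__(self):
            # Handlers are keyed by reservation class, as in VMScheduler and
            # the preparation schedulers.
            self.handlers = {
                ToyReservation: ReservationEventHandler(
                                    sched    = self,
                                    on_start = ToyScheduler._handle_start_toy,
                                    on_end   = ToyScheduler._handle_end_toy)
            }

        def _handle_start_toy(self, lease, rr):
            pass    # enact whatever starting this reservation means

        def _handle_end_toy(self, lease, rr):
            pass    # clean up when the reservation ends

    # The lease scheduler (see lease_scheduler.py below) then dispatches with:
    #     self.handlers[type(rr)].on_start(rr.lease, rr)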

Copied: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py (from rev 551, trunk/src/haizea/resourcemanager/scheduler.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,574 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+
+"""This module provides the main classes for Haizea's scheduler, particularly
+the Scheduler class. The deployment scheduling code (everything that has to be 
+done to prepare a lease) happens in the modules inside the 
+haizea.resourcemanager.deployment package.
+
+This module provides the following classes:
+
+* SchedException: A scheduling exception
+* ReservationEventHandler: A simple wrapper class
+* Scheduler: Do I really need to spell this one out for you?
+"""
+import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler import SchedException, RescheduleLeaseException, NormalEndLeaseException, CriticalSchedException
+from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException, ResourceReservation
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation, SuspensionResourceReservation, ResumptionResourceReservation, ShutdownResourceReservation
+from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
+
+import logging
+
+class LeaseScheduler(object):
+    """The Haizea Lease Scheduler
+    
+    Public methods:
+    schedule -- The scheduling function
+    process_reservations -- Processes starting/ending reservations at a given time
+    request_lease -- Adds a new lease request (initially in the "Pending" state)
+    enqueue -- Queues a best-effort request
+    cancel_lease -- Cancels a lease
+    fail_lease -- Marks a lease as failed
+    notify_event -- Reacts to an external event (e.g., a VM ending prematurely)
+    is_queue_empty -- Is the queue empty?
+    exists_scheduled_leases -- Are there any leases scheduled?
+    
+    Private methods:
+    __process_* -- Process pending AR, immediate, and queued best-effort requests
+    __schedule_ar_lease -- Schedules an AR lease
+    __schedule_besteffort_lease -- Schedules a best-effort lease
+    __schedule_immediate_lease -- Schedules an immediate lease
+    __preempt -- Preempts a lease (suspending it or requeueing it)
+    _handle_* -- Reservation event handlers
+    
+    """
+    def __init__(self, vm_scheduler, preparation_scheduler, slottable):
+        self.vm_scheduler = vm_scheduler
+        self.preparation_scheduler = preparation_scheduler
+        self.slottable = slottable
+        self.logger = logging.getLogger("LSCHED")
+
+        self.queue = Queue(self)
+        self.leases = LeaseTable(self)
+        self.completedleases = LeaseTable(self)
+
+        self.handlers = {}
+        for (type, handler) in self.vm_scheduler.handlers.items():
+            self.handlers[type] = handler
+
+        for (type, handler) in self.preparation_scheduler.handlers.items():
+            self.handlers[type] = handler
+
+        backfilling = get_config().get("backfilling")
+        if backfilling == constants.BACKFILLING_OFF:
+            self.maxres = 0
+        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
+            self.maxres = 1
+        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
+            self.maxres = 1000000 # Arbitrarily large
+        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
+            self.maxres = get_config().get("backfilling-reservations")
+
+        self.numbesteffortres = 0
+        
+    def schedule(self, nexttime):      
+        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
+        ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
+        im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
+        be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
+        
+        # Queue best-effort requests
+        for lease in be_leases:
+            self.enqueue(lease)
+        
+        # Process immediate requests
+        for lease_req in im_leases:
+            self.__process_im_request(lease_req, nexttime)
+
+        # Process AR requests
+        for lease_req in ar_leases:
+            self.__process_ar_request(lease_req, nexttime)
+            
+        # Process best-effort requests
+        self.__process_queue(nexttime)
+        
+    
+    def process_reservations(self, nowtime):
+        starting = self.slottable.get_reservations_starting_at(nowtime)
+        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
+        ending = self.slottable.get_reservations_ending_at(nowtime)
+        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
+        
+        for rr in ending:
+            lease = rr.lease
+            self._handle_end_rr(lease, rr)
+            try:
+                self.handlers[type(rr)].on_end(lease, rr)
+            except RescheduleLeaseException, msg:
+                if isinstance(rr.lease, BestEffortLease):
+                    self.__enqueue_in_order(lease)
+            except NormalEndLeaseException, msg:
+                self._handle_end_lease(lease)
+        
+        for rr in starting:
+            self.handlers[type(rr)].on_start(rr.lease, rr)
+            
+
+        # TODO: Should be in VMScheduler
+        util = self.__get_utilization(nowtime)
+        if not util.has_key(VMResourceReservation):
+            cpuutil = 0.0
+        else:
+            cpuutil = util[VMResourceReservation]
+        get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)        
+        get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)        
+
+    
+    def enqueue(self, lease_req):
+        """Queues a best-effort lease request"""
+        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+        lease_req.state = Lease.STATE_QUEUED
+        self.queue.enqueue(lease_req)
+        self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
+
+    def request_lease(self, lease):
+        """
+        Request a lease. At this point, it is simply marked as "Pending" and,
+        next time the scheduling function is called, the fate of the
+        lease will be determined (right now, AR+IM leases get scheduled
+        right away, and best-effort leases get placed on a queue)
+        """
+        lease.state = Lease.STATE_PENDING
+        self.leases.add(lease)
+
+    def is_queue_empty(self):
+        """Return True is the queue is empty, False otherwise"""
+        return self.queue.is_empty()
+
+    
+    def exists_scheduled_leases(self):
+        """Return True if there are any leases scheduled in the future"""
+        return not self.slottable.is_empty()    
+
+    def cancel_lease(self, lease_id):
+        """Cancels a lease.
+        
+        Arguments:
+        lease_id -- ID of lease to cancel
+        """
+        time = get_clock().get_time()
+        
+        self.logger.info("Cancelling lease %i..." % lease_id)
+        if self.leases.has_lease(lease_id):
+            # The lease is either running, or scheduled to run
+            lease = self.leases.get_lease(lease_id)
+            
+            if lease.state == Lease.STATE_ACTIVE:
+                self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
+                rr = lease.get_active_reservations(time)[0]
+                if isinstance(rr, VMResourceReservation):
+                    self._handle_unscheduled_end_vm(lease, rr, enact=True)
+                # TODO: Handle cancelations in middle of suspensions and
+                # resumptions                
+            elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
+                self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
+                rrs = lease.get_scheduled_reservations()
+                for r in rrs:
+                    lease.remove_rr(r)
+                    self.slottable.removeReservation(r)
+                lease.state = Lease.STATE_CANCELLED
+                self.completedleases.add(lease)
+                self.leases.remove(lease)
+        elif self.queue.has_lease(lease_id):
+            # The lease is in the queue, waiting to be scheduled.
+            # Cancelling is as simple as removing it from the queue
+            self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
+            l = self.queue.get_lease(lease_id)
+            self.queue.remove_lease(l)
+    
+    def fail_lease(self, lease_id):
+        """Transitions a lease to a failed state, and does any necessary cleaning up
+        
+        TODO: For now, just use the cancelling algorithm
+        
+        Arguments:
+        lease_id -- ID of lease to fail
+        """    
+        try:
+            self.cancel_lease(lease_id)
+        except Exception, msg:
+            # Exit if something goes horribly wrong
+            raise CriticalSchedException()      
+    
+    def notify_event(self, lease_id, event):
+        time = get_clock().get_time()
+        if event == constants.EVENT_END_VM:
+            lease = self.leases.get_lease(lease_id)
+            vmrr = lease.get_last_vmrr()
+            self._handle_end_rr(lease, vmrr)
+            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr, enact=False)
+            self._handle_end_lease(lease)
+            nexttime = get_clock().get_next_schedulable_time()
+            # We need to reevaluate the schedule to see if there are any future
+            # reservations that we can slide back.
+            self.vm_scheduler.reevaluate_schedule(lease, vmrr.nodes.values(), nexttime, [])
+
+            
+        
+
+    
+    def __process_ar_request(self, lease_req, nexttime):
+        self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested))
+        self.logger.debug("  Start   : %s" % lease_req.start)
+        self.logger.debug("  Duration: %s" % lease_req.duration)
+        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
+        
+        accepted = False
+        try:
+            self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
+            self.leases.add(lease_req)
+            get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+            accepted = True
+        except SchedException, msg:
+            # Our first try avoided preemption, try again
+            # without avoiding preemption.
+            # TODO: Roll this into the exact slot fitting algorithm
+            try:
+                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+                self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
+                self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
+                self.leases.add(lease_req)
+                get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+                accepted = True
+            except SchedException, msg:
+                get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
+                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+
+        if accepted:
+            self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
+        else:
+            self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
+            lease_req.state = Lease.STATE_REJECTED
+            self.completedleases.add(lease_req)
+            self.leases.remove(lease_req)
+        
+        
+    def __process_queue(self, nexttime):
+        done = False
+        newqueue = Queue(self)
+        while not done and not self.is_queue_empty():
+            if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
+                self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
+                done = True
+            else:
+                lease_req = self.queue.dequeue()
+                try:
+                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id)
+                    self.logger.debug("  Duration: %s" % lease_req.duration)
+                    self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
+                    self.__schedule_besteffort_lease(lease_req, nexttime)
+                    self.leases.add(lease_req)
+                    get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+                except SchedException, msg:
+                    # Put back on queue
+                    newqueue.enqueue(lease_req)
+                    self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+                    self.logger.info("Lease %i could not be scheduled at this time." % lease_req.id)
+                    if not self.is_backfilling():
+                        done = True
+        
+        for lease in self.queue:
+            newqueue.enqueue(lease)
+        
+        self.queue = newqueue 
+
+
+    def __process_im_request(self, lease_req, nexttime):
+        self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
+        self.logger.debug("  Duration: %s" % lease_req.duration)
+        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
+        
+        try:
+            self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
+            self.leases.add(lease_req)
+            get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
+            self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
+        except SchedException, msg:
+            get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
+            self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
+    
+    
+    def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
+        try:
+            (vmrr, preemptions) = self.vm_scheduler.fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
+            
+            if len(preemptions) > 0:
+                leases = self.vm_scheduler.find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
+                self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
+                for lease in leases:
+                    self.__preempt(lease, preemption_time=vmrr.start)
+
+            # Schedule deployment overhead
+            self.preparation_scheduler.schedule(lease_req, vmrr, nexttime)
+            
+            # Commit reservation to slot table
+            # (we don't do this until the very end because the deployment overhead
+            # scheduling could still throw an exception)
+            lease_req.append_vmrr(vmrr)
+            self.slottable.addReservation(vmrr)
+            
+            # Post-VM RRs (if any)
+            for rr in vmrr.post_rrs:
+                self.slottable.addReservation(rr)
+        except Exception, msg:
+            raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
+
+
+    def __schedule_besteffort_lease(self, lease, nexttime):            
+        try:
+            # Schedule the VMs
+            canreserve = self.vm_scheduler.can_reserve_besteffort_in_future()
+            
+            # Determine earliest start time in each node
+            if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
+                # Figure out earliest start times based on
+                # image schedule and reusable images
+                earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
+            elif lease.state == Lease.STATE_SUSPENDED:
+                # No need to transfer images from repository
+                # (only intra-node transfer)
+                earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
+
+            (vmrr, in_future) = self.vm_scheduler.fit_asap(lease, nexttime, earliest, allow_reservation_in_future = canreserve)
+            
+            # Schedule deployment
+            if lease.state != Lease.STATE_SUSPENDED:
+                self.preparation_scheduler.schedule(lease, vmrr, nexttime)
+            else:
+                self.vm_scheduler.schedule_migration(lease, vmrr, nexttime)
+
+            # At this point, the lease is feasible.
+            # Commit changes by adding RRs to lease and to slot table
+            
+            # Add VMRR to lease
+            lease.append_vmrr(vmrr)
+            
+
+            # Add resource reservations to slottable
+            
+            # TODO: deployment RRs should be added here, not in the preparation scheduler
+            
+            # Pre-VM RRs (if any)
+            for rr in vmrr.pre_rrs:
+                self.slottable.addReservation(rr)
+                
+            # VM
+            self.slottable.addReservation(vmrr)
+            
+            # Post-VM RRs (if any)
+            for rr in vmrr.post_rrs:
+                self.slottable.addReservation(rr)
+           
+            if in_future:
+                self.numbesteffortres += 1
+                
+            lease.print_contents()
+
+        except SchedException, msg:
+            raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
+
+
+    def __schedule_immediate_lease(self, req, nexttime):
+        try:
+            (vmrr, in_future) = self.vm_scheduler.fit_asap(req, nexttime, self.preparation_scheduler.find_earliest_starting_times(req, nexttime), allow_reservation_in_future=False)
+            # Schedule deployment
+            self.preparation_scheduler.schedule(req, vmrr, nexttime)
+                        
+            req.append_vmrr(vmrr)
+            self.slottable.addReservation(vmrr)
+            
+            # Post-VM RRs (if any)
+            for rr in vmrr.post_rrs:
+                self.slottable.addReservation(rr)
+                    
+            req.print_contents()
+        except SlotFittingException, msg:
+            raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
+        
+        
+    def __preempt(self, lease, preemption_time):
+        
+        self.logger.info("Preempting lease #%i..." % (lease.id))
+        self.logger.vdebug("Lease before preemption:")
+        lease.print_contents()
+        vmrr = lease.get_last_vmrr()
+        
+        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
+            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
+            must_cancel_and_requeue = True
+        else:
+            susptype = get_config().get("suspension")
+            if susptype == constants.SUSPENSION_NONE:
+                must_cancel_and_requeue = True
+            else:
+                time_until_suspend = preemption_time - vmrr.start
+                min_duration = self.__compute_scheduling_threshold(lease)
+                can_suspend = time_until_suspend >= min_duration        
+                if not can_suspend:
+                    self.logger.debug("Suspending the lease does not meet scheduling threshold.")
+                    must_cancel_and_requeue = True
+                else:
+                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
+                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
+                        must_cancel_and_requeue = True
+                    else:
+                        self.logger.debug("Lease can be suspended")
+                        must_cancel_and_requeue = False
+                    
+        if must_cancel_and_requeue:
+            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
+            if vmrr.backfill_reservation == True:
+                self.numbesteffortres -= 1
+            # If there are any post RRs, remove them
+            for rr in vmrr.post_rrs:
+                self.slottable.removeReservation(rr)
+            lease.remove_vmrr(vmrr)
+            self.slottable.removeReservation(vmrr)
+            for vnode, pnode in lease.diskimagemap.items():
+                self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
+            self.preparation_scheduler.cancel_deployment(lease)
+            lease.diskimagemap = {}
+            lease.state = Lease.STATE_QUEUED
+            self.__enqueue_in_order(lease)
+            get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+        else:
+            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
+            # Save original start and end time of the vmrr
+            old_start = vmrr.start
+            old_end = vmrr.end
+            self.__schedule_suspension(vmrr, preemption_time)
+            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+            for susprr in vmrr.post_rrs:
+                self.slottable.addReservation(susprr)
+            
+            
+        self.logger.vdebug("Lease after preemption:")
+        lease.print_contents()
+        
+    # TODO: Should be in VMScheduler
+    def __get_utilization(self, time):
+        total = self.slottable.get_total_capacity()
+        util = {}
+        reservations = self.slottable.getReservationsAt(time)
+        for r in reservations:
+            for node in r.resources_in_pnode:
+                if isinstance(r, VMResourceReservation):
+                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+                    util[type(r)] = use + util.setdefault(type(r),0.0)
+                elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
+                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
+                    util[type(r)] = use + util.setdefault(type(r),0.0)
+        util[None] = total - sum(util.values())
+        for k in util:
+            util[k] /= total
+            
+        return util        
+
+    def __enqueue_in_order(self, lease):
+        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+        self.queue.enqueue_in_order(lease)
+
+    def _handle_end_rr(self, l, rr):
+        self.slottable.removeReservation(rr)
+        
+    def _handle_end_lease(self, l):
+        l.state = Lease.STATE_DONE
+        l.duration.actual = l.duration.accumulated
+        l.end = round_datetime(get_clock().get_time())
+        self.completedleases.add(l)
+        self.leases.remove(l)
+        if isinstance(l, BestEffortLease):
+            get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+        
+
+class Queue(object):
+    def __init__(self, scheduler):
+        self.scheduler = scheduler
+        self.__q = []
+        
+    def is_empty(self):
+        return len(self.__q)==0
+    
+    def enqueue(self, r):
+        self.__q.append(r)
+    
+    def dequeue(self):
+        return self.__q.pop(0)
+    
+    def enqueue_in_order(self, r):
+        self.__q.append(r)
+        self.__q.sort(key=attrgetter("submit_time"))
+
+    def length(self):
+        return len(self.__q)
+    
+    def has_lease(self, lease_id):
+        return (1 == len([l for l in self.__q if l.id == lease_id]))
+    
+    def get_lease(self, lease_id):
+        return [l for l in self.__q if l.id == lease_id][0]
+    
+    def remove_lease(self, lease):
+        self.__q.remove(lease)
+    
+    def __iter__(self):
+        return iter(self.__q)
+        
+class LeaseTable(object):
+    def __init__(self, scheduler):
+        self.scheduler = scheduler
+        self.entries = {}
+        
+    def has_lease(self, lease_id):
+        return self.entries.has_key(lease_id)
+        
+    def get_lease(self, lease_id):
+        return self.entries[lease_id]
+    
+    def is_empty(self):
+        return len(self.entries)==0
+    
+    def remove(self, lease):
+        del self.entries[lease.id]
+        
+    def add(self, lease):
+        self.entries[lease.id] = lease
+        
+    def get_leases(self, type=None):
+        if type==None:
+            return self.entries.values()
+        else:
+            return [e for e in self.entries.values() if isinstance(e, type)]
+
+    def get_leases_by_state(self, state):
+        return [e for e in self.entries.values() if e.state == state]
+        
\ No newline at end of file
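
The __get_utilization method (flagged above with a TODO as really belonging in
VMScheduler) reports, for a given instant, the fraction of total CPU capacity
used by each reservation type, with the idle fraction stored under the key
None. A standalone restatement of that bookkeeping, with the per-node CPU
lookups already resolved into (reservation_type, cpu_used) pairs:

    def utilization_by_type(cpu_usage, total_capacity):
        # cpu_usage: iterable of (reservation_type, cpu_used) pairs
        util = {}
        for rr_type, cpu in cpu_usage:
            util[rr_type] = util.get(rr_type, 0.0) + cpu
        util[None] = total_capacity - sum(util.values())
        return dict((k, v / float(total_capacity)) for k, v in util.items())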


Property changes on: trunk/src/haizea/resourcemanager/scheduler/lease_scheduler.py
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native
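
The backfilling block in LeaseScheduler.__init__ translates the "backfilling"
configuration option into self.maxres, the maximum number of best-effort
leases that may hold reservations in the future at any one time. Restated as
a small helper (the constant names are the ones used above; the helper itself
is only illustrative):

    import haizea.common.constants as constants

    def max_future_reservations(config):
        backfilling = config.get("backfilling")
        if backfilling == constants.BACKFILLING_OFF:
            return 0                # never reserve best-effort leases in the future
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
            return 1                # only the head of the queue may reserve ahead
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
            return 1000000          # effectively unlimited
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
            return config.get("backfilling-reservations")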

Copied: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers (from rev 546, trunk/src/haizea/resourcemanager/deployment)


Property changes on: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/__init__.py	2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/__init__.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -18,7 +18,7 @@
 
 import logging
 
-class DeploymentScheduler(object):
+class PreparationScheduler(object):
     def __init__(self, slottable, resourcepool, deployment_enact):
         self.slottable = slottable
         self.resourcepool = resourcepool
@@ -26,5 +26,5 @@
         self.logger = logging.getLogger("DEPLOY")
 
         
-class DeploymentSchedException(Exception):
+class PreparationSchedException(Exception):
     pass
\ No newline at end of file
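
PreparationScheduler is now the base class every preparation strategy derives
from; a subclass supplies a handlers dictionary plus the methods the lease
scheduler calls (schedule, find_earliest_starting_times, cancel_deployment).
A hypothetical do-nothing subclass, following the pattern of
UnmanagedPreparationScheduler below (the method set is an assumption about
the emerging interface, not a fixed API):

    import haizea.common.constants as constants
    from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler

    class NoopPreparationScheduler(PreparationScheduler):
        def __init__(self, slottable, resourcepool, deployment_enact):
            PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
            self.handlers = {}      # no preparation reservations, so no handlers

        def schedule(self, lease, vmrr, nexttime):
            pass                    # nothing to prepare

        def find_earliest_starting_times(self, lease, nexttime):
            # Every node can start at nexttime, no transfers required (same
            # shape as the dict built in LeaseScheduler for suspended leases).
            return dict([(n+1, [nexttime, constants.REQTRANSFER_NO, None])
                         for n in range(lease.numnodes)])

        def cancel_deployment(self, lease):
            pass                    # nothing scheduled, nothing to cancel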

Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py	2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/imagetransfer.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -17,15 +17,15 @@
 # -------------------------------------------------------------------------- #
 
 import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
-from haizea.resourcemanager.deployment import DeploymentScheduler, DeploymentSchedException
-from haizea.resourcemanager.datastruct import ResourceReservation, Lease, ARLease, BestEffortLease
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler, PreparationSchedException
+from haizea.resourcemanager.scheduler.slottable import ResourceReservation
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
 from haizea.resourcemanager.scheduler import ReservationEventHandler
 from haizea.common.utils import estimate_transfer_time, get_config
 
 import copy
 
-class ImageTransferDeploymentScheduler(DeploymentScheduler):
+class ImageTransferPreparationScheduler(PreparationScheduler):
     def __init__(self, slottable, resourcepool, deployment_enact):
-        DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
+        PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
         
@@ -50,8 +50,9 @@
         
         self.handlers ={}
         self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
-                                        on_start = ImageTransferDeploymentScheduler.handle_start_filetransfer,
-                                        on_end   = ImageTransferDeploymentScheduler.handle_end_filetransfer)
+                                sched    = self,
+                                on_start = ImageTransferPreparationScheduler.handle_start_filetransfer,
+                                on_end   = ImageTransferPreparationScheduler.handle_end_filetransfer)
 
     def schedule(self, lease, vmrr, nexttime):
         if isinstance(lease, ARLease):

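Note that ReservationEventHandler is now constructed with the owning scheduler as an explicit sched argument, alongside the unbound start/end callbacks. The sketch below shows that registration pattern in isolation; the reservation class, the scheduler class, and the (lease, rr) callback signature are all illustrative assumptions.

    from haizea.resourcemanager.scheduler import ReservationEventHandler

    class MyResourceReservation(object):
        # Hypothetical RR type, used only as a dictionary key here
        pass

    class MyScheduler(object):
        def __init__(self):
            self.handlers = {}
            self.handlers[MyResourceReservation] = ReservationEventHandler(
                                    sched    = self,
                                    on_start = MyScheduler.handle_start,
                                    on_end   = MyScheduler.handle_end)

        def handle_start(self, lease, rr):
            pass

        def handle_end(self, lease, rr):
            pass
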
Modified: trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/unmanaged.py	2008-11-20 00:39:06 UTC (rev 546)
+++ trunk/src/haizea/resourcemanager/scheduler/preparation_schedulers/unmanaged.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -16,13 +16,13 @@
 # limitations under the License.                                             #
 # -------------------------------------------------------------------------- #
 
-from haizea.resourcemanager.datastruct import Lease
-from haizea.resourcemanager.deployment import DeploymentScheduler
+from haizea.resourcemanager.leases import Lease
+from haizea.resourcemanager.scheduler.preparation_schedulers import PreparationScheduler
 import haizea.common.constants as constants
 
-class UnmanagedDeploymentScheduler(DeploymentScheduler):
+class UnmanagedPreparationScheduler(PreparationScheduler):
     def __init__(self, slottable, resourcepool, deployment_enact):
-        DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
+        PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
         self.handlers = {}
     
     # Add dummy disk images

Copied: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py (from rev 546, trunk/src/haizea/resourcemanager/resourcepool.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/resourcepool.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/resourcepool.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,425 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+from haizea.common.utils import vnodemapstr, get_accounting
+import haizea.common.constants as constants
+import haizea.resourcemanager.enact.actions as actions
+import logging 
+
+class FailedEnactmentException(Exception):
+    pass
+
+class ResourcePool(object):
+    def __init__(self, info_enact, vm_enact, deploy_enact):
+        self.logger = logging.getLogger("RPOOL")
+                
+        self.info = info_enact
+        self.vm = vm_enact
+        # TODO: Ideally, deployment enactment shouldn't be here, especially since
+        # it already "hangs" below the deployment modules. For now,
+        # it does no harm, though.
+        self.deployment = deploy_enact
+        
+        self.nodes = self.info.get_nodes()
+        
+    def start_vms(self, lease, rr):
+        start_action = actions.VMEnactmentStartAction()
+        start_action.from_rr(rr)
+                
+        for (vnode, pnode) in rr.nodes.items():
+            node = self.get_node(pnode)
+            diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
+            start_action.vnodes[vnode].pnode = node.enactment_info
+            start_action.vnodes[vnode].diskimage = diskimage.filename
+            start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
+
+        try:
+            self.vm.start(start_action)
+        except Exception, msg:
+            self.logger.error("Enactment of start VM failed: %s" % msg)
+            raise FailedEnactmentException()
+        
+    def stop_vms(self, lease, rr):
+        stop_action = actions.VMEnactmentStopAction()
+        stop_action.from_rr(rr)
+        try:
+            self.vm.stop(stop_action)
+        except Exception, msg:
+            self.logger.error("Enactment of end VM failed: %s" % msg)
+            raise FailedEnactmentException()
+         
+    def suspend_vms(self, lease, rr):
+        # Add memory image files
+        for vnode in rr.vnodes:
+            pnode = rr.vmrr.nodes[vnode]
+            self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources.get_by_type(constants.RES_MEM))
+
+        # Enact suspend
+        suspend_action = actions.VMEnactmentSuspendAction()
+        suspend_action.from_rr(rr)
+        try:
+            self.vm.suspend(suspend_action)
+        except Exception, msg:
+            self.logger.error("Enactment of suspend VM failed: %s" % msg)
+            raise FailedEnactmentException()
+    
+    def verify_suspend(self, lease, rr):
+        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
+        verify_suspend_action.from_rr(rr)
+        self.vm.verify_suspend(verify_suspend_action)
+    
+    def resume_vms(self, lease, rr):
+        # Remove memory image files
+        for vnode in rr.vnodes:
+            pnode = rr.vmrr.nodes[vnode]
+            self.remove_ramfile(pnode, lease.id, vnode)
+
+        # Enact resume
+        resume_action = actions.VMEnactmentResumeAction()
+        resume_action.from_rr(rr)
+        try:
+            self.vm.resume(resume_action)
+        except Exception, msg:
+            self.logger.error("Enactment of resume VM failed: %s" % msg)
+            raise FailedEnactmentException()
+    
+    def verify_resume(self, lease, rr):
+        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
+        verify_resume_action.from_rr(rr)
+        self.vm.verify_resume(verify_resume_action)    
+    
+    def get_nodes(self):
+        return self.nodes
+    
+    # An auxiliary node is a host whose resources are going to be scheduled, but
+    # where no VMs are actually going to run. For example, a disk image repository node.
+    def get_aux_nodes(self):
+        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
+        # There might be a scenario where the info enactment module also reports
+        # auxiliary nodes.
+        return self.deployment.get_aux_nodes()
+
+    def get_num_nodes(self):
+        return len(self.nodes)
+        
+    def get_node(self, nod_id):
+        return self.nodes[nod_id-1]
+        
+    def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
+        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
+        
+        self.logger.vdebug("Files BEFORE:")
+        self.get_node(pnode).print_files()
+        
+        imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
+        img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
+        self.get_node(pnode).add_file(img)
+
+        self.logger.vdebug("Files AFTER:")
+        self.get_node(pnode).print_files()
+        
+        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+        return img
+            
+    def remove_diskimage(self, pnode, lease, vnode):
+        node = self.get_node(pnode)
+        node.print_files()
+
+        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
+        node.remove_diskimage(lease, vnode)
+
+        node.print_files()
+        
+        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())    
+        
+    def add_ramfile(self, pnode, lease_id, vnode, size):
+        node = self.get_node(pnode)
+        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
+        node.print_files()
+        f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
+        node.add_file(f)        
+        node.print_files()
+        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+
+    def remove_ramfile(self, pnode, lease_id, vnode):
+        node = self.get_node(pnode)
+        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
+        node.print_files()
+        node.remove_ramfile(lease_id, vnode)
+        node.print_files()
+        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+        
+    def get_max_disk_usage(self):
+        return max([n.get_disk_usage() for n in self.nodes])
+    
+class Node(object):
+    def __init__(self, nod_id, hostname, capacity):
+        self.logger = logging.getLogger("RESOURCEPOOL")
+        self.nod_id = nod_id
+        self.hostname = hostname
+        self.capacity = capacity
+        self.files = []
+
+        # enactment-specific information
+        self.enactment_info = None
+        
+    def get_capacity(self):
+        return self.capacity
+           
+    def add_file(self, f):
+        self.files.append(f)
+        
+    def get_diskimage(self, lease_id, vnode, diskimage_id):
+        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
+                 f.diskimage_id == diskimage_id and 
+                 f.lease_id == lease_id and
+                 f.vnode == vnode]
+        if len(image) == 0:
+            return None
+        elif len(image) == 1:
+            return image[0]
+        elif len(image) > 1:
+            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
+            return image[0]
+
+    def remove_diskimage(self, lease_id, vnode):
+        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
+                 f.lease_id == lease_id and
+                 f.vnode == vnode]
+        if len(image) > 0:
+            image = image[0]
+            self.files.remove(image)
+            
+    def remove_ramfile(self, lease_id, vnode):
+        ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
+        if len(ramfile) > 0:
+            ramfile = ramfile[0]
+            self.files.remove(ramfile)
+                
+        
+    def get_disk_usage(self):
+        return sum([f.filesize for f in self.files])
+
+
+    def get_diskimages(self):
+        return [f for f in self.files if isinstance(f, DiskImageFile)]
+        
+    def print_files(self):
+        images = ""
+        if len(self.files) > 0:
+            images = ", ".join([str(img) for img in self.files])
+        self.logger.vdebug("Node %i files: %iMB %s" % (self.nod_id, self.get_disk_usage(), images))
+
+    def xmlrpc_marshall(self):
+        # Convert to something we can send through XMLRPC
+        h = {}
+        h["id"] = self.nod_id
+        h["hostname"] = self.hostname
+        h["cpu"] = self.capacity.get_by_type(constants.RES_CPU)
+        h["mem"] = self.capacity.get_by_type(constants.RES_MEM)
+                
+        return h
+        
+
+        
+class File(object):
+    def __init__(self, filename, filesize):
+        self.filename = filename
+        self.filesize = filesize
+        
+class DiskImageFile(File):
+    def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
+        File.__init__(self, filename, filesize)
+        self.lease_id = lease_id
+        self.vnode = vnode
+        self.diskimage_id = diskimage_id
+                
+    def __str__(self):
+        return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
+
+
+class RAMImageFile(File):
+    def __init__(self, filename, filesize, lease_id, vnode):
+        File.__init__(self, filename, filesize)
+        self.lease_id = lease_id
+        self.vnode = vnode
+                
+    def __str__(self):
+        return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
+    
+class ResourcePoolWithReusableImages(ResourcePool):
+    def __init__(self, info_enact, vm_enact, deploy_enact):
+        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
+        
+        self.nodes = [NodeWithReusableImages.from_node(n) for n in self.nodes]
+    
+    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
+        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
+        
+        self.logger.vdebug("Files BEFORE:")
+        self.get_node(pnode).print_files()
+        
+        imagefile = "reusable-%s" % diskimage_id
+        img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
+        for (lease_id, vnode) in mappings:
+            img.add_mapping(lease_id, vnode)
+
+        self.get_node(pnode).add_reusable_image(img)
+
+        self.logger.vdebug("Files AFTER:")
+        self.get_node(pnode).print_files()
+        
+        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+        return img
+    
+    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
+        self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
+    
+    def remove_diskimage(self, pnode_id, lease, vnode):
+        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
+        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
+        for img in self.get_node(pnode_id).get_reusable_images():
+            if (lease, vnode) in img.mappings:
+                img.mappings.remove((lease, vnode))
+            self.get_node(pnode_id).print_files()
+            # Keep image around, even if it isn't going to be used
+            # by any VMs. It might be reused later on.
+            # It will be purged if space has to be made available
+            # for other images
+        
+    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
+        return [n.nod_id for n in self.nodes if n.exists_reusable_image(diskimage_id, after=after)]
+
+    def exists_reusable_image(self, pnode_id, diskimage_id, after):
+        return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
+    
+    
+class NodeWithReusableImages(Node):
+    def __init__(self, nod_id, hostname, capacity):
+        Node.__init__(self, nod_id, hostname, capacity)
+        self.reusable_images = []
+
+    @classmethod
+    def from_node(cls, n):
+        node = cls(n.nod_id, n.hostname, n.capacity)
+        node.enactment_info = n.enactment_info
+        return node
+    
+    def add_reusable_image(self, f):
+        self.reusable_images.append(f)
+
+    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
+        for f in self.reusable_images:
+            if f.diskimage_id == diskimage_id:
+                f.add_mapping(lease_id, vnode)
+                f.update_timeout(timeout)
+                break  # Ugh
+        self.print_files()
+            
+    def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
+        images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
+        if after != None:
+            images = [i for i in images if i.timeout >= after]
+        if lease_id != None and vnode != None:
+            images = [i for i in images if i.has_mapping(lease_id, vnode)]
+        if len(images)>0:
+            return images[0]
+        else:
+            return None
+        
+    def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
+        entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
+        if entry == None:
+            return False
+        else:
+            return True
+
+    def get_reusable_images(self):
+        return self.reusable_images
+
+    def get_reusable_images_size(self):
+        return sum([f.filesize for f in self.reusable_images])
+    
+    def purge_oldest_unused_image(self):
+        unused = [img for img in self.reusable_images if not img.has_mappings()]
+        if len(unused) == 0:
+            return 0
+        else:
+            i = iter(unused)
+            oldest = i.next()
+            for img in i:
+                if img.timeout < oldest.timeout:
+                    oldest = img
+            self.reusable_images.remove(oldest)
+            return 1
+    
+    def purge_downto(self, target):
+        done = False
+        while not done:
+            removed = self.purge_oldest_unused_image()
+            if removed==0:
+                done = True
+                success = False
+            elif removed == 1:
+                if self.get_reusable_images_size() <= target:
+                    done = True
+                    success = True
+        return success
+
+    def print_files(self):
+        Node.print_files(self)
+        images = ""
+        if len(self.reusable_images) > 0:
+            images = ", ".join([str(img) for img in self.reusable_images])
+        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.nod_id, self.get_reusable_images_size(), images))
+
+class ReusableDiskImageFile(File):
+    def __init__(self, filename, filesize, diskimage_id, timeout):
+        File.__init__(self, filename, filesize)
+        self.diskimage_id = diskimage_id
+        self.mappings = set([])
+        self.timeout = timeout
+        
+    def add_mapping(self, lease_id, vnode):
+        self.mappings.add((lease_id, vnode))
+        
+    def has_mapping(self, lease_id, vnode):
+        return (lease_id, vnode) in self.mappings
+    
+    def has_mappings(self):
+        return len(self.mappings) > 0
+        
+    def update_timeout(self, timeout):
+        if timeout > self.timeout:
+            self.timeout = timeout
+        
+    def is_expired(self, curTime):
+        if self.timeout == None:
+            return False
+        elif self.timeout < curTime:
+            return True
+        else:
+            return False
+        
+    def __str__(self):
+        if self.timeout == None:
+            timeout = "NOTIMEOUT"
+        else:
+            timeout = self.timeout
+        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
+


Property changes on: trunk/src/haizea/resourcemanager/scheduler/resourcepool.py
___________________________________________________________________
Name: svn:mergeinfo
   + 

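To illustrate the reusable-image bookkeeping added above, the sketch below drives ReusableDiskImageFile directly. The file name, size and image id are made-up values; a timeout of None means the cached image is never considered expired.

    from haizea.resourcemanager.scheduler.resourcepool import ReusableDiskImageFile

    img = ReusableDiskImageFile("reusable-foobar.img", 1024, "foobar.img", timeout=None)
    img.add_mapping(lease_id=1, vnode=1)
    img.add_mapping(lease_id=1, vnode=2)

    assert img.has_mapping(1, 2)
    assert img.has_mappings()
    assert not img.is_expired(curTime=0)   # no timeout: never expires
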
Copied: trunk/src/haizea/resourcemanager/scheduler/slottable.py (from rev 549, trunk/src/haizea/resourcemanager/slottable.py)
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/slottable.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/slottable.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,613 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+from mx.DateTime import ISO, TimeDelta
+from operator import attrgetter, itemgetter
+import haizea.common.constants as constants
+from math import ceil, floor
+import bisect
+import copy
+import logging
+
+class SlotFittingException(Exception):
+    pass
+
+class CriticalSlotFittingException(Exception):
+    pass
+
+
+class Node(object):
+    def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
+        self.capacity = ResourceTuple.copy(capacity)
+        if capacitywithpreemption == None:
+            self.capacitywithpreemption = None
+        else:
+            self.capacitywithpreemption = ResourceTuple.copy(capacitywithpreemption)
+        self.resourcepoolnode = resourcepoolnode
+        
+    @classmethod
+    def from_resourcepool_node(cls, node):
+        capacity = node.get_capacity()
+        return cls(capacity, capacity, node)
+
+class NodeList(object):
+    def __init__(self):
+        self.nodelist = []
+
+    def add(self, node):
+        self.nodelist.append(node)
+        
+    def __getitem__(self, n):
+        return self.nodelist[n-1]
+
+    def copy(self):
+        nodelist = NodeList()
+        for n in self.nodelist:
+            nodelist.add(Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode))
+        return nodelist
+   
+    def toDict(self):
+        nodelist = self.copy()
+        return dict([(i+1, v) for i, v in enumerate(nodelist.nodelist)])
+        
+class KeyValueWrapper(object):
+    def __init__(self, key, value):
+        self.key = key
+        self.value = value
+        
+    def __cmp__(self, other):
+        return cmp(self.key, other.key)
+
+class ResourceReservation(object):
+    
+    # Resource reservation states
+    STATE_SCHEDULED = 0
+    STATE_ACTIVE = 1
+    STATE_DONE = 2
+
+    state_str = {STATE_SCHEDULED : "Scheduled",
+                 STATE_ACTIVE : "Active",
+                 STATE_DONE : "Done"}
+    
+    def __init__(self, lease, start, end, res):
+        self.lease = lease
+        self.start = start
+        self.end = end
+        self.state = None
+        self.resources_in_pnode = res
+        self.logger = logging.getLogger("LEASES")
+                        
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Start          : %s" % self.start)
+        self.logger.log(loglevel, "End            : %s" % self.end)
+        self.logger.log(loglevel, "State          : %s" % ResourceReservation.state_str[self.state])
+        self.logger.log(loglevel, "Resources      : \n                         %s" % "\n                         ".join(["N%i: %s" %(i, x) for i, x in self.resources_in_pnode.items()])) 
+                
+    def xmlrpc_marshall(self):
+        # Convert to something we can send through XMLRPC
+        rr = {}                
+        rr["start"] = xmlrpc_marshall_singlevalue(self.start)
+        rr["end"] = xmlrpc_marshall_singlevalue(self.end)
+        rr["state"] = self.state
+        return rr
+
+class ResourceTuple(object):
+    def __init__(self, res):
+        self._res = res
+        
+    @classmethod
+    def from_list(cls, l):
+        return cls(l[:])
+
+    @classmethod
+    def copy(cls, rt):
+        return cls(rt._res[:])
+    
+    @classmethod
+    def set_resource_types(cls, resourcetypes):
+        cls.type2pos = dict([(x[0], i) for i, x in enumerate(resourcetypes)])
+        cls.descriptions = dict([(i, x[2]) for i, x in enumerate(resourcetypes)])
+        cls.tuplelength = len(resourcetypes)
+
+    @classmethod
+    def create_empty(cls):
+        return cls([0 for x in range(cls.tuplelength)])
+        
+    def fits_in(self, res2):
+        fits = True
+        for i in xrange(len(self._res)):
+            if self._res[i] > res2._res[i]:
+                fits = False
+                break
+        return fits
+    
+    def get_num_fits_in(self, res2):
+        canfit = 10000 # Arbitrarily large
+        for i in xrange(len(self._res)):
+            if self._res[i] != 0:
+                f = res2._res[i] / self._res[i]
+                if f < canfit:
+                    canfit = f
+        return int(floor(canfit))
+    
+    def decr(self, res2):
+        for slottype in xrange(len(self._res)):
+            self._res[slottype] -= res2._res[slottype]
+
+    def incr(self, res2):
+        for slottype in xrange(len(self._res)):
+            self._res[slottype] += res2._res[slottype]
+        
+    def get_by_type(self, resourcetype):
+        return self._res[self.type2pos[resourcetype]]
+
+    def set_by_type(self, resourcetype, value):
+        self._res[self.type2pos[resourcetype]] = value        
+        
+    def is_zero_or_less(self):
+        return sum([v for v in self._res]) <= 0
+    
+    def __repr__(self):
+        r=""
+        for i, x in enumerate(self._res):
+            r += "%s:%.2f " % (self.descriptions[i], x)
+        return r
+
+
+class SlotTable(object):
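+    # Reservations are kept in two parallel lists, sorted by start time and by
+    # end time (KeyValueWrapper entries kept ordered with bisect), so the range
+    # queries below are binary searches plus a slice.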
+    def __init__(self):
+        self.logger = logging.getLogger("SLOT")
+        self.nodes = NodeList()
+        self.reservations = []
+        self.reservationsByStart = []
+        self.reservationsByEnd = []
+        self.availabilitycache = {}
+        self.changepointcache = None
+        
+        self.availabilitywindow = AvailabilityWindow(self)
+
+    def add_node(self, resourcepoolnode):
+        self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
+
+    def is_empty(self):
+        return (len(self.reservationsByStart) == 0)
+
+    def dirty(self):
+        # You're a dirty, dirty slot table and you should be
+        # ashamed of having outdated caches!
+        self.availabilitycache = {}
+        self.changepointcache = None
+        
+    def getAvailabilityCacheMiss(self, time):
+        allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
+        onlynodes = None       
+        nodes = {} 
+        reservations = self.getReservationsAt(time)
+        # Find how much resources are available on each node
+        canpreempt = True
+        for r in reservations:
+            for node in r.resources_in_pnode:
+                if onlynodes == None or (onlynodes != None and node in onlynodes):
+                    if not nodes.has_key(node):
+                        n = self.nodes[node]
+                        if canpreempt:
+                            nodes[node] = Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode)
+                        else:
+                            nodes[node] = Node(n.capacity, None, n.resourcepoolnode)
+                    nodes[node].capacity.decr(r.resources_in_pnode[node])
+                    if canpreempt and not r.is_preemptible:
+                        nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
+
+        # For the remaining nodes, use a reference to the original node, not a copy
+        if onlynodes == None:
+            missing = allnodes - set(nodes.keys())
+        else:
+            missing = onlynodes - set(nodes.keys())
+            
+        for node in missing:
+            nodes[node] = self.nodes[node]                    
+            
+        self.availabilitycache[time] = nodes
+
+    def getAvailability(self, time, resreq=None, onlynodes=None, canpreempt=False):
+        if not self.availabilitycache.has_key(time):
+            self.getAvailabilityCacheMiss(time)
+            # Cache miss
+            
+        nodes = self.availabilitycache[time]
+
+        if onlynodes != None:
+            onlynodes = set(onlynodes)
+            nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
+
+        # Keep only those nodes with enough resources
+        if resreq != None:
+            newnodes = {}
+            for n, node in nodes.items():
+                if not resreq.fits_in(node.capacity) or (canpreempt and not resreq.fits_in(node.capacitywithpreemption)):
+                    pass
+                else:
+                    newnodes[n]=node
+            nodes = newnodes
+
+        return nodes
+    
+    def get_total_capacity(self, restype = constants.RES_CPU):
+        return sum([n.capacity.get_by_type(restype) for n in self.nodes.nodelist])        
+
+    def getReservationsAt(self, time):
+        item = KeyValueWrapper(time, None)
+        startpos = bisect.bisect_right(self.reservationsByStart, item)
+        bystart = set([x.value for x in self.reservationsByStart[:startpos]])
+        endpos = bisect.bisect_right(self.reservationsByEnd, item)
+        byend = set([x.value for x in self.reservationsByEnd[endpos:]])
+        res = bystart & byend
+        return list(res)
+    
+    def get_reservations_starting_between(self, start, end):
+        startitem = KeyValueWrapper(start, None)
+        enditem = KeyValueWrapper(end, None)
+        startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+        endpos = bisect.bisect_right(self.reservationsByStart, enditem)
+        res = [x.value for x in self.reservationsByStart[startpos:endpos]]
+        return res
+
+    def get_reservations_starting_after(self, start):
+        startitem = KeyValueWrapper(start, None)
+        startpos = bisect.bisect_left(self.reservationsByStart, startitem)
+        res = [x.value for x in self.reservationsByStart[startpos:]]
+        return res
+
+    def get_reservations_ending_after(self, end):
+        startitem = KeyValueWrapper(end, None)
+        startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
+        res = [x.value for x in self.reservationsByEnd[startpos:]]
+        return res
+
+    def get_reservations_ending_between(self, start, end):
+        startitem = KeyValueWrapper(start, None)
+        enditem = KeyValueWrapper(end, None)
+        startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
+        endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
+        res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
+        return res
+    
+    def get_reservations_starting_at(self, time):
+        return self.get_reservations_starting_between(time, time)
+
+    def get_reservations_ending_at(self, time):
+        return self.get_reservations_ending_between(time, time)
+    
+    # ONLY for simulation
+    def getNextPrematureEnd(self, after):
+        from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+        # Inefficient, but ok since this query seldom happens
+        res = [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend > after]
+        if len(res) > 0:
+            prematureends = [r.prematureend for r in res]
+            prematureends.sort()
+            return prematureends[0]
+        else:
+            return None
+    
+    # ONLY for simulation
+    def getPrematurelyEndingRes(self, t):
+        from haizea.resourcemanager.scheduler.vm_scheduler import VMResourceReservation
+        return [i.value for i in self.reservationsByEnd if isinstance(i.value, VMResourceReservation) and i.value.prematureend == t]
+
+    
+    def get_reservations_starting_or_ending_after(self, after):
+        item = KeyValueWrapper(after, None)
+        startpos = bisect.bisect_right(self.reservationsByStart, item)
+        bystart = set([x.value for x in self.reservationsByStart[:startpos]])
+        endpos = bisect.bisect_right(self.reservationsByEnd, item)
+        byend = set([x.value for x in self.reservationsByEnd[endpos:]])
+        res = bystart | byend
+        return list(res)    
+    
+    def addReservation(self, rr):
+        startitem = KeyValueWrapper(rr.start, rr)
+        enditem = KeyValueWrapper(rr.end, rr)
+        bisect.insort(self.reservationsByStart, startitem)
+        bisect.insort(self.reservationsByEnd, enditem)
+        self.dirty()
+
+    # If the slot table keys are not modified (start / end time)
+    # Just remove and reinsert.
+    def updateReservation(self, rr):
+        # TODO: Might be more efficient to resort lists
+        self.removeReservation(rr)
+        self.addReservation(rr)
+        self.dirty()
+
+    # If the slot table keys (start and/or end time) are modified, the caller
+    # must provide the old keys (so we can remove the reservation using them)
+    # along with the updated reservation.
+    def update_reservation_with_key_change(self, rr, old_start, old_end):
+        # TODO: Might be more efficient to resort lists
+        self.removeReservation(rr, old_start, old_end)
+        self.addReservation(rr)
+        self.dirty()
+
+
+    def getIndexOfReservation(self, rlist, rr, key):
+        item = KeyValueWrapper(key, None)
+        pos = bisect.bisect_left(rlist, item)
+        found = False
+        while not found:
+            if rlist[pos].value == rr:
+                found = True
+            else:
+                pos += 1
+        return pos
+
+    def removeReservation(self, rr, start=None, end=None):
+        if start == None:
+            start = rr.start
+        if end == None:
+            end = rr.end
+        posstart = self.getIndexOfReservation(self.reservationsByStart, rr, start)
+        posend = self.getIndexOfReservation(self.reservationsByEnd, rr, end)
+        self.reservationsByStart.pop(posstart)
+        self.reservationsByEnd.pop(posend)
+        self.dirty()
+
+    
+    def findChangePointsAfter(self, after, until=None, nodes=None):
+        changepoints = set()
+        res = self.get_reservations_starting_or_ending_after(after)
+        for rr in res:
+            if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
+                if rr.start > after:
+                    changepoints.add(rr.start)
+                if rr.end > after:
+                    changepoints.add(rr.end)
+        changepoints = list(changepoints)
+        if until != None:
+            changepoints =  [c for c in changepoints if c < until]
+        changepoints.sort()
+        return changepoints
+    
+    def peekNextChangePoint(self, time):
+        if self.changepointcache == None:
+            # Cache is empty
+            changepoints = self.findChangePointsAfter(time)
+            changepoints.reverse()
+            self.changepointcache = changepoints
+        if len(self.changepointcache) == 0:
+            return None
+        else:
+            return self.changepointcache[-1]
+    
+    def getNextChangePoint(self, time):
+        p = self.peekNextChangePoint(time)
+        if p != None:
+            self.changepointcache.pop()
+        return p
+        
+    def isFull(self, time):
+        nodes = self.getAvailability(time)
+        avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])
+        return (avail == 0)
+    
+    def get_next_reservations_in_nodes(self, time, nodes, rr_type=None, immediately_next = False):
+        nodes = set(nodes)
+        rrs_in_nodes = []
+        earliest_end_time = {}
+        rrs = self.get_reservations_starting_after(time)
+        if rr_type != None:
+            rrs = [rr for rr in rrs if isinstance(rr, rr_type)]
+            
+        # Filter the RRs by nodes
+        for rr in rrs:
+            rr_nodes = set(rr.resources_in_pnode.keys())
+            if len(nodes & rr_nodes) > 0:
+                rrs_in_nodes.append(rr)
+                end = rr.end
+                for n in rr_nodes:
+                    if not earliest_end_time.has_key(n):
+                        earliest_end_time[n] = end
+                    else:
+                        if end < earliest_end_time[n]:
+                            earliest_end_time[n] = end
+                            
+        if immediately_next:
+            # We only want to include the ones that are immediately
+            # next. 
+            rr_nodes_excl = set()
+            for n in nodes:
+                if earliest_end_time.has_key(n):
+                    end = earliest_end_time[n]
+                    rrs = [rr for rr in rrs_in_nodes if n in rr.resources_in_pnode.keys() and rr.start < end]
+                    rr_nodes_excl.update(rrs)
+            rrs_in_nodes = list(rr_nodes_excl)
+        
+        return rrs_in_nodes
+
+class AvailEntry(object):
+    def __init__(self, time, avail, availpreempt, resreq):
+        self.time = time
+        self.avail = avail
+        self.availpreempt = availpreempt
+        
+        if avail == None and availpreempt == None:
+            self.canfit = 0
+            self.canfitpreempt = 0
+        else:
+            self.canfit = resreq.get_num_fits_in(avail)
+            if availpreempt == None:
+                self.canfitpreempt = 0
+            else:
+                self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
+        
+    def getCanfit(self, canpreempt):
+        if canpreempt:
+            return self.canfitpreempt
+        else:
+            return self.canfit
+
+
+class AvailabilityWindow(object):
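+    # Given one resource request, precomputes how many VMs of that size fit on
+    # each physical node at the initial time and at every later changepoint,
+    # both with and without preemption. The VM scheduler's slot fitting code
+    # (see fit_exact in vm_scheduler.py below) consults this window.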
+    def __init__(self, slottable):
+        self.slottable = slottable
+        self.logger = logging.getLogger("SLOTTABLE.WIN")
+        self.time = None
+        self.resreq = None
+        self.onlynodes = None
+        self.avail = None
+        
+    # Create avail structure
+    def initWindow(self, time, resreq, onlynodes = None, canpreempt=False):
+        self.time = time
+        self.resreq = resreq
+        self.onlynodes = onlynodes
+        self.avail = {}
+
+        # Availability at initial time
+        availatstart = self.slottable.getAvailability(self.time, self.resreq, self.onlynodes, canpreempt)
+        for node in availatstart:
+            capacity = availatstart[node].capacity
+            if canpreempt:
+                capacitywithpreemption = availatstart[node].capacitywithpreemption
+            else:
+                capacitywithpreemption = None
+            self.avail[node] = [AvailEntry(self.time, capacity, capacitywithpreemption, self.resreq)]
+        
+        # Determine the availability at the subsequent change points
+        nodes = set(availatstart.keys())
+        res = self.slottable.get_reservations_starting_after(self.time)
+        changepoints = set()
+        for rr in res:
+            if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
+                changepoints.add(rr.start)
+        changepoints = list(changepoints)
+        changepoints.sort()
+        for p in changepoints:
+            availatpoint = self.slottable.getAvailability(p, self.resreq, nodes, canpreempt)
+            newnodes = set(availatpoint.keys())
+            
+            # Add entries for nodes that have no resources available
+            # (for at least one VM)
+            fullnodes = nodes - newnodes
+            for node in fullnodes:
+                self.avail[node].append(AvailEntry(p, None, None, None))
+                nodes.remove(node)
+                
+            # For the rest, we are only interested in whether the available
+            # resources decrease in the window
+            for node in newnodes:
+                capacity = availatpoint[node].capacity
+                fits = self.resreq.get_num_fits_in(capacity)
+                if canpreempt:
+                    capacitywithpreemption = availatpoint[node].capacitywithpreemption
+                    fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
+                prevavail = self.avail[node][-1]
+                if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
+                    self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
+                elif canpreempt and (prevavail.getCanfit(canpreempt=False) > fits or prevavail.getCanfit(canpreempt=True) > fitswithpreemption):
+                    self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
+                  
+    
+    def fitAtStart(self, nodes = None, canpreempt = False):
+        if nodes != None:
+            avail = [v for (k, v) in self.avail.items() if k in nodes]
+        else:
+            avail = self.avail.values()
+        if canpreempt:
+            return sum([e[0].canfitpreempt for e in avail])
+        else:
+            return sum([e[0].canfit for e in avail])
+        
+    # TODO: Also return the amount of resources that would have to be
+    # preempted in each physnode
+    def findPhysNodesForVMs(self, numnodes, maxend, strictend=False, canpreempt=False):
+        # Returns the physical nodes that can run all VMs, and the
+        # time at which the VMs must end
+        canfit = dict([(n, v[0].getCanfit(canpreempt)) for (n, v) in self.avail.items()])
+        entries = []
+        for n in self.avail.keys():
+            entries += [(n, e) for e in self.avail[n][1:]]
+        getTime = lambda x: x[1].time
+        entries.sort(key=getTime)
+        if strictend:
+            end = None
+        else:
+            end = maxend
+        for e in entries:
+            physnode = e[0]
+            entry = e[1]
+       
+            if entry.time >= maxend:
+                # Can run to its maximum duration
+                break
+            else:
+                diff = canfit[physnode] - entry.getCanfit(canpreempt)
+                totalcanfit = sum([n for n in canfit.values()]) - diff
+                if totalcanfit < numnodes and not strictend:
+                    # Not enough resources. Must end here
+                    end = entry.time
+                    break
+                else:
+                    # Update canfit
+                    canfit[physnode] = entry.getCanfit(canpreempt)
+
+        # Filter out nodes where we can't fit any vms
+        canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
+        
+        return end, canfit
+            
+                    
+    def printContents(self, nodes = None, withpreemption = False):
+        if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
+            if nodes == None:
+                physnodes = self.avail.keys()
+            else:
+                physnodes = [k for k in self.avail.keys() if k in nodes]
+            physnodes.sort()
+            if withpreemption:
+                p = "(with preemption)"
+            else:
+                p = "(without preemption)"
+            self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
+            for n in physnodes:
+                contents = "Node %i --- " % n
+                for x in self.avail[n]:
+                    contents += "[ %s " % x.time
+                    contents += "{ "
+                    if x.avail == None and x.availpreempt == None:
+                        contents += "END "
+                    else:
+                        if withpreemption:
+                            res = x.availpreempt
+                            canfit = x.canfitpreempt
+                        else:
+                            res = x.avail
+                            canfit = x.canfit
+                        contents += "%s" % res
+                    contents += "} (Fits: %i) ]  " % canfit
+                self.logger.vdebug(contents)
+                
+
+                
+                          
+                          
+            
+
+
+        
+        
\ No newline at end of file


Property changes on: trunk/src/haizea/resourcemanager/scheduler/slottable.py
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

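Before the VM scheduler below, a short sketch of the ResourceTuple arithmetic it relies on. The two resource types and the capacity/request values are illustrative; set_resource_types only reads positions 0 and 2 of each tuple.

    from haizea.resourcemanager.scheduler.slottable import ResourceTuple

    ResourceTuple.set_resource_types([(0, "CPU", "CPU"), (1, "Mem", "Mem")])

    capacity = ResourceTuple.from_list([4, 4096])   # one pnode: 4 CPUs, 4096 MB
    request  = ResourceTuple.from_list([1, 1024])   # per-VM request

    assert request.fits_in(capacity)
    assert request.get_num_fits_in(capacity) == 4

    capacity.decr(request)                          # account for one placed VM
    assert request.get_num_fits_in(capacity) == 3
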
Added: trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py	                        (rev 0)
+++ trunk/src/haizea/resourcemanager/scheduler/vm_scheduler.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -0,0 +1,1378 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago                                 #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
+# Complutense de Madrid (dsa-research.org)                                   #
+#                                                                            #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
+# not use this file except in compliance with the License. You may obtain    #
+# a copy of the License at                                                   #
+#                                                                            #
+# http://www.apache.org/licenses/LICENSE-2.0                                 #
+#                                                                            #
+# Unless required by applicable law or agreed to in writing, software        #
+# distributed under the License is distributed on an "AS IS" BASIS,          #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+# See the License for the specific language governing permissions and        #
+# limitations under the License.                                             #
+# -------------------------------------------------------------------------- #
+
+import haizea.common.constants as constants
+from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_accounting, get_clock
+from haizea.resourcemanager.scheduler.slottable import SlotTable, SlotFittingException
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.scheduler.slottable import ResourceReservation, ResourceTuple
+from haizea.resourcemanager.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
+from haizea.resourcemanager.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException
+from operator import attrgetter, itemgetter
+from mx.DateTime import TimeDelta
+
+import logging
+
+
+class VMScheduler(object):
+    """The Haizea VM Scheduler
+    
+    """
+    
+    def __init__(self, slottable, resourcepool):
+        self.slottable = slottable
+        self.resourcepool = resourcepool
+        self.logger = logging.getLogger("VMSCHED")
+        
+        self.handlers = {}
+        self.handlers[VMResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = VMScheduler._handle_start_vm,
+                                on_end   = VMScheduler._handle_end_vm)
+
+        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = VMScheduler._handle_start_shutdown,
+                                on_end   = VMScheduler._handle_end_shutdown)
+
+        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = VMScheduler._handle_start_suspend,
+                                on_end   = VMScheduler._handle_end_suspend)
+
+        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = VMScheduler._handle_start_resume,
+                                on_end   = VMScheduler._handle_end_resume)
+
+        self.handlers[MigrationResourceReservation] = ReservationEventHandler(
+                                sched    = self,
+                                on_start = VMScheduler._handle_start_migrate,
+                                on_end   = VMScheduler._handle_end_migrate)
+        
+        backfilling = get_config().get("backfilling")
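+        # maxres bounds how many reservations may be made in the future for
+        # best-effort leases: backfilling off allows none, aggressive allows
+        # one, intermediate allows a configurable number, and conservative is
+        # effectively unbounded.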
+        if backfilling == constants.BACKFILLING_OFF:
+            self.maxres = 0
+        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
+            self.maxres = 1
+        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
+            self.maxres = 1000000 # Arbitrarily large
+        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
+            self.maxres = get_config().get("backfilling-reservations")
+
+        self.numbesteffortres = 0
+
+    def fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
+        lease_id = leasereq.id
+        start = leasereq.start.requested
+        end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
+        diskImageID = leasereq.diskimage_id
+        numnodes = leasereq.numnodes
+        resreq = leasereq.requested_resources
+
+        availabilitywindow = self.slottable.availabilitywindow
+
+        availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
+        availabilitywindow.printContents(withpreemption = False)
+        availabilitywindow.printContents(withpreemption = True)
+
+        mustpreempt = False
+        unfeasiblewithoutpreemption = False
+        
+        fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
+        if fitatstart < numnodes:
+            if not canpreempt:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                unfeasiblewithoutpreemption = True
+        feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
+        fitatend = sum([n for n in canfitnopreempt.values()])
+        if fitatend < numnodes:
+            if not canpreempt:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                unfeasiblewithoutpreemption = True
+
+        canfitpreempt = None
+        if canpreempt:
+            fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
+            if fitatstart < numnodes:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
+            fitatend = sum([n for n in canfitpreempt.values()])
+            if fitatend < numnodes:
+                raise SlotFittingException, "Not enough resources in specified interval"
+            else:
+                if unfeasiblewithoutpreemption:
+                    mustpreempt = True
+                else:
+                    mustpreempt = False
+
+        # At this point we know if the lease is feasible, and if it
+        # will require preemption.
+        if not mustpreempt:
+           self.logger.debug("The VM reservations for this lease are feasible without preemption.")
+        else:
+           self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
+
+        # merge canfitnopreempt and canfitpreempt
+        canfit = {}
+        for node in canfitnopreempt:
+            vnodes = canfitnopreempt[node]
+            canfit[node] = [vnodes, vnodes]
+        for node in canfitpreempt:
+            vnodes = canfitpreempt[node]
+            if canfit.has_key(node):
+                canfit[node][1] = vnodes
+            else:
+                canfit[node] = [0, vnodes]
+
+        orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
+            
+        self.logger.debug("Node ordering: %s" % orderednodes)
+        
+        # vnode -> pnode
+        nodeassignment = {}
+        
+        # pnode -> resourcetuple
+        res = {}
+        
+        # physnode -> how many vnodes
+        preemptions = {}
+        
+        vnode = 1
+        if avoidpreempt:
+            # First pass, without preemption
+            for physnode in orderednodes:
+                canfitinnode = canfit[physnode][0]
+                for i in range(1, canfitinnode+1):
+                    nodeassignment[vnode] = physnode
+                    if res.has_key(physnode):
+                        res[physnode].incr(resreq)
+                    else:
+                        res[physnode] = ResourceTuple.copy(resreq)
+                    canfit[physnode][0] -= 1
+                    canfit[physnode][1] -= 1
+                    vnode += 1
+                    if vnode > numnodes:
+                        break
+                if vnode > numnodes:
+                    break
+            
+        # Second pass, with preemption
+        if mustpreempt or not avoidpreempt:
+            for physnode in orderednodes:
+                canfitinnode = canfit[physnode][1]
+                for i in range(1, canfitinnode+1):
+                    nodeassignment[vnode] = physnode
+                    if res.has_key(physnode):
+                        res[physnode].incr(resreq)
+                    else:
+                        res[physnode] = ResourceTuple.copy(resreq)
+                    canfit[physnode][1] -= 1
+                    vnode += 1
+                    # Check if this will actually result in a preemption
+                    if canfit[physnode][0] == 0:
+                        if preemptions.has_key(physnode):
+                            preemptions[physnode].incr(resreq)
+                        else:
+                            preemptions[physnode] = ResourceTuple.copy(resreq)
+                    else:
+                        canfit[physnode][0] -= 1
+                    if vnode > numnodes:
+                        break
+                if vnode > numnodes:
+                    break
+
+        if vnode <= numnodes:
+            raise SlotFittingException, "Availability window indicated that request is feasible, but could not fit it"
+
+        # Create VM resource reservations
+        vmrr = VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
+        vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+        self.__schedule_shutdown(vmrr)
+
+        return vmrr, preemptions
+
+    def fit_asap(self, lease, nexttime, earliest, allow_reservation_in_future = False):
+        lease_id = lease.id
+        remaining_duration = lease.duration.get_remaining_duration()
+        numnodes = lease.numnodes
+        requested_resources = lease.requested_resources
+        preemptible = lease.preemptible
+        mustresume = (lease.state == Lease.STATE_SUSPENDED)
+        shutdown_time = self.__estimate_shutdown_time(lease)
+        susptype = get_config().get("suspension")
+        if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
+            suspendable = False
+        else:
+            suspendable = True
+
+        canmigrate = get_config().get("migration")
+
+        #
+        # STEP 1: FIGURE OUT THE MINIMUM DURATION
+        #
+        
+        min_duration = self.__compute_scheduling_threshold(lease)
+
+
+        #
+        # STEP 2: FIND THE CHANGEPOINTS
+        #
+
+        # Find the changepoints, and the nodes we can use at each changepoint
+        # Nodes may not be available at a changepoint because images
+        # cannot be transferred at that time.
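+        # (Illustrative example, with made-up values: if earliest maps nodes 1
+        #  and 2 to time t1 and node 3 to a later time t2, the loop below
+        #  produces changepoints = [[t1, [1, 2]], [t2, [1, 2, 3]]].)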
+        if not mustresume:
+            cps = [(node, e[0]) for node, e in earliest.items()]
+            cps.sort(key=itemgetter(1))
+            curcp = None
+            changepoints = []
+            nodes = []
+            for node, time in cps:
+                nodes.append(node)
+                if time != curcp:
+                    changepoints.append([time, nodes[:]])
+                    curcp = time
+                else:
+                    changepoints[-1][1] = nodes[:]
+        else:
+            if not canmigrate:
+                vmrr = lease.get_last_vmrr()
+                curnodes = set(vmrr.nodes.values())
+            else:
+                curnodes=None
+                # If we have to resume this lease, make sure that
+                # we have enough time to transfer the images.
+                migratetime = self.__estimate_migration_time(lease)
+                earliesttransfer = get_clock().get_time() + migratetime
+    
+                for n in earliest:
+                    earliest[n][0] = max(earliest[n][0], earliesttransfer)
+
+            changepoints = list(set([x[0] for x in earliest.values()]))
+            changepoints.sort()
+            changepoints = [(x, curnodes) for x in changepoints]
+
+        # If we can make reservations in the future,
+        # we also consider future changepoints
+        # (otherwise, we only allow the VMs to start "now", accounting
+        #  for the fact that VM images will have to be deployed)
+        if allow_reservation_in_future:
+            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
+            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
+            # Corner case: a shutdown may still be in progress while its VMRR has
+            # already ended, so its end is not picked up above and must be added explicitly.
+            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
+            futurecp = [(p,None) for p in futurecp]
+        else:
+            futurecp = []
+
+
+
+        #
+        # STEP 3: SLOT FITTING
+        #
+        
+        # If resuming, we also have to allocate enough for the resumption
+        if mustresume:
+            duration = remaining_duration + self.__estimate_resume_time(lease)
+        else:
+            duration = remaining_duration
+
+        duration += shutdown_time
+
+        # First, assuming we can't make reservations in the future
+        start, end, canfit = self.__find_fit_at_points(
+                                                       changepoints, 
+                                                       numnodes, 
+                                                       requested_resources, 
+                                                       duration, 
+                                                       suspendable, 
+                                                       min_duration)
+        
+        if start == None:
+            if not allow_reservation_in_future:
+                # We did not find a suitable starting time. This can happen
+                # if we're unable to make future reservations
+                raise SchedException, "Could not find enough resources for this request"
+        else:
+            mustsuspend = (end - start) < duration
+            if mustsuspend and not suspendable:
+                if not allow_reservation_in_future:
+                    raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
+                else:
+                    start = None # No satisfactory start time
+            
+        # If we haven't been able to fit the lease, check if we can
+        # reserve it in the future
+        if start == None and allow_reservation_in_future:
+            start, end, canfit = self.__find_fit_at_points(
+                                                           futurecp, 
+                                                           numnodes, 
+                                                           requested_resources, 
+                                                           duration, 
+                                                           suspendable, 
+                                                           min_duration
+                                                           )
+
+
+        reservation = start in [p[0] for p in futurecp]
+
+
+        #
+        # STEP 4: FINAL SLOT FITTING
+        #
+        # At this point, we know the lease fits, but we have to map it to
+        # specific physical nodes.
+        
+        # Sort physical nodes
+        physnodes = canfit.keys()
+        if mustresume:
+            # If we're resuming, we prefer resuming in the nodes we're already
+            # deployed in, to minimize the number of transfers.
+            vmrr = lease.get_last_vmrr()
+            nodes = set(vmrr.nodes.values())
+            availnodes = set(physnodes)
+            deplnodes = availnodes.intersection(nodes)
+            notdeplnodes = availnodes.difference(nodes)
+            physnodes = list(deplnodes) + list(notdeplnodes)
+        else:
+            physnodes.sort() # Arbitrary order for now (TODO: prioritize nodes, as in the exact-fit case)
+        
+        # Map to physical nodes
+        mappings = {}
+        res = {}
+        vmnode = 1
+        while vmnode <= numnodes:
+            for n in physnodes:
+                if canfit[n]>0:
+                    canfit[n] -= 1
+                    mappings[vmnode] = n
+                    if res.has_key(n):
+                        res[n].incr(requested_resources)
+                    else:
+                        res[n] = ResourceTuple.copy(requested_resources)
+                    vmnode += 1
+                    break
+
+
+        vmrr = VMResourceReservation(lease, start, end, mappings, res, reservation)
+        vmrr.state = ResourceReservation.STATE_SCHEDULED
+
+        if mustresume:
+            self.__schedule_resumption(vmrr, start)
+
+        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
+        if mustsuspend:
+            self.__schedule_suspension(vmrr, end)
+        else:
+            # Compensate for any overestimation
+            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
+                vmrr.end = vmrr.start + remaining_duration + shutdown_time
+            self.__schedule_shutdown(vmrr)
+        
+
+        
+        susp_str = res_str = ""
+        if mustresume:
+            res_str = " (resuming)"
+        if mustsuspend:
+            susp_str = " (suspending)"
+        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
+
+        return vmrr, reservation
+
+    # TODO: This has to be tied in with the preparation scheduler
+    def schedule_migration(self, lease, vmrr, nexttime):
+        last_vmrr = lease.get_last_vmrr()
+        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
+        
+        mustmigrate = False
+        for vnode in vnode_migrations:
+            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
+                mustmigrate = True
+                break
+            
+        if not mustmigrate:
+            return
+
+        # Figure out what migrations can be done simultaneously
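+        # (Illustrative example: if vnode 1 migrates from pnode 1 to pnode 2 and
+        #  vnode 2 migrates from pnode 2 to pnode 3, both touch pnode 2, so the
+        #  greedy pass below places them in two separate, sequential rounds.)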
+        migrations = []
+        while len(vnode_migrations) > 0:
+            pnodes = set()
+            migration = {}
+            for vnode in vnode_migrations:
+                origin = vnode_migrations[vnode][0]
+                dest = vnode_migrations[vnode][1]
+                if not origin in pnodes and not dest in pnodes:
+                    migration[vnode] = vnode_migrations[vnode]
+                    pnodes.add(origin)
+                    pnodes.add(dest)
+            for vnode in migration:
+                del vnode_migrations[vnode]
+            migrations.append(migration)
+        
+        # Create migration RRs
+        start = last_vmrr.post_rrs[-1].end
+        migr_time = self.__estimate_migration_time(lease)
+        bandwidth = self.resourcepool.info.get_migration_bandwidth()
+        migr_rrs = []
+        for m in migrations:
+            end = start + migr_time
+            res = {}
+            for (origin,dest) in m.values():
+                resorigin = ResourceTuple.create_empty()
+                resorigin.set_by_type(constants.RES_NETOUT, bandwidth)
+                resdest = ResourceTuple.create_empty()
+                resdest.set_by_type(constants.RES_NETIN, bandwidth)
+                res[origin] = resorigin
+                res[dest] = resdest
+            migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
+            migr_rr.state = ResourceReservation.STATE_SCHEDULED
+            migr_rrs.append(migr_rr)
+            start = end
+
+        migr_rrs.reverse()
+        for migr_rr in migr_rrs:
+            vmrr.pre_rrs.insert(0, migr_rr)
+
+
+
+    def can_reserve_besteffort_in_future(self):
+        return self.numbesteffortres < self.maxres
+                
+    def is_backfilling(self):
+        return self.maxres > 0
+
+
+    def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
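+        # Tries each changepoint in order and returns the first satisfactory fit
+        # as (start, end, canfit), where canfit maps each physical node to the
+        # number of VMs it can host there. If the requested duration does not fit
+        # entirely at a changepoint, the fit is only accepted when the lease is
+        # suspendable and can still run for at least min_duration.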
+        start = None
+        end = None
+        canfit = None
+        availabilitywindow = self.slottable.availabilitywindow
+
+
+        for p in changepoints:
+            availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
+            availabilitywindow.printContents()
+            
+            if availabilitywindow.fitAtStart() >= numnodes:
+                start=p[0]
+                maxend = start + duration
+                end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
+        
+                self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
+                
+                if end < maxend:
+                    self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
+                    
+                    if suspendable:
+                        # If we can suspend, we still have to check if the lease will
+                        # be able to run for the specified minimum duration
+                        if end-start > min_duration:
+                            break # We found a fit; stop looking
+                        else:
+                            self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
+                            # Set start back to None, to indicate that we haven't
+                            # found a satisfactory start time
+                            start = None
+                    else:
+                        # If we can't suspend, this fit is no good, and we have to keep looking
+                        pass
+                else:
+                    # We've found a satisfactory starting time
+                    break        
+                
+        return start, end, canfit
+    
+    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
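+        # Computes the per-node suspend or resume operations for the given VMRR,
+        # working forward (resume) or backward (suspend) from 'time' and
+        # serializing them according to the exclusion policy. Returns a list of
+        # (start, end, {pnode -> [vnodes]}) entries. If 'override' is given, it
+        # replaces the operation time otherwise estimated from the memory size and 'rate'.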
+        times = [] # (start, end, {pnode -> vnodes})
+        enactment_overhead = get_config().get("enactment-overhead") 
+
+        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
+            # Global exclusion (which represents, e.g., reading/writing the memory image files
+            # from a global file system) meaning no two suspensions/resumptions can happen at 
+            # the same time in the entire resource pool.
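+            # (Illustrative example, with made-up numbers: with two vnodes, a
+            #  30s per-vnode operation time (enactment overhead included) and
+            #  DIRECTION_BACKWARD from t=100s, the loop below yields
+            #  [(70, 100, {p1: [v1]}), (40, 70, {p2: [v2]})].)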
+            
+            t = time
+            t_prev = None
+                
+            for (vnode,pnode) in vmrr.nodes.items():
+                if override == None:
+                    mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                    op_time = self.__compute_suspend_resume_time(mem, rate)
+                else:
+                    op_time = override
+
+                op_time += enactment_overhead
+                    
+                t_prev = t
+                
+                if direction == constants.DIRECTION_FORWARD:
+                    t += op_time
+                    times.append((t_prev, t, {pnode:[vnode]}))
+                elif direction == constants.DIRECTION_BACKWARD:
+                    t -= op_time
+                    times.append((t, t_prev, {pnode:[vnode]}))
+
+        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
+            # Local exclusion (which represents, e.g., reading the memory image files
+            # from a local file system) means no two resumptions can happen at the same
+            # time in the same physical node.
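+            # (Illustrative example: operations for vnodes on the same physical
+            #  node are serialized, while vnodes on different physical nodes may
+            #  suspend/resume in parallel, i.e. their intervals may overlap and
+            #  are consolidated below when they coincide exactly.)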
+            pervnode_times = [] # (start, end, vnode)
+            vnodes_in_pnode = {}
+            for (vnode,pnode) in vmrr.nodes.items():
+                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
+            for pnode in vnodes_in_pnode:
+                t = time
+                t_prev = None
+                for vnode in vnodes_in_pnode[pnode]:
+                    if override == None:
+                        mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                        op_time = self.__compute_suspend_resume_time(mem, rate)
+                    else:
+                        op_time = override                    
+                    
+                    t_prev = t
+                    
+                    if direction == constants.DIRECTION_FORWARD:
+                        t += op_time
+                        pervnode_times.append((t_prev, t, vnode))
+                    elif direction == constants.DIRECTION_BACKWARD:
+                        t -= op_time
+                        pervnode_times.append((t, t_prev, vnode))
+            
+            # Consolidate suspend/resume operations happening at the same time
+            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
+            for (start, end) in uniq_times:
+                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
+                node_mappings = {}
+                for vnode in vnodes:
+                    pnode = vmrr.nodes[vnode]
+                    node_mappings.setdefault(pnode, []).append(vnode)
+                times.append([start,end,node_mappings])
+        
+            # Add the enactment overhead
+            for t in times:
+                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
+                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
+                if direction == constants.DIRECTION_FORWARD:
+                    t[1] += overhead
+                elif direction == constants.DIRECTION_BACKWARD:
+                    t[0] -= overhead
+                    
+            # Fix overlaps
+            if direction == constants.DIRECTION_FORWARD:
+                times.sort(key=itemgetter(0))
+            elif direction == constants.DIRECTION_BACKWARD:
+                times.sort(key=itemgetter(1))
+                times.reverse()
+                
+            prev_start = None
+            prev_end = None
+            for t in times:
+                if prev_start != None:
+                    start = t[0]
+                    end = t[1]
+                    if direction == constants.DIRECTION_FORWARD:
+                        if start < prev_end:
+                            diff = prev_end - start
+                            t[0] += diff
+                            t[1] += diff
+                    elif direction == constants.DIRECTION_BACKWARD:
+                        if end > prev_start:
+                            diff = end - prev_start
+                            t[0] -= diff
+                            t[1] -= diff
+                prev_start = t[0]
+                prev_end = t[1]
+        
+        return times
+    
+    def __schedule_shutdown(self, vmrr):
+        config = get_config()
+        shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
+
+        start = vmrr.end - shutdown_time
+        end = vmrr.end
+        
+        shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
+        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
+                
+        vmrr.update_end(start)
+        
+        # If there are any post RRs, remove them
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
+
+        vmrr.post_rrs.append(shutdown_rr)
+
+    def __schedule_suspension(self, vmrr, suspend_by):
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        susp_exclusion = config.get("suspendresume-exclusion")
+        override = get_config().get("override-suspend-time")
+        rate = config.get("suspend-rate") 
+
+        if suspend_by < vmrr.start or suspend_by > vmrr.end:
+            raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
+
+        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
+        suspend_rrs = []
+        for (start, end, node_mappings) in times:
+            suspres = {}
+            all_vnodes = []
+            for (pnode,vnodes) in node_mappings.items():
+                num_vnodes = len(vnodes)
+                r = ResourceTuple.create_empty()
+                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+                suspres[pnode] = r          
+                all_vnodes += vnodes                      
+            susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
+            susprr.state = ResourceReservation.STATE_SCHEDULED
+            suspend_rrs.append(susprr)
+                
+        suspend_rrs.sort(key=attrgetter("start"))
+            
+        susp_start = suspend_rrs[0].start
+        if susp_start < vmrr.start:
+            raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
+        
+        vmrr.update_end(susp_start)
+        
+        # If there are any post RRs, remove them
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
+
+        for susprr in suspend_rrs:
+            vmrr.post_rrs.append(susprr)       
+            
+    def __schedule_resumption(self, vmrr, resume_at):
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        resm_exclusion = config.get("suspendresume-exclusion")        
+        override = get_config().get("override-resume-time")
+        rate = config.get("resume-rate") 
+
+        if resume_at < vmrr.start or resume_at > vmrr.end:
+            raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
+
+        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
+        resume_rrs = []
+        for (start, end, node_mappings) in times:
+            resmres = {}
+            all_vnodes = []
+            for (pnode,vnodes) in node_mappings.items():
+                num_vnodes = len(vnodes)
+                r = ResourceTuple.create_empty()
+                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
+                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
+                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
+                resmres[pnode] = r
+                all_vnodes += vnodes
+            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
+            resmrr.state = ResourceReservation.STATE_SCHEDULED
+            resume_rrs.append(resmrr)
+                
+        resume_rrs.sort(key=attrgetter("start"))
+            
+        resm_end = resume_rrs[-1].end
+        if resm_end > vmrr.end:
+            raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
+        
+        vmrr.update_start(resm_end)
+        for resmrr in resume_rrs:
+            vmrr.pre_rrs.append(resmrr)        
+           
+    def __compute_suspend_resume_time(self, mem, rate):
+        time = float(mem) / rate
+        time = round_datetime_delta(TimeDelta(seconds = time))
+        return time
+    
+    def __estimate_suspend_resume_time(self, lease, rate):
+        susp_exclusion = get_config().get("suspendresume-exclusion")        
+        enactment_overhead = get_config().get("enactment-overhead") 
+        mem = lease.requested_resources.get_by_type(constants.RES_MEM)
+        if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
+            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
+        elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
+            # Overestimating
+            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
+
+    def __estimate_shutdown_time(self, lease):
+        enactment_overhead = get_config().get("enactment-overhead").seconds
+        return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
+
+    def __estimate_suspend_time(self, lease):
+        rate = get_config().get("suspend-rate")
+        override = get_config().get("override-suspend-time")
+        if override != None:
+            return override
+        else:
+            return self.__estimate_suspend_resume_time(lease, rate)
+
+    def __estimate_resume_time(self, lease):
+        rate = get_config().get("resume-rate") 
+        override = get_config().get("override-resume-time")
+        if override != None:
+            return override
+        else:
+            return self.__estimate_suspend_resume_time(lease, rate)
+
+
+    def __estimate_migration_time(self, lease):
+        whattomigrate = get_config().get("what-to-migrate")
+        bandwidth = self.resourcepool.info.get_migration_bandwidth()
+        if whattomigrate == constants.MIGRATE_NONE:
+            return TimeDelta(seconds=0)
+        else:
+            if whattomigrate == constants.MIGRATE_MEM:
+                mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
+            elif whattomigrate == constants.MIGRATE_MEMDISK:
+                mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
+            return estimate_transfer_time(mbtotransfer, bandwidth)
+
+    # TODO: Take into account other things like boot overhead, migration overhead, etc.
+    def __compute_scheduling_threshold(self, lease):
+        from haizea.resourcemanager.rm import ResourceManager
+        config = ResourceManager.get_singleton().config
+        threshold = config.get("force-scheduling-threshold")
+        if threshold != None:
+            # If there is a hard-coded threshold, use that
+            return threshold
+        else:
+            factor = config.get("scheduling-threshold-factor")
+            susp_overhead = self.__estimate_suspend_time(lease)
+            safe_duration = susp_overhead
+            
+            if lease.state == Lease.STATE_SUSPENDED:
+                resm_overhead = self.__estimate_resume_time(lease)
+                safe_duration += resm_overhead
+            
+            # TODO: Incorporate other overheads into the minimum duration
+            min_duration = safe_duration
+            
+            # At the very least, we want to allocate enough time for the
+            # safe duration (otherwise, we'll end up with incorrect schedules,
+            # where a lease is scheduled to suspend, but isn't even allocated
+            # enough time to suspend). 
+            # The factor is assumed to be non-negative: a factor of 0
+            # means we only allocate enough time for potential suspend/resume
+            # operations, while a factor of 1 means the lease will get as much
+            # running time as is spent on the runtime overheads involved in
+            # setting it up.
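+            # (Illustrative numbers: with a 30s suspend overhead, no resumption,
+            #  and a factor of 2, the threshold is 30 + 30*2 = 90 seconds.)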
+            threshold = safe_duration + (min_duration * factor)
+            return threshold
+
+    def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
+        # TODO2: Choose an appropriate prioritizing function based on the
+        # config file, instead of hardcoding it.
+        #
+        # TODO3: Decisions are based only on CPU allocations. This is OK for now,
+        # since the memory allocation is proportional to the CPU allocation.
+        # Later on we need to come up with some sort of weighted average.
+        
+        nodes = canfit.keys()
+        
+        # TODO: The deployment module should just provide a list of nodes
+        # it prefers
+        nodeswithimg=[]
+        #self.lease_deployment_type = get_config().get("lease-preparation")
+        #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
+        #    reusealg = get_config().get("diskimage-reuse")
+        #    if reusealg==constants.REUSE_IMAGECACHES:
+        #        nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
+
+        # Compares node x and node y. 
+        # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
+        def comparenodes(x, y):
+            hasimgX = x in nodeswithimg
+            hasimgY = y in nodeswithimg
+
+            # First comparison: A node with no preemptible VMs is preferable
+            # to one with preemptible VMs (i.e., we want to avoid preempting)
+            canfitnopreemptionX = canfit[x][0]
+            canfitpreemptionX = canfit[x][1]
+            hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
+            
+            canfitnopreemptionY = canfit[y][0]
+            canfitpreemptionY = canfit[y][1]
+            hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
+
+            # TODO: Factor out common code
+            if avoidpreempt:
+                if hasPreemptibleX and not hasPreemptibleY:
+                    return constants.WORSE
+                elif not hasPreemptibleX and hasPreemptibleY:
+                    return constants.BETTER
+                elif not hasPreemptibleX and not hasPreemptibleY:
+                    if hasimgX and not hasimgY: 
+                        return constants.BETTER
+                    elif not hasimgX and hasimgY: 
+                        return constants.WORSE
+                    else:
+                        if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
+                        elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
+                        else: return constants.EQUAL
+                elif hasPreemptibleX and hasPreemptibleY:
+                    # If both have (some) preemptible resources, we prefer the one
+                    # that involves fewer preemptions
+                    preemptX = canfitpreemptionX - canfitnopreemptionX
+                    preemptY = canfitpreemptionY - canfitnopreemptionY
+                    if preemptX < preemptY:
+                        return constants.BETTER
+                    elif preemptX > preemptY:
+                        return constants.WORSE
+                    else:
+                        if hasimgX and not hasimgY: return constants.BETTER
+                        elif not hasimgX and hasimgY: return constants.WORSE
+                        else: return constants.EQUAL
+            elif not avoidpreempt:
+                # First criteria: Can we reuse image?
+                if hasimgX and not hasimgY: 
+                    return constants.BETTER
+                elif not hasimgX and hasimgY: 
+                    return constants.WORSE
+                else:
+                    # Now we just want to avoid preemption
+                    if hasPreemptibleX and not hasPreemptibleY:
+                        return constants.WORSE
+                    elif not hasPreemptibleX and hasPreemptibleY:
+                        return constants.BETTER
+                    elif hasPreemptibleX and hasPreemptibleY:
+                        # If both have (some) preemptible resources, we prefer the one
+                        # that involves fewer preemptions
+                        preemptX = canfitpreemptionX - canfitnopreemptionX
+                        preemptY = canfitpreemptionY - canfitnopreemptionY
+                        if preemptX < preemptY:
+                            return constants.BETTER
+                        elif preemptX > preemptY:
+                            return constants.WORSE
+                        else:
+                            if hasimgX and not hasimgY: return constants.BETTER
+                            elif not hasimgX and hasimgY: return constants.WORSE
+                            else: return constants.EQUAL
+                    else:
+                        return constants.EQUAL
+        
+        # Order nodes
+        nodes.sort(comparenodes)
+        return nodes        
+
+    def find_preemptable_leases(self, mustpreempt, startTime, endTime):
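+        # Given 'mustpreempt' (pnode -> resources that must be freed), selects
+        # preemptible VM reservations running at startTime or starting before
+        # endTime until enough resources are freed on each node, preferring to
+        # preempt the most recently submitted leases first. Returns the leases
+        # to preempt.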
+        def comparepreemptability(rrX, rrY):
+            if rrX.lease.submit_time > rrY.lease.submit_time:
+                return constants.BETTER
+            elif rrX.lease.submit_time < rrY.lease.submit_time:
+                return constants.WORSE
+            else:
+                return constants.EQUAL        
+            
+        def preemptedEnough(amountToPreempt):
+            for node in amountToPreempt:
+                if not amountToPreempt[node].is_zero_or_less():
+                    return False
+            return True
+        
+        # Get allocations at the specified time
+        atstart = set()
+        atmiddle = set()
+        nodes = set(mustpreempt.keys())
+        
+        reservationsAtStart = self.slottable.getReservationsAt(startTime)
+        reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
+        reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
+                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
+        
+        reservationsAtStart.sort(comparepreemptability)
+        reservationsAtMiddle.sort(comparepreemptability)
+        
+        amountToPreempt = {}
+        for n in mustpreempt:
+            amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+
+        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
+        for r in reservationsAtStart:
+            # The following will really only come into play when we have
+            # multiple VMs per node
+            mustpreemptres = False
+            for n in r.resources_in_pnode.keys():
+                # Don't need to preempt if we've already preempted all
+                # the needed resources in node n
+                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                    amountToPreempt[n].decr(r.resources_in_pnode[n])
+                    mustpreemptres = True
+            if mustpreemptres:
+                atstart.add(r)
+            if preemptedEnough(amountToPreempt):
+                break
+        
+        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
+        if len(reservationsAtMiddle)>0:
+            changepoints = set()
+            for r in reservationsAtMiddle:
+                changepoints.add(r.start)
+            changepoints = list(changepoints)
+            changepoints.sort()        
+            
+            for cp in changepoints:
+                amountToPreempt = {}
+                for n in mustpreempt:
+                    amountToPreempt[n] = ResourceTuple.copy(mustpreempt[n])
+                reservations = [r for r in reservationsAtMiddle 
+                                if r.start <= cp and cp < r.end]
+                for r in reservations:
+                    mustpreemptres = False
+                    for n in r.resources_in_pnode.keys():
+                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
+                            amountToPreempt[n].decr(r.resources_in_pnode[n])
+                            mustpreemptres = True
+                    if mustpreemptres:
+                        atmiddle.add(r)
+                    if preemptedEnough(amountToPreempt):
+                        break
+            
+        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
+        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
+        
+        leases = [r.lease for r in atstart|atmiddle]
+        
+        return leases
+                
+    # TODO: Should be moved to LeaseScheduler
+    def reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
+        self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime)) 
+        vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
+        leases = set([rr.lease for rr in vmrrs])
+        leases = [l for l in leases if isinstance(l, BestEffortLease) and l.state in (Lease.STATE_SUSPENDED,Lease.STATE_READY) and not l in checkedleases]
+        for lease in leases:
+            self.logger.debug("Found lease %i" % lease.id)
+            lease.print_contents()
+            # The earliest time can't be earlier than the time when images will
+            # be available in the node
+            earliest = max(nexttime, lease.imagesavail)
+            self.__slideback(lease, earliest)
+            checkedleases.append(lease)
+        #for l in leases:
+        #    vmrr, susprr = l.getLastVMRR()
+        #    self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
+          
+    def __slideback(self, lease, earliest):
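+        # Tries to move the lease's last VMRR (and its resumption/shutdown RRs)
+        # to an earlier start time, no earlier than 'earliest', keeping the same
+        # node mapping. If, after sliding back, the VMs can run to completion,
+        # any scheduled suspension is removed.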
+        vmrr = lease.get_last_vmrr()
+        # Save original start and end time of the vmrr
+        old_start = vmrr.start
+        old_end = vmrr.end
+        nodes = vmrr.nodes.values()
+        if lease.state == Lease.STATE_SUSPENDED:
+            originalstart = vmrr.pre_rrs[0].start
+        else:
+            originalstart = vmrr.start
+        cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
+        cp = [earliest] + cp
+        newstart = None
+        for p in cp:
+            self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
+            self.slottable.availabilitywindow.printContents()
+            if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
+                (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
+                if end == originalstart and set(nodes) <= set(canfit.keys()):
+                    self.logger.debug("Can slide back to %s" % p)
+                    newstart = p
+                    break
+        if newstart == None:
+            # Can't slide back. Leave as is.
+            pass
+        else:
+            diff = originalstart - newstart
+            if lease.state == Lease.STATE_SUSPENDED:
+                resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+                for resmrr in resmrrs:
+                    resmrr_old_start = resmrr.start
+                    resmrr_old_end = resmrr.end
+                    resmrr.start -= diff
+                    resmrr.end -= diff
+                    self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
+            vmrr.update_start(vmrr.start - diff)
+            
+            # If the lease was going to be suspended, check to see if
+            # we don't need to suspend any more.
+            remdur = lease.duration.get_remaining_duration()
+            if vmrr.is_suspending() and vmrr.end - newstart >= remdur: 
+                vmrr.update_end(vmrr.start + remdur)
+                for susprr in vmrr.post_rrs:
+                    self.slottable.removeReservation(susprr)
+                vmrr.post_rrs = []
+            else:
+                vmrr.update_end(vmrr.end - diff)
+                
+            if not vmrr.is_suspending():
+                # If the VM was set to shutdown, we need to slideback the shutdown RRs
+                for rr in vmrr.post_rrs:
+                    rr_old_start = rr.start
+                    rr_old_end = rr.end
+                    rr.start -= diff
+                    rr.end -= diff
+                    self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
+
+            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
+            self.logger.vdebug("New lease descriptor (after slideback):")
+            lease.print_contents()
+    
+          
+
+    #-------------------------------------------------------------------#
+    #                                                                   #
+    #                  SLOT TABLE EVENT HANDLERS                        #
+    #                                                                   #
+    #-------------------------------------------------------------------#
+
+    def _handle_start_vm(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
+        l.print_contents()
+        if l.state == Lease.STATE_READY:
+            l.state = Lease.STATE_ACTIVE
+            rr.state = ResourceReservation.STATE_ACTIVE
+            now_time = get_clock().get_time()
+            l.start.actual = now_time
+            
+            try:
+                self.resourcepool.start_vms(l, rr)
+                # The next two lines have to be moved somewhere more
+                # appropriate inside the resourcepool module
+                for (vnode, pnode) in rr.nodes.items():
+                    l.diskimagemap[vnode] = pnode
+            except Exception, e:
+                self.logger.error("ERROR when starting VMs.")
+                raise
+        elif l.state == Lease.STATE_RESUMED_READY:
+            l.state = Lease.STATE_ACTIVE
+            rr.state = ResourceReservation.STATE_ACTIVE
+            # No enactment to do here, since all the suspend/resume actions are
+            # handled during the suspend/resume RRs
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
+        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
+
+
+    def _handle_end_vm(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
+        self.logger.vdebug("LEASE-%i Before:" % l.id)
+        l.print_contents()
+        now_time = round_datetime(get_clock().get_time())
+        diff = now_time - rr.start
+        l.duration.accumulate_duration(diff)
+        rr.state = ResourceReservation.STATE_DONE
+       
+        if isinstance(l, BestEffortLease):
+            if rr.backfill_reservation == True:
+                self.numbesteffortres -= 1
+                
+        self.logger.vdebug("LEASE-%i After:" % l.id)
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
+        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
+
+    def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
+        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
+        for rr in vmrr.post_rrs:
+            self.slottable.removeReservation(rr)
+        vmrr.post_rrs = []
+        # TODO: slideback shutdown RRs
+        vmrr.end = get_clock().get_time()
+        self._handle_end_vm(l, vmrr)
+
+    def _handle_start_shutdown(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_ACTIVE
+        self.resourcepool.stop_vms(l, rr)
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
+
+    def _handle_end_shutdown(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_DONE
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
+        self.logger.info("Lease %i shutdown." % (l.id))
+        raise NormalEndLeaseException
+
+
+
+    def _handle_start_suspend(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_ACTIVE
+        self.resourcepool.suspend_vms(l, rr)
+        for vnode in rr.vnodes:
+            pnode = rr.vmrr.nodes[vnode]
+            l.memimagemap[vnode] = pnode
+        if rr.is_first():
+            l.state = Lease.STATE_SUSPENDING
+            l.print_contents()
+            self.logger.info("Suspending lease %i..." % (l.id))
+        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
+
+    def _handle_end_suspend(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
+        l.print_contents()
+        # TODO: React to incomplete suspend
+        self.resourcepool.verify_suspend(l, rr)
+        rr.state = ResourceReservation.STATE_DONE
+        if rr.is_last():
+            l.state = Lease.STATE_SUSPENDED
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
+        self.logger.info("Lease %i suspended." % (l.id))
+        
+        if l.state == Lease.STATE_SUSPENDED:
+            raise RescheduleLeaseException
+
+    def _handle_start_resume(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
+        l.print_contents()
+        self.resourcepool.resume_vms(l, rr)
+        rr.state = ResourceReservation.STATE_ACTIVE
+        if rr.is_first():
+            l.state = Lease.STATE_RESUMING
+            l.print_contents()
+            self.logger.info("Resuming lease %i..." % (l.id))
+        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
+
+    def _handle_end_resume(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
+        l.print_contents()
+        # TODO: React to incomplete resume
+        self.resourcepool.verify_resume(l, rr)
+        rr.state = ResourceReservation.STATE_DONE
+        if rr.is_last():
+            l.state = Lease.STATE_RESUMED_READY
+            self.logger.info("Resumed lease %i" % (l.id))
+        for vnode, pnode in rr.vmrr.nodes.items():
+            self.resourcepool.remove_ramfile(pnode, l.id, vnode)
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
+
+    def _handle_start_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
+        l.print_contents()
+        rr.state = ResourceReservation.STATE_ACTIVE
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
+        self.logger.info("Migrating lease %i..." % (l.id))
+
+    def _handle_end_migrate(self, l, rr):
+        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
+        l.print_contents()
+
+        for vnode in rr.transfers:
+            origin = rr.transfers[vnode][0]
+            dest = rr.transfers[vnode][1]
+            
+            # Update VM image mappings
+            self.resourcepool.remove_diskimage(origin, l.id, vnode)
+            self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
+            l.diskimagemap[vnode] = dest
+
+            # Update RAM file mappings
+            self.resourcepool.remove_ramfile(origin, l.id, vnode)
+            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
+            l.memimagemap[vnode] = dest
+        
+        rr.state = ResourceReservation.STATE_DONE
+        l.print_contents()
+        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
+        self.logger.info("Migrated lease %i..." % (l.id))
+
+
+
+            
+
+class VMResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, nodes, res, backfill_reservation):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.nodes = nodes # { vnode -> pnode }
+        self.backfill_reservation = backfill_reservation
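+        # pre_rrs holds RRs that must run before the VMs start (resumptions and
+        # migrations); post_rrs holds RRs that run after the VMs stop
+        # (suspensions or a shutdown)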
+        self.pre_rrs = []
+        self.post_rrs = []
+
+        # ONLY for simulation
+        self.__update_prematureend()
+
+    def update_start(self, time):
+        self.start = time
+        # ONLY for simulation
+        self.__update_prematureend()
+
+    def update_end(self, time):
+        self.end = time
+        # ONLY for simulation
+        self.__update_prematureend()
+        
+    # ONLY for simulation
+    def __update_prematureend(self):
+        if self.lease.duration.known != None:
+            remdur = self.lease.duration.get_remaining_known_duration()
+            rrdur = self.end - self.start
+            if remdur < rrdur:
+                self.prematureend = self.start + remdur
+            else:
+                self.prematureend = None
+        else:
+            self.prematureend = None 
+
+    def get_final_end(self):
+        if len(self.post_rrs) == 0:
+            return self.end
+        else:
+            return self.post_rrs[-1].end
+
+    def is_suspending(self):
+        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)
+
+    def is_shutting_down(self):
+        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
+
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        for resmrr in self.pre_rrs:
+            resmrr.print_contents(loglevel)
+            self.logger.log(loglevel, "--")
+        self.logger.log(loglevel, "Type           : VM")
+        self.logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
+        if self.prematureend != None:
+            self.logger.log(loglevel, "Premature end  : %s" % self.prematureend)
+        ResourceReservation.print_contents(self, loglevel)
+        for susprr in self.post_rrs:
+            self.logger.log(loglevel, "--")
+            susprr.print_contents(loglevel)
+        
+    def is_preemptible(self):
+        return self.lease.preemptible
+
+    def xmlrpc_marshall(self):
+        rr = ResourceReservation.xmlrpc_marshall(self)
+        rr["type"] = "VM"
+        rr["nodes"] = self.nodes.items()
+        return rr
+
+        
+class SuspensionResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vnodes, vmrr):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.vnodes = vnodes
+
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : SUSPEND")
+        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
+        ResourceReservation.print_contents(self, loglevel)
+        
+    def is_first(self):
+        return (self == self.vmrr.post_rrs[0])
+
+    def is_last(self):
+        return (self == self.vmrr.post_rrs[-1])
+        
+    # TODO: Suspension RRs should be preemptible, but preempting a suspension RR
+    # has wider implications (with a non-trivial handling). For now, we leave them 
+    # as non-preemptible, since the probability of preempting a suspension RR is slim.
+    def is_preemptible(self):
+        return False        
+        
+    def xmlrpc_marshall(self):
+        rr = ResourceReservation.xmlrpc_marshall(self)
+        rr["type"] = "SUSP"
+        return rr
+        
+class ResumptionResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vnodes, vmrr):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.vnodes = vnodes
+
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : RESUME")
+        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
+        ResourceReservation.print_contents(self, loglevel)
+
+    def is_first(self):
+        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+        return (self == resm_rrs[0])
+
+    def is_last(self):
+        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
+        return (self == resm_rrs[-1])
+
+    # TODO: Resumption RRs should be preemptible, but preempting a resumption RR
+    # has wider implications (with a non-trivial handling). For now, we leave them 
+    # as non-preemptible, since the probability of preempting a resumption RR is slim.
+    def is_preemptible(self):
+        return False        
+        
+    def xmlrpc_marshall(self):
+        rr = ResourceReservation.xmlrpc_marshall(self)
+        rr["type"] = "RESM"
+        return rr
+    
+class ShutdownResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vnodes, vmrr):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.vnodes = vnodes
+
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : SHUTDOWN")
+        ResourceReservation.print_contents(self, loglevel)
+        
+    def is_preemptible(self):
+        return True        
+        
+    def xmlrpc_marshall(self):
+        rr = ResourceReservation.xmlrpc_marshall(self)
+        rr["type"] = "SHTD"
+        return rr
+
+class MigrationResourceReservation(ResourceReservation):
+    def __init__(self, lease, start, end, res, vmrr, transfers):
+        ResourceReservation.__init__(self, lease, start, end, res)
+        self.vmrr = vmrr
+        self.transfers = transfers
+        
+    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
+        self.logger.log(loglevel, "Type           : MIGRATE")
+        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
+        ResourceReservation.print_contents(self, loglevel)        
+
+    def is_preemptible(self):
+        return False        
+

Deleted: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/scheduler.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -1,1676 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago                                 #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
-# Complutense de Madrid (dsa-research.org)                                   #
-#                                                                            #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
-# not use this file except in compliance with the License. You may obtain    #
-# a copy of the License at                                                   #
-#                                                                            #
-# http://www.apache.org/licenses/LICENSE-2.0                                 #
-#                                                                            #
-# Unless required by applicable law or agreed to in writing, software        #
-# distributed under the License is distributed on an "AS IS" BASIS,          #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
-# See the License for the specific language governing permissions and        #
-# limitations under the License.                                             #
-# -------------------------------------------------------------------------- #
-
-
-"""This module provides the main classes for Haizea's scheduler, particularly
-the Scheduler class. The deployment scheduling code (everything that has to be 
-done to prepare a lease) happens in the modules inside the 
-haizea.resourcemanager.deployment package.
-
-This module provides the following classes:
-
-* SchedException: A scheduling exception
-* ReservationEventHandler: A simple wrapper class
-* Scheduler: Do I really need to spell this one out for you?
-
-TODO: The Scheduler class is in need of some serious refactoring. The likely outcome is
-that it will be divided into two classes: LeaseScheduler, which handles top-level
-lease constructs and doesn't interact with the slot table, and VMScheduler, which
-actually schedules the VMs. The slot table would be contained in VMScheduler and 
-in the lease preparation scheduler. In turn, these two would be contained in
-LeaseScheduler.
-"""
-
-import haizea.resourcemanager.datastruct as ds
-import haizea.common.constants as constants
-from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
-from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation, MigrationResourceReservation, ShutdownResourceReservation
-from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
-from operator import attrgetter, itemgetter
-from mx.DateTime import TimeDelta
-
-import logging
-
-class SchedException(Exception):
-    """A simple exception class used for scheduling exceptions"""
-    pass
-
-class NotSchedulableException(Exception):
-    """A simple exception class used when a lease cannot be scheduled
-    
-    This exception must be raised when a lease cannot be scheduled
-    (this is not necessarily an error condition, but the scheduler will
-    have to react to it)
-    """
-    pass
-
-class CriticalSchedException(Exception):
-    """A simple exception class used for critical scheduling exceptions
-    
-    This exception must be raised when a non-recoverable error happens
-    (e.g., when there are unexplained inconsistencies in the schedule,
-    typically resulting from a code error)
-    """
-    pass
-
-
-class ReservationEventHandler(object):
-    """A wrapper for reservation event handlers.
-    
-    Reservations (in the slot table) can start and they can end. This class
-    provides a convenient wrapper around the event handlers for these two
-    events (see Scheduler.__register_handler for details on event handlers)
-    """
-    def __init__(self, on_start, on_end):
-        self.on_start = on_start
-        self.on_end = on_end
-
-class Scheduler(object):
-    """The Haizea Scheduler
-    
-    Public methods:
-    schedule -- The scheduling function
-    process_reservations -- Processes starting/ending reservations at a given time
-    enqueue -- Queues a best-effort request
-    is_queue_empty -- Is the queue empty?
-    exists_scheduled_leases -- Are there any leases scheduled?
-    
-    Private methods:
-    __schedule_ar_lease -- Schedules an AR lease
-    __schedule_besteffort_lease -- Schedules a best-effort lease
-    __preempt -- Preempts a lease
-    __reevaluate_schedule -- Reevaluate the schedule (used after resources become
-                             unexpectedly unavailable)
-    _handle_* -- Reservation event handlers
-    
-    """
-    def __init__(self, slottable, resourcepool, deployment_scheduler):
-        self.slottable = slottable
-        self.resourcepool = resourcepool
-        self.deployment_scheduler = deployment_scheduler
-        self.logger = logging.getLogger("SCHED")
-
-        self.queue = ds.Queue(self)
-        self.leases = ds.LeaseTable(self)
-        self.completedleases = ds.LeaseTable(self)
-        
-        for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
-            self.slottable.add_node(n)
-
-        self.handlers = {}
-        
-        self.register_handler(type     = ds.VMResourceReservation, 
-                              on_start = Scheduler._handle_start_vm,
-                              on_end   = Scheduler._handle_end_vm)
-
-        self.register_handler(type     = ds.ShutdownResourceReservation, 
-                              on_start = Scheduler._handle_start_shutdown,
-                              on_end   = Scheduler._handle_end_shutdown)
-
-        self.register_handler(type     = ds.SuspensionResourceReservation, 
-                              on_start = Scheduler._handle_start_suspend,
-                              on_end   = Scheduler._handle_end_suspend)
-
-        self.register_handler(type     = ds.ResumptionResourceReservation, 
-                              on_start = Scheduler._handle_start_resume,
-                              on_end   = Scheduler._handle_end_resume)
-        
-        self.register_handler(type     = ds.MigrationResourceReservation, 
-                              on_start = Scheduler._handle_start_migrate,
-                              on_end   = Scheduler._handle_end_migrate)
-        
-        for (type, handler) in self.deployment_scheduler.handlers.items():
-            self.handlers[type] = handler
-
-        backfilling = get_config().get("backfilling")
-        if backfilling == constants.BACKFILLING_OFF:
-            self.maxres = 0
-        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
-            self.maxres = 1
-        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
-            self.maxres = 1000000 # Arbitrarily large
-        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
-            self.maxres = get_config().get("backfilling-reservations")
-
-        self.numbesteffortres = 0
-        
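# The backfilling option above caps how many best-effort leases may be
# reserved in the future (maxres). Restated as a standalone table, with the
# option names written out informally (the real values are the BACKFILLING_*
# constants) and a hypothetical helper for illustration:
#
#   off           -> 0         no best-effort lease may reserve ahead
#   aggressive    -> 1         only the head of the queue may reserve ahead
#   conservative  -> 1000000   effectively unlimited
#   intermediate  -> the "backfilling-reservations" option
#
def _maxres_for(backfilling, backfilling_reservations=None):
    table = {"off": 0, "aggressive": 1, "conservative": 1000000}
    if backfilling == "intermediate":
        return backfilling_reservations
    return table[backfilling]

assert _maxres_for("intermediate", 4) == 4
assert _maxres_for("aggressive") == 1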
-    def schedule(self, nexttime):      
-        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
-        ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
-        im_leases = [req for req in pending_leases if isinstance(req, ImmediateLease)]
-        be_leases = [req for req in pending_leases if isinstance(req, BestEffortLease)]
-        
-        # Queue best-effort requests
-        for lease in be_leases:
-            self.enqueue(lease)
-        
-        # Process immediate requests
-        for lease_req in im_leases:
-            self.__process_im_request(lease_req, nexttime)
-
-        # Process AR requests
-        for lease_req in ar_leases:
-            self.__process_ar_request(lease_req, nexttime)
-            
-        # Process best-effort requests
-        self.__process_queue(nexttime)
-        
-    
-    def process_reservations(self, nowtime):
-        starting = self.slottable.get_reservations_starting_at(nowtime)
-        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
-        ending = self.slottable.get_reservations_ending_at(nowtime)
-        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
-        for rr in ending:
-            self._handle_end_rr(rr.lease, rr)
-            self.handlers[type(rr)].on_end(self, rr.lease, rr)
-        
-        for rr in starting:
-            self.handlers[type(rr)].on_start(self, rr.lease, rr)
-
-        util = self.slottable.get_utilization(nowtime)
-        if not util.has_key(VMResourceReservation):
-            cpuutil = 0.0
-        else:
-            cpuutil = util[VMResourceReservation]
-        get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, cpuutil)        
-        get_accounting().append_stat(constants.COUNTER_UTILIZATION, util)        
-        
-    def register_handler(self, type, on_start, on_end):
-        handler = ReservationEventHandler(on_start=on_start, on_end=on_end)
-        self.handlers[type] = handler        
-    
-    def enqueue(self, lease_req):
-        """Queues a best-effort lease request"""
-        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
-        lease_req.state = Lease.STATE_QUEUED
-        self.queue.enqueue(lease_req)
-        self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
-
-    def request_lease(self, lease):
-        """
-        Request a lease. At this point, it is simply marked as "Pending" and,
-        next time the scheduling function is called, the fate of the
-        lease will be determined (right now, AR+IM leases get scheduled
-        right away, and best-effort leases get placed on a queue)
-        """
-        lease.state = Lease.STATE_PENDING
-        self.leases.add(lease)
-
-    def is_queue_empty(self):
-        """Return True is the queue is empty, False otherwise"""
-        return self.queue.is_empty()
-
-    
-    def exists_scheduled_leases(self):
-        """Return True if there are any leases scheduled in the future"""
-        return not self.slottable.is_empty()    
-
-    def cancel_lease(self, lease_id):
-        """Cancels a lease.
-        
-        Arguments:
-        lease_id -- ID of lease to cancel
-        """
-        time = get_clock().get_time()
-        
-        self.logger.info("Cancelling lease %i..." % lease_id)
-        if self.leases.has_lease(lease_id):
-            # The lease is either running, or scheduled to run
-            lease = self.leases.get_lease(lease_id)
-            
-            if lease.state == Lease.STATE_ACTIVE:
-                self.logger.info("Lease %i is active. Stopping active reservation..." % lease_id)
-                rr = lease.get_active_reservations(time)[0]
-                if isinstance(rr, VMResourceReservation):
-                    self._handle_unscheduled_end_vm(lease, rr, enact=True)
-                # TODO: Handle cancellations in the middle of suspensions and
-                # resumptions                
-            elif lease.state in [Lease.STATE_SCHEDULED, Lease.STATE_READY]:
-                self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease_id)
-                rrs = lease.get_scheduled_reservations()
-                for r in rrs:
-                    lease.remove_rr(r)
-                    self.slottable.removeReservation(r)
-                lease.state = Lease.STATE_CANCELLED
-                self.completedleases.add(lease)
-                self.leases.remove(lease)
-        elif self.queue.has_lease(lease_id):
-            # The lease is in the queue, waiting to be scheduled.
-            # Cancelling is as simple as removing it from the queue
-            self.logger.info("Lease %i is in the queue. Removing..." % lease_id)
-            l = self.queue.get_lease(lease_id)
-            self.queue.remove_lease(l)
-    
-    def fail_lease(self, lease_id):
-        """Transitions a lease to a failed state, and does any necessary cleaning up
-        
-        TODO: For now, just use the cancelling algorithm
-        
-        Arguments:
-        lease_id -- ID of lease to fail
-        """    
-        try:
-            self.cancel_lease(lease_id)
-        except Exception, msg:
-            # Exit if something goes horribly wrong
-            raise CriticalSchedException()      
-    
-    def notify_event(self, lease_id, event):
-        time = get_clock().get_time()
-        if event == constants.EVENT_END_VM:
-            lease = self.leases.get_lease(lease_id)
-            vmrr = lease.get_last_vmrr()
-            self._handle_unscheduled_end_vm(lease, vmrr, enact=False)
-
-    
-    def __process_ar_request(self, lease_req, nexttime):
-        self.logger.info("Received AR lease request #%i, %i nodes from %s to %s." % (lease_req.id, lease_req.numnodes, lease_req.start.requested, lease_req.start.requested + lease_req.duration.requested))
-        self.logger.debug("  Start   : %s" % lease_req.start)
-        self.logger.debug("  Duration: %s" % lease_req.duration)
-        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
-        
-        accepted = False
-        try:
-            self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
-            self.leases.add(lease_req)
-            get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
-            accepted = True
-        except SchedException, msg:
-            # Our first try avoided preemption, try again
-            # without avoiding preemption.
-            # TODO: Roll this into the exact slot fitting algorithm
-            try:
-                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-                self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
-                self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
-                self.leases.add(lease_req)
-                get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
-                accepted = True
-            except SchedException, msg:
-                get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
-                self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-
-        if accepted:
-            self.logger.info("AR lease request #%i has been accepted." % lease_req.id)
-        else:
-            self.logger.info("AR lease request #%i has been rejected." % lease_req.id)
-            lease_req.state = Lease.STATE_REJECTED
-            self.completedleases.add(lease_req)
-            self.leases.remove(lease_req)
-        
-        
-    def __process_queue(self, nexttime):
-        done = False
-        newqueue = ds.Queue(self)
-        while not done and not self.is_queue_empty():
-            if self.numbesteffortres == self.maxres and self.slottable.isFull(nexttime):
-                self.logger.debug("Used up all reservations and slot table is full. Skipping rest of queue.")
-                done = True
-            else:
-                lease_req = self.queue.dequeue()
-                try:
-                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease_req.id)
-                    self.logger.debug("  Duration: %s" % lease_req.duration)
-                    self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
-                    self.__schedule_besteffort_lease(lease_req, nexttime)
-                    self.leases.add(lease_req)
-                    get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
-                except SchedException, msg:
-                    # Put back on queue
-                    newqueue.enqueue(lease_req)
-                    self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-                    self.logger.info("Lease %i could not be scheduled at this time." % lease_req.id)
-                    if not self.is_backfilling():
-                        done = True
-        
-        for lease in self.queue:
-            newqueue.enqueue(lease)
-        
-        self.queue = newqueue 
-
-
-    def __process_im_request(self, lease_req, nexttime):
-        self.logger.info("Received immediate lease request #%i (%i nodes)" % (lease_req.id, lease_req.numnodes))
-        self.logger.debug("  Duration: %s" % lease_req.duration)
-        self.logger.debug("  ResReq  : %s" % lease_req.requested_resources)
-        
-        try:
-            self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
-            self.leases.add(lease_req)
-            get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
-            self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
-        except SchedException, msg:
-            get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
-            self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
-    
-    
-    def __schedule_ar_lease(self, lease_req, nexttime, avoidpreempt=True):
-        try:
-            (vmrr, preemptions) = self.__fit_exact(lease_req, preemptible=False, canpreempt=True, avoidpreempt=avoidpreempt)
-            
-            if len(preemptions) > 0:
-                leases = self.__find_preemptable_leases(preemptions, vmrr.start, vmrr.end)
-                self.logger.info("Must preempt leases %s to make room for AR lease #%i" % ([l.id for l in leases], lease_req.id))
-                for lease in leases:
-                    self.__preempt(lease, preemption_time=vmrr.start)
-
-            # Schedule deployment overhead
-            self.deployment_scheduler.schedule(lease_req, vmrr, nexttime)
-            
-            # Commit reservation to slot table
-            # (we don't do this until the very end because the deployment overhead
-            # scheduling could still throw an exception)
-            lease_req.append_vmrr(vmrr)
-            self.slottable.addReservation(vmrr)
-            
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
-        except Exception, msg:
-            raise SchedException, "The requested AR lease is infeasible. Reason: %s" % msg
-
-
-    def __schedule_besteffort_lease(self, lease, nexttime):            
-        try:
-            # Schedule the VMs
-            canreserve = self.__can_reserve_besteffort_in_future()
-            (vmrr, in_future) = self.__fit_asap(lease, nexttime, allow_reservation_in_future = canreserve)
-            
-            # Schedule deployment
-            if lease.state != Lease.STATE_SUSPENDED:
-                self.deployment_scheduler.schedule(lease, vmrr, nexttime)
-            else:
-                self.__schedule_migration(lease, vmrr, nexttime)
-
-            # At this point, the lease is feasible.
-            # Commit changes by adding RRs to lease and to slot table
-            
-            # Add VMRR to lease
-            lease.append_vmrr(vmrr)
-            
-
-            # Add resource reservations to slottable
-            
-            # TODO: deployment RRs should be added here, not in the preparation scheduler
-            
-            # Pre-VM RRs (if any)
-            for rr in vmrr.pre_rrs:
-                self.slottable.addReservation(rr)
-                
-            # VM
-            self.slottable.addReservation(vmrr)
-            
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
-           
-            if in_future:
-                self.numbesteffortres += 1
-                
-            lease.print_contents()
-
-        except SchedException, msg:
-            raise SchedException, "The requested best-effort lease is infeasible. Reason: %s" % msg
-
-        
-        
-        
-    def __schedule_immediate_lease(self, req, nexttime):
-        try:
-            (vmrr, in_future) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
-            # Schedule deployment
-            self.deployment_scheduler.schedule(req, vmrr, nexttime)
-                        
-            req.append_rr(vmrr)
-            self.slottable.addReservation(vmrr)
-            
-            # Post-VM RRs (if any)
-            for rr in vmrr.post_rrs:
-                self.slottable.addReservation(rr)
-                    
-            req.print_contents()
-        except SlotFittingException, msg:
-            raise SchedException, "The requested immediate lease is infeasible. Reason: %s" % msg
-        
-    def __fit_exact(self, leasereq, preemptible=False, canpreempt=True, avoidpreempt=True):
-        lease_id = leasereq.id
-        start = leasereq.start.requested
-        end = leasereq.start.requested + leasereq.duration.requested + self.__estimate_shutdown_time(leasereq)
-        diskImageID = leasereq.diskimage_id
-        numnodes = leasereq.numnodes
-        resreq = leasereq.requested_resources
-
-        availabilitywindow = self.slottable.availabilitywindow
-
-        availabilitywindow.initWindow(start, resreq, canpreempt=canpreempt)
-        availabilitywindow.printContents(withpreemption = False)
-        availabilitywindow.printContents(withpreemption = True)
-
-        mustpreempt = False
-        unfeasiblewithoutpreemption = False
-        
-        fitatstart = availabilitywindow.fitAtStart(canpreempt = False)
-        if fitatstart < numnodes:
-            if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                unfeasiblewithoutpreemption = True
-        feasibleend, canfitnopreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = False)
-        fitatend = sum([n for n in canfitnopreempt.values()])
-        if fitatend < numnodes:
-            if not canpreempt:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                unfeasiblewithoutpreemption = True
-
-        canfitpreempt = None
-        if canpreempt:
-            fitatstart = availabilitywindow.fitAtStart(canpreempt = True)
-            if fitatstart < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            feasibleendpreempt, canfitpreempt = availabilitywindow.findPhysNodesForVMs(numnodes, end, strictend=True, canpreempt = True)
-            fitatend = sum([n for n in canfitpreempt.values()])
-            if fitatend < numnodes:
-                raise SlotFittingException, "Not enough resources in specified interval"
-            else:
-                if unfeasiblewithoutpreemption:
-                    mustpreempt = True
-                else:
-                    mustpreempt = False
-
-        # At this point we know if the lease is feasible, and if
-        # it will require preemption.
-        if not mustpreempt:
-           self.logger.debug("The VM reservations for this lease are feasible without preemption.")
-        else:
-           self.logger.debug("The VM reservations for this lease are feasible but will require preemption.")
-
-        # merge canfitnopreempt and canfitpreempt
-        canfit = {}
-        for node in canfitnopreempt:
-            vnodes = canfitnopreempt[node]
-            canfit[node] = [vnodes, vnodes]
-        for node in canfitpreempt:
-            vnodes = canfitpreempt[node]
-            if canfit.has_key(node):
-                canfit[node][1] = vnodes
-            else:
-                canfit[node] = [0, vnodes]
-
-        orderednodes = self.__choose_nodes(canfit, start, canpreempt, avoidpreempt)
-            
-        self.logger.debug("Node ordering: %s" % orderednodes)
-        
-        # vnode -> pnode
-        nodeassignment = {}
-        
-        # pnode -> resourcetuple
-        res = {}
-        
-        # physnode -> how many vnodes
-        preemptions = {}
-        
-        vnode = 1
-        if avoidpreempt:
-            # First pass, without preemption
-            for physnode in orderednodes:
-                canfitinnode = canfit[physnode][0]
-                for i in range(1, canfitinnode+1):
-                    nodeassignment[vnode] = physnode
-                    if res.has_key(physnode):
-                        res[physnode].incr(resreq)
-                    else:
-                        res[physnode] = ds.ResourceTuple.copy(resreq)
-                    canfit[physnode][0] -= 1
-                    canfit[physnode][1] -= 1
-                    vnode += 1
-                    if vnode > numnodes:
-                        break
-                if vnode > numnodes:
-                    break
-            
-        # Second pass, with preemption
-        if mustpreempt or not avoidpreempt:
-            for physnode in orderednodes:
-                canfitinnode = canfit[physnode][1]
-                for i in range(1, canfitinnode+1):
-                    nodeassignment[vnode] = physnode
-                    if res.has_key(physnode):
-                        res[physnode].incr(resreq)
-                    else:
-                        res[physnode] = ds.ResourceTuple.copy(resreq)
-                    canfit[physnode][1] -= 1
-                    vnode += 1
-                    # Check if this will actually result in a preemption
-                    if canfit[physnode][0] == 0:
-                        if preemptions.has_key(physnode):
-                            preemptions[physnode].incr(resreq)
-                        else:
-                            preemptions[physnode] = ds.ResourceTuple.copy(resreq)
-                    else:
-                        canfit[physnode][0] -= 1
-                    if vnode > numnodes:
-                        break
-                if vnode > numnodes:
-                    break
-
-        if vnode <= numnodes:
-            raise SchedException, "Availability window indicated that request is feasible, but could not fit it"
-
-        # Create VM resource reservations
-        vmrr = ds.VMResourceReservation(leasereq, start, end, nodeassignment, res, False)
-        vmrr.state = ResourceReservation.STATE_SCHEDULED
-
-        self.__schedule_shutdown(vmrr)
-
-        return vmrr, preemptions
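# A worked example (with made-up numbers) of the canfit structure merged above:
# canfit[pnode] = [vnodes that fit without preemption,
#                  vnodes that fit if preemption is allowed].

canfitnopreempt_ex = {1: 2, 2: 0}        # pnode -> fit with no preemption
canfitpreempt_ex   = {1: 4, 2: 3, 3: 1}  # pnode -> fit if we may preempt

canfit_ex = {}
for node, vnodes in canfitnopreempt_ex.items():
    canfit_ex[node] = [vnodes, vnodes]
for node, vnodes in canfitpreempt_ex.items():
    if node in canfit_ex:
        canfit_ex[node][1] = vnodes
    else:
        canfit_ex[node] = [0, vnodes]

# Node 1 can host two vnodes for free and two more only by preempting others;
# nodes 2 and 3 can only contribute capacity through preemption.
assert canfit_ex == {1: [2, 4], 2: [0, 3], 3: [0, 1]}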
-
-    def __fit_asap(self, lease, nexttime, allow_reservation_in_future = False):
-        lease_id = lease.id
-        remaining_duration = lease.duration.get_remaining_duration()
-        numnodes = lease.numnodes
-        requested_resources = lease.requested_resources
-        preemptible = lease.preemptible
-        mustresume = (lease.state == Lease.STATE_SUSPENDED)
-        shutdown_time = self.__estimate_shutdown_time(lease)
-        susptype = get_config().get("suspension")
-        if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
-            suspendable = False
-        else:
-            suspendable = True
-
-        # Determine earliest start time in each node
-        if lease.state == Lease.STATE_QUEUED or lease.state == Lease.STATE_PENDING:
-            # Figure out earliest start times based on
-            # image schedule and reusable images
-            earliest = self.deployment_scheduler.find_earliest_starting_times(lease, nexttime)
-        elif lease.state == Lease.STATE_SUSPENDED:
-            # No need to transfer images from repository
-            # (only intra-node transfer)
-            earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
-
-
-        canmigrate = get_config().get("migration")
-
-        #
-        # STEP 1: FIGURE OUT THE MINIMUM DURATION
-        #
-        
-        min_duration = self.__compute_scheduling_threshold(lease)
-
-
-        #
-        # STEP 2: FIND THE CHANGEPOINTS
-        #
-
-        # Find the changepoints, and the nodes we can use at each changepoint
-        # Nodes may not be available at a changepoint because images
-        # cannot be transferred at that time.
-        if not mustresume:
-            cps = [(node, e[0]) for node, e in earliest.items()]
-            cps.sort(key=itemgetter(1))
-            curcp = None
-            changepoints = []
-            nodes = []
-            for node, time in cps:
-                nodes.append(node)
-                if time != curcp:
-                    changepoints.append([time, nodes[:]])
-                    curcp = time
-                else:
-                    changepoints[-1][1] = nodes[:]
-        else:
-            if not canmigrate:
-                vmrr = lease.get_last_vmrr()
-                curnodes = set(vmrr.nodes.values())
-            else:
-                curnodes=None
-                # If we have to resume this lease, make sure that
-                # we have enough time to transfer the images.
-                migratetime = self.__estimate_migration_time(lease)
-                earliesttransfer = get_clock().get_time() + migratetime
-    
-                for n in earliest:
-                    earliest[n][0] = max(earliest[n][0], earliesttransfer)
-
-            changepoints = list(set([x[0] for x in earliest.values()]))
-            changepoints.sort()
-            changepoints = [(x, curnodes) for x in changepoints]
-
-        # If we can make reservations in the future,
-        # we also consider future changepoints
-        # (otherwise, we only allow the VMs to start "now", accounting
-        #  for the fact that vm images will have to be deployed)
-        if allow_reservation_in_future:
-            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
-            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
-            # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
-            # included in futurecp.
-            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
-            futurecp = [(p,None) for p in futurecp]
-        else:
-            futurecp = []
-
-
-
-        #
-        # STEP 3: SLOT FITTING
-        #
-        
-        # If resuming, we also have to allocate enough for the resumption
-        if mustresume:
-            duration = remaining_duration + self.__estimate_resume_time(lease)
-        else:
-            duration = remaining_duration
-
-        duration += shutdown_time
-
-        # First, assuming we can't make reservations in the future
-        start, end, canfit = self.__find_fit_at_points(
-                                                       changepoints, 
-                                                       numnodes, 
-                                                       requested_resources, 
-                                                       duration, 
-                                                       suspendable, 
-                                                       min_duration)
-        
-        if start == None:
-            if not allow_reservation_in_future:
-                # We did not find a suitable starting time. This can happen
-                # if we're unable to make future reservations
-                raise SchedException, "Could not find enough resources for this request"
-        else:
-            mustsuspend = (end - start) < duration
-            if mustsuspend and not suspendable:
-                if not allow_reservation_in_future:
-                    raise SchedException, "Scheduling this lease would require preempting it, which is not allowed"
-                else:
-                    start = None # No satisfactory start time
-            
-        # If we haven't been able to fit the lease, check if we can
-        # reserve it in the future
-        if start == None and allow_reservation_in_future:
-            start, end, canfit = self.__find_fit_at_points(
-                                                           futurecp, 
-                                                           numnodes, 
-                                                           requested_resources, 
-                                                           duration, 
-                                                           suspendable, 
-                                                           min_duration
-                                                           )
-
-
-        if start in [p[0] for p in futurecp]:
-            reservation = True
-        else:
-            reservation = False
-
-
-        #
-        # STEP 4: FINAL SLOT FITTING
-        #
-        # At this point, we know the lease fits, but we have to map it to
-        # specific physical nodes.
-        
-        # Sort physical nodes
-        physnodes = canfit.keys()
-        if mustresume:
-            # If we're resuming, we prefer resuming in the nodes we're already
-            # deployed in, to minimize the number of transfers.
-            vmrr = lease.get_last_vmrr()
-            nodes = set(vmrr.nodes.values())
-            availnodes = set(physnodes)
-            deplnodes = availnodes.intersection(nodes)
-            notdeplnodes = availnodes.difference(nodes)
-            physnodes = list(deplnodes) + list(notdeplnodes)
-        else:
-            physnodes.sort() # Arbitrary ordering; TODO: prioritize nodes as in __fit_exact
-        
-        # Map to physical nodes
-        mappings = {}
-        res = {}
-        vmnode = 1
-        while vmnode <= numnodes:
-            for n in physnodes:
-                if canfit[n]>0:
-                    canfit[n] -= 1
-                    mappings[vmnode] = n
-                    if res.has_key(n):
-                        res[n].incr(requested_resources)
-                    else:
-                        res[n] = ds.ResourceTuple.copy(requested_resources)
-                    vmnode += 1
-                    break
-
-
-        vmrr = ds.VMResourceReservation(lease, start, end, mappings, res, reservation)
-        vmrr.state = ResourceReservation.STATE_SCHEDULED
-
-        if mustresume:
-            self.__schedule_resumption(vmrr, start)
-
-        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
-        if mustsuspend:
-            self.__schedule_suspension(vmrr, end)
-        else:
-            # Compensate for any overestimation
-            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
-                vmrr.end = vmrr.start + remaining_duration + shutdown_time
-            self.__schedule_shutdown(vmrr)
-        
-
-        
-        susp_str = res_str = ""
-        if mustresume:
-            res_str = " (resuming)"
-        if mustsuspend:
-            susp_str = " (suspending)"
-        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mappings.values(), start, res_str, end, susp_str))
-
-        return vmrr, reservation
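# A worked example of the changepoint construction in STEP 2 above, using plain
# integers instead of DateTimes and a made-up "earliest start" map: each
# changepoint ends up carrying every node that is usable by that time.

earliest_start_ex = {1: 0, 2: 0, 3: 5}   # pnode -> earliest usable time

cps_ex = sorted(earliest_start_ex.items(), key=lambda item: item[1])
changepoints_ex, nodes_ex, curcp_ex = [], [], None
for node, time in cps_ex:
    nodes_ex.append(node)
    if time != curcp_ex:
        changepoints_ex.append([time, nodes_ex[:]])
        curcp_ex = time
    else:
        changepoints_ex[-1][1] = nodes_ex[:]

# At time 0 only nodes 1 and 2 can host VMs; from time 5 onwards all three can.
assert changepoints_ex == [[0, [1, 2]], [5, [1, 2, 3]]]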
-
-    def __find_fit_at_points(self, changepoints, numnodes, resources, duration, suspendable, min_duration):
-        start = None
-        end = None
-        canfit = None
-        availabilitywindow = self.slottable.availabilitywindow
-
-
-        for p in changepoints:
-            availabilitywindow.initWindow(p[0], resources, p[1], canpreempt = False)
-            availabilitywindow.printContents()
-            
-            if availabilitywindow.fitAtStart() >= numnodes:
-                start=p[0]
-                maxend = start + duration
-                end, canfit = availabilitywindow.findPhysNodesForVMs(numnodes, maxend)
-        
-                self.logger.debug("This lease can be scheduled from %s to %s" % (start, end))
-                
-                if end < maxend:
-                    self.logger.debug("This lease will require suspension (maxend = %s)" % (maxend))
-                    
-                    if not suspendable:
-                        # If we can't suspend, this fit is no good, and we have to keep looking
-                        pass
-                    else:
-                        # If we can suspend, we still have to check if the lease will
-                        # be able to run for the specified minimum duration
-                        if end-start > min_duration:
-                            break # We found a fit; stop looking
-                        else:
-                            self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (end-start, min_duration))
-                            # Set start back to None, to indicate that we haven't
-                            # found a satisfactory start time
-                            start = None
-                else:
-                    # We've found a satisfactory starting time
-                    break        
-                
-        return start, end, canfit
-    
-    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
-        times = [] # (start, end, {pnode -> vnodes})
-        enactment_overhead = get_config().get("enactment-overhead") 
-
-        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
-            # Global exclusion (which represents, e.g., reading/writing the memory image files
-            # from a global file system) meaning no two suspensions/resumptions can happen at 
-            # the same time in the entire resource pool.
-            
-            t = time
-            t_prev = None
-                
-            for (vnode,pnode) in vmrr.nodes.items():
-                if override == None:
-                    mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-                    op_time = self.__compute_suspend_resume_time(mem, rate)
-                else:
-                    op_time = override
-
-                op_time += enactment_overhead
-                    
-                t_prev = t
-                
-                if direction == constants.DIRECTION_FORWARD:
-                    t += op_time
-                    times.append((t_prev, t, {pnode:[vnode]}))
-                elif direction == constants.DIRECTION_BACKWARD:
-                    t -= op_time
-                    times.append((t, t_prev, {pnode:[vnode]}))
-
-        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
-            # Local exclusion (which represents, e.g., reading the memory image files
-            # from a local file system) means no two suspensions or resumptions can
-            # happen at the same time on the same physical node.
-            pervnode_times = [] # (start, end, vnode)
-            vnodes_in_pnode = {}
-            for (vnode,pnode) in vmrr.nodes.items():
-                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
-            for pnode in vnodes_in_pnode:
-                t = time
-                t_prev = None
-                for vnode in vnodes_in_pnode[pnode]:
-                    if override == None:
-                        mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-                        op_time = self.__compute_suspend_resume_time(mem, rate)
-                    else:
-                        op_time = override                    
-                    
-                    t_prev = t
-                    
-                    if direction == constants.DIRECTION_FORWARD:
-                        t += op_time
-                        pervnode_times.append((t_prev, t, vnode))
-                    elif direction == constants.DIRECTION_BACKWARD:
-                        t -= op_time
-                        pervnode_times.append((t, t_prev, vnode))
-            
-            # Consolidate suspend/resume operations happening at the same time
-            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
-            for (start, end) in uniq_times:
-                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
-                node_mappings = {}
-                for vnode in vnodes:
-                    pnode = vmrr.nodes[vnode]
-                    node_mappings.setdefault(pnode, []).append(vnode)
-                times.append([start,end,node_mappings])
-        
-            # Add the enactment overhead
-            for t in times:
-                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
-                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
-                if direction == constants.DIRECTION_FORWARD:
-                    t[1] += overhead
-                elif direction == constants.DIRECTION_BACKWARD:
-                    t[0] -= overhead
-                    
-            # Fix overlaps
-            if direction == constants.DIRECTION_FORWARD:
-                times.sort(key=itemgetter(0))
-            elif direction == constants.DIRECTION_BACKWARD:
-                times.sort(key=itemgetter(1))
-                times.reverse()
-                
-            prev_start = None
-            prev_end = None
-            for t in times:
-                if prev_start != None:
-                    start = t[0]
-                    end = t[1]
-                    if direction == constants.DIRECTION_FORWARD:
-                        if start < prev_end:
-                            diff = prev_end - start
-                            t[0] += diff
-                            t[1] += diff
-                    elif direction == constants.DIRECTION_BACKWARD:
-                        if end > prev_start:
-                            diff = end - prev_start
-                            t[0] -= diff
-                            t[1] -= diff
-                prev_start = t[0]
-                prev_end = t[1]
-        
-        return times
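# Toy arithmetic contrasting the two exclusion policies handled above, ignoring
# the enactment overhead and the overlap-fixing pass: global exclusion
# serialises every suspend/resume operation in the pool, while local exclusion
# only serialises operations on the same physical node. The helper and layout
# are made up for illustration.

def _total_susprem_time(vnodes_per_pnode, op_time, exclusion):
    if exclusion == "global":
        return op_time * sum(vnodes_per_pnode.values())
    else:  # "local": physical nodes proceed in parallel
        return op_time * max(vnodes_per_pnode.values())

_layout = {10: 2, 11: 2}   # two pnodes with two vnodes each
assert _total_susprem_time(_layout, 30, "global") == 120  # 4 serialised operations
assert _total_susprem_time(_layout, 30, "local") == 60    # 2 per node, in parallel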
-    
-    def __schedule_shutdown(self, vmrr):
-        config = get_config()
-        shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
-
-        start = vmrr.end - shutdown_time
-        end = vmrr.end
-        
-        shutdown_rr = ds.ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
-        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
-                
-        vmrr.update_end(start)
-        
-        # If there are any post RRs, remove them
-        for rr in vmrr.post_rrs:
-            self.slottable.removeReservation(rr)
-        vmrr.post_rrs = []
-
-        vmrr.post_rrs.append(shutdown_rr)
-
-    def __schedule_suspension(self, vmrr, suspend_by):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        susp_exclusion = config.get("suspendresume-exclusion")
-        override = get_config().get("override-suspend-time")
-        rate = config.get("suspend-rate") 
-
-        if suspend_by < vmrr.start or suspend_by > vmrr.end:
-            raise SchedException, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
-
-        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
-        suspend_rrs = []
-        for (start, end, node_mappings) in times:
-            suspres = {}
-            all_vnodes = []
-            for (pnode,vnodes) in node_mappings.items():
-                num_vnodes = len(vnodes)
-                r = ds.ResourceTuple.create_empty()
-                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
-                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
-                suspres[pnode] = r          
-                all_vnodes += vnodes                      
-            susprr = ds.SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
-            susprr.state = ResourceReservation.STATE_SCHEDULED
-            suspend_rrs.append(susprr)
-                
-        suspend_rrs.sort(key=attrgetter("start"))
-            
-        susp_start = suspend_rrs[0].start
-        if susp_start < vmrr.start:
-            raise SchedException, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
-        
-        vmrr.update_end(susp_start)
-        
-        # If there are any post RRs, remove them
-        for rr in vmrr.post_rrs:
-            self.slottable.removeReservation(rr)
-        vmrr.post_rrs = []
-
-        for susprr in suspend_rrs:
-            vmrr.post_rrs.append(susprr)       
-            
-    def __schedule_resumption(self, vmrr, resume_at):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        resm_exclusion = config.get("suspendresume-exclusion")        
-        override = get_config().get("override-resume-time")
-        rate = config.get("resume-rate") 
-
-        if resume_at < vmrr.start or resume_at > vmrr.end:
-            raise SchedException, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
-
-        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
-        resume_rrs = []
-        for (start, end, node_mappings) in times:
-            resmres = {}
-            all_vnodes = []
-            for (pnode,vnodes) in node_mappings.items():
-                num_vnodes = len(vnodes)
-                r = ds.ResourceTuple.create_empty()
-                mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
-                r.set_by_type(constants.RES_MEM, mem * num_vnodes)
-                r.set_by_type(constants.RES_DISK, mem * num_vnodes)
-                resmres[pnode] = r
-                all_vnodes += vnodes
-            resmrr = ds.ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
-            resmrr.state = ResourceReservation.STATE_SCHEDULED
-            resume_rrs.append(resmrr)
-                
-        resume_rrs.sort(key=attrgetter("start"))
-            
-        resm_end = resume_rrs[-1].end
-        if resm_end > vmrr.end:
-            raise SchedException, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
-        
-        vmrr.update_start(resm_end)
-        for resmrr in resume_rrs:
-            vmrr.pre_rrs.append(resmrr)        
-           
-
-    # TODO: This has to be tied in with the preparation scheduler
-    def __schedule_migration(self, lease, vmrr, nexttime):
-        last_vmrr = lease.get_last_vmrr()
-        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
-        
-        mustmigrate = False
-        for vnode in vnode_migrations:
-            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
-                mustmigrate = True
-                break
-            
-        if not mustmigrate:
-            return
-
-        # Figure out what migrations can be done simultaneously
-        migrations = []
-        while len(vnode_migrations) > 0:
-            pnodes = set()
-            migration = {}
-            for vnode in vnode_migrations:
-                origin = vnode_migrations[vnode][0]
-                dest = vnode_migrations[vnode][1]
-                if not origin in pnodes and not dest in pnodes:
-                    migration[vnode] = vnode_migrations[vnode]
-                    pnodes.add(origin)
-                    pnodes.add(dest)
-            for vnode in migration:
-                del vnode_migrations[vnode]
-            migrations.append(migration)
-        
-        # Create migration RRs
-        start = last_vmrr.post_rrs[-1].end
-        migr_time = self.__estimate_migration_time(lease)
-        bandwidth = self.resourcepool.info.get_migration_bandwidth()
-        migr_rrs = []
-        for m in migrations:
-            end = start + migr_time
-            res = {}
-            for (origin,dest) in m.values():
-                resorigin = ds.ResourceTuple.create_empty()
-                resorigin.set_by_type(constants.RES_NETOUT, bandwidth)
-                resdest = ds.ResourceTuple.create_empty()
-                resdest.set_by_type(constants.RES_NETIN, bandwidth)
-                res[origin] = resorigin
-                res[dest] = resdest
-            migr_rr = MigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
-            migr_rr.state = ResourceReservation.STATE_SCHEDULED
-            migr_rrs.append(migr_rr)
-            start = end
-
-        migr_rrs.reverse()
-        for migr_rr in migr_rrs:
-            vmrr.pre_rrs.insert(0, migr_rr)
-
-    def __compute_suspend_resume_time(self, mem, rate):
-        time = float(mem) / rate
-        time = round_datetime_delta(TimeDelta(seconds = time))
-        return time
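# The formula above is simply memory size divided by the suspend/resume rate,
# rounded to Haizea's time resolution. With made-up numbers, a 1024 MB VM at a
# rate of 32 MB/s takes
#
#     1024 / 32 = 32.0 seconds (before rounding)
#
assert float(1024) / 32 == 32.0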
-    
-    def __estimate_suspend_resume_time(self, lease, rate):
-        susp_exclusion = get_config().get("suspendresume-exclusion")        
-        enactment_overhead = get_config().get("enactment-overhead") 
-        mem = lease.requested_resources.get_by_type(constants.RES_MEM)
-        if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
-            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
-        elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
-            # Overestimating
-            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
-
-    def __estimate_shutdown_time(self, lease):
-        enactment_overhead = get_config().get("enactment-overhead").seconds
-        return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
-
-    def __estimate_suspend_time(self, lease):
-        rate = get_config().get("suspend-rate")
-        override = get_config().get("override-suspend-time")
-        if override != None:
-            return override
-        else:
-            return self.__estimate_suspend_resume_time(lease, rate)
-
-    def __estimate_resume_time(self, lease):
-        rate = get_config().get("resume-rate") 
-        override = get_config().get("override-resume-time")
-        if override != None:
-            return override
-        else:
-            return self.__estimate_suspend_resume_time(lease, rate)
-
-
-    def __estimate_migration_time(self, lease):
-        whattomigrate = get_config().get("what-to-migrate")
-        bandwidth = self.resourcepool.info.get_migration_bandwidth()
-        if whattomigrate == constants.MIGRATE_NONE:
-            return TimeDelta(seconds=0)
-        else:
-            if whattomigrate == constants.MIGRATE_MEM:
-                mbtotransfer = lease.requested_resources.get_by_type(constants.RES_MEM)
-            elif whattomigrate == constants.MIGRATE_MEMDISK:
-                mbtotransfer = lease.diskimage_size + lease.requested_resources.get_by_type(constants.RES_MEM)
-            return estimate_transfer_time(mbtotransfer, bandwidth)
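# What gets transferred depends on the "what-to-migrate" option: nothing, just
# the memory image, or memory plus the disk image. With made-up sizes and
# bandwidth (and ignoring whatever rounding estimate_transfer_time applies),
# the estimate is roughly size divided by bandwidth:

_mem_mb, _disk_mb, _bw_mbps = 1024, 4096, 100.0
assert abs(_mem_mb / _bw_mbps - 10.24) < 1e-9             # migrate memory only
assert abs((_mem_mb + _disk_mb) / _bw_mbps - 51.2) < 1e-9 # migrate memory + disk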
-
-    # TODO: Take into account other things like boot overhead, migration overhead, etc.
-    def __compute_scheduling_threshold(self, lease):
-        from haizea.resourcemanager.rm import ResourceManager
-        config = ResourceManager.get_singleton().config
-        threshold = config.get("force-scheduling-threshold")
-        if threshold != None:
-            # If there is a hard-coded threshold, use that
-            return threshold
-        else:
-            factor = config.get("scheduling-threshold-factor")
-            susp_overhead = self.__estimate_suspend_time(lease)
-            safe_duration = susp_overhead
-            
-            if lease.state == Lease.STATE_SUSPENDED:
-                resm_overhead = self.__estimate_resume_time(lease)
-                safe_duration += resm_overhead
-            
-            # TODO: Incorporate other overheads into the minimum duration
-            min_duration = safe_duration
-            
-            # At the very least, we want to allocate enough time for the
-            # safe duration (otherwise, we'll end up with incorrect schedules,
-            # where a lease is scheduled to suspend, but isn't even allocated
-            # enough time to suspend). 
-            # The factor is assumed to be non-negative. i.e., a factor of 0
-            # means we only allocate enough time for potential suspend/resume
-            # operations, while a factor of 1 means the lease will get as much
-            # running time as is spent on the runtime overheads involved in
-            # setting it up.
-            threshold = safe_duration + (min_duration * factor)
-            return threshold
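# A worked example of the threshold computed above, with made-up overheads:
# safe_duration is the suspend overhead (plus the resume overhead if the lease
# is already suspended), and for now min_duration == safe_duration.

_susp_overhead = 60      # seconds to suspend (assumed)
_resm_overhead = 60      # seconds to resume (assumed)
_factor = 1.0            # scheduling-threshold-factor

_safe_duration = _susp_overhead + _resm_overhead   # lease was suspended
_threshold = _safe_duration + _safe_duration * _factor
assert _threshold == 240   # the VMs must be able to run at least 240 s before suspending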
-
-    def __choose_nodes(self, canfit, start, canpreempt, avoidpreempt):
-        # TODO2: Choose appropriate prioritizing function based on a
-        # config file, instead of hardcoding it.
-        #
-        # TODO3: Basing decisions only on CPU allocations. This is ok for now,
-        # since the memory allocation is proportional to the CPU allocation.
-        # Later on we need to come up with some sort of weighed average.
-        
-        nodes = canfit.keys()
-        
-        # TODO: The deployment module should just provide a list of nodes
-        # it prefers
-        nodeswithimg=[]
-        #self.lease_deployment_type = get_config().get("lease-preparation")
-        #if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
-        #    reusealg = get_config().get("diskimage-reuse")
-        #    if reusealg==constants.REUSE_IMAGECACHES:
-        #        nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
-
-        # Compares node x and node y. 
-        # Returns "x is ??? than y" (???=BETTER/WORSE/EQUAL)
-        def comparenodes(x, y):
-            hasimgX = x in nodeswithimg
-            hasimgY = y in nodeswithimg
-
-            # First comparison: A node with no preemptible VMs is preferable
-            # to one with preemptible VMs (i.e. we want to avoid preempting)
-            canfitnopreemptionX = canfit[x][0]
-            canfitpreemptionX = canfit[x][1]
-            hasPreemptibleX = canfitpreemptionX > canfitnopreemptionX
-            
-            canfitnopreemptionY = canfit[y][0]
-            canfitpreemptionY = canfit[y][1]
-            hasPreemptibleY = canfitpreemptionY > canfitnopreemptionY
-
-            # TODO: Factor out common code
-            if avoidpreempt:
-                if hasPreemptibleX and not hasPreemptibleY:
-                    return constants.WORSE
-                elif not hasPreemptibleX and hasPreemptibleY:
-                    return constants.BETTER
-                elif not hasPreemptibleX and not hasPreemptibleY:
-                    if hasimgX and not hasimgY: 
-                        return constants.BETTER
-                    elif not hasimgX and hasimgY: 
-                        return constants.WORSE
-                    else:
-                        if canfitnopreemptionX > canfitnopreemptionY: return constants.BETTER
-                        elif canfitnopreemptionX < canfitnopreemptionY: return constants.WORSE
-                        else: return constants.EQUAL
-                elif hasPreemptibleX and hasPreemptibleY:
-                    # If both have (some) preemptible resources, we prefer those
-                    # that involve fewer preemptions
-                    preemptX = canfitpreemptionX - canfitnopreemptionX
-                    preemptY = canfitpreemptionY - canfitnopreemptionY
-                    if preemptX < preemptY:
-                        return constants.BETTER
-                    elif preemptX > preemptY:
-                        return constants.WORSE
-                    else:
-                        if hasimgX and not hasimgY: return constants.BETTER
-                        elif not hasimgX and hasimgY: return constants.WORSE
-                        else: return constants.EQUAL
-            elif not avoidpreempt:
-                # First criteria: Can we reuse image?
-                if hasimgX and not hasimgY: 
-                    return constants.BETTER
-                elif not hasimgX and hasimgY: 
-                    return constants.WORSE
-                else:
-                    # Now we just want to avoid preemption
-                    if hasPreemptibleX and not hasPreemptibleY:
-                        return constants.WORSE
-                    elif not hasPreemptibleX and hasPreemptibleY:
-                        return constants.BETTER
-                    elif hasPreemptibleX and hasPreemptibleY:
-                        # If both have (some) preemptible resources, we prefer those
-                        # that involve fewer preemptions
-                        preemptX = canfitpreemptionX - canfitnopreemptionX
-                        preemptY = canfitpreemptionY - canfitnopreemptionY
-                        if preemptX < preemptY:
-                            return constants.BETTER
-                        elif preemptX > preemptY:
-                            return constants.WORSE
-                        else:
-                            if hasimgX and not hasimgY: return constants.BETTER
-                            elif not hasimgX and hasimgY: return constants.WORSE
-                            else: return constants.EQUAL
-                    else:
-                        return constants.EQUAL
-        
-        # Order nodes
-        nodes.sort(comparenodes)
-        return nodes        
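# comparenodes above is a Python 2 cmp-style comparator: it returns
# BETTER / WORSE / EQUAL and is passed directly to list.sort(). A toy comparator
# of the same shape, preferring nodes with more free capacity (the -1/0/1
# values and the capacity map are assumptions; Haizea defines its own constants):

_BETTER, _EQUAL, _WORSE = -1, 0, 1
_free_cpus = {1: 2, 2: 4, 3: 0}          # made-up free capacity per pnode

def _prefer_more_capacity(x, y):
    if _free_cpus[x] > _free_cpus[y]:
        return _BETTER
    elif _free_cpus[x] < _free_cpus[y]:
        return _WORSE
    return _EQUAL

_nodes = [1, 2, 3]
_nodes.sort(_prefer_more_capacity)
assert _nodes == [2, 1, 3]   # most free capacity first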
-
-    def __find_preemptable_leases(self, mustpreempt, startTime, endTime):
-        def comparepreemptability(rrX, rrY):
-            if rrX.lease.submit_time > rrY.lease.submit_time:
-                return constants.BETTER
-            elif rrX.lease.submit_time < rrY.lease.submit_time:
-                return constants.WORSE
-            else:
-                return constants.EQUAL        
-            
-        def preemptedEnough(amountToPreempt):
-            for node in amountToPreempt:
-                if not amountToPreempt[node].is_zero_or_less():
-                    return False
-            return True
-        
-        # Get allocations at the specified time
-        atstart = set()
-        atmiddle = set()
-        nodes = set(mustpreempt.keys())
-        
-        reservationsAtStart = self.slottable.getReservationsAt(startTime)
-        reservationsAtStart = [r for r in reservationsAtStart if isinstance(r, VMResourceReservation) and r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtMiddle = self.slottable.get_reservations_starting_between(startTime, endTime)
-        reservationsAtMiddle = [r for r in reservationsAtMiddle if isinstance(r, VMResourceReservation) and r.is_preemptible()
-                        and len(set(r.resources_in_pnode.keys()) & nodes)>0]
-        
-        reservationsAtStart.sort(comparepreemptability)
-        reservationsAtMiddle.sort(comparepreemptability)
-        
-        amountToPreempt = {}
-        for n in mustpreempt:
-            amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-
-        # First step: CHOOSE RESOURCES TO PREEMPT AT START OF RESERVATION
-        for r in reservationsAtStart:
-            # The following will really only come into play when we have
-            # multiple VMs per node
-            mustpreemptres = False
-            for n in r.resources_in_pnode.keys():
-                # Don't need to preempt if we've already preempted all
-                # the needed resources in node n
-                if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                    amountToPreempt[n].decr(r.resources_in_pnode[n])
-                    mustpreemptres = True
-            if mustpreemptres:
-                atstart.add(r)
-            if preemptedEnough(amountToPreempt):
-                break
-        
-        # Second step: CHOOSE RESOURCES TO PREEMPT DURING RESERVATION
-        if len(reservationsAtMiddle)>0:
-            changepoints = set()
-            for r in reservationsAtMiddle:
-                changepoints.add(r.start)
-            changepoints = list(changepoints)
-            changepoints.sort()        
-            
-            for cp in changepoints:
-                amountToPreempt = {}
-                for n in mustpreempt:
-                    amountToPreempt[n] = ds.ResourceTuple.copy(mustpreempt[n])
-                reservations = [r for r in reservationsAtMiddle 
-                                if r.start <= cp and cp < r.end]
-                for r in reservations:
-                    mustpreemptres = False
-                    for n in r.resources_in_pnode.keys():
-                        if amountToPreempt.has_key(n) and not amountToPreempt[n].is_zero_or_less():
-                            amountToPreempt[n].decr(r.resources_in_pnode[n])
-                            mustpreemptres = True
-                    if mustpreemptres:
-                        atmiddle.add(r)
-                    if preemptedEnough(amountToPreempt):
-                        break
-            
-        self.logger.debug("Preempting leases (at start of reservation): %s" % [r.lease.id for r in atstart])
-        self.logger.debug("Preempting leases (in middle of reservation): %s" % [r.lease.id for r in atmiddle])
-        
-        leases = [r.lease for r in atstart|atmiddle]
-        
-        return leases
-        
-    def __preempt(self, lease, preemption_time):
-        
-        self.logger.info("Preempting lease #%i..." % (lease.id))
-        self.logger.vdebug("Lease before preemption:")
-        lease.print_contents()
-        vmrr = lease.get_last_vmrr()
-        
-        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
-            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
-            must_cancel_and_requeue = True
-        else:
-            susptype = get_config().get("suspension")
-            if susptype == constants.SUSPENSION_NONE:
-                must_cancel_and_requeue = True
-            else:
-                time_until_suspend = preemption_time - vmrr.start
-                min_duration = self.__compute_scheduling_threshold(lease)
-                can_suspend = time_until_suspend >= min_duration        
-                if not can_suspend:
-                    self.logger.debug("Suspending the lease does not meet the scheduling threshold.")
-                    must_cancel_and_requeue = True
-                else:
-                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
-                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
-                        must_cancel_and_requeue = True
-                    else:
-                        self.logger.debug("Lease can be suspended")
-                        must_cancel_and_requeue = False
-                    
-        if must_cancel_and_requeue:
-            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
-            if vmrr.backfill_reservation == True:
-                self.numbesteffortres -= 1
-            # If there are any post RRs, remove them
-            for rr in vmrr.post_rrs:
-                self.slottable.removeReservation(rr)
-            lease.remove_vmrr(vmrr)
-            self.slottable.removeReservation(vmrr)
-            for vnode, pnode in lease.diskimagemap.items():
-                self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
-            self.deployment_scheduler.cancel_deployment(lease)
-            lease.diskimagemap = {}
-            lease.state = Lease.STATE_QUEUED
-            self.__enqueue_in_order(lease)
-            get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
-        else:
-            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
-            # Save original start and end time of the vmrr
-            old_start = vmrr.start
-            old_end = vmrr.end
-            self.__schedule_suspension(vmrr, preemption_time)
-            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
-            for susprr in vmrr.post_rrs:
-                self.slottable.addReservation(susprr)
-            
-            
-        self.logger.vdebug("Lease after preemption:")
-        lease.print_contents()
-        
-    def __reevaluate_schedule(self, endinglease, nodes, nexttime, checkedleases):
-        self.logger.debug("Reevaluating schedule. Checking for leases scheduled in nodes %s after %s" %(nodes, nexttime)) 
-        leases = []
-        vmrrs = self.slottable.get_next_reservations_in_nodes(nexttime, nodes, rr_type=VMResourceReservation, immediately_next=True)
-        leases = set([rr.lease for rr in vmrrs])
-        leases = [l for l in leases if isinstance(l, ds.BestEffortLease) and l.state in (Lease.STATE_SUSPENDED,Lease.STATE_READY) and not l in checkedleases]
-        for lease in leases:
-            self.logger.debug("Found lease %i" % lease.id)
-            lease.print_contents()
-            # Earliest time can't be earlier than time when images will be
-            # available in node
-            earliest = max(nexttime, lease.imagesavail)
-            self.__slideback(lease, earliest)
-            checkedleases.append(lease)
-        #for l in leases:
-        #    vmrr, susprr = l.getLastVMRR()
-        #    self.reevaluateSchedule(l, vmrr.nodes.values(), vmrr.end, checkedleases)
-          
-    def __slideback(self, lease, earliest):
-        vmrr = lease.get_last_vmrr()
-        # Save original start and end time of the vmrr
-        old_start = vmrr.start
-        old_end = vmrr.end
-        nodes = vmrr.nodes.values()
-        if lease.state == Lease.STATE_SUSPENDED:
-            originalstart = vmrr.pre_rrs[0].start
-        else:
-            originalstart = vmrr.start
-        cp = self.slottable.findChangePointsAfter(after=earliest, until=originalstart, nodes=nodes)
-        cp = [earliest] + cp
-        newstart = None
-        for p in cp:
-            self.slottable.availabilitywindow.initWindow(p, lease.requested_resources, canpreempt=False)
-            self.slottable.availabilitywindow.printContents()
-            if self.slottable.availabilitywindow.fitAtStart(nodes=nodes) >= lease.numnodes:
-                (end, canfit) = self.slottable.availabilitywindow.findPhysNodesForVMs(lease.numnodes, originalstart)
-                if end == originalstart and set(nodes) <= set(canfit.keys()):
-                    self.logger.debug("Can slide back to %s" % p)
-                    newstart = p
-                    break
-        if newstart == None:
-            # Can't slide back. Leave as is.
-            pass
-        else:
-            diff = originalstart - newstart
-            if lease.state == Lease.STATE_SUSPENDED:
-                resmrrs = [r for r in vmrr.pre_rrs if isinstance(r, ds.ResumptionResourceReservation)]
-                for resmrr in resmrrs:
-                    resmrr_old_start = resmrr.start
-                    resmrr_old_end = resmrr.end
-                    resmrr.start -= diff
-                    resmrr.end -= diff
-                    self.slottable.update_reservation_with_key_change(resmrr, resmrr_old_start, resmrr_old_end)
-            vmrr.update_start(vmrr.start - diff)
-            
-            # If the lease was going to be suspended, check whether
-            # suspension is still necessary.
-            remdur = lease.duration.get_remaining_duration()
-            if vmrr.is_suspending() and vmrr.end - newstart >= remdur: 
-                vmrr.update_end(vmrr.start + remdur)
-                for susprr in vmrr.post_rrs:
-                    self.slottable.removeReservation(susprr)
-                vmrr.post_rrs = []
-            else:
-                vmrr.update_end(vmrr.end - diff)
-                
-            if not vmrr.is_suspending():
-                # If the VM was set to shut down, we need to slide back the shutdown RRs
-                for rr in vmrr.post_rrs:
-                    rr_old_start = rr.start
-                    rr_old_end = rr.end
-                    rr.start -= diff
-                    rr.end -= diff
-                    self.slottable.update_reservation_with_key_change(rr, rr_old_start, rr_old_end)
-
-            self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
-            self.logger.vdebug("New lease descriptor (after slideback):")
-            lease.print_contents()
-    
-          
-
-    #-------------------------------------------------------------------#
-    #                                                                   #
-    #                  SLOT TABLE EVENT HANDLERS                        #
-    #                                                                   #
-    #-------------------------------------------------------------------#
-
-    def _handle_start_vm(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
-        l.print_contents()
-        if l.state == Lease.STATE_READY:
-            l.state = Lease.STATE_ACTIVE
-            rr.state = ResourceReservation.STATE_ACTIVE
-            now_time = get_clock().get_time()
-            l.start.actual = now_time
-            
-            try:
-                self.deployment_scheduler.check(l, rr)
-                self.resourcepool.start_vms(l, rr)
-                # The next two lines have to be moved somewhere more
-                # appropriate inside the resourcepool module
-                for (vnode, pnode) in rr.nodes.items():
-                    l.diskimagemap[vnode] = pnode
-            except Exception, e:
-                self.logger.error("ERROR when starting VMs.")
-                raise
-        elif l.state == Lease.STATE_RESUMED_READY:
-            l.state = Lease.STATE_ACTIVE
-            rr.state = ResourceReservation.STATE_ACTIVE
-            # No enactment to do here, since all the suspend/resume actions are
-            # handled during the suspend/resume RRs
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
-        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
-
-
-    def _handle_end_vm(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
-        self.logger.vdebug("LEASE-%i Before:" % l.id)
-        l.print_contents()
-        now_time = round_datetime(get_clock().get_time())
-        diff = now_time - rr.start
-        l.duration.accumulate_duration(diff)
-        rr.state = ResourceReservation.STATE_DONE
-       
-        if isinstance(l, ds.BestEffortLease):
-            if rr.backfill_reservation == True:
-                self.numbesteffortres -= 1
-                
-        self.logger.vdebug("LEASE-%i After:" % l.id)
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
-        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
-
-    def _handle_unscheduled_end_vm(self, l, vmrr, enact=False):
-        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
-        self._handle_end_rr(l, vmrr)
-        for rr in vmrr.post_rrs:
-            self.slottable.removeReservation(rr)
-        vmrr.post_rrs = []
-        # TODO: slideback shutdown RRs
-        vmrr.end = get_clock().get_time()
-        self._handle_end_vm(l, vmrr)
-        self._handle_end_lease(l)
-        nexttime = get_clock().get_next_schedulable_time()
-        if self.is_backfilling():
-            # We need to reevaluate the schedule to see if there are any future
-            # reservations that we can slide back.
-            self.__reevaluate_schedule(l, vmrr.nodes.values(), nexttime, [])
-
-    def _handle_start_shutdown(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
-        l.print_contents()
-        rr.state = ResourceReservation.STATE_ACTIVE
-        self.resourcepool.stop_vms(l, rr)
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
-
-    def _handle_end_shutdown(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
-        l.print_contents()
-        rr.state = ResourceReservation.STATE_DONE
-        self._handle_end_lease(l)
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
-        self.logger.info("Lease %i shutdown." % (l.id))
-
-    def _handle_end_lease(self, l):
-        l.state = Lease.STATE_DONE
-        l.duration.actual = l.duration.accumulated
-        l.end = round_datetime(get_clock().get_time())
-        self.completedleases.add(l)
-        self.leases.remove(l)
-        if isinstance(l, ds.BestEffortLease):
-            get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
-
-
-    def _handle_start_suspend(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
-        l.print_contents()
-        rr.state = ResourceReservation.STATE_ACTIVE
-        self.resourcepool.suspend_vms(l, rr)
-        for vnode in rr.vnodes:
-            pnode = rr.vmrr.nodes[vnode]
-            l.memimagemap[vnode] = pnode
-        if rr.is_first():
-            l.state = Lease.STATE_SUSPENDING
-            l.print_contents()
-            self.logger.info("Suspending lease %i..." % (l.id))
-        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)
-
-    def _handle_end_suspend(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
-        l.print_contents()
-        # TODO: React to incomplete suspend
-        self.resourcepool.verify_suspend(l, rr)
-        rr.state = ResourceReservation.STATE_DONE
-        if rr.is_last():
-            l.state = Lease.STATE_SUSPENDED
-            self.__enqueue_in_order(l)
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
-        self.logger.info("Lease %i suspended." % (l.id))
-
-    def _handle_start_resume(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
-        l.print_contents()
-        self.resourcepool.resume_vms(l, rr)
-        rr.state = ResourceReservation.STATE_ACTIVE
-        if rr.is_first():
-            l.state = Lease.STATE_RESUMING
-            l.print_contents()
-            self.logger.info("Resuming lease %i..." % (l.id))
-        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)
-
-    def _handle_end_resume(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
-        l.print_contents()
-        # TODO: React to incomplete resume
-        self.resourcepool.verify_resume(l, rr)
-        rr.state = ResourceReservation.STATE_DONE
-        if rr.is_last():
-            l.state = Lease.STATE_RESUMED_READY
-            self.logger.info("Resumed lease %i" % (l.id))
-        for vnode, pnode in rr.vmrr.nodes.items():
-            self.resourcepool.remove_ramfile(pnode, l.id, vnode)
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
-
-    def _handle_start_migrate(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
-        l.print_contents()
-        rr.state = ResourceReservation.STATE_ACTIVE
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
-        self.logger.info("Migrating lease %i..." % (l.id))
-
-    def _handle_end_migrate(self, l, rr):
-        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
-        l.print_contents()
-
-        for vnode in rr.transfers:
-            origin = rr.transfers[vnode][0]
-            dest = rr.transfers[vnode][1]
-            
-            # Update VM image mappings
-            self.resourcepool.remove_diskimage(origin, l.id, vnode)
-            self.resourcepool.add_diskimage(dest, l.diskimage_id, l.diskimage_size, l.id, vnode)
-            l.diskimagemap[vnode] = dest
-
-            # Update RAM file mappings
-            self.resourcepool.remove_ramfile(origin, l.id, vnode)
-            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources.get_by_type(constants.RES_MEM))
-            l.memimagemap[vnode] = dest
-        
-        rr.state = ResourceReservation.STATE_DONE
-        l.print_contents()
-        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
-        self.logger.info("Migrated lease %i..." % (l.id))
-
-    def _handle_end_rr(self, l, rr):
-        self.slottable.removeReservation(rr)
-
-    def __enqueue_in_order(self, lease):
-        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
-        self.queue.enqueue_in_order(lease)
-        
-    def __can_reserve_besteffort_in_future(self):
-        return self.numbesteffortres < self.maxres
-                
-    def is_backfilling(self):
-        return self.maxres > 0

Deleted: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/resourcemanager/slottable.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -1,530 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago                                 #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
-# Complutense de Madrid (dsa-research.org)                                   #
-#                                                                            #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
-# not use this file except in compliance with the License. You may obtain    #
-# a copy of the License at                                                   #
-#                                                                            #
-# http://www.apache.org/licenses/LICENSE-2.0                                 #
-#                                                                            #
-# Unless required by applicable law or agreed to in writing, software        #
-# distributed under the License is distributed on an "AS IS" BASIS,          #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
-# See the License for the specific language governing permissions and        #
-# limitations under the License.                                             #
-# -------------------------------------------------------------------------- #
-
-from mx.DateTime import ISO, TimeDelta
-from operator import attrgetter, itemgetter
-import haizea.common.constants as constants
-import haizea.resourcemanager.datastruct as ds
-import bisect
-import copy
-import logging
-
-class SlotFittingException(Exception):
-    pass
-
-class CriticalSlotFittingException(Exception):
-    pass
-
-
-class Node(object):
-    def __init__(self, capacity, capacitywithpreemption, resourcepoolnode):
-        self.capacity = ds.ResourceTuple.copy(capacity)
-        if capacitywithpreemption == None:
-            self.capacitywithpreemption = None
-        else:
-            self.capacitywithpreemption = ds.ResourceTuple.copy(capacitywithpreemption)
-        self.resourcepoolnode = resourcepoolnode
-        
-    @classmethod
-    def from_resourcepool_node(cls, node):
-        capacity = node.get_capacity()
-        return cls(capacity, capacity, node)
-
-class NodeList(object):
-    def __init__(self):
-        self.nodelist = []
-
-    def add(self, node):
-        self.nodelist.append(node)
-        
-    def __getitem__(self, n):
-        return self.nodelist[n-1]
-
-    def copy(self):
-        nodelist = NodeList()
-        for n in self.nodelist:
-            nodelist.add(Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode))
-        return nodelist
-   
-    def toDict(self):
-        nodelist = self.copy()
-        return dict([(i+1, v) for i, v in enumerate(nodelist)])
-        
-class KeyValueWrapper(object):
-    def __init__(self, key, value):
-        self.key = key
-        self.value = value
-        
-    def __cmp__(self, other):
-        return cmp(self.key, other.key)
-
-class SlotTable(object):
-    def __init__(self):
-        self.logger = logging.getLogger("SLOT")
-        self.nodes = NodeList()
-        self.reservations = []
-        self.reservationsByStart = []
-        self.reservationsByEnd = []
-        self.availabilitycache = {}
-        self.changepointcache = None
-        
-        self.availabilitywindow = AvailabilityWindow(self)
-
-    def add_node(self, resourcepoolnode):
-        self.nodes.add(Node.from_resourcepool_node(resourcepoolnode))
-
-    def is_empty(self):
-        return (len(self.reservationsByStart) == 0)
-
-    def dirty(self):
-        # You're a dirty, dirty slot table and you should be
-        # ashamed of having outdated caches!
-        self.availabilitycache = {}
-        self.changepointcache = None
-        
-    def getAvailabilityCacheMiss(self, time):
-        allnodes = set([i+1 for i in range(len(self.nodes.nodelist))])
-        onlynodes = None       
-        nodes = {} 
-        reservations = self.getReservationsAt(time)
-        # Find how much resources are available on each node
-        canpreempt = True
-        for r in reservations:
-            for node in r.resources_in_pnode:
-                if onlynodes == None or (onlynodes != None and node in onlynodes):
-                    if not nodes.has_key(node):
-                        n = self.nodes[node]
-                        if canpreempt:
-                            nodes[node] = Node(n.capacity, n.capacitywithpreemption, n.resourcepoolnode)
-                        else:
-                            nodes[node] = Node(n.capacity, None, n.resourcepoolnode)
-                    nodes[node].capacity.decr(r.resources_in_pnode[node])
-                    if canpreempt and not r.is_preemptible():
-                        nodes[node].capacitywithpreemption.decr(r.resources_in_pnode[node])
-
-        # For the remaining nodes, use a reference to the original node, not a copy
-        if onlynodes == None:
-            missing = allnodes - set(nodes.keys())
-        else:
-            missing = onlynodes - set(nodes.keys())
-            
-        for node in missing:
-            nodes[node] = self.nodes[node]                    
-            
-        self.availabilitycache[time] = nodes
-
-    def getAvailability(self, time, resreq=None, onlynodes=None, canpreempt=False):
-        if not self.availabilitycache.has_key(time):
-            # Cache miss
-            self.getAvailabilityCacheMiss(time)
-            
-        nodes = self.availabilitycache[time]
-
-        if onlynodes != None:
-            onlynodes = set(onlynodes)
-            nodes = dict([(n,node) for n,node in nodes.items() if n in onlynodes])
-
-        # Keep only those nodes with enough resources
-        if resreq != None:
-            newnodes = {}
-            for n, node in nodes.items():
-                if not resreq.fits_in(node.capacity) or (canpreempt and not resreq.fits_in(node.capacitywithpreemption)):
-                    pass
-                else:
-                    newnodes[n]=node
-            nodes = newnodes
-
-        return nodes
-    
-    def get_utilization(self, time):
-        total = sum([n.capacity.get_by_type(constants.RES_CPU) for n in self.nodes.nodelist])
-        util = {}
-        reservations = self.getReservationsAt(time)
-        for r in reservations:
-            for node in r.resources_in_pnode:
-                if isinstance(r, ds.VMResourceReservation):
-                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
-                    util[type(r)] = use + util.setdefault(type(r),0.0)
-                elif isinstance(r, ds.SuspensionResourceReservation) or isinstance(r, ds.ResumptionResourceReservation) or isinstance(r, ds.ShutdownResourceReservation):
-                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
-                    util[type(r)] = use + util.setdefault(type(r),0.0)
-        util[None] = total - sum(util.values())
-        for k in util:
-            util[k] /= total
-            
-        return util
-
-    def getReservationsAt(self, time):
-        item = KeyValueWrapper(time, None)
-        startpos = bisect.bisect_right(self.reservationsByStart, item)
-        bystart = set([x.value for x in self.reservationsByStart[:startpos]])
-        endpos = bisect.bisect_right(self.reservationsByEnd, item)
-        byend = set([x.value for x in self.reservationsByEnd[endpos:]])
-        res = bystart & byend
-        return list(res)
-    
-    def get_reservations_starting_between(self, start, end):
-        startitem = KeyValueWrapper(start, None)
-        enditem = KeyValueWrapper(end, None)
-        startpos = bisect.bisect_left(self.reservationsByStart, startitem)
-        endpos = bisect.bisect_right(self.reservationsByStart, enditem)
-        res = [x.value for x in self.reservationsByStart[startpos:endpos]]
-        return res
-
-    def get_reservations_starting_after(self, start):
-        startitem = KeyValueWrapper(start, None)
-        startpos = bisect.bisect_left(self.reservationsByStart, startitem)
-        res = [x.value for x in self.reservationsByStart[startpos:]]
-        return res
-
-    def get_reservations_ending_after(self, end):
-        startitem = KeyValueWrapper(end, None)
-        startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
-        res = [x.value for x in self.reservationsByEnd[startpos:]]
-        return res
-
-    def get_reservations_ending_between(self, start, end):
-        startitem = KeyValueWrapper(start, None)
-        enditem = KeyValueWrapper(end, None)
-        startpos = bisect.bisect_left(self.reservationsByEnd, startitem)
-        endpos = bisect.bisect_right(self.reservationsByEnd, enditem)
-        res = [x.value for x in self.reservationsByEnd[startpos:endpos]]
-        return res
-    
-    def get_reservations_starting_at(self, time):
-        return self.get_reservations_starting_between(time, time)
-
-    def get_reservations_ending_at(self, time):
-        return self.get_reservations_ending_between(time, time)
-    
-    # ONLY for simulation
-    def getNextPrematureEnd(self, after):
-        # Inefficient, but ok since this query seldom happens
-        res = [i.value for i in self.reservationsByEnd if isinstance(i.value, ds.VMResourceReservation) and i.value.prematureend > after]
-        if len(res) > 0:
-            prematureends = [r.prematureend for r in res]
-            prematureends.sort()
-            return prematureends[0]
-        else:
-            return None
-    
-    # ONLY for simulation
-    def getPrematurelyEndingRes(self, t):
-        return [i.value for i in self.reservationsByEnd if isinstance(i.value, ds.VMResourceReservation) and i.value.prematureend == t]
-
-    
-    def get_reservations_starting_or_ending_after(self, after):
-        item = KeyValueWrapper(after, None)
-        startpos = bisect.bisect_right(self.reservationsByStart, item)
-        bystart = set([x.value for x in self.reservationsByStart[:startpos]])
-        endpos = bisect.bisect_right(self.reservationsByEnd, item)
-        byend = set([x.value for x in self.reservationsByEnd[endpos:]])
-        res = bystart | byend
-        return list(res)    
-    
-    def addReservation(self, rr):
-        startitem = KeyValueWrapper(rr.start, rr)
-        enditem = KeyValueWrapper(rr.end, rr)
-        bisect.insort(self.reservationsByStart, startitem)
-        bisect.insort(self.reservationsByEnd, enditem)
-        self.dirty()
-
-    # If the slot table keys (start / end time) are not modified,
-    # just remove and reinsert the reservation.
-    def updateReservation(self, rr):
-        # TODO: Might be more efficient to resort lists
-        self.removeReservation(rr)
-        self.addReservation(rr)
-        self.dirty()
-
-    # If the slot table keys (start and/or end time) are modified,
-    # provide the old keys (so we can remove the reservation using
-    # them) and the updated reservation.
-    def update_reservation_with_key_change(self, rr, old_start, old_end):
-        # TODO: Might be more efficient to resort lists
-        self.removeReservation(rr, old_start, old_end)
-        self.addReservation(rr)
-        self.dirty()
-
-
-    def getIndexOfReservation(self, rlist, rr, key):
-        item = KeyValueWrapper(key, None)
-        pos = bisect.bisect_left(rlist, item)
-        found = False
-        while not found:
-            if rlist[pos].value == rr:
-                found = True
-            else:
-                pos += 1
-        return pos
-
-    def removeReservation(self, rr, start=None, end=None):
-        if start == None:
-            start = rr.start
-        if end == None:
-            end = rr.end
-        posstart = self.getIndexOfReservation(self.reservationsByStart, rr, start)
-        posend = self.getIndexOfReservation(self.reservationsByEnd, rr, end)
-        self.reservationsByStart.pop(posstart)
-        self.reservationsByEnd.pop(posend)
-        self.dirty()
-
-    
-    def findChangePointsAfter(self, after, until=None, nodes=None):
-        changepoints = set()
-        res = self.get_reservations_starting_or_ending_after(after)
-        for rr in res:
-            if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
-                if rr.start > after:
-                    changepoints.add(rr.start)
-                if rr.end > after:
-                    changepoints.add(rr.end)
-        changepoints = list(changepoints)
-        if until != None:
-            changepoints =  [c for c in changepoints if c < until]
-        changepoints.sort()
-        return changepoints
-    
-    def peekNextChangePoint(self, time):
-        if self.changepointcache == None:
-            # Cache is empty
-            changepoints = self.findChangePointsAfter(time)
-            changepoints.reverse()
-            self.changepointcache = changepoints
-        if len(self.changepointcache) == 0:
-            return None
-        else:
-            return self.changepointcache[-1]
-    
-    def getNextChangePoint(self, time):
-        p = self.peekNextChangePoint(time)
-        if p != None:
-            self.changepointcache.pop()
-        return p
-        
-    def isFull(self, time):
-        nodes = self.getAvailability(time)
-        avail = sum([node.capacity.get_by_type(constants.RES_CPU) for node in nodes.values()])
-        return (avail == 0)
-    
-    def get_next_reservations_in_nodes(self, time, nodes, rr_type=None, immediately_next = False):
-        nodes = set(nodes)
-        rrs_in_nodes = []
-        earliest_end_time = {}
-        rrs = self.get_reservations_starting_after(time)
-        if rr_type != None:
-            rrs = [rr for rr in rrs if isinstance(rr, rr_type)]
-            
-        # Filter the RRs by nodes
-        for rr in rrs:
-            rr_nodes = set(rr.resources_in_pnode.keys())
-            if len(nodes & rr_nodes) > 0:
-                rrs_in_nodes.append(rr)
-                end = rr.end
-                for n in rr_nodes:
-                    if not earliest_end_time.has_key(n):
-                        earliest_end_time[n] = end
-                    else:
-                        if end < earliest_end_time[n]:
-                            earliest_end_time[n] = end
-                            
-        if immediately_next:
-            # We only want to include the ones that are immediately
-            # next. 
-            rr_nodes_excl = set()
-            for n in nodes:
-                if earliest_end_time.has_key(n):
-                    end = earliest_end_time[n]
-                    rrs = [rr for rr in rrs_in_nodes if n in rr.resources_in_pnode.keys() and rr.start < end]
-                    rr_nodes_excl.update(rrs)
-            rrs_in_nodes = list(rr_nodes_excl)
-        
-        return rrs_in_nodes
-
-class AvailEntry(object):
-    def __init__(self, time, avail, availpreempt, resreq):
-        self.time = time
-        self.avail = avail
-        self.availpreempt = availpreempt
-        
-        if avail == None and availpreempt == None:
-            self.canfit = 0
-            self.canfitpreempt = 0
-        else:
-            self.canfit = resreq.get_num_fits_in(avail)
-            if availpreempt == None:
-                self.canfitpreempt = 0
-            else:
-                self.canfitpreempt = resreq.get_num_fits_in(availpreempt)
-        
-    def getCanfit(self, canpreempt):
-        if canpreempt:
-            return self.canfitpreempt
-        else:
-            return self.canfit
-
-
-class AvailabilityWindow(object):
-    def __init__(self, slottable):
-        self.slottable = slottable
-        self.logger = logging.getLogger("SLOTTABLE.WIN")
-        self.time = None
-        self.resreq = None
-        self.onlynodes = None
-        self.avail = None
-        
-    # Create avail structure
-    def initWindow(self, time, resreq, onlynodes = None, canpreempt=False):
-        self.time = time
-        self.resreq = resreq
-        self.onlynodes = onlynodes
-        self.avail = {}
-
-        # Availability at initial time
-        availatstart = self.slottable.getAvailability(self.time, self.resreq, self.onlynodes, canpreempt)
-        for node in availatstart:
-            capacity = availatstart[node].capacity
-            if canpreempt:
-                capacitywithpreemption = availatstart[node].capacitywithpreemption
-            else:
-                capacitywithpreemption = None
-            self.avail[node] = [AvailEntry(self.time, capacity, capacitywithpreemption, self.resreq)]
-        
-        # Determine the availability at the subsequent change points
-        nodes = set(availatstart.keys())
-        res = self.slottable.get_reservations_starting_after(self.time)
-        changepoints = set()
-        for rr in res:
-            if nodes == None or (nodes != None and len(set(rr.resources_in_pnode.keys()) & set(nodes)) > 0):
-                changepoints.add(rr.start)
-        changepoints = list(changepoints)
-        changepoints.sort()
-        for p in changepoints:
-            availatpoint = self.slottable.getAvailability(p, self.resreq, nodes, canpreempt)
-            newnodes = set(availatpoint.keys())
-            
-            # Add entries for nodes that can no longer fit
-            # at least one VM
-            fullnodes = nodes - newnodes
-            for node in fullnodes:
-                self.avail[node].append(AvailEntry(p, None, None, None))
-                nodes.remove(node)
-                
-            # For the rest, we are only interested in nodes where the
-            # available resources decrease within the window
-            for node in newnodes:
-                capacity = availatpoint[node].capacity
-                fits = self.resreq.get_num_fits_in(capacity)
-                if canpreempt:
-                    capacitywithpreemption = availatpoint[node].capacitywithpreemption
-                    fitswithpreemption = self.resreq.get_num_fits_in(capacitywithpreemption)
-                prevavail = self.avail[node][-1]
-                if not canpreempt and prevavail.getCanfit(canpreempt=False) > fits:
-                    self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
-                elif canpreempt and (prevavail.getCanfit(canpreempt=False) > fits or prevavail.getCanfit(canpreempt=True) > fitswithpreemption):
-                    self.avail[node].append(AvailEntry(p, capacity, capacitywithpreemption, self.resreq))
-                  
-    
-    def fitAtStart(self, nodes = None, canpreempt = False):
-        if nodes != None:
-            avail = [v for (k, v) in self.avail.items() if k in nodes]
-        else:
-            avail = self.avail.values()
-        if canpreempt:
-            return sum([e[0].canfitpreempt for e in avail])
-        else:
-            return sum([e[0].canfit for e in avail])
-        
-    # TODO: Also return the amount of resources that would have to be
-    # preempted in each physnode
-    def findPhysNodesForVMs(self, numnodes, maxend, strictend=False, canpreempt=False):
-        # Returns the physical nodes that can run all VMs, and the
-        # time at which the VMs must end
-        canfit = dict([(n, v[0].getCanfit(canpreempt)) for (n, v) in self.avail.items()])
-        entries = []
-        for n in self.avail.keys():
-            entries += [(n, e) for e in self.avail[n][1:]]
-        getTime = lambda x: x[1].time
-        entries.sort(key=getTime)
-        if strictend:
-            end = None
-        else:
-            end = maxend
-        for e in entries:
-            physnode = e[0]
-            entry = e[1]
-       
-            if entry.time >= maxend:
-                # Can run to its maximum duration
-                break
-            else:
-                diff = canfit[physnode] - entry.getCanfit(canpreempt)
-                totalcanfit = sum([n for n in canfit.values()]) - diff
-                if totalcanfit < numnodes and not strictend:
-                    # Not enough resources. Must end here
-                    end = entry.time
-                    break
-                else:
-                    # Update canfit
-                    canfit[physnode] = entry.getCanfit(canpreempt)
-
-        # Filter out nodes where we can't fit any vms
-        canfit = dict([(n, v) for (n, v) in canfit.items() if v > 0])
-        
-        return end, canfit
-            
-                    
-    def printContents(self, nodes = None, withpreemption = False):
-        if self.logger.getEffectiveLevel() == constants.LOGLEVEL_VDEBUG:
-            if nodes == None:
-                physnodes = self.avail.keys()
-            else:
-                physnodes = [k for k in self.avail.keys() if k in nodes]
-            physnodes.sort()
-            if withpreemption:
-                p = "(with preemption)"
-            else:
-                p = "(without preemption)"
-            self.logger.vdebug("AVAILABILITY WINDOW (time=%s, nodes=%s) %s"%(self.time, nodes, p))
-            for n in physnodes:
-                contents = "Node %i --- " % n
-                for x in self.avail[n]:
-                    contents += "[ %s " % x.time
-                    contents += "{ "
-                    if x.avail == None and x.availpreempt == None:
-                        contents += "END "
-                    else:
-                        if withpreemption:
-                            res = x.availpreempt
-                            canfit = x.canfitpreempt
-                        else:
-                            res = x.avail
-                            canfit = x.canfit
-                        contents += "%s" % res
-                    contents += "} (Fits: %i) ]  " % canfit
-                self.logger.vdebug(contents)
-                
-
-                
-                          
-                          
-            
-
-
-        
-        
\ No newline at end of file

Modified: trunk/src/haizea/traces/readers.py
===================================================================
--- trunk/src/haizea/traces/readers.py	2008-12-19 21:32:52 UTC (rev 553)
+++ trunk/src/haizea/traces/readers.py	2009-01-06 11:56:27 UTC (rev 554)
@@ -17,7 +17,8 @@
 # -------------------------------------------------------------------------- #
 
 from mx.DateTime import TimeDelta
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ResourceTuple
+from haizea.resourcemanager.leases import Lease, ARLease, BestEffortLease
+from haizea.resourcemanager.scheduler.slottable import ResourceTuple
 import haizea.common.constants as constants
 import haizea.traces.formats as formats
 


