[haizea-commit] r498 - in trunk/src/haizea: common resourcemanager resourcemanager/deployment resourcemanager/enact resourcemanager/enact/opennebula resourcemanager/enact/simulated resourcemanager/frontends
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Tue Sep 16 05:18:11 CDT 2008
Author: borja
Date: 2008-09-16 05:18:11 -0500 (Tue, 16 Sep 2008)
New Revision: 498
Removed:
trunk/src/haizea/resourcemanager/deployment/base.py
trunk/src/haizea/resourcemanager/deployment/predeployed.py
trunk/src/haizea/resourcemanager/enact/base.py
trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py
Modified:
trunk/src/haizea/common/utils.py
trunk/src/haizea/resourcemanager/accounting.py
trunk/src/haizea/resourcemanager/deployment/__init__.py
trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
trunk/src/haizea/resourcemanager/deployment/unmanaged.py
trunk/src/haizea/resourcemanager/enact/__init__.py
trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
trunk/src/haizea/resourcemanager/enact/opennebula/info.py
trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
trunk/src/haizea/resourcemanager/enact/simulated/__init__.py
trunk/src/haizea/resourcemanager/enact/simulated/deployment.py
trunk/src/haizea/resourcemanager/enact/simulated/info.py
trunk/src/haizea/resourcemanager/enact/simulated/vm.py
trunk/src/haizea/resourcemanager/frontends/opennebula.py
trunk/src/haizea/resourcemanager/frontends/rpc.py
trunk/src/haizea/resourcemanager/frontends/tracefile.py
trunk/src/haizea/resourcemanager/resourcepool.py
trunk/src/haizea/resourcemanager/rm.py
trunk/src/haizea/resourcemanager/scheduler.py
trunk/src/haizea/resourcemanager/slottable.py
Log:
- Started removing lots of unnecessary dependencies, mostly references from objects to their containing objects, which should be avoided. In particular, lots of objects no longer have a "convenient" rm attribute pointing back to the Resource Manager. This attribute was only used to access the configuration file, the accounting object, and the clock. So, these three have been made "global" (a little globality is not bad, as long as it's controlled; in this case, the only way of accessing these objects is through the get_config, get_accounting, and get_clock global functions which, in turn, access the ResourceManager singleton).
- Removed cracked-out "dynamic loading" of enactment modules.
Modified: trunk/src/haizea/common/utils.py
===================================================================
--- trunk/src/haizea/common/utils.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/common/utils.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -114,4 +114,15 @@
else:
return cls._singleton
-
\ No newline at end of file
+
+def get_config():
+ from haizea.resourcemanager.rm import ResourceManager
+ return ResourceManager.get_singleton().config
+
+def get_accounting():
+ from haizea.resourcemanager.rm import ResourceManager
+ return ResourceManager.get_singleton().accounting
+
+def get_clock():
+ from haizea.resourcemanager.rm import ResourceManager
+ return ResourceManager.get_singleton().clock
Modified: trunk/src/haizea/resourcemanager/accounting.py
===================================================================
--- trunk/src/haizea/resourcemanager/accounting.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/accounting.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -20,7 +20,7 @@
import os.path
import haizea.common.constants as constants
import haizea.resourcemanager.datastruct as ds
-from haizea.common.utils import pickle
+from haizea.common.utils import pickle, get_config, get_clock
from errno import EEXIST
class AccountingData(object):
@@ -59,9 +59,9 @@
self.datafile = datafile
self.starttime = None
- attrs = self.rm.config.get_attrs()
+ attrs = get_config().get_attrs()
for attr in attrs:
- self.data.attrs[attr] = self.rm.config.get_attr(attr)
+ self.data.attrs[attr] = get_config().get_attr(attr)
def create_counter(self, counter_id, avgtype, initial=0):
self.data.counters[counter_id] = initial
@@ -69,16 +69,16 @@
self.data.counter_avg_type[counter_id] = avgtype
def incr_counter(self, counter_id, lease_id = None):
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
self.append_stat(counter_id, self.data.counters[counter_id] + 1, lease_id, time)
def decr_counter(self, counter_id, lease_id = None):
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
self.append_stat(counter_id, self.data.counters[counter_id] - 1, lease_id, time)
def append_stat(self, counter_id, value, lease_id = None, time = None):
if time == None:
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
if len(self.data.counter_lists[counter_id]) > 0:
prevtime = self.data.counter_lists[counter_id][-1][0]
else:
@@ -100,7 +100,7 @@
def stop(self):
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
# Stop the counters
for counter_id in self.data.counters:
Modified: trunk/src/haizea/resourcemanager/deployment/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/__init__.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/deployment/__init__.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -0,0 +1,30 @@
+# -------------------------------------------------------------------------- #
+# Copyright 2006-2008, University of Chicago #
+# Copyright 2008, Distributed Systems Architecture Group, Universidad #
+# Complutense de Madrid (dsa-research.org) #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); you may #
+# not use this file except in compliance with the License. You may obtain #
+# a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+# -------------------------------------------------------------------------- #
+
+import logging
+
+class DeploymentScheduler(object):
+ def __init__(self, slottable, resourcepool, deployment_enact):
+ self.slottable = slottable
+ self.resourcepool = resourcepool
+ self.deployment_enact = deployment_enact
+ self.logger = logging.getLogger("DEPLOY")
+
+
+class DeploymentSchedException(Exception):
+ pass
\ No newline at end of file
Deleted: trunk/src/haizea/resourcemanager/deployment/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/base.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/deployment/base.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -1,30 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-import logging
-
-class DeploymentBase(object):
- def __init__(self, scheduler):
- self.scheduler = scheduler
- self.slottable = scheduler.slottable
- self.resourcepool = scheduler.resourcepool
- self.logger = logging.getLogger("DEPLOY")
-
-
-class DeploymentSchedException(Exception):
- pass
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/deployment/imagetransfer.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/deployment/imagetransfer.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -18,38 +18,40 @@
import haizea.common.constants as constants
import haizea.resourcemanager.datastruct as ds
-from haizea.resourcemanager.deployment.base import DeploymentBase, DeploymentSchedException
+from haizea.resourcemanager.deployment import DeploymentScheduler, DeploymentSchedException
from haizea.resourcemanager.datastruct import ResourceReservation, Lease, ARLease, BestEffortLease
-from haizea.common.utils import estimate_transfer_time
+from haizea.resourcemanager.scheduler import ReservationEventHandler
+from haizea.common.utils import estimate_transfer_time, get_config
import copy
-class ImageTransferDeployment(DeploymentBase):
- def __init__(self, scheduler):
- DeploymentBase.__init__(self, scheduler)
+class ImageTransferDeploymentScheduler(DeploymentScheduler):
+ def __init__(self, slottable, resourcepool, deployment_enact):
+ DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
# TODO: The following two should be merged into
# something like this:
- # self.imageNode = self.info.getImageNode()
- self.fifo_node = self.resourcepool.deployment.get_fifo_node()
- self.edf_node = self.resourcepool.deployment.get_edf_node()
+ # self.image_node = self.deployment_enact.get_image_node()
+ self.fifo_node = self.deployment_enact.get_fifo_node()
+ self.edf_node = self.deployment_enact.get_edf_node()
self.transfers_edf = []
self.transfers_fifo = []
self.completed_transfers = []
- config = self.scheduler.rm.config
+ config = get_config()
self.reusealg = config.get("diskimage-reuse")
if self.reusealg == constants.REUSE_IMAGECACHES:
self.maxcachesize = config.get("diskimage-cache-size")
else:
self.maxcachesize = None
- self.imagenode_bandwidth = self.resourcepool.deployment.get_bandwidth()
+ self.imagenode_bandwidth = self.deployment_enact.get_bandwidth()
- self.scheduler.register_handler(type = FileTransferResourceReservation,
- on_start = ImageTransferDeployment.handle_start_filetransfer,
- on_end = ImageTransferDeployment.handle_end_filetransfer)
+ self.handlers ={}
+ self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
+ on_start = ImageTransferDeploymentScheduler.handle_start_filetransfer,
+ on_end = ImageTransferDeploymentScheduler.handle_end_filetransfer)
def schedule(self, lease, vmrr, nexttime):
if isinstance(lease, ARLease):
@@ -62,7 +64,7 @@
self.__remove_from_fifo_transfers(lease.id)
def schedule_for_ar(self, lease, vmrr, nexttime):
- config = self.scheduler.rm.config
+ config = get_config()
mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
@@ -120,7 +122,7 @@
self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
def schedule_for_besteffort(self, lease, vmrr, nexttime):
- config = self.scheduler.rm.config
+ config = get_config()
mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
@@ -167,7 +169,7 @@
def find_earliest_starting_times(self, lease_req, nexttime):
nodIDs = [n.nod_id for n in self.resourcepool.get_nodes()]
- config = self.scheduler.rm.config
+ config = get_config()
mechanism = config.get("transfer-mechanism")
reusealg = config.get("diskimage-reuse")
avoidredundant = config.get("avoid-redundant-transfers")
@@ -219,7 +221,7 @@
def schedule_imagetransfer_edf(self, req, vnodes, nexttime):
# Estimate image transfer time
- bandwidth = self.resourcepool.deployment.get_bandwidth()
+ bandwidth = self.deployment_enact.get_bandwidth()
imgTransferTime=self.estimate_image_transfer_time(req, bandwidth)
# Determine start time
@@ -318,7 +320,7 @@
# Estimate image transfer time
bandwidth = self.imagenode_bandwidth
imgTransferTime=self.estimate_image_transfer_time(req, bandwidth)
- config = self.scheduler.rm.config
+ config = get_config()
mechanism = config.get("transfer-mechanism")
startTime = self.get_next_fifo_transfer_time(nexttime)
Deleted: trunk/src/haizea/resourcemanager/deployment/predeployed.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/predeployed.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/deployment/predeployed.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -1,24 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.resourcemanager.deployment.base import DeploymentBase
-
-class PredeployedImagesDeployment(DeploymentBase):
- def __init__(self, scheduler):
- DeploymentBase.__init__(self, scheduler)
-
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/deployment/unmanaged.py
===================================================================
--- trunk/src/haizea/resourcemanager/deployment/unmanaged.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/deployment/unmanaged.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -17,12 +17,12 @@
# -------------------------------------------------------------------------- #
from haizea.resourcemanager.datastruct import Lease
-from haizea.resourcemanager.deployment.base import DeploymentBase
+from haizea.resourcemanager.deployment import DeploymentScheduler
import haizea.common.constants as constants
-class UnmanagedDeployment(DeploymentBase):
- def __init__(self, scheduler):
- DeploymentBase.__init__(self, scheduler)
+class UnmanagedDeploymentScheduler(DeploymentScheduler):
+ def __init__(self, slottable, resourcepool, deployment_enact):
+ DeploymentScheduler.__init__(self, slottable, resourcepool, deployment_enact)
# Add dummy disk images
def schedule(self, lease, vmrr, nexttime):
Modified: trunk/src/haizea/resourcemanager/enact/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/__init__.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/__init__.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,3 +16,37 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
+from haizea.common.utils import abstract
+import haizea.resourcemanager.datastruct as ds
+
+class ResourcePoolInfo(object):
+ def __init__(self):
+ # Initialize the resource types in the ResourceTuple class
+ # TODO: Do this in a less kludgy way
+ resourcetypes = self.get_resource_types()
+ ds.ResourceTuple.set_resource_types(resourcetypes)
+
+
+ def get_nodes(self):
+ """ Returns the nodes in the resource pool. """
+ abstract()
+
+ def get_resource_types(self):
+ abstract()
+
+class VMEnactment(object):
+ def __init__(self):
+ pass
+
+ def start(self, vms): abstract()
+
+ def stop(self, vms): abstract()
+
+ def suspend(self, vms): abstract()
+
+ def resume(self, vms): abstract()
+
+class DeploymentEnactment(object):
+ def __init__(self):
+ pass
+
Deleted: trunk/src/haizea/resourcemanager/enact/base.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/base.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/base.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -1,68 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.common.utils import abstract
-import haizea.resourcemanager.datastruct as ds
-
-class ResourcePoolInfoBase(object):
- def __init__(self, resourcepool):
- self.resourcepool = resourcepool
-
- resourcetypes = self.get_resource_types() #IGNORE:E1111
- ds.ResourceTuple.set_resource_types(resourcetypes)
-
- def get_nodes(self):
- """ Returns the nodes in the resource pool. """
- abstract()
-
- def get_resource_types(self):
- abstract()
-
-class DeploymentEnactmentBase(object):
- def __init__(self, resourcepool):
- self.resourcepool = resourcepool
-
- def get_fifo_node(self):
- """ Returns the image node for FIFO transfers
-
- Note that this function will disappear as soon
- as we merge the EDF and FIFO image nodes (and
- their respective algorithms)
- """
- abstract()
-
- def get_edf_node(self):
- """ Returns the image node for EDF transfers
-
- Note that this function will disappear as soon
- as we merge the EDF and FIFO image nodes (and
- their respective algorithms)
- """
- abstract()
-
-class VMEnactmentBase(object):
- def __init__(self, resourcepool):
- self.resourcepool = resourcepool
-
- def start(self, vms): abstract()
-
- def stop(self, vms): abstract()
-
- def suspend(self, vms): abstract()
-
- def resume(self, vms): abstract()
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,10 +16,3 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.enact.opennebula.info import ResourcePoolInfo
-from haizea.resourcemanager.enact.opennebula.vm import VMEnactment
-from haizea.resourcemanager.enact.simulated.deployment import DeploymentEnactment
-
-info=ResourcePoolInfo
-deployment=DeploymentEnactment
-vm=VMEnactment
\ No newline at end of file
Deleted: trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/deployment.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -1,27 +0,0 @@
-# -------------------------------------------------------------------------- #
-# Copyright 2006-2008, University of Chicago #
-# Copyright 2008, Distributed Systems Architecture Group, Universidad #
-# Complutense de Madrid (dsa-research.org) #
-# #
-# Licensed under the Apache License, Version 2.0 (the "License"); you may #
-# not use this file except in compliance with the License. You may obtain #
-# a copy of the License at #
-# #
-# http://www.apache.org/licenses/LICENSE-2.0 #
-# #
-# Unless required by applicable law or agreed to in writing, software #
-# distributed under the License is distributed on an "AS IS" BASIS, #
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
-# See the License for the specific language governing permissions and #
-# limitations under the License. #
-# -------------------------------------------------------------------------- #
-
-from haizea.resourcemanager.enact.base import DeploymentEnactmentBase
-
-class DeploymentEnactment(DeploymentEnactmentBase):
- def __init__(self, resourcepool):
- DeploymentEnactmentBase.__init__(self, resourcepool)
- self.imagepath="/images/playground/borja"
-
- def resolve_to_file(self, lease_id, vnode, diskImageID):
- return "%s/%s/%s.img" % (self.imagepath, diskImageID, diskImageID)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -17,19 +17,20 @@
# -------------------------------------------------------------------------- #
from haizea.resourcemanager.resourcepool import Node
-from haizea.resourcemanager.enact.base import ResourcePoolInfoBase
+from haizea.resourcemanager.enact import ResourcePoolInfo
+from haizea.common.utils import get_config
import haizea.common.constants as constants
import haizea.resourcemanager.datastruct as ds
from pysqlite2 import dbapi2 as sqlite
import logging
-oneattr2haizea = { "TOTALCPU": constants.RES_CPU,
+class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
+ ONEATTR2HAIZEA = { "TOTALCPU": constants.RES_CPU,
"TOTALMEMORY": constants.RES_MEM }
-
-class ResourcePoolInfo(ResourcePoolInfoBase):
- def __init__(self, resourcepool):
- ResourcePoolInfoBase.__init__(self, resourcepool)
- config = self.resourcepool.rm.config
+
+ def __init__(self):
+ ResourcePoolInfo.__init__(self)
+ config = get_config()
self.logger = logging.getLogger("ENACT.ONE.INFO")
self.suspendresumerate = config.get("one.suspendresume-rate-estimate")
@@ -53,11 +54,11 @@
attrs = cur.fetchall()
for attr in attrs:
name = attr["name"]
- if oneattr2haizea.has_key(name):
- capacity.set_by_type(oneattr2haizea[name], int(attr["value"]))
+ if OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA.has_key(name):
+ capacity.set_by_type(OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA[name], int(attr["value"]))
capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
capacity.set_by_type(constants.RES_MEM, capacity.get_by_type(constants.RES_MEM) / 1024.0)
- node = Node(self.resourcepool, nod_id, hostname, capacity)
+ node = Node(nod_id, hostname, capacity)
node.enactment_info = int(enactID)
self.nodes.append(node)
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/vm.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,20 +16,21 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.enact.base import VMEnactmentBase
+from haizea.resourcemanager.enact import VMEnactment
+from haizea.common.utils import get_config
import haizea.common.constants as constants
import commands
import logging
from pysqlite2 import dbapi2 as sqlite
-class VMEnactment(VMEnactmentBase):
- def __init__(self, resourcepool):
- VMEnactmentBase.__init__(self, resourcepool)
+class OpenNebulaVMEnactment(VMEnactment):
+ def __init__(self):
+ VMEnactment.__init__(self)
self.logger = logging.getLogger("ENACT.ONE.VM")
- self.onevm = self.resourcepool.rm.config.get("onevm")
+ self.onevm = get_config().get("onevm")
- self.conn = sqlite.connect(self.resourcepool.rm.config.get("one.db"))
+ self.conn = sqlite.connect(get_config().get("one.db"))
self.conn.row_factory = sqlite.Row
Modified: trunk/src/haizea/resourcemanager/enact/simulated/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/__init__.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/simulated/__init__.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,10 +16,3 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.enact.simulated.info import ResourcePoolInfo
-from haizea.resourcemanager.enact.simulated.vm import VMEnactment
-from haizea.resourcemanager.enact.simulated.deployment import DeploymentEnactment
-
-info=ResourcePoolInfo
-deployment=DeploymentEnactment
-vm=VMEnactment
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/deployment.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/deployment.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/simulated/deployment.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,21 +16,18 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.enact.base import DeploymentEnactmentBase
+from haizea.resourcemanager.enact import DeploymentEnactment
from haizea.resourcemanager.resourcepool import Node
import haizea.resourcemanager.datastruct as ds
import haizea.common.constants as constants
+from haizea.common.utils import get_config
import logging
-baseCachePath="/vm/cache"
-baseWorkingPath="/vm/working"
-stagingPath="/vm/staging"
-
-class DeploymentEnactment(DeploymentEnactmentBase):
- def __init__(self, resourcepool):
- DeploymentEnactmentBase.__init__(self, resourcepool)
+class SimulatedDeploymentEnactment(DeploymentEnactment):
+ def __init__(self):
+ DeploymentEnactment.__init__(self)
self.logger = logging.getLogger("ENACT.SIMUL.INFO")
- config = self.resourcepool.rm.config
+ config = get_config()
self.bandwidth = config.get("imagetransfer-bandwidth")
@@ -40,8 +37,8 @@
imgcapacity = ds.ResourceTuple.create_empty()
imgcapacity.set_by_type(constants.RES_NETOUT, self.bandwidth)
- self.fifo_node = Node(self.resourcepool, numnodes+1, "FIFOnode", imgcapacity)
- self.edf_node = Node(self.resourcepool, numnodes+2, "EDFnode", imgcapacity)
+ self.fifo_node = Node(numnodes+1, "FIFOnode", imgcapacity)
+ self.edf_node = Node(numnodes+2, "EDFnode", imgcapacity)
def get_edf_node(self):
return self.edf_node
@@ -56,4 +53,4 @@
return self.bandwidth
def resolve_to_file(self, lease_id, vnode, diskimage_id):
- return "%s/%s-L%iV%i" % (baseWorkingPath, diskimage_id, lease_id, vnode)
\ No newline at end of file
+ return "/var/haizea/images/%s-L%iV%i" % (diskimage_id, lease_id, vnode)
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/simulated/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/simulated/info.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -17,23 +17,24 @@
# -------------------------------------------------------------------------- #
from haizea.resourcemanager.resourcepool import Node
-from haizea.resourcemanager.enact.base import ResourcePoolInfoBase
+from haizea.resourcemanager.enact import ResourcePoolInfo
import haizea.common.constants as constants
+from haizea.common.utils import get_config
import haizea.resourcemanager.datastruct as ds
import logging
-class ResourcePoolInfo(ResourcePoolInfoBase):
- def __init__(self, resourcepool):
- ResourcePoolInfoBase.__init__(self, resourcepool)
+class SimulatedResourcePoolInfo(ResourcePoolInfo):
+ def __init__(self):
+ ResourcePoolInfo.__init__(self)
self.logger = logging.getLogger("ENACT.SIMUL.INFO")
- config = self.resourcepool.rm.config
+ config = get_config()
self.suspendresumerate = config.get("simul.suspendresume-rate")
numnodes = config.get("simul.nodes")
capacity = self.parse_resources_string(config.get("simul.resources"))
- self.nodes = [Node(self.resourcepool, i+1, "simul-%i" % (i+1), capacity) for i in range(numnodes)]
+ self.nodes = [Node(i+1, "simul-%i" % (i+1), capacity) for i in range(numnodes)]
for n in self.nodes:
n.enactment_info = n.nod_id
Modified: trunk/src/haizea/resourcemanager/enact/simulated/vm.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/enact/simulated/vm.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,13 +16,13 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.resourcemanager.enact.base import VMEnactmentBase
+from haizea.resourcemanager.enact import VMEnactment
import haizea.common.constants as constants
import logging
-class VMEnactment(VMEnactmentBase):
- def __init__(self, resourcepool):
- VMEnactmentBase.__init__(self, resourcepool)
+class SimulatedVMEnactment(VMEnactment):
+ def __init__(self):
+ VMEnactment.__init__(self)
self.logger = logging.getLogger("ENACT.SIMUL.VM")
def start(self, action):
Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -19,7 +19,7 @@
import haizea.common.constants as constants
from haizea.resourcemanager.frontends import RequestFrontend
from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
-from haizea.common.utils import UNIX2DateTime, round_datetime
+from haizea.common.utils import UNIX2DateTime, round_datetime, get_config, get_clock
from pysqlite2 import dbapi2 as sqlite
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
@@ -71,8 +71,7 @@
elif start[0] == "+":
# Relative time
# The following is just for testing:
- from haizea.resourcemanager.rm import ResourceManager
- now = ResourceManager.get_singleton().clock.get_time()
+ now = get_clock().get_time()
start = round_datetime(now + ISO.ParseTime(start[1:]))
# Should be:
#start = round_datetime(self.get_submit_time() + ISO.ParseTime(start[1:]))
@@ -129,7 +128,7 @@
self.rm = rm
self.processed = []
self.logger = logging.getLogger("ONEREQ")
- config = self.rm.config
+ config = get_config()
self.conn = sqlite.connect(config.get("one.db"))
self.conn.row_factory = sqlite.Row
Modified: trunk/src/haizea/resourcemanager/frontends/rpc.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/rpc.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/frontends/rpc.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -18,7 +18,7 @@
import haizea.common.constants as constants
from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
from haizea.resourcemanager.frontends import RequestFrontend
-from haizea.common.utils import round_datetime
+from haizea.common.utils import round_datetime, get_config, get_clock
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
import logging
@@ -31,7 +31,7 @@
self.rm = rm
self.logger = logging.getLogger("RPCREQ")
self.accumulated = []
- config = self.rm.config
+ config = get_config()
self.rm.rpc_server.register_rpc(self.create_lease)
def get_accumulated_requests(self):
@@ -43,20 +43,20 @@
return True
def create_lease(self, start, duration, preemptible, numnodes, cpu, mem, vmimage, vmimagesize):
- tSubmit = round_datetime(self.rm.clock.get_time())
+ tSubmit = round_datetime(get_clock().get_time())
resreq = ResourceTuple.create_empty()
resreq.set_by_type(constants.RES_CPU, float(cpu))
resreq.set_by_type(constants.RES_MEM, int(mem))
- if duration == HAIZEA_DURATION_UNLIMITED:
+ if duration == RPCFrontend.HAIZEA_DURATION_UNLIMITED:
# This is an interim solution (make it run for a century).
# TODO: Integrate concept of unlimited duration in the lease datastruct
duration = DateTimeDelta(36500)
else:
duration = ISO.ParseTimeDelta(duration)
- if start == HAIZEA_START_NOW:
+ if start == RPCFrontend.HAIZEA_START_NOW:
leasereq = ImmediateLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
- elif start == HAIZEA_START_BESTEFFORT:
+ elif start == RPCFrontend.HAIZEA_START_BESTEFFORT:
leasereq = BestEffortLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
else:
if start[0] == "+":
Modified: trunk/src/haizea/resourcemanager/frontends/tracefile.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/tracefile.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/frontends/tracefile.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -17,6 +17,7 @@
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
+from haizea.common.utils import get_clock
from haizea.resourcemanager.frontends import RequestFrontend
import haizea.traces.readers as tracereaders
from haizea.resourcemanager.datastruct import ARLease, BestEffortLease
@@ -78,7 +79,7 @@
# "accumulated requests". Rather, we just take whatever
# requests are in the trace up to the current time
# reported by the resource manager
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
nowreq = [r for r in self.requests if r.submit_time <= time]
self.requests = [r for r in self.requests if r.submit_time > time]
return nowreq
Modified: trunk/src/haizea/resourcemanager/resourcepool.py
===================================================================
--- trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/resourcepool.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -16,38 +16,27 @@
# limitations under the License. #
# -------------------------------------------------------------------------- #
-from haizea.common.utils import vnodemapstr
+from haizea.common.utils import vnodemapstr, get_accounting
import haizea.common.constants as constants
import haizea.resourcemanager.enact.actions as actions
import logging
+class FailedEnactmentException(Exception):
+ pass
+
class ResourcePool(object):
- def __init__(self, scheduler):
- self.scheduler = scheduler
- self.rm = scheduler.rm
+ def __init__(self, info_enact, vm_enact, deploy_enact):
self.logger = logging.getLogger("RPOOL")
- self.info = None
- self.vm = None
- self.deployment = None
-
- self.load_enactment_modules()
+ self.info = info_enact
+ self.vm = vm_enact
+ # TODO: Ideally, deployment enactment shouldn't be here, especially since
+ # it already "hangs" below the deployment modules. For now,
+ # it does no harm, though.
+ self.deployment = deploy_enact
self.nodes = self.info.get_nodes()
-
- def load_enactment_modules(self):
- mode = self.rm.config.get("mode")
- try:
- exec "import %s.%s as enact" % (constants.ENACT_PACKAGE, mode)
- self.info = enact.info(self) #IGNORE:E0602
- self.vm = enact.vm(self) #IGNORE:E0602
- self.deployment = enact.deployment(self) #IGNORE:E0602
- except Exception, msg:
- self.logger.error("Unable to load enactment modules for mode '%s'" % mode)
- raise
-
-
def start_vms(self, lease, rr):
start_action = actions.VMEnactmentStartAction()
start_action.from_rr(rr)
@@ -63,7 +52,7 @@
self.vm.start(start_action)
except Exception, msg:
self.logger.error("Enactment of start VM failed: %s" % msg)
- self.scheduler.fail_lease(lease.id)
+ raise FailedEnactmentException()
def stop_vms(self, lease, rr):
stop_action = actions.VMEnactmentStopAction()
@@ -72,7 +61,7 @@
self.vm.stop(stop_action)
except Exception, msg:
self.logger.error("Enactment of end VM failed: %s" % msg)
- self.rm.fail_lease(lease)
+ raise FailedEnactmentException()
def suspend_vms(self, lease, rr):
# Add memory image files
@@ -87,7 +76,7 @@
self.vm.suspend(suspend_action)
except Exception, msg:
self.logger.error("Enactment of suspend VM failed: %s" % msg)
- self.rm.fail_lease(lease)
+ raise FailedEnactmentException()
def verify_suspend(self, lease, rr):
verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
@@ -107,7 +96,7 @@
self.vm.resume(resume_action)
except Exception, msg:
self.logger.error("Enactment of resume VM failed: %s" % msg)
- self.rm.fail_lease(lease)
+ raise FailedEnactmentException()
def verify_resume(self, lease, rr):
verify_resume_action = actions.VMEnactmentConfirmResumeAction()
@@ -144,7 +133,7 @@
self.logger.vdebug("Files AFTER:")
self.get_node(pnode).print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
return img
def remove_diskimage(self, pnode, lease, vnode):
@@ -156,7 +145,7 @@
node.print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
def add_ramfile(self, pnode, lease_id, vnode, size):
node = self.get_node(pnode)
@@ -165,7 +154,7 @@
f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
node.add_file(f)
node.print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
def remove_ramfile(self, pnode, lease_id, vnode):
node = self.get_node(pnode)
@@ -173,15 +162,14 @@
node.print_files()
node.remove_ramfile(lease_id, vnode)
node.print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
def get_max_disk_usage(self):
return max([n.get_disk_usage() for n in self.nodes])
class Node(object):
- def __init__(self, resourcepool, nod_id, hostname, capacity):
+ def __init__(self, nod_id, hostname, capacity):
self.logger = logging.getLogger("RESOURCEPOOL")
- self.resourcepool = resourcepool
self.nod_id = nod_id
self.hostname = hostname
self.capacity = capacity
@@ -296,7 +284,7 @@
self.logger.vdebug("Files AFTER:")
self.get_node(pnode).print_files()
- self.rm.accounting.append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
+ get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
return img
def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
@@ -322,13 +310,13 @@
class NodeWithReusableImages(Node):
- def __init__(self, resourcepool, nod_id, hostname, capacity):
- Node.__init__(self, resourcepool, nod_id, hostname, capacity)
+ def __init__(self, nod_id, hostname, capacity):
+ Node.__init__(self, nod_id, hostname, capacity)
self.reusable_images = []
@classmethod
def from_node(cls, n):
- node = cls(n.resourcepool, n.nod_id, n.hostname, n.capacity)
+ node = cls(n.nod_id, n.hostname, n.capacity)
node.enactment_info = n.enactment_info
return node
Modified: trunk/src/haizea/resourcemanager/rm.py
===================================================================
--- trunk/src/haizea/resourcemanager/rm.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/rm.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -33,11 +33,21 @@
import haizea.resourcemanager.accounting as accounting
import haizea.common.constants as constants
+import haizea.resourcemanager.enact as enact
+from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeploymentScheduler
+from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeploymentScheduler
+from haizea.resourcemanager.enact.opennebula.info import OpenNebulaResourcePoolInfo
+from haizea.resourcemanager.enact.opennebula.vm import OpenNebulaVMEnactment
+from haizea.resourcemanager.enact.simulated.info import SimulatedResourcePoolInfo
+from haizea.resourcemanager.enact.simulated.vm import SimulatedVMEnactment
+from haizea.resourcemanager.enact.simulated.deployment import SimulatedDeploymentEnactment
from haizea.resourcemanager.frontends.tracefile import TracefileFrontend
from haizea.resourcemanager.frontends.opennebula import OpenNebulaFrontend
from haizea.resourcemanager.frontends.rpc import RPCFrontend
-from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease
+from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceTuple
from haizea.resourcemanager.scheduler import Scheduler
+from haizea.resourcemanager.slottable import SlotTable
+from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.resourcemanager.rpcserver import RPCServer
from haizea.common.utils import abstract, round_datetime, Singleton
@@ -78,49 +88,125 @@
mode = config.get("mode")
clock = config.get("clock")
- if mode == "simulated" and clock == constants.CLOCK_SIMULATED:
- # Simulations always run in the foreground
- self.daemon = False
+ self.daemon = daemon
+ self.pidfile = pidfile
+
+ if mode == "simulated":
+ self.init_simulated_mode()
+ elif mode == "opennebula":
+ self.init_opennebula_mode()
- self.init_logging()
+ # Statistics collection
+ self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
+
+ self.logger = logging.getLogger("RM")
- # The clock
- starttime = config.get("starttime")
+ def init_simulated_mode(self):
+ """Initializes the resource manager in simulated mode
+
+ """
+
+ # Simulations always run in the foreground
+ self.daemon = False
+
+ self.init_logging()
+
+ # The clock
+ if self.clock == constants.CLOCK_SIMULATED:
+ starttime = self.config.get("starttime")
self.clock = SimulatedClock(self, starttime)
self.rpc_server = None
- elif mode == "opennebula" or (mode == "simulated" and clock == constants.CLOCK_REAL):
- self.daemon = daemon
- self.pidfile = pidfile
-
- self.init_logging()
-
- # The clock
- wakeup_interval = config.get("wakeup-interval")
- non_sched = config.get("non-schedulable-interval")
+ elif self.clock == constants.CLOCK_REAL:
+ wakeup_interval = self.config.get("wakeup-interval")
+ non_sched = self.config.get("non-schedulable-interval")
self.clock = RealClock(self, wakeup_interval, non_sched)
-
- # RPC server
self.rpc_server = RPCServer(self)
+
+ # Enactment modules
+ info_enact = SimulatedResourcePoolInfo()
+ vm_enact = SimulatedVMEnactment()
+ deploy_enact = SimulatedDeploymentEnactment()
+
+ # Resource pool
+ if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
+ resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
+ else:
+ resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
+ # Slot table
+ slottable = SlotTable()
+
+ # Deployment scheduler
+ deploy_type = self.config.get("lease-preparation")
+ if deploy_type == constants.DEPLOYMENT_UNMANAGED:
+ deployment_scheduler = UnmanagedDeploymentScheduler(slottable, resourcepool, deploy_enact)
+ elif deploy_type == constants.DEPLOYMENT_TRANSFER:
+ deployment_scheduler = ImageTransferDeploymentScheduler(slottable, resourcepool, deploy_enact)
+
# Scheduler
- self.scheduler = Scheduler(self)
-
+ self.scheduler = Scheduler(self, slottable, resourcepool, deployment_scheduler)
+
+ # TODO: Having the slot table contained in the deployment scheduler, and also
+ # in the "main" scheduler (which itself contains the same slot table) is far
+ # from ideal, although this is mostly a consequence of the Scheduler class
+ # being in need of some serious refactoring. This will be fixed (see Scheduler
+ # class comments for more details)
+
# Lease request frontends
- if mode == "simulated" and clock == constants.CLOCK_SIMULATED:
+ if self.clock == constants.CLOCK_SIMULATED:
# In pure simulation, we can only use the tracefile frontend
self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
- elif mode == "simulated" and clock == constants.CLOCK_REAL:
+ elif self.clock == constants.CLOCK_REAL:
# In simulation with a real clock, only the RPC frontend can be used
- self.frontends = [RPCFrontend(self)]
- elif mode == "opennebula":
- self.frontends = [OpenNebulaFrontend(self)]
+ self.frontends = [RPCFrontend(self)]
+
+ def init_opennebula_mode(self):
+ """Initializes the resource manager in OpenNebula mode
+
+ """
+ self.init_logging()
+
+ # The clock
+ wakeup_interval = self.config.get("wakeup-interval")
+ non_sched = self.config.get("non-schedulable-interval")
+ self.clock = RealClock(self, wakeup_interval, non_sched, True)
+
+ # RPC server
+ self.rpc_server = None
+
+ # Enactment modules
+ info_enact = OpenNebulaResourcePoolInfo()
+ vm_enact = OpenNebulaVMEnactment()
+ # No deployment in OpenNebula. Using simulated one for now.
+ deploy_enact = SimulatedDeploymentEnactment()
- # Statistics collection
- self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
+ # Slot table
+ slottable = SlotTable()
+
+ # Resource pool
+ resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
+
+ # Deployment module
+ deployment = UnmanagedDeploymentScheduler(slottable, resourcepool, deploy_enact)
+
+ # Scheduler
+ self.scheduler = Scheduler(slottable, resourcepool, deployment)
+
+ # TODO: Having the slot table contained in the deployment scheduler, and also
+ # in the "main" scheduler (which itself contains the same slot table) is far
+ # from ideal, although this is mostly a consequence of the Scheduler class
+ # being in need of some serious refactoring. This will be fixed (see Scheduler
+ # class comments for more details)
+
+ # Lease request frontends
+ self.frontends = [OpenNebulaFrontend(self)]
- self.logger = logging.getLogger("RM")
+
+ def init_logging(self):
+ """Initializes logging
+
+ """
- def init_logging(self):
from haizea.resourcemanager.log import HaizeaLogger
logger = logging.getLogger("")
if self.daemon:
@@ -133,8 +219,6 @@
level = logging.getLevelName(self.config.get("loglevel"))
logger.setLevel(level)
logging.setLoggerClass(HaizeaLogger)
- #if self.daemon:
- # logger.info("Logging to file %s" % self.config.get("logfile"))
def daemonize(self):
Modified: trunk/src/haizea/resourcemanager/scheduler.py
===================================================================
--- trunk/src/haizea/resourcemanager/scheduler.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/scheduler.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -18,25 +18,28 @@
"""This module provides the main classes for Haizea's scheduler, particularly
-the Scheduler class. Note that this class includes high-level scheduling
-constructs, and that most of the magic happens in the slottable module. The
-deployment scheduling code (everything that has to be done to prepare a lease)
-happens in the modules inside the haizea.resourcemanager.deployment package.
+the Scheduler class. The deployment scheduling code (everything that has to be
+done to prepare a lease) happens in the modules inside the
+haizea.resourcemanager.deployment package.
This module provides the following classes:
* SchedException: A scheduling exception
* ReservationEventHandler: A simple wrapper class
* Scheduler: Do I really need to spell this one out for you?
+
+TODO: The Scheduler class is in need of some serious refactoring. The likely outcome is
+that it will be divided into two classes: LeaseScheduler, which handles top-level
+lease constructs and doesn't interact with the slot table, and VMScheduler, which
+actually schedules the VMs. The slot table would be contained in VMScheduler and
+in the lease preparation scheduler. In turn, these two would be contained in
+LeaseScheduler.
"""
import haizea.resourcemanager.datastruct as ds
import haizea.common.constants as constants
-from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time
+from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, get_config, get_accounting, get_clock
from haizea.resourcemanager.slottable import SlotTable, SlotFittingException
-from haizea.resourcemanager.deployment.unmanaged import UnmanagedDeployment
-from haizea.resourcemanager.deployment.predeployed import PredeployedImagesDeployment
-from haizea.resourcemanager.deployment.imagetransfer import ImageTransferDeployment
from haizea.resourcemanager.datastruct import Lease, ARLease, BestEffortLease, ImmediateLease, ResourceReservation, VMResourceReservation
from haizea.resourcemanager.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from operator import attrgetter, itemgetter
@@ -48,6 +51,25 @@
"""A simple exception class used for scheduling exceptions"""
pass
+class NotSchedulableException(Exception):
+ """A simple exception class used when a lease cannot be scheduled
+
+ This exception must be raised when a lease cannot be scheduled
+ (this is not necessarily an error condition, but the scheduler will
+ have to react to it)
+ """
+ pass
+
+class CriticalSchedException(Exception):
+ """A simple exception class used for critical scheduling exceptions
+
+ This exception must be raised when a non-recoverable error happens
+ (e.g., when there are unexplained inconsistencies in the schedule,
+ typically resulting from a code error)
+ """
+ pass
+
+
class ReservationEventHandler(object):
"""A wrapper for reservation event handlers.
@@ -78,21 +100,19 @@
_handle_* -- Reservation event handlers
"""
- def __init__(self, rm):
- self.rm = rm
+ def __init__(self, slottable, resourcepool, deployment_scheduler):
+ self.slottable = slottable
+ self.resourcepool = resourcepool
+ self.deployment_scheduler = deployment_scheduler
self.logger = logging.getLogger("SCHED")
- if self.rm.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
- self.resourcepool = ResourcePoolWithReusableImages(self)
- else:
- self.resourcepool = ResourcePool(self)
- self.slottable = SlotTable(self)
+
self.queue = ds.Queue(self)
self.leases = ds.LeaseTable(self)
self.completedleases = ds.LeaseTable(self)
-
+
for n in self.resourcepool.get_nodes() + self.resourcepool.get_aux_nodes():
self.slottable.add_node(n)
-
+
self.handlers = {}
self.register_handler(type = ds.VMResourceReservation,
@@ -106,16 +126,11 @@
self.register_handler(type = ds.ResumptionResourceReservation,
on_start = Scheduler._handle_start_resume,
on_end = Scheduler._handle_end_resume)
-
- deploy_type = self.rm.config.get("lease-preparation")
- if deploy_type == constants.DEPLOYMENT_UNMANAGED:
- self.deployment = UnmanagedDeployment(self)
- elif deploy_type == constants.DEPLOYMENT_PREDEPLOY:
- self.deployment = PredeployedImagesDeployment(self)
- elif deploy_type == constants.DEPLOYMENT_TRANSFER:
- self.deployment = ImageTransferDeployment(self)
+
+ for (type, handler) in self.deployment_scheduler.handlers.items():
+ self.handlers[type] = handler
- backfilling = self.rm.config.get("backfilling")
+ backfilling = get_config().get("backfilling")
if backfilling == constants.BACKFILLING_OFF:
self.maxres = 0
elif backfilling == constants.BACKFILLING_AGGRESSIVE:
@@ -123,10 +138,10 @@
elif backfilling == constants.BACKFILLING_CONSERVATIVE:
self.maxres = 1000000 # Arbitrarily large
elif backfilling == constants.BACKFILLING_INTERMEDIATE:
- self.maxres = self.rm.config.get("backfilling-reservations")
+ self.maxres = get_config().get("backfilling-reservations")
self.numbesteffortres = 0
-
+
def schedule(self, nexttime):
pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)
ar_leases = [req for req in pending_leases if isinstance(req, ARLease)]
@@ -160,7 +175,7 @@
self.handlers[type(rr)].on_start(self, rr.lease, rr)
util = self.slottable.getUtilization(nowtime)
- self.rm.accounting.append_stat(constants.COUNTER_CPUUTILIZATION, util)
+ get_accounting().append_stat(constants.COUNTER_CPUUTILIZATION, util)
def register_handler(self, type, on_start, on_end):
handler = ReservationEventHandler(on_start=on_start, on_end=on_end)
@@ -168,7 +183,7 @@
def enqueue(self, lease_req):
"""Queues a best-effort lease request"""
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
lease_req.state = Lease.STATE_QUEUED
self.queue.enqueue(lease_req)
self.logger.info("Received (and queueing) best-effort lease request #%i, %i nodes for %s." % (lease_req.id, lease_req.numnodes, lease_req.duration.requested))
@@ -198,7 +213,7 @@
Arguments:
lease_id -- ID of lease to cancel
"""
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
self.logger.info("Cancelling lease %i..." % lease_id)
if self.leases.has_lease(lease_id):
@@ -241,12 +256,10 @@
self.cancel_lease(lease_id)
except Exception, msg:
# Exit if something goes horribly wrong
- self.logger.error("Exception when failing lease %i. Dumping state..." % lease_id)
- self.rm.print_stats(logging.getLevelName("ERROR"), verbose=True)
- raise
+ raise CriticalSchedException()
def notify_event(self, lease_id, event):
- time = self.rm.clock.get_time()
+ time = get_clock().get_time()
if event == constants.EVENT_END_VM:
lease = self.leases.get_lease(lease_id)
rr = lease.get_active_reservations(time)[0]
@@ -263,7 +276,7 @@
try:
self.__schedule_ar_lease(lease_req, avoidpreempt=True, nexttime=nexttime)
self.leases.add(lease_req)
- self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
# Our first try avoided preemption, try again
@@ -274,10 +287,10 @@
self.logger.debug("LEASE-%i Trying again without avoiding preemption" % lease_req.id)
self.__schedule_ar_lease(lease_req, nexttime, avoidpreempt=False)
self.leases.add(lease_req)
- self.rm.accounting.incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease_req.id)
accepted = True
except SchedException, msg:
- self.rm.accounting.incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease_req.id)
self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
if accepted:
@@ -301,7 +314,7 @@
self.logger.debug(" ResReq : %s" % lease_req.requested_resources)
self.__schedule_besteffort_lease(lease_req, nexttime)
self.leases.add(lease_req)
- self.rm.accounting.decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
+ get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease_req.id)
except SchedException, msg:
# Put back on queue
newqueue.enqueue(lease_req)
@@ -324,10 +337,10 @@
try:
self.__schedule_immediate_lease(lease_req, nexttime=nexttime)
self.leases.add(lease_req)
- self.rm.accounting.incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease_req.id)
self.logger.info("Immediate lease request #%i has been accepted." % lease_req.id)
except SchedException, msg:
- self.rm.accounting.incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
+ get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease_req.id)
self.logger.debug("LEASE-%i Scheduling exception: %s" % (lease_req.id, msg))
@@ -348,7 +361,7 @@
vmrr.state = ResourceReservation.STATE_SCHEDULED
# Schedule deployment overhead
- self.deployment.schedule(lease_req, vmrr, nexttime)
+ self.deployment_scheduler.schedule(lease_req, vmrr, nexttime)
# Commit reservation to slot table
# (we don't do this until the very end because the deployment overhead
@@ -367,7 +380,7 @@
# Schedule deployment
if lease.state != Lease.STATE_SUSPENDED:
- self.deployment.schedule(lease, vmrr, nexttime)
+ self.deployment_scheduler.schedule(lease, vmrr, nexttime)
else:
# TODO: schedule migrations
pass
@@ -413,7 +426,7 @@
try:
(resmrr, vmrr, susprr, reservation) = self.__fit_asap(req, nexttime, allow_reservation_in_future=False)
# Schedule deployment
- self.deployment.schedule(req, vmrr, nexttime)
+ self.deployment_scheduler.schedule(req, vmrr, nexttime)
req.append_rr(vmrr)
self.slottable.addReservation(vmrr)
@@ -556,7 +569,7 @@
requested_resources = lease.requested_resources
preemptible = lease.preemptible
mustresume = (lease.state == Lease.STATE_SUSPENDED)
- susptype = self.rm.config.get("suspension")
+ susptype = get_config().get("suspension")
if susptype == constants.SUSPENSION_NONE or (susptype == constants.SUSPENSION_SERIAL and lease.numnodes == 1):
suspendable = False
else:
@@ -566,14 +579,14 @@
if lease.state == Lease.STATE_QUEUED:
# Figure out earliest start times based on
# image schedule and reusable images
- earliest = self.deployment.find_earliest_starting_times(lease, nexttime)
+ earliest = self.deployment_scheduler.find_earliest_starting_times(lease, nexttime)
elif lease.state == Lease.STATE_SUSPENDED:
# No need to transfer images from repository
# (only intra-node transfer)
earliest = dict([(node+1, [nexttime, constants.REQTRANSFER_NO, None]) for node in range(lease.numnodes)])
- canmigrate = self.rm.config.get("migration")
+ canmigrate = get_config().get("migration")
#
# STEP 1: FIGURE OUT THE MINIMUM DURATION
@@ -611,7 +624,7 @@
# If we have to resume this lease, make sure that
# we have enough time to transfer the images.
migratetime = self.__estimate_migration_time(lease)
- earliesttransfer = self.rm.clock.get_time() + migratetime
+ earliesttransfer = get_clock().get_time() + migratetime
for n in earliest:
earliest[n][0] = max(earliest[n][0], earliesttransfer)
@@ -981,9 +994,9 @@
# TODO: The deployment module should just provide a list of nodes
# it prefers
nodeswithimg=[]
- #self.lease_deployment_type = self.rm.config.get("lease-preparation")
+ #self.lease_deployment_type = get_config().get("lease-preparation")
#if self.lease_deployment_type == constants.DEPLOYMENT_TRANSFER:
- # reusealg = self.rm.config.get("diskimage-reuse")
+ # reusealg = get_config().get("diskimage-reuse")
# if reusealg==constants.REUSE_IMAGECACHES:
# nodeswithimg = self.resourcepool.getNodesWithImgInPool(diskImageID, start)
@@ -1166,17 +1179,17 @@
# self.slottable.removeReservation(susprr)
for vnode, pnode in lease.vmimagemap.items():
self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
- self.deployment.cancel_deployment(lease)
+ self.deployment_scheduler.cancel_deployment(lease)
lease.vmimagemap = {}
# TODO: Change state back to queued
self.queue.enqueue_in_order(lease)
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
else:
- susptype = self.rm.config.get("suspension")
+ susptype = get_config().get("suspension")
timebeforesuspend = preemption_time - vmrr.start
# TODO: Determine if it is in fact the initial VMRR or not. Right now
# we conservatively overestimate
- canmigrate = self.rm.config.get("migration")
+ canmigrate = get_config().get("migration")
suspendthreshold = lease.get_suspend_threshold(initial=False, suspendrate=suspendresumerate, migrating=canmigrate)
# We can't suspend if we're under the suspend threshold
suspendable = timebeforesuspend >= suspendthreshold
@@ -1200,11 +1213,11 @@
self.slottable.removeReservation(resmrr)
for vnode, pnode in lease.vmimagemap.items():
self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
- self.deployment.cancel_deployment(lease)
+ self.deployment_scheduler.cancel_deployment(lease)
lease.vmimagemap = {}
# TODO: Change state back to queued
self.queue.enqueue_in_order(lease)
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.logger.vdebug("Lease after preemption:")
lease.print_contents()
@@ -1293,11 +1306,11 @@
if l.state == Lease.STATE_READY:
l.state = Lease.STATE_ACTIVE
rr.state = ResourceReservation.STATE_ACTIVE
- now_time = self.rm.clock.get_time()
+ now_time = get_clock().get_time()
l.start.actual = now_time
try:
- self.deployment.check(l, rr)
+ self.deployment_scheduler.check(l, rr)
self.resourcepool.start_vms(l, rr)
# The next two lines have to be moved somewhere more
# appropriate inside the resourcepool module
@@ -1322,7 +1335,7 @@
self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
self.logger.vdebug("LEASE-%i Before:" % l.id)
l.print_contents()
- now_time = round_datetime(self.rm.clock.get_time())
+ now_time = round_datetime(get_clock().get_time())
diff = now_time - rr.start
l.duration.accumulate_duration(diff)
rr.state = ResourceReservation.STATE_DONE
@@ -1333,9 +1346,8 @@
l.end = now_time
self.completedleases.add(l)
self.leases.remove(l)
- self.deployment.cleanup(l, rr)
if isinstance(l, ds.BestEffortLease):
- self.rm.accounting.incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
+ get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
if isinstance(l, ds.BestEffortLease):
if rr.backfill_reservation == True:
@@ -1353,9 +1365,9 @@
for r in rrs:
l.remove_rr(r)
self.slottable.removeReservation(r)
- rr.end = self.rm.clock.get_time()
+ rr.end = get_clock().get_time()
self._handle_end_vm(l, rr, enact=enact)
- nexttime = self.rm.clock.get_next_schedulable_time()
+ nexttime = get_clock().get_next_schedulable_time()
if self.is_backfilling():
# We need to reevaluate the schedule to see if there are any future
# reservations that we can slide back.
@@ -1449,7 +1461,7 @@
self.slottable.removeReservation(rr)
def __enqueue_in_order(self, lease):
- self.rm.accounting.incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
+ get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
self.queue.enqueue_in_order(lease)
def __can_reserve_besteffort_in_future(self):
Modified: trunk/src/haizea/resourcemanager/slottable.py
===================================================================
--- trunk/src/haizea/resourcemanager/slottable.py 2008-09-15 16:53:14 UTC (rev 497)
+++ trunk/src/haizea/resourcemanager/slottable.py 2008-09-16 10:18:11 UTC (rev 498)
@@ -78,10 +78,7 @@
return cmp(self.key, other.key)
class SlotTable(object):
- def __init__(self, scheduler):
- self.scheduler = scheduler
- self.rm = scheduler.rm
- self.resourcepool = scheduler.resourcepool
+ def __init__(self):
self.logger = logging.getLogger("SLOT")
self.nodes = NodeList()
self.reservations = []
More information about the Haizea-commit
mailing list