[haizea-commit] r495 - in trunk/src/haizea/resourcemanager: enact enact/opennebula frontends
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Mon Sep 15 11:50:37 CDT 2008
Author: borja
Date: 2008-09-15 11:50:37 -0500 (Mon, 15 Sep 2008)
New Revision: 495
Modified:
trunk/src/haizea/resourcemanager/enact/actions.py
trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
trunk/src/haizea/resourcemanager/enact/opennebula/info.py
trunk/src/haizea/resourcemanager/frontends/opennebula.py
Log:
Add support for managing groups of OpenNebula VMs as a single lease.
Modified: trunk/src/haizea/resourcemanager/enact/actions.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/actions.py 2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/actions.py 2008-09-15 16:50:37 UTC (rev 495)
@@ -69,6 +69,14 @@
def __init__(self):
VMEnactmentAction.__init__(self)
+ def from_rr(self, rr):
+ VMEnactmentAction.from_rr(self, rr)
+ self.vnodes = dict([(k, v) for (k,v) in self.vnodes.items() if k in rr.vnodes])
+
class VMEnactmentConfirmResumeAction(VMEnactmentAction):
def __init__(self):
VMEnactmentAction.__init__(self)
+
+ def from_rr(self, rr):
+ VMEnactmentAction.from_rr(self, rr)
+ self.vnodes = dict([(k, v) for (k,v) in self.vnodes.items() if k in rr.vnodes])
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py 2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py 2008-09-15 16:50:37 UTC (rev 495)
@@ -18,8 +18,8 @@
from haizea.resourcemanager.enact.opennebula.info import ResourcePoolInfo
from haizea.resourcemanager.enact.opennebula.vm import VMEnactment
-from haizea.resourcemanager.enact.opennebula.storage import StorageEnactment
+from haizea.resourcemanager.enact.simulated.deployment import DeploymentEnactment
info=ResourcePoolInfo
-storage=StorageEnactment
+deployment=DeploymentEnactment
vm=VMEnactment
\ No newline at end of file
Modified: trunk/src/haizea/resourcemanager/enact/opennebula/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/info.py 2008-09-15 16:50:37 UTC (rev 495)
@@ -56,26 +56,18 @@
if oneattr2haizea.has_key(name):
capacity.set_by_type(oneattr2haizea[name], int(attr["value"]))
capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
+ capacity.set_by_type(constants.RES_MEM, capacity.get_by_type(constants.RES_MEM) / 1024.0)
node = Node(self.resourcepool, nod_id, hostname, capacity)
node.enactment_info = int(enactID)
self.nodes.append(node)
self.logger.info("Fetched %i nodes from ONE db" % len(self.nodes))
+ for n in self.nodes:
+ self.logger.debug("%i %s %s" % (n.nod_id, n.hostname, n.capacity))
- # Image repository nodes
- # TODO: No image transfers in OpenNebula yet
- self.FIFOnode = None
- self.EDFnode = None
-
def get_nodes(self):
return self.nodes
- def get_edf_node(self):
- return self.EDFnode
-
- def get_fifo_node(self):
- return self.FIFOnode
-
def get_resource_types(self):
return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
(constants.RES_MEM, constants.RESTYPE_INT, "Mem"),
Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py 2008-09-15 16:50:37 UTC (rev 495)
@@ -17,31 +17,114 @@
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
-from haizea.resourcemanager.frontends.base import RequestFrontend
+from haizea.resourcemanager.frontends import RequestFrontend
from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
-from haizea.common.utils import UNIX2DateTime
+from haizea.common.utils import UNIX2DateTime, round_datetime
+
from pysqlite2 import dbapi2 as sqlite
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
-from haizea.common.utils import round_datetime
+
import operator
import logging
-HAIZEA_PARAM = "HAIZEA"
-HAIZEA_START = "START"
-HAIZEA_START_NOW = "now"
-HAIZEA_START_BESTEFFORT = "best_effort"
-HAIZEA_DURATION = "DURATION"
-HAIZEA_DURATION_UNLIMITED = "unlimited"
-HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
-HAIZEA_PREEMPTIBLE_YES = "yes"
-HAIZEA_PREEMPTIBLE_NO = "no"
+class OpenNebulaVM(object):
+ HAIZEA_PARAM = "HAIZEA"
+ HAIZEA_START = "START"
+ HAIZEA_START_NOW = "now"
+ HAIZEA_START_BESTEFFORT = "best_effort"
+ HAIZEA_DURATION = "DURATION"
+ HAIZEA_DURATION_UNLIMITED = "unlimited"
+ HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
+ HAIZEA_PREEMPTIBLE_YES = "yes"
+ HAIZEA_PREEMPTIBLE_NO = "no"
+ HAIZEA_GROUP = "GROUP"
+
+ ONE_CPU="CPU"
+ ONE_MEMORY="MEMORY"
+ ONE_DISK="DISK"
+ ONE_DISK_SOURCE="SOURCE"
+
+ def __init__(self, db_entry, attrs):
+ self.db_entry = db_entry
+ self.attrs = attrs
+
+ # If there is no HAIZEA parameter, the default is to treat the
+ # request as an immediate request with unlimited duration
+ if not attrs.has_key(OpenNebulaVM.HAIZEA_PARAM):
+ self.haizea_param = {OpenNebulaVM.HAIZEA_START: OpenNebulaVM.HAIZEA_START_NOW,
+ OpenNebulaVM.HAIZEA_DURATION: OpenNebulaVM.HAIZEA_DURATION_UNLIMITED,
+ OpenNebulaVM.HAIZEA_PREEMPTIBLE: OpenNebulaVM.HAIZEA_PREEMPTIBLE_NO}
+ else:
+ self.haizea_param = self.__get_vector_value(attrs[OpenNebulaVM.HAIZEA_PARAM])
-ONE_CPU="CPU"
-ONE_MEMORY="MEMORY"
-ONE_DISK="DISK"
-ONE_DISK_SOURCE="SOURCE"
+
+ def is_grouped(self):
+ return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_GROUP)
+
+ def has_haizea_params(self):
+ return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_START)
+
+ def get_start(self):
+ start = self.haizea_param[OpenNebulaVM.HAIZEA_START]
+ if start == OpenNebulaVM.HAIZEA_START_NOW or start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
+ pass # Leave start as it is
+ elif start[0] == "+":
+ # Relative time
+ # The following is just for testing:
+ from haizea.resourcemanager.rm import ResourceManager
+ now = ResourceManager.get_singleton().clock.get_time()
+ start = round_datetime(now + ISO.ParseTime(start[1:]))
+ # Should be:
+ #start = round_datetime(self.get_submit_time() + ISO.ParseTime(start[1:]))
-class OpenNebulaFrontend(RequestFrontend):
+ else:
+ start = ISO.ParseDateTime(start)
+
+ return start
+
+ def get_duration(self):
+ duration = self.haizea_param[OpenNebulaVM.HAIZEA_DURATION]
+ if duration == OpenNebulaVM.HAIZEA_DURATION_UNLIMITED:
+ # This is an interim solution (make it run for a century).
+ # TODO: Integrate concept of unlimited duration in the lease datastruct
+ duration = DateTimeDelta(36500)
+ else:
+ duration = ISO.ParseTimeDelta(duration)
+ return duration
+
+ def get_preemptible(self):
+ preemptible = self.haizea_param[OpenNebulaVM.HAIZEA_PREEMPTIBLE]
+ return (preemptible == OpenNebulaVM.HAIZEA_PREEMPTIBLE_YES)
+
+ def get_group(self):
+ return self.haizea_param[OpenNebulaVM.HAIZEA_GROUP]
+
+ def get_submit_time(self):
+ return UNIX2DateTime(self.db_entry["stime"])
+
+ def get_diskimage(self):
+ disk = self.__get_vector_value(self.attrs[OpenNebulaVM.ONE_DISK])
+ diskimage = disk[OpenNebulaVM.ONE_DISK_SOURCE]
+ return diskimage
+
+ def get_diskimage_size(self):
+ return 0 # OpenNebula doesn't provide this
+
+ def get_resource_requirements(self):
+ resreq = ResourceTuple.create_empty()
+ resreq.set_by_type(constants.RES_CPU, float(self.attrs[OpenNebulaVM.ONE_CPU]))
+ resreq.set_by_type(constants.RES_MEM, int(self.attrs[OpenNebulaVM.ONE_MEMORY]))
+ return resreq
+
+ def get_oneid(self):
+ return int(self.db_entry["oid"])
+
+ def __get_vector_value(self, value):
+ return dict([n.split("=") for n in value.split(",")])
+
+
+class OpenNebulaFrontend(RequestFrontend):
+
def __init__(self, rm):
self.rm = rm
self.processed = []
@@ -51,111 +134,83 @@
self.conn = sqlite.connect(config.get("one.db"))
self.conn.row_factory = sqlite.Row
- def getAccumulatedRequests(self):
+ def get_accumulated_requests(self):
cur = self.conn.cursor()
processed = ",".join([`p` for p in self.processed])
cur.execute("select * from vmpool where state=1 and oid not in (%s)" % processed)
- openNebulaReqs = cur.fetchall()
- requests = []
- for req in openNebulaReqs:
- cur.execute("select * from vm_template where id=%i" % req["oid"])
+ db_opennebula_vms = cur.fetchall()
+
+ # Extract the pending OpenNebula VMs
+ opennebula_vms = [] # (ONE VM, ONE VM template attributes, ONE Haizea parameter)
+ for vm in db_opennebula_vms:
+ cur.execute("select * from vm_template where id=%i" % vm["oid"])
template = cur.fetchall()
attrs = dict([(r["name"], r["value"]) for r in template])
- self.processed.append(req["oid"])
- requests.append(self.ONEreq2lease(req, attrs))
- requests.sort(key=operator.attrgetter("submit_time"))
- return requests
+ self.processed.append(vm["oid"])
+
+ opennebula_vms.append(OpenNebulaVM(vm, attrs))
+
+ grouped = [vm for vm in opennebula_vms if vm.is_grouped()]
+ not_grouped = [vm for vm in opennebula_vms if not vm.is_grouped()]
+
+ # Extract VM groups
+ group_ids = set([vm.get_group() for vm in grouped])
+ groups = {}
+ for group_id in group_ids:
+ groups[group_id] = [vm for vm in grouped if vm.get_group() == group_id]
+
+ lease_requests = []
+ for group_id, opennebula_vms in groups.items():
+ lease_requests.append(self.__ONEreqs_to_lease(opennebula_vms, group_id))
- def existsMoreRequests(self):
+ for opennebula_vm in not_grouped:
+ lease_requests.append(self.__ONEreqs_to_lease([opennebula_vm]))
+
+ lease_requests.sort(key=operator.attrgetter("submit_time"))
+ return lease_requests
+
+ def exists_more_requests(self):
return True
+
- def ONEreq2lease(self, req, attrs):
- # If there is no HAIZEA parameter, the default is to treat the
- # request as an immediate request with unlimited duration
- if not attrs.has_key(HAIZEA_PARAM):
- haizea_param = {HAIZEA_START: HAIZEA_START_NOW,
- HAIZEA_DURATION: HAIZEA_DURATION_UNLIMITED,
- HAIZEA_PREEMPTIBLE: HAIZEA_PREEMPTIBLE_NO}
- else:
- haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
- start = haizea_param[HAIZEA_START]
- if start == HAIZEA_START_NOW:
- return self.create_immediate_lease(req, attrs, haizea_param)
- elif start == HAIZEA_START_BESTEFFORT:
- return self.create_besteffort_lease(req, attrs, haizea_param)
- else:
- return self.create_ar_lease(req, attrs, haizea_param)
-
- def get_vector_value(self, value):
- return dict([n.split("=") for n in value.split(",")])
-
- def get_common_attrs(self, req, attrs, haizea_param):
- disk = self.get_vector_value(attrs[ONE_DISK])
- tSubmit = UNIX2DateTime(req["stime"])
- vmimage = disk[ONE_DISK_SOURCE]
- vmimagesize = 0
- numnodes = 1
- resreq = ResourceTuple.create_empty()
- resreq.set_by_type(constants.RES_CPU, float(attrs[ONE_CPU]))
- resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY]))
+ def __ONEreqs_to_lease(self, opennebula_vms, group_id=None):
+ # The vm_with_params is used to extract the HAIZEA parameters.
+ # (i.e., lease-wide attributes)
+ vm_with_params = self.__get_vm_with_params(opennebula_vms)
- duration = haizea_param[HAIZEA_DURATION]
- if duration == HAIZEA_DURATION_UNLIMITED:
- # This is an interim solution (make it run for a century).
- # TODO: Integrate concept of unlimited duration in the lease datastruct
- duration = DateTimeDelta(36500)
- else:
- duration = ISO.ParseTimeDelta(duration)
-
- preemptible = haizea_param[HAIZEA_PREEMPTIBLE]
- preemptible = (preemptible == HAIZEA_PREEMPTIBLE_YES)
+ # Per-lease attributes
+ start = vm_with_params.get_start()
+ duration = vm_with_params.get_duration()
+ preemptible = vm_with_params.get_preemptible()
+ numnodes = len(opennebula_vms)
- return tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible
-
- def create_besteffort_lease(self, req, attrs, haizea_param):
- tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
-
- leasereq = BestEffortLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
+ # Per-vnode attributes
+ # Since Haizea currently assumes homogeneous nodes in
+ # the lease, we will also use vm_with_params to extract the
+ # resource requirements.
+ # TODO: When Haizea is modified to support heterogeneous nodes,
+ # extract the resource requirements from each individual VM.
+ submit_time = vm_with_params.get_submit_time()
+ diskimage = vm_with_params.get_diskimage()
+ diskimagesize = vm_with_params.get_diskimage_size()
+ resource_requirements = vm_with_params.get_resource_requirements()
- # Enactment info should be changed to the "array id" when groups
- # are implemented in OpenNebula
- leasereq.enactment_info = int(req["oid"])
- # Only one node for now
- leasereq.vnode_enactment_info = {}
- leasereq.vnode_enactment_info[1] = int(req["oid"])
- return leasereq
-
- def create_ar_lease(self, req, attrs, haizea_param):
- tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
-
- start = haizea_param[HAIZEA_START]
- if start[0] == "+":
- # Relative time
- # For testing, should be:
- # tStart = tSubmit + ISO.ParseTime(tStart[1:])
- start = round_datetime(self.rm.clock.get_time() + ISO.ParseTime(start[1:]))
+ if start == OpenNebulaVM.HAIZEA_START_NOW:
+ lease = ImmediateLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+ elif start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
+ lease = BestEffortLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
else:
- start = ISO.ParseDateTime(start)
- leasereq = ARLease(tSubmit, start, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
-
- # Enactment info should be changed to the "array id" when groups
- # are implemented in OpenNebula
- leasereq.enactmentInfo = int(req["oid"])
- # Only one node for now
- leasereq.vnode_enactment_info = {}
- leasereq.vnode_enactment_info[1] = int(req["oid"])
- return leasereq
-
- def create_immediate_lease(self, req, attrs, haizea_param):
- tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
-
- leasereq = ImmediateLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
-
- # Enactment info should be changed to the "array id" when groups
- # are implemented in OpenNebula
- leasereq.enactment_info = int(req["oid"])
- # Only one node for now
- leasereq.vnode_enactment_info = {}
- leasereq.vnode_enactment_info[1] = int(req["oid"])
- return leasereq
-
\ No newline at end of file
+ lease = ARLease(submit_time, start, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+
+ lease.enactment_info = group_id
+ lease.vnode_enactment_info = dict([(i+1,vm.get_oneid()) for i, vm in enumerate(opennebula_vms)])
+ return lease
+
+ def __get_vm_with_params(self, opennebula_vms):
+ # One of the ONE VMs has the start, duration, and preemptible
+ # parameters. Find it.
+ vm_with_params = [vm for vm in opennebula_vms if vm.has_haizea_params()]
+ # There should only be one
+ # TODO: This is the ONE user's responsibility, but we should validate it
+ # nonetheless.
+ return vm_with_params[0]
More information about the Haizea-commit
mailing list