[haizea-commit] r495 - in trunk/src/haizea/resourcemanager: enact enact/opennebula frontends

haizea-commit at mailman.cs.uchicago.edu haizea-commit at mailman.cs.uchicago.edu
Mon Sep 15 11:50:37 CDT 2008


Author: borja
Date: 2008-09-15 11:50:37 -0500 (Mon, 15 Sep 2008)
New Revision: 495

Modified:
   trunk/src/haizea/resourcemanager/enact/actions.py
   trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
   trunk/src/haizea/resourcemanager/enact/opennebula/info.py
   trunk/src/haizea/resourcemanager/frontends/opennebula.py
Log:
Add support for managing groups of OpenNebula VMs as a single lease.

Modified: trunk/src/haizea/resourcemanager/enact/actions.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/actions.py	2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/actions.py	2008-09-15 16:50:37 UTC (rev 495)
@@ -69,6 +69,14 @@
     def __init__(self):
         VMEnactmentAction.__init__(self)
 
+    def from_rr(self, rr):
+        VMEnactmentAction.from_rr(self, rr)
+        self.vnodes = dict([(k, v) for (k,v) in self.vnodes.items() if k in rr.vnodes])
+
 class VMEnactmentConfirmResumeAction(VMEnactmentAction):
     def __init__(self):
         VMEnactmentAction.__init__(self)
+
+    def from_rr(self, rr):
+        VMEnactmentAction.from_rr(self, rr)
+        self.vnodes = dict([(k, v) for (k,v) in self.vnodes.items() if k in rr.vnodes])

Modified: trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py	2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/__init__.py	2008-09-15 16:50:37 UTC (rev 495)
@@ -18,8 +18,8 @@
 
 from haizea.resourcemanager.enact.opennebula.info import ResourcePoolInfo
 from haizea.resourcemanager.enact.opennebula.vm import VMEnactment
-from haizea.resourcemanager.enact.opennebula.storage import StorageEnactment
+from haizea.resourcemanager.enact.simulated.deployment import DeploymentEnactment
 
 info=ResourcePoolInfo
-storage=StorageEnactment
+deployment=DeploymentEnactment
 vm=VMEnactment
\ No newline at end of file

Modified: trunk/src/haizea/resourcemanager/enact/opennebula/info.py
===================================================================
--- trunk/src/haizea/resourcemanager/enact/opennebula/info.py	2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/enact/opennebula/info.py	2008-09-15 16:50:37 UTC (rev 495)
@@ -56,26 +56,18 @@
                 if oneattr2haizea.has_key(name):
                     capacity.set_by_type(oneattr2haizea[name], int(attr["value"]))
             capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
+            capacity.set_by_type(constants.RES_MEM, capacity.get_by_type(constants.RES_MEM) / 1024.0)
             node = Node(self.resourcepool, nod_id, hostname, capacity)
             node.enactment_info = int(enactID)
             self.nodes.append(node)
             
         self.logger.info("Fetched %i nodes from ONE db" % len(self.nodes))
+        for n in self.nodes:
+            self.logger.debug("%i %s %s" % (n.nod_id, n.hostname, n.capacity))
         
-        # Image repository nodes
-        # TODO: No image transfers in OpenNebula yet
-        self.FIFOnode = None
-        self.EDFnode = None
-        
     def get_nodes(self):
         return self.nodes
     
-    def get_edf_node(self):
-        return self.EDFnode
-    
-    def get_fifo_node(self):
-        return self.FIFOnode
-    
     def get_resource_types(self):
         return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
                 (constants.RES_MEM,  constants.RESTYPE_INT, "Mem"),

Modified: trunk/src/haizea/resourcemanager/frontends/opennebula.py
===================================================================
--- trunk/src/haizea/resourcemanager/frontends/opennebula.py	2008-09-15 09:00:23 UTC (rev 494)
+++ trunk/src/haizea/resourcemanager/frontends/opennebula.py	2008-09-15 16:50:37 UTC (rev 495)
@@ -17,31 +17,114 @@
 # -------------------------------------------------------------------------- #
 
 import haizea.common.constants as constants
-from haizea.resourcemanager.frontends.base import RequestFrontend
+from haizea.resourcemanager.frontends import RequestFrontend
 from haizea.resourcemanager.datastruct import ARLease, BestEffortLease, ImmediateLease, ResourceTuple
-from haizea.common.utils import UNIX2DateTime
+from haizea.common.utils import UNIX2DateTime, round_datetime
+
 from pysqlite2 import dbapi2 as sqlite
 from mx.DateTime import DateTimeDelta, TimeDelta, ISO
-from haizea.common.utils import round_datetime
+
 import operator
 import logging
 
-HAIZEA_PARAM = "HAIZEA"
-HAIZEA_START = "START"
-HAIZEA_START_NOW = "now"
-HAIZEA_START_BESTEFFORT = "best_effort"
-HAIZEA_DURATION = "DURATION"
-HAIZEA_DURATION_UNLIMITED = "unlimited"
-HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
-HAIZEA_PREEMPTIBLE_YES = "yes"
-HAIZEA_PREEMPTIBLE_NO = "no"
+class OpenNebulaVM(object):
+    HAIZEA_PARAM = "HAIZEA"
+    HAIZEA_START = "START"
+    HAIZEA_START_NOW = "now"
+    HAIZEA_START_BESTEFFORT = "best_effort"
+    HAIZEA_DURATION = "DURATION"
+    HAIZEA_DURATION_UNLIMITED = "unlimited"
+    HAIZEA_PREEMPTIBLE = "PREEMPTIBLE"
+    HAIZEA_PREEMPTIBLE_YES = "yes"
+    HAIZEA_PREEMPTIBLE_NO = "no"
+    HAIZEA_GROUP = "GROUP"
+    
+    ONE_CPU="CPU"
+    ONE_MEMORY="MEMORY"
+    ONE_DISK="DISK"
+    ONE_DISK_SOURCE="SOURCE"    
+    
+    def __init__(self, db_entry, attrs):
+        self.db_entry = db_entry
+        self.attrs = attrs
+        
+        # If there is no HAIZEA parameter, the default is to treat the
+        # request as an immediate request with unlimited duration
+        if not attrs.has_key(OpenNebulaVM.HAIZEA_PARAM):
+            self.haizea_param = {OpenNebulaVM.HAIZEA_START: OpenNebulaVM.HAIZEA_START_NOW,
+                            OpenNebulaVM.HAIZEA_DURATION: OpenNebulaVM.HAIZEA_DURATION_UNLIMITED,
+                            OpenNebulaVM.HAIZEA_PREEMPTIBLE: OpenNebulaVM.HAIZEA_PREEMPTIBLE_NO}
+        else:
+            self.haizea_param = self.__get_vector_value(attrs[OpenNebulaVM.HAIZEA_PARAM])
 
-ONE_CPU="CPU"
-ONE_MEMORY="MEMORY"
-ONE_DISK="DISK"
-ONE_DISK_SOURCE="SOURCE"
+        
+    def is_grouped(self):
+        return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_GROUP)
+    
+    def has_haizea_params(self):
+        return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_START)
+    
+    def get_start(self):
+        start = self.haizea_param[OpenNebulaVM.HAIZEA_START]
+        if start == OpenNebulaVM.HAIZEA_START_NOW or start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
+            pass # Leave start as it is
+        elif start[0] == "+":
+            # Relative time
+            # The following is just for testing:
+            from haizea.resourcemanager.rm import ResourceManager
+            now = ResourceManager.get_singleton().clock.get_time()
+            start = round_datetime(now + ISO.ParseTime(start[1:]))
+            # Should be:
+            #start = round_datetime(self.get_submit_time() + ISO.ParseTime(start[1:]))
 
-class OpenNebulaFrontend(RequestFrontend):
+        else:
+            start = ISO.ParseDateTime(start)
+    
+        return start
+    
+    def get_duration(self):
+        duration = self.haizea_param[OpenNebulaVM.HAIZEA_DURATION]
+        if duration == OpenNebulaVM.HAIZEA_DURATION_UNLIMITED:
+            # This is an interim solution (make it run for a century).
+            # TODO: Integrate concept of unlimited duration in the lease datastruct
+            duration = DateTimeDelta(36500)
+        else:
+            duration = ISO.ParseTimeDelta(duration)
+        return duration
+    
+    def get_preemptible(self):
+        preemptible = self.haizea_param[OpenNebulaVM.HAIZEA_PREEMPTIBLE]
+        return (preemptible == OpenNebulaVM.HAIZEA_PREEMPTIBLE_YES)
+
+    def get_group(self):
+        return self.haizea_param[OpenNebulaVM.HAIZEA_GROUP]
+
+    def get_submit_time(self):
+        return UNIX2DateTime(self.db_entry["stime"])
+    
+    def get_diskimage(self):
+        disk = self.__get_vector_value(self.attrs[OpenNebulaVM.ONE_DISK])
+        diskimage = disk[OpenNebulaVM.ONE_DISK_SOURCE]
+        return diskimage
+    
+    def get_diskimage_size(self):
+        return 0 # OpenNebula doesn't provide this
+    
+    def get_resource_requirements(self):
+        resreq = ResourceTuple.create_empty()
+        resreq.set_by_type(constants.RES_CPU, float(self.attrs[OpenNebulaVM.ONE_CPU]))
+        resreq.set_by_type(constants.RES_MEM, int(self.attrs[OpenNebulaVM.ONE_MEMORY]))
+        return resreq    
+
+    def get_oneid(self):
+        return int(self.db_entry["oid"])
+
+    def __get_vector_value(self, value):
+        return dict([n.split("=") for n in value.split(",")])
+        
+    
+class OpenNebulaFrontend(RequestFrontend):    
+    
     def __init__(self, rm):
         self.rm = rm
         self.processed = []
@@ -51,111 +134,83 @@
         self.conn = sqlite.connect(config.get("one.db"))
         self.conn.row_factory = sqlite.Row
         
-    def getAccumulatedRequests(self):
+    def get_accumulated_requests(self):
         cur = self.conn.cursor()
         processed = ",".join([`p` for p in self.processed])
         cur.execute("select * from vmpool where state=1 and oid not in (%s)" % processed)
-        openNebulaReqs = cur.fetchall()
-        requests = []
-        for req in openNebulaReqs:
-            cur.execute("select * from vm_template where id=%i" % req["oid"])
+        db_opennebula_vms = cur.fetchall()
+        
+        # Extract the pending OpenNebula VMs
+        opennebula_vms = [] # (ONE VM, ONE VM template attributes, ONE Haizea parameter)
+        for vm in db_opennebula_vms:
+            cur.execute("select * from vm_template where id=%i" % vm["oid"])
             template = cur.fetchall()
             attrs = dict([(r["name"], r["value"]) for r in template])
-            self.processed.append(req["oid"])
-            requests.append(self.ONEreq2lease(req, attrs))
-        requests.sort(key=operator.attrgetter("submit_time"))
-        return requests
+            self.processed.append(vm["oid"])
+            
+            opennebula_vms.append(OpenNebulaVM(vm, attrs))
+            
+        grouped = [vm for vm in opennebula_vms if vm.is_grouped()]
+        not_grouped = [vm for vm in opennebula_vms if not vm.is_grouped()]
+        
+        # Extract VM groups
+        group_ids = set([vm.get_group() for vm in grouped])
+        groups = {}
+        for group_id in group_ids:
+            groups[group_id] = [vm for vm in grouped if vm.get_group() == group_id]
+            
+        lease_requests = []
+        for group_id, opennebula_vms in groups.items():
+            lease_requests.append(self.__ONEreqs_to_lease(opennebula_vms, group_id))
 
-    def existsMoreRequests(self):
+        for opennebula_vm in not_grouped:
+            lease_requests.append(self.__ONEreqs_to_lease([opennebula_vm]))
+        
+        lease_requests.sort(key=operator.attrgetter("submit_time"))
+        return lease_requests
+
+    def exists_more_requests(self):
         return True
+
     
-    def ONEreq2lease(self, req, attrs):
-        # If there is no HAIZEA parameter, the default is to treat the
-        # request as an immediate request with unlimited duration
-        if not attrs.has_key(HAIZEA_PARAM):
-            haizea_param = {HAIZEA_START: HAIZEA_START_NOW,
-                            HAIZEA_DURATION: HAIZEA_DURATION_UNLIMITED,
-                            HAIZEA_PREEMPTIBLE: HAIZEA_PREEMPTIBLE_NO}
-        else:
-            haizea_param = self.get_vector_value(attrs[HAIZEA_PARAM])
-        start = haizea_param[HAIZEA_START]
-        if start == HAIZEA_START_NOW:
-            return self.create_immediate_lease(req, attrs, haizea_param)
-        elif start  == HAIZEA_START_BESTEFFORT:
-            return self.create_besteffort_lease(req, attrs, haizea_param)
-        else:
-            return self.create_ar_lease(req, attrs, haizea_param)
-    
-    def get_vector_value(self, value):
-        return dict([n.split("=") for n in value.split(",")])
-    
-    def get_common_attrs(self, req, attrs, haizea_param):
-        disk = self.get_vector_value(attrs[ONE_DISK])
-        tSubmit = UNIX2DateTime(req["stime"])
-        vmimage = disk[ONE_DISK_SOURCE]
-        vmimagesize = 0
-        numnodes = 1
-        resreq = ResourceTuple.create_empty()
-        resreq.set_by_type(constants.RES_CPU, float(attrs[ONE_CPU]))
-        resreq.set_by_type(constants.RES_MEM, int(attrs[ONE_MEMORY]))
+    def __ONEreqs_to_lease(self, opennebula_vms, group_id=None):
+        # The vm_with_params is used to extract the HAIZEA parameters.
+        # (i.e., lease-wide attributes)
+        vm_with_params = self.__get_vm_with_params(opennebula_vms)
 
-        duration = haizea_param[HAIZEA_DURATION]
-        if duration == HAIZEA_DURATION_UNLIMITED:
-            # This is an interim solution (make it run for a century).
-            # TODO: Integrate concept of unlimited duration in the lease datastruct
-            duration = DateTimeDelta(36500)
-        else:
-            duration = ISO.ParseTimeDelta(duration)
-            
-        preemptible = haizea_param[HAIZEA_PREEMPTIBLE]
-        preemptible = (preemptible == HAIZEA_PREEMPTIBLE_YES)
+        # Per-lease attributes
+        start = vm_with_params.get_start()
+        duration = vm_with_params.get_duration()
+        preemptible = vm_with_params.get_preemptible()
+        numnodes = len(opennebula_vms)
 
-        return tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible
-    
-    def create_besteffort_lease(self, req, attrs, haizea_param):
-        tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
- 
-        leasereq = BestEffortLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
+        # Per-vnode attributes
+        # Since Haizea currently assumes homogeneous nodes in
+        # the lease, we will also use vm_with_params to extract the 
+        # resource requirements.
+        # TODO: When Haizea is modified to support heterogeneous nodes,
+        # extract the resource requirements from each individual VM.
+        submit_time = vm_with_params.get_submit_time()
+        diskimage = vm_with_params.get_diskimage()
+        diskimagesize = vm_with_params.get_diskimage_size()
+        resource_requirements = vm_with_params.get_resource_requirements()
 
-        # Enactment info should be changed to the "array id" when groups
-        # are implemented in OpenNebula
-        leasereq.enactment_info = int(req["oid"])
-        # Only one node for now
-        leasereq.vnode_enactment_info = {}
-        leasereq.vnode_enactment_info[1] = int(req["oid"])
-        return leasereq
-    
-    def create_ar_lease(self, req, attrs, haizea_param):
-        tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
-
-        start = haizea_param[HAIZEA_START]
-        if start[0] == "+":
-            # Relative time
-            # For testing, should be:
-            # tStart = tSubmit + ISO.ParseTime(tStart[1:])
-            start = round_datetime(self.rm.clock.get_time() + ISO.ParseTime(start[1:]))
+        if start == OpenNebulaVM.HAIZEA_START_NOW:
+            lease = ImmediateLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+        elif start  == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
+            lease = BestEffortLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
         else:
-            start = ISO.ParseDateTime(start)
-        leasereq = ARLease(tSubmit, start, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
-
-        # Enactment info should be changed to the "array id" when groups
-        # are implemented in OpenNebula
-        leasereq.enactmentInfo = int(req["oid"])
-        # Only one node for now
-        leasereq.vnode_enactment_info = {}
-        leasereq.vnode_enactment_info[1] = int(req["oid"])
-        return leasereq
-
-    def create_immediate_lease(self, req, attrs, haizea_param):
-        tSubmit, vmimage, vmimagesize, numnodes, resreq, duration, preemptible = self.get_common_attrs(req, attrs, haizea_param)
- 
-        leasereq = ImmediateLease(tSubmit, duration, vmimage, vmimagesize, numnodes, resreq, preemptible)
-
-        # Enactment info should be changed to the "array id" when groups
-        # are implemented in OpenNebula
-        leasereq.enactment_info = int(req["oid"])
-        # Only one node for now
-        leasereq.vnode_enactment_info = {}
-        leasereq.vnode_enactment_info[1] = int(req["oid"])
-        return leasereq
-        
\ No newline at end of file
+            lease = ARLease(submit_time, start, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+     
+        lease.enactment_info = group_id
+        lease.vnode_enactment_info = dict([(i+1,vm.get_oneid()) for i, vm in enumerate(opennebula_vms)])
+        return lease
+        
+    def __get_vm_with_params(self, opennebula_vms):
+        # One of the ONE VMs has the start, duration, and preemptible
+        # parameters. Find it.
+        vm_with_params = [vm for vm in opennebula_vms if vm.has_haizea_params()]
+        # There should only be one
+        # TODO: This is the ONE user's responsibility, but we should validate it
+        # nonetheless.
+        return vm_with_params[0]



More information about the Haizea-commit mailing list