[haizea-commit] r619 - in branches/TP2.0/src/haizea: common core core/enact core/frontends

haizea-commit at mailman.cs.uchicago.edu
Thu Jul 30 12:31:27 CDT 2009


Author: borja
Date: 2009-07-30 12:31:25 -0500 (Thu, 30 Jul 2009)
New Revision: 619

Modified:
   branches/TP2.0/src/haizea/common/defaults.py
   branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py
   branches/TP2.0/src/haizea/core/configfile.py
   branches/TP2.0/src/haizea/core/enact/opennebula.py
   branches/TP2.0/src/haizea/core/enact/simulated.py
   branches/TP2.0/src/haizea/core/frontends/opennebula.py
   branches/TP2.0/src/haizea/core/manager.py
Log:
Updated the OpenNebula request frontend and enactment module so they work with OpenNebula 1.4 (also updated them to use the XML-RPC interface instead of accessing the OpenNebula database directly)

Modified: branches/TP2.0/src/haizea/common/defaults.py
===================================================================
--- branches/TP2.0/src/haizea/common/defaults.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/common/defaults.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -8,4 +8,6 @@
 
 RPC_SERVER = "localhost"
 RPC_PORT = 42493
-RPC_URI = "http://%s:%i" % (RPC_SERVER, RPC_PORT)
\ No newline at end of file
+RPC_URI = "http://%s:%i" % (RPC_SERVER, RPC_PORT)
+
+OPENNEBULA_RPC_PORT = 2633
\ No newline at end of file
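
For reference, these defaults are the pieces an XML-RPC endpoint URI gets built from. A minimal sketch (the /RPC2 path is OpenNebula's usual endpoint, but it is an assumption here, not shown in this diff):

    import xmlrpclib
    # 2633 is the default port oned listens on for XML-RPC requests
    rpc = xmlrpclib.ServerProxy("http://%s:%i/RPC2" % ("localhost", OPENNEBULA_RPC_PORT))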

Modified: branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py
===================================================================
--- branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -23,7 +23,7 @@
         self.auth = "%s:%s" % (user, passhash)
         
     @staticmethod
-    def get_userpass_from_env(self):
+    def get_userpass_from_env():
         if not os.environ.has_key("ONE_AUTH"):
             return None
         else:
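
This hunk fixes a Python gotcha: a @staticmethod receives no implicit first argument, so declaring "self" makes every call fail with a TypeError. A minimal illustration:

    class C(object):
        @staticmethod
        def broken(self):   # C.broken() -> TypeError: takes exactly 1 argument (0 given)
            pass
        @staticmethod
        def fixed():        # C.fixed() -> works
            pass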
@@ -77,9 +77,9 @@
         
     def vm_deploy(self, vid, hid):
         try:
-            (rc, value) = self.rpc.one.vm.deploy(self.auth, vid, hid)
-            if rc == False:
-                raise Exception("ONE reported an error: %s" % value)
+            rv = self.rpc.one.vm.deploy(self.auth, vid, hid)
+            if rv[0] == False:
+                raise Exception("ONE reported an error: %s" % rv[1])
             else:
                 return
         except xmlrpclib.Fault, err:
@@ -91,9 +91,9 @@
                           "finalize" ]:
             raise Exception("%s is not a valid action" % action)
         try:
-            (rc, value) = self.rpc.one.vm.deploy(self.auth, action, vid)
-            if rc == False:
-                raise Exception("ONE reported an error: %s" % value)
+            rv = self.rpc.one.vm.action(self.auth, action, vid)
+            if rv[0] == False:
+                raise Exception("ONE reported an error: %s" % rv[1])
             else:
                 return
         except xmlrpclib.Fault, err:
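
Two things change in these hunks. First, the action call now invokes one.vm.action; the old code called one.vm.deploy by mistake. Second, the reply is indexed instead of tuple-unpacked, so the code keeps working if the server ever returns more than two elements. A sketch of the calling convention the new code assumes (host, credentials, and VM id are illustrative):

    import xmlrpclib
    rpc = xmlrpclib.ServerProxy("http://localhost:2633/RPC2")
    rv = rpc.one.vm.action("user:passhash", "shutdown", 42)
    if rv[0] == False:
        raise Exception("ONE reported an error: %s" % rv[1])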
@@ -223,7 +223,7 @@
         if deploy_id == None:
             self.deploy_id = None
         else:
-            self.deploy_id = int(deploy_id)
+            self.deploy_id = deploy_id
         self.memory = int(vm_element.find("MEMORY").text)
         self.cpu = int(vm_element.find("CPU").text)
         self.net_tx = int(vm_element.find("NET_TX").text)
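
The int() cast on deploy_id goes away because DEPLOY_ID is the hypervisor-level name of the VM, which need not be numeric. Illustrative fragment of the VM info XML (the value is hypothetical):

    <VM>
      <DEPLOY_ID>one-42</DEPLOY_ID>
      <MEMORY>524288</MEMORY>
    </VM>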

Modified: branches/TP2.0/src/haizea/core/configfile.py
===================================================================
--- branches/TP2.0/src/haizea/core/configfile.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/configfile.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -19,6 +19,7 @@
 from haizea.common.config import ConfigException, Section, Option, Config, OPTTYPE_INT, OPTTYPE_FLOAT, OPTTYPE_STRING, OPTTYPE_BOOLEAN, OPTTYPE_DATETIME, OPTTYPE_TIMEDELTA 
 from haizea.common.utils import generate_config_name
 import haizea.common.constants as constants
+import haizea.common.defaults as defaults
 import sys
 from mx.DateTime import TimeDelta
 import ConfigParser
@@ -685,20 +686,25 @@
                          necessary when using Haizea as an OpenNebula scheduling backend.""")
     opennebula.options = \
     [
-     Option(name        = "db",
-            getter      = "one.db",
+     Option(name        = "host",
+            getter      = "one.host",
             type        = OPTTYPE_STRING,
             required    = True,
             doc         = """
-            Location of OpenNebula database.                
+            Host where OpenNebula is running.
+            Typically, OpenNebula and Haizea will be installed
+            on the same host, so this option should be
+            set to 'localhost'. If they're on different hosts,
+            make sure you modify this option accordingly.             
             """),
 
-     Option(name        = "onevm",
-            getter      = "onevm",
-            type        = OPTTYPE_STRING,
-            required    = True,
+     Option(name        = "port",
+            getter      = "one.port",
+            type        = OPTTYPE_INT,
+            required    = False,
+            default     = defaults.OPENNEBULA_RPC_PORT,
             doc         = """
-            Location of OpenNebula "onevm" command.                
+            TCP port of OpenNebula's XML-RPC server.
             """),
             
      Option(name        = "stop-when-no-more-leases",
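
With these options, the [opennebula] section of the Haizea configuration file would look something like this (a sketch assuming the option names above; only "host" is required):

    [opennebula]
    # Host where OpenNebula is running
    host: localhost
    # Optional; defaults to OPENNEBULA_RPC_PORT (2633)
    port: 2633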

Modified: branches/TP2.0/src/haizea/core/enact/opennebula.py
===================================================================
--- branches/TP2.0/src/haizea/core/enact/opennebula.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/enact/opennebula.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -17,74 +17,88 @@
 # -------------------------------------------------------------------------- #
 
 from haizea.core.scheduler import EnactmentError
+from haizea.core.leases import Capacity
 from haizea.core.scheduler.resourcepool import ResourcePoolNode
 from haizea.core.scheduler.slottable import ResourceTuple
 from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
 from haizea.common.utils import get_config
+from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient, OpenNebulaVM, OpenNebulaHost
 import haizea.common.constants as constants
-from pysqlite2 import dbapi2 as sqlite
 import logging
-import commands
 from time import sleep
 
+one_rpc = None
+
+def get_one_xmlrpcclient():
+    global one_rpc
+    if one_rpc == None:
+        host = get_config().get("one.host")
+        port = get_config().get("one.port")
+        user, passw = OpenNebulaXMLRPCClient.get_userpass_from_env()
+        one_rpc = OpenNebulaXMLRPCClient(host, port, user, passw)
+    return one_rpc
+
 class OpenNebulaEnactmentError(EnactmentError):
-    def __init__(self, cmd, status, output):
-        self.cmd = cmd
-        self.status = status
-        self.output = output
-        
-        self.message = "Error when running '%s' (status=%i, output='%s')" % (cmd, status, output)
+    def __init__(self, method, msg):
+        self.method = method
+        self.msg = msg
+        self.message = "Error when invoking '%s': %s" % (method, msg)
 
 class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
-    ONEATTR2HAIZEA = { "TOTALCPU": constants.RES_CPU,
-                   "TOTALMEMORY": constants.RES_MEM }
     
     def __init__(self):
         ResourcePoolInfo.__init__(self)
-        config = get_config()
         self.logger = logging.getLogger("ENACT.ONE.INFO")
 
-        # Get information about nodes from DB
-        conn = sqlite.connect(config.get("one.db"))
-        conn.row_factory = sqlite.Row
-        
-        self.nodes = []
-        cur = conn.cursor()
-        cur.execute("select oid, host_name from host_pool where state != 4")
-        hosts = cur.fetchall()
+        self.rpc = get_one_xmlrpcclient()
+
+        # Get information about nodes from OpenNebula
+        self.nodes = {}
+        hosts = self.rpc.hostpool_info()
         for (i, host) in enumerate(hosts):
-            nod_id = i+1
-            enactID = int(host["oid"])
-            hostname = host["host_name"]
-            capacity = ResourceTuple.create_empty()
-            capacity.set_by_type(constants.RES_DISK, 80000) # OpenNebula currently doesn't provide this
-            capacity.set_by_type(constants.RES_NETIN, 100) # OpenNebula currently doesn't provide this
-            capacity.set_by_type(constants.RES_NETOUT, 100) # OpenNebula currently doesn't provide this
-            cur.execute("select name, value from host_attributes where id=%i" % enactID)
-            attrs = cur.fetchall()
-            for attr in attrs:
-                name = attr["name"]
-                if OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA.has_key(name):
-                    capacity.set_by_type(OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA[name], int(attr["value"]))
-            capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
-            capacity.set_by_type(constants.RES_MEM, capacity.get_by_type(constants.RES_MEM) / 1024.0)
-            node = Node(nod_id, hostname, capacity)
-            node.enactment_info = int(enactID)
-            self.nodes.append(node)
+            if not host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
+                nod_id = i+1
+                enact_id = host.id
+                hostname = host.name
+                capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
+                
+                # CPU
+                # OpenNebula reports each CPU as "100"
+                # (so, a 4-core machine is reported as "400")
+                # We need to convert this to a multi-instance
+                # resource type in Haizea
+                cpu = host.max_cpu
+                ncpu = cpu / 100
+                capacity.set_ninstances(constants.RES_CPU, ncpu)
+                for i in range(ncpu):
+                    capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)            
+                
+                # Memory. Must divide by 1024 to obtain quantity in MB
+                capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
+                
+                # Disk
+                # OpenNebula doesn't report this correctly yet.
+                # We set it to an arbitrarily high value.
+                capacity.set_quantity(constants.RES_DISK, 80000)
+    
+                node = ResourcePoolNode(nod_id, hostname, capacity)
+                node.enactment_info = enact_id
+                self.nodes[nod_id] = node
             
-        self.logger.info("Fetched %i nodes from ONE db" % len(self.nodes))
-        for n in self.nodes:
-            self.logger.debug("%i %s %s" % (n.nod_id, n.hostname, n.capacity))
+        self.resource_types = []
+        self.resource_types.append((constants.RES_CPU,1))
+        self.resource_types.append((constants.RES_MEM,1))
+        self.resource_types.append((constants.RES_DISK,1))
+            
+        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))
+        for n in self.nodes.values():
+            self.logger.debug("%i %s %s" % (n.id, n.hostname, n.capacity))
         
     def get_nodes(self):
         return self.nodes
     
     def get_resource_types(self):
-        return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
-                (constants.RES_MEM,  constants.RESTYPE_INT, "Mem"),
-                (constants.RES_DISK, constants.RESTYPE_INT, "Disk"),
-                (constants.RES_NETIN, constants.RESTYPE_INT, "Net (in)"),
-                (constants.RES_NETOUT, constants.RESTYPE_INT, "Net (out)")]
+        return self.resource_types
 
     def get_bandwidth(self):
         return 0
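
A worked example of the CPU and memory conversion in this hunk (numbers illustrative): a host that OpenNebula reports with max_cpu = 400 and max_mem = 4194304 (in KB) becomes four CPU instances of 100 each plus 4096 MB of memory:

    ncpu = 400 / 100          # 4 (integer division in Python 2)
    # capacity.set_ninstances(constants.RES_CPU, 4)
    # capacity.set_quantity_instance(constants.RES_CPU, i+1, 100) for i = 0..3
    mem = 4194304 / 1024.0    # 4096.0 MB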
@@ -93,49 +107,38 @@
     def __init__(self):
         VMEnactment.__init__(self)
         self.logger = logging.getLogger("ENACT.ONE.VM")
+        self.rpc = get_one_xmlrpcclient()
 
-        self.onevm = get_config().get("onevm")
-        
-        self.conn = sqlite.connect(get_config().get("one.db"))
-        self.conn.row_factory = sqlite.Row
-
-        
-    def run_command(self, cmd):
-        self.logger.debug("Running command: %s" % cmd)
-        (status, output) = commands.getstatusoutput(cmd)
-        self.logger.debug("Returned status=%i, output='%s'" % (status, output))
-        return status, output
-
     def start(self, action):
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            hostID = action.vnodes[vnode].pnode
-            image = action.vnodes[vnode].diskimage
-            cpu = action.vnodes[vnode].resources.get_by_type(constants.RES_CPU)
-            memory = action.vnodes[vnode].resources.get_by_type(constants.RES_MEM)
+            vid = action.vnodes[vnode].enactment_info
+            hid = action.vnodes[vnode].pnode
             
-            self.logger.debug("Received request to start VM for L%iV%i on host %i, image=%s, cpu=%i, mem=%i"
-                         % (action.lease_haizea_id, vnode, hostID, image, cpu, memory))
+            self.logger.debug("Sending request to start VM for L%iV%i (ONE: vid=%i, hid=%i)"
+                         % (action.lease_haizea_id, vnode, vid, hid))
 
-            cmd = "%s deploy %i %i" % (self.onevm, vmid, hostID)
-            status, output = self.run_command(cmd)
-            if status == 0:
-                self.logger.debug("Command returned succesfully.")
-            else:
-                raise OpenNebulaEnactmentError("onevm deploy", status, output)
+            try:
+                self.rpc.vm_deploy(vid, hid)
+                self.logger.debug("Request successful.")
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.deploy", msg)
             
     def stop(self, action):
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            cmd = "%s shutdown %i" % (self.onevm, vmid)
-            status, output = self.run_command(cmd)
-            if status == 0:
-                self.logger.debug("Command returned succesfully.")
-            else:
-                raise OpenNebulaEnactmentError("onevm shutdown", status, output)
+            vid = action.vnodes[vnode].enactment_info
+            
+            self.logger.debug("Sending request to shutdown VM for L%iV%i (ONE: vid=%i)"
+                         % (action.lease_haizea_id, vnode, vid))
 
+            try:
+                self.rpc.vm_shutdown(vid)
+                self.logger.debug("Request successful.")
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.shutdown", msg)
+            
+            # Space out commands to keep OpenNebula from getting saturated
             # TODO: We should spawn out a thread to do this, so Haizea isn't
             # blocking until all these commands end
             interval = get_config().get("enactment-overhead").seconds
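
Regarding the TODO above, the non-blocking variant could look roughly like this (purely a sketch; _shutdown_async is a hypothetical helper, not part of this commit):

    import threading
    from time import sleep

    def _shutdown_async(rpc, vids, interval):
        # Issue the requests from a worker thread so the scheduler
        # loop is not blocked by the per-request sleep.
        def worker():
            for vid in vids:
                rpc.vm_shutdown(vid)
                sleep(interval)
        threading.Thread(target=worker).start()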
@@ -144,16 +147,18 @@
     def suspend(self, action):
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            cmd = "%s suspend %i" % (self.onevm, vmid)
-            status, output = self.run_command(cmd)
-            if status == 0:
-                self.logger.debug("Command returned succesfully.")
-            else:
-                raise OpenNebulaEnactmentError("onevm suspend", status, output)
+            vid = action.vnodes[vnode].enactment_info
+            
+            self.logger.debug("Sending request to suspend VM for L%iV%i (ONE: vid=%i)"
+                         % (action.lease_haizea_id, vnode, vid))
 
+            try:
+                self.rpc.vm_suspend(vid)
+                self.logger.debug("Request successful.")
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.suspend", msg)
+            
             # Space out commands to keep OpenNebula from getting saturated
-            # TODO: We should spawn out a thread to do this
             # TODO: We should spawn out a thread to do this, so Haizea isn't
             # blocking until all these commands end
             interval = get_config().get("enactment-overhead").seconds
@@ -162,14 +167,17 @@
     def resume(self, action):
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            cmd = "%s resume %i" % (self.onevm, vmid)
-            status, output = self.run_command(cmd)
-            if status == 0:
-                self.logger.debug("Command returned succesfully.")
-            else:
-                raise OpenNebulaEnactmentError("onevm resume", status, output)
+            vid = action.vnodes[vnode].enactment_info
+            
+            self.logger.debug("Sending request to resume VM for L%iV%i (ONE: vid=%i)"
+                         % (action.lease_haizea_id, vnode, vid))
 
+            try:
+                self.rpc.vm_resume(vid)
+                self.logger.debug("Request successful.")
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.resume", msg)
+            
             # Space out commands to keep OpenNebula from getting saturated
             # TODO: We should spawn out a thread to do this, so Haizea isn't
             # blocking until all these commands end
@@ -177,39 +185,43 @@
             sleep(interval)
 
     def verify_suspend(self, action):
-        # TODO: Do a single query
         result = 0
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            cur = self.conn.cursor()
-            cur.execute("select state from vm_pool where oid = %i" % vmid)
-            onevm = cur.fetchone()        
-            state = onevm["state"]
-            if state == 5:
-                self.logger.debug("Suspend of L%iV%i correct." % (action.lease_haizea_id, vnode))
-            else:
-                self.logger.warning("ONE did not complete suspend  of L%iV%i on time. State is %i" % (action.lease_haizea_id, vnode, state))
-                result = 1
+            vid = action.vnodes[vnode].enactment_info
+            
+            try:
+                vm = self.rpc.vm_info(vid)   
+                state = vm.state
+                if state == OpenNebulaVM.STATE_SUSPENDED:
+                    self.logger.debug("Suspend of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
+                else:
+                    self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
+                    result = 1
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.info", msg)
+
         return result
         
     def verify_resume(self, action):
-        # TODO: Do a single query
         result = 0
         for vnode in action.vnodes:
             # Unpack action
-            vmid = action.vnodes[vnode].enactment_info
-            cur = self.conn.cursor()
-            cur.execute("select state from vm_pool where oid = %i" % vmid)
-            onevm = cur.fetchone()        
-            state = onevm["state"]
-            if state == 3:
-                self.logger.debug("Resume of L%iV%i correct." % (action.lease_haizea_id, vnode))
-            else:
-                self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i" % (action.lease_haizea_id, vnode, state))
-                result = 1
-        return result
+            vid = action.vnodes[vnode].enactment_info
+            
+            try:
+                vm = self.rpc.vm_info(vid)   
+                state = vm.state
+                if state == OpenNebulaVM.STATE_ACTIVE:
+                    self.logger.debug("Resume of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
+                else:
+                    self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
+                    result = 1
+            except Exception, msg:
+                raise OpenNebulaEnactmentError("vm.info", msg)
 
+        return result        
+
 class OpenNebulaDummyDeploymentEnactment(DeploymentEnactment):    
     def __init__(self):
         DeploymentEnactment.__init__(self)
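
For comparison with the removed SQL version: the numeric states the old code tested against now have names in the wrapper. Assuming the constants mirror OpenNebula's numeric VM states (which the old literals imply):

    assert OpenNebulaVM.STATE_ACTIVE == 3      # old code: "if state == 3" in verify_resume
    assert OpenNebulaVM.STATE_SUSPENDED == 5   # old code: "if state == 5" in verify_suspend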

Modified: branches/TP2.0/src/haizea/core/enact/simulated.py
===================================================================
--- branches/TP2.0/src/haizea/core/enact/simulated.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/enact/simulated.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -34,10 +34,6 @@
             # TODO: raise something more meaningful
             raise
         
-        self.resource_types = []
-        self.resource_types.append((constants.RES_CPU,"CPU"))
-        self.resource_types.append((constants.RES_MEM,"Memory"))
-
         # Disk and network should be specified but, if not, we can
         # just add arbitrarily large values.
         if not "Disk" in site.resource_types:
@@ -49,6 +45,8 @@
         if not "Net-out" in site.resource_types:
             site.add_resource("Net-out", [1000000])
         
+        self.resource_types = site.get_resource_types()        
+        
         nodes = site.nodes.get_all_nodes()
         
         self.nodes = dict([(id, ResourcePoolNode(id, "simul-%i" % id, capacity)) for (id, capacity) in nodes.items()])

Modified: branches/TP2.0/src/haizea/core/frontends/opennebula.py
===================================================================
--- branches/TP2.0/src/haizea/core/frontends/opennebula.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/frontends/opennebula.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -17,17 +17,28 @@
 # -------------------------------------------------------------------------- #
 
 import haizea.common.constants as constants
+from haizea.core.leases import Lease, Capacity, Timestamp, Duration, UnmanagedSoftwareEnvironment
 from haizea.core.frontends import RequestFrontend
 from haizea.core.scheduler.slottable import ResourceTuple
 from haizea.common.utils import UNIX2DateTime, round_datetime, get_config, get_clock
-
-from pysqlite2 import dbapi2 as sqlite
+from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient, OpenNebulaVM
 from mx.DateTime import DateTimeDelta, TimeDelta, ISO
 
 import operator
 import logging
 
-class OpenNebulaVM(object):
+one_rpc = None
+
+def get_one_xmlrpcclient():
+    global one_rpc
+    if one_rpc == None:
+        host = get_config().get("one.host")
+        port = get_config().get("one.port")
+        user, passw = OpenNebulaXMLRPCClient.get_userpass_from_env()
+        one_rpc = OpenNebulaXMLRPCClient(host, port, user, passw)
+    return one_rpc
+
+class OpenNebulaHaizeaVM(object):
     HAIZEA_PARAM = "HAIZEA"
     HAIZEA_START = "START"
     HAIZEA_START_NOW = "now"
@@ -38,92 +49,70 @@
     HAIZEA_PREEMPTIBLE_YES = "yes"
     HAIZEA_PREEMPTIBLE_NO = "no"
     HAIZEA_GROUP = "GROUP"
+  
     
-    ONE_CPU="CPU"
-    ONE_MEMORY="MEMORY"
-    ONE_DISK="DISK"
-    ONE_DISK_SOURCE="SOURCE"    
-    
-    def __init__(self, db_entry, attrs):
-        self.db_entry = db_entry
-        
-        self.attrs = dict(attrs)
-        
-        # In OpenNebula 1.2, there can be more than one disk attribute
-        self.attrs[OpenNebulaVM.ONE_DISK] = [value for name, value in attrs if name == OpenNebulaVM.ONE_DISK]
-        
+    def __init__(self, opennebula_vm):                        
         # If there is no HAIZEA parameter, the default is to treat the
         # request as an immediate request with unlimited duration
-        if not self.attrs.has_key(OpenNebulaVM.HAIZEA_PARAM):
-            self.haizea_param = {OpenNebulaVM.HAIZEA_START: OpenNebulaVM.HAIZEA_START_NOW,
-                            OpenNebulaVM.HAIZEA_DURATION: OpenNebulaVM.HAIZEA_DURATION_UNLIMITED,
-                            OpenNebulaVM.HAIZEA_PREEMPTIBLE: OpenNebulaVM.HAIZEA_PREEMPTIBLE_NO}
+        if not opennebula_vm.template.has_key(OpenNebulaHaizeaVM.HAIZEA_PARAM):
+            self.start = OpenNebulaHaizeaVM.HAIZEA_START_NOW
+            self.duration = OpenNebulaHaizeaVM.HAIZEA_DURATION_UNLIMITED
+            self.preemptible = OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE_NO
+            self.group = None
         else:
-            self.haizea_param = self.__get_vector_value(self.attrs[OpenNebulaVM.HAIZEA_PARAM])
-
-        
-    def is_grouped(self):
-        return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_GROUP)
-    
-    def has_haizea_params(self):
-        return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_START)
-    
-    def get_start(self):
-        start = self.haizea_param[OpenNebulaVM.HAIZEA_START]
-        if start == OpenNebulaVM.HAIZEA_START_NOW or start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
-            pass # Leave start as it is
-        elif start[0] == "+":
+            self.start = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_START]
+            self.duration = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_DURATION]
+            self.preemptible = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE]
+            if opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM].has_key(OpenNebulaHaizeaVM.HAIZEA_GROUP):
+                self.group = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_GROUP]
+            else:
+                self.group = None
+                
+        self.submit_time = UNIX2DateTime(opennebula_vm.stime)
+                
+        # Create Timestamp object
+        if self.start == OpenNebulaHaizeaVM.HAIZEA_START_NOW:
+            self.start = Timestamp(Timestamp.NOW)
+        elif self.start == OpenNebulaHaizeaVM.HAIZEA_START_BESTEFFORT:
+            self.start = Timestamp(Timestamp.UNSPECIFIED)
+        elif self.start[0] == "+":
             # Relative time
-            # The following is just for testing:
-            now = get_clock().get_time()
-            start = round_datetime(now + ISO.ParseTime(start[1:]))
-            # Should be:
-            #start = round_datetime(self.get_submit_time() + ISO.ParseTime(start[1:]))
-
+            self.start = Timestamp(round_datetime(self.submit_time + ISO.ParseTime(self.start[1:])))
         else:
-            start = ISO.ParseDateTime(start)
-    
-        return start
-    
-    def get_duration(self):
-        duration = self.haizea_param[OpenNebulaVM.HAIZEA_DURATION]
-        if duration == OpenNebulaVM.HAIZEA_DURATION_UNLIMITED:
+            self.start = Timestamp(ISO.ParseDateTime(self.start))
+            
+        # Create Duration object
+        if self.duration == OpenNebulaHaizeaVM.HAIZEA_DURATION_UNLIMITED:
             # This is an interim solution (make it run for a century).
             # TODO: Integrate concept of unlimited duration in the lease datastruct
-            duration = DateTimeDelta(36500)
+            self.duration = Duration(DateTimeDelta(36500))
         else:
-            duration = ISO.ParseTimeDelta(duration)
-        return duration
-    
-    def get_preemptible(self):
-        preemptible = self.haizea_param[OpenNebulaVM.HAIZEA_PREEMPTIBLE]
-        return (preemptible == OpenNebulaVM.HAIZEA_PREEMPTIBLE_YES)
+            self.duration = Duration(ISO.ParseTimeDelta(self.duration))
+            
 
-    def get_group(self):
-        return self.haizea_param[OpenNebulaVM.HAIZEA_GROUP]
+        self.preemptible = (self.preemptible == OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE_YES)
 
-    def get_submit_time(self):
-        return UNIX2DateTime(self.db_entry["stime"])
     
-    def get_diskimage(self):
-        disk = self.__get_vector_value(self.attrs[OpenNebulaVM.ONE_DISK][0])
-        diskimage = disk[OpenNebulaVM.ONE_DISK_SOURCE]
-        return diskimage
-    
-    def get_diskimage_size(self):
-        return 0 # OpenNebula doesn't provide this
-    
-    def get_resource_requirements(self):
-        resreq = ResourceTuple.create_empty()
-        resreq.set_by_type(constants.RES_CPU, float(self.attrs[OpenNebulaVM.ONE_CPU]))
-        resreq.set_by_type(constants.RES_MEM, int(self.attrs[OpenNebulaVM.ONE_MEMORY]))
-        return resreq    
+        self.capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
+        
+        # CPU
+        # CPUs in VMs are not reported the same as in hosts.
+        # There are two template values: CPU and VCPU.
+        # CPU reports the percentage of the CPU needed by the VM.
+        # VCPU, which is optional, reports how many CPUs are needed.
+        cpu = int(float(opennebula_vm.template["CPU"]) * 100)
+        if opennebula_vm.template.has_key("VCPU"):
+            ncpu = int(opennebula_vm.template["VCPU"])
+        else:
+            ncpu = 1
+        self.capacity.set_ninstances(constants.RES_CPU, ncpu)
+        for i in range(ncpu):
+            self.capacity.set_quantity_instance(constants.RES_CPU, i+1, cpu)            
+        
+        # Memory. Unlike hosts, memory is reported directly in MBs
+        self.capacity.set_quantity(constants.RES_MEM, int(opennebula_vm.template["MEMORY"]))
 
-    def get_oneid(self):
-        return int(self.db_entry["oid"])
-
-    def __get_vector_value(self, value):
-        return dict([n.split("=") for n in value.split("@^_^@")])
+        self.one_id = opennebula_vm.id
         
     
 class OpenNebulaFrontend(RequestFrontend):    
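
For context, the parameters parsed above come from a HAIZEA vector attribute in the OpenNebula VM template, along these lines (values illustrative; this is ONE template syntax, not Python):

    HAIZEA = [
      START       = "+00:01:00",
      DURATION    = "01:00:00",
      PREEMPTIBLE = "no"
    ]

The CPU/VCPU combination then maps onto per-instance capacity: for example, CPU = 0.5 with VCPU = 2 yields two CPU instances of 50 each.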
@@ -132,35 +121,28 @@
         self.manager = manager
         self.processed = []
         self.logger = logging.getLogger("ONEREQ")
-        config = get_config()
+        self.rpc = get_one_xmlrpcclient()
 
-        self.conn = sqlite.connect(config.get("one.db"))
-        self.conn.row_factory = sqlite.Row
         
     def get_accumulated_requests(self):
-        cur = self.conn.cursor()
-        processed = ",".join([`p` for p in self.processed])
-        cur.execute("select * from vm_pool where state=1 and oid not in (%s)" % processed)
-        db_opennebula_vms = cur.fetchall()
-        
+        vms = self.rpc.vmpool_info()
+
         # Extract the pending OpenNebula VMs
-        opennebula_vms = [] # (ONE VM, ONE VM template attributes, ONE Haizea parameter)
-        for vm in db_opennebula_vms:
-            cur.execute("select * from vm_attributes where id=%i" % vm["oid"])
-            template = cur.fetchall()
-            attrs = [(r["name"], r["value"]) for r in template]
-            self.processed.append(vm["oid"])
+        pending_vms = [] 
+        for vm in vms:
+            if not vm.id  in self.processed and vm.state == OpenNebulaVM.STATE_PENDING:
+                vm_detailed = self.rpc.vm_info(vm.id)        
+                pending_vms.append(OpenNebulaHaizeaVM(vm_detailed))
+                self.processed.append(vm.id)
             
-            opennebula_vms.append(OpenNebulaVM(vm, attrs))
-            
-        grouped = [vm for vm in opennebula_vms if vm.is_grouped()]
-        not_grouped = [vm for vm in opennebula_vms if not vm.is_grouped()]
+        grouped = [vm for vm in pending_vms if vm.group != None]
+        not_grouped = [vm for vm in pending_vms if vm.group == None]
         
         # Extract VM groups
-        group_ids = set([vm.get_group() for vm in grouped])
+        group_ids = set([vm.group for vm in grouped])
         groups = {}
         for group_id in group_ids:
-            groups[group_id] = [vm for vm in grouped if vm.get_group() == group_id]
+            groups[group_id] = [vm for vm in grouped if vm.group == group_id]
             
         lease_requests = []
         for group_id, opennebula_vms in groups.items():
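
The old SQL filter "state=1" is what the wrapper now exposes as OpenNebulaVM.STATE_PENDING, assuming the constant mirrors OpenNebula's numeric VM states:

    assert OpenNebulaVM.STATE_PENDING == 1    # the old "where state=1"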
@@ -179,41 +161,26 @@
     def __ONEreqs_to_lease(self, opennebula_vms, group_id=None):
         # The vm_with_params is used to extract the HAIZEA parameters.
         # (i.e., lease-wide attributes)
-        vm_with_params = self.__get_vm_with_params(opennebula_vms)
+        vm_with_params = opennebula_vms[0]
 
         # Per-lease attributes
-        start = vm_with_params.get_start()
-        duration = vm_with_params.get_duration()
-        preemptible = vm_with_params.get_preemptible()
-        numnodes = len(opennebula_vms)
+        start = vm_with_params.start
+        duration = vm_with_params.duration
+        preemptible = vm_with_params.preemptible
+        submit_time = vm_with_params.submit_time
 
         # Per-vnode attributes
-        # Since Haizea currently assumes homogeneous nodes in
-        # the lease, we will also use vm_with_params to extract the 
-        # resource requirements.
-        # TODO: When Haizea is modified to support heterogeneous nodes,
-        # extract the resource requirements from each individual VM.
-        submit_time = vm_with_params.get_submit_time()
-        diskimage = vm_with_params.get_diskimage()
-        diskimagesize = vm_with_params.get_diskimage_size()
-        resource_requirements = vm_with_params.get_resource_requirements()
+        requested_resources = dict([(i+1,vm.capacity) for i, vm in enumerate(opennebula_vms)])
 
-        if start == OpenNebulaVM.HAIZEA_START_NOW:
-            lease = ImmediateLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
-        elif start  == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
-            lease = BestEffortLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
-        else:
-            lease = ARLease(submit_time, start, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+        lease = Lease.create_new(submit_time = submit_time, 
+                                 requested_resources = requested_resources, 
+                                 start = start, 
+                                 duration = duration, 
+                                 deadline = None,
+                                 preemptible = preemptible, 
+                                 software = UnmanagedSoftwareEnvironment())
      
         lease.enactment_info = group_id
-        lease.vnode_enactment_info = dict([(i+1,vm.get_oneid()) for i, vm in enumerate(opennebula_vms)])
+        lease.vnode_enactment_info = dict([(i+1,vm.one_id) for i, vm in enumerate(opennebula_vms)])
         return lease
-        
-    def __get_vm_with_params(self, opennebula_vms):
-        # One of the ONE VMs has the start, duration, and preemptible
-        # parameters. Find it.
-        vm_with_params = [vm for vm in opennebula_vms if vm.has_haizea_params()]
-        # There should only be one
-        # TODO: This is the ONE user's responsibility, but we should validate it
-        # nonetheless.
-        return vm_with_params[0]
+
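
One consequence of this rewrite: each pending VM now contributes its own Capacity object, so a lease no longer assumes homogeneous nodes. Illustratively, for a group of two VMs:

    # vnode index -> Capacity, one entry per OpenNebula VM in the group
    requested_resources = {1: vm1.capacity,    # e.g. 2 CPU instances x 50
                           2: vm2.capacity}    # e.g. 1 CPU instance x 100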

Modified: branches/TP2.0/src/haizea/core/manager.py
===================================================================
--- branches/TP2.0/src/haizea/core/manager.py	2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/manager.py	2009-07-30 17:31:25 UTC (rev 619)
@@ -91,28 +91,16 @@
         
         self.daemon = daemon
         self.pidfile = pidfile
-
-        if mode == "simulated":
-            self.init_simulated_mode()
-        elif mode == "opennebula":
-            self.init_opennebula_mode()
-            
-        # Statistics collection 
-        self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
         
-        self.logger = logging.getLogger("RM")
-
-    def init_simulated_mode(self):
-        """Initializes the resource manager in simulated mode
-        
-        """
-
-        # Simulated-time simulations always run in the foreground
-        clock = self.config.get("clock")
-        if clock == constants.CLOCK_SIMULATED:
-            self.daemon = False
-        
         self.init_logging()
+        
+        if mode == "simulated":
+            # Simulated-time simulations always run in the foreground
+            clock = self.config.get("clock")
+            if clock == constants.CLOCK_SIMULATED:
+                self.daemon = False
+        elif mode == "opennebula":
+            clock = constants.CLOCK_REAL        
                 
         if clock == constants.CLOCK_SIMULATED:
             starttime = self.config.get("starttime")
@@ -121,26 +109,46 @@
         elif clock == constants.CLOCK_REAL:
             wakeup_interval = self.config.get("wakeup-interval")
             non_sched = self.config.get("non-schedulable-interval")
-            self.clock = RealClock(self, wakeup_interval, non_sched)
-            self.rpc_server = RPCServer(self)
+            if mode == "opennebula":
+                fastforward = self.config.get("dry-run")
+            else:
+                fastforward = False
+            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
+            if fastforward:
+                # No need for an RPC server when doing a dry run
+                self.rpc_server = None
+            else:
+                self.rpc_server = RPCServer(self)
                     
-        resources = self.config.get("simul.resources")
-        if resources == "in-tracefile":
-            tracefile = self.config.get("tracefile")
-            site = Site.from_lwf_file(tracefile)
-        elif resources.startswith("file:"):
-            sitefile = resources.split(":")
-            site = Site.from_xml_file(sitefile)
-        else:
-            site = Site.from_resources_string(resources)
-
         # Enactment modules
-        info_enact = SimulatedResourcePoolInfo(site)
-        vm_enact = SimulatedVMEnactment()
-        deploy_enact = SimulatedDeploymentEnactment()
-                
+        if mode == "simulated":
+            resources = self.config.get("simul.resources")
+            if resources == "in-tracefile":
+                tracefile = self.config.get("tracefile")
+                site = Site.from_lwf_file(tracefile)
+            elif resources.startswith("file:"):
+                sitefile = resources.split(":")
+                site = Site.from_xml_file(sitefile)
+            else:
+                site = Site.from_resources_string(resources)
+    
+            info_enact = SimulatedResourcePoolInfo(site)
+            vm_enact = SimulatedVMEnactment()
+            deploy_enact = SimulatedDeploymentEnactment()
+        elif mode == "opennebula":
+            # Enactment modules
+            info_enact = OpenNebulaResourcePoolInfo()
+            vm_enact = OpenNebulaVMEnactment()
+            # No deployment in OpenNebula. Using dummy one for now.
+            deploy_enact = OpenNebulaDummyDeploymentEnactment()            
+
+        if mode == "simulated":
+            preparation_type = self.config.get("lease-preparation")
+        elif mode == "opennebula":
+            # No deployment in OpenNebula.
+            preparation_type = constants.PREPARATION_UNMANAGED
+
         # Resource pool
-        preparation_type = self.config.get("lease-preparation")
         if preparation_type == constants.PREPARATION_TRANSFER:
             if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
                 resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
@@ -150,7 +158,7 @@
             resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
     
         # Slot table
-        slottable = SlotTable(site.get_resource_types())
+        slottable = SlotTable(info_enact.get_resource_types())
         for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
             rt = slottable.create_resource_tuple_from_capacity(n.capacity)
             slottable.add_node(n.id, rt)
@@ -190,13 +198,22 @@
         self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
         
         # Lease request frontends
-        if clock == constants.CLOCK_SIMULATED:
-            # In pure simulation, we can only use the tracefile frontend
-            self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
-        elif clock == constants.CLOCK_REAL:
-            # In simulation with a real clock, only the RPC frontend can be used
-            self.frontends = [RPCFrontend(self)] 
+        if mode == "simulated":
+            if clock == constants.CLOCK_SIMULATED:
+                # In pure simulation, we can only use the tracefile frontend
+                self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
+            elif clock == constants.CLOCK_REAL:
+                # In simulation with a real clock, only the RPC frontend can be used
+                self.frontends = [RPCFrontend(self)]             
+        elif mode == "opennebula":
+                self.frontends = [OpenNebulaFrontend(self)]               
+
+        # Statistics collection 
+        self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
         
+        self.logger = logging.getLogger("RM")
+
+        
     def init_opennebula_mode(self):
         """Initializes the resource manager in OpenNebula mode
         
@@ -204,37 +221,12 @@
         self.init_logging()
 
         # The clock
-        wakeup_interval = self.config.get("wakeup-interval")
-        non_sched = self.config.get("non-schedulable-interval")
-        dry_run = self.config.get("dry-run")
-        fastforward = dry_run
-        self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
         
-        # RPC server
-        if dry_run:
-            # No need for an RPC server when doing a dry run
-            self.rpc_server = None
-        else:
-            self.rpc_server = RPCServer(self)
+
         
-        # Enactment modules
-        info_enact = OpenNebulaResourcePoolInfo()
-        vm_enact = OpenNebulaVMEnactment()
-        # No deployment in OpenNebula. Using dummy one for now.
-        deploy_enact = OpenNebulaDummyDeploymentEnactment()
+
             
-        # Resource pool
-        resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
 
-        # Slot table
-        slottable = SlotTable()
-        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
-            slottable.add_node(n)
-
-
-        # Deployment module
-        preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
-
         # VM Scheduler
         vm_scheduler = VMScheduler(slottable, resourcepool)
     
@@ -242,7 +234,7 @@
         self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
 
         # Lease request frontends
-        self.frontends = [OpenNebulaFrontend(self)]        
+            
         
         
     def init_logging(self):
@@ -502,7 +494,7 @@
         self.logger.error("Original exception:")
         self.__print_exception(exc.exc, exc.get_traceback())
         self.logger.error("Unrecoverable error traceback:")
-        self.__print_exception(exc, sys.exc_traceback)
+        self.__print_exception(exc, sys.exc_info()[2])
         self.__panic()
 
     def __unexpected_exception(self, exc):
@@ -511,7 +503,7 @@
         This method prints information on the unrecoverable error and makes Haizea panic.
         """
         self.logger.error("An unexpected exception has happened.")
-        self.__print_exception(exc, sys.exc_traceback)
+        self.__print_exception(exc, sys.exc_info()[2])
         self.__panic()
             
     def __print_exception(self, exc, exc_traceback):


