[haizea-commit] r619 - in branches/TP2.0/src/haizea: common core core/enact core/frontends
haizea-commit at mailman.cs.uchicago.edu
haizea-commit at mailman.cs.uchicago.edu
Thu Jul 30 12:31:27 CDT 2009
Author: borja
Date: 2009-07-30 12:31:25 -0500 (Thu, 30 Jul 2009)
New Revision: 619
Modified:
branches/TP2.0/src/haizea/common/defaults.py
branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py
branches/TP2.0/src/haizea/core/configfile.py
branches/TP2.0/src/haizea/core/enact/opennebula.py
branches/TP2.0/src/haizea/core/enact/simulated.py
branches/TP2.0/src/haizea/core/frontends/opennebula.py
branches/TP2.0/src/haizea/core/manager.py
Log:
Updated the OpenNebula request frontend and enactment module so that they work with OpenNebula 1.4. Both now use OpenNebula's XML-RPC interface instead of accessing the database directly.
Modified: branches/TP2.0/src/haizea/common/defaults.py
===================================================================
--- branches/TP2.0/src/haizea/common/defaults.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/common/defaults.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -8,4 +8,6 @@
RPC_SERVER = "localhost"
RPC_PORT = 42493
-RPC_URI = "http://%s:%i" % (RPC_SERVER, RPC_PORT)
\ No newline at end of file
+RPC_URI = "http://%s:%i" % (RPC_SERVER, RPC_PORT)
+
+OPENNEBULA_RPC_PORT = 2633
\ No newline at end of file
Modified: branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py
===================================================================
--- branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/common/opennebula_xmlrpc.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -23,7 +23,7 @@
self.auth = "%s:%s" % (user, passhash)
@staticmethod
- def get_userpass_from_env(self):
+ def get_userpass_from_env():
if not os.environ.has_key("ONE_AUTH"):
return None
else:
@@ -77,9 +77,9 @@
def vm_deploy(self, vid, hid):
try:
- (rc, value) = self.rpc.one.vm.deploy(self.auth, vid, hid)
- if rc == False:
- raise Exception("ONE reported an error: %s" % value)
+ rv = self.rpc.one.vm.deploy(self.auth, vid, hid)
+ if rv[0] == False:
+ raise Exception("ONE reported an error: %s" % rv[1])
else:
return
except xmlrpclib.Fault, err:
@@ -91,9 +91,9 @@
"finalize" ]:
raise Exception("%s is not a valid action" % action)
try:
- (rc, value) = self.rpc.one.vm.deploy(self.auth, action, vid)
- if rc == False:
- raise Exception("ONE reported an error: %s" % value)
+ rv = self.rpc.one.vm.action(self.auth, action, vid)
+ if rv[0] == False:
+ raise Exception("ONE reported an error: %s" % rv[1])
else:
return
except xmlrpclib.Fault, err:
@@ -223,7 +223,7 @@
if deploy_id == None:
self.deploy_id = None
else:
- self.deploy_id = int(deploy_id)
+ self.deploy_id = deploy_id
self.memory = int(vm_element.find("MEMORY").text)
self.cpu = int(vm_element.find("CPU").text)
self.net_tx = int(vm_element.find("NET_TX").text)
Modified: branches/TP2.0/src/haizea/core/configfile.py
===================================================================
--- branches/TP2.0/src/haizea/core/configfile.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/configfile.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -19,6 +19,7 @@
from haizea.common.config import ConfigException, Section, Option, Config, OPTTYPE_INT, OPTTYPE_FLOAT, OPTTYPE_STRING, OPTTYPE_BOOLEAN, OPTTYPE_DATETIME, OPTTYPE_TIMEDELTA
from haizea.common.utils import generate_config_name
import haizea.common.constants as constants
+import haizea.common.defaults as defaults
import sys
from mx.DateTime import TimeDelta
import ConfigParser
@@ -685,20 +686,25 @@
necessary when using Haizea as an OpenNebula scheduling backend.""")
opennebula.options = \
[
- Option(name = "db",
- getter = "one.db",
+ Option(name = "host",
+ getter = "one.host",
type = OPTTYPE_STRING,
required = True,
doc = """
- Location of OpenNebula database.
+ Host where OpenNebula is running.
+ Typically, OpenNebula and Haizea will be installed
+ on the same host, so the following option should be
+ set to 'localhost'. If they're on different hosts,
+ make sure you modify this option accordingly.
"""),
- Option(name = "onevm",
- getter = "onevm",
- type = OPTTYPE_STRING,
- required = True,
+ Option(name = "port",
+ getter = "one.port",
+ type = OPTTYPE_INT,
+ required = False,
+ default = defaults.OPENNEBULA_RPC_PORT,
doc = """
- Location of OpenNebula "onevm" command.
+ TCP port of OpenNebula's XML RPC server
"""),
Option(name = "stop-when-no-more-leases",
Modified: branches/TP2.0/src/haizea/core/enact/opennebula.py
===================================================================
--- branches/TP2.0/src/haizea/core/enact/opennebula.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/enact/opennebula.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -17,74 +17,88 @@
# -------------------------------------------------------------------------- #
from haizea.core.scheduler import EnactmentError
+from haizea.core.leases import Capacity
from haizea.core.scheduler.resourcepool import ResourcePoolNode
from haizea.core.scheduler.slottable import ResourceTuple
from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
from haizea.common.utils import get_config
+from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient, OpenNebulaVM, OpenNebulaHost
import haizea.common.constants as constants
-from pysqlite2 import dbapi2 as sqlite
import logging
-import commands
from time import sleep
+one_rpc = None
+
+def get_one_xmlrpcclient():
+ global one_rpc
+ if one_rpc == None:
+ host = get_config().get("one.host")
+ port = get_config().get("one.port")
+ user, passw = OpenNebulaXMLRPCClient.get_userpass_from_env()
+ one_rpc = OpenNebulaXMLRPCClient(host, port, user, passw)
+ return one_rpc
+
class OpenNebulaEnactmentError(EnactmentError):
- def __init__(self, cmd, status, output):
- self.cmd = cmd
- self.status = status
- self.output = output
-
- self.message = "Error when running '%s' (status=%i, output='%s')" % (cmd, status, output)
+ def __init__(self, method, msg):
+ self.method = method
+ self.msg = msg
+ self.message = "Error when invoking '%s': %s" % (method, msg)
class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
- ONEATTR2HAIZEA = { "TOTALCPU": constants.RES_CPU,
- "TOTALMEMORY": constants.RES_MEM }
def __init__(self):
ResourcePoolInfo.__init__(self)
- config = get_config()
self.logger = logging.getLogger("ENACT.ONE.INFO")
- # Get information about nodes from DB
- conn = sqlite.connect(config.get("one.db"))
- conn.row_factory = sqlite.Row
-
- self.nodes = []
- cur = conn.cursor()
- cur.execute("select oid, host_name from host_pool where state != 4")
- hosts = cur.fetchall()
+ self.rpc = get_one_xmlrpcclient()
+
+ # Get information about nodes from OpenNebula
+ self.nodes = {}
+ hosts = self.rpc.hostpool_info()
for (i, host) in enumerate(hosts):
- nod_id = i+1
- enactID = int(host["oid"])
- hostname = host["host_name"]
- capacity = ResourceTuple.create_empty()
- capacity.set_by_type(constants.RES_DISK, 80000) # OpenNebula currently doesn't provide this
- capacity.set_by_type(constants.RES_NETIN, 100) # OpenNebula currently doesn't provide this
- capacity.set_by_type(constants.RES_NETOUT, 100) # OpenNebula currently doesn't provide this
- cur.execute("select name, value from host_attributes where id=%i" % enactID)
- attrs = cur.fetchall()
- for attr in attrs:
- name = attr["name"]
- if OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA.has_key(name):
- capacity.set_by_type(OpenNebulaResourcePoolInfo.ONEATTR2HAIZEA[name], int(attr["value"]))
- capacity.set_by_type(constants.RES_CPU, capacity.get_by_type(constants.RES_CPU) / 100.0)
- capacity.set_by_type(constants.RES_MEM, capacity.get_by_type(constants.RES_MEM) / 1024.0)
- node = Node(nod_id, hostname, capacity)
- node.enactment_info = int(enactID)
- self.nodes.append(node)
+ if not host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
+ nod_id = i+1
+ enact_id = host.id
+ hostname = host.name
+ capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
+
+ # CPU
+ # OpenNebula reports each CPU as "100"
+ # (so, a 4-core machine is reported as "400")
+ # We need to convert this to a multi-instance
+ # resource type in Haizea
+ cpu = host.max_cpu
+ ncpu = cpu / 100
+ capacity.set_ninstances(constants.RES_CPU, ncpu)
+ for i in range(ncpu):
+ capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)
+
+ # Memory. Must divide by 1024 to obtain quantity in MB
+ capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
+
+ # Disk
+ # OpenNebula doesn't report this correctly yet.
+ # We set it to an arbitrarily high value.
+ capacity.set_quantity(constants.RES_DISK, 80000)
+
+ node = ResourcePoolNode(nod_id, hostname, capacity)
+ node.enactment_info = enact_id
+ self.nodes[nod_id] = node
- self.logger.info("Fetched %i nodes from ONE db" % len(self.nodes))
- for n in self.nodes:
- self.logger.debug("%i %s %s" % (n.nod_id, n.hostname, n.capacity))
+ self.resource_types = []
+ self.resource_types.append((constants.RES_CPU,1))
+ self.resource_types.append((constants.RES_MEM,1))
+ self.resource_types.append((constants.RES_DISK,1))
+
+ self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))
+ for n in self.nodes.values():
+ self.logger.debug("%i %s %s" % (n.id, n.hostname, n.capacity))
def get_nodes(self):
return self.nodes
def get_resource_types(self):
- return [(constants.RES_CPU, constants.RESTYPE_FLOAT, "CPU"),
- (constants.RES_MEM, constants.RESTYPE_INT, "Mem"),
- (constants.RES_DISK, constants.RESTYPE_INT, "Disk"),
- (constants.RES_NETIN, constants.RESTYPE_INT, "Net (in)"),
- (constants.RES_NETOUT, constants.RESTYPE_INT, "Net (out)")]
+ return self.resource_types
def get_bandwidth(self):
return 0
@@ -93,49 +107,38 @@
def __init__(self):
VMEnactment.__init__(self)
self.logger = logging.getLogger("ENACT.ONE.VM")
+ self.rpc = get_one_xmlrpcclient()
- self.onevm = get_config().get("onevm")
-
- self.conn = sqlite.connect(get_config().get("one.db"))
- self.conn.row_factory = sqlite.Row
-
-
- def run_command(self, cmd):
- self.logger.debug("Running command: %s" % cmd)
- (status, output) = commands.getstatusoutput(cmd)
- self.logger.debug("Returned status=%i, output='%s'" % (status, output))
- return status, output
-
def start(self, action):
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- hostID = action.vnodes[vnode].pnode
- image = action.vnodes[vnode].diskimage
- cpu = action.vnodes[vnode].resources.get_by_type(constants.RES_CPU)
- memory = action.vnodes[vnode].resources.get_by_type(constants.RES_MEM)
+ vid = action.vnodes[vnode].enactment_info
+ hid = action.vnodes[vnode].pnode
- self.logger.debug("Received request to start VM for L%iV%i on host %i, image=%s, cpu=%i, mem=%i"
- % (action.lease_haizea_id, vnode, hostID, image, cpu, memory))
+ self.logger.debug("Sending request to start VM for L%iV%i (ONE: vid=%i, hid=%i)"
+ % (action.lease_haizea_id, vnode, vid, hid))
- cmd = "%s deploy %i %i" % (self.onevm, vmid, hostID)
- status, output = self.run_command(cmd)
- if status == 0:
- self.logger.debug("Command returned succesfully.")
- else:
- raise OpenNebulaEnactmentError("onevm deploy", status, output)
+ try:
+ self.rpc.vm_deploy(vid, hid)
+ self.logger.debug("Request succesful.")
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.deploy", msg)
def stop(self, action):
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- cmd = "%s shutdown %i" % (self.onevm, vmid)
- status, output = self.run_command(cmd)
- if status == 0:
- self.logger.debug("Command returned succesfully.")
- else:
- raise OpenNebulaEnactmentError("onevm shutdown", status, output)
+ vid = action.vnodes[vnode].enactment_info
+
+ self.logger.debug("Sending request to shutdown VM for L%iV%i (ONE: vid=%i)"
+ % (action.lease_haizea_id, vnode, vid))
+ try:
+ self.rpc.vm_shutdown(vid)
+ self.logger.debug("Request succesful.")
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.shutdown", msg)
+
+ # Space out commands to avoid OpenNebula from getting saturated
# TODO: We should spawn out a thread to do this, so Haizea isn't
# blocking until all these commands end
interval = get_config().get("enactment-overhead").seconds
@@ -144,16 +147,18 @@
def suspend(self, action):
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- cmd = "%s suspend %i" % (self.onevm, vmid)
- status, output = self.run_command(cmd)
- if status == 0:
- self.logger.debug("Command returned succesfully.")
- else:
- raise OpenNebulaEnactmentError("onevm suspend", status, output)
+ vid = action.vnodes[vnode].enactment_info
+
+ self.logger.debug("Sending request to suspend VM for L%iV%i (ONE: vid=%i)"
+ % (action.lease_haizea_id, vnode, vid))
+ try:
+ self.rpc.vm_suspend(vid)
+ self.logger.debug("Request succesful.")
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.suspend", msg)
+
# Space out commands to avoid OpenNebula from getting saturated
- # TODO: We should spawn out a thread to do this
# TODO: We should spawn out a thread to do this, so Haizea isn't
# blocking until all these commands end
interval = get_config().get("enactment-overhead").seconds
@@ -162,14 +167,17 @@
def resume(self, action):
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- cmd = "%s resume %i" % (self.onevm, vmid)
- status, output = self.run_command(cmd)
- if status == 0:
- self.logger.debug("Command returned succesfully.")
- else:
- raise OpenNebulaEnactmentError("onevm resume", status, output)
+ vid = action.vnodes[vnode].enactment_info
+
+ self.logger.debug("Sending request to resume VM for L%iV%i (ONE: vid=%i)"
+ % (action.lease_haizea_id, vnode, vid))
+ try:
+ self.rpc.vm_resume(vid)
+ self.logger.debug("Request succesful.")
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.resume", msg)
+
# Space out commands to avoid OpenNebula from getting saturated
# TODO: We should spawn out a thread to do this, so Haizea isn't
# blocking until all these commands end
@@ -177,39 +185,43 @@
sleep(interval)
def verify_suspend(self, action):
- # TODO: Do a single query
result = 0
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- cur = self.conn.cursor()
- cur.execute("select state from vm_pool where oid = %i" % vmid)
- onevm = cur.fetchone()
- state = onevm["state"]
- if state == 5:
- self.logger.debug("Suspend of L%iV%i correct." % (action.lease_haizea_id, vnode))
- else:
- self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i" % (action.lease_haizea_id, vnode, state))
- result = 1
+ vid = action.vnodes[vnode].enactment_info
+
+ try:
+ vm = self.rpc.vm_info(vid)
+ state = vm.state
+ if state == OpenNebulaVM.STATE_SUSPENDED:
+ self.logger.debug("Suspend of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
+ else:
+ self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
+ result = 1
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.info", msg)
+
return result
def verify_resume(self, action):
- # TODO: Do a single query
result = 0
for vnode in action.vnodes:
# Unpack action
- vmid = action.vnodes[vnode].enactment_info
- cur = self.conn.cursor()
- cur.execute("select state from vm_pool where oid = %i" % vmid)
- onevm = cur.fetchone()
- state = onevm["state"]
- if state == 3:
- self.logger.debug("Resume of L%iV%i correct." % (action.lease_haizea_id, vnode))
- else:
- self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i" % (action.lease_haizea_id, vnode, state))
- result = 1
- return result
+ vid = action.vnodes[vnode].enactment_info
+
+ try:
+ vm = self.rpc.vm_info(vid)
+ state = vm.state
+ if state == OpenNebulaVM.STATE_ACTIVE:
+ self.logger.debug("Resume of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
+ else:
+ self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
+ result = 1
+ except Exception, msg:
+ raise OpenNebulaEnactmentError("vm.info", msg)
+ return result
+
class OpenNebulaDummyDeploymentEnactment(DeploymentEnactment):
def __init__(self):
DeploymentEnactment.__init__(self)
Modified: branches/TP2.0/src/haizea/core/enact/simulated.py
===================================================================
--- branches/TP2.0/src/haizea/core/enact/simulated.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/enact/simulated.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -34,10 +34,6 @@
# TODO: raise something more meaningful
raise
- self.resource_types = []
- self.resource_types.append((constants.RES_CPU,"CPU"))
- self.resource_types.append((constants.RES_MEM,"Memory"))
-
# Disk and network should be specified but, if not, we can
# just add arbitrarily large values.
if not "Disk" in site.resource_types:
@@ -49,6 +45,8 @@
if not "Net-out" in site.resource_types:
site.add_resource("Net-out", [1000000])
+ self.resource_types = site.get_resource_types()
+
nodes = site.nodes.get_all_nodes()
self.nodes = dict([(id, ResourcePoolNode(id, "simul-%i" % id, capacity)) for (id, capacity) in nodes.items()])
Modified: branches/TP2.0/src/haizea/core/frontends/opennebula.py
===================================================================
--- branches/TP2.0/src/haizea/core/frontends/opennebula.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/frontends/opennebula.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -17,17 +17,28 @@
# -------------------------------------------------------------------------- #
import haizea.common.constants as constants
+from haizea.core.leases import Lease, Capacity, Timestamp, Duration, UnmanagedSoftwareEnvironment
from haizea.core.frontends import RequestFrontend
from haizea.core.scheduler.slottable import ResourceTuple
from haizea.common.utils import UNIX2DateTime, round_datetime, get_config, get_clock
-
-from pysqlite2 import dbapi2 as sqlite
+from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient, OpenNebulaVM
from mx.DateTime import DateTimeDelta, TimeDelta, ISO
import operator
import logging
-class OpenNebulaVM(object):
+one_rpc = None
+
+def get_one_xmlrpcclient():
+ global one_rpc
+ if one_rpc == None:
+ host = get_config().get("one.host")
+ port = get_config().get("one.port")
+ user, passw = OpenNebulaXMLRPCClient.get_userpass_from_env()
+ one_rpc = OpenNebulaXMLRPCClient(host, port, user, passw)
+ return one_rpc
+
+class OpenNebulaHaizeaVM(object):
HAIZEA_PARAM = "HAIZEA"
HAIZEA_START = "START"
HAIZEA_START_NOW = "now"
@@ -38,92 +49,70 @@
HAIZEA_PREEMPTIBLE_YES = "yes"
HAIZEA_PREEMPTIBLE_NO = "no"
HAIZEA_GROUP = "GROUP"
+
- ONE_CPU="CPU"
- ONE_MEMORY="MEMORY"
- ONE_DISK="DISK"
- ONE_DISK_SOURCE="SOURCE"
-
- def __init__(self, db_entry, attrs):
- self.db_entry = db_entry
-
- self.attrs = dict(attrs)
-
- # In OpenNebula 1.2, there can be more than one disk attribute
- self.attrs[OpenNebulaVM.ONE_DISK] = [value for name, value in attrs if name == OpenNebulaVM.ONE_DISK]
-
+ def __init__(self, opennebula_vm):
# If there is no HAIZEA parameter, the default is to treat the
# request as an immediate request with unlimited duration
- if not self.attrs.has_key(OpenNebulaVM.HAIZEA_PARAM):
- self.haizea_param = {OpenNebulaVM.HAIZEA_START: OpenNebulaVM.HAIZEA_START_NOW,
- OpenNebulaVM.HAIZEA_DURATION: OpenNebulaVM.HAIZEA_DURATION_UNLIMITED,
- OpenNebulaVM.HAIZEA_PREEMPTIBLE: OpenNebulaVM.HAIZEA_PREEMPTIBLE_NO}
+ if not opennebula_vm.template.has_key(OpenNebulaHaizeaVM.HAIZEA_PARAM):
+ self.start = OpenNebulaHaizeaVM.HAIZEA_START_NOW
+ self.duration = OpenNebulaHaizeaVM.HAIZEA_DURATION_UNLIMITED
+ self.preemptible = OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE_NO
+ self.group = None
else:
- self.haizea_param = self.__get_vector_value(self.attrs[OpenNebulaVM.HAIZEA_PARAM])
-
-
- def is_grouped(self):
- return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_GROUP)
-
- def has_haizea_params(self):
- return self.haizea_param.has_key(OpenNebulaVM.HAIZEA_START)
-
- def get_start(self):
- start = self.haizea_param[OpenNebulaVM.HAIZEA_START]
- if start == OpenNebulaVM.HAIZEA_START_NOW or start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
- pass # Leave start as it is
- elif start[0] == "+":
+ self.start = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_START]
+ self.duration = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_DURATION]
+ self.preemptible = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE]
+ if opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM].has_key(OpenNebulaHaizeaVM.HAIZEA_GROUP):
+ self.group = opennebula_vm.template[OpenNebulaHaizeaVM.HAIZEA_PARAM][OpenNebulaHaizeaVM.HAIZEA_GROUP]
+ else:
+ self.group = None
+
+ self.submit_time = UNIX2DateTime(opennebula_vm.stime)
+
+ # Create Timestamp object
+ if self.start == OpenNebulaHaizeaVM.HAIZEA_START_NOW:
+ self.start = Timestamp(Timestamp.NOW)
+ elif self.start == OpenNebulaHaizeaVM.HAIZEA_START_BESTEFFORT:
+ self.start = Timestamp(Timestamp.UNSPECIFIED)
+ elif self.start[0] == "+":
# Relative time
- # The following is just for testing:
- now = get_clock().get_time()
- start = round_datetime(now + ISO.ParseTime(start[1:]))
- # Should be:
- #start = round_datetime(self.get_submit_time() + ISO.ParseTime(start[1:]))
-
+ self.start = Timestamp(round_datetime(self.submit_time + ISO.ParseTime(self.start[1:])))
else:
- start = ISO.ParseDateTime(start)
-
- return start
-
- def get_duration(self):
- duration = self.haizea_param[OpenNebulaVM.HAIZEA_DURATION]
- if duration == OpenNebulaVM.HAIZEA_DURATION_UNLIMITED:
+ self.start = Timestamp(ISO.ParseDateTime(self.start))
+
+ # Create Duration object
+ if self.duration == OpenNebulaHaizeaVM.HAIZEA_DURATION_UNLIMITED:
# This is an interim solution (make it run for a century).
# TODO: Integrate concept of unlimited duration in the lease datastruct
- duration = DateTimeDelta(36500)
+ self.duration = Duration(DateTimeDelta(36500))
else:
- duration = ISO.ParseTimeDelta(duration)
- return duration
-
- def get_preemptible(self):
- preemptible = self.haizea_param[OpenNebulaVM.HAIZEA_PREEMPTIBLE]
- return (preemptible == OpenNebulaVM.HAIZEA_PREEMPTIBLE_YES)
+ self.duration = Duration(ISO.ParseTimeDelta(self.duration))
+
- def get_group(self):
- return self.haizea_param[OpenNebulaVM.HAIZEA_GROUP]
+ self.preemptible = (self.preemptible == OpenNebulaHaizeaVM.HAIZEA_PREEMPTIBLE_YES)
- def get_submit_time(self):
- return UNIX2DateTime(self.db_entry["stime"])
- def get_diskimage(self):
- disk = self.__get_vector_value(self.attrs[OpenNebulaVM.ONE_DISK][0])
- diskimage = disk[OpenNebulaVM.ONE_DISK_SOURCE]
- return diskimage
-
- def get_diskimage_size(self):
- return 0 # OpenNebula doesn't provide this
-
- def get_resource_requirements(self):
- resreq = ResourceTuple.create_empty()
- resreq.set_by_type(constants.RES_CPU, float(self.attrs[OpenNebulaVM.ONE_CPU]))
- resreq.set_by_type(constants.RES_MEM, int(self.attrs[OpenNebulaVM.ONE_MEMORY]))
- return resreq
+ self.capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
+
+ # CPU
+ # CPUs in VMs are not reported the same as in hosts.
+ # THere are two template values: CPU and VCPU.
+ # CPU reports the percentage of the CPU needed by the VM.
+ # VCPU, which is optional, reports how many CPUs are needed.
+ cpu = int(float(opennebula_vm.template["CPU"]) * 100)
+ if opennebula_vm.template.has_key("VCPU"):
+ ncpu = int(opennebula_vm.template["VCPU"])
+ else:
+ ncpu = 1
+ self.capacity.set_ninstances(constants.RES_CPU, ncpu)
+ for i in range(ncpu):
+ self.capacity.set_quantity_instance(constants.RES_CPU, i+1, cpu)
+
+ # Memory. Unlike hosts, memory is reported directly in MBs
+ self.capacity.set_quantity(constants.RES_MEM, int(opennebula_vm.template["MEMORY"]))
- def get_oneid(self):
- return int(self.db_entry["oid"])
-
- def __get_vector_value(self, value):
- return dict([n.split("=") for n in value.split("@^_^@")])
+ self.one_id = opennebula_vm.id
class OpenNebulaFrontend(RequestFrontend):
@@ -132,35 +121,28 @@
self.manager = manager
self.processed = []
self.logger = logging.getLogger("ONEREQ")
- config = get_config()
+ self.rpc = get_one_xmlrpcclient()
- self.conn = sqlite.connect(config.get("one.db"))
- self.conn.row_factory = sqlite.Row
def get_accumulated_requests(self):
- cur = self.conn.cursor()
- processed = ",".join([`p` for p in self.processed])
- cur.execute("select * from vm_pool where state=1 and oid not in (%s)" % processed)
- db_opennebula_vms = cur.fetchall()
-
+ vms = self.rpc.vmpool_info()
+
# Extract the pending OpenNebula VMs
- opennebula_vms = [] # (ONE VM, ONE VM template attributes, ONE Haizea parameter)
- for vm in db_opennebula_vms:
- cur.execute("select * from vm_attributes where id=%i" % vm["oid"])
- template = cur.fetchall()
- attrs = [(r["name"], r["value"]) for r in template]
- self.processed.append(vm["oid"])
+ pending_vms = []
+ for vm in vms:
+ if not vm.id in self.processed and vm.state == OpenNebulaVM.STATE_PENDING:
+ vm_detailed = self.rpc.vm_info(vm.id)
+ pending_vms.append(OpenNebulaHaizeaVM(vm_detailed))
+ self.processed.append(vm.id)
- opennebula_vms.append(OpenNebulaVM(vm, attrs))
-
- grouped = [vm for vm in opennebula_vms if vm.is_grouped()]
- not_grouped = [vm for vm in opennebula_vms if not vm.is_grouped()]
+ grouped = [vm for vm in pending_vms if vm.group != None]
+ not_grouped = [vm for vm in pending_vms if vm.group == None]
# Extract VM groups
- group_ids = set([vm.get_group() for vm in grouped])
+ group_ids = set([vm.group for vm in grouped])
groups = {}
for group_id in group_ids:
- groups[group_id] = [vm for vm in grouped if vm.get_group() == group_id]
+ groups[group_id] = [vm for vm in grouped if vm.group == group_id]
lease_requests = []
for group_id, opennebula_vms in groups.items():
@@ -179,41 +161,26 @@
def __ONEreqs_to_lease(self, opennebula_vms, group_id=None):
# The vm_with_params is used to extract the HAIZEA parameters.
# (i.e., lease-wide attributes)
- vm_with_params = self.__get_vm_with_params(opennebula_vms)
+ vm_with_params = opennebula_vms[0]
# Per-lease attributes
- start = vm_with_params.get_start()
- duration = vm_with_params.get_duration()
- preemptible = vm_with_params.get_preemptible()
- numnodes = len(opennebula_vms)
+ start = vm_with_params.start
+ duration = vm_with_params.duration
+ preemptible = vm_with_params.preemptible
+ submit_time = vm_with_params.submit_time
# Per-vnode attributes
- # Since Haizea currently assumes homogeneous nodes in
- # the lease, we will also use vm_with_params to extract the
- # resource requirements.
- # TODO: When Haizea is modified to support heterogeneous nodes,
- # extract the resource requirements from each individual VM.
- submit_time = vm_with_params.get_submit_time()
- diskimage = vm_with_params.get_diskimage()
- diskimagesize = vm_with_params.get_diskimage_size()
- resource_requirements = vm_with_params.get_resource_requirements()
+ requested_resources = dict([(i+1,vm.capacity) for i, vm in enumerate(opennebula_vms)])
- if start == OpenNebulaVM.HAIZEA_START_NOW:
- lease = ImmediateLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
- elif start == OpenNebulaVM.HAIZEA_START_BESTEFFORT:
- lease = BestEffortLease(submit_time, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
- else:
- lease = ARLease(submit_time, start, duration, diskimage, diskimagesize, numnodes, resource_requirements, preemptible)
+ lease = Lease.create_new(submit_time = submit_time,
+ requested_resources = requested_resources,
+ start = start,
+ duration = duration,
+ deadline = None,
+ preemptible = preemptible,
+ software = UnmanagedSoftwareEnvironment())
lease.enactment_info = group_id
- lease.vnode_enactment_info = dict([(i+1,vm.get_oneid()) for i, vm in enumerate(opennebula_vms)])
+ lease.vnode_enactment_info = dict([(i+1,vm.one_id) for i, vm in enumerate(opennebula_vms)])
return lease
-
- def __get_vm_with_params(self, opennebula_vms):
- # One of the ONE VMs has the start, duration, and preemptible
- # parameters. Find it.
- vm_with_params = [vm for vm in opennebula_vms if vm.has_haizea_params()]
- # There should only be one
- # TODO: This is the ONE user's responsibility, but we should validate it
- # nonetheless.
- return vm_with_params[0]
+
Modified: branches/TP2.0/src/haizea/core/manager.py
===================================================================
--- branches/TP2.0/src/haizea/core/manager.py 2009-07-30 17:30:12 UTC (rev 618)
+++ branches/TP2.0/src/haizea/core/manager.py 2009-07-30 17:31:25 UTC (rev 619)
@@ -91,28 +91,16 @@
self.daemon = daemon
self.pidfile = pidfile
-
- if mode == "simulated":
- self.init_simulated_mode()
- elif mode == "opennebula":
- self.init_opennebula_mode()
-
- # Statistics collection
- self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
- self.logger = logging.getLogger("RM")
-
- def init_simulated_mode(self):
- """Initializes the resource manager in simulated mode
-
- """
-
- # Simulated-time simulations always run in the foreground
- clock = self.config.get("clock")
- if clock == constants.CLOCK_SIMULATED:
- self.daemon = False
-
self.init_logging()
+
+ if mode == "simulated":
+ # Simulated-time simulations always run in the foreground
+ clock = self.config.get("clock")
+ if clock == constants.CLOCK_SIMULATED:
+ self.daemon = False
+ elif mode == "opennebula":
+ clock = constants.CLOCK_REAL
if clock == constants.CLOCK_SIMULATED:
starttime = self.config.get("starttime")
@@ -121,26 +109,46 @@
elif clock == constants.CLOCK_REAL:
wakeup_interval = self.config.get("wakeup-interval")
non_sched = self.config.get("non-schedulable-interval")
- self.clock = RealClock(self, wakeup_interval, non_sched)
- self.rpc_server = RPCServer(self)
+ if mode == "opennebula":
+ fastforward = self.config.get("dry-run")
+ else:
+ fastforward = False
+ self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
+ if fastforward:
+ # No need for an RPC server when doing a dry run
+ self.rpc_server = None
+ else:
+ self.rpc_server = RPCServer(self)
- resources = self.config.get("simul.resources")
- if resources == "in-tracefile":
- tracefile = self.config.get("tracefile")
- site = Site.from_lwf_file(tracefile)
- elif resources.startswith("file:"):
- sitefile = resources.split(":")
- site = Site.from_xml_file(sitefile)
- else:
- site = Site.from_resources_string(resources)
-
# Enactment modules
- info_enact = SimulatedResourcePoolInfo(site)
- vm_enact = SimulatedVMEnactment()
- deploy_enact = SimulatedDeploymentEnactment()
-
+ if mode == "simulated":
+ resources = self.config.get("simul.resources")
+ if resources == "in-tracefile":
+ tracefile = self.config.get("tracefile")
+ site = Site.from_lwf_file(tracefile)
+ elif resources.startswith("file:"):
+ sitefile = resources.split(":")
+ site = Site.from_xml_file(sitefile)
+ else:
+ site = Site.from_resources_string(resources)
+
+ info_enact = SimulatedResourcePoolInfo(site)
+ vm_enact = SimulatedVMEnactment()
+ deploy_enact = SimulatedDeploymentEnactment()
+ elif mode == "opennebula":
+ # Enactment modules
+ info_enact = OpenNebulaResourcePoolInfo()
+ vm_enact = OpenNebulaVMEnactment()
+ # No deployment in OpenNebula. Using dummy one for now.
+ deploy_enact = OpenNebulaDummyDeploymentEnactment()
+
+ if mode == "simulated":
+ preparation_type = self.config.get("lease-preparation")
+ elif mode == "opennebula":
+ # No deployment in OpenNebula.
+ preparation_type = constants.PREPARATION_UNMANAGED
+
# Resource pool
- preparation_type = self.config.get("lease-preparation")
if preparation_type == constants.PREPARATION_TRANSFER:
if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
@@ -150,7 +158,7 @@
resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
# Slot table
- slottable = SlotTable(site.get_resource_types())
+ slottable = SlotTable(info_enact.get_resource_types())
for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
rt = slottable.create_resource_tuple_from_capacity(n.capacity)
slottable.add_node(n.id, rt)
@@ -190,13 +198,22 @@
self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
# Lease request frontends
- if clock == constants.CLOCK_SIMULATED:
- # In pure simulation, we can only use the tracefile frontend
- self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
- elif clock == constants.CLOCK_REAL:
- # In simulation with a real clock, only the RPC frontend can be used
- self.frontends = [RPCFrontend(self)]
+ if mode == "simulated":
+ if clock == constants.CLOCK_SIMULATED:
+ # In pure simulation, we can only use the tracefile frontend
+ self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
+ elif clock == constants.CLOCK_REAL:
+ # In simulation with a real clock, only the RPC frontend can be used
+ self.frontends = [RPCFrontend(self)]
+ elif mode == "opennebula":
+ self.frontends = [OpenNebulaFrontend(self)]
+
+ # Statistics collection
+ self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
+ self.logger = logging.getLogger("RM")
+
+
def init_opennebula_mode(self):
"""Initializes the resource manager in OpenNebula mode
@@ -204,37 +221,12 @@
self.init_logging()
# The clock
- wakeup_interval = self.config.get("wakeup-interval")
- non_sched = self.config.get("non-schedulable-interval")
- dry_run = self.config.get("dry-run")
- fastforward = dry_run
- self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
- # RPC server
- if dry_run:
- # No need for an RPC server when doing a dry run
- self.rpc_server = None
- else:
- self.rpc_server = RPCServer(self)
+
- # Enactment modules
- info_enact = OpenNebulaResourcePoolInfo()
- vm_enact = OpenNebulaVMEnactment()
- # No deployment in OpenNebula. Using dummy one for now.
- deploy_enact = OpenNebulaDummyDeploymentEnactment()
+
- # Resource pool
- resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
- # Slot table
- slottable = SlotTable()
- for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
- slottable.add_node(n)
-
-
- # Deployment module
- preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
-
# VM Scheduler
vm_scheduler = VMScheduler(slottable, resourcepool)
@@ -242,7 +234,7 @@
self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
# Lease request frontends
- self.frontends = [OpenNebulaFrontend(self)]
+
def init_logging(self):
@@ -502,7 +494,7 @@
self.logger.error("Original exception:")
self.__print_exception(exc.exc, exc.get_traceback())
self.logger.error("Unrecoverable error traceback:")
- self.__print_exception(exc, sys.exc_traceback)
+ self.__print_exception(exc, sys.exc_info()[2])
self.__panic()
def __unexpected_exception(self, exc):
@@ -511,7 +503,7 @@
This method prints information on the unrecoverable error and makes Haizea panic.
"""
self.logger.error("An unexpected exception has happened.")
- self.__print_exception(exc, sys.exc_traceback)
+ self.__print_exception(exc, sys.exc_info()[2])
self.__panic()
def __print_exception(self, exc, exc_traceback):
More information about the Haizea-commit mailing list