blob: 4020751feada549d79d80ec6aaf628872b73ddd9 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2008, 2010, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Script for unittesting the objects module"""
import copy
import pprint
import unittest
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import serializer
import testutils
class SimpleObject(objects.ConfigObject):
__slots__ = ["a", "b"]
class TestDictState(unittest.TestCase):
"""Simple dict tansformation tests"""
def testSimpleObjectToDict(self):
o1 = SimpleObject(a="1")
self.assertEquals(o1.ToDict(), {"a": "1"})
self.assertEquals(o1.__getstate__(), {"a": "1"})
self.assertEquals(o1.__getstate__(), o1.ToDict())
o1.a = 2
o1.b = 5
self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
o2 = SimpleObject.FromDict(o1.ToDict())
self.assertEquals(o1.ToDict(), {"a": 2, "b": 5})
class TestClusterObject(unittest.TestCase):
"""Tests done on a L{objects.Cluster}"""
def setUp(self):
hvparams = {
constants.HT_FAKE: {
"foo": "bar",
"bar": "foo",
"foobar": "barfoo",
},
}
os_hvp = {
"lenny-image": {
constants.HT_FAKE: {
"foo": "baz",
"foobar": "foobar",
"blah": "blibb",
"blubb": "blah",
},
constants.HT_XEN_PVM: {
"root_path": "/dev/sda5",
"foo": "foobar",
},
},
"ubuntu-hardy": {
},
}
ndparams = {
constants.ND_OOB_PROGRAM: "/bin/cluster-oob",
constants.ND_SPINDLE_COUNT: 1,
constants.ND_EXCLUSIVE_STORAGE: False,
}
self.fake_cl = objects.Cluster(hvparams=hvparams, os_hvp=os_hvp,
ndparams=ndparams)
self.fake_cl.UpgradeConfig()
def testGetHVDefaults(self):
cl = self.fake_cl
self.failUnlessEqual(cl.GetHVDefaults(constants.HT_FAKE),
cl.hvparams[constants.HT_FAKE])
self.failUnlessEqual(cl.GetHVDefaults(None), {})
defaults = cl.GetHVDefaults(constants.HT_XEN_PVM,
os_name="lenny-image")
for param, value in cl.os_hvp["lenny-image"][constants.HT_XEN_PVM].items():
self.assertEqual(value, defaults[param])
def testFillHvFullMerge(self):
inst_hvparams = {
"blah": "blubb",
}
fake_dict = constants.HVC_DEFAULTS[constants.HT_FAKE].copy()
fake_dict.update({
"foo": "baz",
"bar": "foo",
"foobar": "foobar",
"blah": "blubb",
"blubb": "blah",
})
fake_inst = objects.Instance(name="foobar",
os="lenny-image",
hypervisor=constants.HT_FAKE,
hvparams=inst_hvparams)
self.assertEqual(fake_dict, self.fake_cl.FillHV(fake_inst))
def testFillHvGlobalParams(self):
fake_inst = objects.Instance(name="foobar",
os="ubuntu-hardy",
hypervisor=constants.HT_FAKE,
hvparams={})
self.assertEqual(self.fake_cl.hvparams[constants.HT_FAKE],
self.fake_cl.FillHV(fake_inst))
def testFillHvInstParams(self):
inst_hvparams = {
"blah": "blubb",
}
fake_inst = objects.Instance(name="foobar",
os="ubuntu-hardy",
hypervisor=constants.HT_XEN_PVM,
hvparams=inst_hvparams)
filled_conf = self.fake_cl.FillHV(fake_inst)
for param, value in constants.HVC_DEFAULTS[constants.HT_XEN_PVM].items():
if param == "blah":
value = "blubb"
self.assertEqual(value, filled_conf[param])
def testFillHvDefaultParams(self):
fake_inst = objects.Instance(name="foobar",
os="ubuntu-hardy",
hypervisor=constants.HT_XEN_PVM,
hvparams={})
self.assertEqual(constants.HVC_DEFAULTS[constants.HT_XEN_PVM],
self.fake_cl.FillHV(fake_inst))
def testFillHvPartialParams(self):
os = "lenny-image"
fake_inst = objects.Instance(name="foobar",
os=os,
hypervisor=constants.HT_XEN_PVM,
hvparams={})
filled_conf = self.fake_cl.FillHV(fake_inst)
for param, value in self.fake_cl.os_hvp[os][constants.HT_XEN_PVM].items():
self.assertEqual(value, filled_conf[param])
def testFillNdParamsCluster(self):
fake_node = objects.Node(name="test",
ndparams={},
group="testgroup")
fake_group = objects.NodeGroup(name="testgroup",
ndparams={})
self.assertEqual(self.fake_cl.ndparams,
self.fake_cl.FillND(fake_node, fake_group))
def testFillNdParamsNodeGroup(self):
fake_node = objects.Node(name="test",
ndparams={},
group="testgroup")
group_ndparams = {
constants.ND_OOB_PROGRAM: "/bin/group-oob",
constants.ND_SPINDLE_COUNT: 10,
constants.ND_EXCLUSIVE_STORAGE: True,
constants.ND_OVS: True,
constants.ND_OVS_LINK: "eth2",
constants.ND_OVS_NAME: "openvswitch",
constants.ND_SSH_PORT: 122,
constants.ND_CPU_SPEED: 1.1,
}
fake_group = objects.NodeGroup(name="testgroup",
ndparams=group_ndparams)
self.assertEqual(group_ndparams,
self.fake_cl.FillND(fake_node, fake_group))
def testFillNdParamsNode(self):
node_ndparams = {
constants.ND_OOB_PROGRAM: "/bin/node-oob",
constants.ND_SPINDLE_COUNT: 2,
constants.ND_EXCLUSIVE_STORAGE: True,
constants.ND_OVS: True,
constants.ND_OVS_LINK: "eth2",
constants.ND_OVS_NAME: "openvswitch",
constants.ND_SSH_PORT: 222,
constants.ND_CPU_SPEED: 1.1,
}
fake_node = objects.Node(name="test",
ndparams=node_ndparams,
group="testgroup")
fake_group = objects.NodeGroup(name="testgroup",
ndparams={})
self.assertEqual(node_ndparams,
self.fake_cl.FillND(fake_node, fake_group))
def testFillNdParamsAll(self):
node_ndparams = {
constants.ND_OOB_PROGRAM: "/bin/node-oob",
constants.ND_SPINDLE_COUNT: 5,
constants.ND_EXCLUSIVE_STORAGE: True,
constants.ND_OVS: True,
constants.ND_OVS_LINK: "eth2",
constants.ND_OVS_NAME: "openvswitch",
constants.ND_SSH_PORT: 322,
constants.ND_CPU_SPEED: 1.1,
}
fake_node = objects.Node(name="test",
ndparams=node_ndparams,
group="testgroup")
group_ndparams = {
constants.ND_OOB_PROGRAM: "/bin/group-oob",
constants.ND_SPINDLE_COUNT: 4,
constants.ND_SSH_PORT: 422,
}
fake_group = objects.NodeGroup(name="testgroup",
ndparams=group_ndparams)
self.assertEqual(node_ndparams,
self.fake_cl.FillND(fake_node, fake_group))
def testPrimaryHypervisor(self):
assert self.fake_cl.enabled_hypervisors is None
self.fake_cl.enabled_hypervisors = [constants.HT_XEN_HVM]
self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_HVM)
self.fake_cl.enabled_hypervisors = [constants.HT_XEN_PVM, constants.HT_KVM]
self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_PVM)
self.fake_cl.enabled_hypervisors = sorted(constants.HYPER_TYPES)
self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_CHROOT)
def testUpgradeConfig(self):
# FIXME: This test is incomplete
cluster = objects.Cluster()
cluster.UpgradeConfig()
cluster = objects.Cluster(ipolicy={"unknown_key": None})
self.assertRaises(errors.ConfigurationError, cluster.UpgradeConfig)
def testUpgradeEnabledDiskTemplates(self):
cfg = objects.ConfigData()
cfg.cluster = objects.Cluster()
cfg.cluster.volume_group_name = "myvg"
instance1 = objects.Instance()
instance1.disk_template = constants.DT_DISKLESS
instance2 = objects.Instance()
instance2.disk_template = constants.DT_RBD
cfg.instances = { "myinstance1": instance1, "myinstance2": instance2 }
disk2 = objects.Disk(dev_type=constants.DT_RBD)
cfg.disks = { "pinkbunnydisk": disk2 }
nodegroup = objects.NodeGroup()
nodegroup.ipolicy = {}
nodegroup.ipolicy[constants.IPOLICY_DTS] = [instance1.disk_template, \
constants.DT_BLOCK]
cfg.cluster.ipolicy = {}
cfg.cluster.ipolicy[constants.IPOLICY_DTS] = \
[constants.DT_EXT, constants.DT_DISKLESS]
cfg.nodegroups = { "mynodegroup": nodegroup }
cfg._UpgradeEnabledDiskTemplates()
expected_disk_templates = [constants.DT_DRBD8,
constants.DT_PLAIN,
instance1.disk_template,
instance2.disk_template]
self.assertEqual(set(expected_disk_templates),
set(cfg.cluster.enabled_disk_templates))
self.assertEqual(set([instance1.disk_template]),
set(cfg.cluster.ipolicy[constants.IPOLICY_DTS]))
class TestClusterObjectTcpUdpPortPool(unittest.TestCase):
def testNewCluster(self):
self.assertTrue(objects.Cluster().tcpudp_port_pool is None)
def testSerializingEmpty(self):
self.assertEqual(objects.Cluster().ToDict(), {
"tcpudp_port_pool": [],
})
def testSerializing(self):
cluster = objects.Cluster.FromDict({})
self.assertEqual(cluster.tcpudp_port_pool, set())
cluster.tcpudp_port_pool.add(3546)
cluster.tcpudp_port_pool.add(62511)
data = cluster.ToDict()
self.assertEqual(data.keys(), ["tcpudp_port_pool"])
self.assertEqual(sorted(data["tcpudp_port_pool"]), sorted([3546, 62511]))
def testDeserializingEmpty(self):
cluster = objects.Cluster.FromDict({})
self.assertEqual(cluster.tcpudp_port_pool, set())
def testDeserialize(self):
cluster = objects.Cluster.FromDict({
"tcpudp_port_pool": [26214, 10039, 267],
})
self.assertEqual(cluster.tcpudp_port_pool, set([26214, 10039, 267]))
class TestOS(unittest.TestCase):
ALL_DATA = [
"debootstrap",
"debootstrap+default",
"debootstrap++default",
]
def testSplitNameVariant(self):
for name in self.ALL_DATA:
self.assertEqual(len(objects.OS.SplitNameVariant(name)), 2)
def testVariant(self):
self.assertEqual(objects.OS.GetVariant("debootstrap"), "")
self.assertEqual(objects.OS.GetVariant("debootstrap+default"), "default")
class TestInstance(unittest.TestCase):
def testFindDisk(self):
inst = objects.Instance(name="fakeinstdrbd.example.com",
primary_node="node20.example.com",
disks=[
objects.Disk(dev_type=constants.DT_DRBD8, size=786432,
logical_id=("node20.example.com", "node15.example.com",
12300, 0, 0, "secret"),
children=[
objects.Disk(dev_type=constants.DT_PLAIN, size=786432,
logical_id=("myxenvg", "disk0")),
objects.Disk(dev_type=constants.DT_PLAIN, size=128,
logical_id=("myxenvg", "meta0"))
],
iv_name="disk/0")
])
self.assertEqual(inst.FindDisk(0), inst.disks[0])
self.assertRaises(errors.OpPrereqError, inst.FindDisk, "hello")
self.assertRaises(errors.OpPrereqError, inst.FindDisk, 100)
self.assertRaises(errors.OpPrereqError, inst.FindDisk, 1)
class TestNode(unittest.TestCase):
def testEmpty(self):
self.assertEqual(objects.Node().ToDict(), {})
self.assertTrue(isinstance(objects.Node.FromDict({}), objects.Node))
def testHvState(self):
node = objects.Node(name="node18157.example.com", hv_state={
constants.HT_XEN_HVM: objects.NodeHvState(cpu_total=64),
constants.HT_KVM: objects.NodeHvState(cpu_node=1),
})
node2 = objects.Node.FromDict(node.ToDict())
# Make sure nothing can reference it anymore
del node
self.assertEqual(node2.name, "node18157.example.com")
self.assertEqual(frozenset(node2.hv_state), frozenset([
constants.HT_XEN_HVM,
constants.HT_KVM,
]))
self.assertEqual(node2.hv_state[constants.HT_KVM].cpu_node, 1)
self.assertEqual(node2.hv_state[constants.HT_XEN_HVM].cpu_total, 64)
def testDiskState(self):
node = objects.Node(name="node32087.example.com", disk_state={
constants.DT_PLAIN: {
"lv32352": objects.NodeDiskState(total=128),
"lv2082": objects.NodeDiskState(total=512),
},
})
node2 = objects.Node.FromDict(node.ToDict())
# Make sure nothing can reference it anymore
del node
self.assertEqual(node2.name, "node32087.example.com")
self.assertEqual(frozenset(node2.disk_state), frozenset([
constants.DT_PLAIN,
]))
self.assertEqual(frozenset(node2.disk_state[constants.DT_PLAIN]),
frozenset(["lv32352", "lv2082"]))
self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv2082"].total, 512)
self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv32352"].total, 128)
def testFilterEsNdp(self):
node1 = objects.Node(name="node11673.example.com", ndparams={
constants.ND_EXCLUSIVE_STORAGE: True,
})
node2 = objects.Node(name="node11674.example.com", ndparams={
constants.ND_SPINDLE_COUNT: 3,
constants.ND_EXCLUSIVE_STORAGE: False,
})
self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
node1.UpgradeConfig()
self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams)
self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
node2.UpgradeConfig()
self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams)
self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams)
class TestInstancePolicy(unittest.TestCase):
def setUp(self):
# Policies are big, and we want to see the difference in case of an error
self.maxDiff = None
def _AssertIPolicyIsFull(self, policy):
self.assertEqual(frozenset(policy.keys()), constants.IPOLICY_ALL_KEYS)
self.assertTrue(len(policy[constants.ISPECS_MINMAX]) > 0)
for minmax in policy[constants.ISPECS_MINMAX]:
self.assertEqual(frozenset(minmax.keys()), constants.ISPECS_MINMAX_KEYS)
for key in constants.ISPECS_MINMAX_KEYS:
self.assertEqual(frozenset(minmax[key].keys()),
constants.ISPECS_PARAMETERS)
self.assertEqual(frozenset(policy[constants.ISPECS_STD].keys()),
constants.ISPECS_PARAMETERS)
def testDefaultIPolicy(self):
objects.InstancePolicy.CheckParameterSyntax(constants.IPOLICY_DEFAULTS,
True)
self._AssertIPolicyIsFull(constants.IPOLICY_DEFAULTS)
def _AssertPolicyIsBad(self, ipolicy, do_check_std=None):
if do_check_std is None:
check_std_vals = [False, True]
else:
check_std_vals = [do_check_std]
for check_std in check_std_vals:
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckISpecSyntax,
ipolicy, check_std)
def testCheckISpecSyntax(self):
default_stdspec = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
incomplete_ipolicies = [
{
constants.ISPECS_MINMAX: [],
constants.ISPECS_STD: default_stdspec,
},
{
constants.ISPECS_MINMAX: [{}],
constants.ISPECS_STD: default_stdspec,
},
{
constants.ISPECS_MINMAX: [{
constants.ISPECS_MIN: NotImplemented,
}],
constants.ISPECS_STD: default_stdspec,
},
{
constants.ISPECS_MINMAX: [{
constants.ISPECS_MAX: NotImplemented,
}],
constants.ISPECS_STD: default_stdspec,
},
{
constants.ISPECS_MINMAX: [{
constants.ISPECS_MIN: NotImplemented,
constants.ISPECS_MAX: NotImplemented,
}],
},
]
for ipol in incomplete_ipolicies:
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckISpecSyntax,
ipol, True)
oldminmax = ipol[constants.ISPECS_MINMAX]
if oldminmax:
# Prepending valid specs shouldn't change the error
ipol[constants.ISPECS_MINMAX] = ([constants.ISPECS_MINMAX_DEFAULTS] +
oldminmax)
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckISpecSyntax,
ipol, True)
good_ipolicy = {
constants.ISPECS_MINMAX: [
{
constants.ISPECS_MIN: {
constants.ISPEC_MEM_SIZE: 64,
constants.ISPEC_CPU_COUNT: 1,
constants.ISPEC_DISK_COUNT: 2,
constants.ISPEC_DISK_SIZE: 64,
constants.ISPEC_NIC_COUNT: 1,
constants.ISPEC_SPINDLE_USE: 1,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 16384,
constants.ISPEC_CPU_COUNT: 5,
constants.ISPEC_DISK_COUNT: 12,
constants.ISPEC_DISK_SIZE: 1024,
constants.ISPEC_NIC_COUNT: 9,
constants.ISPEC_SPINDLE_USE: 18,
},
},
{
constants.ISPECS_MIN: {
constants.ISPEC_MEM_SIZE: 32768,
constants.ISPEC_CPU_COUNT: 8,
constants.ISPEC_DISK_COUNT: 1,
constants.ISPEC_DISK_SIZE: 1024,
constants.ISPEC_NIC_COUNT: 1,
constants.ISPEC_SPINDLE_USE: 1,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 65536,
constants.ISPEC_CPU_COUNT: 10,
constants.ISPEC_DISK_COUNT: 5,
constants.ISPEC_DISK_SIZE: 1024 * 1024,
constants.ISPEC_NIC_COUNT: 3,
constants.ISPEC_SPINDLE_USE: 12,
},
},
],
}
good_ipolicy[constants.ISPECS_STD] = copy.deepcopy(
good_ipolicy[constants.ISPECS_MINMAX][0][constants.ISPECS_MAX])
# Check that it's really good before making it bad
objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
bad_ipolicy = copy.deepcopy(good_ipolicy)
for minmax in bad_ipolicy[constants.ISPECS_MINMAX]:
for (key, spec) in minmax.items():
for param in spec:
oldv = spec[param]
del spec[param]
self._AssertPolicyIsBad(bad_ipolicy)
if key == constants.ISPECS_MIN:
spec[param] = minmax[constants.ISPECS_MAX][param] + 1
self._AssertPolicyIsBad(bad_ipolicy)
spec[param] = oldv
assert bad_ipolicy == good_ipolicy
stdspec = bad_ipolicy[constants.ISPECS_STD]
for param in stdspec:
oldv = stdspec[param]
del stdspec[param]
self._AssertPolicyIsBad(bad_ipolicy, True)
# Note that std spec is the same as a max spec
stdspec[param] = oldv + 1
self._AssertPolicyIsBad(bad_ipolicy, True)
stdspec[param] = oldv
assert bad_ipolicy == good_ipolicy
for minmax in good_ipolicy[constants.ISPECS_MINMAX]:
for spec in minmax.values():
good_ipolicy[constants.ISPECS_STD] = spec
objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True)
def testCheckISpecParamSyntax(self):
par = "my_parameter"
for check_std in [True, False]:
# Min and max only
good_values = [(11, 11), (11, 40), (0, 0)]
for (mn, mx) in good_values:
minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
minmax[constants.ISPECS_MIN][par] = mn
minmax[constants.ISPECS_MAX][par] = mx
objects.InstancePolicy._CheckISpecParamSyntax(minmax, {}, par,
check_std)
minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS)
minmax[constants.ISPECS_MIN][par] = 11
minmax[constants.ISPECS_MAX][par] = 5
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy._CheckISpecParamSyntax,
minmax, {}, par, check_std)
# Min, std, max
good_values = [
(11, 11, 11),
(11, 11, 40),
(11, 40, 40),
]
for (mn, st, mx) in good_values:
minmax = {
constants.ISPECS_MIN: {par: mn},
constants.ISPECS_MAX: {par: mx},
}
stdspec = {par: st}
objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, par, True)
bad_values = [
(11, 11, 5, True),
(40, 11, 11, True),
(11, 80, 40, False),
(11, 5, 40, False,),
(11, 5, 5, True),
(40, 40, 11, True),
]
for (mn, st, mx, excp) in bad_values:
minmax = {
constants.ISPECS_MIN: {par: mn},
constants.ISPECS_MAX: {par: mx},
}
stdspec = {par: st}
if excp:
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy._CheckISpecParamSyntax,
minmax, stdspec, par, True)
else:
ret = objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec,
par, True)
self.assertFalse(ret)
def testCheckDiskTemplates(self):
invalid = "this_is_not_a_good_template"
for dt in constants.DISK_TEMPLATES:
objects.InstancePolicy.CheckDiskTemplates([dt])
objects.InstancePolicy.CheckDiskTemplates(list(constants.DISK_TEMPLATES))
bad_examples = [
[invalid],
[constants.DT_DRBD8, invalid],
list(constants.DISK_TEMPLATES) + [invalid],
[],
None,
]
for dtl in bad_examples:
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckDiskTemplates,
dtl)
def testCheckParameterSyntax(self):
invalid = "this_key_shouldnt_be_here"
for check_std in [True, False]:
objects.InstancePolicy.CheckParameterSyntax({}, check_std)
policy = {invalid: None}
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckParameterSyntax,
policy, check_std)
for par in constants.IPOLICY_PARAMETERS:
for val in ("blah", None, {}, [42]):
policy = {par: val}
self.assertRaises(errors.ConfigurationError,
objects.InstancePolicy.CheckParameterSyntax,
policy, check_std)
def testFillIPolicyEmpty(self):
policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, {})
objects.InstancePolicy.CheckParameterSyntax(policy, True)
self.assertEqual(policy, constants.IPOLICY_DEFAULTS)
def _AssertISpecsMerged(self, default_spec, diff_spec, merged_spec):
for (param, value) in merged_spec.items():
if param in diff_spec:
self.assertEqual(value, diff_spec[param])
else:
self.assertEqual(value, default_spec[param])
def _AssertIPolicyMerged(self, default_pol, diff_pol, merged_pol):
for (key, value) in merged_pol.items():
if key in diff_pol:
if key == constants.ISPECS_STD:
self._AssertISpecsMerged(default_pol[key], diff_pol[key], value)
else:
self.assertEqual(value, diff_pol[key])
else:
self.assertEqual(value, default_pol[key])
def testFillIPolicy(self):
partial_policies = [
{constants.IPOLICY_VCPU_RATIO: 3.14},
{constants.IPOLICY_SPINDLE_RATIO: 2.72},
{constants.IPOLICY_DTS: [constants.DT_FILE]},
{constants.ISPECS_STD: {constants.ISPEC_DISK_COUNT: 3}},
{constants.ISPECS_MINMAX: [constants.ISPECS_MINMAX_DEFAULTS,
constants.ISPECS_MINMAX_DEFAULTS]}
]
for diff_pol in partial_policies:
policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
objects.InstancePolicy.CheckParameterSyntax(policy, True)
self._AssertIPolicyIsFull(policy)
self._AssertIPolicyMerged(constants.IPOLICY_DEFAULTS, diff_pol, policy)
def testFillIPolicyKeepsUnknown(self):
INVALID_KEY = "invalid_ipolicy_key"
diff_pol = {
INVALID_KEY: None,
}
policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol)
self.assertTrue(INVALID_KEY in policy)
class TestDisk(unittest.TestCase):
def addChild(self, disk):
"""Adds a child of the same device type as the parent."""
disk.children = []
child = objects.Disk()
child.dev_type = disk.dev_type
disk.children.append(child)
def testUpgradeConfigDevTypeLegacy(self):
for old, new in [("drbd8", constants.DT_DRBD8),
("lvm", constants.DT_PLAIN)]:
disk = objects.Disk()
disk.dev_type = old
self.addChild(disk)
disk.UpgradeConfig()
self.assertEqual(new, disk.dev_type)
self.assertEqual(new, disk.children[0].dev_type)
def testUpgradeConfigDevTypeLegacyUnchanged(self):
dev_types = [constants.DT_FILE, constants.DT_SHARED_FILE,
constants.DT_BLOCK, constants.DT_EXT,
constants.DT_RBD, constants.DT_GLUSTER]
for dev_type in dev_types:
disk = objects.Disk()
disk.dev_type = dev_type
self.addChild(disk)
disk.UpgradeConfig()
self.assertEqual(dev_type, disk.dev_type)
self.assertEqual(dev_type, disk.children[0].dev_type)
class TestSimpleFillOS(unittest.TestCase):
# We have to make sure that:
# * From within the configuration, variants override defaults
# * Temporary values override configuration
# * No overlap between public, private and secret dicts is allowed
#
# As a result, here are the actors in this test:
#
# A: temporary public
# B: temporary private
# C: temporary secret
# X: temporary public private secret
# D: configuration public variant
# E: configuration public base
# F: configuration private variant
# G: configuration private base
#
# Every time a param is assigned "ERROR", we expect FillOSParams to override
# it. If it doesn't, it's an error.
#
# Every time a param is assigned itself as a value, it's the value we expect
# FillOSParams to give us back.
def setUp(self):
self.fake_cl = objects.Cluster()
self.fake_cl.UpgradeConfig()
self.fake_cl.osparams = {"os": {"A": "ERROR",
"D": "ERROR",
"E": "E"},
"os+a": {"D": "D"}}
self.fake_cl.osparams_private_cluster = {"os": {"B": "ERROR",
"F": "ERROR",
"G": "G"},
"os+a": {"F": "F"}}
def testConflictPublicPrivate(self):
"Make sure we disallow attempts to override params based on visibility."
public_dict = {"A": "A", "X": "X"}
private_dict = {"B": "B", "X": "X"}
secret_dict = {"C": "C"}
dicts_pp = (public_dict, private_dict)
dicts_pps = (public_dict, private_dict, secret_dict)
# Without secret parameters
self.assertRaises(errors.OpPrereqError,
lambda: self.fake_cl.SimpleFillOS("os+a", *dicts_pp))
# but also with those.
self.assertRaises(errors.OpPrereqError,
lambda: self.fake_cl.SimpleFillOS("os+a", *dicts_pps))
def testConflictPublicSecret(self):
"Make sure we disallow attempts to override params based on visibility."
public_dict = {"A": "A", "X": "X"}
private_dict = {"B": "B"}
secret_dict = {"C": "C", "X": "X"}
dicts_pps = (public_dict, private_dict, secret_dict)
self.assertRaises(errors.OpPrereqError,
lambda: self.fake_cl.SimpleFillOS("os+a", *dicts_pps))
def testConflictPrivateSecret(self):
"Make sure we disallow attempts to override params based on visibility."
public_dict = {"A": "A"}
private_dict = {"B": "B", "X": "X"}
secret_dict = {"C": "C", "X": "X"}
dicts_pps = (public_dict, private_dict, secret_dict)
self.assertRaises(errors.OpPrereqError,
lambda: self.fake_cl.SimpleFillOS("os+a", *dicts_pps))
def testValidValues(self):
"Make sure we handle all overriding we do allow correctly."
public_dict = {"A": "A"}
private_dict = {"B": "B"}
secret_dict = {"C": "C"}
dicts_p = (public_dict,)
dicts_pp = (public_dict, private_dict)
dicts_pps = (public_dict, private_dict, secret_dict)
expected_keys_p = ("A", "D", "E") # nothing private, secret
expected_keys_pp = ("A", "B", "D", "E", "F", "G") # nothing secret
expected_keys_pps = ("A", "B", "C", "D", "E", "F", "G") # all of them
for (dicts, expected_keys) in [(dicts_p, expected_keys_p),
(dicts_pp, expected_keys_pp),
(dicts_pps, expected_keys_pps)]:
result = self.fake_cl.SimpleFillOS("os+a", *dicts)
# Values
for key in result:
if not result[key] == key:
self.fail("Invalid public-private fill with input:\n%s\n%s"
% (pprint.pformat(dicts), result))
# Completeness
if set(result) != set(expected_keys):
self.fail("Problem with key %s from merge result of:\n%s\n%s"
% (set(expected_keys) ^ set(result), # symmetric difference
pprint.pformat(dicts),
result))
if __name__ == "__main__":
testutils.GanetiTestProgram()