| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2010, 2012, 2013 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Script for unittesting the objects module""" |
| |
| |
| import copy |
| import unittest |
| |
| from ganeti import constants |
| from ganeti import objects |
| from ganeti import errors |
| |
| import testutils |
| |
| |
| class SimpleObject(objects.ConfigObject): |
| __slots__ = ["a", "b"] |
| |
| |
| class TestDictState(unittest.TestCase): |
| """Simple dict tansformation tests""" |
| |
| def testSimpleObjectToDict(self): |
| o1 = SimpleObject(a="1") |
| self.assertEquals(o1.ToDict(), {"a": "1"}) |
| self.assertEquals(o1.__getstate__(), {"a": "1"}) |
| self.assertEquals(o1.__getstate__(), o1.ToDict()) |
| o1.a = 2 |
| o1.b = 5 |
| self.assertEquals(o1.ToDict(), {"a": 2, "b": 5}) |
| o2 = SimpleObject.FromDict(o1.ToDict()) |
| self.assertEquals(o1.ToDict(), {"a": 2, "b": 5}) |
| |
| |
| class TestClusterObject(unittest.TestCase): |
| """Tests done on a L{objects.Cluster}""" |
| |
| def setUp(self): |
| hvparams = { |
| constants.HT_FAKE: { |
| "foo": "bar", |
| "bar": "foo", |
| "foobar": "barfoo", |
| }, |
| } |
| os_hvp = { |
| "lenny-image": { |
| constants.HT_FAKE: { |
| "foo": "baz", |
| "foobar": "foobar", |
| "blah": "blibb", |
| "blubb": "blah", |
| }, |
| constants.HT_XEN_PVM: { |
| "root_path": "/dev/sda5", |
| "foo": "foobar", |
| }, |
| }, |
| "ubuntu-hardy": { |
| }, |
| } |
| ndparams = { |
| constants.ND_OOB_PROGRAM: "/bin/cluster-oob", |
| constants.ND_SPINDLE_COUNT: 1, |
| constants.ND_EXCLUSIVE_STORAGE: False, |
| } |
| |
| self.fake_cl = objects.Cluster(hvparams=hvparams, os_hvp=os_hvp, |
| ndparams=ndparams) |
| self.fake_cl.UpgradeConfig() |
| |
| def testGetHVDefaults(self): |
| cl = self.fake_cl |
| self.failUnlessEqual(cl.GetHVDefaults(constants.HT_FAKE), |
| cl.hvparams[constants.HT_FAKE]) |
| self.failUnlessEqual(cl.GetHVDefaults(None), {}) |
| defaults = cl.GetHVDefaults(constants.HT_XEN_PVM, |
| os_name="lenny-image") |
| for param, value in cl.os_hvp["lenny-image"][constants.HT_XEN_PVM].items(): |
| self.assertEqual(value, defaults[param]) |
| |
| def testFillHvFullMerge(self): |
| inst_hvparams = { |
| "blah": "blubb", |
| } |
| |
| fake_dict = constants.HVC_DEFAULTS[constants.HT_FAKE].copy() |
| fake_dict.update({ |
| "foo": "baz", |
| "bar": "foo", |
| "foobar": "foobar", |
| "blah": "blubb", |
| "blubb": "blah", |
| }) |
| fake_inst = objects.Instance(name="foobar", |
| os="lenny-image", |
| hypervisor=constants.HT_FAKE, |
| hvparams=inst_hvparams) |
| self.assertEqual(fake_dict, self.fake_cl.FillHV(fake_inst)) |
| |
| def testFillHvGlobalParams(self): |
| fake_inst = objects.Instance(name="foobar", |
| os="ubuntu-hardy", |
| hypervisor=constants.HT_FAKE, |
| hvparams={}) |
| self.assertEqual(self.fake_cl.hvparams[constants.HT_FAKE], |
| self.fake_cl.FillHV(fake_inst)) |
| |
| def testFillHvInstParams(self): |
| inst_hvparams = { |
| "blah": "blubb", |
| } |
| fake_inst = objects.Instance(name="foobar", |
| os="ubuntu-hardy", |
| hypervisor=constants.HT_XEN_PVM, |
| hvparams=inst_hvparams) |
| filled_conf = self.fake_cl.FillHV(fake_inst) |
| for param, value in constants.HVC_DEFAULTS[constants.HT_XEN_PVM].items(): |
| if param == "blah": |
| value = "blubb" |
| self.assertEqual(value, filled_conf[param]) |
| |
| def testFillHvDefaultParams(self): |
| fake_inst = objects.Instance(name="foobar", |
| os="ubuntu-hardy", |
| hypervisor=constants.HT_XEN_PVM, |
| hvparams={}) |
| self.assertEqual(constants.HVC_DEFAULTS[constants.HT_XEN_PVM], |
| self.fake_cl.FillHV(fake_inst)) |
| |
| def testFillHvPartialParams(self): |
| os = "lenny-image" |
| fake_inst = objects.Instance(name="foobar", |
| os=os, |
| hypervisor=constants.HT_XEN_PVM, |
| hvparams={}) |
| filled_conf = self.fake_cl.FillHV(fake_inst) |
| for param, value in self.fake_cl.os_hvp[os][constants.HT_XEN_PVM].items(): |
| self.assertEqual(value, filled_conf[param]) |
| |
| def testFillNdParamsCluster(self): |
| fake_node = objects.Node(name="test", |
| ndparams={}, |
| group="testgroup") |
| fake_group = objects.NodeGroup(name="testgroup", |
| ndparams={}) |
| self.assertEqual(self.fake_cl.ndparams, |
| self.fake_cl.FillND(fake_node, fake_group)) |
| |
| def testFillNdParamsNodeGroup(self): |
| fake_node = objects.Node(name="test", |
| ndparams={}, |
| group="testgroup") |
| group_ndparams = { |
| constants.ND_OOB_PROGRAM: "/bin/group-oob", |
| constants.ND_SPINDLE_COUNT: 10, |
| constants.ND_EXCLUSIVE_STORAGE: True, |
| } |
| fake_group = objects.NodeGroup(name="testgroup", |
| ndparams=group_ndparams) |
| self.assertEqual(group_ndparams, |
| self.fake_cl.FillND(fake_node, fake_group)) |
| |
| def testFillNdParamsNode(self): |
| node_ndparams = { |
| constants.ND_OOB_PROGRAM: "/bin/node-oob", |
| constants.ND_SPINDLE_COUNT: 2, |
| constants.ND_EXCLUSIVE_STORAGE: True, |
| } |
| fake_node = objects.Node(name="test", |
| ndparams=node_ndparams, |
| group="testgroup") |
| fake_group = objects.NodeGroup(name="testgroup", |
| ndparams={}) |
| self.assertEqual(node_ndparams, |
| self.fake_cl.FillND(fake_node, fake_group)) |
| |
| def testFillNdParamsAll(self): |
| node_ndparams = { |
| constants.ND_OOB_PROGRAM: "/bin/node-oob", |
| constants.ND_SPINDLE_COUNT: 5, |
| constants.ND_EXCLUSIVE_STORAGE: True, |
| } |
| fake_node = objects.Node(name="test", |
| ndparams=node_ndparams, |
| group="testgroup") |
| group_ndparams = { |
| constants.ND_OOB_PROGRAM: "/bin/group-oob", |
| constants.ND_SPINDLE_COUNT: 4, |
| } |
| fake_group = objects.NodeGroup(name="testgroup", |
| ndparams=group_ndparams) |
| self.assertEqual(node_ndparams, |
| self.fake_cl.FillND(fake_node, fake_group)) |
| |
| def testPrimaryHypervisor(self): |
| assert self.fake_cl.enabled_hypervisors is None |
| self.fake_cl.enabled_hypervisors = [constants.HT_XEN_HVM] |
| self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_HVM) |
| |
| self.fake_cl.enabled_hypervisors = [constants.HT_XEN_PVM, constants.HT_KVM] |
| self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_XEN_PVM) |
| |
| self.fake_cl.enabled_hypervisors = sorted(constants.HYPER_TYPES) |
| self.assertEqual(self.fake_cl.primary_hypervisor, constants.HT_CHROOT) |
| |
| def testUpgradeConfig(self): |
| # FIXME: This test is incomplete |
| cluster = objects.Cluster() |
| cluster.UpgradeConfig() |
| cluster = objects.Cluster(ipolicy={"unknown_key": None}) |
| self.assertRaises(errors.ConfigurationError, cluster.UpgradeConfig) |
| |
| def testUpgradeEnabledDiskTemplates(self): |
| cfg = objects.ConfigData() |
| cfg.cluster = objects.Cluster() |
| cfg.cluster.volume_group_name = "myvg" |
| instance1 = objects.Instance() |
| instance1.disk_template = constants.DT_DISKLESS |
| instance2 = objects.Instance() |
| instance2.disk_template = constants.DT_RBD |
| cfg.instances = { "myinstance1": instance1, "myinstance2": instance2 } |
| nodegroup = objects.NodeGroup() |
| nodegroup.ipolicy = {} |
| nodegroup.ipolicy[constants.IPOLICY_DTS] = [instance1.disk_template, \ |
| constants.DT_BLOCK] |
| cfg.cluster.ipolicy = {} |
| cfg.cluster.ipolicy[constants.IPOLICY_DTS] = \ |
| [constants.DT_EXT, constants.DT_DISKLESS] |
| cfg.nodegroups = { "mynodegroup": nodegroup } |
| cfg._UpgradeEnabledDiskTemplates() |
| expected_disk_templates = [constants.DT_DRBD8, |
| constants.DT_PLAIN, |
| instance1.disk_template, |
| instance2.disk_template] |
| self.assertEqual(set(expected_disk_templates), |
| set(cfg.cluster.enabled_disk_templates)) |
| self.assertEqual(set([instance1.disk_template]), |
| set(cfg.cluster.ipolicy[constants.IPOLICY_DTS])) |
| |
| |
| class TestClusterObjectTcpUdpPortPool(unittest.TestCase): |
| def testNewCluster(self): |
| self.assertTrue(objects.Cluster().tcpudp_port_pool is None) |
| |
| def testSerializingEmpty(self): |
| self.assertEqual(objects.Cluster().ToDict(), { |
| "tcpudp_port_pool": [], |
| }) |
| |
| def testSerializing(self): |
| cluster = objects.Cluster.FromDict({}) |
| self.assertEqual(cluster.tcpudp_port_pool, set()) |
| |
| cluster.tcpudp_port_pool.add(3546) |
| cluster.tcpudp_port_pool.add(62511) |
| |
| data = cluster.ToDict() |
| self.assertEqual(data.keys(), ["tcpudp_port_pool"]) |
| self.assertEqual(sorted(data["tcpudp_port_pool"]), sorted([3546, 62511])) |
| |
| def testDeserializingEmpty(self): |
| cluster = objects.Cluster.FromDict({}) |
| self.assertEqual(cluster.tcpudp_port_pool, set()) |
| |
| def testDeserialize(self): |
| cluster = objects.Cluster.FromDict({ |
| "tcpudp_port_pool": [26214, 10039, 267], |
| }) |
| self.assertEqual(cluster.tcpudp_port_pool, set([26214, 10039, 267])) |
| |
| |
| class TestOS(unittest.TestCase): |
| ALL_DATA = [ |
| "debootstrap", |
| "debootstrap+default", |
| "debootstrap++default", |
| ] |
| |
| def testSplitNameVariant(self): |
| for name in self.ALL_DATA: |
| self.assertEqual(len(objects.OS.SplitNameVariant(name)), 2) |
| |
| def testVariant(self): |
| self.assertEqual(objects.OS.GetVariant("debootstrap"), "") |
| self.assertEqual(objects.OS.GetVariant("debootstrap+default"), "default") |
| |
| |
| class TestInstance(unittest.TestCase): |
| def _GenericCheck(self, inst): |
| for i in [inst.all_nodes, inst.secondary_nodes]: |
| self.assertTrue(isinstance(inst.all_nodes, (list, tuple)), |
| msg="Data type doesn't guarantee order") |
| |
| self.assertTrue(inst.primary_node not in inst.secondary_nodes) |
| self.assertEqual(inst.all_nodes[0], inst.primary_node, |
| msg="Primary node not first node in list") |
| |
| def testNodesNoDisks(self): |
| inst = objects.Instance(name="fakeinst.example.com", |
| primary_node="pnode.example.com", |
| disks=[ |
| ]) |
| |
| self._GenericCheck(inst) |
| self.assertEqual(len(inst.secondary_nodes), 0) |
| self.assertEqual(set(inst.all_nodes), set([inst.primary_node])) |
| self.assertEqual(inst.MapLVsByNode(), { |
| inst.primary_node: [], |
| }) |
| |
| def testNodesPlainDisks(self): |
| inst = objects.Instance(name="fakeinstplain.example.com", |
| primary_node="node3.example.com", |
| disks=[ |
| objects.Disk(dev_type=constants.DT_PLAIN, size=128, |
| logical_id=("myxenvg", "disk25494")), |
| objects.Disk(dev_type=constants.DT_PLAIN, size=512, |
| logical_id=("myxenvg", "disk29071")), |
| ]) |
| |
| self._GenericCheck(inst) |
| self.assertEqual(len(inst.secondary_nodes), 0) |
| self.assertEqual(set(inst.all_nodes), set([inst.primary_node])) |
| self.assertEqual(inst.MapLVsByNode(), { |
| inst.primary_node: ["myxenvg/disk25494", "myxenvg/disk29071"], |
| }) |
| |
| def testNodesDrbdDisks(self): |
| inst = objects.Instance(name="fakeinstdrbd.example.com", |
| primary_node="node10.example.com", |
| disks=[ |
| objects.Disk(dev_type=constants.DT_DRBD8, size=786432, |
| logical_id=("node10.example.com", "node15.example.com", |
| 12300, 0, 0, "secret"), |
| children=[ |
| objects.Disk(dev_type=constants.DT_PLAIN, size=786432, |
| logical_id=("myxenvg", "disk0")), |
| objects.Disk(dev_type=constants.DT_PLAIN, size=128, |
| logical_id=("myxenvg", "meta0")) |
| ], |
| iv_name="disk/0") |
| ]) |
| |
| self._GenericCheck(inst) |
| self.assertEqual(set(inst.secondary_nodes), set(["node15.example.com"])) |
| self.assertEqual(set(inst.all_nodes), |
| set([inst.primary_node, "node15.example.com"])) |
| self.assertEqual(inst.MapLVsByNode(), { |
| inst.primary_node: ["myxenvg/disk0", "myxenvg/meta0"], |
| "node15.example.com": ["myxenvg/disk0", "myxenvg/meta0"], |
| }) |
| |
| self.assertEqual(inst.FindDisk(0), inst.disks[0]) |
| self.assertRaises(errors.OpPrereqError, inst.FindDisk, "hello") |
| self.assertRaises(errors.OpPrereqError, inst.FindDisk, 100) |
| self.assertRaises(errors.OpPrereqError, inst.FindDisk, 1) |
| |
| |
| class TestNode(unittest.TestCase): |
| def testEmpty(self): |
| self.assertEqual(objects.Node().ToDict(), {}) |
| self.assertTrue(isinstance(objects.Node.FromDict({}), objects.Node)) |
| |
| def testHvState(self): |
| node = objects.Node(name="node18157.example.com", hv_state={ |
| constants.HT_XEN_HVM: objects.NodeHvState(cpu_total=64), |
| constants.HT_KVM: objects.NodeHvState(cpu_node=1), |
| }) |
| |
| node2 = objects.Node.FromDict(node.ToDict()) |
| |
| # Make sure nothing can reference it anymore |
| del node |
| |
| self.assertEqual(node2.name, "node18157.example.com") |
| self.assertEqual(frozenset(node2.hv_state), frozenset([ |
| constants.HT_XEN_HVM, |
| constants.HT_KVM, |
| ])) |
| self.assertEqual(node2.hv_state[constants.HT_KVM].cpu_node, 1) |
| self.assertEqual(node2.hv_state[constants.HT_XEN_HVM].cpu_total, 64) |
| |
| def testDiskState(self): |
| node = objects.Node(name="node32087.example.com", disk_state={ |
| constants.DT_PLAIN: { |
| "lv32352": objects.NodeDiskState(total=128), |
| "lv2082": objects.NodeDiskState(total=512), |
| }, |
| }) |
| |
| node2 = objects.Node.FromDict(node.ToDict()) |
| |
| # Make sure nothing can reference it anymore |
| del node |
| |
| self.assertEqual(node2.name, "node32087.example.com") |
| self.assertEqual(frozenset(node2.disk_state), frozenset([ |
| constants.DT_PLAIN, |
| ])) |
| self.assertEqual(frozenset(node2.disk_state[constants.DT_PLAIN]), |
| frozenset(["lv32352", "lv2082"])) |
| self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv2082"].total, 512) |
| self.assertEqual(node2.disk_state[constants.DT_PLAIN]["lv32352"].total, 128) |
| |
| def testFilterEsNdp(self): |
| node1 = objects.Node(name="node11673.example.com", ndparams={ |
| constants.ND_EXCLUSIVE_STORAGE: True, |
| }) |
| node2 = objects.Node(name="node11674.example.com", ndparams={ |
| constants.ND_SPINDLE_COUNT: 3, |
| constants.ND_EXCLUSIVE_STORAGE: False, |
| }) |
| self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams) |
| node1.UpgradeConfig() |
| self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node1.ndparams) |
| self.assertTrue(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams) |
| self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams) |
| node2.UpgradeConfig() |
| self.assertFalse(constants.ND_EXCLUSIVE_STORAGE in node2.ndparams) |
| self.assertTrue(constants.ND_SPINDLE_COUNT in node2.ndparams) |
| |
| |
| class TestInstancePolicy(unittest.TestCase): |
| def setUp(self): |
| # Policies are big, and we want to see the difference in case of an error |
| self.maxDiff = None |
| |
| def _AssertIPolicyIsFull(self, policy): |
| self.assertEqual(frozenset(policy.keys()), constants.IPOLICY_ALL_KEYS) |
| self.assertTrue(len(policy[constants.ISPECS_MINMAX]) > 0) |
| for minmax in policy[constants.ISPECS_MINMAX]: |
| self.assertEqual(frozenset(minmax.keys()), constants.ISPECS_MINMAX_KEYS) |
| for key in constants.ISPECS_MINMAX_KEYS: |
| self.assertEqual(frozenset(minmax[key].keys()), |
| constants.ISPECS_PARAMETERS) |
| self.assertEqual(frozenset(policy[constants.ISPECS_STD].keys()), |
| constants.ISPECS_PARAMETERS) |
| |
| def testDefaultIPolicy(self): |
| objects.InstancePolicy.CheckParameterSyntax(constants.IPOLICY_DEFAULTS, |
| True) |
| self._AssertIPolicyIsFull(constants.IPOLICY_DEFAULTS) |
| |
| def _AssertPolicyIsBad(self, ipolicy, do_check_std=None): |
| if do_check_std is None: |
| check_std_vals = [False, True] |
| else: |
| check_std_vals = [do_check_std] |
| for check_std in check_std_vals: |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckISpecSyntax, |
| ipolicy, check_std) |
| |
| def testCheckISpecSyntax(self): |
| default_stdspec = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD] |
| incomplete_ipolicies = [ |
| { |
| constants.ISPECS_MINMAX: [], |
| constants.ISPECS_STD: default_stdspec, |
| }, |
| { |
| constants.ISPECS_MINMAX: [{}], |
| constants.ISPECS_STD: default_stdspec, |
| }, |
| { |
| constants.ISPECS_MINMAX: [{ |
| constants.ISPECS_MIN: NotImplemented, |
| }], |
| constants.ISPECS_STD: default_stdspec, |
| }, |
| { |
| constants.ISPECS_MINMAX: [{ |
| constants.ISPECS_MAX: NotImplemented, |
| }], |
| constants.ISPECS_STD: default_stdspec, |
| }, |
| { |
| constants.ISPECS_MINMAX: [{ |
| constants.ISPECS_MIN: NotImplemented, |
| constants.ISPECS_MAX: NotImplemented, |
| }], |
| }, |
| ] |
| for ipol in incomplete_ipolicies: |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckISpecSyntax, |
| ipol, True) |
| oldminmax = ipol[constants.ISPECS_MINMAX] |
| if oldminmax: |
| # Prepending valid specs shouldn't change the error |
| ipol[constants.ISPECS_MINMAX] = ([constants.ISPECS_MINMAX_DEFAULTS] + |
| oldminmax) |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckISpecSyntax, |
| ipol, True) |
| |
| good_ipolicy = { |
| constants.ISPECS_MINMAX: [ |
| { |
| constants.ISPECS_MIN: { |
| constants.ISPEC_MEM_SIZE: 64, |
| constants.ISPEC_CPU_COUNT: 1, |
| constants.ISPEC_DISK_COUNT: 2, |
| constants.ISPEC_DISK_SIZE: 64, |
| constants.ISPEC_NIC_COUNT: 1, |
| constants.ISPEC_SPINDLE_USE: 1, |
| }, |
| constants.ISPECS_MAX: { |
| constants.ISPEC_MEM_SIZE: 16384, |
| constants.ISPEC_CPU_COUNT: 5, |
| constants.ISPEC_DISK_COUNT: 12, |
| constants.ISPEC_DISK_SIZE: 1024, |
| constants.ISPEC_NIC_COUNT: 9, |
| constants.ISPEC_SPINDLE_USE: 18, |
| }, |
| }, |
| { |
| constants.ISPECS_MIN: { |
| constants.ISPEC_MEM_SIZE: 32768, |
| constants.ISPEC_CPU_COUNT: 8, |
| constants.ISPEC_DISK_COUNT: 1, |
| constants.ISPEC_DISK_SIZE: 1024, |
| constants.ISPEC_NIC_COUNT: 1, |
| constants.ISPEC_SPINDLE_USE: 1, |
| }, |
| constants.ISPECS_MAX: { |
| constants.ISPEC_MEM_SIZE: 65536, |
| constants.ISPEC_CPU_COUNT: 10, |
| constants.ISPEC_DISK_COUNT: 5, |
| constants.ISPEC_DISK_SIZE: 1024 * 1024, |
| constants.ISPEC_NIC_COUNT: 3, |
| constants.ISPEC_SPINDLE_USE: 12, |
| }, |
| }, |
| ], |
| } |
| good_ipolicy[constants.ISPECS_STD] = copy.deepcopy( |
| good_ipolicy[constants.ISPECS_MINMAX][0][constants.ISPECS_MAX]) |
| # Check that it's really good before making it bad |
| objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True) |
| |
| bad_ipolicy = copy.deepcopy(good_ipolicy) |
| for minmax in bad_ipolicy[constants.ISPECS_MINMAX]: |
| for (key, spec) in minmax.items(): |
| for param in spec: |
| oldv = spec[param] |
| del spec[param] |
| self._AssertPolicyIsBad(bad_ipolicy) |
| if key == constants.ISPECS_MIN: |
| spec[param] = minmax[constants.ISPECS_MAX][param] + 1 |
| self._AssertPolicyIsBad(bad_ipolicy) |
| spec[param] = oldv |
| assert bad_ipolicy == good_ipolicy |
| |
| stdspec = bad_ipolicy[constants.ISPECS_STD] |
| for param in stdspec: |
| oldv = stdspec[param] |
| del stdspec[param] |
| self._AssertPolicyIsBad(bad_ipolicy, True) |
| # Note that std spec is the same as a max spec |
| stdspec[param] = oldv + 1 |
| self._AssertPolicyIsBad(bad_ipolicy, True) |
| stdspec[param] = oldv |
| assert bad_ipolicy == good_ipolicy |
| |
| for minmax in good_ipolicy[constants.ISPECS_MINMAX]: |
| for spec in minmax.values(): |
| good_ipolicy[constants.ISPECS_STD] = spec |
| objects.InstancePolicy.CheckISpecSyntax(good_ipolicy, True) |
| |
| def testCheckISpecParamSyntax(self): |
| par = "my_parameter" |
| for check_std in [True, False]: |
| # Min and max only |
| good_values = [(11, 11), (11, 40), (0, 0)] |
| for (mn, mx) in good_values: |
| minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS) |
| minmax[constants.ISPECS_MIN][par] = mn |
| minmax[constants.ISPECS_MAX][par] = mx |
| objects.InstancePolicy._CheckISpecParamSyntax(minmax, {}, par, |
| check_std) |
| minmax = dict((k, {}) for k in constants.ISPECS_MINMAX_KEYS) |
| minmax[constants.ISPECS_MIN][par] = 11 |
| minmax[constants.ISPECS_MAX][par] = 5 |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy._CheckISpecParamSyntax, |
| minmax, {}, par, check_std) |
| # Min, std, max |
| good_values = [ |
| (11, 11, 11), |
| (11, 11, 40), |
| (11, 40, 40), |
| ] |
| for (mn, st, mx) in good_values: |
| minmax = { |
| constants.ISPECS_MIN: {par: mn}, |
| constants.ISPECS_MAX: {par: mx}, |
| } |
| stdspec = {par: st} |
| objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, par, True) |
| bad_values = [ |
| (11, 11, 5, True), |
| (40, 11, 11, True), |
| (11, 80, 40, False), |
| (11, 5, 40, False,), |
| (11, 5, 5, True), |
| (40, 40, 11, True), |
| ] |
| for (mn, st, mx, excp) in bad_values: |
| minmax = { |
| constants.ISPECS_MIN: {par: mn}, |
| constants.ISPECS_MAX: {par: mx}, |
| } |
| stdspec = {par: st} |
| if excp: |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy._CheckISpecParamSyntax, |
| minmax, stdspec, par, True) |
| else: |
| ret = objects.InstancePolicy._CheckISpecParamSyntax(minmax, stdspec, |
| par, True) |
| self.assertFalse(ret) |
| |
| def testCheckDiskTemplates(self): |
| invalid = "this_is_not_a_good_template" |
| for dt in constants.DISK_TEMPLATES: |
| objects.InstancePolicy.CheckDiskTemplates([dt]) |
| objects.InstancePolicy.CheckDiskTemplates(list(constants.DISK_TEMPLATES)) |
| bad_examples = [ |
| [invalid], |
| [constants.DT_DRBD8, invalid], |
| list(constants.DISK_TEMPLATES) + [invalid], |
| [], |
| None, |
| ] |
| for dtl in bad_examples: |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckDiskTemplates, |
| dtl) |
| |
| def testCheckParameterSyntax(self): |
| invalid = "this_key_shouldnt_be_here" |
| for check_std in [True, False]: |
| objects.InstancePolicy.CheckParameterSyntax({}, check_std) |
| policy = {invalid: None} |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckParameterSyntax, |
| policy, check_std) |
| for par in constants.IPOLICY_PARAMETERS: |
| for val in ("blah", None, {}, [42]): |
| policy = {par: val} |
| self.assertRaises(errors.ConfigurationError, |
| objects.InstancePolicy.CheckParameterSyntax, |
| policy, check_std) |
| |
| def testFillIPolicyEmpty(self): |
| policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, {}) |
| objects.InstancePolicy.CheckParameterSyntax(policy, True) |
| self.assertEqual(policy, constants.IPOLICY_DEFAULTS) |
| |
| def _AssertISpecsMerged(self, default_spec, diff_spec, merged_spec): |
| for (param, value) in merged_spec.items(): |
| if param in diff_spec: |
| self.assertEqual(value, diff_spec[param]) |
| else: |
| self.assertEqual(value, default_spec[param]) |
| |
| def _AssertIPolicyMerged(self, default_pol, diff_pol, merged_pol): |
| for (key, value) in merged_pol.items(): |
| if key in diff_pol: |
| if key == constants.ISPECS_STD: |
| self._AssertISpecsMerged(default_pol[key], diff_pol[key], value) |
| else: |
| self.assertEqual(value, diff_pol[key]) |
| else: |
| self.assertEqual(value, default_pol[key]) |
| |
| def testFillIPolicy(self): |
| partial_policies = [ |
| {constants.IPOLICY_VCPU_RATIO: 3.14}, |
| {constants.IPOLICY_SPINDLE_RATIO: 2.72}, |
| {constants.IPOLICY_DTS: [constants.DT_FILE]}, |
| {constants.ISPECS_STD: {constants.ISPEC_DISK_COUNT: 3}}, |
| {constants.ISPECS_MINMAX: [constants.ISPECS_MINMAX_DEFAULTS, |
| constants.ISPECS_MINMAX_DEFAULTS]} |
| ] |
| for diff_pol in partial_policies: |
| policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol) |
| objects.InstancePolicy.CheckParameterSyntax(policy, True) |
| self._AssertIPolicyIsFull(policy) |
| self._AssertIPolicyMerged(constants.IPOLICY_DEFAULTS, diff_pol, policy) |
| |
| def testFillIPolicyKeepsUnknown(self): |
| INVALID_KEY = "invalid_ipolicy_key" |
| diff_pol = { |
| INVALID_KEY: None, |
| } |
| policy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, diff_pol) |
| self.assertTrue(INVALID_KEY in policy) |
| |
| |
| class TestDisk(unittest.TestCase): |
| def addChild(self, disk): |
| """Adds a child of the same device type as the parent.""" |
| disk.children = [] |
| child = objects.Disk() |
| child.dev_type = disk.dev_type |
| disk.children.append(child) |
| |
| def testUpgradeConfigDevTypeLegacy(self): |
| for old, new in [("drbd8", constants.DT_DRBD8), |
| ("lvm", constants.DT_PLAIN)]: |
| disk = objects.Disk() |
| disk.dev_type = old |
| self.addChild(disk) |
| disk.UpgradeConfig() |
| self.assertEqual(new, disk.dev_type) |
| self.assertEqual(new, disk.children[0].dev_type) |
| |
| def testUpgradeConfigDevTypeLegacyUnchanged(self): |
| dev_types = [constants.DT_FILE, constants.DT_SHARED_FILE, |
| constants.DT_BLOCK, constants.DT_EXT, |
| constants.DT_RBD] |
| for dev_type in dev_types: |
| disk = objects.Disk() |
| disk.dev_type = dev_type |
| self.addChild(disk) |
| disk.UpgradeConfig() |
| self.assertEqual(dev_type, disk.dev_type) |
| self.assertEqual(dev_type, disk.children[0].dev_type) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |