blob: 44f45c57bf003d7eaa3a045b481a957f32d3838e [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.backend"""
import mock
import os
import shutil
import tempfile
import testutils
import unittest
from ganeti import backend
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import netutils
from ganeti import utils
class TestX509Certificates(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test(self):
(name, cert_pem) = backend.CreateX509Certificate(300, cryptodir=self.tmpdir)
self.assertEqual(utils.ReadFile(os.path.join(self.tmpdir, name,
backend._X509_CERT_FILE)),
cert_pem)
self.assert_(0 < os.path.getsize(os.path.join(self.tmpdir, name,
backend._X509_KEY_FILE)))
(name2, cert_pem2) = \
backend.CreateX509Certificate(300, cryptodir=self.tmpdir)
backend.RemoveX509Certificate(name, cryptodir=self.tmpdir)
backend.RemoveX509Certificate(name2, cryptodir=self.tmpdir)
self.assertEqual(utils.ListVisibleFiles(self.tmpdir), [])
def testNonEmpty(self):
(name, _) = backend.CreateX509Certificate(300, cryptodir=self.tmpdir)
utils.WriteFile(utils.PathJoin(self.tmpdir, name, "hello-world"),
data="Hello World")
self.assertRaises(backend.RPCFail, backend.RemoveX509Certificate,
name, cryptodir=self.tmpdir)
self.assertEqual(utils.ListVisibleFiles(self.tmpdir), [name])
class TestNodeVerify(testutils.GanetiTestCase):
def setUp(self):
testutils.GanetiTestCase.setUp(self)
self._mock_hv = None
def _GetHypervisor(self, hv_name):
self._mock_hv = hypervisor.GetHypervisor(hv_name)
self._mock_hv.ValidateParameters = mock.Mock()
self._mock_hv.Verify = mock.Mock()
return self._mock_hv
def testMasterIPLocalhost(self):
# this a real functional test, but requires localhost to be reachable
local_data = (netutils.Hostname.GetSysName(),
constants.IP4_ADDRESS_LOCALHOST)
result = backend.VerifyNode({constants.NV_MASTERIP: local_data}, None, {})
self.failUnless(constants.NV_MASTERIP in result,
"Master IP data not returned")
self.failUnless(result[constants.NV_MASTERIP], "Cannot reach localhost")
def testMasterIPUnreachable(self):
# Network 192.0.2.0/24 is reserved for test/documentation as per
# RFC 5737
bad_data = ("master.example.com", "192.0.2.1")
# we just test that whatever TcpPing returns, VerifyNode returns too
netutils.TcpPing = lambda a, b, source=None: False
result = backend.VerifyNode({constants.NV_MASTERIP: bad_data}, None, {})
self.failUnless(constants.NV_MASTERIP in result,
"Master IP data not returned")
self.failIf(result[constants.NV_MASTERIP],
"Result from netutils.TcpPing corrupted")
def testVerifyHvparams(self):
test_hvparams = {constants.HV_XEN_CMD: constants.XEN_CMD_XL}
test_what = {constants.NV_HVPARAMS: \
[("mynode", constants.HT_XEN_PVM, test_hvparams)]}
result = {}
backend._VerifyHvparams(test_what, True, result,
get_hv_fn=self._GetHypervisor)
self._mock_hv.ValidateParameters.assert_called_with(test_hvparams)
def testVerifyHypervisors(self):
hvname = constants.HT_XEN_PVM
hvparams = {constants.HV_XEN_CMD: constants.XEN_CMD_XL}
all_hvparams = {hvname: hvparams}
test_what = {constants.NV_HYPERVISOR: [hvname]}
result = {}
backend._VerifyHypervisors(
test_what, True, result, all_hvparams=all_hvparams,
get_hv_fn=self._GetHypervisor)
self._mock_hv.Verify.assert_called_with(hvparams=hvparams)
def _DefRestrictedCmdOwner():
return (os.getuid(), os.getgid())
class TestVerifyRestrictedCmdName(unittest.TestCase):
def testAcceptableName(self):
for i in ["foo", "bar", "z1", "000first", "hello-world"]:
for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]:
(status, msg) = backend._VerifyRestrictedCmdName(fn(i))
self.assertTrue(status)
self.assertTrue(msg is None)
def testEmptyAndSpace(self):
for i in ["", " ", "\t", "\n"]:
(status, msg) = backend._VerifyRestrictedCmdName(i)
self.assertFalse(status)
self.assertEqual(msg, "Missing command name")
def testNameWithSlashes(self):
for i in ["/", "./foo", "../moo", "some/name"]:
(status, msg) = backend._VerifyRestrictedCmdName(i)
self.assertFalse(status)
self.assertEqual(msg, "Invalid command name")
def testForbiddenCharacters(self):
for i in ["#", ".", "..", "bash -c ls", "'"]:
(status, msg) = backend._VerifyRestrictedCmdName(i)
self.assertFalse(status)
self.assertEqual(msg, "Command name contains forbidden characters")
class TestVerifyRestrictedCmdDirectory(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testCanNotStat(self):
tmpname = utils.PathJoin(self.tmpdir, "foobar")
self.assertFalse(os.path.exists(tmpname))
(status, msg) = \
backend._VerifyRestrictedCmdDirectory(tmpname, _owner=NotImplemented)
self.assertFalse(status)
self.assertTrue(msg.startswith("Can't stat(2) '"))
def testTooPermissive(self):
tmpname = utils.PathJoin(self.tmpdir, "foobar")
os.mkdir(tmpname)
for mode in [0777, 0706, 0760, 0722]:
os.chmod(tmpname, mode)
self.assertTrue(os.path.isdir(tmpname))
(status, msg) = \
backend._VerifyRestrictedCmdDirectory(tmpname, _owner=NotImplemented)
self.assertFalse(status)
self.assertTrue(msg.startswith("Permissions on '"))
def testNoDirectory(self):
tmpname = utils.PathJoin(self.tmpdir, "foobar")
utils.WriteFile(tmpname, data="empty\n")
self.assertTrue(os.path.isfile(tmpname))
(status, msg) = \
backend._VerifyRestrictedCmdDirectory(tmpname,
_owner=_DefRestrictedCmdOwner())
self.assertFalse(status)
self.assertTrue(msg.endswith("is not a directory"))
def testNormal(self):
tmpname = utils.PathJoin(self.tmpdir, "foobar")
os.mkdir(tmpname)
os.chmod(tmpname, 0755)
self.assertTrue(os.path.isdir(tmpname))
(status, msg) = \
backend._VerifyRestrictedCmdDirectory(tmpname,
_owner=_DefRestrictedCmdOwner())
self.assertTrue(status)
self.assertTrue(msg is None)
class TestVerifyRestrictedCmd(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testCanNotStat(self):
tmpname = utils.PathJoin(self.tmpdir, "helloworld")
self.assertFalse(os.path.exists(tmpname))
(status, msg) = \
backend._VerifyRestrictedCmd(self.tmpdir, "helloworld",
_owner=NotImplemented)
self.assertFalse(status)
self.assertTrue(msg.startswith("Can't stat(2) '"))
def testNotExecutable(self):
tmpname = utils.PathJoin(self.tmpdir, "cmdname")
utils.WriteFile(tmpname, data="empty\n")
(status, msg) = \
backend._VerifyRestrictedCmd(self.tmpdir, "cmdname",
_owner=_DefRestrictedCmdOwner())
self.assertFalse(status)
self.assertTrue(msg.startswith("access(2) thinks '"))
def testExecutable(self):
tmpname = utils.PathJoin(self.tmpdir, "cmdname")
utils.WriteFile(tmpname, data="empty\n", mode=0700)
(status, executable) = \
backend._VerifyRestrictedCmd(self.tmpdir, "cmdname",
_owner=_DefRestrictedCmdOwner())
self.assertTrue(status)
self.assertEqual(executable, tmpname)
class TestPrepareRestrictedCmd(unittest.TestCase):
_TEST_PATH = "/tmp/some/test/path"
def testDirFails(self):
def fn(path):
self.assertEqual(path, self._TEST_PATH)
return (False, "test error 31420")
(status, msg) = \
backend._PrepareRestrictedCmd(self._TEST_PATH, "cmd21152",
_verify_dir=fn,
_verify_name=NotImplemented,
_verify_cmd=NotImplemented)
self.assertFalse(status)
self.assertEqual(msg, "test error 31420")
def testNameFails(self):
def fn(cmd):
self.assertEqual(cmd, "cmd4617")
return (False, "test error 591")
(status, msg) = \
backend._PrepareRestrictedCmd(self._TEST_PATH, "cmd4617",
_verify_dir=lambda _: (True, None),
_verify_name=fn,
_verify_cmd=NotImplemented)
self.assertFalse(status)
self.assertEqual(msg, "test error 591")
def testCommandFails(self):
def fn(path, cmd):
self.assertEqual(path, self._TEST_PATH)
self.assertEqual(cmd, "cmd17577")
return (False, "test error 25524")
(status, msg) = \
backend._PrepareRestrictedCmd(self._TEST_PATH, "cmd17577",
_verify_dir=lambda _: (True, None),
_verify_name=lambda _: (True, None),
_verify_cmd=fn)
self.assertFalse(status)
self.assertEqual(msg, "test error 25524")
def testSuccess(self):
def fn(path, cmd):
return (True, utils.PathJoin(path, cmd))
(status, executable) = \
backend._PrepareRestrictedCmd(self._TEST_PATH, "cmd22633",
_verify_dir=lambda _: (True, None),
_verify_name=lambda _: (True, None),
_verify_cmd=fn)
self.assertTrue(status)
self.assertEqual(executable, utils.PathJoin(self._TEST_PATH, "cmd22633"))
def _SleepForRestrictedCmd(duration):
assert duration > 5
def _GenericRestrictedCmdError(cmd):
return "Executing command '%s' failed" % cmd
class TestRunRestrictedCmd(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testNonExistantLockDirectory(self):
lockfile = utils.PathJoin(self.tmpdir, "does", "not", "exist")
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
self.assertFalse(os.path.exists(lockfile))
self.assertRaises(backend.RPCFail,
backend.RunRestrictedCmd, "test",
_lock_timeout=NotImplemented,
_lock_file=lockfile,
_path=NotImplemented,
_sleep_fn=sleep_fn,
_prepare_fn=NotImplemented,
_runcmd_fn=NotImplemented,
_enabled=True)
self.assertEqual(sleep_fn.Count(), 1)
@staticmethod
def _TryLock(lockfile):
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
result = False
try:
backend.RunRestrictedCmd("test22717",
_lock_timeout=0.1,
_lock_file=lockfile,
_path=NotImplemented,
_sleep_fn=sleep_fn,
_prepare_fn=NotImplemented,
_runcmd_fn=NotImplemented,
_enabled=True)
except backend.RPCFail, err:
assert str(err) == _GenericRestrictedCmdError("test22717"), \
"Did not fail with generic error message"
result = True
assert sleep_fn.Count() == 1
return result
def testLockHeldByOtherProcess(self):
lockfile = utils.PathJoin(self.tmpdir, "lock")
lock = utils.FileLock.Open(lockfile)
lock.Exclusive(blocking=True, timeout=1.0)
try:
self.assertTrue(utils.RunInSeparateProcess(self._TryLock, lockfile))
finally:
lock.Close()
@staticmethod
def _PrepareRaisingException(path, cmd):
assert cmd == "test23122"
raise Exception("test")
def testPrepareRaisesException(self):
lockfile = utils.PathJoin(self.tmpdir, "lock")
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
prepare_fn = testutils.CallCounter(self._PrepareRaisingException)
try:
backend.RunRestrictedCmd("test23122",
_lock_timeout=1.0, _lock_file=lockfile,
_path=NotImplemented, _runcmd_fn=NotImplemented,
_sleep_fn=sleep_fn, _prepare_fn=prepare_fn,
_enabled=True)
except backend.RPCFail, err:
self.assertEqual(str(err), _GenericRestrictedCmdError("test23122"))
else:
self.fail("Didn't fail")
self.assertEqual(sleep_fn.Count(), 1)
self.assertEqual(prepare_fn.Count(), 1)
@staticmethod
def _PrepareFails(path, cmd):
assert cmd == "test29327"
return ("some error message", None)
def testPrepareFails(self):
lockfile = utils.PathJoin(self.tmpdir, "lock")
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
prepare_fn = testutils.CallCounter(self._PrepareFails)
try:
backend.RunRestrictedCmd("test29327",
_lock_timeout=1.0, _lock_file=lockfile,
_path=NotImplemented, _runcmd_fn=NotImplemented,
_sleep_fn=sleep_fn, _prepare_fn=prepare_fn,
_enabled=True)
except backend.RPCFail, err:
self.assertEqual(str(err), _GenericRestrictedCmdError("test29327"))
else:
self.fail("Didn't fail")
self.assertEqual(sleep_fn.Count(), 1)
self.assertEqual(prepare_fn.Count(), 1)
@staticmethod
def _SuccessfulPrepare(path, cmd):
return (True, utils.PathJoin(path, cmd))
def testRunCmdFails(self):
lockfile = utils.PathJoin(self.tmpdir, "lock")
def fn(args, env=NotImplemented, reset_env=NotImplemented,
postfork_fn=NotImplemented):
self.assertEqual(args, [utils.PathJoin(self.tmpdir, "test3079")])
self.assertEqual(env, {})
self.assertTrue(reset_env)
self.assertTrue(callable(postfork_fn))
trylock = utils.FileLock.Open(lockfile)
try:
# See if lockfile is still held
self.assertRaises(EnvironmentError, trylock.Exclusive, blocking=False)
# Call back to release lock
postfork_fn(NotImplemented)
# See if lockfile can be acquired
trylock.Exclusive(blocking=False)
finally:
trylock.Close()
# Simulate a failed command
return utils.RunResult(constants.EXIT_FAILURE, None,
"stdout", "stderr406328567",
utils.ShellQuoteArgs(args),
NotImplemented, NotImplemented)
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
prepare_fn = testutils.CallCounter(self._SuccessfulPrepare)
runcmd_fn = testutils.CallCounter(fn)
try:
backend.RunRestrictedCmd("test3079",
_lock_timeout=1.0, _lock_file=lockfile,
_path=self.tmpdir, _runcmd_fn=runcmd_fn,
_sleep_fn=sleep_fn, _prepare_fn=prepare_fn,
_enabled=True)
except backend.RPCFail, err:
self.assertTrue(str(err).startswith("Restricted command 'test3079'"
" failed:"))
self.assertTrue("stderr406328567" in str(err),
msg="Error did not include output")
else:
self.fail("Didn't fail")
self.assertEqual(sleep_fn.Count(), 0)
self.assertEqual(prepare_fn.Count(), 1)
self.assertEqual(runcmd_fn.Count(), 1)
def testRunCmdSucceeds(self):
lockfile = utils.PathJoin(self.tmpdir, "lock")
def fn(args, env=NotImplemented, reset_env=NotImplemented,
postfork_fn=NotImplemented):
self.assertEqual(args, [utils.PathJoin(self.tmpdir, "test5667")])
self.assertEqual(env, {})
self.assertTrue(reset_env)
# Call back to release lock
postfork_fn(NotImplemented)
# Simulate a successful command
return utils.RunResult(constants.EXIT_SUCCESS, None, "stdout14463", "",
utils.ShellQuoteArgs(args),
NotImplemented, NotImplemented)
sleep_fn = testutils.CallCounter(_SleepForRestrictedCmd)
prepare_fn = testutils.CallCounter(self._SuccessfulPrepare)
runcmd_fn = testutils.CallCounter(fn)
result = backend.RunRestrictedCmd("test5667",
_lock_timeout=1.0, _lock_file=lockfile,
_path=self.tmpdir, _runcmd_fn=runcmd_fn,
_sleep_fn=sleep_fn,
_prepare_fn=prepare_fn,
_enabled=True)
self.assertEqual(result, "stdout14463")
self.assertEqual(sleep_fn.Count(), 0)
self.assertEqual(prepare_fn.Count(), 1)
self.assertEqual(runcmd_fn.Count(), 1)
def testCommandsDisabled(self):
try:
backend.RunRestrictedCmd("test",
_lock_timeout=NotImplemented,
_lock_file=NotImplemented,
_path=NotImplemented,
_sleep_fn=NotImplemented,
_prepare_fn=NotImplemented,
_runcmd_fn=NotImplemented,
_enabled=False)
except backend.RPCFail, err:
self.assertEqual(str(err),
"Restricted commands disabled at configure time")
else:
self.fail("Did not raise exception")
class TestSetWatcherPause(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.filename = utils.PathJoin(self.tmpdir, "pause")
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testUnsetNonExisting(self):
self.assertFalse(os.path.exists(self.filename))
backend.SetWatcherPause(None, _filename=self.filename)
self.assertFalse(os.path.exists(self.filename))
def testSetNonNumeric(self):
for i in ["", [], {}, "Hello World", "0", "1.0"]:
self.assertFalse(os.path.exists(self.filename))
try:
backend.SetWatcherPause(i, _filename=self.filename)
except backend.RPCFail, err:
self.assertEqual(str(err), "Duration must be numeric")
else:
self.fail("Did not raise exception")
self.assertFalse(os.path.exists(self.filename))
def testSet(self):
self.assertFalse(os.path.exists(self.filename))
for i in range(10):
backend.SetWatcherPause(i, _filename=self.filename)
self.assertEqual(utils.ReadFile(self.filename), "%s\n" % i)
self.assertEqual(os.stat(self.filename).st_mode & 0777, 0644)
class TestGetBlockDevSymlinkPath(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _Test(self, name, idx):
self.assertEqual(backend._GetBlockDevSymlinkPath(name, idx,
_dir=self.tmpdir),
("%s/%s%s%s" % (self.tmpdir, name,
constants.DISK_SEPARATOR, idx)))
def test(self):
for idx in range(100):
self._Test("inst1.example.com", idx)
class TestGetInstanceList(unittest.TestCase):
def setUp(self):
self._test_hv = self._TestHypervisor()
self._test_hv.ListInstances = mock.Mock(
return_value=["instance1", "instance2", "instance3"] )
class _TestHypervisor(hypervisor.hv_base.BaseHypervisor):
def __init__(self):
hypervisor.hv_base.BaseHypervisor.__init__(self)
def _GetHypervisor(self, name):
return self._test_hv
def testHvparams(self):
fake_hvparams = {constants.HV_XEN_CMD: constants.XEN_CMD_XL}
hvparams = {constants.HT_FAKE: fake_hvparams}
backend.GetInstanceList([constants.HT_FAKE], all_hvparams=hvparams,
get_hv_fn=self._GetHypervisor)
self._test_hv.ListInstances.assert_called_with(hvparams=fake_hvparams)
class TestGetHvInfo(unittest.TestCase):
def setUp(self):
self._test_hv = self._TestHypervisor()
self._test_hv.GetNodeInfo = mock.Mock()
class _TestHypervisor(hypervisor.hv_base.BaseHypervisor):
def __init__(self):
hypervisor.hv_base.BaseHypervisor.__init__(self)
def _GetHypervisor(self, name):
return self._test_hv
def testGetHvInfoAllNone(self):
result = backend._GetHvInfoAll(None)
self.assertTrue(result is None)
def testGetHvInfoAll(self):
hvname = constants.HT_XEN_PVM
hvparams = {constants.HV_XEN_CMD: constants.XEN_CMD_XL}
hv_specs = [(hvname, hvparams)]
backend._GetHvInfoAll(hv_specs, self._GetHypervisor)
self._test_hv.GetNodeInfo.assert_called_with(hvparams=hvparams)
class TestApplyStorageInfoFunction(unittest.TestCase):
_STORAGE_KEY = "some_key"
_SOME_ARGS = ["some_args"]
def setUp(self):
self.mock_storage_fn = mock.Mock()
def testApplyValidStorageType(self):
storage_type = constants.ST_LVM_VG
info_fn_orig = backend._STORAGE_TYPE_INFO_FN
backend._STORAGE_TYPE_INFO_FN = {
storage_type: self.mock_storage_fn
}
backend._ApplyStorageInfoFunction(
storage_type, self._STORAGE_KEY, self._SOME_ARGS)
self.mock_storage_fn.assert_called_with(self._STORAGE_KEY, self._SOME_ARGS)
backend._STORAGE_TYPE_INFO_FN = info_fn_orig
def testApplyInValidStorageType(self):
storage_type = "invalid_storage_type"
info_fn_orig = backend._STORAGE_TYPE_INFO_FN
backend._STORAGE_TYPE_INFO_FN = {}
self.assertRaises(KeyError, backend._ApplyStorageInfoFunction,
storage_type, self._STORAGE_KEY, self._SOME_ARGS)
backend._STORAGE_TYPE_INFO_FN = info_fn_orig
def testApplyNotImplementedStorageType(self):
storage_type = "not_implemented_storage_type"
info_fn_orig = backend._STORAGE_TYPE_INFO_FN
backend._STORAGE_TYPE_INFO_FN = {storage_type: None}
self.assertRaises(NotImplementedError,
backend._ApplyStorageInfoFunction,
storage_type, self._STORAGE_KEY, self._SOME_ARGS)
backend._STORAGE_TYPE_INFO_FN = info_fn_orig
class TestGetLvmVgSpaceInfo(unittest.TestCase):
def testValid(self):
path = "somepath"
excl_stor = True
orig_fn = backend._GetVgInfo
backend._GetVgInfo = mock.Mock()
backend._GetLvmVgSpaceInfo(path, [excl_stor])
backend._GetVgInfo.assert_called_with(path, excl_stor)
backend._GetVgInfo = orig_fn
def testNoExclStorageNotBool(self):
path = "somepath"
excl_stor = "123"
self.assertRaises(errors.ProgrammerError, backend._GetLvmVgSpaceInfo,
path, [excl_stor])
def testNoExclStorageNotInList(self):
path = "somepath"
excl_stor = "123"
self.assertRaises(errors.ProgrammerError, backend._GetLvmVgSpaceInfo,
path, excl_stor)
class TestGetLvmPvSpaceInfo(unittest.TestCase):
def testValid(self):
path = "somepath"
excl_stor = True
orig_fn = backend._GetVgSpindlesInfo
backend._GetVgSpindlesInfo = mock.Mock()
backend._GetLvmPvSpaceInfo(path, [excl_stor])
backend._GetVgSpindlesInfo.assert_called_with(path, excl_stor)
backend._GetVgSpindlesInfo = orig_fn
class TestCheckStorageParams(unittest.TestCase):
def testParamsNone(self):
self.assertRaises(errors.ProgrammerError, backend._CheckStorageParams,
None, NotImplemented)
def testParamsWrongType(self):
self.assertRaises(errors.ProgrammerError, backend._CheckStorageParams,
"string", NotImplemented)
def testParamsEmpty(self):
backend._CheckStorageParams([], 0)
def testParamsValidNumber(self):
backend._CheckStorageParams(["a", True], 2)
def testParamsInvalidNumber(self):
self.assertRaises(errors.ProgrammerError, backend._CheckStorageParams,
["b", False], 3)
class TestGetVgSpindlesInfo(unittest.TestCase):
def setUp(self):
self.vg_free = 13
self.vg_size = 31
self.mock_fn = mock.Mock(return_value=(self.vg_free, self.vg_size))
def testValidInput(self):
name = "myvg"
excl_stor = True
result = backend._GetVgSpindlesInfo(name, excl_stor, info_fn=self.mock_fn)
self.mock_fn.assert_called_with(name)
self.assertEqual(name, result["name"])
self.assertEqual(constants.ST_LVM_PV, result["type"])
self.assertEqual(self.vg_free, result["storage_free"])
self.assertEqual(self.vg_size, result["storage_size"])
def testNoExclStor(self):
name = "myvg"
excl_stor = False
result = backend._GetVgSpindlesInfo(name, excl_stor, info_fn=self.mock_fn)
self.mock_fn.assert_not_called()
self.assertEqual(name, result["name"])
self.assertEqual(constants.ST_LVM_PV, result["type"])
self.assertEqual(0, result["storage_free"])
self.assertEqual(0, result["storage_size"])
class TestGetVgSpindlesInfo(unittest.TestCase):
def testValidInput(self):
self.vg_free = 13
self.vg_size = 31
self.mock_fn = mock.Mock(return_value=[(self.vg_free, self.vg_size)])
name = "myvg"
excl_stor = True
result = backend._GetVgInfo(name, excl_stor, info_fn=self.mock_fn)
self.mock_fn.assert_called_with([name], excl_stor)
self.assertEqual(name, result["name"])
self.assertEqual(constants.ST_LVM_VG, result["type"])
self.assertEqual(self.vg_free, result["storage_free"])
self.assertEqual(self.vg_size, result["storage_size"])
def testNoExclStor(self):
name = "myvg"
excl_stor = True
self.mock_fn = mock.Mock(return_value=None)
result = backend._GetVgInfo(name, excl_stor, info_fn=self.mock_fn)
self.mock_fn.assert_called_with([name], excl_stor)
self.assertEqual(name, result["name"])
self.assertEqual(constants.ST_LVM_VG, result["type"])
self.assertEqual(None, result["storage_free"])
self.assertEqual(None, result["storage_size"])
class TestGetNodeInfo(unittest.TestCase):
_SOME_RESULT = None
def testApplyStorageInfoFunction(self):
orig_fn = backend._ApplyStorageInfoFunction
backend._ApplyStorageInfoFunction = mock.Mock(
return_value=self._SOME_RESULT)
storage_units = [(st, st + "_key", [st + "_params"]) for st in
constants.STORAGE_TYPES]
backend.GetNodeInfo(storage_units, None)
call_args_list = backend._ApplyStorageInfoFunction.call_args_list
self.assertEqual(len(constants.STORAGE_TYPES), len(call_args_list))
for call in call_args_list:
storage_type, storage_key, storage_params = call[0]
self.assertEqual(storage_type + "_key", storage_key)
self.assertEqual([storage_type + "_params"], storage_params)
self.assertTrue(storage_type in constants.STORAGE_TYPES)
backend._ApplyStorageInfoFunction = orig_fn
class TestSpaceReportingConstants(unittest.TestCase):
"""Ensures consistency between STS_REPORT and backend.
These tests ensure, that the constant 'STS_REPORT' is consitent
with the implementation of invoking space reporting functions
in backend.py. Once space reporting is available for all types,
the constant can be removed and these tests as well.
"""
def testAllReportingTypesHaveAReportingFunction(self):
for storage_type in constants.STS_REPORT:
self.assertTrue(backend._STORAGE_TYPE_INFO_FN[storage_type] is not None)
def testAllNotReportingTypesDoneHaveFunction(self):
non_reporting_types = set(constants.STORAGE_TYPES)\
- set(constants.STS_REPORT)
for storage_type in non_reporting_types:
self.assertEqual(None, backend._STORAGE_TYPE_INFO_FN[storage_type])
if __name__ == "__main__":
testutils.GanetiTestProgram()