blob: c0d41545c64969d740a2f73c8a2df6926664f116 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.ssconf"""
import os
import unittest
import tempfile
import shutil
import errno
from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import ssconf
import testutils
import mock
class TestReadSsconfFile(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testReadDirectory(self):
self.assertRaises(EnvironmentError, ssconf.ReadSsconfFile, self.tmpdir)
def testNonExistantFile(self):
testfile = utils.PathJoin(self.tmpdir, "does.not.exist")
self.assertFalse(os.path.exists(testfile))
try:
ssconf.ReadSsconfFile(testfile)
except EnvironmentError, err:
self.assertEqual(err.errno, errno.ENOENT)
else:
self.fail("Exception was not raised")
def testEmptyFile(self):
testfile = utils.PathJoin(self.tmpdir, "empty")
utils.WriteFile(testfile, data="")
self.assertEqual(ssconf.ReadSsconfFile(testfile), "")
def testSingleLine(self):
testfile = utils.PathJoin(self.tmpdir, "data")
for nl in range(0, 10):
utils.WriteFile(testfile, data="Hello World" + ("\n" * nl))
self.assertEqual(ssconf.ReadSsconfFile(testfile),
"Hello World")
def testExactlyMaxSize(self):
testfile = utils.PathJoin(self.tmpdir, "data")
data = "A" * ssconf._MAX_SIZE
utils.WriteFile(testfile, data=data)
self.assertEqual(os.path.getsize(testfile), ssconf._MAX_SIZE)
self.assertEqual(ssconf.ReadSsconfFile(testfile),
data)
def testLargeFile(self):
testfile = utils.PathJoin(self.tmpdir, "data")
for size in [ssconf._MAX_SIZE + 1, ssconf._MAX_SIZE * 2]:
utils.WriteFile(testfile, data="A" * size)
self.assertTrue(os.path.getsize(testfile) > ssconf._MAX_SIZE)
self.assertRaises(RuntimeError, ssconf.ReadSsconfFile, testfile)
class TestSimpleStore(unittest.TestCase):
def setUp(self):
self._tmpdir = tempfile.mkdtemp()
self.ssdir = utils.PathJoin(self._tmpdir, "files")
lockfile = utils.PathJoin(self._tmpdir, "lock")
os.mkdir(self.ssdir)
self.sstore = ssconf.SimpleStore(cfg_location=self.ssdir,
_lockfile=lockfile)
def tearDown(self):
shutil.rmtree(self._tmpdir)
def _ReadSsFile(self, filename):
return utils.ReadFile(utils.PathJoin(self.ssdir, "ssconf_%s" % filename))
def testInvalidKey(self):
self.assertRaises(errors.ProgrammerError, self.sstore.KeyToFilename,
"not a valid key")
self.assertRaises(errors.ProgrammerError, self.sstore._ReadFile,
"not a valid key")
def testKeyToFilename(self):
for key in ssconf._VALID_KEYS:
result = self.sstore.KeyToFilename(key)
self.assertTrue(utils.IsBelowDir(self.ssdir, result))
self.assertTrue(os.path.basename(result).startswith("ssconf_"))
def testReadFileNonExistingFile(self):
filename = self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME)
self.assertFalse(os.path.exists(filename))
try:
self.sstore._ReadFile(constants.SS_CLUSTER_NAME)
except errors.ConfigurationError, err:
self.assertTrue(str(err).startswith("Can't read ssconf file"))
else:
self.fail("Exception was not raised")
for default in ["", "Hello World", 0, 100]:
self.assertFalse(os.path.exists(filename))
result = self.sstore._ReadFile(constants.SS_CLUSTER_NAME, default=default)
self.assertEqual(result, default)
def testReadFile(self):
utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
data="cluster.example.com")
self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME),
"cluster.example.com")
self.assertEqual(self.sstore._ReadFile(constants.SS_CLUSTER_NAME,
default="something.example.com"),
"cluster.example.com")
def testReadAllNoFiles(self):
self.assertEqual(self.sstore.ReadAll(), {})
def testReadAllSingleFile(self):
utils.WriteFile(self.sstore.KeyToFilename(constants.SS_CLUSTER_NAME),
data="cluster.example.com")
self.assertEqual(self.sstore.ReadAll(), {
constants.SS_CLUSTER_NAME: "cluster.example.com",
})
def testWriteFiles(self):
values = {
constants.SS_CLUSTER_NAME: "cluster.example.com",
constants.SS_CLUSTER_TAGS: "value\nwith\nnewlines\n",
constants.SS_INSTANCE_LIST: "",
}
self.sstore.WriteFiles(values)
self.assertEqual(sorted(os.listdir(self.ssdir)), sorted([
"ssconf_cluster_name",
"ssconf_cluster_tags",
"ssconf_instance_list",
]))
self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_NAME),
"cluster.example.com\n")
self.assertEqual(self._ReadSsFile(constants.SS_CLUSTER_TAGS),
"value\nwith\nnewlines\n")
self.assertEqual(self._ReadSsFile(constants.SS_INSTANCE_LIST), "")
def testWriteFilesUnknownKey(self):
values = {
"unknown key": "value",
}
self.assertRaises(errors.ProgrammerError, self.sstore.WriteFiles,
values, dry_run=True)
self.assertEqual(os.listdir(self.ssdir), [])
def testWriteFilesDryRun(self):
values = {
constants.SS_CLUSTER_NAME: "cluster.example.com",
}
self.sstore.WriteFiles(values, dry_run=True)
self.assertEqual(os.listdir(self.ssdir), [])
def testWriteFilesNoValues(self):
for dry_run in [False, True]:
self.sstore.WriteFiles({}, dry_run=dry_run)
self.assertEqual(os.listdir(self.ssdir), [])
def testWriteFilesTooLong(self):
values = {
constants.SS_INSTANCE_LIST: "A" * ssconf._MAX_SIZE,
}
for dry_run in [False, True]:
try:
self.sstore.WriteFiles(values, dry_run=dry_run)
except errors.ConfigurationError, err:
self.assertTrue(str(err).startswith("Value 'instance_list' has"))
else:
self.fail("Exception was not raised")
self.assertEqual(os.listdir(self.ssdir), [])
def testGetHvparamsForHypervisor(self):
hvparams = [("a", "A"), ("b", "B"), ("c", "C")]
ssconf_file_content = '\n'.join("%s=%s" % (key, value) for (key, value)
in hvparams)
self.sstore._ReadFile = mock.Mock(return_value=ssconf_file_content)
result = self.sstore.GetHvparamsForHypervisor("foo")
for (key, value) in hvparams:
self.assertTrue(key in result)
self.assertEqual(value, result[key])
class TestVerifyClusterName(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testMissingFile(self):
tmploc = utils.PathJoin(self.tmpdir, "does-not-exist")
ssconf.VerifyClusterName(NotImplemented, _cfg_location=tmploc)
def testMatchingName(self):
tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
for content in ["cluster.example.com", "cluster.example.com\n\n"]:
utils.WriteFile(tmpfile, data=content)
ssconf.VerifyClusterName("cluster.example.com",
_cfg_location=self.tmpdir)
def testNameMismatch(self):
tmpfile = utils.PathJoin(self.tmpdir, "ssconf_cluster_name")
for content in ["something.example.com", "foobar\n\ncluster.example.com"]:
utils.WriteFile(tmpfile, data=content)
self.assertRaises(errors.GenericError, ssconf.VerifyClusterName,
"cluster.example.com", _cfg_location=self.tmpdir)
class TestVerifyKeys(unittest.TestCase):
def testNoKeys(self):
ssconf.VerifyKeys({})
def testValidKeys(self):
ssconf.VerifyKeys(ssconf._VALID_KEYS)
for key in ssconf._VALID_KEYS:
ssconf.VerifyKeys([key])
def testInvalidKeys(self):
for key in ["", ".", " ", "foo", "bar", "HelloWorld"]:
self.assertRaises(errors.GenericError, ssconf.VerifyKeys, [key])
if __name__ == "__main__":
testutils.GanetiTestProgram()