blob: 42cc3a4c68ed14133375cfa74b0a4de1916d065d [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2008, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Script for unittesting the cli module"""
import copy
import testutils
import time
import unittest
import yaml
from cStringIO import StringIO
from ganeti import constants
from ganeti import cli
from ganeti import errors
from ganeti import utils
from ganeti import objects
from ganeti import qlang
from ganeti.errors import OpPrereqError, ParameterError
class TestParseTimespec(unittest.TestCase):
"""Testing case for ParseTimespec"""
def testValidTimes(self):
"""Test valid timespecs"""
test_data = [
("1s", 1),
("1", 1),
("1m", 60),
("1h", 60 * 60),
("1d", 60 * 60 * 24),
("1w", 60 * 60 * 24 * 7),
("4h", 4 * 60 * 60),
("61m", 61 * 60),
]
for value, expected_result in test_data:
self.failUnlessEqual(cli.ParseTimespec(value), expected_result)
def testInvalidTime(self):
"""Test invalid timespecs"""
test_data = [
"1y",
"",
"aaa",
"s",
]
for value in test_data:
self.failUnlessRaises(OpPrereqError, cli.ParseTimespec, value)
class TestToStream(unittest.TestCase):
"""Test the ToStream functions"""
def testBasic(self):
for data in ["foo",
"foo %s",
"foo %(test)s",
"foo %s %s",
"",
]:
buf = StringIO()
cli._ToStream(buf, data)
self.failUnlessEqual(buf.getvalue(), data + "\n")
def testParams(self):
buf = StringIO()
cli._ToStream(buf, "foo %s", 1)
self.failUnlessEqual(buf.getvalue(), "foo 1\n")
buf = StringIO()
cli._ToStream(buf, "foo %s", (15,16))
self.failUnlessEqual(buf.getvalue(), "foo (15, 16)\n")
buf = StringIO()
cli._ToStream(buf, "foo %s %s", "a", "b")
self.failUnlessEqual(buf.getvalue(), "foo a b\n")
class TestGenerateTable(unittest.TestCase):
HEADERS = dict([("f%s" % i, "Field%s" % i) for i in range(5)])
FIELDS1 = ["f1", "f2"]
DATA1 = [
["abc", 1234],
["foobar", 56],
["b", -14],
]
def _test(self, headers, fields, separator, data,
numfields, unitfields, units, expected):
table = cli.GenerateTable(headers, fields, separator, data,
numfields=numfields, unitfields=unitfields,
units=units)
self.assertEqual(table, expected)
def testPlain(self):
exp = [
"Field1 Field2",
"abc 1234",
"foobar 56",
"b -14",
]
self._test(self.HEADERS, self.FIELDS1, None, self.DATA1,
None, None, "m", exp)
def testNoFields(self):
self._test(self.HEADERS, [], None, [[], []],
None, None, "m", ["", "", ""])
self._test(None, [], None, [[], []],
None, None, "m", ["", ""])
def testSeparator(self):
for sep in ["#", ":", ",", "^", "!", "%", "|", "###", "%%", "!!!", "||"]:
exp = [
"Field1%sField2" % sep,
"abc%s1234" % sep,
"foobar%s56" % sep,
"b%s-14" % sep,
]
self._test(self.HEADERS, self.FIELDS1, sep, self.DATA1,
None, None, "m", exp)
def testNoHeader(self):
exp = [
"abc 1234",
"foobar 56",
"b -14",
]
self._test(None, self.FIELDS1, None, self.DATA1,
None, None, "m", exp)
def testUnknownField(self):
headers = {
"f1": "Field1",
}
exp = [
"Field1 UNKNOWN",
"abc 1234",
"foobar 56",
"b -14",
]
self._test(headers, ["f1", "UNKNOWN"], None, self.DATA1,
None, None, "m", exp)
def testNumfields(self):
fields = ["f1", "f2", "f3"]
data = [
["abc", 1234, 0],
["foobar", 56, 3],
["b", -14, "-"],
]
exp = [
"Field1 Field2 Field3",
"abc 1234 0",
"foobar 56 3",
"b -14 -",
]
self._test(self.HEADERS, fields, None, data,
["f2", "f3"], None, "m", exp)
def testUnitfields(self):
expnosep = [
"Field1 Field2 Field3",
"abc 1234 0M",
"foobar 56 3M",
"b -14 -",
]
expsep = [
"Field1:Field2:Field3",
"abc:1234:0M",
"foobar:56:3M",
"b:-14:-",
]
for sep, expected in [(None, expnosep), (":", expsep)]:
fields = ["f1", "f2", "f3"]
data = [
["abc", 1234, 0],
["foobar", 56, 3],
["b", -14, "-"],
]
self._test(self.HEADERS, fields, sep, data,
["f2", "f3"], ["f3"], "h", expected)
def testUnusual(self):
data = [
["%", "xyz"],
["%%", "abc"],
]
exp = [
"Field1 Field2",
"% xyz",
"%% abc",
]
self._test(self.HEADERS, ["f1", "f2"], None, data,
None, None, "m", exp)
class TestFormatQueryResult(unittest.TestCase):
def test(self):
fields = [
objects.QueryFieldDefinition(name="name", title="Name",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="size", title="Size",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="act", title="Active",
kind=constants.QFT_BOOL),
objects.QueryFieldDefinition(name="mem", title="Memory",
kind=constants.QFT_UNIT),
objects.QueryFieldDefinition(name="other", title="SomeList",
kind=constants.QFT_OTHER),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, "nodeA"), (constants.RS_NORMAL, 128),
(constants.RS_NORMAL, False), (constants.RS_NORMAL, 1468006),
(constants.RS_NORMAL, [])],
[(constants.RS_NORMAL, "other"), (constants.RS_NORMAL, 512),
(constants.RS_NORMAL, True), (constants.RS_NORMAL, 16),
(constants.RS_NORMAL, [1, 2, 3])],
[(constants.RS_NORMAL, "xyz"), (constants.RS_NORMAL, 1024),
(constants.RS_NORMAL, True), (constants.RS_NORMAL, 4096),
(constants.RS_NORMAL, [{}, {}])],
])
self.assertEqual(cli.FormatQueryResult(response, unit="h", header=True),
(cli.QR_NORMAL, [
"Name Size Active Memory SomeList",
"nodeA 128 N 1.4T []",
"other 512 Y 16M [1, 2, 3]",
"xyz 1024 Y 4.0G [{}, {}]",
]))
def testTimestampAndUnit(self):
fields = [
objects.QueryFieldDefinition(name="name", title="Name",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="size", title="Size",
kind=constants.QFT_UNIT),
objects.QueryFieldDefinition(name="mtime", title="ModTime",
kind=constants.QFT_TIMESTAMP),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, "a"), (constants.RS_NORMAL, 1024),
(constants.RS_NORMAL, 0)],
[(constants.RS_NORMAL, "b"), (constants.RS_NORMAL, 144996),
(constants.RS_NORMAL, 1291746295)],
])
self.assertEqual(cli.FormatQueryResult(response, unit="m", header=True),
(cli.QR_NORMAL, [
"Name Size ModTime",
"a 1024 %s" % utils.FormatTime(0),
"b 144996 %s" % utils.FormatTime(1291746295),
]))
def testOverride(self):
fields = [
objects.QueryFieldDefinition(name="name", title="Name",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="cust", title="Custom",
kind=constants.QFT_OTHER),
objects.QueryFieldDefinition(name="xt", title="XTime",
kind=constants.QFT_TIMESTAMP),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, "x"), (constants.RS_NORMAL, ["a", "b", "c"]),
(constants.RS_NORMAL, 1234)],
[(constants.RS_NORMAL, "y"), (constants.RS_NORMAL, range(10)),
(constants.RS_NORMAL, 1291746295)],
])
override = {
"cust": (utils.CommaJoin, False),
"xt": (hex, True),
}
self.assertEqual(cli.FormatQueryResult(response, unit="h", header=True,
format_override=override),
(cli.QR_NORMAL, [
"Name Custom XTime",
"x a, b, c 0x4d2",
"y 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 0x4cfe7bf7",
]))
def testSeparator(self):
fields = [
objects.QueryFieldDefinition(name="name", title="Name",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="count", title="Count",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="desc", title="Description",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, "instance1.example.com"),
(constants.RS_NORMAL, 21125), (constants.RS_NORMAL, "Hello World!")],
[(constants.RS_NORMAL, "mail.other.net"),
(constants.RS_NORMAL, -9000), (constants.RS_NORMAL, "a,b,c")],
])
for sep in [":", "|", "#", "|||", "###", "@@@", "@#@"]:
for header in [None, "Name%sCount%sDescription" % (sep, sep)]:
exp = []
if header:
exp.append(header)
exp.extend([
"instance1.example.com%s21125%sHello World!" % (sep, sep),
"mail.other.net%s-9000%sa,b,c" % (sep, sep),
])
self.assertEqual(cli.FormatQueryResult(response, separator=sep,
header=bool(header)),
(cli.QR_NORMAL, exp))
def testStatusWithUnknown(self):
fields = [
objects.QueryFieldDefinition(name="id", title="ID",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="unk", title="unk",
kind=constants.QFT_UNKNOWN),
objects.QueryFieldDefinition(name="unavail", title="Unavail",
kind=constants.QFT_BOOL),
objects.QueryFieldDefinition(name="nodata", title="NoData",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="offline", title="OffLine",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, 1), (constants.RS_UNKNOWN, None),
(constants.RS_NORMAL, False), (constants.RS_NORMAL, ""),
(constants.RS_OFFLINE, None)],
[(constants.RS_NORMAL, 2), (constants.RS_UNKNOWN, None),
(constants.RS_NODATA, None), (constants.RS_NORMAL, "x"),
(constants.RS_OFFLINE, None)],
[(constants.RS_NORMAL, 3), (constants.RS_UNKNOWN, None),
(constants.RS_NORMAL, False), (constants.RS_UNAVAIL, None),
(constants.RS_OFFLINE, None)],
])
self.assertEqual(cli.FormatQueryResult(response, header=True,
separator="|", verbose=True),
(cli.QR_UNKNOWN, [
"ID|unk|Unavail|NoData|OffLine",
"1|(unknown)|N||(offline)",
"2|(unknown)|(nodata)|x|(offline)",
"3|(unknown)|N|(unavail)|(offline)",
]))
self.assertEqual(cli.FormatQueryResult(response, header=True,
separator="|", verbose=False),
(cli.QR_UNKNOWN, [
"ID|unk|Unavail|NoData|OffLine",
"1|??|N||*",
"2|??|?|x|*",
"3|??|N|-|*",
]))
def testNoData(self):
fields = [
objects.QueryFieldDefinition(name="id", title="ID",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="name", title="Name",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[])
self.assertEqual(cli.FormatQueryResult(response, header=True),
(cli.QR_NORMAL, ["ID Name"]))
def testNoDataWithUnknown(self):
fields = [
objects.QueryFieldDefinition(name="id", title="ID",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="unk", title="unk",
kind=constants.QFT_UNKNOWN),
]
response = objects.QueryResponse(fields=fields, data=[])
self.assertEqual(cli.FormatQueryResult(response, header=False),
(cli.QR_UNKNOWN, []))
def testStatus(self):
fields = [
objects.QueryFieldDefinition(name="id", title="ID",
kind=constants.QFT_NUMBER),
objects.QueryFieldDefinition(name="unavail", title="Unavail",
kind=constants.QFT_BOOL),
objects.QueryFieldDefinition(name="nodata", title="NoData",
kind=constants.QFT_TEXT),
objects.QueryFieldDefinition(name="offline", title="OffLine",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[
[(constants.RS_NORMAL, 1), (constants.RS_NORMAL, False),
(constants.RS_NORMAL, ""), (constants.RS_OFFLINE, None)],
[(constants.RS_NORMAL, 2), (constants.RS_NODATA, None),
(constants.RS_NORMAL, "x"), (constants.RS_NORMAL, "abc")],
[(constants.RS_NORMAL, 3), (constants.RS_NORMAL, False),
(constants.RS_UNAVAIL, None), (constants.RS_OFFLINE, None)],
])
self.assertEqual(cli.FormatQueryResult(response, header=False,
separator="|", verbose=True),
(cli.QR_INCOMPLETE, [
"1|N||(offline)",
"2|(nodata)|x|abc",
"3|N|(unavail)|(offline)",
]))
self.assertEqual(cli.FormatQueryResult(response, header=False,
separator="|", verbose=False),
(cli.QR_INCOMPLETE, [
"1|N||*",
"2|?|x|abc",
"3|N|-|*",
]))
def testInvalidFieldType(self):
fields = [
objects.QueryFieldDefinition(name="x", title="x",
kind="#some#other#type"),
]
response = objects.QueryResponse(fields=fields, data=[])
self.assertRaises(NotImplementedError, cli.FormatQueryResult, response)
def testInvalidFieldStatus(self):
fields = [
objects.QueryFieldDefinition(name="x", title="x",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[[(-1, None)]])
self.assertRaises(NotImplementedError, cli.FormatQueryResult, response)
response = objects.QueryResponse(fields=fields, data=[[(-1, "x")]])
self.assertRaises(AssertionError, cli.FormatQueryResult, response)
def testEmptyFieldTitle(self):
fields = [
objects.QueryFieldDefinition(name="x", title="",
kind=constants.QFT_TEXT),
]
response = objects.QueryResponse(fields=fields, data=[])
self.assertRaises(AssertionError, cli.FormatQueryResult, response)
class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase):
def __init__(self, tc, job_id):
self.tc = tc
self.job_id = job_id
self._wfjcr = []
self._jobstatus = []
self._expect_notchanged = False
self._expect_log = []
def CheckEmpty(self):
self.tc.assertFalse(self._wfjcr)
self.tc.assertFalse(self._jobstatus)
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
def AddWfjcResult(self, *args):
self._wfjcr.append(args)
def AddQueryJobsResult(self, *args):
self._jobstatus.append(args)
def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial,
timeout=constants.DEFAULT_WFJC_TIMEOUT):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assertEqualValues(fields, ["status"])
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
(exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0)
self.tc.assertEqualValues(prev_job_info, exp_prev_job_info)
self.tc.assertEqual(prev_log_serial, exp_prev_log_serial)
if result == constants.JOB_NOTCHANGED:
self._expect_notchanged = True
elif result:
(_, logmsgs) = result
if logmsgs:
self._expect_log.extend(logmsgs)
return result
def QueryJobs(self, job_ids, fields):
self.tc.assertEqual(job_ids, [self.job_id])
self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"])
self.tc.assertFalse(self._expect_notchanged)
self.tc.assertFalse(self._expect_log)
result = self._jobstatus.pop(0)
self.tc.assertEqual(len(fields), len(result))
return [result]
def CancelJob(self, job_id):
self.tc.assertEqual(job_id, self.job_id)
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assertEqualValues((serial, timestamp, log_type, log_msg),
self._expect_log.pop(0))
def ReportNotChanged(self, job_id, status):
self.tc.assertEqual(job_id, self.job_id)
self.tc.assert_(self._expect_notchanged)
self._expect_notchanged = False
class TestGenericPollJob(testutils.GanetiTestCase):
def testSuccessWithLog(self):
job_id = 29609
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None,
((constants.JOB_STATUS_QUEUED, ), None))
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
constants.JOB_NOTCHANGED)
cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
((constants.JOB_STATUS_RUNNING, ),
[(1, utils.SplitTime(1273491611.0),
constants.ELOG_MESSAGE, "Step 1"),
(2, utils.SplitTime(1273491615.9),
constants.ELOG_MESSAGE, "Step 2"),
(3, utils.SplitTime(1273491625.02),
constants.ELOG_MESSAGE, "Step 3"),
(4, utils.SplitTime(1273491635.05),
constants.ELOG_MESSAGE, "Step 4"),
(37, utils.SplitTime(1273491645.0),
constants.ELOG_MESSAGE, "Step 5"),
(203, utils.SplitTime(127349155.0),
constants.ELOG_MESSAGE, "Step 6")]))
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203,
((constants.JOB_STATUS_RUNNING, ),
[(300, utils.SplitTime(1273491711.01),
constants.ELOG_MESSAGE, "Step X"),
(302, utils.SplitTime(1273491815.8),
constants.ELOG_MESSAGE, "Step Y"),
(303, utils.SplitTime(1273491925.32),
constants.ELOG_MESSAGE, "Step Z")]))
cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303,
((constants.JOB_STATUS_SUCCESS, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS],
["Hello World", "Foo man bar"])
self.assertEqual(["Hello World", "Foo man bar"],
cli.GenericPollJob(job_id, cbs, cbs))
cbs.CheckEmpty()
def testJobLost(self):
job_id = 13746
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, None)
self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testError(self):
job_id = 31088
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_ERROR],
["Hello World", "Error code 123"])
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testError2(self):
job_id = 22235
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
encexc = errors.EncodeException(errors.LockError("problem"))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_ERROR], [encexc])
self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testWeirdError(self):
job_id = 28847
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
[constants.OP_STATUS_RUNNING,
constants.OP_STATUS_RUNNING],
[None, None])
self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testCancel(self):
job_id = 4275
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING,
[constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELING],
[None, None])
self.assertRaises(errors.JobCanceled, cli.GenericPollJob, job_id, cbs, cbs)
cbs.CheckEmpty()
def testNegativeUpdateFreqParameter(self):
job_id = 12345
cbs = _MockJobPollCb(self, job_id)
self.assertRaises(errors.ParameterError, cli.GenericPollJob, job_id, cbs,
cbs, update_freq=-30)
def testZeroUpdateFreqParameter(self):
job_id = 12345
cbs = _MockJobPollCb(self, job_id)
self.assertRaises(errors.ParameterError, cli.GenericPollJob, job_id, cbs,
cbs, update_freq=0)
def testShouldCancel(self):
job_id = 12345
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
self.assertRaises(errors.JobCanceled, cli.GenericPollJob, job_id, cbs, cbs,
cancel_fn=(lambda: True))
def testIgnoreCancel(self):
job_id = 12345
cbs = _MockJobPollCb(self, job_id)
cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_SUCCESS, ), None))
cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS],
["Hello World", "Foo man bar"])
self.assertEqual(["Hello World", "Foo man bar"],
cli.GenericPollJob(
job_id, cbs, cbs, cancel_fn=(lambda: False)))
cbs.CheckEmpty()
class TestFormatLogMessage(unittest.TestCase):
def test(self):
self.assertEqual(cli.FormatLogMessage(constants.ELOG_MESSAGE,
"Hello World"),
"Hello World")
self.assertRaises(TypeError, cli.FormatLogMessage,
constants.ELOG_MESSAGE, [1, 2, 3])
self.assert_(cli.FormatLogMessage("some other type", (1, 2, 3)))
class TestParseFields(unittest.TestCase):
def test(self):
self.assertEqual(cli.ParseFields(None, []), [])
self.assertEqual(cli.ParseFields("name,foo,hello", []),
["name", "foo", "hello"])
self.assertEqual(cli.ParseFields(None, ["def", "ault", "fields", "here"]),
["def", "ault", "fields", "here"])
self.assertEqual(cli.ParseFields("name,foo", ["def", "ault"]),
["name", "foo"])
self.assertEqual(cli.ParseFields("+name,foo", ["def", "ault"]),
["def", "ault", "name", "foo"])
class TestParseNicOption(unittest.TestCase):
def test(self):
self.assertEqual(cli.ParseNicOption([("0", { "link": "eth0", })]),
[{ "link": "eth0", }])
self.assertEqual(cli.ParseNicOption([("5", { "ip": "192.0.2.7", })]),
[{}, {}, {}, {}, {}, { "ip": "192.0.2.7", }])
def testErrors(self):
for i in [None, "", "abc", "zero", "Hello World", "\0", []]:
self.assertRaises(errors.OpPrereqError, cli.ParseNicOption,
[(i, { "link": "eth0", })])
self.assertRaises(errors.OpPrereqError, cli.ParseNicOption,
[("0", i)])
self.assertRaises(errors.TypeEnforcementError, cli.ParseNicOption,
[(0, { True: False, })])
self.assertRaises(errors.TypeEnforcementError, cli.ParseNicOption,
[(3, { "mode": [], })])
class TestFormatResultError(unittest.TestCase):
def testNormal(self):
for verbose in [False, True]:
self.assertRaises(AssertionError, cli.FormatResultError,
constants.RS_NORMAL, verbose)
def testUnknown(self):
for verbose in [False, True]:
self.assertRaises(NotImplementedError, cli.FormatResultError,
"#some!other!status#", verbose)
def test(self):
for status in constants.RS_ALL:
if status == constants.RS_NORMAL:
continue
self.assertNotEqual(cli.FormatResultError(status, False),
cli.FormatResultError(status, True))
result = cli.FormatResultError(status, True)
self.assertTrue(result.startswith("("))
self.assertTrue(result.endswith(")"))
class TestGetOnlineNodes(unittest.TestCase):
class _FakeClient:
def __init__(self):
self._query = []
def AddQueryResult(self, *args):
self._query.append(args)
def CountPending(self):
return len(self._query)
def Query(self, res, fields, qfilter):
if res != constants.QR_NODE:
raise Exception("Querying wrong resource")
(exp_fields, check_filter, result) = self._query.pop(0)
if exp_fields != fields:
raise Exception("Expected fields %s, got %s" % (exp_fields, fields))
if not (qfilter is None or check_filter(qfilter)):
raise Exception("Filter doesn't match expectations")
return objects.QueryResponse(fields=None, data=result)
def testEmpty(self):
cl = self._FakeClient()
cl.AddQueryResult(["name", "offline", "sip"], None, [])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl), [])
self.assertEqual(cl.CountPending(), 0)
def testNoSpecialFilter(self):
cl = self._FakeClient()
cl.AddQueryResult(["name", "offline", "sip"], None, [
[(constants.RS_NORMAL, "master.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.1")],
[(constants.RS_NORMAL, "node2.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.2")],
])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl),
["master.example.com", "node2.example.com"])
self.assertEqual(cl.CountPending(), 0)
def testNoMaster(self):
cl = self._FakeClient()
def _CheckFilter(qfilter):
self.assertEqual(qfilter, [qlang.OP_NOT, [qlang.OP_TRUE, "master"]])
return True
cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
[(constants.RS_NORMAL, "node2.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.2")],
])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl, filter_master=True),
["node2.example.com"])
self.assertEqual(cl.CountPending(), 0)
def testSecondaryIpAddress(self):
cl = self._FakeClient()
cl.AddQueryResult(["name", "offline", "sip"], None, [
[(constants.RS_NORMAL, "master.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.1")],
[(constants.RS_NORMAL, "node2.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.2")],
])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl, secondary_ips=True),
["192.0.2.1", "192.0.2.2"])
self.assertEqual(cl.CountPending(), 0)
def testNoMasterFilterNodeName(self):
cl = self._FakeClient()
def _CheckFilter(qfilter):
self.assertEqual(qfilter,
[qlang.OP_AND,
[qlang.OP_OR] + [[qlang.OP_EQUAL, "name", name]
for name in ["node2", "node3"]],
[qlang.OP_NOT, [qlang.OP_TRUE, "master"]]])
return True
cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
[(constants.RS_NORMAL, "node2.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.12")],
[(constants.RS_NORMAL, "node3.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.13")],
])
self.assertEqual(cli.GetOnlineNodes(["node2", "node3"], cl=cl,
secondary_ips=True, filter_master=True),
["192.0.2.12", "192.0.2.13"])
self.assertEqual(cl.CountPending(), 0)
def testOfflineNodes(self):
cl = self._FakeClient()
cl.AddQueryResult(["name", "offline", "sip"], None, [
[(constants.RS_NORMAL, "master.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.1")],
[(constants.RS_NORMAL, "node2.example.com"),
(constants.RS_NORMAL, True),
(constants.RS_NORMAL, "192.0.2.2")],
[(constants.RS_NORMAL, "node3.example.com"),
(constants.RS_NORMAL, True),
(constants.RS_NORMAL, "192.0.2.3")],
])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nowarn=True),
["master.example.com"])
self.assertEqual(cl.CountPending(), 0)
def testNodeGroup(self):
cl = self._FakeClient()
def _CheckFilter(qfilter):
self.assertEqual(qfilter,
[qlang.OP_OR, [qlang.OP_EQUAL, "group", "foobar"],
[qlang.OP_EQUAL, "group.uuid", "foobar"]])
return True
cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
[(constants.RS_NORMAL, "master.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.1")],
[(constants.RS_NORMAL, "node3.example.com"),
(constants.RS_NORMAL, False),
(constants.RS_NORMAL, "192.0.2.3")],
])
self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nodegroup="foobar"),
["master.example.com", "node3.example.com"])
self.assertEqual(cl.CountPending(), 0)
class TestFormatTimestamp(unittest.TestCase):
def testGood(self):
self.assertEqual(cli.FormatTimestamp((0, 1)),
time.strftime("%F %T", time.localtime(0)) + ".000001")
self.assertEqual(cli.FormatTimestamp((1332944009, 17376)),
(time.strftime("%F %T", time.localtime(1332944009)) +
".017376"))
def testWrong(self):
for i in [0, [], {}, "", [1]]:
self.assertEqual(cli.FormatTimestamp(i), "?")
class TestFormatUsage(unittest.TestCase):
def test(self):
binary = "gnt-unittest"
commands = {
"cmdA":
(NotImplemented, NotImplemented, NotImplemented, NotImplemented,
"description of A"),
"bbb":
(NotImplemented, NotImplemented, NotImplemented, NotImplemented,
"Hello World," * 10),
"longname":
(NotImplemented, NotImplemented, NotImplemented, NotImplemented,
"Another description"),
}
self.assertEqual(list(cli._FormatUsage(binary, commands)), [
"Usage: gnt-unittest {command} [options...] [argument...]",
"gnt-unittest <command> --help to see details, or man gnt-unittest",
"",
"Commands:",
(" bbb - Hello World,Hello World,Hello World,Hello World,Hello"
" World,Hello"),
" World,Hello World,Hello World,Hello World,Hello World,",
" cmdA - description of A",
" longname - Another description",
"",
])
class TestParseArgs(unittest.TestCase):
def testNoArguments(self):
for argv in [[], ["gnt-unittest"]]:
try:
cli._ParseArgs("gnt-unittest", argv, {}, {}, set())
except cli._ShowUsage, err:
self.assertTrue(err.exit_error)
else:
self.fail("Did not raise exception")
def testVersion(self):
for argv in [["test", "--version"], ["test", "--version", "somethingelse"]]:
try:
cli._ParseArgs("test", argv, {}, {}, set())
except cli._ShowVersion:
pass
else:
self.fail("Did not raise exception")
def testHelp(self):
for argv in [["test", "--help"], ["test", "--help", "somethingelse"]]:
try:
cli._ParseArgs("test", argv, {}, {}, set())
except cli._ShowUsage, err:
self.assertFalse(err.exit_error)
else:
self.fail("Did not raise exception")
def testUnknownCommandOrAlias(self):
for argv in [["test", "list"], ["test", "somethingelse", "--help"]]:
try:
cli._ParseArgs("test", argv, {}, {}, set())
except cli._ShowUsage, err:
self.assertTrue(err.exit_error)
else:
self.fail("Did not raise exception")
def testInvalidAliasList(self):
cmd = {
"list": NotImplemented,
"foo": NotImplemented,
}
aliases = {
"list": NotImplemented,
"foo": NotImplemented,
}
assert sorted(cmd.keys()) == sorted(aliases.keys())
self.assertRaises(AssertionError, cli._ParseArgs, "test",
["test", "list"], cmd, aliases, set())
def testAliasForNonExistantCommand(self):
cmd = {}
aliases = {
"list": NotImplemented,
}
self.assertRaises(errors.ProgrammerError, cli._ParseArgs, "test",
["test", "list"], cmd, aliases, set())
class TestQftNames(unittest.TestCase):
def testComplete(self):
self.assertEqual(frozenset(cli._QFT_NAMES), constants.QFT_ALL)
def testUnique(self):
lcnames = [s.lower() for s in cli._QFT_NAMES.values()]
self.assertFalse(utils.FindDuplicates(lcnames))
def testUppercase(self):
for name in cli._QFT_NAMES.values():
self.assertEqual(name[0], name[0].upper())
class TestFieldDescValues(unittest.TestCase):
def testKnownKind(self):
fdef = objects.QueryFieldDefinition(name="aname",
title="Atitle",
kind=constants.QFT_TEXT,
doc="aaa doc aaa")
self.assertEqual(cli._FieldDescValues(fdef),
["aname", "Text", "Atitle", "aaa doc aaa"])
def testUnknownKind(self):
kind = "#foo#"
self.assertFalse(kind in constants.QFT_ALL)
self.assertFalse(kind in cli._QFT_NAMES)
fdef = objects.QueryFieldDefinition(name="zname", title="Ztitle",
kind=kind, doc="zzz doc zzz")
self.assertEqual(cli._FieldDescValues(fdef),
["zname", kind, "Ztitle", "zzz doc zzz"])
class TestSerializeGenericInfo(unittest.TestCase):
"""Test case for cli._SerializeGenericInfo"""
def _RunTest(self, data, expected):
buf = StringIO()
cli._SerializeGenericInfo(buf, data, 0)
self.assertEqual(buf.getvalue(), expected)
def testSimple(self):
test_samples = [
("abc", "abc\n"),
([], "\n"),
({}, "\n"),
(["1", "2", "3"], "- 1\n- 2\n- 3\n"),
([("z", "26")], "z: 26\n"),
({"z": "26"}, "z: 26\n"),
([("z", "26"), ("a", "1")], "z: 26\na: 1\n"),
({"z": "26", "a": "1"}, "a: 1\nz: 26\n"),
]
for (data, expected) in test_samples:
self._RunTest(data, expected)
def testLists(self):
adict = {
"aa": "11",
"bb": "22",
"cc": "33",
}
adict_exp = ("- aa: 11\n"
" bb: 22\n"
" cc: 33\n")
anobj = [
("zz", "11"),
("ww", "33"),
("xx", "22"),
]
anobj_exp = ("- zz: 11\n"
" ww: 33\n"
" xx: 22\n")
alist = ["aa", "cc", "bb"]
alist_exp = ("- - aa\n"
" - cc\n"
" - bb\n")
test_samples = [
(adict, adict_exp),
(anobj, anobj_exp),
(alist, alist_exp),
]
for (base_data, base_expected) in test_samples:
for k in range(1, 4):
data = k * [base_data]
expected = k * base_expected
self._RunTest(data, expected)
def testDictionaries(self):
data = [
("aaa", ["x", "y"]),
("bbb", {
"w": "1",
"z": "2",
}),
("ccc", [
("xyz", "123"),
("efg", "456"),
]),
]
expected = ("aaa: \n"
" - x\n"
" - y\n"
"bbb: \n"
" w: 1\n"
" z: 2\n"
"ccc: \n"
" xyz: 123\n"
" efg: 456\n")
self._RunTest(data, expected)
self._RunTest(dict(data), expected)
class TestFormatPolicyInfo(unittest.TestCase):
"""Test case for cli.FormatPolicyInfo.
These tests rely on cli._SerializeGenericInfo (tested elsewhere).
"""
def setUp(self):
# Policies are big, and we want to see the difference in case of an error
self.maxDiff = None
def _RenameDictItem(self, parsed, old, new):
self.assertTrue(old in parsed)
self.assertTrue(new not in parsed)
parsed[new] = parsed[old]
del parsed[old]
def _TranslateParsedNames(self, parsed):
for (pretty, raw) in [
("bounds specs", constants.ISPECS_MINMAX),
("allowed disk templates", constants.IPOLICY_DTS)
]:
self._RenameDictItem(parsed, pretty, raw)
for minmax in parsed[constants.ISPECS_MINMAX]:
for key in minmax:
keyparts = key.split("/", 1)
if len(keyparts) > 1:
self._RenameDictItem(minmax, key, keyparts[0])
self.assertTrue(constants.IPOLICY_DTS in parsed)
parsed[constants.IPOLICY_DTS] = yaml.load("[%s]" %
parsed[constants.IPOLICY_DTS])
@staticmethod
def _PrintAndParsePolicy(custom, effective, iscluster):
formatted = cli.FormatPolicyInfo(custom, effective, iscluster)
buf = StringIO()
cli._SerializeGenericInfo(buf, formatted, 0)
return yaml.load(buf.getvalue())
def _PrintAndCheckParsed(self, policy):
parsed = self._PrintAndParsePolicy(policy, NotImplemented, True)
self._TranslateParsedNames(parsed)
self.assertEqual(parsed, policy)
def _CompareClusterGroupItems(self, cluster, group, skip=None):
if isinstance(group, dict):
self.assertTrue(isinstance(cluster, dict))
if skip is None:
skip = frozenset()
self.assertEqual(frozenset(cluster.keys()).difference(skip),
frozenset(group.keys()))
for key in group:
self._CompareClusterGroupItems(cluster[key], group[key])
elif isinstance(group, list):
self.assertTrue(isinstance(cluster, list))
self.assertEqual(len(cluster), len(group))
for (cval, gval) in zip(cluster, group):
self._CompareClusterGroupItems(cval, gval)
else:
self.assertTrue(isinstance(group, basestring))
self.assertEqual("default (%s)" % cluster, group)
def _TestClusterVsGroup(self, policy):
cluster = self._PrintAndParsePolicy(policy, NotImplemented, True)
group = self._PrintAndParsePolicy({}, policy, False)
self._CompareClusterGroupItems(cluster, group, ["std"])
def testWithDefaults(self):
self._PrintAndCheckParsed(constants.IPOLICY_DEFAULTS)
self._TestClusterVsGroup(constants.IPOLICY_DEFAULTS)
class TestCreateIPolicyFromOpts(unittest.TestCase):
"""Test case for cli.CreateIPolicyFromOpts."""
def setUp(self):
# Policies are big, and we want to see the difference in case of an error
self.maxDiff = None
def _RecursiveCheckMergedDicts(self, default_pol, diff_pol, merged_pol,
merge_minmax=False):
self.assertTrue(type(default_pol) is dict)
self.assertTrue(type(diff_pol) is dict)
self.assertTrue(type(merged_pol) is dict)
self.assertEqual(frozenset(default_pol.keys()),
frozenset(merged_pol.keys()))
for (key, val) in merged_pol.items():
if key in diff_pol:
if type(val) is dict:
self._RecursiveCheckMergedDicts(default_pol[key], diff_pol[key], val)
elif (merge_minmax and key == "minmax" and type(val) is list and
len(val) == 1):
self.assertEqual(len(default_pol[key]), 1)
self.assertEqual(len(diff_pol[key]), 1)
self._RecursiveCheckMergedDicts(default_pol[key][0],
diff_pol[key][0], val[0])
else:
self.assertEqual(val, diff_pol[key])
else:
self.assertEqual(val, default_pol[key])
def testClusterPolicy(self):
pol0 = cli.CreateIPolicyFromOpts(
ispecs_mem_size={},
ispecs_cpu_count={},
ispecs_disk_count={},
ispecs_disk_size={},
ispecs_nic_count={},
ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=None,
fill_all=True
)
self.assertEqual(pol0, constants.IPOLICY_DEFAULTS)
exp_pol1 = {
constants.ISPECS_MINMAX: [
{
constants.ISPECS_MIN: {
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_DISK_COUNT: 1,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 12*1024,
constants.ISPEC_DISK_COUNT: 2,
},
},
],
constants.ISPECS_STD: {
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_DISK_COUNT: 2,
},
constants.IPOLICY_VCPU_RATIO: 3.1,
}
pol1 = cli.CreateIPolicyFromOpts(
ispecs_mem_size={"max": "12g"},
ispecs_cpu_count={"min": 2, "std": 2},
ispecs_disk_count={"min": 1, "max": 2, "std": 2},
ispecs_disk_size={},
ispecs_nic_count={},
ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=3.1,
ipolicy_spindle_ratio=None,
fill_all=True
)
self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
exp_pol1, pol1, merge_minmax=True)
exp_pol2 = {
constants.ISPECS_MINMAX: [
{
constants.ISPECS_MIN: {
constants.ISPEC_DISK_SIZE: 512,
constants.ISPEC_NIC_COUNT: 2,
},
constants.ISPECS_MAX: {
constants.ISPEC_NIC_COUNT: 3,
},
},
],
constants.ISPECS_STD: {
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_NIC_COUNT: 3,
},
constants.IPOLICY_SPINDLE_RATIO: 1.3,
constants.IPOLICY_DTS: ["templates"],
}
pol2 = cli.CreateIPolicyFromOpts(
ispecs_mem_size={},
ispecs_cpu_count={"std": 2},
ispecs_disk_count={},
ispecs_disk_size={"min": "0.5g"},
ispecs_nic_count={"min": 2, "max": 3, "std": 3},
ipolicy_disk_templates=["templates"],
ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=1.3,
fill_all=True
)
self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
exp_pol2, pol2, merge_minmax=True)
for fill_all in [False, True]:
exp_pol3 = {
constants.ISPECS_STD: {
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_NIC_COUNT: 3,
},
}
pol3 = cli.CreateIPolicyFromOpts(
std_ispecs={
constants.ISPEC_CPU_COUNT: "2",
constants.ISPEC_NIC_COUNT: "3",
},
ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=None,
fill_all=fill_all
)
if fill_all:
self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
exp_pol3, pol3, merge_minmax=True)
else:
self.assertEqual(pol3, exp_pol3)
def testPartialPolicy(self):
exp_pol0 = objects.MakeEmptyIPolicy()
pol0 = cli.CreateIPolicyFromOpts(
minmax_ispecs=None,
std_ispecs=None,
ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=None,
fill_all=False
)
self.assertEqual(pol0, exp_pol0)
exp_pol1 = {
constants.IPOLICY_VCPU_RATIO: 3.1,
}
pol1 = cli.CreateIPolicyFromOpts(
minmax_ispecs=None,
std_ispecs=None,
ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=3.1,
ipolicy_spindle_ratio=None,
fill_all=False
)
self.assertEqual(pol1, exp_pol1)
exp_pol2 = {
constants.IPOLICY_SPINDLE_RATIO: 1.3,
constants.IPOLICY_DTS: ["templates"],
}
pol2 = cli.CreateIPolicyFromOpts(
minmax_ispecs=None,
std_ispecs=None,
ipolicy_disk_templates=["templates"],
ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=1.3,
fill_all=False
)
self.assertEqual(pol2, exp_pol2)
def _TestInvalidISpecs(self, minmax_ispecs, std_ispecs, fail=True):
for fill_all in [False, True]:
if fail:
self.assertRaises((errors.OpPrereqError,
errors.UnitParseError,
errors.TypeEnforcementError),
cli.CreateIPolicyFromOpts,
minmax_ispecs=minmax_ispecs,
std_ispecs=std_ispecs,
fill_all=fill_all)
else:
cli.CreateIPolicyFromOpts(minmax_ispecs=minmax_ispecs,
std_ispecs=std_ispecs,
fill_all=fill_all)
def testInvalidPolicies(self):
self.assertRaises(AssertionError, cli.CreateIPolicyFromOpts,
std_ispecs={constants.ISPEC_MEM_SIZE: 1024},
ipolicy_disk_templates=None, ipolicy_vcpu_ratio=None,
ipolicy_spindle_ratio=None, group_ipolicy=True)
self.assertRaises(errors.OpPrereqError, cli.CreateIPolicyFromOpts,
ispecs_mem_size={"wrong": "x"}, ispecs_cpu_count={},
ispecs_disk_count={}, ispecs_disk_size={},
ispecs_nic_count={}, ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=None, ipolicy_spindle_ratio=None,
fill_all=True)
self.assertRaises(errors.TypeEnforcementError, cli.CreateIPolicyFromOpts,
ispecs_mem_size={}, ispecs_cpu_count={"min": "default"},
ispecs_disk_count={}, ispecs_disk_size={},
ispecs_nic_count={}, ipolicy_disk_templates=None,
ipolicy_vcpu_ratio=None, ipolicy_spindle_ratio=None,
fill_all=True)
good_mmspecs = [
constants.ISPECS_MINMAX_DEFAULTS,
constants.ISPECS_MINMAX_DEFAULTS,
]
self._TestInvalidISpecs(good_mmspecs, None, fail=False)
broken_mmspecs = copy.deepcopy(good_mmspecs)
for minmaxpair in broken_mmspecs:
for key in constants.ISPECS_MINMAX_KEYS:
for par in constants.ISPECS_PARAMETERS:
old = minmaxpair[key][par]
del minmaxpair[key][par]
self._TestInvalidISpecs(broken_mmspecs, None)
minmaxpair[key][par] = "invalid"
self._TestInvalidISpecs(broken_mmspecs, None)
minmaxpair[key][par] = old
minmaxpair[key]["invalid_key"] = None
self._TestInvalidISpecs(broken_mmspecs, None)
del minmaxpair[key]["invalid_key"]
minmaxpair["invalid_key"] = None
self._TestInvalidISpecs(broken_mmspecs, None)
del minmaxpair["invalid_key"]
assert broken_mmspecs == good_mmspecs
good_stdspecs = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
self._TestInvalidISpecs(None, good_stdspecs, fail=False)
broken_stdspecs = copy.deepcopy(good_stdspecs)
for par in constants.ISPECS_PARAMETERS:
old = broken_stdspecs[par]
broken_stdspecs[par] = "invalid"
self._TestInvalidISpecs(None, broken_stdspecs)
broken_stdspecs[par] = old
broken_stdspecs["invalid_key"] = None
self._TestInvalidISpecs(None, broken_stdspecs)
del broken_stdspecs["invalid_key"]
assert broken_stdspecs == good_stdspecs
def testAllowedValues(self):
allowedv = "blah"
exp_pol1 = {
constants.ISPECS_MINMAX: allowedv,
constants.IPOLICY_DTS: allowedv,
constants.IPOLICY_VCPU_RATIO: allowedv,
constants.IPOLICY_SPINDLE_RATIO: allowedv,
}
pol1 = cli.CreateIPolicyFromOpts(minmax_ispecs=[{allowedv: {}}],
std_ispecs=None,
ipolicy_disk_templates=allowedv,
ipolicy_vcpu_ratio=allowedv,
ipolicy_spindle_ratio=allowedv,
allowed_values=[allowedv])
self.assertEqual(pol1, exp_pol1)
@staticmethod
def _ConvertSpecToStrings(spec):
ret = {}
for (par, val) in spec.items():
ret[par] = str(val)
return ret
def _CheckNewStyleSpecsCall(self, exp_ipolicy, minmax_ispecs, std_ispecs,
group_ipolicy, fill_all):
ipolicy = cli.CreateIPolicyFromOpts(minmax_ispecs=minmax_ispecs,
std_ispecs=std_ispecs,
group_ipolicy=group_ipolicy,
fill_all=fill_all)
self.assertEqual(ipolicy, exp_ipolicy)
def _TestFullISpecsInner(self, skel_exp_ipol, exp_minmax, exp_std,
group_ipolicy, fill_all):
exp_ipol = skel_exp_ipol.copy()
if exp_minmax is not None:
minmax_ispecs = []
for exp_mm_pair in exp_minmax:
mmpair = {}
for (key, spec) in exp_mm_pair.items():
mmpair[key] = self._ConvertSpecToStrings(spec)
minmax_ispecs.append(mmpair)
exp_ipol[constants.ISPECS_MINMAX] = exp_minmax
else:
minmax_ispecs = None
if exp_std is not None:
std_ispecs = self._ConvertSpecToStrings(exp_std)
exp_ipol[constants.ISPECS_STD] = exp_std
else:
std_ispecs = None
self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
group_ipolicy, fill_all)
if minmax_ispecs:
for mmpair in minmax_ispecs:
for (key, spec) in mmpair.items():
for par in [constants.ISPEC_MEM_SIZE, constants.ISPEC_DISK_SIZE]:
if par in spec:
spec[par] += "m"
self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
group_ipolicy, fill_all)
if std_ispecs:
for par in [constants.ISPEC_MEM_SIZE, constants.ISPEC_DISK_SIZE]:
if par in std_ispecs:
std_ispecs[par] += "m"
self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
group_ipolicy, fill_all)
def testFullISpecs(self):
exp_minmax1 = [
{
constants.ISPECS_MIN: {
constants.ISPEC_MEM_SIZE: 512,
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_DISK_COUNT: 2,
constants.ISPEC_DISK_SIZE: 512,
constants.ISPEC_NIC_COUNT: 2,
constants.ISPEC_SPINDLE_USE: 2,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 768*1024,
constants.ISPEC_CPU_COUNT: 7,
constants.ISPEC_DISK_COUNT: 6,
constants.ISPEC_DISK_SIZE: 2048*1024,
constants.ISPEC_NIC_COUNT: 3,
constants.ISPEC_SPINDLE_USE: 3,
},
},
]
exp_minmax2 = [
{
constants.ISPECS_MIN: {
constants.ISPEC_MEM_SIZE: 512,
constants.ISPEC_CPU_COUNT: 2,
constants.ISPEC_DISK_COUNT: 2,
constants.ISPEC_DISK_SIZE: 512,
constants.ISPEC_NIC_COUNT: 2,
constants.ISPEC_SPINDLE_USE: 2,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 768*1024,
constants.ISPEC_CPU_COUNT: 7,
constants.ISPEC_DISK_COUNT: 6,
constants.ISPEC_DISK_SIZE: 2048*1024,
constants.ISPEC_NIC_COUNT: 3,
constants.ISPEC_SPINDLE_USE: 3,
},
},
{
constants.ISPECS_MIN: {
constants.ISPEC_MEM_SIZE: 1024*1024,
constants.ISPEC_CPU_COUNT: 3,
constants.ISPEC_DISK_COUNT: 3,
constants.ISPEC_DISK_SIZE: 256,
constants.ISPEC_NIC_COUNT: 4,
constants.ISPEC_SPINDLE_USE: 5,
},
constants.ISPECS_MAX: {
constants.ISPEC_MEM_SIZE: 2048*1024,
constants.ISPEC_CPU_COUNT: 5,
constants.ISPEC_DISK_COUNT: 5,
constants.ISPEC_DISK_SIZE: 1024*1024,
constants.ISPEC_NIC_COUNT: 5,
constants.ISPEC_SPINDLE_USE: 7,
},
},
]
exp_std1 = {
constants.ISPEC_MEM_SIZE: 768*1024,
constants.ISPEC_CPU_COUNT: 7,
constants.ISPEC_DISK_COUNT: 6,
constants.ISPEC_DISK_SIZE: 2048*1024,
constants.ISPEC_NIC_COUNT: 3,
constants.ISPEC_SPINDLE_USE: 1,
}
for fill_all in [False, True]:
if fill_all:
skel_ipolicy = constants.IPOLICY_DEFAULTS
else:
skel_ipolicy = {}
self._TestFullISpecsInner(skel_ipolicy, None, exp_std1,
False, fill_all)
for exp_minmax in [exp_minmax1, exp_minmax2]:
self._TestFullISpecsInner(skel_ipolicy, exp_minmax, exp_std1,
False, fill_all)
self._TestFullISpecsInner(skel_ipolicy, exp_minmax, None,
False, fill_all)
class TestPrintIPolicyCommand(unittest.TestCase):
"""Test case for cli.PrintIPolicyCommand"""
_SPECS1 = {
"par1": 42,
"par2": "xyz",
}
_SPECS1_STR = "par1=42,par2=xyz"
_SPECS2 = {
"param": 10,
"another_param": 101,
}
_SPECS2_STR = "another_param=101,param=10"
_SPECS3 = {
"par1": 1024,
"param": "abc",
}
_SPECS3_STR = "par1=1024,param=abc"
def _CheckPrintIPolicyCommand(self, ipolicy, isgroup, expected):
buf = StringIO()
cli.PrintIPolicyCommand(buf, ipolicy, isgroup)
self.assertEqual(buf.getvalue(), expected)
def testIgnoreStdForGroup(self):
self._CheckPrintIPolicyCommand({"std": self._SPECS1}, True, "")
def testIgnoreEmpty(self):
policies = [
{},
{"std": {}},
{"minmax": []},
{"minmax": [{}]},
{"minmax": [{
"min": {},
"max": {},
}]},
{"minmax": [{
"min": self._SPECS1,
"max": {},
}]},
]
for pol in policies:
self._CheckPrintIPolicyCommand(pol, False, "")
def testFullPolicies(self):
cases = [
({"std": self._SPECS1},
" %s %s" % (cli.IPOLICY_STD_SPECS_STR, self._SPECS1_STR)),
({"minmax": [{
"min": self._SPECS1,
"max": self._SPECS2,
}]},
" %s min:%s/max:%s" % (cli.IPOLICY_BOUNDS_SPECS_STR,
self._SPECS1_STR, self._SPECS2_STR)),
({"minmax": [
{
"min": self._SPECS1,
"max": self._SPECS2,
},
{
"min": self._SPECS2,
"max": self._SPECS3,
},
]},
" %s min:%s/max:%s//min:%s/max:%s" %
(cli.IPOLICY_BOUNDS_SPECS_STR, self._SPECS1_STR, self._SPECS2_STR,
self._SPECS2_STR, self._SPECS3_STR)),
]
for (pol, exp) in cases:
self._CheckPrintIPolicyCommand(pol, False, exp)
if __name__ == "__main__":
testutils.GanetiTestProgram()