blob: 54f199c5465568746ce2f098fe3b73433a22bd81 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Script for testing ganeti.rpc"""
import os
import sys
import unittest
import random
import tempfile
from ganeti import constants
from ganeti import compat
from ganeti.rpc import node as rpc
from ganeti import rpc_defs
from ganeti import http
from ganeti import errors
from ganeti import serializer
from ganeti import objects
from ganeti import backend
import testutils
import mocks
class _FakeRequestProcessor:
def __init__(self, response_fn):
self._response_fn = response_fn
self.reqcount = 0
def __call__(self, reqs, lock_monitor_cb=None):
assert lock_monitor_cb is None or callable(lock_monitor_cb)
for req in reqs:
self.reqcount += 1
self._response_fn(req)
def GetFakeSimpleStoreClass(fn):
class FakeSimpleStore:
GetNodePrimaryIPList = fn
GetPrimaryIPFamily = lambda _: None
return FakeSimpleStore
def _RaiseNotImplemented():
"""Simple wrapper to raise NotImplementedError.
"""
raise NotImplementedError
class TestRpcProcessor(unittest.TestCase):
def _FakeAddressLookup(self, map):
return lambda node_list: [map.get(node) for node in node_list]
def _GetVersionResponse(self, req):
self.assertEqual(req.host, "127.0.0.1")
self.assertEqual(req.port, 24094)
self.assertEqual(req.path, "/version")
self.assertEqual(req.read_timeout, constants.RPC_TMO_URGENT)
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, 123))
def testVersionSuccess(self):
resolver = rpc._StaticResolver(["127.0.0.1"])
http_proc = _FakeRequestProcessor(self._GetVersionResponse)
proc = rpc._RpcProcessor(resolver, 24094)
result = proc(["localhost"], "version", {"localhost": ""}, 60,
NotImplemented, _req_process_fn=http_proc)
self.assertEqual(result.keys(), ["localhost"])
lhresp = result["localhost"]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, "localhost")
self.assertFalse(lhresp.fail_msg)
self.assertEqual(lhresp.payload, 123)
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
self.assertEqual(http_proc.reqcount, 1)
def _ReadTimeoutResponse(self, req):
self.assertEqual(req.host, "192.0.2.13")
self.assertEqual(req.port, 19176)
self.assertEqual(req.path, "/version")
self.assertEqual(req.read_timeout, 12356)
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, -1))
def testReadTimeout(self):
resolver = rpc._StaticResolver(["192.0.2.13"])
http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
proc = rpc._RpcProcessor(resolver, 19176)
host = "node31856"
body = {host: ""}
result = proc([host], "version", body, 12356, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(result.keys(), [host])
lhresp = result[host]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, host)
self.assertFalse(lhresp.fail_msg)
self.assertEqual(lhresp.payload, -1)
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
self.assertEqual(http_proc.reqcount, 1)
def testOfflineNode(self):
resolver = rpc._StaticResolver([rpc._OFFLINE])
http_proc = _FakeRequestProcessor(NotImplemented)
proc = rpc._RpcProcessor(resolver, 30668)
host = "n17296"
body = {host: ""}
result = proc([host], "version", body, 60, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(result.keys(), [host])
lhresp = result[host]
self.assertTrue(lhresp.offline)
self.assertEqual(lhresp.node, host)
self.assertTrue(lhresp.fail_msg)
self.assertFalse(lhresp.payload)
self.assertEqual(lhresp.call, "version")
# With a message
self.assertRaises(errors.OpExecError, lhresp.Raise, "should raise")
# No message
self.assertRaises(errors.OpExecError, lhresp.Raise, None)
self.assertEqual(http_proc.reqcount, 0)
def _GetMultiVersionResponse(self, req):
self.assert_(req.host.startswith("node"))
self.assertEqual(req.port, 23245)
self.assertEqual(req.path, "/version")
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, 987))
def testMultiVersionSuccess(self):
nodes = ["node%s" % i for i in range(50)]
body = dict((n, "") for n in nodes)
resolver = rpc._StaticResolver(nodes)
http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
proc = rpc._RpcProcessor(resolver, 23245)
result = proc(nodes, "version", body, 60, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(sorted(result.keys()), sorted(nodes))
for name in nodes:
lhresp = result[name]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, name)
self.assertFalse(lhresp.fail_msg)
self.assertEqual(lhresp.payload, 987)
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
self.assertEqual(http_proc.reqcount, len(nodes))
def _GetVersionResponseFail(self, errinfo, req):
self.assertEqual(req.path, "/version")
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((False, errinfo))
def testVersionFailure(self):
resolver = rpc._StaticResolver(["aef9ur4i.example.com"])
proc = rpc._RpcProcessor(resolver, 5903)
for errinfo in [None, "Unknown error"]:
http_proc = \
_FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
errinfo))
host = "aef9ur4i.example.com"
body = {host: ""}
result = proc(body.keys(), "version", body, 60, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(result.keys(), [host])
lhresp = result[host]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, host)
self.assert_(lhresp.fail_msg)
self.assertFalse(lhresp.payload)
self.assertEqual(lhresp.call, "version")
self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
self.assertEqual(http_proc.reqcount, 1)
def _GetHttpErrorResponse(self, httperrnodes, failnodes, req):
self.assertEqual(req.path, "/vg_list")
self.assertEqual(req.port, 15165)
if req.host in httperrnodes:
req.success = False
req.error = "Node set up for HTTP errors"
elif req.host in failnodes:
req.success = True
req.resp_status_code = 404
req.resp_body = serializer.DumpJson({
"code": 404,
"message": "Method not found",
"explain": "Explanation goes here",
})
else:
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, hash(req.host)))
def testHttpError(self):
nodes = ["uaf6pbbv%s" % i for i in range(50)]
body = dict((n, "") for n in nodes)
resolver = rpc._StaticResolver(nodes)
httperrnodes = set(nodes[1::7])
self.assertEqual(len(httperrnodes), 7)
failnodes = set(nodes[2::3]) - httperrnodes
self.assertEqual(len(failnodes), 14)
self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29)
proc = rpc._RpcProcessor(resolver, 15165)
http_proc = \
_FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
httperrnodes, failnodes))
result = proc(nodes, "vg_list", body,
constants.RPC_TMO_URGENT, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(sorted(result.keys()), sorted(nodes))
for name in nodes:
lhresp = result[name]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, name)
self.assertEqual(lhresp.call, "vg_list")
if name in httperrnodes:
self.assert_(lhresp.fail_msg)
self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
elif name in failnodes:
self.assert_(lhresp.fail_msg)
self.assertRaises(errors.OpPrereqError, lhresp.Raise, "failed",
prereq=True, ecode=errors.ECODE_INVAL)
else:
self.assertFalse(lhresp.fail_msg)
self.assertEqual(lhresp.payload, hash(name))
lhresp.Raise("should not raise")
self.assertEqual(http_proc.reqcount, len(nodes))
def _GetInvalidResponseA(self, req):
self.assertEqual(req.path, "/version")
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson(("This", "is", "an", "invalid",
"response", "!", 1, 2, 3))
def _GetInvalidResponseB(self, req):
self.assertEqual(req.path, "/version")
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson("invalid response")
def testInvalidResponse(self):
resolver = rpc._StaticResolver(["oqo7lanhly.example.com"])
proc = rpc._RpcProcessor(resolver, 19978)
for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
http_proc = _FakeRequestProcessor(fn)
host = "oqo7lanhly.example.com"
body = {host: ""}
result = proc([host], "version", body, 60, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(result.keys(), [host])
lhresp = result[host]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, host)
self.assert_(lhresp.fail_msg)
self.assertFalse(lhresp.payload)
self.assertEqual(lhresp.call, "version")
self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
self.assertEqual(http_proc.reqcount, 1)
def _GetBodyTestResponse(self, test_data, req):
self.assertEqual(req.host, "192.0.2.84")
self.assertEqual(req.port, 18700)
self.assertEqual(req.path, "/upload_file")
self.assertEqual(serializer.LoadJson(req.post_data), test_data)
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, None))
def testResponseBody(self):
test_data = {
"Hello": "World",
"xyz": range(10),
}
resolver = rpc._StaticResolver(["192.0.2.84"])
http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
test_data))
proc = rpc._RpcProcessor(resolver, 18700)
host = "node19759"
body = {host: serializer.DumpJson(test_data)}
result = proc([host], "upload_file", body, 30, NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(result.keys(), [host])
lhresp = result[host]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.node, host)
self.assertFalse(lhresp.fail_msg)
self.assertEqual(lhresp.payload, None)
self.assertEqual(lhresp.call, "upload_file")
lhresp.Raise("should not raise")
self.assertEqual(http_proc.reqcount, 1)
class TestSsconfResolver(unittest.TestCase):
def testSsconfLookup(self):
addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
result = rpc._SsconfResolver(True, node_list, NotImplemented,
ssc=ssc, nslookup_fn=NotImplemented)
self.assertEqual(result, zip(node_list, addr_list, node_list))
def testNsLookup(self):
addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
ssc = GetFakeSimpleStoreClass(lambda _: [])
node_addr_map = dict(zip(node_list, addr_list))
nslookup_fn = lambda name, family=None: node_addr_map.get(name)
result = rpc._SsconfResolver(True, node_list, NotImplemented,
ssc=ssc, nslookup_fn=nslookup_fn)
self.assertEqual(result, zip(node_list, addr_list, node_list))
def testDisabledSsconfIp(self):
addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
ssc = GetFakeSimpleStoreClass(_RaiseNotImplemented)
node_addr_map = dict(zip(node_list, addr_list))
nslookup_fn = lambda name, family=None: node_addr_map.get(name)
result = rpc._SsconfResolver(False, node_list, NotImplemented,
ssc=ssc, nslookup_fn=nslookup_fn)
self.assertEqual(result, zip(node_list, addr_list, node_list))
def testBothLookups(self):
addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
n = len(addr_list) / 2
node_addr_list = [" ".join(t) for t in zip(node_list[n:], addr_list[n:])]
ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
node_addr_map = dict(zip(node_list[:n], addr_list[:n]))
nslookup_fn = lambda name, family=None: node_addr_map.get(name)
result = rpc._SsconfResolver(True, node_list, NotImplemented,
ssc=ssc, nslookup_fn=nslookup_fn)
self.assertEqual(result, zip(node_list, addr_list, node_list))
def testAddressLookupIPv6(self):
addr_list = ["2001:db8::%d" % n for n in range(0, 255, 11)]
node_list = ["node%d.example.com" % n for n in range(0, 255, 11)]
node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
result = rpc._SsconfResolver(True, node_list, NotImplemented,
ssc=ssc, nslookup_fn=NotImplemented)
self.assertEqual(result, zip(node_list, addr_list, node_list))
class TestStaticResolver(unittest.TestCase):
def test(self):
addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)]
nodes = ["node%s.example.com" % n for n in range(0, 123, 7)]
res = rpc._StaticResolver(addresses)
self.assertEqual(res(nodes, NotImplemented), zip(nodes, addresses, nodes))
def testWrongLength(self):
res = rpc._StaticResolver([])
self.assertRaises(AssertionError, res, ["abc"], NotImplemented)
class TestNodeConfigResolver(unittest.TestCase):
@staticmethod
def _GetSingleOnlineNode(uuid):
assert uuid == "node90-uuid"
return objects.Node(name="node90.example.com",
uuid=uuid,
offline=False,
primary_ip="192.0.2.90")
@staticmethod
def _GetSingleOfflineNode(uuid):
assert uuid == "node100-uuid"
return objects.Node(name="node100.example.com",
uuid=uuid,
offline=True,
primary_ip="192.0.2.100")
def testSingleOnline(self):
self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode,
NotImplemented,
["node90-uuid"], None),
[("node90.example.com", "192.0.2.90", "node90-uuid")])
def testSingleOffline(self):
self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode,
NotImplemented,
["node100-uuid"], None),
[("node100.example.com", rpc._OFFLINE, "node100-uuid")])
def testSingleOfflineWithAcceptOffline(self):
fn = self._GetSingleOfflineNode
assert fn("node100-uuid").offline
self.assertEqual(rpc._NodeConfigResolver(fn, NotImplemented,
["node100-uuid"],
rpc_defs.ACCEPT_OFFLINE_NODE),
[("node100.example.com", "192.0.2.100", "node100-uuid")])
for i in [False, True, "", "Hello", 0, 1]:
self.assertRaises(AssertionError, rpc._NodeConfigResolver,
fn, NotImplemented, ["node100.example.com"], i)
def testUnknownSingleNode(self):
self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented,
["node110.example.com"], None),
[("node110.example.com", "node110.example.com",
"node110.example.com")])
def testMultiEmpty(self):
self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
lambda: {},
[], None),
[])
def testMultiSomeOffline(self):
nodes = dict(("node%s-uuid" % i,
objects.Node(name="node%s.example.com" % i,
offline=((i % 3) == 0),
primary_ip="192.0.2.%s" % i,
uuid="node%s-uuid" % i))
for i in range(1, 255))
# Resolve no names
self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
lambda: nodes,
[], None),
[])
# Offline, online and unknown hosts
self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
lambda: nodes,
["node3-uuid",
"node92-uuid",
"node54-uuid",
"unknown.example.com",],
None), [
("node3.example.com", rpc._OFFLINE, "node3-uuid"),
("node92.example.com", "192.0.2.92", "node92-uuid"),
("node54.example.com", rpc._OFFLINE, "node54-uuid"),
("unknown.example.com", "unknown.example.com", "unknown.example.com"),
])
class TestCompress(unittest.TestCase):
def test(self):
for data in ["", "Hello", "Hello World!\nnew\nlines"]:
self.assertEqual(rpc._Compress(NotImplemented, data),
(constants.RPC_ENCODING_NONE, data))
for data in [512 * " ", 5242 * "Hello World!\n"]:
compressed = rpc._Compress(NotImplemented, data)
self.assertEqual(len(compressed), 2)
self.assertEqual(backend._Decompress(compressed), data)
def testDecompression(self):
self.assertRaises(AssertionError, backend._Decompress, "")
self.assertRaises(AssertionError, backend._Decompress, [""])
self.assertRaises(AssertionError, backend._Decompress,
("unknown compression", "data"))
self.assertRaises(Exception, backend._Decompress,
(constants.RPC_ENCODING_ZLIB_BASE64, "invalid zlib data"))
class TestRpcClientBase(unittest.TestCase):
def testNoHosts(self):
cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_SLOW, [],
None, None, NotImplemented)
http_proc = _FakeRequestProcessor(NotImplemented)
client = rpc._RpcClientBase(rpc._StaticResolver([]), NotImplemented,
_req_process_fn=http_proc)
self.assertEqual(client._Call(cdef, [], []), {})
# Test wrong number of arguments
self.assertRaises(errors.ProgrammerError, client._Call,
cdef, [], [0, 1, 2])
def testTimeout(self):
def _CalcTimeout((arg1, arg2)):
return arg1 + arg2
def _VerifyRequest(exp_timeout, req):
self.assertEqual(req.read_timeout, exp_timeout)
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, hex(req.read_timeout)))
resolver = rpc._StaticResolver([
"192.0.2.1",
"192.0.2.2",
])
nodes = [
"node1.example.com",
"node2.example.com",
]
tests = [(100, None, 100), (30, None, 30)]
tests.extend((_CalcTimeout, i, i + 300)
for i in [0, 5, 16485, 30516])
for timeout, arg1, exp_timeout in tests:
cdef = ("test_call", NotImplemented, None, timeout, [
("arg1", None, NotImplemented),
("arg2", None, NotImplemented),
], None, None, NotImplemented)
http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest,
exp_timeout))
client = rpc._RpcClientBase(resolver, NotImplemented,
_req_process_fn=http_proc)
result = client._Call(cdef, nodes, [arg1, 300])
self.assertEqual(len(result), len(nodes))
self.assertTrue(compat.all(not res.fail_msg and
res.payload == hex(exp_timeout)
for res in result.values()))
def testArgumentEncoder(self):
(AT1, AT2) = range(1, 3)
resolver = rpc._StaticResolver([
"192.0.2.5",
"192.0.2.6",
])
nodes = [
"node5.example.com",
"node6.example.com",
]
encoders = {
AT1: lambda _, value: hex(value),
AT2: lambda _, value: hash(value),
}
cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
("arg0", None, NotImplemented),
("arg1", AT1, NotImplemented),
("arg1", AT2, NotImplemented),
], None, None, NotImplemented)
def _VerifyRequest(req):
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, req.post_data))
http_proc = _FakeRequestProcessor(_VerifyRequest)
for num in [0, 3796, 9032119]:
client = rpc._RpcClientBase(resolver, encoders.get,
_req_process_fn=http_proc)
result = client._Call(cdef, nodes, ["foo", num, "Hello%s" % num])
self.assertEqual(len(result), len(nodes))
for res in result.values():
self.assertFalse(res.fail_msg)
self.assertEqual(serializer.LoadJson(res.payload),
["foo", hex(num), hash("Hello%s" % num)])
def testPostProc(self):
def _VerifyRequest(nums, req):
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, nums))
resolver = rpc._StaticResolver([
"192.0.2.90",
"192.0.2.95",
])
nodes = [
"node90.example.com",
"node95.example.com",
]
def _PostProc(res):
self.assertFalse(res.fail_msg)
res.payload = sum(res.payload)
return res
cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [],
None, _PostProc, NotImplemented)
# Seeded random generator
rnd = random.Random(20299)
for i in [0, 4, 74, 1391]:
nums = [rnd.randint(0, 1000) for _ in range(i)]
http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, nums))
client = rpc._RpcClientBase(resolver, NotImplemented,
_req_process_fn=http_proc)
result = client._Call(cdef, nodes, [])
self.assertEqual(len(result), len(nodes))
for res in result.values():
self.assertFalse(res.fail_msg)
self.assertEqual(res.payload, sum(nums))
def testPreProc(self):
def _VerifyRequest(req):
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, req.post_data))
resolver = rpc._StaticResolver([
"192.0.2.30",
"192.0.2.35",
])
nodes = [
"node30.example.com",
"node35.example.com",
]
def _PreProc(node, data):
self.assertEqual(len(data), 1)
return data[0] + node
cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
("arg0", None, NotImplemented),
], _PreProc, None, NotImplemented)
http_proc = _FakeRequestProcessor(_VerifyRequest)
client = rpc._RpcClientBase(resolver, NotImplemented,
_req_process_fn=http_proc)
for prefix in ["foo", "bar", "baz"]:
result = client._Call(cdef, nodes, [prefix])
self.assertEqual(len(result), len(nodes))
for (idx, (node, res)) in enumerate(result.items()):
self.assertFalse(res.fail_msg)
self.assertEqual(serializer.LoadJson(res.payload), prefix + node)
def testResolverOptions(self):
def _VerifyRequest(req):
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, req.post_data))
nodes = [
"node30.example.com",
"node35.example.com",
]
def _Resolver(expected, hosts, options):
self.assertEqual(hosts, nodes)
self.assertEqual(options, expected)
return zip(hosts, nodes, hosts)
def _DynamicResolverOptions((arg0, )):
return sum(arg0)
tests = [
(None, None, None),
(rpc_defs.ACCEPT_OFFLINE_NODE, None, rpc_defs.ACCEPT_OFFLINE_NODE),
(False, None, False),
(True, None, True),
(0, None, 0),
(_DynamicResolverOptions, [1, 2, 3], 6),
(_DynamicResolverOptions, range(4, 19), 165),
]
for (resolver_opts, arg0, expected) in tests:
cdef = ("test_call", NotImplemented, resolver_opts,
constants.RPC_TMO_NORMAL, [
("arg0", None, NotImplemented),
], None, None, NotImplemented)
http_proc = _FakeRequestProcessor(_VerifyRequest)
client = rpc._RpcClientBase(compat.partial(_Resolver, expected),
NotImplemented, _req_process_fn=http_proc)
result = client._Call(cdef, nodes, [arg0])
self.assertEqual(len(result), len(nodes))
for (idx, (node, res)) in enumerate(result.items()):
self.assertFalse(res.fail_msg)
class _FakeConfigForRpcRunner:
GetAllNodesInfo = NotImplemented
def __init__(self, cluster=NotImplemented):
self._cluster = cluster
self._disks = [
objects.Disk(dev_type=constants.DT_PLAIN, size=4096,
logical_id=("vg", "disk6120"),
uuid="disk_uuid_1"),
objects.Disk(dev_type=constants.DT_PLAIN, size=1024,
logical_id=("vg", "disk8508"),
uuid="disk_uuid_2"),
]
for disk in self._disks:
disk.UpgradeConfig()
def GetNodeInfo(self, name):
return objects.Node(name=name)
def GetMultiNodeInfo(self, names):
return [(name, self.GetNodeInfo(name)) for name in names]
def GetClusterInfo(self):
return self._cluster
def GetInstanceDiskParams(self, _):
return constants.DISK_DT_DEFAULTS
def GetInstanceSecondaryNodes(self, _):
return []
def GetInstanceDisks(self, _):
return self._disks
class TestRpcRunner(unittest.TestCase):
def testUploadFile(self):
data = 1779 * "Hello World\n"
tmpfile = tempfile.NamedTemporaryFile()
tmpfile.write(data)
tmpfile.flush()
st = os.stat(tmpfile.name)
nodes = [
"node1.example.com",
]
def _VerifyRequest(req):
(uldata, ) = serializer.LoadJson(req.post_data)
self.assertEqual(len(uldata), 7)
self.assertEqual(uldata[0], tmpfile.name)
self.assertEqual(list(uldata[1]), list(rpc._Compress(nodes[0], data)))
self.assertEqual(uldata[2], st.st_mode)
self.assertEqual(uldata[3], "user%s" % os.getuid())
self.assertEqual(uldata[4], "group%s" % os.getgid())
self.assertTrue(uldata[5] is not None)
self.assertEqual(uldata[6], st.st_mtime)
req.success = True
req.resp_status_code = http.HTTP_OK
req.resp_body = serializer.DumpJson((True, None))
http_proc = _FakeRequestProcessor(_VerifyRequest)
std_runner = rpc.RpcRunner(_FakeConfigForRpcRunner(), None,
_req_process_fn=http_proc,
_getents=mocks.FakeGetentResolver)
cfg_runner = rpc.ConfigRunner(None, ["192.0.2.13"],
_req_process_fn=http_proc,
_getents=mocks.FakeGetentResolver)
for runner in [std_runner, cfg_runner]:
result = runner.call_upload_file(nodes, tmpfile.name)
self.assertEqual(len(result), len(nodes))
for (idx, (node, res)) in enumerate(result.items()):
self.assertFalse(res.fail_msg)
def testEncodeInstance(self):
cluster = objects.Cluster(hvparams={
constants.HT_KVM: {
constants.HV_CDROM_IMAGE_PATH: "foo",
},
},
beparams={
constants.PP_DEFAULT: {
constants.BE_MAXMEM: 8192,
},
},
os_hvp={},
osparams={
"linux": {
"role": "unknown",
},
})
cluster.UpgradeConfig()
inst = objects.Instance(name="inst1.example.com",
hypervisor=constants.HT_KVM,
os="linux",
hvparams={
constants.HV_CDROM_IMAGE_PATH: "bar",
constants.HV_ROOT_PATH: "/tmp",
},
beparams={
constants.BE_MINMEM: 128,
constants.BE_MAXMEM: 256,
},
nics=[
objects.NIC(nicparams={
constants.NIC_MODE: "mymode",
}),
],
disk_template=constants.DT_PLAIN,
disks=["disk_uuid_1", "disk_uuid_2"]
)
inst.UpgradeConfig()
cfg = _FakeConfigForRpcRunner(cluster=cluster)
runner = rpc.RpcRunner(cfg, None,
_req_process_fn=NotImplemented,
_getents=mocks.FakeGetentResolver)
def _CheckBasics(result):
self.assertEqual(result["name"], "inst1.example.com")
self.assertEqual(result["os"], "linux")
self.assertEqual(result["beparams"][constants.BE_MINMEM], 128)
self.assertEqual(len(result["nics"]), 1)
self.assertEqual(result["nics"][0]["nicparams"][constants.NIC_MODE],
"mymode")
# Generic object serialization
result = runner._encoder(NotImplemented, (rpc_defs.ED_OBJECT_DICT, inst))
_CheckBasics(result)
self.assertEqual(len(result["hvparams"]), 2)
result = runner._encoder(NotImplemented,
(rpc_defs.ED_OBJECT_DICT_LIST, 5 * [inst]))
map(_CheckBasics, result)
map(lambda r: self.assertEqual(len(r["hvparams"]), 2), result)
# Just an instance
result = runner._encoder(NotImplemented, (rpc_defs.ED_INST_DICT, inst))
_CheckBasics(result)
self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar")
self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp")
self.assertEqual(result["osparams"], {
"role": "unknown",
})
self.assertEqual(len(result["hvparams"]),
len(constants.HVC_DEFAULTS[constants.HT_KVM]))
# Instance with OS parameters
result = runner._encoder(NotImplemented,
(rpc_defs.ED_INST_DICT_OSP_DP, (inst, {
"role": "webserver",
"other": "field",
})))
_CheckBasics(result)
self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar")
self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp")
self.assertEqual(result["osparams"], {
"role": "webserver",
"other": "field",
})
# Instance with hypervisor and backend parameters
result = runner._encoder(NotImplemented,
(rpc_defs.ED_INST_DICT_HVP_BEP_DP, (inst, {
constants.HT_KVM: {
constants.HV_BOOT_ORDER: "xyz",
},
}, {
constants.BE_VCPUS: 100,
constants.BE_MAXMEM: 4096,
})))
_CheckBasics(result)
self.assertEqual(result["beparams"][constants.BE_MAXMEM], 4096)
self.assertEqual(result["beparams"][constants.BE_VCPUS], 100)
self.assertEqual(result["hvparams"][constants.HT_KVM], {
constants.HV_BOOT_ORDER: "xyz",
})
del result["disks_info"][0]["ctime"]
del result["disks_info"][0]["mtime"]
del result["disks_info"][1]["ctime"]
del result["disks_info"][1]["mtime"]
self.assertEqual(result["disks_info"], [{
"dev_type": constants.DT_PLAIN,
"dynamic_params": {},
"size": 4096,
"logical_id": ("vg", "disk6120"),
"params": constants.DISK_DT_DEFAULTS[inst.disk_template],
"serial_no": 1,
"uuid": "disk_uuid_1",
}, {
"dev_type": constants.DT_PLAIN,
"dynamic_params": {},
"size": 1024,
"logical_id": ("vg", "disk8508"),
"params": constants.DISK_DT_DEFAULTS[inst.disk_template],
"serial_no": 1,
"uuid": "disk_uuid_2",
}])
inst_disks = cfg.GetInstanceDisks(inst.uuid)
self.assertTrue(compat.all(disk.params == {} for disk in inst_disks),
msg="Configuration objects were modified")
class TestLegacyNodeInfo(unittest.TestCase):
KEY_BOOT = "bootid"
KEY_NAME = "name"
KEY_STORAGE_FREE = "storage_free"
KEY_STORAGE_TOTAL = "storage_size"
KEY_CPU_COUNT = "cpu_count"
KEY_SPINDLES_FREE = "spindles_free"
KEY_SPINDLES_TOTAL = "spindles_total"
KEY_STORAGE_TYPE = "type" # key for storage type
VAL_BOOT = 0
VAL_VG_NAME = "xy"
VAL_VG_FREE = 11
VAL_VG_TOTAL = 12
VAL_VG_TYPE = "lvm-vg"
VAL_CPU_COUNT = 2
VAL_PV_NAME = "ab"
VAL_PV_FREE = 31
VAL_PV_TOTAL = 32
VAL_PV_TYPE = "lvm-pv"
DICT_VG = {
KEY_NAME: VAL_VG_NAME,
KEY_STORAGE_FREE: VAL_VG_FREE,
KEY_STORAGE_TOTAL: VAL_VG_TOTAL,
KEY_STORAGE_TYPE: VAL_VG_TYPE,
}
DICT_HV = {KEY_CPU_COUNT: VAL_CPU_COUNT}
DICT_SP = {
KEY_STORAGE_TYPE: VAL_PV_TYPE,
KEY_NAME: VAL_PV_NAME,
KEY_STORAGE_FREE: VAL_PV_FREE,
KEY_STORAGE_TOTAL: VAL_PV_TOTAL,
}
STD_LST = [VAL_BOOT, [DICT_VG, DICT_SP], [DICT_HV]]
STD_DICT = {
KEY_BOOT: VAL_BOOT,
KEY_NAME: VAL_VG_NAME,
KEY_STORAGE_FREE: VAL_VG_FREE,
KEY_STORAGE_TOTAL: VAL_VG_TOTAL,
KEY_SPINDLES_FREE: VAL_PV_FREE,
KEY_SPINDLES_TOTAL: VAL_PV_TOTAL,
KEY_CPU_COUNT: VAL_CPU_COUNT,
}
def testWithSpindles(self):
result = rpc.MakeLegacyNodeInfo(self.STD_LST, constants.DT_PLAIN)
self.assertEqual(result, self.STD_DICT)
def testNoSpindles(self):
my_lst = [self.VAL_BOOT, [self.DICT_VG], [self.DICT_HV]]
result = rpc.MakeLegacyNodeInfo(my_lst, constants.DT_PLAIN)
expected_dict = dict((k,v) for k, v in self.STD_DICT.iteritems())
expected_dict[self.KEY_SPINDLES_FREE] = 0
expected_dict[self.KEY_SPINDLES_TOTAL] = 0
self.assertEqual(result, expected_dict)
if __name__ == "__main__":
testutils.GanetiTestProgram()