| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2010, 2011, 2012, 2013 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Script for testing ganeti.rpc""" |
| |
| import os |
| import sys |
| import unittest |
| import random |
| import tempfile |
| |
| from ganeti import constants |
| from ganeti import compat |
| from ganeti.rpc import node as rpc |
| from ganeti import rpc_defs |
| from ganeti import http |
| from ganeti import errors |
| from ganeti import serializer |
| from ganeti import objects |
| from ganeti import backend |
| |
| import testutils |
| import mocks |
| |
| |
| class _FakeRequestProcessor: |
| def __init__(self, response_fn): |
| self._response_fn = response_fn |
| self.reqcount = 0 |
| |
| def __call__(self, reqs, lock_monitor_cb=None): |
| assert lock_monitor_cb is None or callable(lock_monitor_cb) |
| for req in reqs: |
| self.reqcount += 1 |
| self._response_fn(req) |
| |
| |
| def GetFakeSimpleStoreClass(fn): |
| class FakeSimpleStore: |
| GetNodePrimaryIPList = fn |
| GetPrimaryIPFamily = lambda _: None |
| |
| return FakeSimpleStore |
| |
| |
| def _RaiseNotImplemented(): |
| """Simple wrapper to raise NotImplementedError. |
| |
| """ |
| raise NotImplementedError |
| |
| |
| class TestRpcProcessor(unittest.TestCase): |
| def _FakeAddressLookup(self, map): |
| return lambda node_list: [map.get(node) for node in node_list] |
| |
| def _GetVersionResponse(self, req): |
| self.assertEqual(req.host, "127.0.0.1") |
| self.assertEqual(req.port, 24094) |
| self.assertEqual(req.path, "/version") |
| self.assertEqual(req.read_timeout, constants.RPC_TMO_URGENT) |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, 123)) |
| |
| def testVersionSuccess(self): |
| resolver = rpc._StaticResolver(["127.0.0.1"]) |
| http_proc = _FakeRequestProcessor(self._GetVersionResponse) |
| proc = rpc._RpcProcessor(resolver, 24094) |
| result = proc(["localhost"], "version", {"localhost": ""}, 60, |
| NotImplemented, _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), ["localhost"]) |
| lhresp = result["localhost"] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, "localhost") |
| self.assertFalse(lhresp.fail_msg) |
| self.assertEqual(lhresp.payload, 123) |
| self.assertEqual(lhresp.call, "version") |
| lhresp.Raise("should not raise") |
| self.assertEqual(http_proc.reqcount, 1) |
| |
| def _ReadTimeoutResponse(self, req): |
| self.assertEqual(req.host, "192.0.2.13") |
| self.assertEqual(req.port, 19176) |
| self.assertEqual(req.path, "/version") |
| self.assertEqual(req.read_timeout, 12356) |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, -1)) |
| |
| def testReadTimeout(self): |
| resolver = rpc._StaticResolver(["192.0.2.13"]) |
| http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse) |
| proc = rpc._RpcProcessor(resolver, 19176) |
| host = "node31856" |
| body = {host: ""} |
| result = proc([host], "version", body, 12356, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), [host]) |
| lhresp = result[host] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, host) |
| self.assertFalse(lhresp.fail_msg) |
| self.assertEqual(lhresp.payload, -1) |
| self.assertEqual(lhresp.call, "version") |
| lhresp.Raise("should not raise") |
| self.assertEqual(http_proc.reqcount, 1) |
| |
| def testOfflineNode(self): |
| resolver = rpc._StaticResolver([rpc._OFFLINE]) |
| http_proc = _FakeRequestProcessor(NotImplemented) |
| proc = rpc._RpcProcessor(resolver, 30668) |
| host = "n17296" |
| body = {host: ""} |
| result = proc([host], "version", body, 60, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), [host]) |
| lhresp = result[host] |
| self.assertTrue(lhresp.offline) |
| self.assertEqual(lhresp.node, host) |
| self.assertTrue(lhresp.fail_msg) |
| self.assertFalse(lhresp.payload) |
| self.assertEqual(lhresp.call, "version") |
| |
| # With a message |
| self.assertRaises(errors.OpExecError, lhresp.Raise, "should raise") |
| |
| # No message |
| self.assertRaises(errors.OpExecError, lhresp.Raise, None) |
| |
| self.assertEqual(http_proc.reqcount, 0) |
| |
| def _GetMultiVersionResponse(self, req): |
| self.assert_(req.host.startswith("node")) |
| self.assertEqual(req.port, 23245) |
| self.assertEqual(req.path, "/version") |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, 987)) |
| |
| def testMultiVersionSuccess(self): |
| nodes = ["node%s" % i for i in range(50)] |
| body = dict((n, "") for n in nodes) |
| resolver = rpc._StaticResolver(nodes) |
| http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse) |
| proc = rpc._RpcProcessor(resolver, 23245) |
| result = proc(nodes, "version", body, 60, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(sorted(result.keys()), sorted(nodes)) |
| |
| for name in nodes: |
| lhresp = result[name] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, name) |
| self.assertFalse(lhresp.fail_msg) |
| self.assertEqual(lhresp.payload, 987) |
| self.assertEqual(lhresp.call, "version") |
| lhresp.Raise("should not raise") |
| |
| self.assertEqual(http_proc.reqcount, len(nodes)) |
| |
| def _GetVersionResponseFail(self, errinfo, req): |
| self.assertEqual(req.path, "/version") |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((False, errinfo)) |
| |
| def testVersionFailure(self): |
| resolver = rpc._StaticResolver(["aef9ur4i.example.com"]) |
| proc = rpc._RpcProcessor(resolver, 5903) |
| for errinfo in [None, "Unknown error"]: |
| http_proc = \ |
| _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail, |
| errinfo)) |
| host = "aef9ur4i.example.com" |
| body = {host: ""} |
| result = proc(body.keys(), "version", body, 60, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), [host]) |
| lhresp = result[host] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, host) |
| self.assert_(lhresp.fail_msg) |
| self.assertFalse(lhresp.payload) |
| self.assertEqual(lhresp.call, "version") |
| self.assertRaises(errors.OpExecError, lhresp.Raise, "failed") |
| self.assertEqual(http_proc.reqcount, 1) |
| |
| def _GetHttpErrorResponse(self, httperrnodes, failnodes, req): |
| self.assertEqual(req.path, "/vg_list") |
| self.assertEqual(req.port, 15165) |
| |
| if req.host in httperrnodes: |
| req.success = False |
| req.error = "Node set up for HTTP errors" |
| |
| elif req.host in failnodes: |
| req.success = True |
| req.resp_status_code = 404 |
| req.resp_body = serializer.DumpJson({ |
| "code": 404, |
| "message": "Method not found", |
| "explain": "Explanation goes here", |
| }) |
| else: |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, hash(req.host))) |
| |
| def testHttpError(self): |
| nodes = ["uaf6pbbv%s" % i for i in range(50)] |
| body = dict((n, "") for n in nodes) |
| resolver = rpc._StaticResolver(nodes) |
| |
| httperrnodes = set(nodes[1::7]) |
| self.assertEqual(len(httperrnodes), 7) |
| |
| failnodes = set(nodes[2::3]) - httperrnodes |
| self.assertEqual(len(failnodes), 14) |
| |
| self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29) |
| |
| proc = rpc._RpcProcessor(resolver, 15165) |
| http_proc = \ |
| _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse, |
| httperrnodes, failnodes)) |
| result = proc(nodes, "vg_list", body, |
| constants.RPC_TMO_URGENT, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(sorted(result.keys()), sorted(nodes)) |
| |
| for name in nodes: |
| lhresp = result[name] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, name) |
| self.assertEqual(lhresp.call, "vg_list") |
| |
| if name in httperrnodes: |
| self.assert_(lhresp.fail_msg) |
| self.assertRaises(errors.OpExecError, lhresp.Raise, "failed") |
| elif name in failnodes: |
| self.assert_(lhresp.fail_msg) |
| self.assertRaises(errors.OpPrereqError, lhresp.Raise, "failed", |
| prereq=True, ecode=errors.ECODE_INVAL) |
| else: |
| self.assertFalse(lhresp.fail_msg) |
| self.assertEqual(lhresp.payload, hash(name)) |
| lhresp.Raise("should not raise") |
| |
| self.assertEqual(http_proc.reqcount, len(nodes)) |
| |
| def _GetInvalidResponseA(self, req): |
| self.assertEqual(req.path, "/version") |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson(("This", "is", "an", "invalid", |
| "response", "!", 1, 2, 3)) |
| |
| def _GetInvalidResponseB(self, req): |
| self.assertEqual(req.path, "/version") |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson("invalid response") |
| |
| def testInvalidResponse(self): |
| resolver = rpc._StaticResolver(["oqo7lanhly.example.com"]) |
| proc = rpc._RpcProcessor(resolver, 19978) |
| |
| for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]: |
| http_proc = _FakeRequestProcessor(fn) |
| host = "oqo7lanhly.example.com" |
| body = {host: ""} |
| result = proc([host], "version", body, 60, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), [host]) |
| lhresp = result[host] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, host) |
| self.assert_(lhresp.fail_msg) |
| self.assertFalse(lhresp.payload) |
| self.assertEqual(lhresp.call, "version") |
| self.assertRaises(errors.OpExecError, lhresp.Raise, "failed") |
| self.assertEqual(http_proc.reqcount, 1) |
| |
| def _GetBodyTestResponse(self, test_data, req): |
| self.assertEqual(req.host, "192.0.2.84") |
| self.assertEqual(req.port, 18700) |
| self.assertEqual(req.path, "/upload_file") |
| self.assertEqual(serializer.LoadJson(req.post_data), test_data) |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, None)) |
| |
| def testResponseBody(self): |
| test_data = { |
| "Hello": "World", |
| "xyz": range(10), |
| } |
| resolver = rpc._StaticResolver(["192.0.2.84"]) |
| http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse, |
| test_data)) |
| proc = rpc._RpcProcessor(resolver, 18700) |
| host = "node19759" |
| body = {host: serializer.DumpJson(test_data)} |
| result = proc([host], "upload_file", body, 30, NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(result.keys(), [host]) |
| lhresp = result[host] |
| self.assertFalse(lhresp.offline) |
| self.assertEqual(lhresp.node, host) |
| self.assertFalse(lhresp.fail_msg) |
| self.assertEqual(lhresp.payload, None) |
| self.assertEqual(lhresp.call, "upload_file") |
| lhresp.Raise("should not raise") |
| self.assertEqual(http_proc.reqcount, 1) |
| |
| |
| class TestSsconfResolver(unittest.TestCase): |
| def testSsconfLookup(self): |
| addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)] |
| node_list = ["node%d.example.com" % n for n in range(0, 255, 13)] |
| node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)] |
| ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list) |
| result = rpc._SsconfResolver(True, node_list, NotImplemented, |
| ssc=ssc, nslookup_fn=NotImplemented) |
| self.assertEqual(result, zip(node_list, addr_list, node_list)) |
| |
| def testNsLookup(self): |
| addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)] |
| node_list = ["node%d.example.com" % n for n in range(0, 255, 13)] |
| ssc = GetFakeSimpleStoreClass(lambda _: []) |
| node_addr_map = dict(zip(node_list, addr_list)) |
| nslookup_fn = lambda name, family=None: node_addr_map.get(name) |
| result = rpc._SsconfResolver(True, node_list, NotImplemented, |
| ssc=ssc, nslookup_fn=nslookup_fn) |
| self.assertEqual(result, zip(node_list, addr_list, node_list)) |
| |
| def testDisabledSsconfIp(self): |
| addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)] |
| node_list = ["node%d.example.com" % n for n in range(0, 255, 13)] |
| ssc = GetFakeSimpleStoreClass(_RaiseNotImplemented) |
| node_addr_map = dict(zip(node_list, addr_list)) |
| nslookup_fn = lambda name, family=None: node_addr_map.get(name) |
| result = rpc._SsconfResolver(False, node_list, NotImplemented, |
| ssc=ssc, nslookup_fn=nslookup_fn) |
| self.assertEqual(result, zip(node_list, addr_list, node_list)) |
| |
| def testBothLookups(self): |
| addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)] |
| node_list = ["node%d.example.com" % n for n in range(0, 255, 13)] |
| n = len(addr_list) / 2 |
| node_addr_list = [" ".join(t) for t in zip(node_list[n:], addr_list[n:])] |
| ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list) |
| node_addr_map = dict(zip(node_list[:n], addr_list[:n])) |
| nslookup_fn = lambda name, family=None: node_addr_map.get(name) |
| result = rpc._SsconfResolver(True, node_list, NotImplemented, |
| ssc=ssc, nslookup_fn=nslookup_fn) |
| self.assertEqual(result, zip(node_list, addr_list, node_list)) |
| |
| def testAddressLookupIPv6(self): |
| addr_list = ["2001:db8::%d" % n for n in range(0, 255, 11)] |
| node_list = ["node%d.example.com" % n for n in range(0, 255, 11)] |
| node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)] |
| ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list) |
| result = rpc._SsconfResolver(True, node_list, NotImplemented, |
| ssc=ssc, nslookup_fn=NotImplemented) |
| self.assertEqual(result, zip(node_list, addr_list, node_list)) |
| |
| |
| class TestStaticResolver(unittest.TestCase): |
| def test(self): |
| addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)] |
| nodes = ["node%s.example.com" % n for n in range(0, 123, 7)] |
| res = rpc._StaticResolver(addresses) |
| self.assertEqual(res(nodes, NotImplemented), zip(nodes, addresses, nodes)) |
| |
| def testWrongLength(self): |
| res = rpc._StaticResolver([]) |
| self.assertRaises(AssertionError, res, ["abc"], NotImplemented) |
| |
| |
| class TestNodeConfigResolver(unittest.TestCase): |
| @staticmethod |
| def _GetSingleOnlineNode(uuid): |
| assert uuid == "node90-uuid" |
| return objects.Node(name="node90.example.com", |
| uuid=uuid, |
| offline=False, |
| primary_ip="192.0.2.90") |
| |
| @staticmethod |
| def _GetSingleOfflineNode(uuid): |
| assert uuid == "node100-uuid" |
| return objects.Node(name="node100.example.com", |
| uuid=uuid, |
| offline=True, |
| primary_ip="192.0.2.100") |
| |
| def testSingleOnline(self): |
| self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode, |
| NotImplemented, |
| ["node90-uuid"], None), |
| [("node90.example.com", "192.0.2.90", "node90-uuid")]) |
| |
| def testSingleOffline(self): |
| self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode, |
| NotImplemented, |
| ["node100-uuid"], None), |
| [("node100.example.com", rpc._OFFLINE, "node100-uuid")]) |
| |
| def testSingleOfflineWithAcceptOffline(self): |
| fn = self._GetSingleOfflineNode |
| assert fn("node100-uuid").offline |
| self.assertEqual(rpc._NodeConfigResolver(fn, NotImplemented, |
| ["node100-uuid"], |
| rpc_defs.ACCEPT_OFFLINE_NODE), |
| [("node100.example.com", "192.0.2.100", "node100-uuid")]) |
| for i in [False, True, "", "Hello", 0, 1]: |
| self.assertRaises(AssertionError, rpc._NodeConfigResolver, |
| fn, NotImplemented, ["node100.example.com"], i) |
| |
| def testUnknownSingleNode(self): |
| self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented, |
| ["node110.example.com"], None), |
| [("node110.example.com", "node110.example.com", |
| "node110.example.com")]) |
| |
| def testMultiEmpty(self): |
| self.assertEqual(rpc._NodeConfigResolver(NotImplemented, |
| lambda: {}, |
| [], None), |
| []) |
| |
| def testMultiSomeOffline(self): |
| nodes = dict(("node%s-uuid" % i, |
| objects.Node(name="node%s.example.com" % i, |
| offline=((i % 3) == 0), |
| primary_ip="192.0.2.%s" % i, |
| uuid="node%s-uuid" % i)) |
| for i in range(1, 255)) |
| |
| # Resolve no names |
| self.assertEqual(rpc._NodeConfigResolver(NotImplemented, |
| lambda: nodes, |
| [], None), |
| []) |
| |
| # Offline, online and unknown hosts |
| self.assertEqual(rpc._NodeConfigResolver(NotImplemented, |
| lambda: nodes, |
| ["node3-uuid", |
| "node92-uuid", |
| "node54-uuid", |
| "unknown.example.com",], |
| None), [ |
| ("node3.example.com", rpc._OFFLINE, "node3-uuid"), |
| ("node92.example.com", "192.0.2.92", "node92-uuid"), |
| ("node54.example.com", rpc._OFFLINE, "node54-uuid"), |
| ("unknown.example.com", "unknown.example.com", "unknown.example.com"), |
| ]) |
| |
| |
| class TestCompress(unittest.TestCase): |
| def test(self): |
| for data in ["", "Hello", "Hello World!\nnew\nlines"]: |
| self.assertEqual(rpc._Compress(NotImplemented, data), |
| (constants.RPC_ENCODING_NONE, data)) |
| |
| for data in [512 * " ", 5242 * "Hello World!\n"]: |
| compressed = rpc._Compress(NotImplemented, data) |
| self.assertEqual(len(compressed), 2) |
| self.assertEqual(backend._Decompress(compressed), data) |
| |
| def testDecompression(self): |
| self.assertRaises(AssertionError, backend._Decompress, "") |
| self.assertRaises(AssertionError, backend._Decompress, [""]) |
| self.assertRaises(AssertionError, backend._Decompress, |
| ("unknown compression", "data")) |
| self.assertRaises(Exception, backend._Decompress, |
| (constants.RPC_ENCODING_ZLIB_BASE64, "invalid zlib data")) |
| |
| |
| class TestRpcClientBase(unittest.TestCase): |
| def testNoHosts(self): |
| cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_SLOW, [], |
| None, None, NotImplemented) |
| http_proc = _FakeRequestProcessor(NotImplemented) |
| client = rpc._RpcClientBase(rpc._StaticResolver([]), NotImplemented, |
| _req_process_fn=http_proc) |
| self.assertEqual(client._Call(cdef, [], []), {}) |
| |
| # Test wrong number of arguments |
| self.assertRaises(errors.ProgrammerError, client._Call, |
| cdef, [], [0, 1, 2]) |
| |
| def testTimeout(self): |
| def _CalcTimeout((arg1, arg2)): |
| return arg1 + arg2 |
| |
| def _VerifyRequest(exp_timeout, req): |
| self.assertEqual(req.read_timeout, exp_timeout) |
| |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, hex(req.read_timeout))) |
| |
| resolver = rpc._StaticResolver([ |
| "192.0.2.1", |
| "192.0.2.2", |
| ]) |
| |
| nodes = [ |
| "node1.example.com", |
| "node2.example.com", |
| ] |
| |
| tests = [(100, None, 100), (30, None, 30)] |
| tests.extend((_CalcTimeout, i, i + 300) |
| for i in [0, 5, 16485, 30516]) |
| |
| for timeout, arg1, exp_timeout in tests: |
| cdef = ("test_call", NotImplemented, None, timeout, [ |
| ("arg1", None, NotImplemented), |
| ("arg2", None, NotImplemented), |
| ], None, None, NotImplemented) |
| |
| http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, |
| exp_timeout)) |
| client = rpc._RpcClientBase(resolver, NotImplemented, |
| _req_process_fn=http_proc) |
| result = client._Call(cdef, nodes, [arg1, 300]) |
| self.assertEqual(len(result), len(nodes)) |
| self.assertTrue(compat.all(not res.fail_msg and |
| res.payload == hex(exp_timeout) |
| for res in result.values())) |
| |
| def testArgumentEncoder(self): |
| (AT1, AT2) = range(1, 3) |
| |
| resolver = rpc._StaticResolver([ |
| "192.0.2.5", |
| "192.0.2.6", |
| ]) |
| |
| nodes = [ |
| "node5.example.com", |
| "node6.example.com", |
| ] |
| |
| encoders = { |
| AT1: lambda _, value: hex(value), |
| AT2: lambda _, value: hash(value), |
| } |
| |
| cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [ |
| ("arg0", None, NotImplemented), |
| ("arg1", AT1, NotImplemented), |
| ("arg1", AT2, NotImplemented), |
| ], None, None, NotImplemented) |
| |
| def _VerifyRequest(req): |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, req.post_data)) |
| |
| http_proc = _FakeRequestProcessor(_VerifyRequest) |
| |
| for num in [0, 3796, 9032119]: |
| client = rpc._RpcClientBase(resolver, encoders.get, |
| _req_process_fn=http_proc) |
| result = client._Call(cdef, nodes, ["foo", num, "Hello%s" % num]) |
| self.assertEqual(len(result), len(nodes)) |
| for res in result.values(): |
| self.assertFalse(res.fail_msg) |
| self.assertEqual(serializer.LoadJson(res.payload), |
| ["foo", hex(num), hash("Hello%s" % num)]) |
| |
| def testPostProc(self): |
| def _VerifyRequest(nums, req): |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, nums)) |
| |
| resolver = rpc._StaticResolver([ |
| "192.0.2.90", |
| "192.0.2.95", |
| ]) |
| |
| nodes = [ |
| "node90.example.com", |
| "node95.example.com", |
| ] |
| |
| def _PostProc(res): |
| self.assertFalse(res.fail_msg) |
| res.payload = sum(res.payload) |
| return res |
| |
| cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [], |
| None, _PostProc, NotImplemented) |
| |
| # Seeded random generator |
| rnd = random.Random(20299) |
| |
| for i in [0, 4, 74, 1391]: |
| nums = [rnd.randint(0, 1000) for _ in range(i)] |
| http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, nums)) |
| client = rpc._RpcClientBase(resolver, NotImplemented, |
| _req_process_fn=http_proc) |
| result = client._Call(cdef, nodes, []) |
| self.assertEqual(len(result), len(nodes)) |
| for res in result.values(): |
| self.assertFalse(res.fail_msg) |
| self.assertEqual(res.payload, sum(nums)) |
| |
| def testPreProc(self): |
| def _VerifyRequest(req): |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, req.post_data)) |
| |
| resolver = rpc._StaticResolver([ |
| "192.0.2.30", |
| "192.0.2.35", |
| ]) |
| |
| nodes = [ |
| "node30.example.com", |
| "node35.example.com", |
| ] |
| |
| def _PreProc(node, data): |
| self.assertEqual(len(data), 1) |
| return data[0] + node |
| |
| cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [ |
| ("arg0", None, NotImplemented), |
| ], _PreProc, None, NotImplemented) |
| |
| http_proc = _FakeRequestProcessor(_VerifyRequest) |
| client = rpc._RpcClientBase(resolver, NotImplemented, |
| _req_process_fn=http_proc) |
| |
| for prefix in ["foo", "bar", "baz"]: |
| result = client._Call(cdef, nodes, [prefix]) |
| self.assertEqual(len(result), len(nodes)) |
| for (idx, (node, res)) in enumerate(result.items()): |
| self.assertFalse(res.fail_msg) |
| self.assertEqual(serializer.LoadJson(res.payload), prefix + node) |
| |
| def testResolverOptions(self): |
| def _VerifyRequest(req): |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, req.post_data)) |
| |
| nodes = [ |
| "node30.example.com", |
| "node35.example.com", |
| ] |
| |
| def _Resolver(expected, hosts, options): |
| self.assertEqual(hosts, nodes) |
| self.assertEqual(options, expected) |
| return zip(hosts, nodes, hosts) |
| |
| def _DynamicResolverOptions((arg0, )): |
| return sum(arg0) |
| |
| tests = [ |
| (None, None, None), |
| (rpc_defs.ACCEPT_OFFLINE_NODE, None, rpc_defs.ACCEPT_OFFLINE_NODE), |
| (False, None, False), |
| (True, None, True), |
| (0, None, 0), |
| (_DynamicResolverOptions, [1, 2, 3], 6), |
| (_DynamicResolverOptions, range(4, 19), 165), |
| ] |
| |
| for (resolver_opts, arg0, expected) in tests: |
| cdef = ("test_call", NotImplemented, resolver_opts, |
| constants.RPC_TMO_NORMAL, [ |
| ("arg0", None, NotImplemented), |
| ], None, None, NotImplemented) |
| |
| http_proc = _FakeRequestProcessor(_VerifyRequest) |
| |
| client = rpc._RpcClientBase(compat.partial(_Resolver, expected), |
| NotImplemented, _req_process_fn=http_proc) |
| result = client._Call(cdef, nodes, [arg0]) |
| self.assertEqual(len(result), len(nodes)) |
| for (idx, (node, res)) in enumerate(result.items()): |
| self.assertFalse(res.fail_msg) |
| |
| |
| class _FakeConfigForRpcRunner: |
| GetAllNodesInfo = NotImplemented |
| |
| def __init__(self, cluster=NotImplemented): |
| self._cluster = cluster |
| self._disks = [ |
| objects.Disk(dev_type=constants.DT_PLAIN, size=4096, |
| logical_id=("vg", "disk6120"), |
| uuid="disk_uuid_1"), |
| objects.Disk(dev_type=constants.DT_PLAIN, size=1024, |
| logical_id=("vg", "disk8508"), |
| uuid="disk_uuid_2"), |
| ] |
| for disk in self._disks: |
| disk.UpgradeConfig() |
| |
| def GetNodeInfo(self, name): |
| return objects.Node(name=name) |
| |
| def GetMultiNodeInfo(self, names): |
| return [(name, self.GetNodeInfo(name)) for name in names] |
| |
| def GetClusterInfo(self): |
| return self._cluster |
| |
| def GetInstanceDiskParams(self, _): |
| return constants.DISK_DT_DEFAULTS |
| |
| def GetInstanceSecondaryNodes(self, _): |
| return [] |
| |
| def GetInstanceDisks(self, _): |
| return self._disks |
| |
| |
| class TestRpcRunner(unittest.TestCase): |
| def testUploadFile(self): |
| data = 1779 * "Hello World\n" |
| |
| tmpfile = tempfile.NamedTemporaryFile() |
| tmpfile.write(data) |
| tmpfile.flush() |
| st = os.stat(tmpfile.name) |
| |
| nodes = [ |
| "node1.example.com", |
| ] |
| |
| def _VerifyRequest(req): |
| (uldata, ) = serializer.LoadJson(req.post_data) |
| self.assertEqual(len(uldata), 7) |
| self.assertEqual(uldata[0], tmpfile.name) |
| self.assertEqual(list(uldata[1]), list(rpc._Compress(nodes[0], data))) |
| self.assertEqual(uldata[2], st.st_mode) |
| self.assertEqual(uldata[3], "user%s" % os.getuid()) |
| self.assertEqual(uldata[4], "group%s" % os.getgid()) |
| self.assertTrue(uldata[5] is not None) |
| self.assertEqual(uldata[6], st.st_mtime) |
| |
| req.success = True |
| req.resp_status_code = http.HTTP_OK |
| req.resp_body = serializer.DumpJson((True, None)) |
| |
| http_proc = _FakeRequestProcessor(_VerifyRequest) |
| |
| std_runner = rpc.RpcRunner(_FakeConfigForRpcRunner(), None, |
| _req_process_fn=http_proc, |
| _getents=mocks.FakeGetentResolver) |
| |
| cfg_runner = rpc.ConfigRunner(None, ["192.0.2.13"], |
| _req_process_fn=http_proc, |
| _getents=mocks.FakeGetentResolver) |
| |
| for runner in [std_runner, cfg_runner]: |
| result = runner.call_upload_file(nodes, tmpfile.name) |
| self.assertEqual(len(result), len(nodes)) |
| for (idx, (node, res)) in enumerate(result.items()): |
| self.assertFalse(res.fail_msg) |
| |
| def testEncodeInstance(self): |
| cluster = objects.Cluster(hvparams={ |
| constants.HT_KVM: { |
| constants.HV_CDROM_IMAGE_PATH: "foo", |
| }, |
| }, |
| beparams={ |
| constants.PP_DEFAULT: { |
| constants.BE_MAXMEM: 8192, |
| }, |
| }, |
| os_hvp={}, |
| osparams={ |
| "linux": { |
| "role": "unknown", |
| }, |
| }) |
| cluster.UpgradeConfig() |
| |
| inst = objects.Instance(name="inst1.example.com", |
| hypervisor=constants.HT_KVM, |
| os="linux", |
| hvparams={ |
| constants.HV_CDROM_IMAGE_PATH: "bar", |
| constants.HV_ROOT_PATH: "/tmp", |
| }, |
| beparams={ |
| constants.BE_MINMEM: 128, |
| constants.BE_MAXMEM: 256, |
| }, |
| nics=[ |
| objects.NIC(nicparams={ |
| constants.NIC_MODE: "mymode", |
| }), |
| ], |
| disk_template=constants.DT_PLAIN, |
| disks=["disk_uuid_1", "disk_uuid_2"] |
| ) |
| inst.UpgradeConfig() |
| |
| cfg = _FakeConfigForRpcRunner(cluster=cluster) |
| runner = rpc.RpcRunner(cfg, None, |
| _req_process_fn=NotImplemented, |
| _getents=mocks.FakeGetentResolver) |
| |
| def _CheckBasics(result): |
| self.assertEqual(result["name"], "inst1.example.com") |
| self.assertEqual(result["os"], "linux") |
| self.assertEqual(result["beparams"][constants.BE_MINMEM], 128) |
| self.assertEqual(len(result["nics"]), 1) |
| self.assertEqual(result["nics"][0]["nicparams"][constants.NIC_MODE], |
| "mymode") |
| |
| # Generic object serialization |
| result = runner._encoder(NotImplemented, (rpc_defs.ED_OBJECT_DICT, inst)) |
| _CheckBasics(result) |
| self.assertEqual(len(result["hvparams"]), 2) |
| |
| result = runner._encoder(NotImplemented, |
| (rpc_defs.ED_OBJECT_DICT_LIST, 5 * [inst])) |
| map(_CheckBasics, result) |
| map(lambda r: self.assertEqual(len(r["hvparams"]), 2), result) |
| |
| # Just an instance |
| result = runner._encoder(NotImplemented, (rpc_defs.ED_INST_DICT, inst)) |
| _CheckBasics(result) |
| self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256) |
| self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar") |
| self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp") |
| self.assertEqual(result["osparams"], { |
| "role": "unknown", |
| }) |
| self.assertEqual(len(result["hvparams"]), |
| len(constants.HVC_DEFAULTS[constants.HT_KVM])) |
| |
| # Instance with OS parameters |
| result = runner._encoder(NotImplemented, |
| (rpc_defs.ED_INST_DICT_OSP_DP, (inst, { |
| "role": "webserver", |
| "other": "field", |
| }))) |
| _CheckBasics(result) |
| self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256) |
| self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar") |
| self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp") |
| self.assertEqual(result["osparams"], { |
| "role": "webserver", |
| "other": "field", |
| }) |
| |
| # Instance with hypervisor and backend parameters |
| result = runner._encoder(NotImplemented, |
| (rpc_defs.ED_INST_DICT_HVP_BEP_DP, (inst, { |
| constants.HT_KVM: { |
| constants.HV_BOOT_ORDER: "xyz", |
| }, |
| }, { |
| constants.BE_VCPUS: 100, |
| constants.BE_MAXMEM: 4096, |
| }))) |
| _CheckBasics(result) |
| self.assertEqual(result["beparams"][constants.BE_MAXMEM], 4096) |
| self.assertEqual(result["beparams"][constants.BE_VCPUS], 100) |
| self.assertEqual(result["hvparams"][constants.HT_KVM], { |
| constants.HV_BOOT_ORDER: "xyz", |
| }) |
| del result["disks_info"][0]["ctime"] |
| del result["disks_info"][0]["mtime"] |
| del result["disks_info"][1]["ctime"] |
| del result["disks_info"][1]["mtime"] |
| self.assertEqual(result["disks_info"], [{ |
| "dev_type": constants.DT_PLAIN, |
| "dynamic_params": {}, |
| "size": 4096, |
| "logical_id": ("vg", "disk6120"), |
| "params": constants.DISK_DT_DEFAULTS[inst.disk_template], |
| "serial_no": 1, |
| "uuid": "disk_uuid_1", |
| }, { |
| "dev_type": constants.DT_PLAIN, |
| "dynamic_params": {}, |
| "size": 1024, |
| "logical_id": ("vg", "disk8508"), |
| "params": constants.DISK_DT_DEFAULTS[inst.disk_template], |
| "serial_no": 1, |
| "uuid": "disk_uuid_2", |
| }]) |
| |
| inst_disks = cfg.GetInstanceDisks(inst.uuid) |
| self.assertTrue(compat.all(disk.params == {} for disk in inst_disks), |
| msg="Configuration objects were modified") |
| |
| |
| class TestLegacyNodeInfo(unittest.TestCase): |
| KEY_BOOT = "bootid" |
| KEY_NAME = "name" |
| KEY_STORAGE_FREE = "storage_free" |
| KEY_STORAGE_TOTAL = "storage_size" |
| KEY_CPU_COUNT = "cpu_count" |
| KEY_SPINDLES_FREE = "spindles_free" |
| KEY_SPINDLES_TOTAL = "spindles_total" |
| KEY_STORAGE_TYPE = "type" # key for storage type |
| VAL_BOOT = 0 |
| VAL_VG_NAME = "xy" |
| VAL_VG_FREE = 11 |
| VAL_VG_TOTAL = 12 |
| VAL_VG_TYPE = "lvm-vg" |
| VAL_CPU_COUNT = 2 |
| VAL_PV_NAME = "ab" |
| VAL_PV_FREE = 31 |
| VAL_PV_TOTAL = 32 |
| VAL_PV_TYPE = "lvm-pv" |
| DICT_VG = { |
| KEY_NAME: VAL_VG_NAME, |
| KEY_STORAGE_FREE: VAL_VG_FREE, |
| KEY_STORAGE_TOTAL: VAL_VG_TOTAL, |
| KEY_STORAGE_TYPE: VAL_VG_TYPE, |
| } |
| DICT_HV = {KEY_CPU_COUNT: VAL_CPU_COUNT} |
| DICT_SP = { |
| KEY_STORAGE_TYPE: VAL_PV_TYPE, |
| KEY_NAME: VAL_PV_NAME, |
| KEY_STORAGE_FREE: VAL_PV_FREE, |
| KEY_STORAGE_TOTAL: VAL_PV_TOTAL, |
| } |
| STD_LST = [VAL_BOOT, [DICT_VG, DICT_SP], [DICT_HV]] |
| STD_DICT = { |
| KEY_BOOT: VAL_BOOT, |
| KEY_NAME: VAL_VG_NAME, |
| KEY_STORAGE_FREE: VAL_VG_FREE, |
| KEY_STORAGE_TOTAL: VAL_VG_TOTAL, |
| KEY_SPINDLES_FREE: VAL_PV_FREE, |
| KEY_SPINDLES_TOTAL: VAL_PV_TOTAL, |
| KEY_CPU_COUNT: VAL_CPU_COUNT, |
| } |
| |
| def testWithSpindles(self): |
| result = rpc.MakeLegacyNodeInfo(self.STD_LST, constants.DT_PLAIN) |
| self.assertEqual(result, self.STD_DICT) |
| |
| def testNoSpindles(self): |
| my_lst = [self.VAL_BOOT, [self.DICT_VG], [self.DICT_HV]] |
| result = rpc.MakeLegacyNodeInfo(my_lst, constants.DT_PLAIN) |
| expected_dict = dict((k,v) for k, v in self.STD_DICT.iteritems()) |
| expected_dict[self.KEY_SPINDLES_FREE] = 0 |
| expected_dict[self.KEY_SPINDLES_TOTAL] = 0 |
| self.assertEqual(result, expected_dict) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |