| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2011 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Script for testing ganeti.client.gnt_instance""" |
| |
| import unittest |
| |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import errors |
| from ganeti import objects |
| from ganeti.client import gnt_instance |
| |
| import testutils |
| |
| |
| class TestConsole(unittest.TestCase): |
| def setUp(self): |
| self._output = [] |
| self._cmds = [] |
| self._next_cmd_exitcode = 0 |
| |
| def _Test(self, console, show_command, cluster_name): |
| return gnt_instance._DoConsole(console, show_command, cluster_name, |
| feedback_fn=self._Feedback, |
| _runcmd_fn=self._FakeRunCmd) |
| |
| def _Feedback(self, msg, *args): |
| if args: |
| msg = msg % args |
| self._output.append(msg) |
| |
| def _FakeRunCmd(self, cmd, interactive=None): |
| self.assertTrue(interactive) |
| self.assertTrue(isinstance(cmd, list)) |
| self._cmds.append(cmd) |
| return utils.RunResult(self._next_cmd_exitcode, None, "", "", "cmd", |
| utils.process._TIMEOUT_NONE, 5) |
| |
| def testMessage(self): |
| cons = objects.InstanceConsole(instance="inst98.example.com", |
| kind=constants.CONS_MESSAGE, |
| message="Hello World") |
| self.assertEqual(self._Test(cons, False, "cluster.example.com"), |
| constants.EXIT_SUCCESS) |
| self.assertEqual(len(self._cmds), 0) |
| self.assertEqual(self._output, ["Hello World"]) |
| |
| def testVnc(self): |
| cons = objects.InstanceConsole(instance="inst1.example.com", |
| kind=constants.CONS_VNC, |
| host="node1.example.com", |
| port=5901, |
| display=1) |
| self.assertEqual(self._Test(cons, False, "cluster.example.com"), |
| constants.EXIT_SUCCESS) |
| self.assertEqual(len(self._cmds), 0) |
| self.assertEqual(len(self._output), 1) |
| self.assertTrue(" inst1.example.com " in self._output[0]) |
| self.assertTrue(" node1.example.com:5901 " in self._output[0]) |
| self.assertTrue("vnc://node1.example.com:5901/" in self._output[0]) |
| |
| def testSshShow(self): |
| cons = objects.InstanceConsole(instance="inst31.example.com", |
| kind=constants.CONS_SSH, |
| host="node93.example.com", |
| user="user_abc", |
| command="xm console x.y.z") |
| self.assertEqual(self._Test(cons, True, "cluster.example.com"), |
| constants.EXIT_SUCCESS) |
| self.assertEqual(len(self._cmds), 0) |
| self.assertEqual(len(self._output), 1) |
| self.assertTrue(" user_abc@node93.example.com " in self._output[0]) |
| self.assertTrue("'xm console x.y.z'" in self._output[0]) |
| |
| def testSshRun(self): |
| cons = objects.InstanceConsole(instance="inst31.example.com", |
| kind=constants.CONS_SSH, |
| host="node93.example.com", |
| user="user_abc", |
| command=["xm", "console", "x.y.z"]) |
| self.assertEqual(self._Test(cons, False, "cluster.example.com"), |
| constants.EXIT_SUCCESS) |
| self.assertEqual(len(self._cmds), 1) |
| self.assertEqual(len(self._output), 0) |
| |
| # This is very important to prevent escapes from the console |
| self.assertTrue("-oEscapeChar=none" in self._cmds[0]) |
| |
| def testSshRunFail(self): |
| cons = objects.InstanceConsole(instance="inst31.example.com", |
| kind=constants.CONS_SSH, |
| host="node93.example.com", |
| user="user_abc", |
| command=["xm", "console", "x.y.z"]) |
| |
| self._next_cmd_exitcode = 100 |
| self.assertRaises(errors.OpExecError, self._Test, |
| cons, False, "cluster.example.com") |
| self.assertEqual(len(self._cmds), 1) |
| self.assertEqual(len(self._output), 0) |
| |
| |
| class TestConvertNicDiskModifications(unittest.TestCase): |
| def testErrorMods(self): |
| fn = gnt_instance._ConvertNicDiskModifications |
| |
| self.assertEqual(fn([]), []) |
| |
| # Error cases |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (constants.DDM_REMOVE, {"param": "value", }), |
| ]) |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (0, {constants.DDM_REMOVE: True, "param": "value", }), |
| ]) |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (constants.DDM_DETACH, {"param": "value", }), |
| ]) |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (0, {constants.DDM_DETACH: True, "param": "value", }), |
| ]) |
| |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (0, { |
| constants.DDM_REMOVE: True, |
| constants.DDM_ADD: True, |
| }), |
| ]) |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (0, { |
| constants.DDM_DETACH: True, |
| constants.DDM_MODIFY: True, |
| }), |
| ]) |
| |
| def testLegacyCalls(self): |
| fn = gnt_instance._ConvertNicDiskModifications |
| |
| for action in constants.DDMS_VALUES: |
| self.assertEqual(fn([ |
| (action, {}), |
| ]), [ |
| (action, -1, {}), |
| ]) |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (0, { |
| action: True, |
| constants.DDM_MODIFY: True, |
| }), |
| ]) |
| |
| self.assertEqual(fn([ |
| (constants.DDM_ADD, { |
| constants.IDISK_SIZE: 1024, |
| }), |
| ]), [ |
| (constants.DDM_ADD, -1, { |
| constants.IDISK_SIZE: 1024, |
| }), |
| ]) |
| |
| def testNewStyleCalls(self): |
| fn = gnt_instance._ConvertNicDiskModifications |
| |
| self.assertEqual(fn([ |
| (2, { |
| constants.IDISK_MODE: constants.DISK_RDWR, |
| }), |
| ]), [ |
| (constants.DDM_MODIFY, 2, { |
| constants.IDISK_MODE: constants.DISK_RDWR, |
| }), |
| ]) |
| |
| self.assertEqual(fn([ |
| (0, { |
| constants.DDM_ADD: True, |
| constants.IDISK_SIZE: 4096, |
| }), |
| ]), [ |
| (constants.DDM_ADD, 0, { |
| constants.IDISK_SIZE: 4096, |
| }), |
| ]) |
| |
| self.assertEqual(fn([ |
| (-1, { |
| constants.DDM_REMOVE: True, |
| }), |
| ]), [ |
| (constants.DDM_REMOVE, -1, {}), |
| ]) |
| |
| self.assertEqual(fn([ |
| (-1, { |
| constants.DDM_MODIFY: True, |
| constants.IDISK_SIZE: 1024, |
| }), |
| ]), [ |
| (constants.DDM_MODIFY, -1, { |
| constants.IDISK_SIZE: 1024, |
| }), |
| ]) |
| |
| def testNamesUUIDs(self): |
| fn = gnt_instance._ConvertNicDiskModifications |
| |
| self.assertEqual(fn([ |
| ('name', { |
| constants.IDISK_MODE: constants.DISK_RDWR, |
| constants.IDISK_NAME: "rename", |
| }), |
| ]), [ |
| (constants.DDM_MODIFY, 'name', { |
| constants.IDISK_MODE: constants.DISK_RDWR, |
| constants.IDISK_NAME: "rename", |
| }), |
| ]) |
| self.assertEqual(fn([ |
| ('024ef14d-4879-400e-8767-d61c051950bf', { |
| constants.DDM_MODIFY: True, |
| constants.IDISK_SIZE: 1024, |
| constants.IDISK_NAME: "name", |
| }), |
| ]), [ |
| (constants.DDM_MODIFY, '024ef14d-4879-400e-8767-d61c051950bf', { |
| constants.IDISK_SIZE: 1024, |
| constants.IDISK_NAME: "name", |
| }), |
| ]) |
| self.assertEqual(fn([ |
| ('name', { |
| constants.DDM_REMOVE: True, |
| }), |
| ]), [ |
| (constants.DDM_REMOVE, 'name', {}), |
| ]) |
| |
| |
| class TestParseDiskSizes(unittest.TestCase): |
| def test(self): |
| fn = gnt_instance._ParseDiskSizes |
| |
| self.assertEqual(fn([]), []) |
| |
| # Missing size parameter |
| self.assertRaises(errors.OpPrereqError, fn, [ |
| (constants.DDM_ADD, 0, {}), |
| ]) |
| |
| # Converting disk size |
| self.assertEqual(fn([ |
| (constants.DDM_ADD, 11, { |
| constants.IDISK_SIZE: "9G", |
| }), |
| ]), [ |
| (constants.DDM_ADD, 11, { |
| constants.IDISK_SIZE: 9216, |
| }), |
| ]) |
| |
| # No size parameter |
| self.assertEqual(fn([ |
| (constants.DDM_REMOVE, 11, { |
| "other": "24M", |
| }), |
| ]), [ |
| (constants.DDM_REMOVE, 11, { |
| "other": "24M", |
| }), |
| ]) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |