# blob: 20d3d86c260fa292151e706d028eacd22add965c [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.client.gnt_instance"""
import unittest
from ganeti import constants
from ganeti import utils
from ganeti import errors
from ganeti import objects
from ganeti.client import gnt_instance
import testutils
class TestConsole(unittest.TestCase):
    """Tests for gnt_instance._DoConsole using recording test doubles.

    Feedback messages are collected in self._output and commands handed to
    the fake runner are collected in self._cmds, so every test can check
    what would have been printed or executed.
    """

    # Cluster name passed to _DoConsole by every test in this class
    _CLUSTER = "cluster.example.com"

    def setUp(self):
        self._next_cmd_exitcode = 0
        self._cmds = []
        self._output = []

    def _Test(self, console, show_command, cluster_name):
        """Call _DoConsole with the recording feedback and run-command hooks."""
        return gnt_instance._DoConsole(console, show_command, cluster_name,
                                       _runcmd_fn=self._FakeRunCmd,
                                       feedback_fn=self._Feedback)

    def _Feedback(self, msg, *args):
        """Record a feedback message, %-formatting it when args are given."""
        formatted = msg % args if args else msg
        self._output.append(formatted)

    def _FakeRunCmd(self, cmd, interactive=None):
        """Record cmd and return a RunResult built from the preset exit code."""
        self.assertTrue(interactive)
        self.assertTrue(isinstance(cmd, list))
        self._cmds.append(cmd)
        exit_code = self._next_cmd_exitcode
        return utils.RunResult(exit_code, None, "", "", "cmd",
                               utils.process._TIMEOUT_NONE, 5)

    def _SshConsole(self, command):
        """Build the SSH console object shared by the SSH tests."""
        return objects.InstanceConsole(instance="inst31.example.com",
                                       kind=constants.CONS_SSH,
                                       host="node93.example.com",
                                       user="user_abc",
                                       command=command)

    def testMessage(self):
        console = objects.InstanceConsole(message="Hello World",
                                          kind=constants.CONS_MESSAGE,
                                          instance="inst98.example.com")

        status = self._Test(console, False, self._CLUSTER)

        self.assertEqual(status, constants.EXIT_SUCCESS)
        self.assertEqual(len(self._cmds), 0)
        self.assertEqual(self._output, ["Hello World"])

    def testVnc(self):
        console = objects.InstanceConsole(display=1,
                                          port=5901,
                                          host="node1.example.com",
                                          kind=constants.CONS_VNC,
                                          instance="inst1.example.com")

        status = self._Test(console, False, self._CLUSTER)

        self.assertEqual(status, constants.EXIT_SUCCESS)
        self.assertEqual(len(self._cmds), 0)
        self.assertEqual(len(self._output), 1)
        message = self._output[0]
        expected_fragments = (
            " inst1.example.com ",
            " node1.example.com:5901 ",
            "vnc://node1.example.com:5901/",
        )
        for fragment in expected_fragments:
            self.assertTrue(fragment in message)

    def testSshShow(self):
        console = self._SshConsole("xm console x.y.z")

        status = self._Test(console, True, self._CLUSTER)

        self.assertEqual(status, constants.EXIT_SUCCESS)
        self.assertEqual(len(self._cmds), 0)
        self.assertEqual(len(self._output), 1)
        message = self._output[0]
        self.assertTrue(" user_abc@node93.example.com " in message)
        self.assertTrue("'xm console x.y.z'" in message)

    def testSshRun(self):
        console = self._SshConsole(["xm", "console", "x.y.z"])

        status = self._Test(console, False, self._CLUSTER)

        self.assertEqual(status, constants.EXIT_SUCCESS)
        self.assertEqual(len(self._cmds), 1)
        self.assertEqual(len(self._output), 0)

        # This is very important to prevent escapes from the console
        executed = self._cmds[0]
        self.assertTrue("-oEscapeChar=none" in executed)

    def testSshRunFail(self):
        console = self._SshConsole(["xm", "console", "x.y.z"])

        # Make the fake runner report a failing command
        self._next_cmd_exitcode = 100

        self.assertRaises(errors.OpExecError, self._Test,
                          console, False, self._CLUSTER)
        self.assertEqual(len(self._cmds), 1)
        self.assertEqual(len(self._output), 0)
class TestConvertNicDiskModifications(unittest.TestCase):
    """Tests for gnt_instance._ConvertNicDiskModifications.

    The test feeds lists of two-element (key, params) tuples to the function
    and checks that it either raises errors.OpPrereqError or returns the
    expected list of three-element (action, identifier, params) tuples.
    """

    def test(self):
        convert = gnt_instance._ConvertNicDiskModifications

        self.assertEqual(convert([]), [])

        # Error cases
        rejected = [
            [(constants.DDM_REMOVE, {"param": "value"})],
            [(0, {constants.DDM_REMOVE: True, "param": "value"})],
            [(0, {constants.DDM_REMOVE: True, constants.DDM_ADD: True})],
        ]
        for mods in rejected:
            self.assertRaises(errors.OpPrereqError, convert, mods)

        # Legacy calls
        for action in constants.DDMS_VALUES:
            self.assertEqual(convert([(action, {})]), [(action, -1, {})])

            with_modify = [(0, {action: True, constants.DDM_MODIFY: True})]
            self.assertRaises(errors.OpPrereqError, convert, with_modify)

        legacy_add = [(constants.DDM_ADD, {constants.IDISK_SIZE: 1024})]
        self.assertEqual(convert(legacy_add), [
            (constants.DDM_ADD, -1, {constants.IDISK_SIZE: 1024}),
        ])

        # Inputs and expected results are deliberately separate literals so
        # that in-place changes made by the function cannot mask a mismatch.
        uuid = '024ef14d-4879-400e-8767-d61c051950bf'
        accepted = [
            # New-style calls
            (
                [(2, {constants.IDISK_MODE: constants.DISK_RDWR})],
                [(constants.DDM_MODIFY, 2,
                  {constants.IDISK_MODE: constants.DISK_RDWR})],
            ),
            (
                [(0, {constants.DDM_ADD: True,
                      constants.IDISK_SIZE: 4096})],
                [(constants.DDM_ADD, 0, {constants.IDISK_SIZE: 4096})],
            ),
            (
                [(-1, {constants.DDM_REMOVE: True})],
                [(constants.DDM_REMOVE, -1, {})],
            ),
            (
                [(-1, {constants.DDM_MODIFY: True,
                       constants.IDISK_SIZE: 1024})],
                [(constants.DDM_MODIFY, -1, {constants.IDISK_SIZE: 1024})],
            ),
            # Names and UUIDs
            (
                [('name', {constants.IDISK_MODE: constants.DISK_RDWR,
                           constants.IDISK_NAME: "rename"})],
                [(constants.DDM_MODIFY, 'name',
                  {constants.IDISK_MODE: constants.DISK_RDWR,
                   constants.IDISK_NAME: "rename"})],
            ),
            (
                [(uuid, {constants.DDM_MODIFY: True,
                         constants.IDISK_SIZE: 1024,
                         constants.IDISK_NAME: "name"})],
                [(constants.DDM_MODIFY, uuid,
                  {constants.IDISK_SIZE: 1024,
                   constants.IDISK_NAME: "name"})],
            ),
            (
                [('name', {constants.DDM_REMOVE: True})],
                [(constants.DDM_REMOVE, 'name', {})],
            ),
        ]
        for mods, expected in accepted:
            self.assertEqual(convert(mods), expected)
class TestParseDiskSizes(unittest.TestCase):
    """Tests for gnt_instance._ParseDiskSizes.

    Covers the empty list, an add without a size (expected to raise
    errors.OpPrereqError), a size given as "9G" (expected to come back as
    9216) and a modification without a size entry (expected unchanged).
    """

    def test(self):
        parse = gnt_instance._ParseDiskSizes

        self.assertEqual(parse([]), [])

        # Missing size parameter
        missing_size = [(constants.DDM_ADD, 0, {})]
        self.assertRaises(errors.OpPrereqError, parse, missing_size)

        # Converting disk size
        with_unit = [(constants.DDM_ADD, 11, {constants.IDISK_SIZE: "9G"})]
        converted = [(constants.DDM_ADD, 11, {constants.IDISK_SIZE: 9216})]
        self.assertEqual(parse(with_unit), converted)

        # No size parameter; the expected value is a separate literal so an
        # in-place change by the function would still be detected
        without_size = [(constants.DDM_REMOVE, 11, {"other": "24M"})]
        untouched = [(constants.DDM_REMOVE, 11, {"other": "24M"})]
        self.assertEqual(parse(without_size), untouched)
# Only hand control to the project's test program when this file is executed
# directly; importing the module must not start a test run.
if __name__ == "__main__":
    testutils.GanetiTestProgram()