blob: c8eaffc92fe572fa36efec4839ba56f55501a747 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing the hypervisor.hv_kvm module"""
import threading
import tempfile
import unittest
import socket
import os
import struct
from ganeti import serializer
from ganeti import constants
from ganeti import compat
from ganeti import objects
from ganeti import errors
from ganeti import utils
from ganeti import pathutils
from ganeti.hypervisor import hv_kvm
import testutils
class QmpStub(threading.Thread):
  """Stub for a QMP endpoint for a KVM instance

  The stub listens on a UNIX socket and serves exactly one client: it sends
  the QMP banner, answers the first message it receives with an empty
  response and afterwards replays one scripted response per received
  message. Incoming messages are never inspected.

  """
  # Banner sent to the client right after it connects
  _QMP_BANNER_DATA = {
    "QMP": {
      "version": {
        "package": "",
        "qemu": {
          "micro": 50,
          "minor": 13,
          "major": 0,
        },
        "capabilities": [],
      },
    }
  }

  # Reply sent for the first message received from the client
  _EMPTY_RESPONSE = {
    "return": [],
  }

  def __init__(self, socket_filename, server_responses):
    """Creates a QMP stub

    The UNIX socket is bound and put into listening state right away, so a
    client may connect before the thread has been started.

    @type socket_filename: string
    @param socket_filename: filename of the UNIX socket that will be created
      by this class and used for the communication
    @type server_responses: list
    @param server_responses: list of responses that the server sends in
      response to whatever it receives; each entry is either a string (sent
      in one piece) or a list of strings (each sent separately). The list is
      consumed while the stub runs.

    """
    threading.Thread.__init__(self)
    self.socket_filename = socket_filename
    self.script = server_responses

    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.socket.bind(self.socket_filename)
    self.socket.listen(1)

  def run(self):
    """Serves a single client until the script or the connection ends

    Both the accepted connection and the listening socket are closed when
    this method returns, no matter whether it finished normally or raised.

    @raise errors.ProgrammerError: if a scripted response is neither a
      string nor a list

    """
    # Hypothesis: the messages we receive contain only a complete QMP message
    # encoded in JSON.
    try:
      conn, _ = self.socket.accept()
      try:
        # Send the banner as the first thing
        conn.sendall(self.encode_string(self._QMP_BANNER_DATA))

        # Expect qmp_capabilities and return an empty response
        conn.recv(4096)
        conn.sendall(self.encode_string(self._EMPTY_RESPONSE))

        while True:
          # We ignore the expected message, as the purpose of this object is
          # not to verify the correctness of the communication but to act as
          # a partner for the SUT (System Under Test, that is QmpConnection)
          msg = conn.recv(4096)
          if not msg:
            # The peer closed the connection
            break

          if not self.script:
            # Nothing left to reply with
            break

          response = self.script.pop(0)
          if isinstance(response, str):
            conn.sendall(response)
          elif isinstance(response, list):
            # One call per chunk, so that a message can be split on the wire
            for chunk in response:
              conn.sendall(chunk)
          else:
            # Wrap the value in a tuple; otherwise a tuple response would be
            # expanded as formatting arguments and raise TypeError instead
            raise errors.ProgrammerError("Unknown response type for %s" %
                                         (response, ))
      finally:
        # Make sure the client sees EOF instead of blocking forever
        conn.close()
    finally:
      self.socket.close()

  def encode_string(self, message):
    """Serializes a message and appends the QMP end-of-message token

    @param message: data structure to be serialized to JSON
    @return: the serialized message followed by
      C{hv_kvm.QmpConnection._MESSAGE_END_TOKEN}

    """
    return (serializer.DumpJson(message) +
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)
class TestQmpMessage(testutils.GanetiTestCase):
  """Tests for the container behaviour of hv_kvm.QmpMessage"""

  def testSerialization(self):
    """Checks item access and a serialize/rebuild round trip"""
    payload = {
      "execute": "command",
      "arguments": ["a", "b", "c"],
    }
    original = hv_kvm.QmpMessage(payload)

    # Every item of the source dictionary must be readable from the message
    for key in payload:
      self.assertEqual(original[key], payload[key])

    # The string form has to fit on one line
    wire_format = str(original)
    line_count = len(wire_format.splitlines())
    self.assertEqual(line_count, 1, msg="Got multi-line message")

    # Rebuilding from the string form must give back an equal message
    restored = hv_kvm.QmpMessage.BuildFromJsonString(wire_format)
    self.assertEqual(restored, original)
    self.assertEqual(len(restored), len(payload))

  def testDelete(self):
    """Checks that deleting an item shrinks the message by one"""
    removed_key = "execute"
    message = hv_kvm.QmpMessage({
      removed_key: "command",
      "arguments": ["a", "b", "c"],
    })

    size_before = len(message)
    del message[removed_key]
    size_after = len(message)

    self.assertEqual(size_before - 1, size_after)
class TestQmp(testutils.GanetiTestCase):
  """Runs hv_kvm.QmpConnection against a scripted QmpStub"""

  def testQmp(self):
    """Executes four requests and compares each reply with the expected one"""
    # Each entry pairs a request with the reply expected for it
    exchanges = [
      ({"execute": "query-kvm", "arguments": []},
       {"return": {"enabled": True, "present": True}}),
      ({"execute": "eject", "arguments": {"device": "ide1-cd0"}},
       {"return": {}}),
      ({"execute": "query-status", "arguments": []},
       {"return": [{"name": "quit"}, {"name": "eject"}]}),
      ({"execute": "query-name", "arguments": []},
       {"return": {"running": True, "singlestep": False}}),
    ]

    # What the stub writes to the socket; only three entries are needed for
    # four requests because the last entry carries two messages
    canned_replies = [
      # One message, one send()
      '{"return": {"enabled": true, "present": true}}\r\n',

      # Message sent using multiple send()
      ['{"retur', 'n": {}}\r\n'],

      # Multiple messages sent using one send()
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
      '{"return": {"running": true, "singlestep": false}}\r\n',
    ]

    # Reserve a unique path, then free it so that the stub can bind its UNIX
    # socket there; the file object is kept referenced until the test ends
    socket_file = tempfile.NamedTemporaryFile()
    socket_path = socket_file.name
    os.remove(socket_path)

    # Start the server side
    stub = QmpStub(socket_path, canned_replies)
    stub.start()

    # Connect the client side
    connection = hv_kvm.QmpConnection(socket_path)
    connection.connect()

    # Run the requests one by one
    for (request, reply_data) in exchanges:
      actual = connection.Execute(request)
      expected = hv_kvm.QmpMessage(reply_data)

      line_count = len(str(expected).splitlines())
      self.assertEqual(line_count, 1, msg="Got multi-line message")
      self.assertEqual(actual, expected)
class TestConsole(unittest.TestCase):
  """Tests for hv_kvm.KVMHypervisor.GetInstanceConsole"""

  def _Test(self, instance, node, hvparams):
    """Builds the console object and checks that it validates

    @return: the console object returned by GetInstanceConsole

    """
    console = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, node,
                                                      hvparams, {})
    self.assertTrue(console.Validate())
    return console

  def _HvParams(self, serial, vnc_bind, spice_bind):
    """Builds the three hypervisor parameters used by these tests"""
    return {
      constants.HV_SERIAL_CONSOLE: serial,
      constants.HV_VNC_BIND_ADDRESS: vnc_bind,
      constants.HV_KVM_SPICE_BIND: spice_bind,
    }

  def testSerial(self):
    """Serial console enabled, VNC and SPICE unset"""
    inst = objects.Instance(name="kvm.example.com",
                            primary_node="node6017-uuid")
    pnode = objects.Node(name="node6017", uuid="node6017-uuid")

    console = self._Test(inst, pnode, self._HvParams(True, None, None))

    self.assertEqual(console.kind, constants.CONS_SSH)
    self.assertEqual(console.host, pnode.name)
    self.assertEqual(console.command[0], pathutils.KVM_CONSOLE_WRAPPER)
    self.assertEqual(console.command[1], constants.SOCAT_PATH)

  def testVnc(self):
    """VNC bind address set, serial console disabled"""
    vnc_port = constants.VNC_BASE_PORT + 10
    inst = objects.Instance(name="kvm.example.com",
                            primary_node="node7235-uuid",
                            network_port=vnc_port)
    pnode = objects.Node(name="node7235", uuid="node7235-uuid")

    console = self._Test(inst, pnode,
                         self._HvParams(False, "192.0.2.1", None))

    self.assertEqual(console.kind, constants.CONS_VNC)
    self.assertEqual(console.host, "192.0.2.1")
    self.assertEqual(console.port, constants.VNC_BASE_PORT + 10)
    self.assertEqual(console.display, 10)

  def testSpice(self):
    """SPICE bind address set, serial console disabled"""
    inst = objects.Instance(name="kvm.example.com",
                            primary_node="node7235",
                            network_port=11000)
    pnode = objects.Node(name="node7235", uuid="node7235-uuid")

    console = self._Test(inst, pnode,
                         self._HvParams(False, None, "192.0.2.1"))

    self.assertEqual(console.kind, constants.CONS_SPICE)
    self.assertEqual(console.host, "192.0.2.1")
    self.assertEqual(console.port, 11000)

  def testNoConsole(self):
    """Serial console disabled and neither VNC nor SPICE set"""
    inst = objects.Instance(name="kvm.example.com",
                            primary_node="node24325",
                            network_port=0)
    pnode = objects.Node(name="node24325", uuid="node24325-uuid")

    console = self._Test(inst, pnode, self._HvParams(False, None, None))

    self.assertEqual(console.kind, constants.CONS_MESSAGE)
class TestVersionChecking(testutils.GanetiTestCase):
  """Tests for hv_kvm.KVMHypervisor._ParseKVMVersion"""

  def testParseVersion(self):
    """Parses stored help outputs and compares the resulting version tuples"""
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion

    # (test data file, expected result of the parser)
    expectations = [
      ("kvm_1.1.2_help.txt", ("1.1.2", 1, 1, 2)),
      ("kvm_1.0_help.txt", ("1.0", 1, 0, 0)),
      ("kvm_0.15.90_help.txt", ("0.15.90", 0, 15, 90)),
      ("kvm_0.12.5_help.txt", ("0.12.5", 0, 12, 5)),
      ("kvm_0.9.1_help.txt", ("0.9.1", 0, 9, 1)),
    ]

    # Read all files before running the first check
    loaded = [(testutils.ReadTestData(filename), version)
              for (filename, version) in expectations]

    for (help_text, version) in loaded:
      self.assertEqual(parse(help_text), version)
class TestSpiceParameterList(unittest.TestCase):
  """Consistency check for hv_kvm._SPICE_ADDITIONAL_PARAMS"""

  def test(self):
    """Compares _SPICE_ADDITIONAL_PARAMS with the HV_KVM_SPICE_* constants

    The expected set consists of the values of all constants whose name
    starts with C{HV_KVM_SPICE_}, minus HV_KVM_SPICE_BIND and minus those
    whose KVM default evaluates to True.

    """
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]

    params = \
      compat.UniqueFrozenset(getattr(constants, name)
                             for name in dir(constants)
                             if name.startswith("HV_KVM_SPICE_"))

    # Parameters whose default value evaluates to True and don't need to be set
    defaults_true = frozenset(filter(defaults.__getitem__, params))

    self.assertEqual(defaults_true, frozenset([
      constants.HV_KVM_SPICE_AUDIO_COMPR,
      constants.HV_KVM_SPICE_USE_VDAGENT,
      constants.HV_KVM_SPICE_TLS_CIPHERS,
    ]))

    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
    # their default evaluates to False. These are unittest assertions rather
    # than "assert" statements so that they are still checked when Python
    # runs with optimization enabled and are reported as test failures.
    self.assertTrue(constants.HV_KVM_SPICE_BIND in params)
    self.assertFalse(constants.HV_KVM_SPICE_BIND in defaults_true)

    # Exclude some parameters
    excluded = defaults_true | frozenset([
      constants.HV_KVM_SPICE_BIND,
    ])
    expected = params - excluded

    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, expected)
class TestHelpRegexps(testutils.GanetiTestCase):
  """Tests for the regular expressions applied to the KVM help output"""

  def testBootRe(self):
    """Check _BOOT_RE

    It has to match -drive.*boot=on|off except if there is another dash-option
    at the beginning of the line.

    """
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE

    # All files are read up front, in this order
    fixture_names = [
      "kvm_1.1.2_help.txt",
      "kvm_1.0_help.txt",
      "kvm_0.15.90_help.txt",
      "kvm_0.12.5_help.txt",
      "kvm_0.9.1_help.txt",
      "kvm_0.9.1_help_boot_test.txt",
    ]
    help_texts = dict((name, testutils.ReadTestData(name))
                      for name in fixture_names)

    # Help outputs in which the expression must be found
    matching = [
      "kvm_0.9.1_help.txt",
      "kvm_0.12.5_help.txt",
    ]
    for name in matching:
      self.assertTrue(boot_re.search(help_texts[name]))

    # Help outputs in which the expression must not be found
    not_matching = [
      "kvm_0.9.1_help_boot_test.txt",
      "kvm_1.1.2_help.txt",
      "kvm_1.0_help.txt",
      "kvm_0.15.90_help.txt",
    ]
    for name in not_matching:
      self.assertFalse(boot_re.search(help_texts[name]))
class TestGetTunFeatures(unittest.TestCase):
  """Tests for hv_kvm._GetTunFeatures"""

  def testWrongIoctl(self):
    """Queries a regular temporary file and expects None"""
    plain_file = tempfile.NamedTemporaryFile()

    # A file does not have the right ioctls, so this must always fail
    self.assertTrue(hv_kvm._GetTunFeatures(plain_file.fileno()) is None)

  def _FakeIoctl(self, features, fd, request, buf):
    """Replacement ioctl function handing back a fixed feature value

    Checks that the request is TUNGETFEATURES and that the buffer passed in
    holds a single zero; returns C{features} packed in the same format.

    """
    self.assertEqual(request, hv_kvm.TUNGETFEATURES)

    unpacked = struct.unpack("I", buf)
    self.assertEqual(unpacked[0], 0)

    return struct.pack("I", features)

  def test(self):
    """Checks that the value given by the fake ioctl is returned unchanged"""
    handle = tempfile.NamedTemporaryFile()
    fd = handle.fileno()

    for expected in [0, hv_kvm.IFF_VNET_HDR]:
      fake_ioctl = compat.partial(self._FakeIoctl, expected)
      self.assertEqual(hv_kvm._GetTunFeatures(fd, _ioctl=fake_ioctl),
                       expected)
class TestProbeTapVnetHdr(unittest.TestCase):
  """Tests for hv_kvm._ProbeTapVnetHdr"""

  def _FakeTunFeatures(self, expected_fd, flags, fd):
    """Replacement feature query returning C{flags} after checking the fd"""
    self.assertEqual(fd, expected_fd)
    return flags

  def test(self):
    """Result must be false for no flags and true for IFF_VNET_HDR"""
    handle = tempfile.NamedTemporaryFile()
    fd = handle.fileno()

    for flags in [0, hv_kvm.IFF_VNET_HDR]:
      features_fn = compat.partial(self._FakeTunFeatures, fd, flags)
      supported = hv_kvm._ProbeTapVnetHdr(fd, _features_fn=features_fn)

      if flags != 0:
        self.assertTrue(supported)
      else:
        self.assertFalse(supported)

  def testUnsupported(self):
    """Result must be false when the feature query returns None"""
    handle = tempfile.NamedTemporaryFile()

    def _NoFeatures(_):
      return None

    probe_result = hv_kvm._ProbeTapVnetHdr(handle.fileno(),
                                           _features_fn=_NoFeatures)
    self.assertFalse(probe_result)
# Run the tests through the project's test program when this file is executed
# as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()