#!/usr/bin/python
#

# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for testing the hypervisor.hv_kvm module"""

import threading
import tempfile
import unittest
import socket
import os
import struct

from ganeti import serializer
from ganeti import constants
from ganeti import compat
from ganeti import objects
from ganeti import errors
from ganeti import utils
from ganeti import pathutils

from ganeti.hypervisor import hv_kvm

import testutils


class QmpStub(threading.Thread):
  """Stub for a QMP endpoint for a KVM instance

  """
  _QMP_BANNER_DATA = {
    "QMP": {
      "version": {
        "package": "",
        "qemu": {
          "micro": 50,
          "minor": 13,
          "major": 0,
          },
        "capabilities": [],
        },
      }
    }
  _EMPTY_RESPONSE = {
    "return": [],
    }

  def __init__(self, socket_filename, server_responses):
    """Creates a QMP stub

    @type socket_filename: string
    @param socket_filename: filename of the UNIX socket that will be created
                            this class and used for the communication
    @type server_responses: list
    @param server_responses: list of responses that the server sends in response
                             to whatever it receives
    """
    threading.Thread.__init__(self)
    self.socket_filename = socket_filename
    self.script = server_responses

    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.socket.bind(self.socket_filename)
    self.socket.listen(1)

  def run(self):
    # Hypothesis: the messages we receive contain only a complete QMP message
    # encoded in JSON.
    conn, addr = self.socket.accept()

    # Send the banner as the first thing
    conn.send(self.encode_string(self._QMP_BANNER_DATA))

    # Expect qmp_capabilities and return an empty response
    conn.recv(4096)
    conn.send(self.encode_string(self._EMPTY_RESPONSE))

    while True:
      # We ignore the expected message, as the purpose of this object is not
      # to verify the correctness of the communication but to act as a
      # partner for the SUT (System Under Test, that is QmpConnection)
      msg = conn.recv(4096)
      if not msg:
        break

      if not self.script:
        break
      response = self.script.pop(0)
      if isinstance(response, str):
        conn.send(response)
      elif isinstance(response, list):
        for chunk in response:
          conn.send(chunk)
      else:
        raise errors.ProgrammerError("Unknown response type for %s" % response)

    conn.close()

  def encode_string(self, message):
    return (serializer.DumpJson(message) +
            hv_kvm.QmpConnection._MESSAGE_END_TOKEN)


class TestQmpMessage(testutils.GanetiTestCase):
  def testSerialization(self):
    test_data = {
      "execute": "command",
      "arguments": ["a", "b", "c"],
      }
    message = hv_kvm.QmpMessage(test_data)

    for k, v in test_data.items():
      self.assertEqual(message[k], v)

    serialized = str(message)
    self.assertEqual(len(serialized.splitlines()), 1,
                     msg="Got multi-line message")

    rebuilt_message = hv_kvm.QmpMessage.BuildFromJsonString(serialized)
    self.assertEqual(rebuilt_message, message)
    self.assertEqual(len(rebuilt_message), len(test_data))

  def testDelete(self):
    toDelete = "execute"
    test_data = {
      toDelete: "command",
      "arguments": ["a", "b", "c"],
      }
    message = hv_kvm.QmpMessage(test_data)

    oldLen = len(message)
    del(message[toDelete])
    newLen = len(message)
    self.assertEqual(oldLen - 1, newLen)


class TestQmp(testutils.GanetiTestCase):
  def testQmp(self):
    requests = [
      {"execute": "query-kvm", "arguments": []},
      {"execute": "eject", "arguments": {"device": "ide1-cd0"}},
      {"execute": "query-status", "arguments": []},
      {"execute": "query-name", "arguments": []},
      ]

    server_responses = [
      # One message, one send()
      '{"return": {"enabled": true, "present": true}}\r\n',

      # Message sent using multiple send()
      ['{"retur', 'n": {}}\r\n'],

      # Multiple messages sent using one send()
      '{"return": [{"name": "quit"}, {"name": "eject"}]}\r\n'
      '{"return": {"running": true, "singlestep": false}}\r\n',
      ]

    expected_responses = [
      {"return": {"enabled": True, "present": True}},
      {"return": {}},
      {"return": [{"name": "quit"}, {"name": "eject"}]},
      {"return": {"running": True, "singlestep": False}},
      ]

    # Set up the stub
    socket_file = tempfile.NamedTemporaryFile()
    os.remove(socket_file.name)
    qmp_stub = QmpStub(socket_file.name, server_responses)
    qmp_stub.start()

    # Set up the QMP connection
    qmp_connection = hv_kvm.QmpConnection(socket_file.name)
    qmp_connection.connect()

    # Format the script
    for request, expected_response in zip(requests, expected_responses):
      response = qmp_connection.Execute(request)
      msg = hv_kvm.QmpMessage(expected_response)
      self.assertEqual(len(str(msg).splitlines()), 1,
                       msg="Got multi-line message")
      self.assertEqual(response, msg)


class TestConsole(unittest.TestCase):
  def _Test(self, instance, node, hvparams):
    cons = hv_kvm.KVMHypervisor.GetInstanceConsole(instance, node, hvparams, {})
    self.assertTrue(cons.Validate())
    return cons

  def testSerial(self):
    instance = objects.Instance(name="kvm.example.com",
                                primary_node="node6017-uuid")
    node = objects.Node(name="node6017", uuid="node6017-uuid")
    hvparams = {
      constants.HV_SERIAL_CONSOLE: True,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: None,
      }
    cons = self._Test(instance, node, hvparams)
    self.assertEqual(cons.kind, constants.CONS_SSH)
    self.assertEqual(cons.host, node.name)
    self.assertEqual(cons.command[0], pathutils.KVM_CONSOLE_WRAPPER)
    self.assertEqual(cons.command[1], constants.SOCAT_PATH)

  def testVnc(self):
    instance = objects.Instance(name="kvm.example.com",
                                primary_node="node7235-uuid",
                                network_port=constants.VNC_BASE_PORT + 10)
    node = objects.Node(name="node7235", uuid="node7235-uuid")
    hvparams = {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: "192.0.2.1",
      constants.HV_KVM_SPICE_BIND: None,
      }
    cons = self._Test(instance, node, hvparams)
    self.assertEqual(cons.kind, constants.CONS_VNC)
    self.assertEqual(cons.host, "192.0.2.1")
    self.assertEqual(cons.port, constants.VNC_BASE_PORT + 10)
    self.assertEqual(cons.display, 10)

  def testSpice(self):
    instance = objects.Instance(name="kvm.example.com",
                                primary_node="node7235",
                                network_port=11000)
    node = objects.Node(name="node7235", uuid="node7235-uuid")
    hvparams = {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: "192.0.2.1",
      }
    cons = self._Test(instance, node, hvparams)
    self.assertEqual(cons.kind, constants.CONS_SPICE)
    self.assertEqual(cons.host, "192.0.2.1")
    self.assertEqual(cons.port, 11000)

  def testNoConsole(self):
    instance = objects.Instance(name="kvm.example.com",
                                primary_node="node24325",
                                network_port=0)
    node = objects.Node(name="node24325", uuid="node24325-uuid")
    hvparams = {
      constants.HV_SERIAL_CONSOLE: False,
      constants.HV_VNC_BIND_ADDRESS: None,
      constants.HV_KVM_SPICE_BIND: None,
      }
    cons = self._Test(instance, node, hvparams)
    self.assertEqual(cons.kind, constants.CONS_MESSAGE)


class TestVersionChecking(testutils.GanetiTestCase):
  def testParseVersion(self):
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
    self.assertEqual(parse(help_112), ("1.1.2", 1, 1, 2))
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
    self.assertEqual(parse(help_0125), ("0.12.5", 0, 12, 5))
    self.assertEqual(parse(help_091), ("0.9.1", 0, 9, 1))


class TestSpiceParameterList(unittest.TestCase):
  def test(self):
    defaults = constants.HVC_DEFAULTS[constants.HT_KVM]

    params = \
      compat.UniqueFrozenset(getattr(constants, name)
                             for name in dir(constants)
                             if name.startswith("HV_KVM_SPICE_"))

    # Parameters whose default value evaluates to True and don't need to be set
    defaults_true = frozenset(filter(defaults.__getitem__, params))

    self.assertEqual(defaults_true, frozenset([
      constants.HV_KVM_SPICE_AUDIO_COMPR,
      constants.HV_KVM_SPICE_USE_VDAGENT,
      constants.HV_KVM_SPICE_TLS_CIPHERS,
      ]))

    # HV_KVM_SPICE_BIND decides whether the other parameters must be set if
    # their default evaluates to False
    assert constants.HV_KVM_SPICE_BIND in params
    assert constants.HV_KVM_SPICE_BIND not in defaults_true

    # Exclude some parameters
    params -= defaults_true | frozenset([
      constants.HV_KVM_SPICE_BIND,
      ])

    self.assertEqual(hv_kvm._SPICE_ADDITIONAL_PARAMS, params)


class TestHelpRegexps(testutils.GanetiTestCase):
  def testBootRe(self):
    """Check _BOOT_RE

    It has too match -drive.*boot=on|off except if there is another dash-option
    at the beginning of the line.

    """
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
    help_091_fake = testutils.ReadTestData("kvm_0.9.1_help_boot_test.txt")

    self.assertTrue(boot_re.search(help_091))
    self.assertTrue(boot_re.search(help_0125))
    self.assertFalse(boot_re.search(help_091_fake))
    self.assertFalse(boot_re.search(help_112))
    self.assertFalse(boot_re.search(help_10))
    self.assertFalse(boot_re.search(help_01590))


class TestGetTunFeatures(unittest.TestCase):
  def testWrongIoctl(self):
    tmpfile = tempfile.NamedTemporaryFile()
    # A file does not have the right ioctls, so this must always fail
    result = hv_kvm._GetTunFeatures(tmpfile.fileno())
    self.assertTrue(result is None)

  def _FakeIoctl(self, features, fd, request, buf):
    self.assertEqual(request, hv_kvm.TUNGETFEATURES)

    (reqno, ) = struct.unpack("I", buf)
    self.assertEqual(reqno, 0)

    return struct.pack("I", features)

  def test(self):
    tmpfile = tempfile.NamedTemporaryFile()
    fd = tmpfile.fileno()

    for features in [0, hv_kvm.IFF_VNET_HDR]:
      fn = compat.partial(self._FakeIoctl, features)
      result = hv_kvm._GetTunFeatures(fd, _ioctl=fn)
      self.assertEqual(result, features)


class TestProbeTapVnetHdr(unittest.TestCase):
  def _FakeTunFeatures(self, expected_fd, flags, fd):
    self.assertEqual(fd, expected_fd)
    return flags

  def test(self):
    tmpfile = tempfile.NamedTemporaryFile()
    fd = tmpfile.fileno()

    for flags in [0, hv_kvm.IFF_VNET_HDR]:
      fn = compat.partial(self._FakeTunFeatures, fd, flags)

      result = hv_kvm._ProbeTapVnetHdr(fd, _features_fn=fn)
      if flags == 0:
        self.assertFalse(result)
      else:
        self.assertTrue(result)

  def testUnsupported(self):
    tmpfile = tempfile.NamedTemporaryFile()
    fd = tmpfile.fileno()

    self.assertFalse(hv_kvm._ProbeTapVnetHdr(fd, _features_fn=lambda _: None))


if __name__ == "__main__":
  testutils.GanetiTestProgram()
