#!/usr/bin/python
#

# Copyright (C) 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for testing ganeti.utils.algo"""

import unittest
import random
import operator

from ganeti import constants
from ganeti import compat
from ganeti.utils import algo

import testutils


class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, input, expected):
    self.assertEqual(algo.UniqueSequence(input), expected)

  def runTest(self):
    # Ordered input
    self._test([1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
    self._test([1, 2, 2, 3], [1, 2, 3])
    self._test([1, 2, 3, 3], [1, 2, 3])

    # Unordered input
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])

    # Strings
    self._test(["a", "a"], ["a"])
    self._test(["a", "b"], ["a", "b"])
    self._test(["a", "b", "a"], ["a", "b"])


class TestFindDuplicates(unittest.TestCase):
  """Test case for FindDuplicates"""

  def _Test(self, seq, expected):
    result = algo.FindDuplicates(seq)
    self.assertEqual(result, algo.UniqueSequence(result))
    self.assertEqual(set(result), set(expected))

  def test(self):
    self._Test([], [])
    self._Test([1, 2, 3], [])
    self._Test([9, 8, 8, 0, 5, 1, 7, 0, 6, 7], [8, 0, 7])
    for exp in [[1, 2, 3], [3, 2, 1]]:
      self._Test([1, 1, 2, 2, 3, 3], exp)

    self._Test(["A", "a", "B"], [])
    self._Test(["a", "A", "a", "B"], ["a"])
    self._Test("Hello World out there!", ["e", " ", "o", "r", "t", "l"])

    self._Test(self._Gen(False), [])
    self._Test(self._Gen(True), range(1, 10))

  @staticmethod
  def _Gen(dup):
    for i in range(10):
      yield i
      if dup:
        for _ in range(i):
          yield i


class TestNiceSort(unittest.TestCase):
  def test(self):
    self.assertEqual(algo.NiceSort([]), [])
    self.assertEqual(algo.NiceSort(["foo"]), ["foo"])
    self.assertEqual(algo.NiceSort(["bar", ""]), ["", "bar"])
    self.assertEqual(algo.NiceSort([",", "."]), [",", "."])
    self.assertEqual(algo.NiceSort(["0.1", "0.2"]), ["0.1", "0.2"])
    self.assertEqual(algo.NiceSort(["0;099", "0,099", "0.1", "0.2"]),
                     ["0,099", "0.1", "0.2", "0;099"])

    data = ["a0", "a1", "a99", "a20", "a2", "b10", "b70", "b00", "0000"]
    self.assertEqual(algo.NiceSort(data),
                     ["0000", "a0", "a1", "a2", "a20", "a99",
                      "b00", "b10", "b70"])

    data = ["a0-0", "a1-0", "a99-10", "a20-3", "a0-4", "a99-3", "a09-2",
            "Z", "a9-1", "A", "b"]
    self.assertEqual(algo.NiceSort(data),
                     ["A", "Z", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
                      "a20-3", "a99-3", "a99-10", "b"])
    self.assertEqual(algo.NiceSort(data, key=str.lower),
                     ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
                      "a20-3", "a99-3", "a99-10", "b", "Z"])
    self.assertEqual(algo.NiceSort(data, key=str.upper),
                     ["A", "a0-0", "a0-4", "a1-0", "a9-1", "a09-2",
                      "a20-3", "a99-3", "a99-10", "b", "Z"])

  def testLargeA(self):
    data = [
      "Eegah9ei", "xij88brTulHYAv8IEOyU", "3jTwJPtrXOY22bwL2YoW",
      "Z8Ljf1Pf5eBfNg171wJR", "WvNJd91OoXvLzdEiEXa6", "uHXAyYYftCSG1o7qcCqe",
      "xpIUJeVT1Rp", "KOt7vn1dWXi", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
      "cPRi0lM7HLnSuWA2G9", "KVQqLPDjcPjf8T3oyzjcOsfkb",
      "guKJkXnkULealVC8CyF1xefym", "pqF8dkU5B1cMnyZuREaSOADYx",
      ]
    self.assertEqual(algo.NiceSort(data), [
      "3jTwJPtrXOY22bwL2YoW", "Eegah9ei", "KOt7vn1dWXi",
      "KVQqLPDjcPjf8T3oyzjcOsfkb", "WvNJd91OoXvLzdEiEXa6",
      "Z8Ljf1Pf5eBfNg171wJR", "a07h8feON165N67PIE", "bH4Q7aCu3PUPjK3JtH",
      "cPRi0lM7HLnSuWA2G9", "guKJkXnkULealVC8CyF1xefym",
      "pqF8dkU5B1cMnyZuREaSOADYx", "uHXAyYYftCSG1o7qcCqe",
      "xij88brTulHYAv8IEOyU", "xpIUJeVT1Rp"
      ])

  def testLargeB(self):
    data = [
      "inst-0.0.0.0-0.0.0.0",
      "inst-0.1.0.0-0.0.0.0",
      "inst-0.2.0.0-0.0.0.0",
      "inst-0.2.1.0-0.0.0.0",
      "inst-0.2.2.0-0.0.0.0",
      "inst-0.2.2.0-0.0.0.9",
      "inst-0.2.2.0-0.0.3.9",
      "inst-0.2.2.0-0.2.0.9",
      "inst-0.2.2.0-0.9.0.9",
      "inst-0.20.2.0-0.0.0.0",
      "inst-0.20.2.0-0.9.0.9",
      "inst-10.020.2.0-0.9.0.10",
      "inst-15.020.2.0-0.9.1.00",
      "inst-100.020.2.0-0.9.0.9",

      # Only the last group, not converted to a number anymore, differs
      "inst-100.020.2.0a999",
      "inst-100.020.2.0b000",
      "inst-100.020.2.0c10",
      "inst-100.020.2.0c101",
      "inst-100.020.2.0c2",
      "inst-100.020.2.0c20",
      "inst-100.020.2.0c3",
      "inst-100.020.2.0c39123",
      ]

    rnd = random.Random(16205)
    for _ in range(10):
      testdata = data[:]
      rnd.shuffle(testdata)
      assert testdata != data
      self.assertEqual(algo.NiceSort(testdata), data)

  class _CallCount:
    def __init__(self, fn):
      self.count = 0
      self.fn = fn

    def __call__(self, *args):
      self.count += 1
      return self.fn(*args)

  def testKeyfuncA(self):
    # Generate some random numbers
    rnd = random.Random(21131)
    numbers = [rnd.randint(0, 10000) for _ in range(999)]
    assert numbers != sorted(numbers)

    # Convert to hex
    data = [hex(i) for i in numbers]
    datacopy = data[:]

    keyfn = self._CallCount(lambda value: str(int(value, 16)))

    # Sort with key function converting hex to decimal
    result = algo.NiceSort(data, key=keyfn)

    self.assertEqual([hex(i) for i in sorted(numbers)], result)
    self.assertEqual(data, datacopy, msg="Input data was modified in NiceSort")
    self.assertEqual(keyfn.count, len(numbers),
                     msg="Key function was not called once per value")

  class _TestData:
    def __init__(self, name, value):
      self.name = name
      self.value = value

  def testKeyfuncB(self):
    rnd = random.Random(27396)
    data = []
    for i in range(123):
      v1 = rnd.randint(0, 5)
      v2 = rnd.randint(0, 5)
      data.append(self._TestData("inst-%s-%s-%s" % (v1, v2, i),
                                 (v1, v2, i)))
    rnd.shuffle(data)
    assert data != sorted(data, key=operator.attrgetter("name"))

    keyfn = self._CallCount(operator.attrgetter("name"))

    # Sort by name
    result = algo.NiceSort(data, key=keyfn)

    self.assertEqual(result, sorted(data, key=operator.attrgetter("value")))
    self.assertEqual(keyfn.count, len(data),
                     msg="Key function was not called once per value")

  def testNiceSortKey(self):
    self.assertEqual(algo.NiceSortKey(""),
                     ([None] * algo._SORTER_GROUPS) + [""])
    self.assertEqual(algo.NiceSortKey("Hello World"),
                     ["Hello World"] +
                     ([None] * int(algo._SORTER_GROUPS - 1)) + [""])
    self.assertEqual(algo.NiceSortKey("node1.net75.bld3.example.com"),
                     ["node", 1, ".net", 75, ".bld", 3, ".example.com",
                      None, ""])


class TestInvertDict(unittest.TestCase):
  def testInvertDict(self):
    test_dict = { "foo": 1, "bar": 2, "baz": 5 }
    self.assertEqual(algo.InvertDict(test_dict),
                     { 1: "foo", 2: "bar", 5: "baz"})


class TestInsertAtPos(unittest.TestCase):
  def test(self):
    a = [1, 5, 6]
    b = [2, 3, 4]
    self.assertEqual(algo.InsertAtPos(a, 1, b), [1, 2, 3, 4, 5, 6])
    self.assertEqual(algo.InsertAtPos(a, 0, b), b + a)
    self.assertEqual(algo.InsertAtPos(a, len(a), b), a + b)
    self.assertEqual(algo.InsertAtPos(a, 2, b), [1, 5, 2, 3, 4, 6])


class TimeMock:
  def __init__(self, values):
    self.values = values

  def __call__(self):
    return self.values.pop(0)


class TestRunningTimeout(unittest.TestCase):
  def setUp(self):
    self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])

  def testRemainingFloat(self):
    timeout = algo.RunningTimeout(5.0, True, _time_fn=self.time_fn)
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
    self.assertAlmostEqual(timeout.Remaining(), -1.5)

  def testRemaining(self):
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
    timeout = algo.RunningTimeout(5, True, _time_fn=self.time_fn)
    self.assertEqual(timeout.Remaining(), 3)
    self.assertEqual(timeout.Remaining(), 1)
    self.assertEqual(timeout.Remaining(), 0)
    self.assertEqual(timeout.Remaining(), -1)

  def testRemainingNonNegative(self):
    timeout = algo.RunningTimeout(5.0, False, _time_fn=self.time_fn)
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
    self.assertEqual(timeout.Remaining(), 0.0)

  def testNegativeTimeout(self):
    self.assertRaises(ValueError, algo.RunningTimeout, -1.0, True)


class TestJoinDisjointDicts(unittest.TestCase):
  def setUp(self):
    self.non_empty_dict = {"a": 1, "b": 2}
    self.empty_dict = {}

  def testWithEmptyDicts(self):
    self.assertEqual(self.empty_dict, algo.JoinDisjointDicts(self.empty_dict,
      self.empty_dict))
    self.assertEqual(self.non_empty_dict, algo.JoinDisjointDicts(
      self.empty_dict, self.non_empty_dict))
    self.assertEqual(self.non_empty_dict, algo.JoinDisjointDicts(
      self.non_empty_dict, self.empty_dict))

  def testNonDisjoint(self):
    self.assertRaises(AssertionError, algo.JoinDisjointDicts,
      self.non_empty_dict, self.non_empty_dict)

  def testCommonCase(self):
    dict_a = {"TEST1": 1, "TEST2": 2}
    dict_b = {"TEST3": 3, "TEST4": 4}

    result = dict_a.copy()
    result.update(dict_b)

    self.assertEqual(result, algo.JoinDisjointDicts(dict_a, dict_b))
    self.assertEqual(result, algo.JoinDisjointDicts(dict_b, dict_a))


class TestSequenceToDict(unittest.TestCase):
  def testEmpty(self):
    self.assertEqual(algo.SequenceToDict([]), {})
    self.assertEqual(algo.SequenceToDict({}), {})

  def testSimple(self):
    data = [(i, str(i), "test%s" % i) for i in range(391)]
    self.assertEqual(algo.SequenceToDict(data),
      dict((i, (i, str(i), "test%s" % i))
           for i in range(391)))

  def testCustomKey(self):
    data = [(i, hex(i), "test%s" % i) for i in range(100)]
    self.assertEqual(algo.SequenceToDict(data, key=compat.snd),
      dict((hex(i), (i, hex(i), "test%s" % i))
           for i in range(100)))
    self.assertEqual(algo.SequenceToDict(data,
                                         key=lambda (a, b, val): hash(val)),
      dict((hash("test%s" % i), (i, hex(i), "test%s" % i))
           for i in range(100)))

  def testDuplicate(self):
    self.assertRaises(ValueError, algo.SequenceToDict,
                      [(0, 0), (0, 0)])
    self.assertRaises(ValueError, algo.SequenceToDict,
                      [(i, ) for i in range(200)] + [(10, )])


class TestFlatToDict(unittest.TestCase):
  def testNormal(self):
    data = [
      ("lv/xenvg", {"foo": "bar", "bar": "baz"}),
      ("lv/xenfoo", {"foo": "bar", "baz": "blubb"}),
      ("san/foo", {"ip": "127.0.0.1", "port": 1337}),
      ("san/blubb/blibb", 54),
      ]
    reference = {
      "lv": {
        "xenvg": {"foo": "bar", "bar": "baz"},
        "xenfoo": {"foo": "bar", "baz": "blubb"},
        },
      "san": {
        "foo": {"ip": "127.0.0.1", "port": 1337},
        "blubb": {"blibb": 54},
        },
      }
    self.assertEqual(algo.FlatToDict(data), reference)

  def testUnlikeDepth(self):
    data = [
      ("san/foo", {"ip": "127.0.0.1", "port": 1337}),
      ("san/foo/blubb", 23), # Another foo entry under san
      ("san/blubb/blibb", 54),
      ]
    self.assertRaises(AssertionError, algo.FlatToDict, data)


if __name__ == "__main__":
  testutils.GanetiTestProgram()
