#!/usr/bin/python
#

# Copyright (C) 2008, 2009, 2010 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Script for unittesting the workerpool module"""

import unittest
import threading
import time
import sys
import zlib
import random

from ganeti import workerpool
from ganeti import errors
from ganeti import utils
from ganeti import compat

import testutils


class CountingContext(object):
  def __init__(self):
    self._lock = threading.Condition(threading.Lock())
    self.done = 0

  def DoneTask(self):
    self._lock.acquire()
    try:
      self.done += 1
    finally:
      self._lock.release()

  def GetDoneTasks(self):
    self._lock.acquire()
    try:
      return self.done
    finally:
      self._lock.release()

  @staticmethod
  def UpdateChecksum(current, value):
    return zlib.adler32(str(value), current)


class CountingBaseWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, text):
    ctx.DoneTask()


class ChecksumContext:
  CHECKSUM_START = zlib.adler32("")

  def __init__(self):
    self.lock = threading.Condition(threading.Lock())
    self.checksum = self.CHECKSUM_START

  @staticmethod
  def UpdateChecksum(current, value):
    return zlib.adler32(str(value), current)


class ChecksumBaseWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, number):
    name = "number%s" % number
    self.SetTaskName(name)

    # This assertion needs to be checked before updating the checksum. A
    # failing assertion will then cause the result to be wrong.
    assert self.getName() == ("%s/%s" % (self._worker_id, name))

    ctx.lock.acquire()
    try:
      ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
    finally:
      ctx.lock.release()


class ListBuilderContext:
  def __init__(self):
    self.lock = threading.Lock()
    self.result = []
    self.prioresult = {}


class ListBuilderWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, data):
    ctx.lock.acquire()
    try:
      ctx.result.append((self.GetCurrentPriority(), data))
      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
    finally:
      ctx.lock.release()


class DeferringTaskContext:
  def __init__(self):
    self.lock = threading.Lock()
    self.prioresult = {}
    self.samepriodefer = {}
    self.num2ordertaskid = {}


class DeferringWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, num, targetprio):
    ctx.lock.acquire()
    try:
      otilst = ctx.num2ordertaskid.setdefault(num, [])
      otilst.append(self._GetCurrentOrderAndTaskId())

      if num in ctx.samepriodefer:
        del ctx.samepriodefer[num]
        raise workerpool.DeferTask()

      if self.GetCurrentPriority() > targetprio:
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)

      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
    finally:
      ctx.lock.release()


class PriorityContext:
  def __init__(self):
    self.lock = threading.Lock()
    self.result = []


class PriorityWorker(workerpool.BaseWorker):
  def RunTask(self, ctx, data):
    ctx.lock.acquire()
    try:
      ctx.result.append((self.GetCurrentPriority(), data))
    finally:
      ctx.lock.release()


class NotImplementedWorker(workerpool.BaseWorker):
  def RunTask(self):
    raise NotImplementedError


class TestWorkerpool(unittest.TestCase):
  """Workerpool tests"""

  def testCounting(self):
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)

      for i in range(10):
        wp.AddTask((ctx, "Hello world %s" % i))

      wp.Quiesce()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEquals(ctx.GetDoneTasks(), 10)

  def testNoTasks(self):
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testNoTasksQuiesce(self):
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
      wp.Quiesce()
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testChecksum(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order.
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ChecksumContext()
      checksum = ChecksumContext.CHECKSUM_START
      for i in range(1, 100):
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
        wp.AddTask((ctx, i))

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check sum
      ctx.lock.acquire()
      try:
        self.assertEqual(checksum, ctx.checksum)
      finally:
        ctx.lock.release()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testAddManyTasks(self):
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)

      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
      wp.AddTask((ctx, "A separate hello"))
      wp.AddTask((ctx, "Once more, hi!"))
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])

      wp.Quiesce()

      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEquals(ctx.GetDoneTasks(), 22)

  def testManyTasksSequence(self):
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self.assertRaises(AssertionError, wp.AddManyTasks,
                        ["Hello world %s" % i for i in range(10)])
      self.assertRaises(AssertionError, wp.AddManyTasks,
                        [i for i in range(10)])
      self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)

      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
      wp.AddTask((ctx, "A separate hello"))

      wp.Quiesce()

      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEquals(ctx.GetDoneTasks(), 11)

  def _CheckNoTasks(self, wp):
    wp._lock.acquire()
    try:
      # The task queue must be empty now
      self.assertFalse(wp._tasks)
      self.assertFalse(wp._taskdata)
    finally:
      wp._lock.release()

  def _CheckWorkerCount(self, wp, num_workers):
    wp._lock.acquire()
    try:
      self.assertEqual(len(wp._workers), num_workers)
    finally:
      wp._lock.release()

  def testPriorityChecksum(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ChecksumContext()

      data = {}
      tasks = []
      priorities = []
      for i in range(1, 333):
        prio = i % 7
        tasks.append((ctx, i))
        priorities.append(prio)
        data.setdefault(prio, []).append(i)

      wp.AddManyTasks(tasks, priority=priorities)

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check sum
      ctx.lock.acquire()
      try:
        checksum = ChecksumContext.CHECKSUM_START
        for priority in sorted(data.keys()):
          for i in data[priority]:
            checksum = ChecksumContext.UpdateChecksum(checksum, i)

        self.assertEqual(checksum, ctx.checksum)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testPriorityListManyTasks(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ListBuilderContext()

      # Use static seed for this test
      rnd = random.Random(0)

      data = {}
      tasks = []
      priorities = []
      for i in range(1, 333):
        prio = int(rnd.random() * 10)
        tasks.append((ctx, i))
        priorities.append(prio)
        data.setdefault(prio, []).append((prio, i))

      wp.AddManyTasks(tasks, priority=priorities)

      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
                        [("x", ), ("y", )], priority=[1] * 5)
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
                        [("x", ), ("y", )], task_id=[1] * 5)

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        expresult = []
        for priority in sorted(data.keys()):
          expresult.extend(data[priority])

        self.assertEqual(expresult, ctx.result)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testPriorityListSingleTasks(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ListBuilderContext()

      # Use static seed for this test
      rnd = random.Random(26279)

      data = {}
      for i in range(1, 333):
        prio = int(rnd.random() * 30)
        wp.AddTask((ctx, i), priority=prio)
        data.setdefault(prio, []).append(i)

        # Cause some distortion
        if i % 11 == 0:
          time.sleep(.001)
        if i % 41 == 0:
          wp.Quiesce()

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        self.assertEqual(data, ctx.prioresult)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testDeferTask(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = DeferringTaskContext()

      # Use static seed for this test
      rnd = random.Random(14921)

      data = {}
      num2taskid = {}
      for i in range(1, 333):
        ctx.lock.acquire()
        try:
          if i % 5 == 0:
            ctx.samepriodefer[i] = True
        finally:
          ctx.lock.release()

        prio = int(rnd.random() * 30)
        num2taskid[i] = 1000 * i
        wp.AddTask((ctx, i, prio), priority=50,
                   task_id=num2taskid[i])
        data.setdefault(prio, set()).add(i)

        # Cause some distortion
        if i % 24 == 0:
          time.sleep(.001)
        if i % 31 == 0:
          wp.Quiesce()

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        self.assertEqual(data, ctx.prioresult)

        all_order_ids = []

        for (num, numordertaskid) in ctx.num2ordertaskid.items():
          order_ids = map(compat.fst, numordertaskid)
          self.assertFalse(utils.FindDuplicates(order_ids),
                           msg="Order ID has been reused")
          all_order_ids.extend(order_ids)

          for task_id in map(compat.snd, numordertaskid):
            self.assertEqual(task_id, num2taskid[num],
                             msg=("Task %s used different task IDs" % num))

        self.assertFalse(utils.FindDuplicates(all_order_ids),
                         msg="Order ID has been reused")
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)


if __name__ == "__main__":
  testutils.GanetiTestProgram()
