blob: 0fcbe460e8441a64a283253c661ed5b235ff2795 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2008, 2009, 2010 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Script for unittesting the workerpool module"""
import unittest
import threading
import time
import sys
import zlib
import random
from ganeti import workerpool
from ganeti import errors
from ganeti import utils
from ganeti import compat
import testutils
class CountingContext(object):
def __init__(self):
self._lock = threading.Condition(threading.Lock())
self.done = 0
def DoneTask(self):
self._lock.acquire()
try:
self.done += 1
finally:
self._lock.release()
def GetDoneTasks(self):
self._lock.acquire()
try:
return self.done
finally:
self._lock.release()
@staticmethod
def UpdateChecksum(current, value):
return zlib.adler32(str(value), current)
class CountingBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, text):
ctx.DoneTask()
class ChecksumContext:
CHECKSUM_START = zlib.adler32("")
def __init__(self):
self.lock = threading.Condition(threading.Lock())
self.checksum = self.CHECKSUM_START
@staticmethod
def UpdateChecksum(current, value):
return zlib.adler32(str(value), current)
class ChecksumBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, number):
name = "number%s" % number
self.SetTaskName(name)
# This assertion needs to be checked before updating the checksum. A
# failing assertion will then cause the result to be wrong.
assert self.getName() == ("%s/%s" % (self._worker_id, name))
ctx.lock.acquire()
try:
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
finally:
ctx.lock.release()
class ListBuilderContext:
def __init__(self):
self.lock = threading.Lock()
self.result = []
self.prioresult = {}
class ListBuilderWorker(workerpool.BaseWorker):
def RunTask(self, ctx, data):
ctx.lock.acquire()
try:
ctx.result.append((self.GetCurrentPriority(), data))
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
finally:
ctx.lock.release()
class DeferringTaskContext:
def __init__(self):
self.lock = threading.Lock()
self.prioresult = {}
self.samepriodefer = {}
self.num2ordertaskid = {}
class DeferringWorker(workerpool.BaseWorker):
def RunTask(self, ctx, num, targetprio):
ctx.lock.acquire()
try:
otilst = ctx.num2ordertaskid.setdefault(num, [])
otilst.append(self._GetCurrentOrderAndTaskId())
if num in ctx.samepriodefer:
del ctx.samepriodefer[num]
raise workerpool.DeferTask()
if self.GetCurrentPriority() > targetprio:
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
finally:
ctx.lock.release()
class PriorityContext:
def __init__(self):
self.lock = threading.Lock()
self.result = []
class PriorityWorker(workerpool.BaseWorker):
def RunTask(self, ctx, data):
ctx.lock.acquire()
try:
ctx.result.append((self.GetCurrentPriority(), data))
finally:
ctx.lock.release()
class NotImplementedWorker(workerpool.BaseWorker):
def RunTask(self):
raise NotImplementedError
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
def testCounting(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
for i in range(10):
wp.AddTask((ctx, "Hello world %s" % i))
wp.Quiesce()
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 10)
def testNoTasks(self):
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testNoTasksQuiesce(self):
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ChecksumContext()
checksum = ChecksumContext.CHECKSUM_START
for i in range(1, 100):
checksum = ChecksumContext.UpdateChecksum(checksum, i)
wp.AddTask((ctx, i))
wp.Quiesce()
self._CheckNoTasks(wp)
# Check sum
ctx.lock.acquire()
try:
self.assertEqual(checksum, ctx.checksum)
finally:
ctx.lock.release()
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testAddManyTasks(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
wp.AddTask((ctx, "Once more, hi!"))
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 22)
def testManyTasksSequence(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self.assertRaises(AssertionError, wp.AddManyTasks,
["Hello world %s" % i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks,
[i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 11)
def _CheckNoTasks(self, wp):
wp._lock.acquire()
try:
# The task queue must be empty now
self.assertFalse(wp._tasks)
self.assertFalse(wp._taskdata)
finally:
wp._lock.release()
def _CheckWorkerCount(self, wp, num_workers):
wp._lock.acquire()
try:
self.assertEqual(len(wp._workers), num_workers)
finally:
wp._lock.release()
def testPriorityChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ChecksumContext()
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = i % 7
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append(i)
wp.AddManyTasks(tasks, priority=priorities)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check sum
ctx.lock.acquire()
try:
checksum = ChecksumContext.CHECKSUM_START
for priority in sorted(data.keys()):
for i in data[priority]:
checksum = ChecksumContext.UpdateChecksum(checksum, i)
self.assertEqual(checksum, ctx.checksum)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListManyTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(0)
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = int(rnd.random() * 10)
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append((prio, i))
wp.AddManyTasks(tasks, priority=priorities)
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], priority=[1] * 5)
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], task_id=[1] * 5)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
expresult = []
for priority in sorted(data.keys()):
expresult.extend(data[priority])
self.assertEqual(expresult, ctx.result)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListSingleTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(26279)
data = {}
for i in range(1, 333):
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i), priority=prio)
data.setdefault(prio, []).append(i)
# Cause some distortion
if i % 11 == 0:
time.sleep(.001)
if i % 41 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testDeferTask(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = DeferringTaskContext()
# Use static seed for this test
rnd = random.Random(14921)
data = {}
num2taskid = {}
for i in range(1, 333):
ctx.lock.acquire()
try:
if i % 5 == 0:
ctx.samepriodefer[i] = True
finally:
ctx.lock.release()
prio = int(rnd.random() * 30)
num2taskid[i] = 1000 * i
wp.AddTask((ctx, i, prio), priority=50,
task_id=num2taskid[i])
data.setdefault(prio, set()).add(i)
# Cause some distortion
if i % 24 == 0:
time.sleep(.001)
if i % 31 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
all_order_ids = []
for (num, numordertaskid) in ctx.num2ordertaskid.items():
order_ids = map(compat.fst, numordertaskid)
self.assertFalse(utils.FindDuplicates(order_ids),
msg="Order ID has been reused")
all_order_ids.extend(order_ids)
for task_id in map(compat.snd, numordertaskid):
self.assertEqual(task_id, num2taskid[num],
msg=("Task %s used different task IDs" % num))
self.assertFalse(utils.FindDuplicates(all_order_ids),
msg="Order ID has been reused")
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
if __name__ == "__main__":
testutils.GanetiTestProgram()