| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2008, 2009, 2010 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Script for unittesting the workerpool module""" |
| |
| import unittest |
| import threading |
| import time |
| import sys |
| import zlib |
| import random |
| |
| from ganeti import workerpool |
| from ganeti import errors |
| from ganeti import utils |
| from ganeti import compat |
| |
| import testutils |
| |
| |
class CountingContext(object):
  """Lock-protected counter of finished tasks.

  Workers call L{DoneTask} once per task; the test reads the total through
  L{GetDoneTasks}.

  """
  def __init__(self):
    # Every access to "done" happens while holding this lock
    self._lock = threading.Condition(threading.Lock())
    self.done = 0

  def DoneTask(self):
    """Increments the number of finished tasks by one.

    """
    self._lock.acquire()
    try:
      self.done += 1
    finally:
      self._lock.release()

  def GetDoneTasks(self):
    """Returns the number of tasks counted so far.

    """
    self._lock.acquire()
    try:
      return self.done
    finally:
      self._lock.release()

  @staticmethod
  def UpdateChecksum(current, value):
    """Folds C{str(value)} into a running Adler-32 checksum.

    @param current: Checksum accumulated so far
    @param value: Object whose string representation is added
    @return: The updated checksum

    """
    data = str(value)
    try:
      return zlib.adler32(data, current)
    except TypeError:
      # Python 3's zlib only accepts bytes-like objects while str() yields
      # text there; Python 2 never gets here for a plain str
      return zlib.adler32(data.encode("utf-8"), current)
| |
| |
class CountingBaseWorker(workerpool.BaseWorker):
  """Worker that only reports each task as done on its context.

  """
  def RunTask(self, ctx, text):
    # "text" is part of the task arguments but is not used here
    ctx.DoneTask()
| |
| |
class ChecksumContext(object):
  """Shared state holding a running Adler-32 checksum and its lock.

  """
  # Checksum of empty input. "".encode() yields bytes on Python 3 (where
  # zlib rejects text) and remains a plain str on Python 2.
  CHECKSUM_START = zlib.adler32("".encode("ascii"))

  def __init__(self):
    # Users of this context hold "lock" while reading or writing "checksum"
    self.lock = threading.Condition(threading.Lock())
    self.checksum = self.CHECKSUM_START

  @staticmethod
  def UpdateChecksum(current, value):
    """Folds C{str(value)} into a running Adler-32 checksum.

    @param current: Checksum accumulated so far
    @param value: Object whose string representation is added
    @return: The updated checksum

    """
    data = str(value)
    try:
      return zlib.adler32(data, current)
    except TypeError:
      # Python 3's zlib only accepts bytes-like objects while str() yields
      # text there; Python 2 never gets here for a plain str
      return zlib.adler32(data.encode("utf-8"), current)
| |
| |
class ChecksumBaseWorker(workerpool.BaseWorker):
  """Worker adding each task's number to the context's running checksum.

  """
  def RunTask(self, ctx, number):
    task_name = "number%s" % number
    self.SetTaskName(task_name)

    # The name check has to come before the checksum update: if it fails,
    # the update is skipped and the final checksum comes out wrong.
    expected_name = "%s/%s" % (self._worker_id, task_name)
    assert self.getName() == expected_name

    ctx.lock.acquire()
    try:
      updated = ctx.UpdateChecksum(ctx.checksum, number)
      ctx.checksum = updated
    finally:
      ctx.lock.release()
| |
| |
class ListBuilderContext(object):
  """Shared state collecting what L{ListBuilderWorker} has processed.

  """
  def __init__(self):
    # Workers and tests hold this lock while touching the containers below
    self.lock = threading.Lock()
    # (priority, data) tuples in the order they were recorded
    self.result = []
    # Maps a priority to the list of data items recorded with it
    self.prioresult = {}
| |
| |
class ListBuilderWorker(workerpool.BaseWorker):
  """Worker recording each task's data together with its current priority.

  """
  def RunTask(self, ctx, data):
    ctx.lock.acquire()
    try:
      # Flat log of (priority, data) pairs
      entry = (self.GetCurrentPriority(), data)
      ctx.result.append(entry)

      # Same data once more, grouped by priority
      bucket = ctx.prioresult.setdefault(self.GetCurrentPriority(), [])
      bucket.append(data)
    finally:
      ctx.lock.release()
| |
| |
class DeferringTaskContext(object):
  """Shared state for the task deferral test.

  """
  def __init__(self):
    # Workers and tests hold this lock while touching the dictionaries below
    self.lock = threading.Lock()
    # Maps a priority to the set of task numbers completed at that priority
    self.prioresult = {}
    # Task numbers which still have to be deferred once without specifying
    # a new priority
    self.samepriodefer = {}
    # Maps a task number to the order/task ID values seen on each of its runs
    self.num2ordertaskid = {}
| |
| |
class DeferringWorker(workerpool.BaseWorker):
  """Worker deferring its task until the requested priority is reached.

  """
  def RunTask(self, ctx, num, targetprio):
    ctx.lock.acquire()
    try:
      # Keep track of the order/task ID seen on every run of this task
      history = ctx.num2ordertaskid.setdefault(num, [])
      history.append(self._GetCurrentOrderAndTaskId())

      if num in ctx.samepriodefer:
        # Flagged tasks get deferred once without naming a new priority
        del ctx.samepriodefer[num]
        raise workerpool.DeferTask()

      if self.GetCurrentPriority() <= targetprio:
        # Target reached, note under which priority the task finished
        done = ctx.prioresult.setdefault(self.GetCurrentPriority(), set())
        done.add(num)
      else:
        # Not there yet, try again one priority step lower
        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
    finally:
      ctx.lock.release()
| |
| |
class PriorityContext(object):
  """Shared state collecting what L{PriorityWorker} has processed.

  """
  def __init__(self):
    # Workers and tests hold this lock while touching "result"
    self.lock = threading.Lock()
    # (priority, data) tuples in the order they were recorded
    self.result = []
| |
| |
class PriorityWorker(workerpool.BaseWorker):
  """Worker logging each task's data along with its current priority.

  """
  def RunTask(self, ctx, data):
    ctx.lock.acquire()
    try:
      record = (self.GetCurrentPriority(), data)
      ctx.result.append(record)
    finally:
      ctx.lock.release()
| |
| |
class NotImplementedWorker(workerpool.BaseWorker):
  """Worker whose task handler always fails.

  Used by a test which keeps the pool inactive and only inspects its queue.

  """
  def RunTask(self):
    raise NotImplementedError()
| |
| |
class TestWorkerpool(unittest.TestCase):
  """Workerpool tests"""

  def testCounting(self):
    # Ten tasks added one at a time to a pool of three workers must all run
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)

      for i in range(10):
        wp.AddTask((ctx, "Hello world %s" % i))

      wp.Quiesce()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEqual(ctx.GetDoneTasks(), 10)

  def testNoTasks(self):
    # A freshly created pool has workers, but nothing queued
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testNoTasksQuiesce(self):
    # Quiescing a pool without any tasks must return and leave it empty
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self._CheckNoTasks(wp)
      wp.Quiesce()
      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testActive(self):
    # Tasks added while the pool is deactivated must not run until the pool
    # is activated again
    ctx = CountingContext()
    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 5)
      self.assertTrue(wp._active)

      # Process some tasks
      for _ in range(10):
        wp.AddTask((ctx, None))

      wp.Quiesce()
      self._CheckNoTasks(wp)
      self.assertEqual(ctx.GetDoneTasks(), 10)

      # Repeat a few times
      for count in range(10):
        # Deactivate pool
        wp.SetActive(False)
        self._CheckNoTasks(wp)

        # Queue some more tasks
        for _ in range(10):
          wp.AddTask((ctx, None))

        for _ in range(5):
          # Short delays to give other threads a chance to cause breakage
          time.sleep(.01)
          wp.AddTask((ctx, "Hello world %s" % 999))
          self.assertFalse(wp._active)

        # None of the 15 tasks queued in this round may have run yet
        self.assertEqual(ctx.GetDoneTasks(), 10 + (count * 15))

        # Start processing again
        wp.SetActive(True)
        self.assertTrue(wp._active)

        # Wait for tasks to finish
        wp.Quiesce()
        self._CheckNoTasks(wp)
        self.assertEqual(ctx.GetDoneTasks(), 10 + (count * 15) + 15)

        self._CheckWorkerCount(wp, 5)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testChecksum(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order.
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ChecksumContext()
      checksum = ChecksumContext.CHECKSUM_START
      for i in range(1, 100):
        checksum = ChecksumContext.UpdateChecksum(checksum, i)
        wp.AddTask((ctx, i))

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check sum
      ctx.lock.acquire()
      try:
        self.assertEqual(checksum, ctx.checksum)
      finally:
        ctx.lock.release()
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testAddManyTasks(self):
    # Batches and single tasks can be mixed; 10 + 1 + 1 + 10 tasks must run
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)

      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
      wp.AddTask((ctx, "A separate hello"))
      wp.AddTask((ctx, "Once more, hi!"))
      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])

      wp.Quiesce()

      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEqual(ctx.GetDoneTasks(), 22)

  def testManyTasksSequence(self):
    # Batches whose entries are strings or integers instead of argument
    # tuples, as well as a scalar "task_id", are expected to be refused with
    # an AssertionError; refused batches must not count as run tasks
    ctx = CountingContext()
    wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
    try:
      self._CheckWorkerCount(wp, 3)
      self.assertRaises(AssertionError, wp.AddManyTasks,
                        ["Hello world %s" % i for i in range(10)])
      self.assertRaises(AssertionError, wp.AddManyTasks,
                        list(range(10)))
      self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)

      wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
      wp.AddTask((ctx, "A separate hello"))

      wp.Quiesce()

      self._CheckNoTasks(wp)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

    self.assertEqual(ctx.GetDoneTasks(), 11)

  def _CheckNoTasks(self, wp):
    """Asserts, while holding the pool's lock, that its queue is empty.

    """
    wp._lock.acquire()
    try:
      # The task queue must be empty now
      self.assertFalse(wp._tasks)
      self.assertFalse(wp._taskdata)
    finally:
      wp._lock.release()

  def _CheckWorkerCount(self, wp, num_workers):
    """Asserts, while holding the pool's lock, the number of its workers.

    """
    wp._lock.acquire()
    try:
      self.assertEqual(len(wp._workers), num_workers)
    finally:
      wp._lock.release()

  def testPriorityChecksum(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ChecksumContext()

      data = {}
      tasks = []
      priorities = []
      for i in range(1, 333):
        prio = i % 7
        tasks.append((ctx, i))
        priorities.append(prio)
        data.setdefault(prio, []).append(i)

      wp.AddManyTasks(tasks, priority=priorities)

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check sum
      ctx.lock.acquire()
      try:
        # Expected order: ascending priority, insertion order within each
        checksum = ChecksumContext.CHECKSUM_START
        for priority in sorted(data):
          for i in data[priority]:
            checksum = ChecksumContext.UpdateChecksum(checksum, i)

        self.assertEqual(checksum, ctx.checksum)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testPriorityListManyTasks(self):
    # Tests whether all tasks are run and, since we're only using a single
    # thread, whether everything is started in order and respects the priority
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ListBuilderContext()

      # Use static seed for this test
      rnd = random.Random(0)

      data = {}
      tasks = []
      priorities = []
      for i in range(1, 333):
        prio = int(rnd.random() * 10)
        tasks.append((ctx, i))
        priorities.append(prio)
        data.setdefault(prio, []).append((prio, i))

      wp.AddManyTasks(tasks, priority=priorities)

      # Priority and task ID lists must match the number of tasks
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
                        [("x", ), ("y", )], priority=[1] * 5)
      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
                        [("x", ), ("y", )], task_id=[1] * 5)

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        expresult = []
        for priority in sorted(data):
          expresult.extend(data[priority])

        self.assertEqual(expresult, ctx.result)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testPriorityListSingleTasks(self):
    # Tests whether all tasks are run when they are added one by one with
    # random priorities while the single worker is already processing; only
    # the order within each priority is compared
    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = ListBuilderContext()

      # Use static seed for this test
      rnd = random.Random(26279)

      data = {}
      for i in range(1, 333):
        prio = int(rnd.random() * 30)
        wp.AddTask((ctx, i), priority=prio)
        data.setdefault(prio, []).append(i)

        # Cause some distortion
        if i % 11 == 0:
          time.sleep(.001)
        if i % 41 == 0:
          wp.Quiesce()

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        self.assertEqual(data, ctx.prioresult)
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testDeferTask(self):
    # Tests whether deferred tasks end up running at their target priority,
    # keep their task ID across deferrals and never see an order ID twice
    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = DeferringTaskContext()

      # Use static seed for this test
      rnd = random.Random(14921)

      data = {}
      num2taskid = {}
      for i in range(1, 333):
        ctx.lock.acquire()
        try:
          # Every fifth task is additionally deferred once without a new
          # priority
          if i % 5 == 0:
            ctx.samepriodefer[i] = True
        finally:
          ctx.lock.release()

        prio = int(rnd.random() * 30)
        num2taskid[i] = 1000 * i
        wp.AddTask((ctx, i, prio), priority=50,
                   task_id=num2taskid[i])
        data.setdefault(prio, set()).add(i)

        # Cause some distortion
        if i % 24 == 0:
          time.sleep(.001)
        if i % 31 == 0:
          wp.Quiesce()

      wp.Quiesce()

      self._CheckNoTasks(wp)

      # Check result
      ctx.lock.acquire()
      try:
        self.assertEqual(data, ctx.prioresult)

        all_order_ids = []

        for (num, numordertaskid) in ctx.num2ordertaskid.items():
          # A real list is needed as the values are walked through twice
          order_ids = [compat.fst(entry) for entry in numordertaskid]
          self.assertFalse(utils.FindDuplicates(order_ids),
                           msg="Order ID has been reused")
          all_order_ids.extend(order_ids)

          for task_id in map(compat.snd, numordertaskid):
            self.assertEqual(task_id, num2taskid[num],
                             msg=("Task %s used different task IDs" % num))

        self.assertFalse(utils.FindDuplicates(all_order_ids),
                         msg="Order ID has been reused")
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testChangeTaskPriority(self):
    # Queues tasks on a deactivated pool, changes the priority of each one
    # and verifies they are run in the order given by the new priorities
    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      ctx = PriorityContext()

      # Use static seed for this test
      rnd = random.Random(4727)

      # Disable processing of tasks
      wp.SetActive(False)

      # No task ID
      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
                        None, 0)

      # Pre-generate task IDs and priorities; both are shuffled in place and
      # therefore have to be real lists
      count = 100
      task_ids = list(range(0, count))
      priorities = list(range(200, 200 + count)) * 2

      rnd.shuffle(task_ids)
      rnd.shuffle(priorities)

      # Make sure there are some duplicate priorities, but not all
      priorities[count * 2 - 10:count * 2 - 1] = \
        priorities[count - 10: count - 1]

      assert len(priorities) == 2 * count
      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]

      # Add some tasks; this loop consumes half of all previously generated
      # priorities (pop() takes them from the end of the list)
      for (idx, task_id) in enumerate(task_ids):
        wp.AddTask((ctx, idx),
                   priority=priorities.pop(),
                   task_id=task_id)

      self.assertEqual(len(wp._tasks), len(task_ids))
      self.assertEqual(len(wp._taskdata), len(task_ids))

      # Tasks have been added, so half of the priorities should have been
      # consumed
      assert len(priorities) == len(task_ids)

      # Change task priority
      expected = []
      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
        wp.ChangeTaskPriority(task_id, prio)
        expected.append((prio, idx))

      self.assertEqual(len(wp._taskdata), len(task_ids))

      # Half the entries are now abandoned tasks
      self.assertEqual(len(wp._tasks), len(task_ids) * 2)

      assert len(priorities) == count
      assert len(task_ids) == count

      # Start processing
      wp.SetActive(True)

      # Wait for tasks to finish
      wp.Quiesce()

      self._CheckNoTasks(wp)

      for task_id in task_ids:
        # All tasks are done
        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
                          task_id, 0)

      # Check result
      ctx.lock.acquire()
      try:
        self.assertEqual(ctx.result, sorted(expected))
      finally:
        ctx.lock.release()

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)

  def testChangeTaskPriorityInteralStructures(self):
    # Inspects the pool's "_tasks" and "_taskdata" after each operation; the
    # pool stays deactivated, hence the worker's RunTask is never reached
    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
    try:
      self._CheckWorkerCount(wp, 1)

      # Use static seed for this test
      rnd = random.Random(643)

      (num1, num2) = rnd.sample(range(1000), 2)

      # Disable processing of tasks
      wp.SetActive(False)

      self.assertFalse(wp._tasks)
      self.assertFalse(wp._taskdata)

      # No priority or task ID
      wp.AddTask(())
      self.assertEqual(wp._tasks, [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        ])
      self.assertFalse(wp._taskdata)

      # No task ID
      wp.AddTask((), priority=7413)
      self.assertEqual(wp._tasks, [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [7413, 1, None, ()],
        ])
      self.assertFalse(wp._taskdata)

      # Start adding real tasks
      wp.AddTask((), priority=10267659, task_id=num1)
      self.assertEqual(wp._tasks, [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [7413, 1, None, ()],
        [10267659, 2, num1, ()],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [10267659, 2, num1, ()],
        })

      wp.AddTask((), priority=123, task_id=num2)
      self.assertEqual(sorted(wp._tasks), [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [123, 3, num2, ()],
        [7413, 1, None, ()],
        [10267659, 2, num1, ()],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [10267659, 2, num1, ()],
        num2: [123, 3, num2, ()],
        })

      # Changing a priority is expected to leave the old queue entry behind
      # with its arguments replaced by None
      wp.ChangeTaskPriority(num1, 100)
      self.assertEqual(sorted(wp._tasks), [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [100, 2, num1, ()],
        [123, 3, num2, ()],
        [7413, 1, None, ()],
        [10267659, 2, num1, None],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [100, 2, num1, ()],
        num2: [123, 3, num2, ()],
        })

      wp.ChangeTaskPriority(num2, 91337)
      self.assertEqual(sorted(wp._tasks), [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [100, 2, num1, ()],
        [123, 3, num2, None],
        [7413, 1, None, ()],
        [91337, 3, num2, ()],
        [10267659, 2, num1, None],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [100, 2, num1, ()],
        num2: [91337, 3, num2, ()],
        })

      wp.ChangeTaskPriority(num1, 10139)
      self.assertEqual(sorted(wp._tasks), [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [100, 2, num1, None],
        [123, 3, num2, None],
        [7413, 1, None, ()],
        [10139, 2, num1, ()],
        [91337, 3, num2, ()],
        [10267659, 2, num1, None],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [10139, 2, num1, ()],
        num2: [91337, 3, num2, ()],
        })

      # Change to the same priority once again
      wp.ChangeTaskPriority(num1, 10139)
      self.assertEqual(sorted(wp._tasks), [
        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
        [100, 2, num1, None],
        [123, 3, num2, None],
        [7413, 1, None, ()],
        [10139, 2, num1, None],
        [10139, 2, num1, ()],
        [91337, 3, num2, ()],
        [10267659, 2, num1, None],
        ])
      self.assertEqual(wp._taskdata, {
        num1: [10139, 2, num1, ()],
        num2: [91337, 3, num2, ()],
        })

      self._CheckWorkerCount(wp, 1)
    finally:
      wp.TerminateWorkers()
      self._CheckWorkerCount(wp, 0)
| |
| |
# Hand control to Ganeti's test program only when this file is executed as a
# script, not when it is imported
if __name__ == "__main__":
  testutils.GanetiTestProgram()