| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2006, 2007, 2010 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Script for unittesting the locking module""" |
| |
| |
| import os |
| import unittest |
| import time |
| import Queue |
| import threading |
| import random |
| import gc |
| import itertools |
| |
| from ganeti import constants |
| from ganeti import locking |
| from ganeti import errors |
| from ganeti import utils |
| from ganeti import compat |
| from ganeti import objects |
| from ganeti import query |
| |
| import testutils |
| |
| |
| # This is used to test the ssynchronize decorator. |
| # Since it's passed as input to a decorator it must be declared as a global. |
| _decoratorlock = locking.SharedLock("decorator lock") |
| |
| #: List for looping tests |
| ITERATIONS = range(8) |
| |
| |
| def _Repeat(fn): |
| """Decorator for executing a function many times""" |
| def wrapper(*args, **kwargs): |
| for i in ITERATIONS: |
| fn(*args, **kwargs) |
| return wrapper |
| |
| |
| def SafeSleep(duration): |
| start = time.time() |
| while True: |
| delay = start + duration - time.time() |
| if delay <= 0.0: |
| break |
| time.sleep(delay) |
| |
| |
| class _ThreadedTestCase(unittest.TestCase): |
| """Test class that supports adding/waiting on threads""" |
| def setUp(self): |
| unittest.TestCase.setUp(self) |
| self.done = Queue.Queue(0) |
| self.threads = [] |
| |
| def _addThread(self, *args, **kwargs): |
| """Create and remember a new thread""" |
| t = threading.Thread(*args, **kwargs) |
| self.threads.append(t) |
| t.start() |
| return t |
| |
| def _waitThreads(self): |
| """Wait for all our threads to finish""" |
| for t in self.threads: |
| t.join(60) |
| self.failIf(t.isAlive()) |
| self.threads = [] |
| |
| |
| class _ConditionTestCase(_ThreadedTestCase): |
| """Common test case for conditions""" |
| |
| def setUp(self, cls): |
| _ThreadedTestCase.setUp(self) |
| self.lock = threading.Lock() |
| self.cond = cls(self.lock) |
| |
| def _testAcquireRelease(self): |
| self.assertFalse(self.cond._is_owned()) |
| self.assertRaises(RuntimeError, self.cond.wait, None) |
| self.assertRaises(RuntimeError, self.cond.notifyAll) |
| |
| self.cond.acquire() |
| self.assert_(self.cond._is_owned()) |
| self.cond.notifyAll() |
| self.assert_(self.cond._is_owned()) |
| self.cond.release() |
| |
| self.assertFalse(self.cond._is_owned()) |
| self.assertRaises(RuntimeError, self.cond.wait, None) |
| self.assertRaises(RuntimeError, self.cond.notifyAll) |
| |
| def _testNotification(self): |
| def _NotifyAll(): |
| self.done.put("NE") |
| self.cond.acquire() |
| self.done.put("NA") |
| self.cond.notifyAll() |
| self.done.put("NN") |
| self.cond.release() |
| |
| self.cond.acquire() |
| self._addThread(target=_NotifyAll) |
| self.assertEqual(self.done.get(True, 1), "NE") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.cond.wait(None) |
| self.assertEqual(self.done.get(True, 1), "NA") |
| self.assertEqual(self.done.get(True, 1), "NN") |
| self.assert_(self.cond._is_owned()) |
| self.cond.release() |
| self.assertFalse(self.cond._is_owned()) |
| |
| |
| class TestSingleNotifyPipeCondition(_ConditionTestCase): |
| """SingleNotifyPipeCondition tests""" |
| |
| def setUp(self): |
| _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition) |
| |
| def testAcquireRelease(self): |
| self._testAcquireRelease() |
| |
| def testNotification(self): |
| self._testNotification() |
| |
| def testWaitReuse(self): |
| self.cond.acquire() |
| self.cond.wait(0) |
| self.cond.wait(0.1) |
| self.cond.release() |
| |
| def testNoNotifyReuse(self): |
| self.cond.acquire() |
| self.cond.notifyAll() |
| self.assertRaises(RuntimeError, self.cond.wait, None) |
| self.assertRaises(RuntimeError, self.cond.notifyAll) |
| self.cond.release() |
| |
| |
| class TestPipeCondition(_ConditionTestCase): |
| """PipeCondition tests""" |
| |
| def setUp(self): |
| _ConditionTestCase.setUp(self, locking.PipeCondition) |
| |
| def testAcquireRelease(self): |
| self._testAcquireRelease() |
| |
| def testNotification(self): |
| self._testNotification() |
| |
| def _TestWait(self, fn): |
| threads = [ |
| self._addThread(target=fn), |
| self._addThread(target=fn), |
| self._addThread(target=fn), |
| ] |
| |
| # Wait for threads to be waiting |
| for _ in threads: |
| self.assertEqual(self.done.get(True, 1), "A") |
| |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| self.cond.acquire() |
| self.assertEqual(len(self.cond._waiters), 3) |
| self.assertEqual(self.cond._waiters, set(threads)) |
| |
| self.assertTrue(repr(self.cond).startswith("<")) |
| self.assertTrue("waiters=" in repr(self.cond)) |
| |
| # This new thread can't acquire the lock, and thus call wait, before we |
| # release it |
| self._addThread(target=fn) |
| self.cond.notifyAll() |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.cond.release() |
| |
| # We should now get 3 W and 1 A (for the new thread) in whatever order |
| w = 0 |
| a = 0 |
| for i in range(4): |
| got = self.done.get(True, 1) |
| if got == "W": |
| w += 1 |
| elif got == "A": |
| a += 1 |
| else: |
| self.fail("Got %s on the done queue" % got) |
| |
| self.assertEqual(w, 3) |
| self.assertEqual(a, 1) |
| |
| self.cond.acquire() |
| self.cond.notifyAll() |
| self.cond.release() |
| self._waitThreads() |
| self.assertEqual(self.done.get_nowait(), "W") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| def testBlockingWait(self): |
| def _BlockingWait(): |
| self.cond.acquire() |
| self.done.put("A") |
| self.cond.wait(None) |
| self.cond.release() |
| self.done.put("W") |
| |
| self._TestWait(_BlockingWait) |
| |
| def testLongTimeoutWait(self): |
| def _Helper(): |
| self.cond.acquire() |
| self.done.put("A") |
| self.cond.wait(15.0) |
| self.cond.release() |
| self.done.put("W") |
| |
| self._TestWait(_Helper) |
| |
| def _TimeoutWait(self, timeout, check): |
| self.cond.acquire() |
| self.cond.wait(timeout) |
| self.cond.release() |
| self.done.put(check) |
| |
| def testShortTimeoutWait(self): |
| self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) |
| self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) |
| self._waitThreads() |
| self.assertEqual(self.done.get_nowait(), "T1") |
| self.assertEqual(self.done.get_nowait(), "T1") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| def testZeroTimeoutWait(self): |
| self._addThread(target=self._TimeoutWait, args=(0, "T0")) |
| self._addThread(target=self._TimeoutWait, args=(0, "T0")) |
| self._addThread(target=self._TimeoutWait, args=(0, "T0")) |
| self._waitThreads() |
| self.assertEqual(self.done.get_nowait(), "T0") |
| self.assertEqual(self.done.get_nowait(), "T0") |
| self.assertEqual(self.done.get_nowait(), "T0") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| |
| class TestSharedLock(_ThreadedTestCase): |
| """SharedLock tests""" |
| |
| def setUp(self): |
| _ThreadedTestCase.setUp(self) |
| self.sl = locking.SharedLock("TestSharedLock") |
| |
| self.assertTrue(repr(self.sl).startswith("<")) |
| self.assertTrue("name=TestSharedLock" in repr(self.sl)) |
| |
| def testSequenceAndOwnership(self): |
| self.assertFalse(self.sl.is_owned()) |
| self.sl.acquire(shared=1) |
| self.assert_(self.sl.is_owned()) |
| self.assert_(self.sl.is_owned(shared=1)) |
| self.assertFalse(self.sl.is_owned(shared=0)) |
| self.sl.release() |
| self.assertFalse(self.sl.is_owned()) |
| self.sl.acquire() |
| self.assert_(self.sl.is_owned()) |
| self.assertFalse(self.sl.is_owned(shared=1)) |
| self.assert_(self.sl.is_owned(shared=0)) |
| self.sl.release() |
| self.assertFalse(self.sl.is_owned()) |
| self.sl.acquire(shared=1) |
| self.assert_(self.sl.is_owned()) |
| self.assert_(self.sl.is_owned(shared=1)) |
| self.assertFalse(self.sl.is_owned(shared=0)) |
| self.sl.release() |
| self.assertFalse(self.sl.is_owned()) |
| |
| def testBooleanValue(self): |
| # semaphores are supposed to return a true value on a successful acquire |
| self.assert_(self.sl.acquire(shared=1)) |
| self.sl.release() |
| self.assert_(self.sl.acquire()) |
| self.sl.release() |
| |
| def testDoubleLockingStoE(self): |
| self.sl.acquire(shared=1) |
| self.assertRaises(AssertionError, self.sl.acquire) |
| |
| def testDoubleLockingEtoS(self): |
| self.sl.acquire() |
| self.assertRaises(AssertionError, self.sl.acquire, shared=1) |
| |
| def testDoubleLockingStoS(self): |
| self.sl.acquire(shared=1) |
| self.assertRaises(AssertionError, self.sl.acquire, shared=1) |
| |
| def testDoubleLockingEtoE(self): |
| self.sl.acquire() |
| self.assertRaises(AssertionError, self.sl.acquire) |
| |
| # helper functions: called in a separate thread they acquire the lock, send |
| # their identifier on the done queue, then release it. |
| def _doItSharer(self): |
| try: |
| self.sl.acquire(shared=1) |
| self.done.put("SHR") |
| self.sl.release() |
| except errors.LockError: |
| self.done.put("ERR") |
| |
| def _doItExclusive(self): |
| try: |
| self.sl.acquire() |
| self.done.put("EXC") |
| self.sl.release() |
| except errors.LockError: |
| self.done.put("ERR") |
| |
| def _doItDelete(self): |
| try: |
| self.sl.delete() |
| self.done.put("DEL") |
| except errors.LockError: |
| self.done.put("ERR") |
| |
| def testSharersCanCoexist(self): |
| self.sl.acquire(shared=1) |
| threading.Thread(target=self._doItSharer).start() |
| self.assert_(self.done.get(True, 1)) |
| self.sl.release() |
| |
| @_Repeat |
| def testExclusiveBlocksExclusive(self): |
| self.sl.acquire() |
| self._addThread(target=self._doItExclusive) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "EXC") |
| |
| @_Repeat |
| def testExclusiveBlocksDelete(self): |
| self.sl.acquire() |
| self._addThread(target=self._doItDelete) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "DEL") |
| self.sl = locking.SharedLock(self.sl.name) |
| |
| @_Repeat |
| def testExclusiveBlocksSharer(self): |
| self.sl.acquire() |
| self._addThread(target=self._doItSharer) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "SHR") |
| |
| @_Repeat |
| def testSharerBlocksExclusive(self): |
| self.sl.acquire(shared=1) |
| self._addThread(target=self._doItExclusive) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "EXC") |
| |
| @_Repeat |
| def testSharerBlocksDelete(self): |
| self.sl.acquire(shared=1) |
| self._addThread(target=self._doItDelete) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "DEL") |
| self.sl = locking.SharedLock(self.sl.name) |
| |
| @_Repeat |
| def testWaitingExclusiveBlocksSharer(self): |
| """SKIPPED testWaitingExclusiveBlockSharer""" |
| return |
| |
| self.sl.acquire(shared=1) |
| # the lock is acquired in shared mode... |
| self._addThread(target=self._doItExclusive) |
| # ...but now an exclusive is waiting... |
| self._addThread(target=self._doItSharer) |
| # ...so the sharer should be blocked as well |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| # The exclusive passed before |
| self.failUnlessEqual(self.done.get_nowait(), "EXC") |
| self.failUnlessEqual(self.done.get_nowait(), "SHR") |
| |
| @_Repeat |
| def testWaitingSharerBlocksExclusive(self): |
| """SKIPPED testWaitingSharerBlocksExclusive""" |
| return |
| |
| self.sl.acquire() |
| # the lock is acquired in exclusive mode... |
| self._addThread(target=self._doItSharer) |
| # ...but now a sharer is waiting... |
| self._addThread(target=self._doItExclusive) |
| # ...the exclusive is waiting too... |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| self.sl.release() |
| self._waitThreads() |
| # The sharer passed before |
| self.assertEqual(self.done.get_nowait(), "SHR") |
| self.assertEqual(self.done.get_nowait(), "EXC") |
| |
| def testDelete(self): |
| self.sl.delete() |
| self.assertRaises(errors.LockError, self.sl.acquire) |
| self.assertRaises(errors.LockError, self.sl.acquire, shared=1) |
| self.assertRaises(errors.LockError, self.sl.delete) |
| |
| def testDeleteTimeout(self): |
| self.assertTrue(self.sl.delete(timeout=60)) |
| |
| def testDeleteTimeoutFail(self): |
| ready = threading.Event() |
| finish = threading.Event() |
| |
| def fn(): |
| self.sl.acquire(shared=0) |
| ready.set() |
| |
| finish.wait() |
| self.sl.release() |
| |
| self._addThread(target=fn) |
| ready.wait() |
| |
| # Test if deleting a lock owned in exclusive mode by another thread fails |
| # to delete when a timeout is used |
| self.assertFalse(self.sl.delete(timeout=0.02)) |
| |
| finish.set() |
| self._waitThreads() |
| |
| self.assertTrue(self.sl.delete()) |
| self.assertRaises(errors.LockError, self.sl.acquire) |
| |
| def testNoDeleteIfSharer(self): |
| self.sl.acquire(shared=1) |
| self.assertRaises(AssertionError, self.sl.delete) |
| |
| @_Repeat |
| def testDeletePendingSharersExclusiveDelete(self): |
| self.sl.acquire() |
| self._addThread(target=self._doItSharer) |
| self._addThread(target=self._doItSharer) |
| self._addThread(target=self._doItExclusive) |
| self._addThread(target=self._doItDelete) |
| self.sl.delete() |
| self._waitThreads() |
| # The threads who were pending return ERR |
| for _ in range(4): |
| self.assertEqual(self.done.get_nowait(), "ERR") |
| self.sl = locking.SharedLock(self.sl.name) |
| |
| @_Repeat |
| def testDeletePendingDeleteExclusiveSharers(self): |
| self.sl.acquire() |
| self._addThread(target=self._doItDelete) |
| self._addThread(target=self._doItExclusive) |
| self._addThread(target=self._doItSharer) |
| self._addThread(target=self._doItSharer) |
| self.sl.delete() |
| self._waitThreads() |
| # The two threads who were pending return both ERR |
| self.assertEqual(self.done.get_nowait(), "ERR") |
| self.assertEqual(self.done.get_nowait(), "ERR") |
| self.assertEqual(self.done.get_nowait(), "ERR") |
| self.assertEqual(self.done.get_nowait(), "ERR") |
| self.sl = locking.SharedLock(self.sl.name) |
| |
| @_Repeat |
| def testExclusiveAcquireTimeout(self): |
| for shared in [0, 1]: |
| on_queue = threading.Event() |
| release_exclusive = threading.Event() |
| |
| def _LockExclusive(): |
| self.sl.acquire(shared=0, test_notify=on_queue.set) |
| self.done.put("A: start wait") |
| release_exclusive.wait() |
| self.done.put("A: end wait") |
| self.sl.release() |
| |
| # Start thread to hold lock in exclusive mode |
| self._addThread(target=_LockExclusive) |
| |
| # Wait for wait to begin |
| self.assertEqual(self.done.get(timeout=60), "A: start wait") |
| |
| # Wait up to 60s to get lock, but release exclusive lock as soon as we're |
| # on the queue |
| self.failUnless(self.sl.acquire(shared=shared, timeout=60, |
| test_notify=release_exclusive.set)) |
| |
| self.done.put("got 2nd") |
| self.sl.release() |
| |
| self._waitThreads() |
| |
| self.assertEqual(self.done.get_nowait(), "A: end wait") |
| self.assertEqual(self.done.get_nowait(), "got 2nd") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| @_Repeat |
| def testAcquireExpiringTimeout(self): |
| def _AcquireWithTimeout(shared, timeout): |
| if not self.sl.acquire(shared=shared, timeout=timeout): |
| self.done.put("timeout") |
| |
| for shared in [0, 1]: |
| # Lock exclusively |
| self.sl.acquire() |
| |
| # Start shared acquires with timeout between 0 and 20 ms |
| for i in range(11): |
| self._addThread(target=_AcquireWithTimeout, |
| args=(shared, i * 2.0 / 1000.0)) |
| |
| # Wait for threads to finish (makes sure the acquire timeout expires |
| # before releasing the lock) |
| self._waitThreads() |
| |
| # Release lock |
| self.sl.release() |
| |
| for _ in range(11): |
| self.assertEqual(self.done.get_nowait(), "timeout") |
| |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| @_Repeat |
| def testSharedSkipExclusiveAcquires(self): |
| # Tests whether shared acquires jump in front of exclusive acquires in the |
| # queue. |
| |
| def _Acquire(shared, name, notify_ev, wait_ev): |
| if notify_ev: |
| notify_fn = notify_ev.set |
| else: |
| notify_fn = None |
| |
| if wait_ev: |
| wait_ev.wait() |
| |
| if not self.sl.acquire(shared=shared, test_notify=notify_fn): |
| return |
| |
| self.done.put(name) |
| self.sl.release() |
| |
| # Get exclusive lock while we fill the queue |
| self.sl.acquire() |
| |
| shrcnt1 = 5 |
| shrcnt2 = 7 |
| shrcnt3 = 9 |
| shrcnt4 = 2 |
| |
| # Add acquires using threading.Event for synchronization. They'll be |
| # acquired exactly in the order defined in this list. |
| acquires = (shrcnt1 * [(1, "shared 1")] + |
| 3 * [(0, "exclusive 1")] + |
| shrcnt2 * [(1, "shared 2")] + |
| shrcnt3 * [(1, "shared 3")] + |
| shrcnt4 * [(1, "shared 4")] + |
| 3 * [(0, "exclusive 2")]) |
| |
| ev_cur = None |
| ev_prev = None |
| |
| for args in acquires: |
| ev_cur = threading.Event() |
| self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev)) |
| ev_prev = ev_cur |
| |
| # Wait for last acquire to start |
| ev_prev.wait() |
| |
| # Expect 6 pending exclusive acquires and 1 for all shared acquires |
| # together |
| self.assertEqual(self.sl._count_pending(), 7) |
| |
| # Release exclusive lock and wait |
| self.sl.release() |
| |
| self._waitThreads() |
| |
| # Check sequence |
| for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4): |
| # Shared locks aren't guaranteed to be notified in order, but they'll be |
| # first |
| tmp = self.done.get_nowait() |
| if tmp == "shared 1": |
| shrcnt1 -= 1 |
| elif tmp == "shared 2": |
| shrcnt2 -= 1 |
| elif tmp == "shared 3": |
| shrcnt3 -= 1 |
| elif tmp == "shared 4": |
| shrcnt4 -= 1 |
| self.assertEqual(shrcnt1, 0) |
| self.assertEqual(shrcnt2, 0) |
| self.assertEqual(shrcnt3, 0) |
| self.assertEqual(shrcnt3, 0) |
| |
| for _ in range(3): |
| self.assertEqual(self.done.get_nowait(), "exclusive 1") |
| |
| for _ in range(3): |
| self.assertEqual(self.done.get_nowait(), "exclusive 2") |
| |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| def testIllegalDowngrade(self): |
| # Not yet acquired |
| self.assertRaises(AssertionError, self.sl.downgrade) |
| |
| # Acquire in shared mode, downgrade should be no-op |
| self.assertTrue(self.sl.acquire(shared=1)) |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| self.assertTrue(self.sl.downgrade()) |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| self.sl.release() |
| |
| def testDowngrade(self): |
| self.assertTrue(self.sl.acquire()) |
| self.assertTrue(self.sl.is_owned(shared=0)) |
| self.assertTrue(self.sl.downgrade()) |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| self.sl.release() |
| |
| @_Repeat |
| def testDowngradeJumpsAheadOfExclusive(self): |
| def _KeepExclusive(ev_got, ev_downgrade, ev_release): |
| self.assertTrue(self.sl.acquire()) |
| self.assertTrue(self.sl.is_owned(shared=0)) |
| ev_got.set() |
| ev_downgrade.wait() |
| self.assertTrue(self.sl.is_owned(shared=0)) |
| self.assertTrue(self.sl.downgrade()) |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| ev_release.wait() |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| self.sl.release() |
| |
| def _KeepExclusive2(ev_started, ev_release): |
| self.assertTrue(self.sl.acquire(test_notify=ev_started.set)) |
| self.assertTrue(self.sl.is_owned(shared=0)) |
| ev_release.wait() |
| self.assertTrue(self.sl.is_owned(shared=0)) |
| self.sl.release() |
| |
| def _KeepShared(ev_started, ev_got, ev_release): |
| self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set)) |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| ev_got.set() |
| ev_release.wait() |
| self.assertTrue(self.sl.is_owned(shared=1)) |
| self.sl.release() |
| |
| # Acquire lock in exclusive mode |
| ev_got_excl1 = threading.Event() |
| ev_downgrade_excl1 = threading.Event() |
| ev_release_excl1 = threading.Event() |
| th_excl1 = self._addThread(target=_KeepExclusive, |
| args=(ev_got_excl1, ev_downgrade_excl1, |
| ev_release_excl1)) |
| ev_got_excl1.wait() |
| |
| # Start a second exclusive acquire |
| ev_started_excl2 = threading.Event() |
| ev_release_excl2 = threading.Event() |
| th_excl2 = self._addThread(target=_KeepExclusive2, |
| args=(ev_started_excl2, ev_release_excl2)) |
| ev_started_excl2.wait() |
| |
| # Start shared acquires, will jump ahead of second exclusive acquire when |
| # first exclusive acquire downgrades |
| ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)] |
| ev_release_shared = threading.Event() |
| |
| th_shared = [self._addThread(target=_KeepShared, |
| args=(ev_started, ev_got, ev_release_shared)) |
| for (ev_started, ev_got) in ev_shared] |
| |
| # Wait for all shared acquires to start |
| for (ev, _) in ev_shared: |
| ev.wait() |
| |
| # Check lock information |
| self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
| [(self.sl.name, "exclusive", [th_excl1.getName()], None)]) |
| [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING])) |
| self.assertEqual([(pendmode, sorted(waiting)) |
| for (pendmode, waiting) in pending], |
| [("exclusive", [th_excl2.getName()]), |
| ("shared", sorted(th.getName() for th in th_shared))]) |
| |
| # Shared acquires won't start until the exclusive lock is downgraded |
| ev_downgrade_excl1.set() |
| |
| # Wait for all shared acquires to be successful |
| for (_, ev) in ev_shared: |
| ev.wait() |
| |
| # Check lock information again |
| self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, |
| query.LQ_PENDING])), |
| [(self.sl.name, "shared", None, |
| [("exclusive", [th_excl2.getName()])])]) |
| [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER])) |
| self.assertEqual(set(owner), set([th_excl1.getName()] + |
| [th.getName() for th in th_shared])) |
| |
| ev_release_excl1.set() |
| ev_release_excl2.set() |
| ev_release_shared.set() |
| |
| self._waitThreads() |
| |
| self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER, |
| query.LQ_PENDING])), |
| [(self.sl.name, None, None, [])]) |
| |
| @_Repeat |
| def testMixedAcquireTimeout(self): |
| sync = threading.Event() |
| |
| def _AcquireShared(ev): |
| if not self.sl.acquire(shared=1, timeout=None): |
| return |
| |
| self.done.put("shared") |
| |
| # Notify main thread |
| ev.set() |
| |
| # Wait for notification from main thread |
| sync.wait() |
| |
| # Release lock |
| self.sl.release() |
| |
| acquires = [] |
| for _ in range(3): |
| ev = threading.Event() |
| self._addThread(target=_AcquireShared, args=(ev, )) |
| acquires.append(ev) |
| |
| # Wait for all acquires to finish |
| for i in acquires: |
| i.wait() |
| |
| self.assertEqual(self.sl._count_pending(), 0) |
| |
| # Try to get exclusive lock |
| self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
| |
| # Acquire exclusive without timeout |
| exclsync = threading.Event() |
| exclev = threading.Event() |
| |
| def _AcquireExclusive(): |
| if not self.sl.acquire(shared=0): |
| return |
| |
| self.done.put("exclusive") |
| |
| # Notify main thread |
| exclev.set() |
| |
| # Wait for notification from main thread |
| exclsync.wait() |
| |
| self.sl.release() |
| |
| self._addThread(target=_AcquireExclusive) |
| |
| # Try to get exclusive lock |
| self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
| |
| # Make all shared holders release their locks |
| sync.set() |
| |
| # Wait for exclusive acquire to succeed |
| exclev.wait() |
| |
| self.assertEqual(self.sl._count_pending(), 0) |
| |
| # Try to get exclusive lock |
| self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
| |
| def _AcquireSharedSimple(): |
| if self.sl.acquire(shared=1, timeout=None): |
| self.done.put("shared2") |
| self.sl.release() |
| |
| for _ in range(10): |
| self._addThread(target=_AcquireSharedSimple) |
| |
| # Tell exclusive lock to release |
| exclsync.set() |
| |
| # Wait for everything to finish |
| self._waitThreads() |
| |
| self.assertEqual(self.sl._count_pending(), 0) |
| |
| # Check sequence |
| for _ in range(3): |
| self.assertEqual(self.done.get_nowait(), "shared") |
| |
| self.assertEqual(self.done.get_nowait(), "exclusive") |
| |
| for _ in range(10): |
| self.assertEqual(self.done.get_nowait(), "shared2") |
| |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| def testPriority(self): |
| # Acquire in exclusive mode |
| self.assert_(self.sl.acquire(shared=0)) |
| |
| # Queue acquires |
| def _Acquire(prev, next, shared, priority, result): |
| prev.wait() |
| self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) |
| try: |
| self.done.put(result) |
| finally: |
| self.sl.release() |
| |
| counter = itertools.count(0) |
| priorities = range(-20, 30) |
| first = threading.Event() |
| prev = first |
| |
| # Data structure: |
| # { |
| # priority: |
| # [(shared/exclusive, set(acquire names), set(pending threads)), |
| # (shared/exclusive, ...), |
| # ..., |
| # ], |
| # } |
| perprio = {} |
| |
| # References shared acquire per priority in L{perprio}. Data structure: |
| # { |
| # priority: (shared=1, set(acquire names), set(pending threads)), |
| # } |
| prioshared = {} |
| |
| for seed in [4979, 9523, 14902, 32440]: |
| # Use a deterministic random generator |
| rnd = random.Random(seed) |
| for priority in [rnd.choice(priorities) for _ in range(30)]: |
| modes = [0, 1] |
| rnd.shuffle(modes) |
| for shared in modes: |
| # Unique name |
| acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) |
| |
| ev = threading.Event() |
| thread = self._addThread(target=_Acquire, |
| args=(prev, ev, shared, priority, acqname)) |
| prev = ev |
| |
| # Record expected aqcuire, see above for structure |
| data = (shared, set([acqname]), set([thread])) |
| priolist = perprio.setdefault(priority, []) |
| if shared: |
| priosh = prioshared.get(priority, None) |
| if priosh: |
| # Shared acquires are merged |
| for i, j in zip(priosh[1:], data[1:]): |
| i.update(j) |
| assert data[0] == priosh[0] |
| else: |
| prioshared[priority] = data |
| priolist.append(data) |
| else: |
| priolist.append(data) |
| |
| # Start all acquires and wait for them |
| first.set() |
| prev.wait() |
| |
| # Check lock information |
| self.assertEqual(self.sl.GetLockInfo(set()), |
| [(self.sl.name, None, None, None)]) |
| self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), |
| [(self.sl.name, "exclusive", |
| [threading.currentThread().getName()], None)]) |
| |
| self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])), |
| perprio) |
| |
| # Let threads acquire the lock |
| self.sl.release() |
| |
| # Wait for everything to finish |
| self._waitThreads() |
| |
| self.assert_(self.sl._check_empty()) |
| |
| # Check acquires by priority |
| for acquires in [perprio[i] for i in sorted(perprio.keys())]: |
| for (_, names, _) in acquires: |
| # For shared acquires, the set will contain 1..n entries. For exclusive |
| # acquires only one. |
| while names: |
| names.remove(self.done.get_nowait()) |
| self.assertFalse(compat.any(names for (_, names, _) in acquires)) |
| |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio): |
| self.assertEqual(name, self.sl.name) |
| self.assert_(mode is None) |
| self.assert_(owner is None) |
| |
| self.assertEqual([(pendmode, sorted(waiting)) |
| for (pendmode, waiting) in pending], |
| [(["exclusive", "shared"][int(bool(shared))], |
| sorted(t.getName() for t in threads)) |
| for acquires in [perprio[i] |
| for i in sorted(perprio.keys())] |
| for (shared, _, threads) in acquires]) |
| |
| class _FakeTimeForSpuriousNotifications: |
| def __init__(self, now, check_end): |
| self.now = now |
| self.check_end = check_end |
| |
| # Deterministic random number generator |
| self.rnd = random.Random(15086) |
| |
| def time(self): |
| # Advance time if the random number generator thinks so (this is to test |
| # multiple notifications without advancing the time) |
| if self.rnd.random() < 0.3: |
| self.now += self.rnd.random() |
| |
| self.check_end(self.now) |
| |
| return self.now |
| |
| @_Repeat |
| def testAcquireTimeoutWithSpuriousNotifications(self): |
| ready = threading.Event() |
| locked = threading.Event() |
| req = Queue.Queue(0) |
| |
| epoch = 4000.0 |
| timeout = 60.0 |
| |
| def check_end(now): |
| self.assertFalse(locked.isSet()) |
| |
| # If we waited long enough (in virtual time), tell main thread to release |
| # lock, otherwise tell it to notify once more |
| req.put(now < (epoch + (timeout * 0.8))) |
| |
| time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time |
| |
| sl = locking.SharedLock("test", _time_fn=time_fn) |
| |
| # Acquire in exclusive mode |
| sl.acquire(shared=0) |
| |
| def fn(): |
| self.assertTrue(sl.acquire(shared=0, timeout=timeout, |
| test_notify=ready.set)) |
| locked.set() |
| sl.release() |
| self.done.put("success") |
| |
| # Start acquire with timeout and wait for it to be ready |
| self._addThread(target=fn) |
| ready.wait() |
| |
| # The separate thread is now waiting to acquire the lock, so start sending |
| # spurious notifications. |
| |
| # Wait for separate thread to ask for another notification |
| count = 0 |
| while req.get(): |
| # After sending the notification, the lock will take a short amount of |
| # time to notice and to retrieve the current time |
| sl._notify_topmost() |
| count += 1 |
| |
| self.assertTrue(count > 100, "Not enough notifications were sent") |
| |
| self.assertFalse(locked.isSet()) |
| |
| # Some notifications have been sent, now actually release the lock |
| sl.release() |
| |
| # Wait for lock to be acquired |
| locked.wait() |
| |
| self._waitThreads() |
| |
| self.assertEqual(self.done.get_nowait(), "success") |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| |
| |
| class TestSharedLockInCondition(_ThreadedTestCase): |
| """SharedLock as a condition lock tests""" |
| |
| def setUp(self): |
| _ThreadedTestCase.setUp(self) |
| self.sl = locking.SharedLock("TestSharedLockInCondition") |
| self.setCondition() |
| |
| def setCondition(self): |
| self.cond = threading.Condition(self.sl) |
| |
| def testKeepMode(self): |
| self.cond.acquire(shared=1) |
| self.assert_(self.sl.is_owned(shared=1)) |
| self.cond.wait(0) |
| self.assert_(self.sl.is_owned(shared=1)) |
| self.cond.release() |
| self.cond.acquire(shared=0) |
| self.assert_(self.sl.is_owned(shared=0)) |
| self.cond.wait(0) |
| self.assert_(self.sl.is_owned(shared=0)) |
| self.cond.release() |
| |
| |
| class TestSharedLockInPipeCondition(TestSharedLockInCondition): |
| """SharedLock as a pipe condition lock tests""" |
| |
| def setCondition(self): |
| self.cond = locking.PipeCondition(self.sl) |
| |
| |
| class TestSSynchronizedDecorator(_ThreadedTestCase): |
| """Shared Lock Synchronized decorator test""" |
| |
| def setUp(self): |
| _ThreadedTestCase.setUp(self) |
| |
| @locking.ssynchronized(_decoratorlock) |
| def _doItExclusive(self): |
| self.assert_(_decoratorlock.is_owned()) |
| self.done.put("EXC") |
| |
| @locking.ssynchronized(_decoratorlock, shared=1) |
| def _doItSharer(self): |
| self.assert_(_decoratorlock.is_owned(shared=1)) |
| self.done.put("SHR") |
| |
| def testDecoratedFunctions(self): |
| self._doItExclusive() |
| self.assertFalse(_decoratorlock.is_owned()) |
| self._doItSharer() |
| self.assertFalse(_decoratorlock.is_owned()) |
| |
| def testSharersCanCoexist(self): |
| _decoratorlock.acquire(shared=1) |
| threading.Thread(target=self._doItSharer).start() |
| self.assert_(self.done.get(True, 1)) |
| _decoratorlock.release() |
| |
| @_Repeat |
| def testExclusiveBlocksExclusive(self): |
| _decoratorlock.acquire() |
| self._addThread(target=self._doItExclusive) |
| # give it a bit of time to check that it's not actually doing anything |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| _decoratorlock.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "EXC") |
| |
| @_Repeat |
| def testExclusiveBlocksSharer(self): |
| _decoratorlock.acquire() |
| self._addThread(target=self._doItSharer) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| _decoratorlock.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "SHR") |
| |
| @_Repeat |
| def testSharerBlocksExclusive(self): |
| _decoratorlock.acquire(shared=1) |
| self._addThread(target=self._doItExclusive) |
| self.assertRaises(Queue.Empty, self.done.get_nowait) |
| _decoratorlock.release() |
| self._waitThreads() |
| self.failUnlessEqual(self.done.get_nowait(), "EXC") |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |