blob: 705103f141e91bab2272d8fc05495847132a5f79 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the locking module"""
import os
import unittest
import time
import Queue
import threading
import random
import gc
import itertools
from ganeti import constants
from ganeti import locking
from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import objects
from ganeti import query
import testutils
# This lock is used to test the ssynchronized decorator.
# Since it is passed as an argument to a decorator, it must be declared as a
# global.
_decoratorlock = locking.SharedLock("decorator lock")
#: List for looping tests
ITERATIONS = range(8)


def _Repeat(fn):
  """Decorator for executing a function many times.

  The wrapped function is called once for every entry in L{ITERATIONS}, always
  with the same arguments. Its return value is discarded. The wrapper keeps
  the name and the docstring of the wrapped function so that a test runner
  still reports the description of the original test.

  @param fn: function to be called repeatedly
  @return: wrapper function calling C{fn} once per iteration

  """
  def wrapper(*args, **kwargs):
    for _ in ITERATIONS:
      fn(*args, **kwargs)

  # Copy the metadata by hand; this avoids a dependency on functools
  wrapper.__name__ = fn.__name__
  wrapper.__doc__ = fn.__doc__

  return wrapper
def SafeSleep(duration):
  """Sleep for at least C{duration} seconds.

  C{time.sleep} is called again with the remaining time for as long as the
  deadline has not been reached. Nothing happens for a duration of zero or
  less.

  @param duration: number of seconds to sleep

  """
  deadline = time.time() + duration
  remaining = deadline - time.time()

  while remaining > 0.0:
    time.sleep(remaining)
    remaining = deadline - time.time()
class _ThreadedTestCase(unittest.TestCase):
  """Base class for tests which start helper threads and wait for them"""

  def setUp(self):
    """Prepare an empty list of threads and an unbounded result queue"""
    unittest.TestCase.setUp(self)
    self.threads = []
    self.done = Queue.Queue(0)

  def _addThread(self, *args, **kwargs):
    """Create, remember and start a new thread.

    All arguments are passed on to C{threading.Thread}.

    @return: the started thread

    """
    thread = threading.Thread(*args, **kwargs)
    self.threads.append(thread)
    thread.start()
    return thread

  def _waitThreads(self):
    """Wait for all remembered threads to finish.

    Every thread is joined with a timeout of 60 seconds; the test fails if a
    thread is still alive afterwards. The list of threads is reset at the end.

    """
    for thread in self.threads:
      thread.join(60)
      self.failIf(thread.isAlive())

    self.threads = []
class _ConditionTestCase(_ThreadedTestCase):
  """Checks shared by the tests of the different condition classes"""

  def setUp(self, cls):
    """Create a plain lock and a condition built on top of it.

    @param cls: condition class to instantiate; called with the lock as its
        only argument

    """
    _ThreadedTestCase.setUp(self)
    self.lock = threading.Lock()
    self.cond = cls(self.lock)

  def _testAcquireRelease(self):
    """Check ownership tracking around acquire, notify and release"""
    cond = self.cond

    def _VerifyNotOwned():
      # Waiting and notifying must be refused while the lock is not held
      self.assertFalse(cond._is_owned())
      self.assertRaises(RuntimeError, cond.wait, None)
      self.assertRaises(RuntimeError, cond.notifyAll)

    _VerifyNotOwned()

    cond.acquire()
    self.assert_(cond._is_owned())
    cond.notifyAll()
    self.assert_(cond._is_owned())
    cond.release()

    _VerifyNotOwned()

  def _testNotification(self):
    """Check that a waiter is woken up by a notification of another thread"""
    cond = self.cond
    done = self.done

    def _Notifier():
      # Reports its progress: "NE" on entry, "NA" once the condition is
      # acquired and "NN" after notifying
      done.put("NE")
      cond.acquire()
      done.put("NA")
      cond.notifyAll()
      done.put("NN")
      cond.release()

    cond.acquire()
    self._addThread(target=_Notifier)
    self.assertEqual(done.get(True, 1), "NE")

    # Nothing else may have been reported while this thread holds the
    # condition
    self.assertRaises(Queue.Empty, done.get_nowait)

    cond.wait(None)
    for expected in ("NA", "NN"):
      self.assertEqual(done.get(True, 1), expected)

    self.assert_(cond._is_owned())
    cond.release()
    self.assertFalse(cond._is_owned())
class TestSingleNotifyPipeCondition(_ConditionTestCase):
  """SingleNotifyPipeCondition tests"""

  def setUp(self):
    """Use L{locking.SingleNotifyPipeCondition} as the condition class"""
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def testWaitReuse(self):
    # Waits twice on the same condition without any notification in between
    cond = self.cond
    cond.acquire()
    for timeout in (0, 0.1):
      cond.wait(timeout)
    cond.release()

  def testNoNotifyReuse(self):
    # After the first notification both waiting and notifying must fail
    cond = self.cond
    cond.acquire()
    cond.notifyAll()
    self.assertRaises(RuntimeError, cond.wait, None)
    self.assertRaises(RuntimeError, cond.notifyAll)
    cond.release()
class TestPipeCondition(_ConditionTestCase):
  """PipeCondition tests"""

  def setUp(self):
    """Use L{locking.PipeCondition} as the condition class"""
    _ConditionTestCase.setUp(self, locking.PipeCondition)

  def testAcquireRelease(self):
    self._testAcquireRelease()

  def testNotification(self):
    self._testNotification()

  def _TestWait(self, fn):
    """Run C{fn} in several threads and check the notification sequence.

    @param fn: thread function; the checks expect it to put "A" on the done
        queue once it holds the condition, to wait on the condition and to
        put "W" on the queue after the wait has returned

    """
    waiters = [self._addThread(target=fn) for _ in range(3)]

    # Wait for threads to be waiting
    for _ in waiters:
      self.assertEqual(self.done.get(True, 1), "A")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

    self.cond.acquire()
    self.assertEqual(len(self.cond._waiters), 3)
    self.assertEqual(self.cond._waiters, set(waiters))
    self.assertTrue(repr(self.cond).startswith("<"))
    self.assertTrue("waiters=" in repr(self.cond))

    # The additional thread can't acquire the lock, and hence can't call
    # wait, before the lock is released below
    self._addThread(target=fn)
    self.cond.notifyAll()
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.cond.release()

    # Expect three times "W" and once "A" (from the new thread), in any order
    seen = {"W": 0, "A": 0}
    for _ in range(4):
      got = self.done.get(True, 1)
      if got not in seen:
        self.fail("Got %s on the done queue" % got)
      seen[got] += 1

    self.assertEqual(seen["W"], 3)
    self.assertEqual(seen["A"], 1)

    # Notify once more for the thread which was started last
    self.cond.acquire()
    self.cond.notifyAll()
    self.cond.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "W")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _MakeWaiter(self, timeout):
    """Build a thread function waiting on the condition with C{timeout}.

    @param timeout: value passed to the condition's C{wait} method
    @return: function suitable as argument for L{_TestWait}

    """
    def _Wait():
      self.cond.acquire()
      self.done.put("A")
      self.cond.wait(timeout)
      self.cond.release()
      self.done.put("W")

    return _Wait

  def testBlockingWait(self):
    self._TestWait(self._MakeWaiter(None))

  def testLongTimeoutWait(self):
    self._TestWait(self._MakeWaiter(15.0))

  def _TimeoutWait(self, timeout, check):
    """Wait on the condition with a timeout, then report on the done queue.

    @param timeout: value passed to the condition's C{wait} method
    @param check: value put on the done queue after the wait

    """
    self.cond.acquire()
    self.cond.wait(timeout)
    self.cond.release()
    self.done.put(check)

  def testShortTimeoutWait(self):
    for _ in range(2):
      self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))

    self._waitThreads()

    for _ in range(2):
      self.assertEqual(self.done.get_nowait(), "T1")
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testZeroTimeoutWait(self):
    for _ in range(3):
      self._addThread(target=self._TimeoutWait, args=(0, "T0"))

    self._waitThreads()

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "T0")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSharedLock(_ThreadedTestCase):
  """SharedLock tests"""

  def setUp(self):
    """Create the lock under test and check its string representation"""
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLock")

    self.assertTrue(repr(self.sl).startswith("<"))
    self.assertTrue("name=TestSharedLock" in repr(self.sl))

  def testSequenceAndOwnership(self):
    # Ownership must be reported correctly after every acquire and release
    self.assertFalse(self.sl.is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl.is_owned())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.assertFalse(self.sl.is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl.is_owned())
    self.sl.acquire()
    self.assertTrue(self.sl.is_owned())
    self.assertFalse(self.sl.is_owned(shared=1))
    self.assertTrue(self.sl.is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl.is_owned())
    self.sl.acquire(shared=1)
    self.assertTrue(self.sl.is_owned())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.assertFalse(self.sl.is_owned(shared=0))
    self.sl.release()
    self.assertFalse(self.sl.is_owned())

  def testBooleanValue(self):
    # semaphores are supposed to return a true value on a successful acquire
    self.assertTrue(self.sl.acquire(shared=1))
    self.sl.release()
    self.assertTrue(self.sl.acquire())
    self.sl.release()

  def testDoubleLockingStoE(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire)

  def testDoubleLockingEtoS(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingStoS(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)

  def testDoubleLockingEtoE(self):
    self.sl.acquire()
    self.assertRaises(AssertionError, self.sl.acquire)

  # helper functions: called in a separate thread they acquire the lock, send
  # their identifier on the done queue, then release it. If the lock raises
  # errors.LockError, "ERR" is sent instead.
  def _doItSharer(self):
    """Acquire the lock in shared mode and report "SHR" """
    try:
      self.sl.acquire(shared=1)
      self.done.put("SHR")
      self.sl.release()
    except errors.LockError:
      self.done.put("ERR")

  def _doItExclusive(self):
    """Acquire the lock in exclusive mode and report "EXC" """
    try:
      self.sl.acquire()
      self.done.put("EXC")
      self.sl.release()
    except errors.LockError:
      self.done.put("ERR")

  def _doItDelete(self):
    """Delete the lock and report "DEL" """
    try:
      self.sl.delete()
      self.done.put("DEL")
    except errors.LockError:
      self.done.put("ERR")

  def testSharersCanCoexist(self):
    self.sl.acquire(shared=1)
    # The thread is tracked so it can be joined before the test ends
    self._addThread(target=self._doItSharer)
    # Only a successful shared acquire counts, "ERR" must not pass
    self.assertEqual(self.done.get(True, 1), "SHR")
    self.sl.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    self.sl.acquire()
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")

  @_Repeat
  def testExclusiveBlocksDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DEL")
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveBlocksSharer(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "SHR")

  @_Repeat
  def testSharerBlocksExclusive(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItExclusive)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")

  @_Repeat
  def testSharerBlocksDelete(self):
    self.sl.acquire(shared=1)
    self._addThread(target=self._doItDelete)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DEL")
    # The lock was deleted, create a new one for the next iteration
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testWaitingExclusiveBlocksSharer(self):
    """SKIPPED testWaitingExclusiveBlockSharer"""
    # NOTE(review): the test is disabled by this early return; the reason is
    # not documented here. The code below is kept for reference only.
    return

    self.sl.acquire(shared=1)
    # the lock is acquired in shared mode...
    self._addThread(target=self._doItExclusive)
    # ...but now an exclusive is waiting...
    self._addThread(target=self._doItSharer)
    # ...so the sharer should be blocked as well
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The exclusive passed before
    self.assertEqual(self.done.get_nowait(), "EXC")
    self.assertEqual(self.done.get_nowait(), "SHR")

  @_Repeat
  def testWaitingSharerBlocksExclusive(self):
    """SKIPPED testWaitingSharerBlocksExclusive"""
    # NOTE(review): the test is disabled by this early return; the reason is
    # not documented here. The code below is kept for reference only.
    return

    self.sl.acquire()
    # the lock is acquired in exclusive mode...
    self._addThread(target=self._doItSharer)
    # ...but now a sharer is waiting...
    self._addThread(target=self._doItExclusive)
    # ...the exclusive is waiting too...
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.sl.release()
    self._waitThreads()
    # The sharer passed before
    self.assertEqual(self.done.get_nowait(), "SHR")
    self.assertEqual(self.done.get_nowait(), "EXC")

  def testDelete(self):
    self.sl.delete()
    self.assertRaises(errors.LockError, self.sl.acquire)
    self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
    self.assertRaises(errors.LockError, self.sl.delete)

  def testDeleteTimeout(self):
    self.assertTrue(self.sl.delete(timeout=60))

  def testDeleteTimeoutFail(self):
    ready = threading.Event()
    finish = threading.Event()

    def fn():
      self.sl.acquire(shared=0)
      ready.set()

      finish.wait()
      self.sl.release()

    self._addThread(target=fn)
    ready.wait()

    # Test if deleting a lock owned in exclusive mode by another thread fails
    # to delete when a timeout is used
    self.assertFalse(self.sl.delete(timeout=0.02))

    finish.set()
    self._waitThreads()

    self.assertTrue(self.sl.delete())
    self.assertRaises(errors.LockError, self.sl.acquire)

  def testNoDeleteIfSharer(self):
    self.sl.acquire(shared=1)
    self.assertRaises(AssertionError, self.sl.delete)

  @_Repeat
  def testDeletePendingSharersExclusiveDelete(self):
    self.sl.acquire()
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItDelete)
    self.sl.delete()
    self._waitThreads()
    # The threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), "ERR")
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testDeletePendingDeleteExclusiveSharers(self):
    self.sl.acquire()
    self._addThread(target=self._doItDelete)
    self._addThread(target=self._doItExclusive)
    self._addThread(target=self._doItSharer)
    self._addThread(target=self._doItSharer)
    self.sl.delete()
    self._waitThreads()
    # All four threads who were pending return ERR
    for _ in range(4):
      self.assertEqual(self.done.get_nowait(), "ERR")
    self.sl = locking.SharedLock(self.sl.name)

  @_Repeat
  def testExclusiveAcquireTimeout(self):
    for shared in [0, 1]:
      on_queue = threading.Event()
      release_exclusive = threading.Event()

      def _LockExclusive():
        self.sl.acquire(shared=0, test_notify=on_queue.set)
        self.done.put("A: start wait")
        release_exclusive.wait()
        self.done.put("A: end wait")
        self.sl.release()

      # Start thread to hold lock in exclusive mode
      self._addThread(target=_LockExclusive)

      # Wait for wait to begin
      self.assertEqual(self.done.get(timeout=60), "A: start wait")

      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
      # on the queue
      self.assertTrue(self.sl.acquire(shared=shared, timeout=60,
                                      test_notify=release_exclusive.set))

      self.done.put("got 2nd")
      self.sl.release()

      self._waitThreads()

      self.assertEqual(self.done.get_nowait(), "A: end wait")
      self.assertEqual(self.done.get_nowait(), "got 2nd")
      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testAcquireExpiringTimeout(self):
    def _AcquireWithTimeout(shared, timeout):
      if not self.sl.acquire(shared=shared, timeout=timeout):
        self.done.put("timeout")

    for shared in [0, 1]:
      # Lock exclusively
      self.sl.acquire()

      # Start acquires (exclusive in the first round, shared in the second)
      # with timeouts between 0 and 20 ms
      for i in range(11):
        self._addThread(target=_AcquireWithTimeout,
                        args=(shared, i * 2.0 / 1000.0))

      # Wait for threads to finish (makes sure the acquire timeout expires
      # before releasing the lock)
      self._waitThreads()

      # Release lock
      self.sl.release()

      for _ in range(11):
        self.assertEqual(self.done.get_nowait(), "timeout")

      self.assertRaises(Queue.Empty, self.done.get_nowait)

  @_Repeat
  def testSharedSkipExclusiveAcquires(self):
    # Tests whether shared acquires jump in front of exclusive acquires in the
    # queue.

    def _Acquire(shared, name, notify_ev, wait_ev):
      if notify_ev:
        notify_fn = notify_ev.set
      else:
        notify_fn = None

      if wait_ev:
        wait_ev.wait()

      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
        return

      self.done.put(name)
      self.sl.release()

    # Get exclusive lock while we fill the queue
    self.sl.acquire()

    # Number of acquires still expected for each group of shared acquires
    shrcnt = {
      "shared 1": 5,
      "shared 2": 7,
      "shared 3": 9,
      "shared 4": 2,
      }

    # Add acquires using threading.Event for synchronization. They'll be
    # acquired exactly in the order defined in this list.
    acquires = (shrcnt["shared 1"] * [(1, "shared 1")] +
                3 * [(0, "exclusive 1")] +
                shrcnt["shared 2"] * [(1, "shared 2")] +
                shrcnt["shared 3"] * [(1, "shared 3")] +
                shrcnt["shared 4"] * [(1, "shared 4")] +
                3 * [(0, "exclusive 2")])

    ev_cur = None
    ev_prev = None

    for args in acquires:
      ev_cur = threading.Event()
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
      ev_prev = ev_cur

    # Wait for last acquire to start
    ev_prev.wait()

    # Expect 6 pending exclusive acquires and 1 for all shared acquires
    # together
    self.assertEqual(self.sl._count_pending(), 7)

    # Release exclusive lock and wait
    self.sl.release()

    self._waitThreads()

    # Check sequence
    for _ in range(sum(shrcnt.values())):
      # Shared locks aren't guaranteed to be notified in order, but they'll be
      # first
      tmp = self.done.get_nowait()
      if tmp in shrcnt:
        shrcnt[tmp] -= 1

    # Every group, including the last one, must have been seen completely
    for name in sorted(shrcnt.keys()):
      self.assertEqual(shrcnt[name], 0,
                       "Wrong number of acquires for '%s'" % name)

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 1")

    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "exclusive 2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testIllegalDowngrade(self):
    # Not yet acquired
    self.assertRaises(AssertionError, self.sl.downgrade)

    # Acquire in shared mode, downgrade should be no-op
    self.assertTrue(self.sl.acquire(shared=1))
    self.assertTrue(self.sl.is_owned(shared=1))
    self.assertTrue(self.sl.downgrade())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.sl.release()

  def testDowngrade(self):
    self.assertTrue(self.sl.acquire())
    self.assertTrue(self.sl.is_owned(shared=0))
    self.assertTrue(self.sl.downgrade())
    self.assertTrue(self.sl.is_owned(shared=1))
    self.sl.release()

  @_Repeat
  def testDowngradeJumpsAheadOfExclusive(self):
    def _KeepExclusive(ev_got, ev_downgrade, ev_release):
      self.assertTrue(self.sl.acquire())
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_got.set()
      ev_downgrade.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.assertTrue(self.sl.downgrade())
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    def _KeepExclusive2(ev_started, ev_release):
      self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=0))
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=0))
      self.sl.release()

    def _KeepShared(ev_started, ev_got, ev_release):
      self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
      self.assertTrue(self.sl.is_owned(shared=1))
      ev_got.set()
      ev_release.wait()
      self.assertTrue(self.sl.is_owned(shared=1))
      self.sl.release()

    # Acquire lock in exclusive mode
    ev_got_excl1 = threading.Event()
    ev_downgrade_excl1 = threading.Event()
    ev_release_excl1 = threading.Event()
    th_excl1 = self._addThread(target=_KeepExclusive,
                               args=(ev_got_excl1, ev_downgrade_excl1,
                                     ev_release_excl1))
    ev_got_excl1.wait()

    # Start a second exclusive acquire
    ev_started_excl2 = threading.Event()
    ev_release_excl2 = threading.Event()
    th_excl2 = self._addThread(target=_KeepExclusive2,
                               args=(ev_started_excl2, ev_release_excl2))
    ev_started_excl2.wait()

    # Start shared acquires, will jump ahead of second exclusive acquire when
    # first exclusive acquire downgrades
    ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
    ev_release_shared = threading.Event()

    th_shared = [self._addThread(target=_KeepShared,
                                 args=(ev_started, ev_got, ev_release_shared))
                 for (ev_started, ev_got) in ev_shared]

    # Wait for all shared acquires to start
    for (ev, _) in ev_shared:
      ev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
    [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [("exclusive", [th_excl2.getName()]),
                      ("shared", sorted(th.getName() for th in th_shared))])

    # Shared acquires won't start until the exclusive lock is downgraded
    ev_downgrade_excl1.set()

    # Wait for all shared acquires to be successful
    for (_, ev) in ev_shared:
      ev.wait()

    # Check lock information again
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
                                              query.LQ_PENDING])),
                     [(self.sl.name, "shared", None,
                       [("exclusive", [th_excl2.getName()])])])
    [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
    self.assertEqual(set(owner), set([th_excl1.getName()] +
                                     [th.getName() for th in th_shared]))

    ev_release_excl1.set()
    ev_release_excl2.set()
    ev_release_shared.set()

    self._waitThreads()

    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
                                              query.LQ_PENDING])),
                     [(self.sl.name, None, None, [])])

  @_Repeat
  def testMixedAcquireTimeout(self):
    sync = threading.Event()

    def _AcquireShared(ev):
      if not self.sl.acquire(shared=1, timeout=None):
        return

      self.done.put("shared")

      # Notify main thread
      ev.set()

      # Wait for notification from main thread
      sync.wait()

      # Release lock
      self.sl.release()

    acquires = []
    for _ in range(3):
      ev = threading.Event()
      self._addThread(target=_AcquireShared, args=(ev, ))
      acquires.append(ev)

    # Wait for all acquires to finish
    for i in acquires:
      i.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Acquire exclusive without timeout
    exclsync = threading.Event()
    exclev = threading.Event()

    def _AcquireExclusive():
      if not self.sl.acquire(shared=0):
        return

      self.done.put("exclusive")

      # Notify main thread
      exclev.set()

      # Wait for notification from main thread
      exclsync.wait()

      self.sl.release()

    self._addThread(target=_AcquireExclusive)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    # Make all shared holders release their locks
    sync.set()

    # Wait for exclusive acquire to succeed
    exclev.wait()

    self.assertEqual(self.sl._count_pending(), 0)

    # Try to get exclusive lock
    self.assertFalse(self.sl.acquire(shared=0, timeout=0.02))

    def _AcquireSharedSimple():
      if self.sl.acquire(shared=1, timeout=None):
        self.done.put("shared2")
        self.sl.release()

    for _ in range(10):
      self._addThread(target=_AcquireSharedSimple)

    # Tell exclusive lock to release
    exclsync.set()

    # Wait for everything to finish
    self._waitThreads()

    self.assertEqual(self.sl._count_pending(), 0)

    # Check sequence
    for _ in range(3):
      self.assertEqual(self.done.get_nowait(), "shared")

    self.assertEqual(self.done.get_nowait(), "exclusive")

    for _ in range(10):
      self.assertEqual(self.done.get_nowait(), "shared2")

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def testPriority(self):
    # Acquire in exclusive mode
    self.assertTrue(self.sl.acquire(shared=0))

    # Queue acquires
    def _Acquire(prev_ev, next_ev, shared, priority, result):
      # Start only once the previous acquire has been queued, and tell the
      # next one as soon as this acquire is queued
      prev_ev.wait()
      self.sl.acquire(shared=shared, priority=priority,
                      test_notify=next_ev.set)
      try:
        self.done.put(result)
      finally:
        self.sl.release()

    counter = itertools.count(0)
    priorities = range(-20, 30)
    first = threading.Event()
    prev = first

    # Data structure:
    # {
    #   priority:
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
    #      (shared/exclusive, ...),
    #      ...,
    #     ],
    # }
    perprio = {}

    # References shared acquire per priority in L{perprio}. Data structure:
    # {
    #   priority: (shared=1, set(acquire names), set(pending threads)),
    # }
    prioshared = {}

    for seed in [4979, 9523, 14902, 32440]:
      # Use a deterministic random generator
      rnd = random.Random(seed)
      for priority in [rnd.choice(priorities) for _ in range(30)]:
        modes = [0, 1]
        rnd.shuffle(modes)
        for shared in modes:
          # Unique name
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)

          ev = threading.Event()
          thread = self._addThread(target=_Acquire,
                                   args=(prev, ev, shared, priority, acqname))
          prev = ev

          # Record expected acquire, see above for structure
          data = (shared, set([acqname]), set([thread]))
          priolist = perprio.setdefault(priority, [])
          if shared:
            priosh = prioshared.get(priority, None)
            if priosh:
              # Shared acquires are merged
              for i, j in zip(priosh[1:], data[1:]):
                i.update(j)
              assert data[0] == priosh[0]
            else:
              prioshared[priority] = data
              priolist.append(data)
          else:
            priolist.append(data)

    # Start all acquires and wait for them
    first.set()
    prev.wait()

    # Check lock information
    self.assertEqual(self.sl.GetLockInfo(set()),
                     [(self.sl.name, None, None, None)])
    self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
                     [(self.sl.name, "exclusive",
                       [threading.currentThread().getName()], None)])

    self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
                            perprio)

    # Let threads acquire the lock
    self.sl.release()

    # Wait for everything to finish
    self._waitThreads()

    self.assertTrue(self.sl._check_empty())

    # Check acquires by priority
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
      for (_, names, _) in acquires:
        # For shared acquires, the set will contain 1..n entries. For exclusive
        # acquires only one.
        while names:
          names.remove(self.done.get_nowait())
      self.assertFalse(compat.any(names for (_, names, _) in acquires))

    self.assertRaises(Queue.Empty, self.done.get_nowait)

  def _VerifyPrioPending(self, lockinfo, perprio):
    """Compare the pending acquires reported by the lock with expectations.

    @param lockinfo: lock information as passed by L{testPriority}; must
        contain exactly one entry of the form
        C{(name, mode, owner, pending)}
    @param perprio: expected acquires per priority, see L{testPriority} for
        the structure

    """
    # Unpacked here rather than in the signature, tuple parameters are not
    # accepted by newer Python versions
    ((name, mode, owner, pending), ) = lockinfo

    self.assertEqual(name, self.sl.name)
    self.assertTrue(mode is None)
    self.assertTrue(owner is None)

    self.assertEqual([(pendmode, sorted(waiting))
                      for (pendmode, waiting) in pending],
                     [(["exclusive", "shared"][int(bool(shared))],
                       sorted(t.getName() for t in threads))
                      for acquires in [perprio[i]
                                       for i in sorted(perprio.keys())]
                      for (shared, _, threads) in acquires])

  class _FakeTimeForSpuriousNotifications:
    """Fake clock advancing randomly, but deterministically, when queried"""

    def __init__(self, now, check_end):
      """Initialize the fake clock.

      @param now: initial time
      @param check_end: function called with the current time on every call
          to L{time}

      """
      self.now = now
      self.check_end = check_end

      # Deterministic random number generator
      self.rnd = random.Random(15086)

    def time(self):
      """Return the fake time after possibly advancing it"""
      # Advance time if the random number generator thinks so (this is to test
      # multiple notifications without advancing the time)
      if self.rnd.random() < 0.3:
        self.now += self.rnd.random()

      self.check_end(self.now)

      return self.now

  @_Repeat
  def testAcquireTimeoutWithSpuriousNotifications(self):
    ready = threading.Event()
    locked = threading.Event()
    req = Queue.Queue(0)

    epoch = 4000.0
    timeout = 60.0

    def check_end(now):
      self.assertFalse(locked.isSet())

      # If we waited long enough (in virtual time), tell main thread to release
      # lock, otherwise tell it to notify once more
      req.put(now < (epoch + (timeout * 0.8)))

    time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time

    sl = locking.SharedLock("test", _time_fn=time_fn)

    # Acquire in exclusive mode
    sl.acquire(shared=0)

    def fn():
      self.assertTrue(sl.acquire(shared=0, timeout=timeout,
                                 test_notify=ready.set))
      locked.set()
      sl.release()
      self.done.put("success")

    # Start acquire with timeout and wait for it to be ready
    self._addThread(target=fn)
    ready.wait()

    # The separate thread is now waiting to acquire the lock, so start sending
    # spurious notifications.

    # Wait for separate thread to ask for another notification
    count = 0
    while req.get():
      # After sending the notification, the lock will take a short amount of
      # time to notice and to retrieve the current time
      sl._notify_topmost()
      count += 1

    self.assertTrue(count > 100, "Not enough notifications were sent")

    self.assertFalse(locked.isSet())

    # Some notifications have been sent, now actually release the lock
    sl.release()

    # Wait for lock to be acquired
    locked.wait()

    self._waitThreads()

    self.assertEqual(self.done.get_nowait(), "success")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestSharedLockInCondition(_ThreadedTestCase):
  """SharedLock as a condition lock tests"""

  def setUp(self):
    """Create a shared lock and the condition using it"""
    _ThreadedTestCase.setUp(self)
    self.sl = locking.SharedLock("TestSharedLockInCondition")
    self.setCondition()

  def setCondition(self):
    """Create the condition under test; replaced by subclasses"""
    self.cond = threading.Condition(self.sl)

  def testKeepMode(self):
    # The lock must be owned in the same mode before and after waiting, first
    # for a shared and then for an exclusive acquire
    for shared in (1, 0):
      self.cond.acquire(shared=shared)
      self.assert_(self.sl.is_owned(shared=shared))
      self.cond.wait(0)
      self.assert_(self.sl.is_owned(shared=shared))
      self.cond.release()
class TestSharedLockInPipeCondition(TestSharedLockInCondition):
  """SharedLock as a pipe condition lock tests"""

  def setCondition(self):
    """Use L{locking.PipeCondition} on top of the shared lock"""
    shared_lock = self.sl
    self.cond = locking.PipeCondition(shared_lock)
class TestSSynchronizedDecorator(_ThreadedTestCase):
  """Shared Lock Synchronized decorator test"""

  # The tests below acquire the module-level lock directly. It is always
  # released in a "finally" clause as a lock left behind by a failing test
  # would still be held when the following tests run.

  @locking.ssynchronized(_decoratorlock)
  def _doItExclusive(self):
    """Report "EXC" while checking that the decorator lock is owned"""
    self.assertTrue(_decoratorlock.is_owned())
    self.done.put("EXC")

  @locking.ssynchronized(_decoratorlock, shared=1)
  def _doItSharer(self):
    """Report "SHR" while checking that the lock is owned in shared mode"""
    self.assertTrue(_decoratorlock.is_owned(shared=1))
    self.done.put("SHR")

  def testDecoratedFunctions(self):
    self._doItExclusive()
    self.assertFalse(_decoratorlock.is_owned())
    self._doItSharer()
    self.assertFalse(_decoratorlock.is_owned())

  def testSharersCanCoexist(self):
    _decoratorlock.acquire(shared=1)
    try:
      # The thread is tracked so it can be joined before the test ends
      self._addThread(target=self._doItSharer)
      self.assertEqual(self.done.get(True, 1), "SHR")
    finally:
      _decoratorlock.release()
    self._waitThreads()

  @_Repeat
  def testExclusiveBlocksExclusive(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItExclusive)
      # Nothing may have been reported while the lock is held here
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")

  @_Repeat
  def testExclusiveBlocksSharer(self):
    _decoratorlock.acquire()
    try:
      self._addThread(target=self._doItSharer)
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "SHR")

  @_Repeat
  def testSharerBlocksExclusive(self):
    _decoratorlock.acquire(shared=1)
    try:
      self._addThread(target=self._doItExclusive)
      self.assertRaises(Queue.Empty, self.done.get_nowait)
    finally:
      _decoratorlock.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "EXC")
class TestLockSet(_ThreadedTestCase):
"""LockSet tests"""
def setUp(self):
  """Prepare the thread helpers and a fresh lock set"""
  _ThreadedTestCase.setUp(self)
  self._setUpLS()
def _setUpLS(self):
  """(Re)create the lock set with the default resource names"""
  names = ["one", "two", "three"]
  self.resources = names
  self.ls = locking.LockSet(names, "TestLockSet")
def testResources(self):
  # The lock names must match the resources given at construction time, also
  # for a lock set created without any resource
  self.assertEquals(self.ls._names(), set(self.resources))

  empty = locking.LockSet([], "TestLockSet.testResources")
  self.assertEquals(empty._names(), set())
def testCheckOwnedUnknown(self):
  # Without holding anything, an unknown name is reported as not owned for
  # any value of "shared"
  unknown = "certainly-not-owning-this-one"

  self.assertFalse(self.ls.check_owned(unknown))
  for mode in [-1, 0, 1, 6378, 24255]:
    self.assertFalse(self.ls.check_owned(unknown, shared=mode))
def testCheckOwnedUnknownWhileHolding(self):
  # While a lock is held, asking for names which don't exist must raise
  # errors.LockError
  ls = self.ls

  self.assertFalse(ls.check_owned([]))

  ls.acquire("one", shared=1)
  self.assertRaises(errors.LockError, ls.check_owned, "nonexist")
  self.assertTrue(ls.check_owned("one", shared=1))
  self.assertFalse(ls.check_owned("one", shared=0))
  self.assertFalse(ls.check_owned(["one", "two"]))
  self.assertRaises(errors.LockError, ls.check_owned, ["one", "nonexist"])
  self.assertRaises(errors.LockError, ls.check_owned, "")
  ls.release()

  for names in ([], "one"):
    self.assertFalse(ls.check_owned(names))
def testAcquireRelease(self):
  # Acquires and releases single names and lists of names, checking the
  # reported ownership after every step
  ls = self.ls

  # Single name
  self.assertFalse(ls.check_owned(ls._names()))
  self.assert_(ls.acquire("one"))
  self.assertEquals(ls.list_owned(), set(["one"]))
  self.assertTrue(ls.check_owned("one"))
  self.assertTrue(ls.check_owned("one", shared=0))
  self.assertFalse(ls.check_owned("one", shared=1))
  ls.release()
  self.assertEquals(ls.list_owned(), set())
  self.assertFalse(ls.check_owned(ls._names()))

  # List with a single name
  self.assertEquals(ls.acquire(["one"]), set(["one"]))
  self.assertEquals(ls.list_owned(), set(["one"]))
  ls.release()
  self.assertEquals(ls.list_owned(), set())

  # All names, released step by step
  ls.acquire(["one", "two", "three"])
  self.assertEquals(ls.list_owned(), set(["one", "two", "three"]))
  self.assertTrue(ls.check_owned(ls._names()))
  self.assertTrue(ls.check_owned(ls._names(), shared=0))
  self.assertFalse(ls.check_owned(ls._names(), shared=1))
  ls.release("one")
  self.assertFalse(ls.check_owned(["one"]))
  self.assertTrue(ls.check_owned(["two", "three"]))
  self.assertTrue(ls.check_owned(["two", "three"], shared=0))
  self.assertFalse(ls.check_owned(["two", "three"], shared=1))
  self.assertEquals(ls.list_owned(), set(["two", "three"]))
  ls.release(["three"])
  self.assertEquals(ls.list_owned(), set(["two"]))
  ls.release()
  self.assertEquals(ls.list_owned(), set())

  # Subset of the names
  self.assertEquals(ls.acquire(["one", "three"]), set(["one", "three"]))
  self.assertEquals(ls.list_owned(), set(["one", "three"]))
  ls.release()
  self.assertEquals(ls.list_owned(), set())

  for name in ls._names():
    self.assertFalse(ls.check_owned(name))
def testNoDoubleAcquire(self):
  """Check that acquire() asserts while locks of the set are still owned."""
  lockset = self.ls

  lockset.acquire("one")
  # Neither the owned lock nor any other lock may be acquired now
  for wanted in ("one", ["two"], ["two", "three"]):
    self.assertRaises(AssertionError, lockset.acquire, wanted)
  lockset.release()

  lockset.acquire(["one", "three"])
  lockset.release("one")
  # "three" is still owned, hence acquiring is still refused
  self.assertRaises(AssertionError, lockset.acquire, ["two"])
  lockset.release("three")
def testNoWrongRelease(self):
  """Check that release() asserts for locks which are not owned."""
  lockset = self.ls

  # Nothing has been acquired yet
  self.assertRaises(AssertionError, lockset.release)

  lockset.acquire("one")
  # Only "one" is owned, "two" is not
  self.assertRaises(AssertionError, lockset.release, "two")
def testAddRemove(self):
  """Add and remove locks, with and without holding the whole set."""
  self.ls.add("four")
  self.assertEqual(self.ls.list_owned(), set())
  self.assertTrue("four" in self.ls._names())

  # Locks added with acquired=1 are owned right away
  self.ls.add(["five", "six", "seven"], acquired=1)
  self.assertTrue("five" in self.ls._names())
  self.assertTrue("six" in self.ls._names())
  self.assertTrue("seven" in self.ls._names())
  self.assertEqual(self.ls.list_owned(), set(["five", "six", "seven"]))
  self.assertEqual(self.ls.remove(["five", "six"]), ["five", "six"])
  self.assertFalse("five" in self.ls._names())
  self.assertFalse("six" in self.ls._names())
  self.assertEqual(self.ls.list_owned(), set(["seven"]))

  # While still owning "seven", adding an acquired lock is refused
  self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
  self.ls.remove("seven")
  self.assertFalse("seven" in self.ls._names())
  self.assertEqual(self.ls.list_owned(), set([]))

  # Holding the whole set in shared mode doesn't allow adding locks
  self.ls.acquire(None, shared=1)
  self.assertRaises(AssertionError, self.ls.add, "eight")
  self.ls.release()

  # Holding the whole set exclusively does
  self.ls.acquire(None)
  self.ls.add("eight", acquired=1)
  self.assertTrue("eight" in self.ls._names())
  self.assertTrue("eight" in self.ls.list_owned())
  self.ls.add("nine")
  self.assertTrue("nine" in self.ls._names())
  self.assertFalse("nine" in self.ls.list_owned())
  self.ls.release()

  self.ls.remove(["two"])
  self.assertFalse("two" in self.ls._names())
  self.ls.acquire("three")
  self.assertEqual(self.ls.remove(["three"]), ["three"])
  self.assertFalse("three" in self.ls._names())

  # Names which don't exist (anymore) are left out of the result
  self.assertEqual(self.ls.remove("three"), [])
  self.assertEqual(self.ls.remove(["one", "three", "six"]), ["one"])
  self.assertFalse("one" in self.ls._names())
def testRemoveNonBlocking(self):
  """Remove locks which are owned by the calling thread."""
  self.ls.acquire("one")
  self.assertEqual(self.ls.remove("one"), ["one"])

  self.ls.acquire(["two", "three"])
  self.assertEqual(self.ls.remove(["two", "three"]),
                   ["two", "three"])
def testNoDoubleAdd(self):
  """Check that adding an already existing lock name raises LockError."""
  add_fn = self.ls.add

  self.assertRaises(errors.LockError, add_fn, "two")

  # A freshly added name can't be added a second time either
  add_fn("four")
  self.assertRaises(errors.LockError, add_fn, "four")
def testNoWrongRemoves(self):
  """Check that remove() asserts for locks which may not be removed."""
  self.ls.acquire(["one", "three"], shared=1)

  # "two" cannot be removed while holding something which is not a superset,
  # "three" cannot be removed as we are only sharing it
  for name in ("two", "three"):
    self.assertRaises(AssertionError, self.ls.remove, name)
def testAcquireSetLock(self):
  """Acquire the whole set exclusively and shared, then try add/remove."""
  # acquire the set-lock exclusively
  self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
  self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
  self.assertEqual(self.ls.is_owned(), True)
  self.assertEqual(self.ls._names(), set(["one", "two", "three"]))

  # I can still add/remove elements...
  self.assertEqual(self.ls.remove(["two", "three"]), ["two", "three"])
  self.assertTrue(self.ls.add("six"))
  self.ls.release()

  # share the set-lock
  self.assertEqual(self.ls.acquire(None, shared=1), set(["one", "six"]))
  # adding new elements is not possible
  self.assertRaises(AssertionError, self.ls.add, "five")
  self.ls.release()
def testAcquireWithRepetitions(self):
  """Acquire and release with a name listed more than once."""
  self.assertEqual(self.ls.acquire(["two", "two", "three"], shared=1),
                   set(["two", "two", "three"]))
  self.ls.release(["two", "two"])
  self.assertEqual(self.ls.list_owned(), set(["three"]))
def testEmptyAcquire(self):
  """Acquire an empty list of locks and check nothing ends up owned."""
  # Acquire an empty list of locks...
  self.assertEqual(self.ls.acquire([]), set())
  self.assertEqual(self.ls.list_owned(), set())

  # New locks can still be added
  self.assertTrue(self.ls.add("six"))

  # "re-acquiring" is not an issue, since we had really acquired nothing
  self.assertEqual(self.ls.acquire([], shared=1), set())
  self.assertEqual(self.ls.list_owned(), set())

  # We haven't really acquired anything, so we cannot release
  self.assertRaises(AssertionError, self.ls.release)
def _doLockSet(self, names, shared):
  """Thread helper acquiring and immediately releasing locks.

  Puts "DONE" into the result queue once the locks were acquired, or "ERR"
  if a LockError was raised.

  """
  report = self.done.put
  try:
    self.ls.acquire(names, shared=shared)
    report("DONE")
    self.ls.release()
  except errors.LockError:
    report("ERR")
def _doAddSet(self, names):
  """Thread helper adding locks in acquired state and releasing them.

  Puts "DONE" into the result queue once the locks were added, or "ERR" if
  a LockError was raised.

  """
  report = self.done.put
  try:
    self.ls.add(names, acquired=1)
    report("DONE")
    self.ls.release()
  except errors.LockError:
    report("ERR")
def _doRemoveSet(self, names):
  """Thread helper removing locks; queues the return value of remove()."""
  removed = self.ls.remove(names)
  self.done.put(removed)
@_Repeat
def testConcurrentSharedAcquire(self):
  """Check other threads' acquires while locks are held in shared mode."""
  self.ls.acquire(["one", "two"], shared=1)

  # Shared acquires go through, one thread at a time
  for wanted in (["one", "two"], ["one", "two", "three"], "three"):
    self._addThread(target=self._doLockSet, args=(wanted, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

  # Exclusive acquires touching the held locks must not have finished
  for wanted in (["one", "two"], ["two", "three"]):
    self._addThread(target=self._doLockSet, args=(wanted, 0))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  # Once the locks are released both threads can finish
  self.ls.release()
  self._waitThreads()
  for _ in range(2):
    self.assertEqual(self.done.get_nowait(), "DONE")
@_Repeat
def testConcurrentExclusiveAcquire(self):
  """Check other threads' acquires while locks are held exclusively."""
  self.ls.acquire(["one", "two"])

  # "three" is not held, so it can be acquired in either mode
  self._addThread(target=self._doLockSet, args=("three", 1))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")
  self._addThread(target=self._doLockSet, args=("three", 0))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  # Anything involving "one" or "two" must not have finished yet
  self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
  self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
  self._addThread(target=self._doLockSet, args=("one", 0))
  self._addThread(target=self._doLockSet, args=("one", 1))
  self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
  self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  self.ls.release()
  self._waitThreads()
  for _ in range(6):
    self.assertEqual(self.done.get_nowait(), "DONE")
@_Repeat
def testSimpleAcquireTimeoutExpiring(self):
  """Check that acquires with a timeout expire while a lock is blocked.

  For each (wanted, block) pair the main thread holds "block" exclusively
  while a helper thread tries to acquire "wanted" with a 0.1s timeout.

  """
  names = sorted(self.ls._names())
  self.assertTrue(len(names) >= 3)

  # Get name of first lock
  first = names[0]

  # Get name of last lock
  last = names.pop()

  checks = [
    # Block first and try to lock it again
    (first, first),

    # Block first and try to lock all locks
    (None, first),

    # Block last and try to lock it again
    (last, last),
    ]

  for (wanted, block) in checks:
    # Lock in exclusive mode
    self.assertTrue(self.ls.acquire(block, shared=0))

    def _AcquireOne():
      # Try to get the same lock again with a timeout (should never succeed)
      acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
      if acquired:
        self.done.put("acquired")
        self.ls.release()
      else:
        self.assertTrue(acquired is None)
        self.assertFalse(self.ls.list_owned())
        self.assertFalse(self.ls.is_owned())
        self.done.put("not acquired")

    self._addThread(target=_AcquireOne)

    # Wait for timeout in thread to expire
    self._waitThreads()

    # Release exclusive lock again
    self.ls.release()

    self.assertEqual(self.done.get_nowait(), "not acquired")
    self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testDelayedAndExpiringLockAcquire(self):
"""Acquire all locks from a helper thread while they are released one by one.

The scenario is run twice: first with a generous timeout (the helper thread
is expected to get all locks), then with a short timeout and delayed
releases (the helper thread is expected to report a timeout).

"""
# Start from a fresh lockset and extend it to eight locks
self._setUpLS()
self.ls.add(["five", "six", "seven", "eight", "nine"])
for expire in (False, True):
names = sorted(self.ls._names())
self.assertEqual(len(names), 8)
# One event per lock, set by the helper thread via acquire_notification
lock_ev = dict([(i, threading.Event()) for i in names])
# Lock all in exclusive mode
self.assert_(self.ls.acquire(names, shared=0))
if expire:
# We'll wait at least 300ms per lock
lockwait = len(names) * [0.3]
# Fail if we can't acquire all locks in 400ms. There are 8 locks, so
# this gives us up to 2.4s to fail.
lockall_timeout = 0.4
else:
# This should finish rather quickly
lockwait = None
lockall_timeout = len(names) * 5.0
def _LockAll():
def acquire_notification(name):
# The per-lock messages are only verified in the non-expiring run
if not expire:
self.done.put("getting %s" % name)
# Kick next lock
lock_ev[name].set()
if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
test_notify=acquire_notification):
self.done.put("got all")
self.ls.release()
else:
self.done.put("timeout on all")
# Notify all locks
for ev in lock_ev.values():
ev.set()
t = self._addThread(target=_LockAll)
for idx, name in enumerate(names):
# Wait for actual acquire on this lock to start
lock_ev[name].wait(10.0)
if expire and t.isAlive():
# Wait some time after getting the notification to make sure the lock
# acquire will expire
SafeSleep(lockwait[idx])
self.ls.release(names=name)
self.assertFalse(self.ls.list_owned())
self._waitThreads()
if expire:
# Not checking which locks were actually acquired. Doing so would be
# too timing-dependent.
self.assertEqual(self.done.get_nowait(), "timeout on all")
else:
for i in names:
self.assertEqual(self.done.get_nowait(), "getting %s" % i)
self.assertEqual(self.done.get_nowait(), "got all")
self.assertRaises(Queue.Empty, self.done.get_nowait)
@_Repeat
def testConcurrentRemove(self):
  """Check acquires and removes racing with removal of locks."""
  self.ls.add("four")
  self.ls.acquire(["one", "two", "four"])
  self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
  self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
  self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
  self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  # All waiting threads asked for "one"; removing it makes them fail
  self.ls.remove("one")
  self.ls.release()
  self._waitThreads()
  for _ in range(4):
    self.assertEqual(self.done.get_nowait(), "ERR")

  # None of the threads below asks for the removed lock "five"
  self.ls.add(["five", "six"], acquired=1)
  self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
  self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
  self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
  self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
  self.ls.remove("five")
  self.ls.release()
  self._waitThreads()
  for _ in range(4):
    self.assertEqual(self.done.get_nowait(), "DONE")

  # "four" is removed here while the helper thread's remove() is pending, so
  # the thread only reports "six"
  self.ls.acquire(["three", "four"])
  self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.ls.remove("four")
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), ["six"])

  # Thread arguments must be a tuple, hence the trailing comma
  self._addThread(target=self._doRemoveSet, args=(["two"], ))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), ["two"])
  self.ls.release()

  # reset lockset
  self._setUpLS()
@_Repeat
def testConcurrentSharedSetLock(self):
  """Check what other threads can do while the set-lock is shared."""
  # share the set-lock...
  self.ls.acquire(None, shared=1)

  # ...another thread can share it too
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")

  # ...or just share some elements
  self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")

  # ...but not add new ones or remove any
  # (thread arguments must be a tuple, hence the trailing comma)
  t = self._addThread(target=self._doAddSet, args=(["nine"], ))
  self._addThread(target=self._doRemoveSet, args=(["two"], ))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  # this just releases the set-lock
  self.ls.release([])
  t.join(60)
  self.assertEqual(self.done.get_nowait(), "DONE")

  # release the lock on the actual elements so remove() can proceed too
  self.ls.release()
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), ["two"])

  # reset lockset
  self._setUpLS()
@_Repeat
def testConcurrentExclusiveSetLock(self):
  """Check that other threads wait while the set-lock is held exclusively."""
  # acquire the set-lock...
  self.ls.acquire(None, shared=0)

  # ...no one can do anything else
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._addThread(target=self._doLockSet, args=(None, 0))
  self._addThread(target=self._doLockSet, args=(["three"], 0))
  self._addThread(target=self._doLockSet, args=(["two"], 1))
  # Thread arguments must be a tuple, hence the trailing comma
  self._addThread(target=self._doAddSet, args=(["nine"], ))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  self.ls.release()
  self._waitThreads()
  for _ in range(5):
    self.assertEqual(self.done.get(True, 1), "DONE")

  # cleanup
  self._setUpLS()
@_Repeat
def testConcurrentSetLockAdd(self):
  """Check add() while other threads are waiting for the whole set."""
  self.ls.acquire("one")

  # Another thread wants the whole SetLock
  self._addThread(target=self._doLockSet, args=(None, 0))
  self._addThread(target=self._doLockSet, args=(None, 1))
  self.assertRaises(Queue.Empty, self.done.get_nowait)

  # Owning only "one", adding a lock is refused
  self.assertRaises(AssertionError, self.ls.add, "four")
  self.ls.release()
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")
  self.assertEqual(self.done.get_nowait(), "DONE")

  # Owning the whole set, locks can be added even with waiters
  self.ls.acquire(None)
  self._addThread(target=self._doLockSet, args=(None, 0))
  self._addThread(target=self._doLockSet, args=(None, 1))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.ls.add("four")
  self.ls.add("five", acquired=1)
  self.ls.add("six", acquired=1, shared=1)
  self.assertEqual(self.ls.list_owned(),
                   set(["one", "two", "three", "five", "six"]))
  self.assertEqual(self.ls.is_owned(), True)
  self.assertEqual(self.ls._names(),
                   set(["one", "two", "three", "four", "five", "six"]))
  self.ls.release()
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")
  self.assertEqual(self.done.get_nowait(), "DONE")

  self._setUpLS()
@_Repeat
def testEmptyLockSet(self):
  """Check adds and acquires by other threads on an emptied lockset."""
  # get the set-lock
  self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))

  # now empty it...
  self.ls.remove(["one", "two", "three"])
  self.assertFalse(self.ls._names())

  # and adds/locks by another thread still wait
  # (thread arguments must be a tuple, hence the trailing comma)
  self._addThread(target=self._doAddSet, args=(["nine"], ))
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._addThread(target=self._doLockSet, args=(None, 0))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.ls.release()
  self._waitThreads()
  for _ in range(3):
    self.assertEqual(self.done.get_nowait(), "DONE")

  # empty it again...
  self.assertEqual(self.ls.remove(["nine"]), ["nine"])

  # now share it...
  self.assertEqual(self.ls.acquire(None, shared=1), set())

  # other sharers can go, adds still wait
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")
  self._addThread(target=self._doAddSet, args=(["nine"], ))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.ls.release()
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), "DONE")

  self._setUpLS()
def testAcquireWithNamesDowngrade(self):
  """Check ownership around a named acquire and downgrade after release."""
  self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
  self.assertTrue(self.ls.is_owned())
  # Acquiring by name doesn't leave the lockset-internal lock owned
  self.assertFalse(self.ls._get_lock().is_owned())
  self.ls.release()
  self.assertFalse(self.ls.is_owned())
  self.assertFalse(self.ls._get_lock().is_owned())

  # Can't downgrade after releasing
  self.assertRaises(AssertionError, self.ls.downgrade, "two")
def testDowngrade(self):
  """Downgrade locks one by one from exclusive to shared mode.

  The lockset-internal lock is expected to stay exclusive until the last
  exclusively held lock has been downgraded.

  """
  # Not owning anything, must raise an exception
  self.assertFalse(self.ls.is_owned())
  self.assertRaises(AssertionError, self.ls.downgrade)

  self.assertFalse(compat.any(i.is_owned()
                              for i in self.ls._get_lockdict().values()))
  self.assertFalse(self.ls.check_owned(self.ls._names()))
  for name in self.ls._names():
    self.assertFalse(self.ls.check_owned(name))

  self.assertEqual(self.ls.acquire(None, shared=0),
                   set(["one", "two", "three"]))
  self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")

  self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
  for name in self.ls._names():
    self.assertTrue(self.ls.check_owned(name))
    self.assertTrue(self.ls.check_owned(name, shared=0))
    self.assertFalse(self.ls.check_owned(name, shared=1))

  self.assertTrue(self.ls._get_lock().is_owned(shared=0))
  self.assertTrue(compat.all(i.is_owned(shared=0)
                             for i in self.ls._get_lockdict().values()))

  # Start downgrading locks
  self.assertTrue(self.ls.downgrade(names=["one"]))
  self.assertTrue(self.ls._get_lock().is_owned(shared=0))
  # Only "one" is expected to be shared (1), the others exclusive (0)
  self.assertTrue(compat.all(lock.is_owned(shared=int(name == "one"))
                             for (name, lock) in
                             self.ls._get_lockdict().items()))

  self.assertFalse(self.ls.check_owned("one", shared=0))
  self.assertTrue(self.ls.check_owned("one", shared=1))
  self.assertTrue(self.ls.check_owned("two", shared=0))
  self.assertTrue(self.ls.check_owned("three", shared=0))

  # Downgrade second lock
  self.assertTrue(self.ls.downgrade(names="two"))
  self.assertTrue(self.ls._get_lock().is_owned(shared=0))
  downgraded = ("one", "two")
  self.assertTrue(compat.all(lock.is_owned(shared=int(name in downgraded))
                             for (name, lock) in
                             self.ls._get_lockdict().items()))

  self.assertFalse(self.ls.check_owned("one", shared=0))
  self.assertTrue(self.ls.check_owned("one", shared=1))
  self.assertFalse(self.ls.check_owned("two", shared=0))
  self.assertTrue(self.ls.check_owned("two", shared=1))
  self.assertTrue(self.ls.check_owned("three", shared=0))

  # Downgrading the last exclusive lock to shared must downgrade the
  # lockset-internal lock too
  self.assertTrue(self.ls.downgrade(names="three"))
  self.assertTrue(self.ls._get_lock().is_owned(shared=1))
  self.assertTrue(compat.all(i.is_owned(shared=1)
                             for i in self.ls._get_lockdict().values()))

  # Verify owned locks
  for name in self.ls._names():
    self.assertTrue(self.ls.check_owned(name, shared=1))

  # Downgrading a shared lock must be a no-op
  self.assertTrue(self.ls.downgrade(names=["one", "three"]))
  self.assertTrue(self.ls._get_lock().is_owned(shared=1))
  self.assertTrue(compat.all(i.is_owned(shared=1)
                             for i in self.ls._get_lockdict().values()))

  self.ls.release()
def testDowngradeEverything(self):
  """Downgrade all locks at once after acquiring the whole set."""
  lockset = self.ls

  acquired = lockset.acquire(locking.ALL_SET, shared=0)
  self.assertEqual(acquired, set(["one", "two", "three"]))
  self.assertTrue(lockset.owning_all())

  # Ensure all locks are now owned in exclusive mode
  for lockname in lockset._names():
    self.assertTrue(lockset.check_owned(lockname, shared=0))

  # Downgrade everything
  self.assertTrue(lockset.downgrade())

  # Ensure all locks are now owned in shared mode
  for lockname in lockset._names():
    self.assertTrue(lockset.check_owned(lockname, shared=1))

  self.assertTrue(lockset.owning_all())
def testPriority(self):
  """Check that waiting acquires are served in priority order.

  Threads are queued for "one" and "two" in a shuffled but deterministic
  order, each with a priority between 1 and 32, while the main thread
  holds the whole set. Once released, the results must arrive sorted by
  priority for both locks.

  """
  def _Acquire(prev, next_ev, name, priority, success_fn):
    # Only start once the previous thread's acquire was notified
    prev.wait()
    self.assertTrue(self.ls.acquire(name, shared=0,
                                    priority=priority,
                                    test_notify=lambda _: next_ev.set()))
    try:
      success_fn()
    finally:
      self.ls.release()

  # Get all in exclusive mode
  self.assertTrue(self.ls.acquire(locking.ALL_SET, shared=0))

  done_two = Queue.Queue(0)

  first = threading.Event()
  prev = first

  acquires = [("one", prio, self.done) for prio in range(1, 33)]
  acquires.extend([("two", prio, done_two) for prio in range(1, 33)])

  # Use a deterministic random generator
  random.Random(741).shuffle(acquires)

  for (name, prio, done) in acquires:
    ev = threading.Event()
    self._addThread(target=_Acquire,
                    args=(prev, ev, name, prio,
                          compat.partial(done.put, "Prio%s" % prio)))
    prev = ev

  # Start acquires
  first.set()

  # Wait for last acquire to start
  prev.wait()

  # Let threads acquire locks
  self.ls.release()

  # Wait for threads to finish
  self._waitThreads()

  for i in range(1, 33):
    self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
    self.assertEqual(done_two.get_nowait(), "Prio%s" % i)

  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.assertRaises(Queue.Empty, done_two.get_nowait)
def testNamesWithOpportunisticAndTimeout(self):
  """Check that an opportunistic acquire of names rejects a timeout."""
  wanted = ["one", "two"]
  self.assertRaises(AssertionError, self.ls.acquire, wanted,
                    timeout=1.0, opportunistic=True)
def testOpportunisticWithUnknownName(self):
  """Check that opportunistic acquires skip names which don't exist."""
  missing = "unknown"
  self.assertFalse(missing in self.ls._names())

  # Only the unknown name: nothing is acquired
  acquired = self.ls.acquire(missing, opportunistic=True)
  self.assertFalse(acquired)
  self.assertFalse(self.ls.list_owned())

  # Mixed with an existing name: only the existing lock is acquired
  acquired = self.ls.acquire(["two", missing], opportunistic=True)
  self.assertEqual(acquired, set(["two"]))
  self.assertEqual(self.ls.list_owned(), set(["two"]))

  self.ls.release()
def testSimpleOpportunisticAcquisition(self):
  """Acquire opportunistically while another thread holds one lock."""
  self.assertEqual(self.ls._names(), set(["one", "two", "three"]))

  # Hold a lock in main thread
  self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))

  def fn():
    # The lock "two" is held by the main thread
    result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
    self.assertEqual(result, set(["one"]))
    self.assertEqual(self.ls.list_owned(), set(["one"]))
    self.assertFalse(self.ls._get_lock().is_owned())

    self.ls.release()
    self.assertFalse(self.ls.list_owned())

    # Try to acquire the lock held by the main thread
    result = self.ls.acquire(["two"], shared=0, opportunistic=True)
    self.assertFalse(self.ls._get_lock().is_owned())
    self.assertFalse(result)
    self.assertFalse(self.ls.list_owned())

    # Try to acquire all locks
    result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
    self.assertTrue(self.ls._get_lock().is_owned(),
                    msg="Internal lock is not owned")
    self.assertEqual(result, set(["one", "three"]))
    self.assertEqual(self.ls.list_owned(), set(["one", "three"]))

    self.ls.release()
    self.assertFalse(self.ls.list_owned())

    self.done.put(True)

  self._addThread(target=fn)

  # Wait for threads to finish
  self._waitThreads()

  self.assertEqual(self.ls.list_owned(), set(["two"]))

  self.ls.release()
  self.assertFalse(self.ls.list_owned())
  self.assertFalse(self.ls._get_lock().is_owned())

  self.assertTrue(self.done.get_nowait())
  self.assertRaises(Queue.Empty, self.done.get_nowait)
def testOpportunisticAcquisitionWithoutNamesExpires(self):
  """Acquire everything opportunistically while all locks are held."""
  self.assertEqual(self.ls._names(), set(["one", "two", "three"]))

  # Hold all locks in main thread
  self.ls.acquire(locking.ALL_SET, shared=0)
  self.assertTrue(self.ls._get_lock().is_owned())

  def fn():
    # Try to acquire all locks in separate thread
    result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
                             timeout=0.1)
    self.assertFalse(result)
    self.assertFalse(self.ls._get_lock().is_owned())
    self.assertFalse(self.ls.list_owned())

    # Try once more without a timeout
    self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))

    self.done.put(True)

  self._addThread(target=fn)

  # Wait for threads to finish
  self._waitThreads()

  self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))

  self.ls.release()
  self.assertFalse(self.ls.list_owned())
  self.assertFalse(self.ls._get_lock().is_owned(shared=0))

  self.assertTrue(self.done.get_nowait())
  self.assertRaises(Queue.Empty, self.done.get_nowait)
def testSharedOpportunisticAcquisitionWithoutNames(self):
  """Acquire everything opportunistically while all locks are shared."""
  self.assertEqual(self.ls._names(), set(["one", "two", "three"]))

  # Hold all locks in main thread
  self.ls.acquire(locking.ALL_SET, shared=1)
  self.assertTrue(self.ls._get_lock().is_owned(shared=1))

  def fn():
    # Try to acquire all locks in separate thread in shared mode
    result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
                             timeout=0.1)
    self.assertEqual(result, set(["one", "two", "three"]))
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
    self.ls.release()
    self.assertFalse(self.ls._get_lock().is_owned())

    # Try one in exclusive mode
    self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))

    self.done.put(True)

  self._addThread(target=fn)

  # Wait for threads to finish
  self._waitThreads()

  self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))

  self.ls.release()
  self.assertFalse(self.ls.list_owned())
  self.assertFalse(self.ls._get_lock().is_owned())

  self.assertTrue(self.done.get_nowait())
  self.assertRaises(Queue.Empty, self.done.get_nowait)
def testLockDeleteWithOpportunisticAcquisition(self):
  """Delete a lock behind the lockset's back while acquiring the whole set."""
  # This test exercises some code handling LockError on acquisition, that is
  # after all lock names have been gathered. This shouldn't happen in reality
  # as removing locks from the set requires the lockset-internal lock, but
  # the code should handle the situation anyway.
  ready = threading.Event()
  finished = threading.Event()

  self.assertEqual(self.ls._names(), set(["one", "two", "three"]))

  # Thread function to delete lock
  def fn():
    # Wait for notification
    ready.wait()

    # Delete lock named "two" by accessing lockset-internal data
    ld = self.ls._get_lockdict()
    self.assertTrue(ld["two"].delete())

    self.done.put("deleted.two")

    # Notify helper
    finished.set()

  self._addThread(target=fn)

  # Notification helper, called when lock already holds internal lock.
  # Therefore only one of the locks not yet locked can be deleted.
  def notify(name):
    self.done.put("notify.%s" % name)

    if name == "one":
      # Tell helper thread to delete lock "two"
      ready.set()
      finished.wait()

  # Hold all locks in main thread
  self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
  self.assertEqual(self.ls.list_owned(), set(["one", "three"]))

  # Wait for threads to finish
  self._waitThreads()

  # Release all locks
  self.ls.release()
  self.assertFalse(self.ls.list_owned())
  self.assertFalse(self.ls._get_lock().is_owned())

  self.assertEqual(self.done.get_nowait(), "notify.one")
  self.assertEqual(self.done.get_nowait(), "deleted.two")
  self.assertEqual(self.done.get_nowait(), "notify.three")
  self.assertEqual(self.done.get_nowait(), "notify.two")
  self.assertRaises(Queue.Empty, self.done.get_nowait)
class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
"""Tests for locking._GetLsAcquireModeAndTimeouts"""
def setUp(self):
# Shortcut to the function under test; the tests call it as
# fn(want_all, timeout, opportunistic) and unpack a 3-tuple
# (mode, ls_timeout_fn, timeout_fn) from the result
self.fn = locking._GetLsAcquireModeAndTimeouts
def testOpportunisticWithoutNames(self):
"""Check the result for want_all=False, timeout=None, opportunistic=True"""
(mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
self.assertTrue(ls_timeout_fn is None)
self.assertEqual(timeout_fn(), 0)
def testAllInputCombinations(self):
"""Check mode and timeout functions for all argument combinations"""
for want_all in [False, True]:
for timeout in [None, 0, 100]:
for opportunistic in [False, True]:
if (opportunistic and
not want_all and
timeout is not None):
# Can't accept a timeout when acquiring opportunistically
self.assertRaises(AssertionError, self.fn,
want_all, timeout, opportunistic)
else:
(mode, ls_timeout_fn, timeout_fn) = \
self.fn(want_all, timeout, opportunistic)
if opportunistic:
# Opportunistic mode; the returned timeout function must yield 0
self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
self.assertEqual(timeout_fn(), 0)
else:
self.assertTrue(callable(timeout_fn))
if want_all:
self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
else:
self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
# A lockset timeout function is only expected when acquiring all locks
if want_all:
self.assertTrue(callable(ls_timeout_fn))
else:
self.assertTrue(ls_timeout_fn is None)
class TestGanetiLockManager(_ThreadedTestCase):
def setUp(self):
  """Create a lock manager populated with a fixed set of resource names."""
  _ThreadedTestCase.setUp(self)

  # Names of the locks created on the individual levels
  self.nodes = ["n1", "n2"]
  self.nodegroups = ["g1", "g2"]
  self.instances = ["i1", "i2", "i3"]
  self.networks = ["net1", "net2", "net3"]

  manager = locking.GanetiLockManager(self.nodes, self.nodegroups,
                                      self.instances, self.networks)
  self.GL = manager
def tearDown(self):
  """Clear the instance reference kept on the GanetiLockManager class."""
  # Poking at the private class attribute is only acceptable in tests; it
  # allows the next test to create a lock manager again
  locking.GanetiLockManager._instance = None
def testLockingConstants(self):
  """Check that every level constant equals its index in locking.LEVELS."""
  # The locking library internally cheats by assuming its constants have some
  # relationships with each other. Check those hold true.
  # This relationship is also used in the Processor to recursively acquire
  # the right locks. Again, please don't break it.
  for (position, level) in enumerate(locking.LEVELS):
    self.assertEqual(position, level)
def testDoubleGLFails(self):
  """Check that creating a second lock manager raises AssertionError."""
  # setUp already created one instance
  manager_cls = locking.GanetiLockManager
  self.assertRaises(AssertionError, manager_cls, [], [], [], [])
def testLockNames(self):
  """Check the lock names known on every level after setUp."""
  expected_names = [
    (locking.LEVEL_CLUSTER, set(["BGL"])),
    (locking.LEVEL_NODE_ALLOC, set(["NAL"])),
    (locking.LEVEL_NODE, set(self.nodes)),
    (locking.LEVEL_NODEGROUP, set(self.nodegroups)),
    (locking.LEVEL_INSTANCE, set(self.instances)),
    (locking.LEVEL_NETWORK, set(self.networks)),
    ]

  for (level, names) in expected_names:
    self.assertEqual(self.GL._names(level), names)
def testInitAndResources(self):
  """Check the lock names for differently populated lock managers.

  Every scenario verifies all levels, including the network level for the
  instances-only scenario.

  """
  scenarios = [
    # (nodes, nodegroups, instances, networks)
    ([], [], [], []),
    (self.nodes, self.nodegroups, [], []),
    ([], [], self.instances, []),
    ([], [], [], self.networks),
    ]

  for (nodes, nodegroups, instances, networks) in scenarios:
    # Clear the instance reference so that a new manager can be created
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(nodes, nodegroups, instances,
                                        networks)

    # These two levels always contain exactly one fixed lock
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))

    # The other levels contain exactly the names passed to the constructor
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
                     set(nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(instances))
    self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
                     set(networks))
def testAcquireRelease(self):
  """Acquire and release locks on several levels below a shared BGL."""
  self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
  self.assertEqual(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))

  self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
  self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
  self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
  self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
                                      shared=1))
  # "i3" was never acquired
  self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))

  # Partial release on one level leaves the other levels untouched
  self.GL.release(locking.LEVEL_NODE, ["n2"])
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))

  self.GL.release(locking.LEVEL_NODE)
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE), set())
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))

  self.GL.release(locking.LEVEL_NODEGROUP)
  self.GL.release(locking.LEVEL_INSTANCE)

  # There is no instance lock named "i5"
  self.assertRaises(errors.LockError, self.GL.acquire,
                    locking.LEVEL_INSTANCE, ["i5"])

  self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
  self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
def testAcquireWholeSets(self):
  """Acquire all locks of several levels by passing None as names."""
  self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)

  self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                   set(self.instances))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                   set(self.instances))

  self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                   set(self.nodegroups))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODEGROUP),
                   set(self.nodegroups))

  self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                   set(self.nodes))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                   set(self.nodes))

  self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
  self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
  self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))

  # Release the levels again, the cluster level last
  self.GL.release(locking.LEVEL_NODE)
  self.GL.release(locking.LEVEL_NODEGROUP)
  self.GL.release(locking.LEVEL_INSTANCE)
  self.GL.release(locking.LEVEL_CLUSTER)
def testAcquireWholeAndPartial(self):
  """Acquire one level completely and another level only partially."""
  self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
  self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)

  # All instance locks
  self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                   set(self.instances))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_INSTANCE),
                   set(self.instances))

  # Only one of the node locks
  self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
                   set(["n2"]))
  self.assertEqual(self.GL.list_owned(locking.LEVEL_NODE),
                   set(["n2"]))

  self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
  self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))

  self.GL.release(locking.LEVEL_NODE)
  self.GL.release(locking.LEVEL_INSTANCE)
  self.GL.release(locking.LEVEL_CLUSTER)
def testBGLDependency(self):
  """Check that the other levels depend on the BGL being held.

  Acquiring node, instance or node group locks without the BGL is expected
  to raise C{AssertionError}, and so is releasing the BGL while locks of
  another level are still owned.

  """
  gl = self.GL
  bgl_level = locking.LEVEL_CLUSTER

  def _CheckBglReleaseRefused():
    # Releasing by name and releasing the whole level must both be refused
    self.assertRaises(AssertionError, gl.release, bgl_level, ["BGL"])
    self.assertRaises(AssertionError, gl.release, bgl_level)

  # Without the BGL nothing else may be acquired
  without_bgl = [
    (locking.LEVEL_NODE, ["n1", "n2"]),
    (locking.LEVEL_INSTANCE, ["i3"]),
    (locking.LEVEL_NODEGROUP, ["g1"]),
    ]
  for (level, names) in without_bgl:
    self.assertRaises(AssertionError, gl.acquire, level, names)

  gl.acquire(bgl_level, ["BGL"], shared=1)

  # Holding a node lock
  gl.acquire(locking.LEVEL_NODE, ["n1"])
  _CheckBglReleaseRefused()
  gl.release(locking.LEVEL_NODE)

  # Holding instance locks
  gl.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
  _CheckBglReleaseRefused()
  gl.release(locking.LEVEL_INSTANCE)

  # Holding the remainder of the node group level after giving up "g1"
  gl.acquire(locking.LEVEL_NODEGROUP, None)
  gl.release(locking.LEVEL_NODEGROUP, ["g1"])
  _CheckBglReleaseRefused()
  gl.release(locking.LEVEL_NODEGROUP)

  gl.release(bgl_level)
def testWrongOrder(self):
  """Check which acquires are refused once a node lock is held.

  With the BGL and node "n2" owned, further acquires at the node, node group
  and instance levels are all expected to raise C{AssertionError}.

  """
  self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
  self.GL.acquire(locking.LEVEL_NODE, ["n2"])

  refused = [
    (locking.LEVEL_NODE, ["n1"]),
    (locking.LEVEL_NODEGROUP, ["g1"]),
    (locking.LEVEL_INSTANCE, ["i2"]),
    ]
  for (level, names) in refused:
    self.assertRaises(AssertionError, self.GL.acquire, level, names)
def testModifiableLevels(self):
  """Check at which levels lock names can be added and removed.

  Adding at the cluster and node allocation levels, and removing at the
  cluster level, is expected to raise C{AssertionError}; the instance, node
  and node group levels accept additions and removals once the BGL is held.

  """
  gl = self.GL

  # Levels that refuse additions
  fixed_levels = [
    (locking.LEVEL_CLUSTER, ["BGL2"]),
    (locking.LEVEL_NODE_ALLOC, ["NAL2"]),
    ]
  for (level, names) in fixed_levels:
    self.assertRaises(AssertionError, gl.add, level, names)

  gl.acquire(locking.LEVEL_CLUSTER, ["BGL"])

  # Instances
  gl.add(locking.LEVEL_INSTANCE, ["i4"])
  for gone in ["i3", "i1"]:
    gl.remove(locking.LEVEL_INSTANCE, [gone])
  self.assertEqual(gl._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))

  # Nodes
  gl.add(locking.LEVEL_NODE, ["n3"])
  gl.remove(locking.LEVEL_NODE, ["n1"])
  self.assertEqual(gl._names(locking.LEVEL_NODE), set(["n2", "n3"]))

  # Node groups
  gl.add(locking.LEVEL_NODEGROUP, ["g3"])
  for gone in ["g2", "g1"]:
    gl.remove(locking.LEVEL_NODEGROUP, [gone])
  self.assertEqual(gl._names(locking.LEVEL_NODEGROUP), set(["g3"]))

  self.assertRaises(AssertionError, gl.remove, locking.LEVEL_CLUSTER,
                    ["BGL2"])
# Helper function to run as a thread that shares the BGL and then acquires
# some locks at another level.
def _doLock(self, level, names, shared):
  """Take the BGL in shared mode, then lock C{names} at C{level}.

  Puts "DONE" on C{self.done} once both acquires succeeded and releases the
  locks again; puts "ERR" instead if any step raises L{errors.LockError}.

  @param level: locking level to acquire at after the BGL
  @param names: lock name(s) passed on to the acquire call
  @param shared: shared flag passed on to the acquire call

  """
  manager = self.GL
  report = self.done.put

  try:
    manager.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
    manager.acquire(level, names, shared=shared)
    report("DONE")
    # The releases stay inside the "try" so a LockError raised by them is
    # reported as well
    manager.release(level)
    manager.release(locking.LEVEL_CLUSTER)
  except errors.LockError:
    report("ERR")
@_Repeat
def testConcurrency(self):
  """Run L{_doLock} in threads against instance locks held by this thread.

  A helper thread must finish immediately when it wants a lock that is free
  or held in a compatible mode, and must only finish after this thread
  released the instance level when the modes conflict.

  """
  inst_level = locking.LEVEL_INSTANCE

  def _SpawnLocker(name, shared):
    self._addThread(target=self._doLock, args=(inst_level, name, shared))

  def _ExpectFinished():
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), "DONE")

  def _ExpectBlocked():
    self.assertRaises(Queue.Empty, self.done.get_nowait)

  self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)

  # Nothing held at the instance level yet
  _SpawnLocker("i1", 1)
  _ExpectFinished()

  # "i3" held exclusively: "i1" is unaffected, "i3" has to wait
  self.GL.acquire(inst_level, ["i3"])
  _SpawnLocker("i1", 1)
  _ExpectFinished()
  _SpawnLocker("i3", 1)
  _ExpectBlocked()
  self.GL.release(inst_level)
  _ExpectFinished()

  # "i2" held shared: another sharer gets in, an exclusive acquire waits
  self.GL.acquire(inst_level, ["i2"], shared=1)
  _SpawnLocker("i2", 1)
  _ExpectFinished()
  _SpawnLocker("i2", 0)
  _ExpectBlocked()
  self.GL.release(inst_level)
  self._waitThreads()
  self.assertEqual(self.done.get(True, 1), "DONE")

  self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
class TestLockMonitor(_ThreadedTestCase):
  """Tests for L{locking.LockMonitor} and its C{QueryLocks} output"""

  def setUp(self):
    """Create a fresh lock monitor for every test"""
    _ThreadedTestCase.setUp(self)
    self.lm = locking.LockMonitor()

  def testSingleThread(self):
    """Register 100 locks, query them and then drop all references.

    After the references are gone the monitor's lock table is polled until it
    is empty.

    """
    locks = []

    for i in range(100):
      name = "TestLock%s" % i
      locks.append(locking.SharedLock(name, monitor=self.lm))

    self.assertEqual(len(self.lm._locks), len(locks))
    result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
    self.assertEqual(len(result.fields), 1)
    self.assertEqual(len(result.data), 100)

    # Delete all locks
    del locks[:]

    # The garbage collector might need some time
    def _CheckLocks():
      if self.lm._locks:
        raise utils.RetryAgain()

    utils.Retry(_CheckLocks, 0.1, 30.0)

    self.assertFalse(self.lm._locks)

  def testMultiThread(self):
    """Create locks from chained threads and check the query results.

    The query output is verified while all locks are idle, while single locks
    are held exclusively by this thread, and while three distinct locks are
    held by helper threads in shared and exclusive mode.

    """
    locks = []

    def _CreateLock(prev_ev, next_ev, name):
      # Each thread waits for its predecessor, which fixes the creation order
      prev_ev.wait()
      locks.append(locking.SharedLock(name, monitor=self.lm))
      if next_ev:
        next_ev.set()

    expnames = []

    first = threading.Event()
    prev = first

    # Use a deterministic random generator
    for i in random.Random(4263).sample(range(100), 33):
      name = "MtTestLock%s" % i
      expnames.append(name)

      ev = threading.Event()
      self._addThread(target=_CreateLock, args=(prev, ev, name))
      prev = ev

    # Add locks
    first.set()
    self._waitThreads()

    # Check order in which locks were added
    self.assertEqual([i.name for i in locks], expnames)

    # Check query result
    result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
    self.assertTrue(isinstance(result, dict))
    response = objects.QueryResponse.FromDict(result)
    self.assertEqual(response.data,
                     [[(constants.RS_NORMAL, name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, [])]
                      for name in utils.NiceSort(expnames)])
    self.assertEqual(len(response.fields), 4)
    self.assertEqual(["name", "mode", "owner", "pending"],
                     [fdef.name for fdef in response.fields])

    # Test exclusive acquire
    for tlock in locks[::4]:
      tlock.acquire(shared=0)
      try:
        def _GetExpResult(name):
          # Only the lock held right now is reported as owned
          if tlock.name == name:
            return [(constants.RS_NORMAL, name),
                    (constants.RS_NORMAL, "exclusive"),
                    (constants.RS_NORMAL,
                     [threading.currentThread().getName()]),
                    (constants.RS_NORMAL, [])]
          return [(constants.RS_NORMAL, name),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, None),
                  (constants.RS_NORMAL, [])]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [_GetExpResult(name)
                          for name in utils.NiceSort(expnames)])
      finally:
        tlock.release()

    # Test shared acquire
    def _Acquire(lock, shared, ev, notify):
      lock.acquire(shared=shared)
      try:
        # Tell the main thread the lock is held, then keep it until told to
        # let go
        notify.set()
        ev.wait()
      finally:
        lock.release()

    for tlock1 in locks[::11]:
      for tlock2 in locks[::-15]:
        if tlock2 == tlock1:
          # Avoid deadlocks
          continue

        for tlock3 in locks[::10]:
          if tlock3 in (tlock2, tlock1):
            # Avoid deadlocks
            continue

          releaseev = threading.Event()

          # Acquire locks
          acquireev = []
          tthreads1 = []
          for _ in range(3):
            ev = threading.Event()
            tthreads1.append(self._addThread(target=_Acquire,
                                             args=(tlock1, 1, releaseev, ev)))
            acquireev.append(ev)

          ev = threading.Event()
          tthread2 = self._addThread(target=_Acquire,
                                     args=(tlock2, 1, releaseev, ev))
          acquireev.append(ev)

          ev = threading.Event()
          tthread3 = self._addThread(target=_Acquire,
                                     args=(tlock3, 0, releaseev, ev))
          acquireev.append(ev)

          # Wait for all locks to be acquired
          for i in acquireev:
            i.wait()

          # Check query result
          result = self.lm.QueryLocks(["name", "mode", "owner"])
          response = objects.QueryResponse.FromDict(result)
          for (name, mode, owner) in response.data:
            (name_status, name_value) = name
            (owner_status, owner_value) = owner

            self.assertEqual(name_status, constants.RS_NORMAL)
            self.assertEqual(owner_status, constants.RS_NORMAL)

            if name_value == tlock1.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(set(owner_value),
                               set(i.getName() for i in tthreads1))
              continue

            if name_value == tlock2.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
              self.assertEqual(owner_value, [tthread2.getName()])
              continue

            if name_value == tlock3.name:
              self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
              self.assertEqual(owner_value, [tthread3.getName()])
              continue

            # Every other lock must be idle
            self.assertTrue(name_value in expnames)
            self.assertEqual(mode, (constants.RS_NORMAL, None))
            self.assertTrue(owner_value is None)

          # Release locks again
          releaseev.set()

          self._waitThreads()

          result = self.lm.QueryLocks(["name", "mode", "owner"])
          self.assertEqual(objects.QueryResponse.FromDict(result).data,
                           [[(constants.RS_NORMAL, name),
                             (constants.RS_NORMAL, None),
                             (constants.RS_NORMAL, None)]
                            for name in utils.NiceSort(expnames)])

  def testDelete(self):
    """Check that a deleted lock is still listed, with mode "deleted"."""
    lock = locking.SharedLock("TestLock", monitor=self.lm)

    self.assertEqual(len(self.lm._locks), 1)
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]])

    lock.delete()

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lock.name),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]])
    self.assertEqual(len(self.lm._locks), 1)

  def testPending(self):
    """Check the "pending" field while helper threads wait for a lock.

    This thread holds the lock exclusively while five helper threads queue
    up for it, first in exclusive and then in shared mode.

    """
    def _Acquire(lock, shared, prev_ev, next_ev):
      prev_ev.wait()

      # The lock is only needed to show up as pending, so it is given back
      # as soon as it has been obtained
      lock.acquire(shared=shared, test_notify=next_ev.set)
      lock.release()

    lock = locking.SharedLock("ExcLock", monitor=self.lm)

    for shared in [0, 1]:
      lock.acquire()
      try:
        self.assertEqual(len(self.lm._locks), 1)
        result = self.lm.QueryLocks(["name", "mode", "owner"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()])]])

        threads = []

        first = threading.Event()
        prev = first

        for _ in range(5):
          ev = threading.Event()
          threads.append(self._addThread(target=_Acquire,
                                         args=(lock, shared, prev, ev)))
          prev = ev

        # Start acquires
        first.set()

        # Wait for last acquire to start waiting
        prev.wait()

        # NOTE: This works only because QueryLocks will acquire the
        # lock-internal lock again and won't be able to get the information
        # until it has the lock. By then the acquire should be registered in
        # SharedLock.__pending (otherwise it's a bug).

        # All acquires are waiting now
        if shared:
          pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
        else:
          pending = [("exclusive", [t.getName()]) for t in threads]

        result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data,
                         [[(constants.RS_NORMAL, lock.name),
                           (constants.RS_NORMAL, "exclusive"),
                           (constants.RS_NORMAL,
                            [threading.currentThread().getName()]),
                           (constants.RS_NORMAL, pending)]])

        self.assertEqual(len(self.lm._locks), 1)
      finally:
        lock.release()

      self._waitThreads()

      # No pending acquires
      result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       [[(constants.RS_NORMAL, lock.name),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, None),
                         (constants.RS_NORMAL, [])]])

      self.assertEqual(len(self.lm._locks), 1)

  def testDeleteAndRecreate(self):
    """Check how several locks sharing one name are reported.

    Five locks with the same name are registered; one is deleted, one is
    acquired, and then the references are dropped one at a time while the
    query output is compared with the expected remainder.

    """
    lname = "TestLock101923193"

    # Create some locks with the same name and keep all references
    locks = [locking.SharedLock(lname, monitor=self.lm)
             for _ in range(5)]

    self.assertEqual(len(self.lm._locks), len(locks))

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 5)

    locks[2].delete()

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data,
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2 +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, "deleted"),
                       (constants.RS_NORMAL, None)]] +
                     [[(constants.RS_NORMAL, lname),
                       (constants.RS_NORMAL, None),
                       (constants.RS_NORMAL, None)]] * 2)

    locks[1].acquire(shared=0)

    last_status = [
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "exclusive"),
       (constants.RS_NORMAL, [threading.currentThread().getName()])],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, "deleted"),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      [(constants.RS_NORMAL, lname),
       (constants.RS_NORMAL, None),
       (constants.RS_NORMAL, None)],
      ]

    # Check information order
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)

    self.assertEqual(len(set(self.lm._locks.values())), len(locks))
    self.assertEqual(len(self.lm._locks), len(locks))

    # Check lock deletion
    for idx in range(len(locks)):
      # Always drop the oldest remaining reference
      del locks[0]
      self.assertTrue(gc.isenabled())
      gc.collect()
      self.assertEqual(len(self.lm._locks), len(locks))
      result = self.lm.QueryLocks(["name", "mode", "owner"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data,
                       last_status[idx + 1:])

    # All locks should have been deleted
    self.assertFalse(locks)
    self.assertFalse(self.lm._locks)

    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])

  class _FakeLock:
    """Stand-in lock returning canned results from C{GetLockInfo}"""

    def __init__(self):
      """Start with an empty queue of canned results"""
      self._info = []

    def AddResult(self, *args):
      """Queue one C{(expected request, result)} pair"""
      self._info.append(args)

    def CountPending(self):
      """Return the number of queued results not yet consumed"""
      return len(self._info)

    def GetLockInfo(self, requested):
      """Return the oldest queued result.

      @param requested: the information requested by the caller
      @raise Exception: if C{requested} differs from the expectation queued
        together with the result

      """
      (exp_requested, result) = self._info.pop(0)

      if exp_requested != requested:
        raise Exception("Requested information (%s) does not match"
                        " expectations (%s)" % (requested, exp_requested))

      return result

  def testMultipleResults(self):
    """Check merging and ordering of results from several fake locks.

    Entries are expected sorted by name; entries sharing a name have to keep
    the order in which they were returned.

    """
    fl1 = self._FakeLock()
    fl2 = self._FakeLock()

    self.lm.RegisterLock(fl1)
    self.lm.RegisterLock(fl2)

    # Empty information
    for i in [fl1, fl2]:
      i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
    result = self.lm.QueryLocks(["name", "mode", "owner"])
    self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
    for i in [fl1, fl2]:
      self.assertEqual(i.CountPending(), 0)

    # Check ordering
    for fn in [lambda x: x, reversed, sorted]:
      fl1.AddResult(set(), list(fn([
        ("aaa", None, None, None),
        ("bbb", None, None, None),
        ])))
      fl2.AddResult(set(), [])
      result = self.lm.QueryLocks(["name"])
      self.assertEqual(objects.QueryResponse.FromDict(result).data, [
        [(constants.RS_NORMAL, "aaa")],
        [(constants.RS_NORMAL, "bbb")],
        ])
      for i in [fl1, fl2]:
        self.assertEqual(i.CountPending(), 0)

      for fn2 in [lambda x: x, reversed, sorted]:
        fl1.AddResult(set([query.LQ_MODE]), list(fn([
          # Same name, but different information
          ("aaa", "mode0", None, None),
          ("aaa", "mode1", None, None),
          ("aaa", "mode2", None, None),
          ("aaa", "mode3", None, None),
          ])))
        fl2.AddResult(set([query.LQ_MODE]), [
          ("zzz", "end", None, None),
          ("000", "start", None, None),
          ] + list(fn2([
          ("aaa", "b200", None, None),
          ("aaa", "b300", None, None),
          ])))
        result = self.lm.QueryLocks(["name", "mode"])
        self.assertEqual(objects.QueryResponse.FromDict(result).data, [
          [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
          ] + list(fn([
          # Name is the same, so order must be equal to incoming order
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
          ])) + list(fn2([
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
          [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
          ])) + [
          [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
          ])
        for i in [fl1, fl2]:
          self.assertEqual(i.CountPending(), 0)
# Hand control to testutils.GanetiTestProgram when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()