| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Script for testing ganeti.utils.filelock""" |
| |
| import os |
| import tempfile |
| import unittest |
| |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import errors |
| |
| import testutils |
| |
| |
| class _BaseFileLockTest: |
| """Test case for the FileLock class""" |
| |
| def testSharedNonblocking(self): |
| self.lock.Shared(blocking=False) |
| self.lock.Close() |
| |
| def testExclusiveNonblocking(self): |
| self.lock.Exclusive(blocking=False) |
| self.lock.Close() |
| |
| def testUnlockNonblocking(self): |
| self.lock.Unlock(blocking=False) |
| self.lock.Close() |
| |
| def testSharedBlocking(self): |
| self.lock.Shared(blocking=True) |
| self.lock.Close() |
| |
| def testExclusiveBlocking(self): |
| self.lock.Exclusive(blocking=True) |
| self.lock.Close() |
| |
| def testUnlockBlocking(self): |
| self.lock.Unlock(blocking=True) |
| self.lock.Close() |
| |
| def testSharedExclusiveUnlock(self): |
| self.lock.Shared(blocking=False) |
| self.lock.Exclusive(blocking=False) |
| self.lock.Unlock(blocking=False) |
| self.lock.Close() |
| |
| def testExclusiveSharedUnlock(self): |
| self.lock.Exclusive(blocking=False) |
| self.lock.Shared(blocking=False) |
| self.lock.Unlock(blocking=False) |
| self.lock.Close() |
| |
| def testSimpleTimeout(self): |
| # These will succeed on the first attempt, hence a short timeout |
| self.lock.Shared(blocking=True, timeout=10.0) |
| self.lock.Exclusive(blocking=False, timeout=10.0) |
| self.lock.Unlock(blocking=True, timeout=10.0) |
| self.lock.Close() |
| |
| @staticmethod |
| def _TryLockInner(filename, shared, blocking): |
| lock = utils.FileLock.Open(filename) |
| |
| if shared: |
| fn = lock.Shared |
| else: |
| fn = lock.Exclusive |
| |
| try: |
| # The timeout doesn't really matter as the parent process waits for us to |
| # finish anyway. |
| fn(blocking=blocking, timeout=0.01) |
| except errors.LockError, err: |
| return False |
| |
| return True |
| |
| def _TryLock(self, *args): |
| return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name, |
| *args) |
| |
| def testTimeout(self): |
| for blocking in [True, False]: |
| self.lock.Exclusive(blocking=True) |
| self.failIf(self._TryLock(False, blocking)) |
| self.failIf(self._TryLock(True, blocking)) |
| |
| self.lock.Shared(blocking=True) |
| self.assert_(self._TryLock(True, blocking)) |
| self.failIf(self._TryLock(False, blocking)) |
| |
| def testCloseShared(self): |
| self.lock.Close() |
| self.assertRaises(AssertionError, self.lock.Shared, blocking=False) |
| |
| def testCloseExclusive(self): |
| self.lock.Close() |
| self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False) |
| |
| def testCloseUnlock(self): |
| self.lock.Close() |
| self.assertRaises(AssertionError, self.lock.Unlock, blocking=False) |
| |
| |
| class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest): |
| TESTDATA = "Hello World\n" * 10 |
| |
| def setUp(self): |
| testutils.GanetiTestCase.setUp(self) |
| |
| self.tmpfile = tempfile.NamedTemporaryFile() |
| utils.WriteFile(self.tmpfile.name, data=self.TESTDATA) |
| self.lock = utils.FileLock.Open(self.tmpfile.name) |
| |
| # Ensure "Open" didn't truncate file |
| self.assertFileContent(self.tmpfile.name, self.TESTDATA) |
| |
| def tearDown(self): |
| self.assertFileContent(self.tmpfile.name, self.TESTDATA) |
| |
| testutils.GanetiTestCase.tearDown(self) |
| |
| |
| class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest): |
| def setUp(self): |
| self.tmpfile = tempfile.NamedTemporaryFile() |
| self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |