blob: a3d73b6207dad3800469585d7d343e4d5dbe12a9 [file] [log] [blame]
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Module implementing the Ganeti locking code."""
# pylint: disable=W0212
# W0212 since e.g. LockSet methods use (a lot) the internals of
# SharedLock
import os
import select
import threading
import errno
import logging
import heapq
import time
from ganeti import errors
from ganeti import utils
from ganeti import compat
from ganeti import query
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"
_DEFAULT_PRIORITY = 0
#: Minimum timeout required to consider scheduling a pending acquisition
#: (seconds)
_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
Calls the function holding the given lock, either in exclusive or shared
mode. It requires the passed lock to be a SharedLock (or support its
semantics).
@type mylock: lockable object or string
@param mylock: lock to acquire or class member name of the lock to acquire
"""
def wrap(fn):
def sync_function(*args, **kwargs):
if isinstance(mylock, basestring):
assert args, "cannot ssynchronize on non-class method: self not found"
# args[0] is "self"
lock = getattr(args[0], mylock)
else:
lock = mylock
lock.acquire(shared=shared)
try:
return fn(*args, **kwargs)
finally:
lock.release()
return sync_function
return wrap
class _SingleNotifyPipeConditionWaiter(object):
"""Helper class for SingleNotifyPipeCondition
"""
__slots__ = [
"_fd",
]
def __init__(self, fd):
"""Constructor for _SingleNotifyPipeConditionWaiter
@type fd: int
@param fd: File descriptor to wait for
"""
object.__init__(self)
self._fd = fd
def __call__(self, timeout):
"""Wait for something to happen on the pipe.
@type timeout: float or None
@param timeout: Timeout for waiting (can be None)
"""
running_timeout = utils.RunningTimeout(timeout, True)
poller = select.poll()
poller.register(self._fd, select.POLLHUP)
while True:
remaining_time = running_timeout.Remaining()
if remaining_time is not None:
if remaining_time < 0.0:
break
# Our calculation uses seconds, poll() wants milliseconds
remaining_time *= 1000
try:
result = poller.poll(remaining_time)
except EnvironmentError, err:
if err.errno != errno.EINTR:
raise
result = None
# Check whether we were notified
if result and result[0][0] == self._fd:
break
class _BaseCondition(object):
"""Base class containing common code for conditions.
Some of this code is taken from python's threading module.
"""
__slots__ = [
"_lock",
"acquire",
"release",
"_is_owned",
"_acquire_restore",
"_release_save",
]
def __init__(self, lock):
"""Constructor for _BaseCondition.
@type lock: threading.Lock
@param lock: condition base lock
"""
object.__init__(self)
try:
self._release_save = lock._release_save
except AttributeError:
self._release_save = self._base_release_save
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
self._acquire_restore = self._base_acquire_restore
try:
self._is_owned = lock.is_owned
except AttributeError:
self._is_owned = self._base_is_owned
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
def _base_is_owned(self):
"""Check whether lock is owned by current thread.
"""
if self._lock.acquire(0):
self._lock.release()
return False
return True
def _base_release_save(self):
self._lock.release()
def _base_acquire_restore(self, _):
self._lock.acquire()
def _check_owned(self):
"""Raise an exception if the current thread doesn't own the lock.
"""
if not self._is_owned():
raise RuntimeError("cannot work with un-aquired lock")
class SingleNotifyPipeCondition(_BaseCondition):
"""Condition which can only be notified once.
This condition class uses pipes and poll, internally, to be able to wait for
notification with a timeout, without resorting to polling. It is almost
compatible with Python's threading.Condition, with the following differences:
- notifyAll can only be called once, and no wait can happen after that
- notify is not supported, only notifyAll
"""
__slots__ = [
"_read_fd",
"_write_fd",
"_nwaiters",
"_notified",
]
_waiter_class = _SingleNotifyPipeConditionWaiter
def __init__(self, lock):
"""Constructor for SingleNotifyPipeCondition
"""
_BaseCondition.__init__(self, lock)
self._nwaiters = 0
self._notified = False
self._read_fd = None
self._write_fd = None
def _check_unnotified(self):
"""Throws an exception if already notified.
"""
if self._notified:
raise RuntimeError("cannot use already notified condition")
def _Cleanup(self):
"""Cleanup open file descriptors, if any.
"""
if self._read_fd is not None:
os.close(self._read_fd)
self._read_fd = None
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
@param timeout: Waiting timeout (can be None)
"""
self._check_owned()
self._check_unnotified()
self._nwaiters += 1
try:
if self._read_fd is None:
(self._read_fd, self._write_fd) = os.pipe()
wait_fn = self._waiter_class(self._read_fd)
state = self._release_save()
try:
# Wait for notification
wait_fn(timeout)
finally:
# Re-acquire lock
self._acquire_restore(state)
finally:
self._nwaiters -= 1
if self._nwaiters == 0:
self._Cleanup()
def notifyAll(self): # pylint: disable=C0103
"""Close the writing side of the pipe to notify all waiters.
"""
self._check_owned()
self._check_unnotified()
self._notified = True
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
class PipeCondition(_BaseCondition):
"""Group-only non-polling condition with counters.
This condition class uses pipes and poll, internally, to be able to wait for
notification with a timeout, without resorting to polling. It is almost
compatible with Python's threading.Condition, but only supports notifyAll and
non-recursive locks. As an additional features it's able to report whether
there are any waiting threads.
"""
__slots__ = [
"_waiters",
"_single_condition",
]
_single_condition_class = SingleNotifyPipeCondition
def __init__(self, lock):
"""Initializes this class.
"""
_BaseCondition.__init__(self, lock)
self._waiters = set()
self._single_condition = self._single_condition_class(self._lock)
def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
@param timeout: Waiting timeout (can be None)
"""
self._check_owned()
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
cond = self._single_condition
self._waiters.add(threading.currentThread())
try:
cond.wait(timeout)
finally:
self._check_owned()
self._waiters.remove(threading.currentThread())
def notifyAll(self): # pylint: disable=C0103
"""Notify all currently waiting threads.
"""
self._check_owned()
self._single_condition.notifyAll()
self._single_condition = self._single_condition_class(self._lock)
def get_waiting(self):
"""Returns a list of all waiting threads.
"""
self._check_owned()
return self._waiters
def has_waiting(self):
"""Returns whether there are active waiters.
"""
self._check_owned()
return bool(self._waiters)
def __repr__(self):
return ("<%s.%s waiters=%s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self._waiters, id(self)))
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
"shared",
]
def __init__(self, lock, shared):
"""Initializes this class.
"""
self.shared = shared
PipeCondition.__init__(self, lock)
class SharedLock(object):
"""Implements a shared lock.
Multiple threads can acquire the lock in a shared way by calling
C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
threads can call C{acquire(shared=0)}.
Notes on data structures: C{__pending} contains a priority queue (heapq) of
all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
...]}. Each per-priority queue contains a normal in-order list of conditions
to be notified when the lock can be acquired. Shared locks are grouped
together by priority and the condition for them is stored in
C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
references for the per-priority queues indexed by priority for faster access.
@type name: string
@ivar name: the name of the lock
"""
__slots__ = [
"__weakref__",
"__deleted",
"__exc",
"__lock",
"__pending",
"__pending_by_prio",
"__pending_shared",
"__shr",
"__time_fn",
"name",
]
__condition_class = _PipeConditionWithMode
def __init__(self, name, monitor=None, _time_fn=time.time):
"""Construct a new SharedLock.
@param name: the name of the lock
@param monitor: Lock monitor with which to register
"""
object.__init__(self)
self.name = name
# Used for unittesting
self.__time_fn = _time_fn
# Internal lock
self.__lock = threading.Lock()
# Queue containing waiting acquires
self.__pending = []
self.__pending_by_prio = {}
self.__pending_shared = {}
# Current lock holders
self.__shr = set()
self.__exc = None
# is this lock in the deleted state?
self.__deleted = False
# Register with lock monitor
if monitor:
logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
def __repr__(self):
return ("<%s.%s name=%s at %#x>" %
(self.__class__.__module__, self.__class__.__name__,
self.name, id(self)))
def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
@type requested: set
@param requested: Requested information, see C{query.LQ_*}
"""
self.__lock.acquire()
try:
# Note: to avoid unintentional race conditions, no references to
# modifiable objects should be returned unless they were created in this
# function.
mode = None
owner_names = None
if query.LQ_MODE in requested:
if self.__deleted:
mode = _DELETED_TEXT
assert not (self.__exc or self.__shr)
elif self.__exc:
mode = _EXCLUSIVE_TEXT
elif self.__shr:
mode = _SHARED_TEXT
# Current owner(s) are wanted
if query.LQ_OWNER in requested:
if self.__exc:
owner = [self.__exc]
else:
owner = self.__shr
if owner:
assert not self.__deleted
owner_names = [i.getName() for i in owner]
# Pending acquires are wanted
if query.LQ_PENDING in requested:
pending = []
# Sorting instead of copying and using heaq functions for simplicity
for (_, prioqueue) in sorted(self.__pending):
for cond in prioqueue:
if cond.shared:
pendmode = _SHARED_TEXT
else:
pendmode = _EXCLUSIVE_TEXT
# List of names will be sorted in L{query._GetLockPending}
pending.append((pendmode, [i.getName()
for i in cond.get_waiting()]))
else:
pending = None
return [(self.name, mode, owner_names, pending)]
finally:
self.__lock.release()
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
"""
if self.__deleted:
raise errors.LockError("Deleted lock %s" % self.name)
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?
"""
return threading.currentThread() in self.__shr
def __is_exclusive(self):
"""Is the current thread holding the lock exclusively at this time?
"""
return threading.currentThread() == self.__exc
def __is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
This is a private version of the function, which presumes you're holding
the internal lock.
"""
if shared < 0:
return self.__is_sharer() or self.__is_exclusive()
elif shared:
return self.__is_sharer()
else:
return self.__is_exclusive()
def is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
@param shared:
- < 0: check for any type of ownership (default)
- 0: check for exclusive ownership
- > 0: check for shared ownership
"""
self.__lock.acquire()
try:
return self.__is_owned(shared=shared)
finally:
self.__lock.release()
#: Necessary to remain compatible with threading.Condition, which tries to
#: retrieve a locks' "_is_owned" attribute
_is_owned = is_owned
def _count_pending(self):
"""Returns the number of pending acquires.
@rtype: int
"""
self.__lock.acquire()
try:
return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
finally:
self.__lock.release()
def _check_empty(self):
"""Checks whether there are any pending acquires.
@rtype: bool
"""
self.__lock.acquire()
try:
# Order is important: __find_first_pending_queue modifies __pending
(_, prioqueue) = self.__find_first_pending_queue()
return not (prioqueue or
self.__pending or
self.__pending_by_prio or
self.__pending_shared)
finally:
self.__lock.release()
def __do_acquire(self, shared):
"""Actually acquire the lock.
"""
if shared:
self.__shr.add(threading.currentThread())
else:
self.__exc = threading.currentThread()
def __can_acquire(self, shared):
"""Determine whether lock can be acquired.
"""
if shared:
return self.__exc is None
else:
return len(self.__shr) == 0 and self.__exc is None
def __find_first_pending_queue(self):
"""Tries to find the topmost queued entry with pending acquires.
Removes empty entries while going through the list.
"""
while self.__pending:
(priority, prioqueue) = self.__pending[0]
if prioqueue:
return (priority, prioqueue)
# Remove empty queue
heapq.heappop(self.__pending)
del self.__pending_by_prio[priority]
assert priority not in self.__pending_shared
return (None, None)
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
The caller must make sure the queue isn't empty.
"""
(_, prioqueue) = self.__find_first_pending_queue()
return cond == prioqueue[0]
def __acquire_unlocked(self, shared, timeout, priority):
"""Acquire a shared lock.
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
"""
self.__check_deleted()
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
" %s" % self.name)
# Remove empty entries from queue
self.__find_first_pending_queue()
# Check whether someone else holds the lock or there are pending acquires.
if not self.__pending and self.__can_acquire(shared):
# Apparently not, can acquire lock directly.
self.__do_acquire(shared)
return True
# The lock couldn't be acquired right away, so if a timeout is given and is
# considered too short, return right away as scheduling a pending
# acquisition is quite expensive
if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
return False
prioqueue = self.__pending_by_prio.get(priority, None)
if shared:
# Try to re-use condition for shared acquire
wait_condition = self.__pending_shared.get(priority, None)
assert (wait_condition is None or
(wait_condition.shared and wait_condition in prioqueue))
else:
wait_condition = None
if wait_condition is None:
if prioqueue is None:
assert priority not in self.__pending_by_prio
prioqueue = []
heapq.heappush(self.__pending, (priority, prioqueue))
self.__pending_by_prio[priority] = prioqueue
wait_condition = self.__condition_class(self.__lock, shared)
prioqueue.append(wait_condition)
if shared:
# Keep reference for further shared acquires on same priority. This is
# better than trying to find it in the list of pending acquires.
assert priority not in self.__pending_shared
self.__pending_shared[priority] = wait_condition
wait_start = self.__time_fn()
acquired = False
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
while True:
if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
self.__do_acquire(shared)
acquired = True
break
# A lot of code assumes blocking acquires always succeed, therefore we
# can never return False for a blocking acquire
if (timeout is not None and
utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
break
# Wait for notification
wait_condition.wait(timeout)
self.__check_deleted()
finally:
# Remove condition from queue if there are no more waiters
if not wait_condition.has_waiting():
prioqueue.remove(wait_condition)
if wait_condition.shared:
# Remove from list of shared acquires if it wasn't while releasing
# (e.g. on lock deletion)
self.__pending_shared.pop(priority, None)
return acquired
def acquire(self, shared=0, timeout=None, priority=None,
test_notify=None):
"""Acquire a shared lock.
@type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
"""
if priority is None:
priority = _DEFAULT_PRIORITY
self.__lock.acquire()
try:
# We already got the lock, notify now
if __debug__ and callable(test_notify):
test_notify()
return self.__acquire_unlocked(shared, timeout, priority)
finally:
self.__lock.release()
def downgrade(self):
"""Changes the lock mode from exclusive to shared.
Pending acquires in shared mode on the same priority will go ahead.
"""
self.__lock.acquire()
try:
assert self.__is_owned(), "Lock must be owned"
if self.__is_exclusive():
# Do nothing if the lock is already acquired in shared mode
self.__exc = None
self.__do_acquire(1)
# Important: pending shared acquires should only jump ahead if there
# was a transition from exclusive to shared, otherwise an owner of a
# shared lock can keep calling this function to push incoming shared
# acquires
(priority, prioqueue) = self.__find_first_pending_queue()
if prioqueue:
# Is there a pending shared acquire on this priority?
cond = self.__pending_shared.pop(priority, None)
if cond:
assert cond.shared
assert cond in prioqueue
# Ensure shared acquire is on top of queue
if len(prioqueue) > 1:
prioqueue.remove(cond)
prioqueue.insert(0, cond)
# Notify
cond.notifyAll()
assert not self.__is_exclusive()
assert self.__is_sharer()
return True
finally:
self.__lock.release()
def release(self):
"""Release a Shared Lock.
You must have acquired the lock, either in shared or in exclusive mode,
before calling this function.
"""
self.__lock.acquire()
try:
assert self.__is_exclusive() or self.__is_sharer(), \
"Cannot release non-owned lock"
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
notify = True
else:
self.__shr.remove(threading.currentThread())
notify = not self.__shr
# Notify topmost condition in queue if there are no owners left (for
# shared locks)
if notify:
self.__notify_topmost()
finally:
self.__lock.release()
def __notify_topmost(self):
"""Notifies topmost condition in queue of pending acquires.
"""
(priority, prioqueue) = self.__find_first_pending_queue()
if prioqueue:
cond = prioqueue[0]
cond.notifyAll()
if cond.shared:
# Prevent further shared acquires from sneaking in while waiters are
# notified
self.__pending_shared.pop(priority, None)
def _notify_topmost(self):
"""Exported version of L{__notify_topmost}.
"""
self.__lock.acquire()
try:
return self.__notify_topmost()
finally:
self.__lock.release()
def delete(self, timeout=None, priority=None):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
acquired in exclusive mode if you don't already own it, then the lock
will be put in a state where any future and pending acquire() fail.
@type timeout: float
@param timeout: maximum waiting time before giving up
@type priority: integer
@param priority: Priority for acquiring lock
"""
if priority is None:
priority = _DEFAULT_PRIORITY
self.__lock.acquire()
try:
assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
self.__check_deleted()
# The caller is allowed to hold the lock exclusively already.
acquired = self.__is_exclusive()
if not acquired:
acquired = self.__acquire_unlocked(0, timeout, priority)
if acquired:
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
self.__deleted = True
self.__exc = None
assert not (self.__exc or self.__shr), "Found owner during deletion"
# Notify all acquires. They'll throw an error.
for (_, prioqueue) in self.__pending:
for cond in prioqueue:
cond.notifyAll()
assert self.__deleted
return acquired
finally:
self.__lock.release()
def _release_save(self):
shared = self.__is_sharer()
self.release()
return shared
def _acquire_restore(self, shared):
self.acquire(shared=shared)
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
ALL_SET = None
LOCKSET_NAME = "[lockset]"
def _TimeoutZero():
"""Returns the number zero.
"""
return 0
class _AcquireTimeout(Exception):
"""Internal exception to abort an acquire on a timeout.
"""
# Locking levels, must be acquired in increasing order. Current rules are:
# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
# acquired before performing any operation, either in shared or exclusive
# mode. Acquiring the BGL in exclusive mode is discouraged and should be
# avoided..
# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
# you need more than one node, or more than one instance, acquire them at the
# same time.
# - LEVEL_NODE_RES is for node resources and should be used by operations with
# possibly high impact on the node's disks.
(LEVEL_CLUSTER,
LEVEL_INSTANCE,
LEVEL_NODEGROUP,
LEVEL_NODE,
LEVEL_NODE_RES,
LEVEL_NETWORK) = range(0, 6)
LEVELS = [
LEVEL_CLUSTER,
LEVEL_INSTANCE,
LEVEL_NODEGROUP,
LEVEL_NODE,
LEVEL_NODE_RES,
LEVEL_NETWORK,
]
# Lock levels which are modifiable
LEVELS_MOD = compat.UniqueFrozenset([
LEVEL_NODE_RES,
LEVEL_NODE,
LEVEL_NODEGROUP,
LEVEL_INSTANCE,
LEVEL_NETWORK,
])
#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
LEVEL_NODE_RES: "node-res",
LEVEL_NETWORK: "network",
}
# Constant for the big ganeti lock
BGL = "BGL"