blob: a452239b4ac3f98cac2a20cf81a1b0e3b596e447 [file] [log] [blame]
#
#
# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the parameter types code."""
import re
import operator
from ganeti import compat
from ganeti import utils
from ganeti import constants
_PAREN_RE = re.compile("^[a-zA-Z0-9_-]+$")
def Parens(text):
"""Enclose text in parens if necessary.
@param text: Text
"""
text = str(text)
if _PAREN_RE.match(text):
return text
else:
return "(%s)" % text
class _WrapperBase(object):
__slots__ = [
"_fn",
"_text",
]
def __init__(self, text, fn):
"""Initializes this class.
@param text: Description
@param fn: Wrapped function
"""
assert text.strip()
self._text = text
self._fn = fn
def __call__(self, *args):
return self._fn(*args)
class _DescWrapper(_WrapperBase):
"""Wrapper class for description text.
"""
def __str__(self):
return self._text
class _CommentWrapper(_WrapperBase):
"""Wrapper class for comment.
"""
def __str__(self):
return "%s [%s]" % (self._fn, self._text)
def WithDesc(text):
"""Builds wrapper class with description text.
@type text: string
@param text: Description text
@return: Callable class
"""
assert text[0] == text[0].upper()
return compat.partial(_DescWrapper, text)
def Comment(text):
"""Builds wrapper for adding comment to description text.
@type text: string
@param text: Comment text
@return: Callable class
"""
assert not frozenset(text).intersection("[]")
return compat.partial(_CommentWrapper, text)
def CombinationDesc(op, args, fn):
"""Build description for combinating operator.
@type op: string
@param op: Operator as text (e.g. "and")
@type args: list
@param args: Operator arguments
@type fn: callable
@param fn: Wrapped function
"""
# Some type descriptions are rather long. If "None" is listed at the
# end or somewhere in between it is easily missed. Therefore it should
# be at the beginning, e.g. "None or (long description)".
if __debug__ and TNone in args and args.index(TNone) > 0:
raise Exception("TNone must be listed first")
if len(args) == 1:
descr = str(args[0])
else:
descr = (" %s " % op).join(Parens(i) for i in args)
return WithDesc(descr)(fn)
# Modifiable default values; need to define these here before the
# actual LUs
@WithDesc(str([]))
def EmptyList():
"""Returns an empty list.
"""
return []
@WithDesc(str({}))
def EmptyDict():
"""Returns an empty dict.
"""
return {}
#: The without-default default value
NoDefault = object()
#: The no-type (value too complex to check it in the type system)
NoType = object()
# Some basic types
@WithDesc("Anything")
def TAny(_):
"""Accepts any value.
"""
return True
@WithDesc("NotNone")
def TNotNone(val):
"""Checks if the given value is not None.
"""
return val is not None
@WithDesc("None")
def TNone(val):
"""Checks if the given value is None.
"""
return val is None
@WithDesc("ValueNone")
def TValueNone(val):
"""Checks if the given value is L{constants.VALUE_NONE}.
"""
return val == constants.VALUE_NONE
@WithDesc("Boolean")
def TBool(val):
"""Checks if the given value is a boolean.
"""
return isinstance(val, bool)
@WithDesc("Integer")
def TInt(val):
"""Checks if the given value is an integer.
"""
# For backwards compatibility with older Python versions, boolean values are
# also integers and should be excluded in this test.
#
# >>> (isinstance(False, int), isinstance(True, int))
# (True, True)
return isinstance(val, (int, long)) and not isinstance(val, bool)
@WithDesc("Float")
def TFloat(val):
"""Checks if the given value is a float.
"""
return isinstance(val, float)
@WithDesc("String")
def TString(val):
"""Checks if the given value is a string.
"""
return isinstance(val, basestring)
@WithDesc("EvalToTrue")
def TTrue(val):
"""Checks if a given value evaluates to a boolean True value.
"""
return bool(val)
def TElemOf(target_list):
"""Builds a function that checks if a given value is a member of a list.
"""
def fn(val):
return val in target_list
return WithDesc("OneOf %s" % (utils.CommaJoin(target_list), ))(fn)
# Container types
@WithDesc("List")
def TList(val):
"""Checks if the given value is a list.
"""
return isinstance(val, list)
@WithDesc("Tuple")
def TTuple(val):
"""Checks if the given value is a tuple.
"""
return isinstance(val, tuple)
@WithDesc("Dictionary")
def TDict(val):
"""Checks if the given value is a dictionary.
"""
return isinstance(val, dict)
def TIsLength(size):
"""Check is the given container is of the given size.
"""
def fn(container):
return len(container) == size
return WithDesc("Length %s" % (size, ))(fn)
# Combinator types
def TAnd(*args):
"""Combine multiple functions using an AND operation.
"""
def fn(val):
return compat.all(t(val) for t in args)
return CombinationDesc("and", args, fn)
def TOr(*args):
"""Combine multiple functions using an OR operation.
"""
def fn(val):
return compat.any(t(val) for t in args)
return CombinationDesc("or", args, fn)
def TMap(fn, test):
"""Checks that a modified version of the argument passes the given test.
"""
return WithDesc("Result of %s must be %s" %
(Parens(fn), Parens(test)))(lambda val: test(fn(val)))
def TRegex(pobj):
"""Checks whether a string matches a specific regular expression.
@param pobj: Compiled regular expression as returned by C{re.compile}
"""
desc = WithDesc("String matching regex \"%s\"" %
pobj.pattern.encode("string_escape"))
return desc(TAnd(TString, pobj.match))
def TMaybe(test):
"""Wrap a test in a TOr(TNone, test).
This makes it easier to define TMaybe* types.
"""
return TOr(TNone, test)
def TMaybeValueNone(test):
"""Used for unsetting values.
"""
return TMaybe(TOr(TValueNone, test))
# Type aliases
#: a non-empty string
TNonEmptyString = WithDesc("NonEmptyString")(TAnd(TString, TTrue))
#: a maybe non-empty string
TMaybeString = TMaybe(TNonEmptyString)
#: a maybe boolean (bool or none)
TMaybeBool = TMaybe(TBool)
#: Maybe a dictionary (dict or None)
TMaybeDict = TMaybe(TDict)
#: Maybe a list (list or None)
TMaybeList = TMaybe(TList)
#: a non-negative integer (value >= 0)
TNonNegativeInt = \
TAnd(TInt, WithDesc("EqualOrGreaterThanZero")(lambda v: v >= 0))
#: a positive integer (value > 0)
TPositiveInt = \
TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0))
#: a maybe positive integer (positive integer or None)
TMaybePositiveInt = TMaybe(TPositiveInt)
#: a negative integer (value < 0)
TNegativeInt = \
TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0)))
#: a positive float
TNonNegativeFloat = \
TAnd(TFloat, WithDesc("EqualOrGreaterThanZero")(lambda v: v >= 0.0))
#: Job ID
TJobId = WithDesc("JobId")(TOr(TNonNegativeInt,
TRegex(re.compile("^%s$" %
constants.JOB_ID_TEMPLATE))))
#: Number
TNumber = TOr(TInt, TFloat)
#: Relative job ID
TRelativeJobId = WithDesc("RelativeJobId")(TNegativeInt)
def TInstanceOf(cls):
"""Checks if a given value is an instance of C{cls}.
@type cls: class
@param cls: Class object
"""
name = "%s.%s" % (cls.__module__, cls.__name__)
desc = WithDesc("Instance of %s" % (Parens(name), ))
return desc(lambda val: isinstance(val, cls))
def TListOf(my_type):
"""Checks if a given value is a list with all elements of the same type.
"""
desc = WithDesc("List of %s" % (Parens(my_type), ))
return desc(TAnd(TList, lambda lst: compat.all(my_type(v) for v in lst)))
TMaybeListOf = lambda item_type: TMaybe(TListOf(item_type))
def TDictOf(key_type, val_type):
"""Checks a dict type for the type of its key/values.
"""
desc = WithDesc("Dictionary with keys of %s and values of %s" %
(Parens(key_type), Parens(val_type)))
def fn(container):
return (compat.all(key_type(v) for v in container.keys()) and
compat.all(val_type(v) for v in container.values()))
return desc(TAnd(TDict, fn))
def _TStrictDictCheck(require_all, exclusive, items, val):
"""Helper function for L{TStrictDict}.
"""
notfound_fn = lambda _: not exclusive
if require_all and not frozenset(val.keys()).issuperset(items.keys()):
# Requires items not found in value
return False
return compat.all(items.get(key, notfound_fn)(value)
for (key, value) in val.items())
def TStrictDict(require_all, exclusive, items):
"""Strict dictionary check with specific keys.
@type require_all: boolean
@param require_all: Whether all keys in L{items} are required
@type exclusive: boolean
@param exclusive: Whether only keys listed in L{items} should be accepted
@type items: dictionary
@param items: Mapping from key (string) to verification function
"""
descparts = ["Dictionary containing"]
if exclusive:
descparts.append(" none but the")
if require_all:
descparts.append(" required")
if len(items) == 1:
descparts.append(" key ")
else:
descparts.append(" keys ")
descparts.append(utils.CommaJoin("\"%s\" (value %s)" % (key, value)
for (key, value) in items.items()))
desc = WithDesc("".join(descparts))
return desc(TAnd(TDict,
compat.partial(_TStrictDictCheck, require_all, exclusive,
items)))
def TItems(items):
"""Checks individual items of a container.
If the verified value and the list of expected items differ in length, this
check considers only as many items as are contained in the shorter list. Use
L{TIsLength} to enforce a certain length.
@type items: list
@param items: List of checks
"""
assert items, "Need items"
text = ["Item", "item"]
desc = WithDesc(utils.CommaJoin("%s %s is %s" %
(text[int(idx > 0)], idx, Parens(check))
for (idx, check) in enumerate(items)))
return desc(lambda value: compat.all(check(i)
for (check, i) in zip(items, value)))