| # |
| # |
| |
| # Copyright (C) 2010, 2011, 2012 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Module implementing the parameter types code.""" |
| |
| import re |
| import operator |
| |
| from ganeti import compat |
| from ganeti import utils |
| from ganeti import constants |
| |
| |
#: Text consisting solely of these characters is never put in parentheses
_PAREN_RE = re.compile(r"^[a-zA-Z0-9_-]+\Z")


def Parens(text):
  """Enclose text in parens if necessary.

  The value is converted using C{str}. It is returned unchanged if it
  consists solely of ASCII letters, digits, underscores and dashes; anything
  else (including the empty string) is wrapped in parentheses.

  @param text: Text
  @rtype: string
  @return: Text, enclosed in parentheses if needed

  """
  text = str(text)

  # The pattern is anchored with "\Z" instead of "$": the latter also matches
  # just before a trailing newline and would leave such text unparenthesized
  if _PAREN_RE.match(text):
    return text

  return "(%s)" % text
| |
| |
| class _WrapperBase(object): |
| __slots__ = [ |
| "_fn", |
| "_text", |
| ] |
| |
| def __init__(self, text, fn): |
| """Initializes this class. |
| |
| @param text: Description |
| @param fn: Wrapped function |
| |
| """ |
| assert text.strip() |
| |
| self._text = text |
| self._fn = fn |
| |
| def __call__(self, *args): |
| return self._fn(*args) |
| |
| |
class _DescWrapper(_WrapperBase):
  """Wrapper whose string form is the description text.

  """
  def __str__(self):
    """Returns the description text given to the constructor.

    """
    return self._text
| |
| |
class _CommentWrapper(_WrapperBase):
  """Wrapper appending a comment to the wrapped function's string form.

  """
  def __str__(self):
    """Returns the wrapped function's string form followed by the comment.

    The comment is appended in square brackets.

    """
    (wrapped, comment) = (self._fn, self._text)

    return "%s [%s]" % (wrapped, comment)
| |
| |
def WithDesc(text):
  """Builds wrapper class with description text.

  The description's first character must not change when converted to upper
  case (checked with an assertion).

  @type text: string
  @param text: Description text
  @return: Callable taking the function to wrap; built with
    C{compat.partial} from L{_DescWrapper} and the description

  """
  first = text[0]
  assert first == first.upper()

  return compat.partial(_DescWrapper, text)
| |
| |
def Comment(text):
  """Builds wrapper for adding comment to description text.

  The comment must not contain square brackets (checked with an assertion)
  as L{_CommentWrapper} encloses it in brackets itself.

  @type text: string
  @param text: Comment text
  @return: Callable taking the function to wrap; built with
    C{compat.partial} from L{_CommentWrapper} and the comment

  """
  assert not set("[]").intersection(text)

  return compat.partial(_CommentWrapper, text)
| |
| |
def CombinationDesc(op, args, fn):
  """Build description for combining operator.

  A single argument is described by its own string form; several arguments
  are passed through L{Parens} and joined using the operator text.

  @type op: string
  @param op: Operator as text (e.g. "and")
  @type args: sequence
  @param args: Operator arguments
  @type fn: callable
  @param fn: Wrapped function
  @return: C{fn} wrapped using L{WithDesc} and the combined description
  @raise Exception: If L{TNone} is among the arguments, but not the first
    one (only checked while C{__debug__} is set)

  """
  # Long type descriptions make it easy to overlook a "None" listed at the
  # end or somewhere in the middle, hence it has to come first, e.g.
  # "None or (long description)".
  if __debug__ and TNone in args and args.index(TNone) != 0:
    raise Exception("TNone must be listed first")

  if len(args) != 1:
    separator = " %s " % op
    descr = separator.join(Parens(i) for i in args)
  else:
    descr = str(args[0])

  return WithDesc(descr)(fn)
| |
| |
| # Modifiable default values; need to define these here before the |
| # actual LUs |
| |
@WithDesc("[]")
def EmptyList():
  """Returns an empty list.

  Every call creates a new list object.

  """
  return list()
| |
| |
@WithDesc("{}")
def EmptyDict():
  """Returns an empty dict.

  Every call creates a new dictionary object.

  """
  return dict()
| |
| |
#: Unique marker object standing for "no default value"
NoDefault = object()


#: Unique marker object standing for "no type" (the value is too complex to
#: be checked in the type system)
NoType = object()
| |
| |
| # Some basic types |
@WithDesc("Anything")
def TAny(_):
  """Accepts any value.

  The argument is ignored; the result is always C{True}.

  """
  return True
| |
| |
@WithDesc("NotNone")
def TNotNone(val):
  """Checks if the given value is anything but None.

  """
  return not (val is None)
| |
| |
@WithDesc("None")
def TNone(val):
  """Checks if the given value is None (identity comparison).

  """
  return val is None
| |
| |
@WithDesc("ValueNone")
def TValueNone(val):
  """Checks if the given value equals L{constants.VALUE_NONE}.

  """
  is_value_none = (val == constants.VALUE_NONE)

  return is_value_none
| |
| |
@WithDesc("Boolean")
def TBool(val):
  """Checks if the given value is an instance of C{bool}.

  """
  return isinstance(val, bool)
| |
| |
@WithDesc("Integer")
def TInt(val):
  """Checks if the given value is an integer.

  Values of type C{int} and C{long} are accepted, booleans are not.

  """
  # Boolean values are instances of "int" as well and therefore need to be
  # excluded explicitly:
  #
  # >>> (isinstance(False, int), isinstance(True, int))
  # (True, True)
  if isinstance(val, bool):
    return False

  return isinstance(val, (int, long))
| |
| |
@WithDesc("Float")
def TFloat(val):
  """Checks if the given value is an instance of C{float}.

  """
  return isinstance(val, float)
| |
| |
@WithDesc("String")
def TString(val):
  """Checks if the given value is a string (instance of C{basestring}).

  """
  return isinstance(val, basestring)
| |
| |
@WithDesc("EvalToTrue")
def TTrue(val):
  """Checks if a given value evaluates to a boolean True value.

  @rtype: boolean

  """
  if val:
    return True

  return False
| |
| |
def TElemOf(target_list):
  """Builds a function that checks if a given value is a member of a list.

  @param target_list: Container of accepted values; membership is tested
    using the C{in} operator
  @return: Check function described as "OneOf" followed by the values

  """
  description = "OneOf %s" % (utils.CommaJoin(target_list), )

  def _IsMember(val):
    return val in target_list

  return WithDesc(description)(_IsMember)
| |
| |
| # Container types |
@WithDesc("List")
def TList(val):
  """Checks if the given value is an instance of C{list}.

  """
  return isinstance(val, list)
| |
| |
@WithDesc("Tuple")
def TTuple(val):
  """Checks if the given value is an instance of C{tuple}.

  """
  return isinstance(val, tuple)
| |
| |
@WithDesc("Dictionary")
def TDict(val):
  """Checks if the given value is an instance of C{dict}.

  """
  return isinstance(val, dict)
| |
| |
def TIsLength(size):
  """Check if the given container is of the given size.

  @param size: Expected result of C{len} for the container
  @return: Check function described as "Length" followed by the size

  """
  description = "Length %s" % (size, )

  def _HasLength(container):
    return len(container) == size

  return WithDesc(description)(_HasLength)
| |
| |
| # Combinator types |
def TAnd(*args):
  """Combine multiple functions using an AND operation.

  @param args: Check functions, each called with the value to verify
  @return: Check function, with a description built by L{CombinationDesc},
    accepting a value only if all given checks accept it

  """
  def _AllPass(val):
    return compat.all(check(val) for check in args)

  return CombinationDesc("and", args, _AllPass)
| |
| |
def TOr(*args):
  """Combine multiple functions using an OR operation.

  @param args: Check functions, each called with the value to verify
  @return: Check function, with a description built by L{CombinationDesc},
    accepting a value if at least one of the given checks accepts it

  """
  def _AnyPasses(val):
    return compat.any(check(val) for check in args)

  return CombinationDesc("or", args, _AnyPasses)
| |
| |
def TMap(fn, test):
  """Checks that a modified version of the argument passes the given test.

  @param fn: Function applied to the value before testing
  @param test: Check applied to the result of C{fn}

  """
  description = ("Result of %s must be %s" %
                 (Parens(fn), Parens(test)))

  def _CheckMapped(val):
    return test(fn(val))

  return WithDesc(description)(_CheckMapped)
| |
| |
def TRegex(pobj):
  """Checks whether a string matches a specific regular expression.

  The value must pass L{TString} before the pattern's C{match} method is
  applied to it.

  @param pobj: Compiled regular expression as returned by C{re.compile}

  """
  escaped_pattern = pobj.pattern.encode("string_escape")
  description = "String matching regex \"%s\"" % escaped_pattern

  return WithDesc(description)(TAnd(TString, pobj.match))
| |
| |
def TMaybe(test):
  """Wrap a test in a TOr(TNone, test).

  The resulting check accepts None as well as anything accepted by C{test};
  it is the building block for the TMaybe* types.

  @param test: Check for values other than None

  """
  return TOr(TNone, test)
| |
| |
def TMaybeValueNone(test):
  """Used for unsetting values.

  Builds a check accepting None, anything passing L{TValueNone} or anything
  passing C{test}.

  @param test: Check for regular values

  """
  value_none_or_test = TOr(TValueNone, test)

  return TMaybe(value_none_or_test)
| |
| |
# Type aliases

#: a non-empty string (string evaluating to True)
TNonEmptyString = WithDesc("NonEmptyString")(TAnd(TString, TTrue))

#: a maybe non-empty string (non-empty string or None)
TMaybeString = TMaybe(TNonEmptyString)

#: a maybe boolean (bool or None)
TMaybeBool = TMaybe(TBool)

#: Maybe a dictionary (dict or None)
TMaybeDict = TMaybe(TDict)

#: Maybe a list (list or None)
TMaybeList = TMaybe(TList)

#: a non-negative integer (value >= 0)
TNonNegativeInt = TAnd(
  TInt,
  WithDesc("EqualOrGreaterThanZero")(lambda v: v >= 0))

#: a positive integer (value > 0)
TPositiveInt = TAnd(
  TInt,
  WithDesc("GreaterThanZero")(lambda v: v > 0))

#: a maybe positive integer (positive integer or None)
TMaybePositiveInt = TMaybe(TPositiveInt)

#: a negative integer (value < 0); the partial evaluates "0 > value"
TNegativeInt = TAnd(
  TInt,
  WithDesc("LessThanZero")(compat.partial(operator.gt, 0)))

#: a non-negative float (value >= 0.0)
TNonNegativeFloat = TAnd(
  TFloat,
  WithDesc("EqualOrGreaterThanZero")(lambda v: v >= 0.0))

#: Job ID (non-negative integer or string matching the job ID template)
TJobId = WithDesc("JobId")(
  TOr(TNonNegativeInt,
      TRegex(re.compile("^%s$" % constants.JOB_ID_TEMPLATE))))

#: Number (integer or float)
TNumber = TOr(TInt, TFloat)

#: Relative job ID (negative integer)
TRelativeJobId = WithDesc("RelativeJobId")(TNegativeInt)
| |
| |
def TInstanceOf(cls):
  """Checks if a given value is an instance of C{cls}.

  The description names the class by its module and class name.

  @type cls: class
  @param cls: Class object

  """
  qualified_name = "%s.%s" % (cls.__module__, cls.__name__)

  def _IsInstance(val):
    return isinstance(val, cls)

  return WithDesc("Instance of %s" % (Parens(qualified_name), ))(_IsInstance)
| |
| |
def TListOf(my_type):
  """Checks if a given value is a list with all elements of the same type.

  @param my_type: Check applied to every element of the list

  """
  description = "List of %s" % (Parens(my_type), )

  def _AllElementsMatch(lst):
    return compat.all(my_type(elem) for elem in lst)

  return WithDesc(description)(TAnd(TList, _AllElementsMatch))
| |
| |
def TMaybeListOf(item_type):
  """Checks if a given value is None or a list of the given element type.

  @param item_type: Check applied to every element of the list
  @return: Result of C{TMaybe(TListOf(item_type))}

  """
  return TMaybe(TListOf(item_type))
| |
| |
def TDictOf(key_type, val_type):
  """Checks a dict type for the type of its key/values.

  @param key_type: Check applied to every key
  @param val_type: Check applied to every value

  """
  description = ("Dictionary with keys of %s and values of %s" %
                 (Parens(key_type), Parens(val_type)))

  def _CheckContents(container):
    # Values are only looked at once all keys were accepted
    if not compat.all(key_type(key) for key in container.keys()):
      return False

    return compat.all(val_type(value) for value in container.values())

  return WithDesc(description)(TAnd(TDict, _CheckContents))
| |
| |
def _TStrictDictCheck(require_all, exclusive, items, val):
  """Helper function for L{TStrictDict}.

  @param require_all: Whether all keys in C{items} must be present in C{val}
  @param exclusive: Whether keys not listed in C{items} are rejected
  @param items: Mapping from key to verification function
  @param val: Dictionary to verify
  @return: Whether the dictionary passed all checks

  """
  if require_all:
    missing = frozenset(items.keys()).difference(val.keys())
    if missing:
      # Required items not found in value
      return False

  def _UnknownKey(_):
    # Keys without verification function are fine unless exclusive
    return not exclusive

  return compat.all(items.get(name, _UnknownKey)(entry)
                    for (name, entry) in val.items())
| |
| |
def TStrictDict(require_all, exclusive, items):
  """Strict dictionary check with specific keys.

  The value must pass L{TDict} before its entries are verified using
  L{_TStrictDictCheck}.

  @type require_all: boolean
  @param require_all: Whether all keys in L{items} are required
  @type exclusive: boolean
  @param exclusive: Whether only keys listed in L{items} should be accepted
  @type items: dictionary
  @param items: Mapping from key (string) to verification function

  """
  # Assemble the description piece by piece
  text = "Dictionary containing"

  if exclusive:
    text += " none but the"

  if require_all:
    text += " required"

  if len(items) != 1:
    text += " keys "
  else:
    text += " key "

  text += utils.CommaJoin("\"%s\" (value %s)" % (key, value)
                          for (key, value) in items.items())

  check_entries = compat.partial(_TStrictDictCheck, require_all, exclusive,
                                 items)

  return WithDesc(text)(TAnd(TDict, check_entries))
| |
| |
def TItems(items):
  """Checks individual items of a container.

  Checks and values are paired up by position. If there are more checks than
  values, or more values than checks, the surplus is ignored; only as many
  items as are contained in the shorter of the two are considered. Use
  L{TIsLength} to enforce a certain length.

  @type items: list
  @param items: List of checks

  """
  assert items, "Need items"

  # Only the very first part of the description is capitalized
  labels = ("Item", "item")

  parts = []
  for (idx, check) in enumerate(items):
    parts.append("%s %s is %s" % (labels[int(idx > 0)], idx, Parens(check)))

  def _CheckItems(value):
    return compat.all(check(i) for (check, i) in zip(items, value))

  return WithDesc(utils.CommaJoin(parts))(_CheckItems)