| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Inter-node RPC library. |
| |
| """ |
| |
| # pylint: disable=C0103,R0201,R0904 |
# C0103: Invalid name, since the call_* method names are not valid by default
# R0201: Method could be a function; we keep all RPCs as instance methods
# so as not to change them back and forth between static/instance methods
# if they need to start using instance attributes
| # R0904: Too many public methods |
| |
| import logging |
| import zlib |
| import base64 |
| import pycurl |
| import threading |
| import copy |
| import os |
| |
| from ganeti import utils |
| from ganeti import objects |
| from ganeti import http |
| from ganeti import serializer |
| from ganeti import constants |
| from ganeti import errors |
| from ganeti import netutils |
| from ganeti import ssconf |
| from ganeti import runtime |
| from ganeti import compat |
| from ganeti import rpc_defs |
| from ganeti import pathutils |
| from ganeti import vcluster |
| |
| # Special module generated at build time |
| from ganeti import _generated_rpc |
| |
| # pylint has a bug here, doesn't see this import |
| import ganeti.http.client # pylint: disable=W0611 |
| |
| |
| _RPC_CLIENT_HEADERS = [ |
| "Content-type: %s" % http.HTTP_APP_JSON, |
| "Expect:", |
| ] |
| |
| #: Special value to describe an offline host |
| _OFFLINE = object() |
| |
| |
| def Init(): |
| """Initializes the module-global HTTP client manager. |
| |
| Must be called before using any RPC function and while exactly one thread is |
| running. |
| |
| """ |
| # curl_global_init(3) and curl_global_cleanup(3) must be called with only |
| # one thread running. This check is just a safety measure -- it doesn't |
| # cover all cases. |
| assert threading.activeCount() == 1, \ |
| "Found more than one active thread when initializing pycURL" |
| |
| logging.info("Using PycURL %s", pycurl.version) |
| |
| pycurl.global_init(pycurl.GLOBAL_ALL) |
| |
| |
| def Shutdown(): |
| """Stops the module-global HTTP client manager. |
| |
| Must be called before quitting the program and while exactly one thread is |
| running. |
| |
| """ |
| pycurl.global_cleanup() |
| |
| |
| def _ConfigRpcCurl(curl): |
| noded_cert = pathutils.NODED_CERT_FILE |
| noded_client_cert = pathutils.NODED_CLIENT_CERT_FILE |
| |
| # This fallback is required for backwards compatibility with 2.10. Ganeti |
| # 2.11 introduced per-node client certificates, but when we restart after |
| # an upgrade from 2.10, the client certs are not in place yet, and we need |
| # to fall back to using the cluster-wide server cert. |
| if not os.path.exists(noded_client_cert): |
| logging.warn("Using server certificate as client certificate for RPC" |
| "call.") |
| noded_client_cert = noded_cert |
| |
| curl.setopt(pycurl.FOLLOWLOCATION, False) |
| curl.setopt(pycurl.CAINFO, noded_cert) |
| curl.setopt(pycurl.SSL_VERIFYHOST, 0) |
| curl.setopt(pycurl.SSL_VERIFYPEER, True) |
| curl.setopt(pycurl.SSLCERTTYPE, "PEM") |
| curl.setopt(pycurl.SSLCERT, noded_client_cert) |
| curl.setopt(pycurl.SSLKEYTYPE, "PEM") |
| curl.setopt(pycurl.SSLKEY, noded_client_cert) |
| curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT) |
| |
| |
| def RunWithRPC(fn): |
| """RPC-wrapper decorator. |
| |
  When applied to a function, it runs it with the RPC system
  initialized, and it shuts the system down afterwards. This means the
  function must be called without RPC being initialized.
| |
| """ |
| def wrapper(*args, **kwargs): |
| Init() |
| try: |
| return fn(*args, **kwargs) |
| finally: |
| Shutdown() |
| return wrapper |
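

# Example use of L{RunWithRPC} (a sketch, not taken from an existing caller;
# "main" is a hypothetical entry point). The decorator guarantees that Init()
# and Shutdown() bracket the wrapped call, so pycurl is set up exactly once:
#
#   @RunWithRPC
#   def main():
#     # RPC runners can be created and used safely in here
#     runner = DnsOnlyRunner()
#     return runner.call_version(["node1.example.com"])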
| |
| |
| def _Compress(_, data): |
| """Compresses a string for transport over RPC. |
| |
| Small amounts of data are not compressed. |
| |
| @type data: str |
| @param data: Data |
| @rtype: tuple |
| @return: Encoded data to send |
| |
| """ |
| # Small amounts of data are not compressed |
| if len(data) < 512: |
| return (constants.RPC_ENCODING_NONE, data) |
| |
| # Compress with zlib and encode in base64 |
| return (constants.RPC_ENCODING_ZLIB_BASE64, |
| base64.b64encode(zlib.compress(data, 3))) |
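

# Illustrative behaviour of L{_Compress} (input strings are made up):
#
#   _Compress(None, "x" * 100)
#   # -> (constants.RPC_ENCODING_NONE, "xxx...x"), sent as-is
#   _Compress(None, "x" * 4096)
#   # -> (constants.RPC_ENCODING_ZLIB_BASE64, <base64 of zlib-compressed data>)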
| |
| |
| class RpcResult(object): |
| """RPC Result class. |
| |
| This class holds an RPC result. It is needed since in multi-node |
| calls we can't raise an exception just because one out of many |
| failed, and therefore we use this class to encapsulate the result. |
| |
| @ivar data: the data payload, for successful results, or None |
| @ivar call: the name of the RPC call |
| @ivar node: the name of the node to which we made the call |
| @ivar offline: whether the operation failed because the node was |
| offline, as opposed to actual failure; offline=True will always |
| imply failed=True, in order to allow simpler checking if |
| the user doesn't care about the exact failure mode |
| @ivar fail_msg: the error message if the call failed |
| |
| """ |
| def __init__(self, data=None, failed=False, offline=False, |
| call=None, node=None): |
| self.offline = offline |
| self.call = call |
| self.node = node |
| |
| if offline: |
| self.fail_msg = "Node is marked offline" |
| self.data = self.payload = None |
| elif failed: |
| self.fail_msg = self._EnsureErr(data) |
| self.data = self.payload = None |
| else: |
| self.data = data |
| if not isinstance(self.data, (tuple, list)): |
| self.fail_msg = ("RPC layer error: invalid result type (%s)" % |
| type(self.data)) |
| self.payload = None |
| elif len(data) != 2: |
| self.fail_msg = ("RPC layer error: invalid result length (%d), " |
| "expected 2" % len(self.data)) |
| self.payload = None |
| elif not self.data[0]: |
| self.fail_msg = self._EnsureErr(self.data[1]) |
| self.payload = None |
| else: |
| # finally success |
| self.fail_msg = None |
| self.payload = data[1] |
| |
| for attr_name in ["call", "data", "fail_msg", |
| "node", "offline", "payload"]: |
| assert hasattr(self, attr_name), "Missing attribute %s" % attr_name |
| |
| def __repr__(self): |
| return ("RpcResult(data=%s, call=%s, node=%s, offline=%s, fail_msg=%s)" % |
| (self.offline, self.call, self.node, self.offline, self.fail_msg)) |
| |
| @staticmethod |
| def _EnsureErr(val): |
| """Helper to ensure we return a 'True' value for error.""" |
| if val: |
| return val |
| else: |
| return "No error information" |
| |
| def Raise(self, msg, prereq=False, ecode=None): |
| """If the result has failed, raise an OpExecError. |
| |
| This is used so that LU code doesn't have to check for each |
| result, but instead can call this function. |
| |
| """ |
| if not self.fail_msg: |
| return |
| |
| if not msg: # one could pass None for default message |
| msg = ("Call '%s' to node '%s' has failed: %s" % |
| (self.call, self.node, self.fail_msg)) |
| else: |
| msg = "%s: %s" % (msg, self.fail_msg) |
| if prereq: |
| ec = errors.OpPrereqError |
| else: |
| ec = errors.OpExecError |
| if ecode is not None: |
| args = (msg, ecode) |
| else: |
| args = (msg, ) |
| raise ec(*args) # pylint: disable=W0142 |
| |
| def Warn(self, msg, feedback_fn): |
| """If the result has failed, call the feedback_fn. |
| |
    This is used in cases where the LU wants to warn the
    user about a failure, but continue anyway.
| |
| """ |
| if not self.fail_msg: |
| return |
| |
| msg = "%s: %s" % (msg, self.fail_msg) |
| feedback_fn(msg) |
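

# Typical consumer pattern for L{RpcResult} (a sketch; the call and the node
# name are assumptions). LU code checks each result individually instead of
# handling exceptions from multi-node calls:
#
#   result = rpc_runner.call_version([node_name])[node_name]
#   result.Raise("Version check on node %s failed" % node_name)
#   version = result.payload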
| |
| |
| def _SsconfResolver(ssconf_ips, node_list, _, |
| ssc=ssconf.SimpleStore, |
| nslookup_fn=netutils.Hostname.GetIP): |
| """Return addresses for given node names. |
| |
| @type ssconf_ips: bool |
| @param ssconf_ips: Use the ssconf IPs |
| @type node_list: list |
| @param node_list: List of node names |
| @type ssc: class |
| @param ssc: SimpleStore class that is used to obtain node->ip mappings |
| @type nslookup_fn: callable |
  @param nslookup_fn: function used to do NS lookup
  @rtype: list of tuples; (string, string, string)
  @return: List of tuples containing node name, IP address and original name
| |
| """ |
| ss = ssc() |
| family = ss.GetPrimaryIPFamily() |
| |
| if ssconf_ips: |
| iplist = ss.GetNodePrimaryIPList() |
| ipmap = dict(entry.split() for entry in iplist) |
| else: |
| ipmap = {} |
| |
| result = [] |
| for node in node_list: |
| ip = ipmap.get(node) |
| if ip is None: |
| ip = nslookup_fn(node, family=family) |
| result.append((node, ip, node)) |
| |
| return result |
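

# Each entry of the ssconf primary IP list has the form "<name> <ip>"; a
# sketch of the mapping built above (hostnames and addresses are made up):
#
#   iplist = ["node1.example.com 192.0.2.1", "node2.example.com 192.0.2.2"]
#   ipmap = dict(entry.split() for entry in iplist)
#   # ipmap == {"node1.example.com": "192.0.2.1",
#   #           "node2.example.com": "192.0.2.2"}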
| |
| |
| class _StaticResolver: |
| def __init__(self, addresses): |
| """Initializes this class. |
| |
| """ |
| self._addresses = addresses |
| |
| def __call__(self, hosts, _): |
| """Returns static addresses for hosts. |
| |
| """ |
| assert len(hosts) == len(self._addresses) |
| return zip(hosts, self._addresses, hosts) |
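

# Sketch of L{_StaticResolver} in use (addresses are made up); the resolver
# simply pairs hosts with the pre-set addresses, position by position:
#
#   resolver = _StaticResolver(["192.0.2.10", "192.0.2.11"])
#   resolver(["node1", "node2"], None)
#   # -> [("node1", "192.0.2.10", "node1"), ("node2", "192.0.2.11", "node2")]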
| |
| |
| def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node): |
| """Checks if a node is online. |
| |
| @type node_uuid_or_name: string |
| @param node_uuid_or_name: Node UUID |
| @type node: L{objects.Node} or None |
| @param node: Node object |
| |
| """ |
| if node is None: |
| # Assume that the passed parameter was actually a node name, so depend on |
| # DNS for name resolution |
| return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name) |
| else: |
| if node.offline and not accept_offline_node: |
| ip = _OFFLINE |
| else: |
| ip = node.primary_ip |
| return (node.name, ip, node_uuid_or_name) |
| |
| |
| def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts): |
| """Calculate node addresses using configuration. |
| |
| Note that strings in node_uuids are treated as node names if the UUID is not |
| found in the configuration. |
| |
| """ |
| accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE) |
| |
| assert accept_offline_node or opts is None, "Unknown option" |
| |
| # Special case for single-host lookups |
| if len(node_uuids) == 1: |
| (uuid, ) = node_uuids |
| return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)] |
| else: |
| all_nodes = all_nodes_fn() |
| return [_CheckConfigNode(uuid, all_nodes.get(uuid, None), |
| accept_offline_node) |
| for uuid in node_uuids] |
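

# Possible outcomes of the resolution above (names and addresses made up):
#
#   - known UUID, node online:   ("node1.example.com", "192.0.2.1", uuid)
#   - known UUID, node offline:  ("node1.example.com", _OFFLINE, uuid),
#     unless opts is rpc_defs.ACCEPT_OFFLINE_NODE
#   - unknown string: treated as a node name and left to DNS resolution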
| |
| |
| class _RpcProcessor: |
| def __init__(self, resolver, port, lock_monitor_cb=None): |
| """Initializes this class. |
| |
| @param resolver: callable accepting a list of node UUIDs or hostnames, |
| returning a list of tuples containing name, IP address and original name |
| of the resolved node. IP address can be the name or the special value |
| L{_OFFLINE} to mark offline machines. |
| @type port: int |
| @param port: TCP port |
| @param lock_monitor_cb: Callable for registering with lock monitor |
| |
| """ |
| self._resolver = resolver |
| self._port = port |
| self._lock_monitor_cb = lock_monitor_cb |
| |
| @staticmethod |
| def _PrepareRequests(hosts, port, procedure, body, read_timeout): |
| """Prepares requests by sorting offline hosts into separate list. |
| |
| @type body: dict |
| @param body: a dictionary with per-host body data |
| |
| """ |
| results = {} |
| requests = {} |
| |
| assert isinstance(body, dict) |
| assert len(body) == len(hosts) |
| assert compat.all(isinstance(v, str) for v in body.values()) |
| assert frozenset(h[2] for h in hosts) == frozenset(body.keys()), \ |
| "%s != %s" % (hosts, body.keys()) |
| |
| for (name, ip, original_name) in hosts: |
| if ip is _OFFLINE: |
| # Node is marked as offline |
| results[original_name] = RpcResult(node=name, |
| offline=True, |
| call=procedure) |
| else: |
| requests[original_name] = \ |
| http.client.HttpClientRequest(str(ip), port, |
| http.HTTP_POST, str("/%s" % procedure), |
| headers=_RPC_CLIENT_HEADERS, |
| post_data=body[original_name], |
| read_timeout=read_timeout, |
| nicename="%s/%s" % (name, procedure), |
| curl_config_fn=_ConfigRpcCurl) |
| |
| return (results, requests) |
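
  # Sketch of the split performed by _PrepareRequests (host tuples are made
  # up): given hosts = [("node1", "192.0.2.1", "node1"),
  # ("node2", _OFFLINE, "node2")], "node1" ends up in "requests" as an
  # HttpClientRequest, while "node2" ends up in "results" as an offline
  # RpcResult; the two dictionaries never share keys.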
| |
| @staticmethod |
| def _CombineResults(results, requests, procedure): |
| """Combines pre-computed results for offline hosts with actual call results. |
| |
| """ |
| for name, req in requests.items(): |
| if req.success and req.resp_status_code == http.HTTP_OK: |
| host_result = RpcResult(data=serializer.LoadJson(req.resp_body), |
| node=name, call=procedure) |
| else: |
| # TODO: Better error reporting |
| if req.error: |
| msg = req.error |
| else: |
| msg = req.resp_body |
| |
| logging.error("RPC error in %s on node %s: %s", procedure, name, msg) |
| host_result = RpcResult(data=msg, failed=True, node=name, |
| call=procedure) |
| |
| results[name] = host_result |
| |
| return results |
| |
| def __call__(self, nodes, procedure, body, read_timeout, resolver_opts, |
| _req_process_fn=None): |
| """Makes an RPC request to a number of nodes. |
| |
| @type nodes: sequence |
    @param nodes: node UUIDs or hostnames
| @type procedure: string |
| @param procedure: Request path |
| @type body: dictionary |
| @param body: dictionary with request bodies per host |
| @type read_timeout: int or None |
| @param read_timeout: Read timeout for request |
| @rtype: dictionary |
| @return: a dictionary mapping host names to rpc.RpcResult objects |
| |
| """ |
| assert read_timeout is not None, \ |
| "Missing RPC read timeout for procedure '%s'" % procedure |
| |
| if _req_process_fn is None: |
| _req_process_fn = http.client.ProcessRequests |
| |
| (results, requests) = \ |
| self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port, |
| procedure, body, read_timeout) |
| |
| _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb) |
| |
| assert not frozenset(results).intersection(requests) |
| |
| return self._CombineResults(results, requests, procedure) |
| |
| |
| class _RpcClientBase: |
| def __init__(self, resolver, encoder_fn, lock_monitor_cb=None, |
| _req_process_fn=None): |
| """Initializes this class. |
| |
| """ |
| proc = _RpcProcessor(resolver, |
| netutils.GetDaemonPort(constants.NODED), |
| lock_monitor_cb=lock_monitor_cb) |
| self._proc = compat.partial(proc, _req_process_fn=_req_process_fn) |
| self._encoder = compat.partial(self._EncodeArg, encoder_fn) |
| |
| @staticmethod |
| def _EncodeArg(encoder_fn, node, (argkind, value)): |
| """Encode argument. |
| |
| """ |
| if argkind is None: |
| return value |
| else: |
| return encoder_fn(argkind)(node, value) |
| |
| def _Call(self, cdef, node_list, args): |
| """Entry point for automatically generated RPC wrappers. |
| |
| """ |
| (procedure, _, resolver_opts, timeout, argdefs, |
| prep_fn, postproc_fn, _) = cdef |
| |
| if callable(timeout): |
| read_timeout = timeout(args) |
| else: |
| read_timeout = timeout |
| |
| if callable(resolver_opts): |
| req_resolver_opts = resolver_opts(args) |
| else: |
| req_resolver_opts = resolver_opts |
| |
| if len(args) != len(argdefs): |
| raise errors.ProgrammerError("Number of passed arguments doesn't match") |
| |
| if prep_fn is None: |
| prep_fn = lambda _, args: args |
| assert callable(prep_fn) |
| |
| # encode the arguments for each node individually, pass them and the node |
| # name to the prep_fn, and serialise its return value |
| encode_args_fn = lambda node: [self._encoder(node, (argdef[1], val)) for |
| (argdef, val) in zip(argdefs, args)] |
| pnbody = dict( |
| (n, |
| serializer.DumpJson(prep_fn(n, encode_args_fn(n)), |
| private_encoder=serializer.EncodeWithPrivateFields)) |
| for n in node_list |
| ) |
| |
| result = self._proc(node_list, procedure, pnbody, read_timeout, |
| req_resolver_opts) |
| |
| if postproc_fn: |
| return dict((k, postproc_fn(v)) for (k, v) in result.items()) |
| else: |
| return result |
| |
| |
| def _ObjectToDict(_, value): |
| """Converts an object to a dictionary. |
| |
| @note: See L{objects}. |
| |
| """ |
| return value.ToDict() |
| |
| |
| def _ObjectListToDict(node, value): |
| """Converts a list of L{objects} to dictionaries. |
| |
| """ |
| return [_ObjectToDict(node, v) for v in value] |
| |
| |
| def _PrepareFileUpload(getents_fn, node, filename): |
| """Loads a file and prepares it for an upload to nodes. |
| |
| """ |
| statcb = utils.FileStatHelper() |
| data = _Compress(node, utils.ReadFile(filename, preread=statcb)) |
| st = statcb.st |
| |
| if getents_fn is None: |
| getents_fn = runtime.GetEnts |
| |
| getents = getents_fn() |
| |
| virt_filename = vcluster.MakeVirtualPath(filename) |
| |
| return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid), |
| getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime] |
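

# The payload built by L{_PrepareFileUpload} has the layout (illustrative):
#
#   [virtual_path, (encoding, data), mode, owner, group, atime, mtime]
#
# where (encoding, data) is the tuple produced by L{_Compress} and the owner
# and group are looked up via the getents runtime helper.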
| |
| |
| def _PrepareFinalizeExportDisks(_, snap_disks): |
| """Encodes disks for finalizing export. |
| |
| """ |
| flat_disks = [] |
| |
| for disk in snap_disks: |
| if isinstance(disk, bool): |
| flat_disks.append(disk) |
| else: |
| flat_disks.append(disk.ToDict()) |
| |
| return flat_disks |
| |
| |
| def _EncodeBlockdevRename(_, value): |
| """Encodes information for renaming block devices. |
| |
| """ |
| return [(d.ToDict(), uid) for d, uid in value] |
| |
| |
| def _AddSpindlesToLegacyNodeInfo(result, space_info): |
| """Extracts the spindle information from the space info and adds |
| it to the result dictionary. |
| |
| @type result: dict of strings |
| @param result: dictionary holding the result of the legacy node info |
| @type space_info: list of dicts of strings |
| @param space_info: list, each row holding space information of one storage |
| unit |
| @rtype: None |
| @return: does not return anything, manipulates the C{result} variable |
| |
| """ |
| lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType( |
| space_info, constants.ST_LVM_PV) |
| if lvm_pv_info: |
| result["spindles_free"] = lvm_pv_info["storage_free"] |
| result["spindles_total"] = lvm_pv_info["storage_size"] |
| else: |
| result["spindles_free"] = 0 |
| result["spindles_total"] = 0 |
| |
| |
| def _AddStorageInfoToLegacyNodeInfoByTemplate( |
| result, space_info, disk_template): |
| """Extracts the storage space information of the disk template from |
| the space info and adds it to the result dictionary. |
| |
| @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information. |
| |
| """ |
| if utils.storage.DiskTemplateSupportsSpaceReporting(disk_template): |
| disk_info = utils.storage.LookupSpaceInfoByDiskTemplate( |
| space_info, disk_template) |
| result["name"] = disk_info["name"] |
| result["storage_free"] = disk_info["storage_free"] |
| result["storage_size"] = disk_info["storage_size"] |
| else: |
| # FIXME: consider displaying '-' in this case |
| result["storage_free"] = 0 |
| result["storage_size"] = 0 |
| |
| |
| def MakeLegacyNodeInfo(data, disk_template): |
| """Formats the data returned by call_node_info. |
| |
| Converts the data into a single dictionary. This is fine for most use cases, |
| but some require information from more than one volume group or hypervisor. |
| |
| """ |
| (bootid, space_info, (hv_info, )) = data |
| |
| ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid}) |
| |
| _AddSpindlesToLegacyNodeInfo(ret, space_info) |
| _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template) |
| |
| return ret |
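

# Shape of the "data" argument to L{MakeLegacyNodeInfo} (a sketch; the
# hypervisor keys depend on the hypervisor that was queried):
#
#   data = ("1234-bootid", space_info, ({"memory_total": 4096}, ))
#   MakeLegacyNodeInfo(data, constants.DT_PLAIN)
#   # -> {"bootid": "1234-bootid", "memory_total": 4096,
#   #     "spindles_free": ..., "storage_free": ..., ...}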
| |
| |
| def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)): |
| """Annotates just DRBD disks layouts. |
| |
| """ |
| assert disk.dev_type == constants.DT_DRBD8 |
| |
| disk.params = objects.FillDict(drbd_params, disk.params) |
| (dev_data, dev_meta) = disk.children |
| dev_data.params = objects.FillDict(data_params, dev_data.params) |
| dev_meta.params = objects.FillDict(meta_params, dev_meta.params) |
| |
| return disk |
| |
| |
| def _AnnotateDParamsGeneric(disk, (params, )): |
| """Generic disk parameter annotation routine. |
| |
| """ |
| assert disk.dev_type != constants.DT_DRBD8 |
| |
| disk.params = objects.FillDict(params, disk.params) |
| |
| return disk |
| |
| |
| def AnnotateDiskParams(disks, disk_params): |
| """Annotates the disk objects with the disk parameters. |
| |
  @param disks: The list of disk objects to annotate
  @param disk_params: The disk parameters for annotation
  @return: A list of annotated disk copies
| |
| """ |
| def AnnotateDisk(disk): |
| if disk.dev_type == constants.DT_DISKLESS: |
| return disk |
| |
| ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params) |
| |
| if disk.dev_type == constants.DT_DRBD8: |
| return _AnnotateDParamsDRBD(disk, ld_params) |
| else: |
| return _AnnotateDParamsGeneric(disk, ld_params) |
| |
| return [AnnotateDisk(disk.Copy()) for disk in disks] |
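

# Sketch of L{AnnotateDiskParams} in use (names are assumptions); the input
# disks are copied, so the configuration objects are never modified:
#
#   annotated = AnnotateDiskParams(instance_disks, cluster_disk_params)
#   # every returned disk has disk.params filled with the template-level
#   # defaults, and DRBD disks also have their children annotated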
| |
| |
| def _GetExclusiveStorageFlag(cfg, node_uuid): |
| ni = cfg.GetNodeInfo(node_uuid) |
| if ni is None: |
| raise errors.OpPrereqError("Invalid node name %s" % node_uuid, |
| errors.ECODE_NOENT) |
| return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE] |
| |
| |
| def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag): |
| """Adds the exclusive storage flag to lvm units. |
| |
| This function creates a copy of the storage_units lists, with the |
| es_flag being added to all lvm storage units. |
| |
| @type storage_units: list of pairs (string, string) |
| @param storage_units: list of 'raw' storage units, consisting only of |
| (storage_type, storage_key) |
| @type es_flag: boolean |
| @param es_flag: exclusive storage flag |
| @rtype: list of tuples (string, string, list) |
| @return: list of storage units (storage_type, storage_key, params) with |
| the params containing the es_flag for lvm-vg storage units |
| |
| """ |
| result = [] |
| for (storage_type, storage_key) in storage_units: |
| if storage_type in [constants.ST_LVM_VG]: |
| result.append((storage_type, storage_key, [es_flag])) |
| if es_flag: |
| result.append((constants.ST_LVM_PV, storage_key, [es_flag])) |
| else: |
| result.append((storage_type, storage_key, [])) |
| return result |
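

# Illustrative transformation (storage keys are made up):
#
#   _AddExclusiveStorageFlagToLvmStorageUnits(
#     [(constants.ST_LVM_VG, "xenvg"), (constants.ST_FILE, "/srv/file")], True)
#   # -> [(constants.ST_LVM_VG, "xenvg", [True]),
#   #     (constants.ST_LVM_PV, "xenvg", [True]),
#   #     (constants.ST_FILE, "/srv/file", [])]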
| |
| |
| def GetExclusiveStorageForNodes(cfg, node_uuids): |
| """Return the exclusive storage flag for all the given nodes. |
| |
| @type cfg: L{config.ConfigWriter} |
| @param cfg: cluster configuration |
| @type node_uuids: list or tuple |
| @param node_uuids: node UUIDs for which to read the flag |
| @rtype: dict |
| @return: mapping from node uuids to exclusive storage flags |
  @raise errors.OpPrereqError: if any given node UUID has no corresponding
    node
| |
| """ |
| getflag = lambda n: _GetExclusiveStorageFlag(cfg, n) |
| flags = map(getflag, node_uuids) |
| return dict(zip(node_uuids, flags)) |
| |
| |
| def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids): |
| """Return the lvm storage unit for all the given nodes. |
| |
| Main purpose of this function is to map the exclusive storage flag, which |
| can be different for each node, to the default LVM storage unit. |
| |
| @type cfg: L{config.ConfigWriter} |
| @param cfg: cluster configuration |
| @type storage_units: list of pairs (string, string) |
| @param storage_units: list of 'raw' storage units, e.g. pairs of |
| (storage_type, storage_key) |
| @type node_uuids: list or tuple |
| @param node_uuids: node UUIDs for which to read the flag |
| @rtype: dict |
| @return: mapping from node uuids to a list of storage units which include |
| the exclusive storage flag for lvm storage |
  @raise errors.OpPrereqError: if any given node UUID has no corresponding
    node
| |
| """ |
| getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits( |
| storage_units, _GetExclusiveStorageFlag(cfg, n)) |
| flags = map(getunit, node_uuids) |
| return dict(zip(node_uuids, flags)) |
| |
| |
| #: Generic encoders |
| _ENCODERS = { |
| rpc_defs.ED_OBJECT_DICT: _ObjectToDict, |
| rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict, |
| rpc_defs.ED_COMPRESS: _Compress, |
| rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks, |
| rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename, |
| } |
| |
| |
| class RpcRunner(_RpcClientBase, |
| _generated_rpc.RpcClientDefault, |
| _generated_rpc.RpcClientBootstrap, |
| _generated_rpc.RpcClientDnsOnly, |
| _generated_rpc.RpcClientConfig): |
| """RPC runner class. |
| |
| """ |
  def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None,
               _getents=None):
| """Initialized the RPC runner. |
| |
| @type cfg: L{config.ConfigWriter} |
| @param cfg: Configuration |
| @type lock_monitor_cb: callable |
| @param lock_monitor_cb: Lock monitor callback |
| |
| """ |
| self._cfg = cfg |
| |
| encoders = _ENCODERS.copy() |
| |
| encoders.update({ |
| # Encoders requiring configuration object |
| rpc_defs.ED_INST_DICT: self._InstDict, |
| rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp, |
| rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp, |
| rpc_defs.ED_NIC_DICT: self._NicDict, |
| rpc_defs.ED_DEVICE_DICT: self._DeviceDict, |
| |
| # Encoders annotating disk parameters |
| rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP, |
| rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP, |
| rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP, |
| rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP, |
| |
| # Encoders with special requirements |
| rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents), |
| |
| rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO, |
| }) |
| |
| # Resolver using configuration |
| resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo, |
| cfg.GetAllNodesInfo) |
| |
| # Pylint doesn't recognize multiple inheritance properly, see |
| # <http://www.logilab.org/ticket/36586> and |
| # <http://www.logilab.org/ticket/35642> |
| # pylint: disable=W0233 |
| _RpcClientBase.__init__(self, resolver, encoders.get, |
| lock_monitor_cb=lock_monitor_cb, |
| _req_process_fn=_req_process_fn) |
| _generated_rpc.RpcClientConfig.__init__(self) |
| _generated_rpc.RpcClientBootstrap.__init__(self) |
| _generated_rpc.RpcClientDnsOnly.__init__(self) |
| _generated_rpc.RpcClientDefault.__init__(self) |
| |
| def _NicDict(self, _, nic): |
| """Convert the given nic to a dict and encapsulate netinfo |
| |
| """ |
| n = copy.deepcopy(nic) |
| if n.network: |
| net_uuid = self._cfg.LookupNetwork(n.network) |
| if net_uuid: |
| nobj = self._cfg.GetNetwork(net_uuid) |
| n.netinfo = objects.Network.ToDict(nobj) |
| return n.ToDict() |
| |
  def _DeviceDict(self, _, (device, instance)):
    """Encodes a NIC or a disk device for an RPC call.

    """
| if isinstance(device, objects.NIC): |
| return self._NicDict(None, device) |
| elif isinstance(device, objects.Disk): |
| return self._SingleDiskDictDP(None, (device, instance)) |
| |
| def _InstDict(self, node, instance, hvp=None, bep=None, osp=None): |
| """Convert the given instance to a dict. |
| |
| This is done via the instance's ToDict() method and additionally |
| we fill the hvparams with the cluster defaults. |
| |
| @type instance: L{objects.Instance} |
| @param instance: an Instance object |
| @type hvp: dict or None |
| @param hvp: a dictionary with overridden hypervisor parameters |
| @type bep: dict or None |
| @param bep: a dictionary with overridden backend parameters |
| @type osp: dict or None |
| @param osp: a dictionary with overridden os parameters |
| @rtype: dict |
| @return: the instance dict, with the hvparams filled with the |
| cluster defaults |
| |
| """ |
| idict = instance.ToDict() |
| cluster = self._cfg.GetClusterInfo() |
| idict["hvparams"] = cluster.FillHV(instance) |
| idict["secondary_nodes"] = \ |
| self._cfg.GetInstanceSecondaryNodes(instance.uuid) |
| if hvp is not None: |
| idict["hvparams"].update(hvp) |
| idict["beparams"] = cluster.FillBE(instance) |
| if bep is not None: |
| idict["beparams"].update(bep) |
| idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams) |
| if osp is not None: |
| idict["osparams"].update(osp) |
| disks = self._cfg.GetInstanceDisks(instance.uuid) |
| idict["disks_info"] = self._DisksDictDP(node, (disks, instance)) |
| for nic in idict["nics"]: |
| nic["nicparams"] = objects.FillDict( |
| cluster.nicparams[constants.PP_DEFAULT], |
| nic["nicparams"]) |
| network = nic.get("network", None) |
| if network: |
| net_uuid = self._cfg.LookupNetwork(network) |
| if net_uuid: |
| nobj = self._cfg.GetNetwork(net_uuid) |
| nic["netinfo"] = objects.Network.ToDict(nobj) |
| return idict |
| |
| def _InstDictHvpBepDp(self, node, (instance, hvp, bep)): |
| """Wrapper for L{_InstDict}. |
| |
| """ |
| return self._InstDict(node, instance, hvp=hvp, bep=bep) |
| |
| def _InstDictOspDp(self, node, (instance, osparams)): |
| """Wrapper for L{_InstDict}. |
| |
| """ |
| return self._InstDict(node, instance, osp=osparams) |
| |
| def _DisksDictDP(self, node, (disks, instance)): |
| """Wrapper for L{AnnotateDiskParams}. |
| |
| """ |
| diskparams = self._cfg.GetInstanceDiskParams(instance) |
| ret = [] |
| for disk in AnnotateDiskParams(disks, diskparams): |
| disk_node_uuids = disk.GetNodes(instance.primary_node) |
| node_ips = dict((uuid, node.secondary_ip) for (uuid, node) |
| in self._cfg.GetMultiNodeInfo(disk_node_uuids)) |
| |
| disk.UpdateDynamicDiskParams(node, node_ips) |
| |
| ret.append(disk.ToDict(include_dynamic_params=True)) |
| |
| return ret |
| |
| def _MultiDiskDictDP(self, node, disks_insts): |
| """Wrapper for L{AnnotateDiskParams}. |
| |
    Supports a list of (disk, instance) tuples.

    """
| return [disk for disk_inst in disks_insts |
| for disk in self._DisksDictDP(node, disk_inst)] |
| |
| def _SingleDiskDictDP(self, node, (disk, instance)): |
| """Wrapper for L{AnnotateDiskParams}. |
| |
| """ |
| (anno_disk,) = self._DisksDictDP(node, ([disk], instance)) |
| return anno_disk |
| |
| def _EncodeNodeToDiskDictDP(self, node, value): |
| """Encode dict of node name -> list of (disk, instance) tuples as values. |
| |
| """ |
| return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks]) |
| for name, disks in value.items()) |
| |
| def _EncodeImportExportIO(self, node, (ieio, ieioargs)): |
| """Encodes import/export I/O information. |
| |
| """ |
| if ieio == constants.IEIO_RAW_DISK: |
| assert len(ieioargs) == 2 |
| return (ieio, (self._SingleDiskDictDP(node, ieioargs), )) |
| |
| if ieio == constants.IEIO_SCRIPT: |
| assert len(ieioargs) == 2 |
| return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1])) |
| |
| return (ieio, ieioargs) |
| |
| |
| class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue): |
| """RPC wrappers for job queue. |
| |
| """ |
| def __init__(self, _context, address_list): |
| """Initializes this class. |
| |
| """ |
| if address_list is None: |
| resolver = compat.partial(_SsconfResolver, True) |
| else: |
| # Caller provided an address list |
| resolver = _StaticResolver(address_list) |
| |
| _RpcClientBase.__init__(self, resolver, _ENCODERS.get, |
| lock_monitor_cb=lambda _: None) |
| _generated_rpc.RpcClientJobQueue.__init__(self) |
| |
| |
| class BootstrapRunner(_RpcClientBase, |
| _generated_rpc.RpcClientBootstrap, |
| _generated_rpc.RpcClientDnsOnly): |
| """RPC wrappers for bootstrapping. |
| |
| """ |
| def __init__(self): |
| """Initializes this class. |
| |
| """ |
| # Pylint doesn't recognize multiple inheritance properly, see |
| # <http://www.logilab.org/ticket/36586> and |
| # <http://www.logilab.org/ticket/35642> |
| # pylint: disable=W0233 |
| _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True), |
| _ENCODERS.get) |
| _generated_rpc.RpcClientBootstrap.__init__(self) |
| _generated_rpc.RpcClientDnsOnly.__init__(self) |
| |
| |
| class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly): |
| """RPC wrappers for calls using only DNS. |
| |
| """ |
| def __init__(self): |
| """Initialize this class. |
| |
| """ |
| _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False), |
| _ENCODERS.get) |
| _generated_rpc.RpcClientDnsOnly.__init__(self) |
| |
| |
| class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig): |
| """RPC wrappers for L{config}. |
| |
| """ |
| def __init__(self, _context, address_list, _req_process_fn=None, |
| _getents=None): |
| """Initializes this class. |
| |
| """ |
| lock_monitor_cb = None |
| |
| if address_list is None: |
| resolver = compat.partial(_SsconfResolver, True) |
| else: |
| # Caller provided an address list |
| resolver = _StaticResolver(address_list) |
| |
| encoders = _ENCODERS.copy() |
| |
| encoders.update({ |
| rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents), |
| }) |
| |
| _RpcClientBase.__init__(self, resolver, encoders.get, |
| lock_monitor_cb=lock_monitor_cb, |
| _req_process_fn=_req_process_fn) |
| _generated_rpc.RpcClientConfig.__init__(self) |