| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Logical units dealing with nodes.""" |
| |
| import logging |
| import operator |
| |
| from ganeti import constants |
| from ganeti import errors |
| from ganeti import locking |
| from ganeti import netutils |
| from ganeti import objects |
| from ganeti import opcodes |
| import ganeti.rpc.node as rpc |
| from ganeti import utils |
| from ganeti.masterd import iallocator |
| |
| from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs |
| from ganeti.cmdlib.common import CheckParamsNotGlobal, \ |
| MergeAndVerifyHvState, MergeAndVerifyDiskState, \ |
| IsExclusiveStorageEnabledNode, CheckNodePVs, \ |
| RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \ |
| CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \ |
| AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \ |
| GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \ |
| FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \ |
| AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \ |
| EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey |
| |
| |
| def _DecideSelfPromotion(lu, exceptions=None): |
| """Decide whether I should promote myself as a master candidate. |
| |
| """ |
| cp_size = lu.cfg.GetClusterInfo().candidate_pool_size |
| mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) |
  # the new node will increase mc_max by one, so:
| mc_should = min(mc_should + 1, cp_size) |
| return mc_now < mc_should |
| |
| |
| def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq): |
| """Ensure that a node has the given secondary ip. |
| |
| @type lu: L{LogicalUnit} |
| @param lu: the LU on behalf of which we make the check |
| @type node: L{objects.Node} |
| @param node: the node to check |
| @type secondary_ip: string |
| @param secondary_ip: the ip to check |
| @type prereq: boolean |
| @param prereq: whether to throw a prerequisite or an execute error |
| @raise errors.OpPrereqError: if the node doesn't have the ip, |
| and prereq=True |
| @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False |
| |
| """ |
| # this can be called with a new node, which has no UUID yet, so perform the |
| # RPC call using its name |
| result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip) |
| result.Raise("Failure checking secondary ip on node %s" % node.name, |
| prereq=prereq, ecode=errors.ECODE_ENVIRON) |
| if not result.payload: |
| msg = ("Node claims it doesn't have the secondary ip you gave (%s)," |
| " please fix and re-run this command" % secondary_ip) |
| if prereq: |
| raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) |
| else: |
| raise errors.OpExecError(msg) |
| |
| |
| class LUNodeAdd(LogicalUnit): |
| """Logical unit for adding node to the cluster. |
| |
| """ |
| HPATH = "node-add" |
| HTYPE = constants.HTYPE_NODE |
| _NFLAGS = ["master_capable", "vm_capable"] |
| |
| def CheckArguments(self): |
| self.primary_ip_family = self.cfg.GetPrimaryIPFamily() |
| # validate/normalize the node name |
| self.hostname = netutils.GetHostname(name=self.op.node_name, |
| family=self.primary_ip_family) |
| self.op.node_name = self.hostname.name |
| |
| if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName(): |
| raise errors.OpPrereqError("Cannot readd the master node", |
| errors.ECODE_STATE) |
| |
| if self.op.readd and self.op.group: |
| raise errors.OpPrereqError("Cannot pass a node group when a node is" |
| " being readded", errors.ECODE_INVAL) |
| |
| def BuildHooksEnv(self): |
| """Build hooks env. |
| |
| This will run on all nodes before, and on all nodes + the new node after. |
| |
| """ |
| return { |
| "OP_TARGET": self.op.node_name, |
| "NODE_NAME": self.op.node_name, |
| "NODE_PIP": self.op.primary_ip, |
| "NODE_SIP": self.op.secondary_ip, |
| "MASTER_CAPABLE": str(self.op.master_capable), |
| "VM_CAPABLE": str(self.op.vm_capable), |
| } |
| |
| def BuildHooksNodes(self): |
| """Build hooks nodes. |
| |
| """ |
| hook_nodes = self.cfg.GetNodeList() |
| new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name) |
| if new_node_info is not None: |
| # Exclude added node |
| hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid])) |
| |
    # add the new node as post hook node by name; it does not have a UUID yet
| return (hook_nodes, hook_nodes) |
| |
| def PreparePostHookNodes(self, post_hook_node_uuids): |
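    # by the time the post hooks run, the new node has a UUID, so it can be
    # appended to the post-hook node list here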
| return post_hook_node_uuids + [self.new_node.uuid] |
| |
| def HooksAbortCallBack(self, phase, feedback_fn, exception): |
| """Cleans up if the hooks fail. |
| |
    This function runs actions that are necessary to bring the cluster back
    into a clean state. This is needed if, for example, the hooks of this
    operation failed and left the node in an inconsistent state.
| |
| """ |
| if phase == constants.HOOKS_PHASE_PRE: |
| feedback_fn("Pre operation hook failed. Rolling back preparations.") |
| |
| master_node = self.cfg.GetMasterNodeInfo().name |
| remove_result = self.rpc.call_node_ssh_key_remove_light( |
| [master_node], |
| self.op.node_name) |
| remove_result[master_node].Raise( |
| "Error removing SSH key of node '%s'." % self.op.node_name) |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| This checks: |
| - the new node is not already in the config |
| - it is resolvable |
    - its parameters (single/dual homed) match the cluster
| |
| Any errors are signaled by raising errors.OpPrereqError. |
| |
| """ |
| node_name = self.hostname.name |
| self.op.primary_ip = self.hostname.ip |
| if self.op.secondary_ip is None: |
| if self.primary_ip_family == netutils.IP6Address.family: |
| raise errors.OpPrereqError("When using a IPv6 primary address, a valid" |
| " IPv4 address must be given as secondary", |
| errors.ECODE_INVAL) |
| self.op.secondary_ip = self.op.primary_ip |
| |
| secondary_ip = self.op.secondary_ip |
| if not netutils.IP4Address.IsValid(secondary_ip): |
| raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" |
| " address" % secondary_ip, errors.ECODE_INVAL) |
| |
| existing_node_info = self.cfg.GetNodeInfoByName(node_name) |
| if not self.op.readd and existing_node_info is not None: |
| raise errors.OpPrereqError("Node %s is already in the configuration" % |
| node_name, errors.ECODE_EXISTS) |
| elif self.op.readd and existing_node_info is None: |
| raise errors.OpPrereqError("Node %s is not in the configuration" % |
| node_name, errors.ECODE_NOENT) |
| |
| self.changed_primary_ip = False |
| |
| for existing_node in self.cfg.GetAllNodesInfo().values(): |
| if self.op.readd and node_name == existing_node.name: |
| if existing_node.secondary_ip != secondary_ip: |
| raise errors.OpPrereqError("Readded node doesn't have the same IP" |
| " address configuration as before", |
| errors.ECODE_INVAL) |
| if existing_node.primary_ip != self.op.primary_ip: |
| self.changed_primary_ip = True |
| |
| continue |
| |
| if (existing_node.primary_ip == self.op.primary_ip or |
| existing_node.secondary_ip == self.op.primary_ip or |
| existing_node.primary_ip == secondary_ip or |
| existing_node.secondary_ip == secondary_ip): |
| raise errors.OpPrereqError("New node ip address(es) conflict with" |
| " existing node %s" % existing_node.name, |
| errors.ECODE_NOTUNIQUE) |
| |
| # After this 'if' block, None is no longer a valid value for the |
| # _capable op attributes |
| if self.op.readd: |
| assert existing_node_info is not None, \ |
| "Can't retrieve locked node %s" % node_name |
| for attr in self._NFLAGS: |
| if getattr(self.op, attr) is None: |
| setattr(self.op, attr, getattr(existing_node_info, attr)) |
| else: |
| for attr in self._NFLAGS: |
| if getattr(self.op, attr) is None: |
| setattr(self.op, attr, True) |
| |
| if self.op.readd and not self.op.vm_capable: |
| pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid) |
| if pri or sec: |
| raise errors.OpPrereqError("Node %s being re-added with vm_capable" |
| " flag set to false, but it already holds" |
| " instances" % node_name, |
| errors.ECODE_STATE) |
| |
| # check that the type of the node (single versus dual homed) is the |
| # same as for the master |
| myself = self.cfg.GetMasterNodeInfo() |
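    # a node is considered single-homed when its secondary IP equals its
    # primary IP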
| master_singlehomed = myself.secondary_ip == myself.primary_ip |
| newbie_singlehomed = secondary_ip == self.op.primary_ip |
| if master_singlehomed != newbie_singlehomed: |
| if master_singlehomed: |
| raise errors.OpPrereqError("The master has no secondary ip but the" |
| " new node has one", |
| errors.ECODE_INVAL) |
| else: |
| raise errors.OpPrereqError("The master has a secondary ip but the" |
| " new node doesn't have one", |
| errors.ECODE_INVAL) |
| |
| # checks reachability |
| if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT): |
| raise errors.OpPrereqError("Node not reachable by ping", |
| errors.ECODE_ENVIRON) |
| |
| if not newbie_singlehomed: |
| # check reachability from my secondary ip to newbie's secondary ip |
| if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, |
| source=myself.secondary_ip): |
| raise errors.OpPrereqError("Node secondary ip not reachable by TCP" |
| " based ping to node daemon port", |
| errors.ECODE_ENVIRON) |
| |
| if self.op.readd: |
| exceptions = [existing_node_info.uuid] |
| else: |
| exceptions = [] |
| |
| if self.op.master_capable: |
| self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) |
| else: |
| self.master_candidate = False |
| |
| self.node_group = None |
| if self.op.readd: |
| self.new_node = existing_node_info |
| self.node_group = existing_node_info.group |
| else: |
| self.node_group = self.cfg.LookupNodeGroup(self.op.group) |
| self.new_node = objects.Node(name=node_name, |
| primary_ip=self.op.primary_ip, |
| secondary_ip=secondary_ip, |
| master_candidate=self.master_candidate, |
| offline=False, drained=False, |
| group=self.node_group, ndparams={}) |
| |
| if self.op.ndparams: |
| utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) |
| CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", |
| "node", "cluster or group") |
| |
| if self.op.hv_state: |
| self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None) |
| |
| if self.op.disk_state: |
| self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None) |
| |
| # TODO: If we need to have multiple DnsOnlyRunner we probably should make |
| # it a property on the base class. |
| rpcrunner = rpc.DnsOnlyRunner() |
| result = rpcrunner.call_version([node_name])[node_name] |
| result.Raise("Can't get version information from node %s" % node_name, |
| prereq=True, ecode=errors.ECODE_ENVIRON) |
| if constants.PROTOCOL_VERSION == result.payload: |
| logging.info("Communication to node %s fine, sw version %s match", |
| node_name, result.payload) |
| else: |
| raise errors.OpPrereqError("Version mismatch master version %s," |
| " node version %s" % |
| (constants.PROTOCOL_VERSION, result.payload), |
| errors.ECODE_ENVIRON) |
| |
| vg_name = self.cfg.GetVGName() |
| if vg_name is not None: |
| vparams = {constants.NV_PVLIST: [vg_name]} |
| excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node) |
| cname = self.cfg.GetClusterName() |
| result = rpcrunner.call_node_verify_light( |
| [node_name], vparams, cname, |
| self.cfg.GetClusterInfo().hvparams, |
| )[node_name] |
| (errmsgs, _) = CheckNodePVs(result.payload, excl_stor) |
| if errmsgs: |
| raise errors.OpPrereqError("Checks on node PVs failed: %s" % |
| "; ".join(errmsgs), errors.ECODE_ENVIRON) |
| |
| def _InitOpenVSwitch(self): |
| filled_ndparams = self.cfg.GetClusterInfo().FillND( |
| self.new_node, self.cfg.GetNodeGroup(self.new_node.group)) |
| |
| ovs = filled_ndparams.get(constants.ND_OVS, None) |
| ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None) |
| ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None) |
| |
| if ovs: |
| if not ovs_link: |
| self.LogInfo("No physical interface for OpenvSwitch was given." |
| " OpenvSwitch will not have an outside connection. This" |
| " might not be what you want.") |
| |
| result = self.rpc.call_node_configure_ovs( |
| self.new_node.name, ovs_name, ovs_link) |
| result.Raise("Failed to initialize OpenVSwitch on new node") |
| |
| def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate, |
| is_potential_master_candidate, rpcrunner, readd, feedback_fn): |
| """Update the SSH setup of all nodes after adding a new node. |
| |
| @type readd: boolean |
| @param readd: whether or not this node is readded |
| |
| """ |
| potential_master_candidates = self.cfg.GetPotentialMasterCandidates() |
| master_node = self.cfg.GetMasterNode() |
| |
| if readd: |
| # clear previous keys |
| master_candidate_uuids = self.cfg.GetMasterCandidateUuids() |
| remove_result = rpcrunner.call_node_ssh_key_remove( |
| [master_node], |
| new_node_uuid, new_node_name, |
| master_candidate_uuids, |
| potential_master_candidates, |
| True, # from authorized keys |
| True, # from public keys |
| False, # clear authorized keys |
| True, # clear public keys |
| True, # it's a readd |
| self.op.debug, |
| self.op.verbose) |
| remove_result[master_node].Raise( |
| "Could not remove SSH keys of node %s before readding," |
| " (UUID: %s)." % (new_node_name, new_node_uuid)) |
| WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn) |
| |
| result = rpcrunner.call_node_ssh_key_add( |
| [master_node], new_node_uuid, new_node_name, |
| potential_master_candidates, |
| is_master_candidate, is_potential_master_candidate, |
| is_potential_master_candidate, self.op.debug, self.op.verbose) |
| |
| result[master_node].Raise("Could not update the node's SSH setup.") |
| WarnAboutFailedSshUpdates(result, master_node, feedback_fn) |
| |
| def Exec(self, feedback_fn): |
| """Adds the new node to the cluster. |
| |
| """ |
| assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ |
| "Not owning BGL" |
| |
    # We are adding a new node, so we assume it is powered
| self.new_node.powered = True |
| |
| # for re-adds, reset the offline/drained/master-candidate flags; |
| # we need to reset here, otherwise offline would prevent RPC calls |
| # later in the procedure; this also means that if the re-add |
| # fails, we are left with a non-offlined, broken node |
| if self.op.readd: |
| self.new_node.offline = False |
| self.new_node.drained = False |
| self.LogInfo("Readding a node, the offline/drained flags were reset") |
| # if we demote the node, we do cleanup later in the procedure |
| self.new_node.master_candidate = self.master_candidate |
| if self.changed_primary_ip: |
| self.new_node.primary_ip = self.op.primary_ip |
| |
| # copy the master/vm_capable flags |
| for attr in self._NFLAGS: |
| setattr(self.new_node, attr, getattr(self.op, attr)) |
| |
| # notify the user about any possible mc promotion |
| if self.new_node.master_candidate: |
| self.LogInfo("Node will be a master candidate") |
| |
| if self.op.ndparams: |
| self.new_node.ndparams = self.op.ndparams |
| else: |
| self.new_node.ndparams = {} |
| |
| if self.op.hv_state: |
| self.new_node.hv_state_static = self.new_hv_state |
| |
| if self.op.disk_state: |
| self.new_node.disk_state_static = self.new_disk_state |
| |
| # Add node to our /etc/hosts, and add key to known_hosts |
| if self.cfg.GetClusterInfo().modify_etc_hosts: |
| master_node = self.cfg.GetMasterNode() |
| result = self.rpc.call_etc_hosts_modify( |
| master_node, constants.ETC_HOSTS_ADD, self.hostname.name, |
| self.hostname.ip) |
| result.Raise("Can't update hosts file with new host data") |
| |
| if self.new_node.secondary_ip != self.new_node.primary_ip: |
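      # execution has already started, so a missing secondary IP raises
      # OpExecError (prereq=False) rather than OpPrereqError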
| _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip, |
| False) |
| |
| node_verifier_uuids = [self.cfg.GetMasterNode()] |
| node_verify_param = { |
| constants.NV_NODELIST: ([self.new_node.name], {}, []), |
| # TODO: do a node-net-test as well? |
| } |
| |
| result = self.rpc.call_node_verify( |
| node_verifier_uuids, node_verify_param, |
| self.cfg.GetClusterName(), |
| self.cfg.GetClusterInfo().hvparams) |
| for verifier in node_verifier_uuids: |
| result[verifier].Raise("Cannot communicate with node %s" % verifier) |
| nl_payload = result[verifier].payload[constants.NV_NODELIST] |
| if nl_payload: |
| for failed in nl_payload: |
| feedback_fn("ssh/hostname verification failed" |
| " (checking from %s): %s" % |
| (verifier, nl_payload[failed])) |
| raise errors.OpExecError("ssh/hostname verification failed") |
| |
| self._InitOpenVSwitch() |
| |
| if self.op.readd: |
| RedistributeAncillaryFiles(self) |
| # make sure we redistribute the config |
| self.cfg.Update(self.new_node, feedback_fn) |
| # and make sure the new node will not have old files around |
| if not self.new_node.master_candidate: |
| result = self.rpc.call_node_demote_from_mc(self.new_node.uuid) |
| result.Warn("Node failed to demote itself from master candidate status", |
| self.LogWarning) |
| else: |
| self.cfg.AddNode(self.new_node, self.proc.GetECId()) |
| RedistributeAncillaryFiles(self) |
| |
| # We create a new certificate even if the node is readded |
| digest = GetClientCertDigest(self, self.new_node.uuid) |
| if self.new_node.master_candidate: |
| self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest) |
| else: |
| self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None) |
| |
| EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid]) |
| |
| # Update SSH setup of all nodes |
| if self.op.node_setup: |
| # FIXME: so far, all nodes are considered potential master candidates |
| self._SshUpdate(self.new_node.uuid, self.new_node.name, |
| self.new_node.master_candidate, True, |
| self.rpc, self.op.readd, feedback_fn) |
| |
| |
| class LUNodeSetParams(LogicalUnit): |
| """Modifies the parameters of a node. |
| |
| @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline) |
| to the node role (as _ROLE_*) |
| @cvar _R2F: a dictionary from node role to tuples of flags |
| @cvar _FLAGS: a list of attribute names corresponding to the flags |
| |
| """ |
| HPATH = "node-modify" |
| HTYPE = constants.HTYPE_NODE |
| REQ_BGL = False |
| (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) |
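  # the keys of _F2R are (master_candidate, drained, offline) tuples, in the
  # same order as _FLAGS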
| _F2R = { |
| (True, False, False): _ROLE_CANDIDATE, |
| (False, True, False): _ROLE_DRAINED, |
| (False, False, True): _ROLE_OFFLINE, |
| (False, False, False): _ROLE_REGULAR, |
| } |
| _R2F = dict((v, k) for k, v in _F2R.items()) |
| _FLAGS = ["master_candidate", "drained", "offline"] |
| |
| def CheckArguments(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, |
| self.op.master_capable, self.op.vm_capable, |
| self.op.secondary_ip, self.op.ndparams, self.op.hv_state, |
| self.op.disk_state] |
| if all_mods.count(None) == len(all_mods): |
| raise errors.OpPrereqError("Please pass at least one modification", |
| errors.ECODE_INVAL) |
| if all_mods.count(True) > 1: |
| raise errors.OpPrereqError("Can't set the node into more than one" |
| " state at the same time", |
| errors.ECODE_INVAL) |
| |
| # Boolean value that tells us whether we might be demoting from MC |
| self.might_demote = (self.op.master_candidate is False or |
| self.op.offline is True or |
| self.op.drained is True or |
| self.op.master_capable is False) |
| |
| if self.op.secondary_ip: |
| if not netutils.IP4Address.IsValid(self.op.secondary_ip): |
| raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" |
| " address" % self.op.secondary_ip, |
| errors.ECODE_INVAL) |
| |
| self.lock_all = self.op.auto_promote and self.might_demote |
| self.lock_instances = self.op.secondary_ip is not None |
| |
| def _InstanceFilter(self, instance): |
| """Filter for getting affected instances. |
| |
| """ |
| disks = self.cfg.GetInstanceDisks(instance.uuid) |
| any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR) |
| return (any_mirrored and |
| self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid)) |
| |
| def ExpandNames(self): |
| if self.lock_all: |
| self.needed_locks = { |
| locking.LEVEL_NODE: locking.ALL_SET, |
| } |
| else: |
| self.needed_locks = { |
| locking.LEVEL_NODE: self.op.node_uuid, |
| } |
| |
| # Since modifying a node can have severe effects on currently running |
| # operations the resource lock is at least acquired in shared mode |
| self.needed_locks[locking.LEVEL_NODE_RES] = \ |
| self.needed_locks[locking.LEVEL_NODE] |
| |
| # Get all locks except nodes in shared mode; they are not used for anything |
| # but read-only access |
| self.share_locks = ShareAll() |
| self.share_locks[locking.LEVEL_NODE] = 0 |
| self.share_locks[locking.LEVEL_NODE_RES] = 0 |
| |
| if self.lock_instances: |
| self.needed_locks[locking.LEVEL_INSTANCE] = \ |
| self.cfg.GetInstanceNames( |
| self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys()) |
| |
| def BuildHooksEnv(self): |
| """Build hooks env. |
| |
| This runs on the master node. |
| |
| """ |
| return { |
| "OP_TARGET": self.op.node_name, |
| "MASTER_CANDIDATE": str(self.op.master_candidate), |
| "OFFLINE": str(self.op.offline), |
| "DRAINED": str(self.op.drained), |
| "MASTER_CAPABLE": str(self.op.master_capable), |
| "VM_CAPABLE": str(self.op.vm_capable), |
| } |
| |
| def BuildHooksNodes(self): |
| """Build hooks nodes. |
| |
| """ |
| nl = [self.cfg.GetMasterNode(), self.op.node_uuid] |
| return (nl, nl) |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| This only checks the instance list against the existing names. |
| |
| """ |
| node = self.cfg.GetNodeInfo(self.op.node_uuid) |
| if self.lock_instances: |
| affected_instances = \ |
| self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) |
| |
| # Verify instance locks |
| owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) |
| wanted_instance_names = frozenset([inst.name for inst in |
| affected_instances.values()]) |
| if wanted_instance_names - owned_instance_names: |
| raise errors.OpPrereqError("Instances affected by changing node %s's" |
| " secondary IP address have changed since" |
| " locks were acquired, wanted '%s', have" |
| " '%s'; retry the operation" % |
| (node.name, |
| utils.CommaJoin(wanted_instance_names), |
| utils.CommaJoin(owned_instance_names)), |
| errors.ECODE_STATE) |
| else: |
| affected_instances = None |
| |
| if (self.op.master_candidate is not None or |
| self.op.drained is not None or |
| self.op.offline is not None): |
| # we can't change the master's node flags |
| if node.uuid == self.cfg.GetMasterNode(): |
| raise errors.OpPrereqError("The master role can be changed" |
| " only via master-failover", |
| errors.ECODE_INVAL) |
| |
| if self.op.master_candidate and not node.master_capable: |
| raise errors.OpPrereqError("Node %s is not master capable, cannot make" |
| " it a master candidate" % node.name, |
| errors.ECODE_STATE) |
| |
| if self.op.vm_capable is False: |
| (ipri, isec) = self.cfg.GetNodeInstances(node.uuid) |
| if ipri or isec: |
| raise errors.OpPrereqError("Node %s hosts instances, cannot unset" |
| " the vm_capable flag" % node.name, |
| errors.ECODE_STATE) |
| |
| if node.master_candidate and self.might_demote and not self.lock_all: |
| assert not self.op.auto_promote, "auto_promote set but lock_all not" |
| # check if after removing the current node, we're missing master |
| # candidates |
| (mc_remaining, mc_should, _) = \ |
| self.cfg.GetMasterCandidateStats(exceptions=[node.uuid]) |
| if mc_remaining < mc_should: |
| raise errors.OpPrereqError("Not enough master candidates, please" |
| " pass auto promote option to allow" |
| " promotion (--auto-promote or RAPI" |
| " auto_promote=True)", errors.ECODE_STATE) |
| |
| self.old_flags = old_flags = (node.master_candidate, |
| node.drained, node.offline) |
| assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) |
| self.old_role = old_role = self._F2R[old_flags] |
| |
| # Check for ineffective changes |
| for attr in self._FLAGS: |
| if getattr(self.op, attr) is False and getattr(node, attr) is False: |
| self.LogInfo("Ignoring request to unset flag %s, already unset", attr) |
| setattr(self.op, attr, None) |
| |
| # Past this point, any flag change to False means a transition |
| # away from the respective state, as only real changes are kept |
| |
| # TODO: We might query the real power state if it supports OOB |
| if SupportsOob(self.cfg, node): |
| if self.op.offline is False and not (node.powered or |
| self.op.powered is True): |
| raise errors.OpPrereqError(("Node %s needs to be turned on before its" |
| " offline status can be reset") % |
| self.op.node_name, errors.ECODE_STATE) |
| elif self.op.powered is not None: |
| raise errors.OpPrereqError(("Unable to change powered state for node %s" |
| " as it does not support out-of-band" |
| " handling") % self.op.node_name, |
| errors.ECODE_STATE) |
| |
    # If the node is being de-offlined or un-drained, promote it to master
    # candidate if the candidate pool needs it
| if (self.op.drained is False or self.op.offline is False or |
| (self.op.master_capable and not node.master_capable)): |
| if _DecideSelfPromotion(self): |
| self.op.master_candidate = True |
| self.LogInfo("Auto-promoting node to master candidate") |
| |
| # If we're no longer master capable, we'll demote ourselves from MC |
| if self.op.master_capable is False and node.master_candidate: |
| if self.op.node_uuid == self.cfg.GetMasterNode(): |
| raise errors.OpPrereqError("Master must remain master capable", |
| errors.ECODE_STATE) |
| self.LogInfo("Demoting from master candidate") |
| self.op.master_candidate = False |
| |
| # Compute new role |
| assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1 |
| if self.op.master_candidate: |
| new_role = self._ROLE_CANDIDATE |
| elif self.op.drained: |
| new_role = self._ROLE_DRAINED |
| elif self.op.offline: |
| new_role = self._ROLE_OFFLINE |
| elif False in [self.op.master_candidate, self.op.drained, self.op.offline]: |
| # False is still in new flags, which means we're un-setting (the |
| # only) True flag |
| new_role = self._ROLE_REGULAR |
| else: # no new flags, nothing, keep old role |
| new_role = old_role |
| |
| self.new_role = new_role |
| |
| if old_role == self._ROLE_OFFLINE and new_role != old_role: |
| # Trying to transition out of offline status |
| result = self.rpc.call_version([node.uuid])[node.uuid] |
| if result.fail_msg: |
| raise errors.OpPrereqError("Node %s is being de-offlined but fails" |
| " to report its version: %s" % |
| (node.name, result.fail_msg), |
| errors.ECODE_STATE) |
| else: |
| self.LogWarning("Transitioning node from offline to online state" |
| " without using re-add. Please make sure the node" |
| " is healthy!") |
| |
| # When changing the secondary ip, verify if this is a single-homed to |
| # multi-homed transition or vice versa, and apply the relevant |
| # restrictions. |
| if self.op.secondary_ip: |
| # Ok even without locking, because this can't be changed by any LU |
| master = self.cfg.GetMasterNodeInfo() |
| master_singlehomed = master.secondary_ip == master.primary_ip |
| if master_singlehomed and self.op.secondary_ip != node.primary_ip: |
| if self.op.force and node.uuid == master.uuid: |
| self.LogWarning("Transitioning from single-homed to multi-homed" |
| " cluster; all nodes will require a secondary IP" |
| " address") |
| else: |
| raise errors.OpPrereqError("Changing the secondary ip on a" |
| " single-homed cluster requires the" |
| " --force option to be passed, and the" |
| " target node to be the master", |
| errors.ECODE_INVAL) |
| elif not master_singlehomed and self.op.secondary_ip == node.primary_ip: |
| if self.op.force and node.uuid == master.uuid: |
| self.LogWarning("Transitioning from multi-homed to single-homed" |
| " cluster; secondary IP addresses will have to be" |
| " removed") |
| else: |
| raise errors.OpPrereqError("Cannot set the secondary IP to be the" |
| " same as the primary IP on a multi-homed" |
| " cluster, unless the --force option is" |
| " passed, and the target node is the" |
| " master", errors.ECODE_INVAL) |
| |
| assert not (set([inst.name for inst in affected_instances.values()]) - |
| self.owned_locks(locking.LEVEL_INSTANCE)) |
| |
| if node.offline: |
| if affected_instances: |
| msg = ("Cannot change secondary IP address: offline node has" |
| " instances (%s) configured to use it" % |
| utils.CommaJoin( |
| [inst.name for inst in affected_instances.values()])) |
| raise errors.OpPrereqError(msg, errors.ECODE_STATE) |
| else: |
| # On online nodes, check that no instances are running, and that |
| # the node has the new ip and we can reach it. |
| for instance in affected_instances.values(): |
| CheckInstanceState(self, instance, INSTANCE_DOWN, |
| msg="cannot change secondary ip") |
| |
| _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True) |
| if master.uuid != node.uuid: |
| # check reachability from master secondary ip to new secondary ip |
| if not netutils.TcpPing(self.op.secondary_ip, |
| constants.DEFAULT_NODED_PORT, |
| source=master.secondary_ip): |
| raise errors.OpPrereqError("Node secondary ip not reachable by TCP" |
| " based ping to node daemon port", |
| errors.ECODE_ENVIRON) |
| |
| if self.op.ndparams: |
| new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams) |
| utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) |
| CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node", |
| "node", "cluster or group") |
| self.new_ndparams = new_ndparams |
| |
| if self.op.hv_state: |
| self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, |
| node.hv_state_static) |
| |
| if self.op.disk_state: |
| self.new_disk_state = \ |
| MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static) |
| |
| def Exec(self, feedback_fn): |
| """Modifies a node. |
| |
| """ |
| node = self.cfg.GetNodeInfo(self.op.node_uuid) |
| result = [] |
| |
| if self.op.ndparams: |
| node.ndparams = self.new_ndparams |
| |
| if self.op.powered is not None: |
| node.powered = self.op.powered |
| |
| if self.op.hv_state: |
| node.hv_state_static = self.new_hv_state |
| |
| if self.op.disk_state: |
| node.disk_state_static = self.new_disk_state |
| |
| for attr in ["master_capable", "vm_capable"]: |
| val = getattr(self.op, attr) |
| if val is not None: |
| setattr(node, attr, val) |
| result.append((attr, str(val))) |
| |
| if self.op.secondary_ip: |
| node.secondary_ip = self.op.secondary_ip |
| result.append(("secondary_ip", self.op.secondary_ip)) |
| |
| # this will trigger configuration file update, if needed |
| self.cfg.Update(node, feedback_fn) |
| master_node = self.cfg.GetMasterNode() |
| potential_master_candidates = self.cfg.GetPotentialMasterCandidates() |
| modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup |
| |
| if self.new_role != self.old_role: |
| new_flags = self._R2F[self.new_role] |
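      # record each flag that actually changes, then apply the new flag tuple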
| for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS): |
| if of != nf: |
| result.append((desc, str(nf))) |
| (node.master_candidate, node.drained, node.offline) = new_flags |
| self.cfg.Update(node, feedback_fn) |
| |
| # Tell the node to demote itself, if no longer MC and not offline. |
| # This must be done only after the configuration is updated so that |
| # it's ensured the node won't receive any further configuration updates. |
| if self.old_role == self._ROLE_CANDIDATE and \ |
| self.new_role != self._ROLE_OFFLINE: |
| msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg |
| if msg: |
| self.LogWarning("Node failed to demote itself: %s", msg) |
| |
| # we locked all nodes, we adjust the CP before updating this node |
| if self.lock_all: |
| AdjustCandidatePool( |
| self, [node.uuid], master_node, potential_master_candidates, |
| feedback_fn, modify_ssh_setup) |
| |
    # if node gets promoted, grant RPC privileges
| if self.new_role == self._ROLE_CANDIDATE: |
| AddNodeCertToCandidateCerts(self, self.cfg, node.uuid) |
    # if node is demoted, revoke RPC privileges
| if self.old_role == self._ROLE_CANDIDATE: |
| RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid) |
| |
| EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid]) |
| |
| # this will trigger job queue propagation or cleanup if the mc |
| # flag changed |
| if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1: |
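      # exactly one of the old/new roles is 'candidate', i.e. the master
      # candidate flag flipped in one direction or the other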
| |
| if modify_ssh_setup: |
| if self.old_role == self._ROLE_CANDIDATE: |
| master_candidate_uuids = self.cfg.GetMasterCandidateUuids() |
| ssh_result = self.rpc.call_node_ssh_key_remove( |
| [master_node], |
| node.uuid, node.name, |
| master_candidate_uuids, potential_master_candidates, |
| True, # remove node's key from all nodes' authorized_keys file |
| False, # currently, all nodes are potential master candidates |
| False, # do not clear node's 'authorized_keys' |
| False, # do not clear node's 'ganeti_pub_keys' |
| False, # no readd |
| self.op.debug, |
| self.op.verbose) |
| ssh_result[master_node].Raise( |
| "Could not adjust the SSH setup after demoting node '%s'" |
| " (UUID: %s)." % (node.name, node.uuid)) |
| WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn) |
| |
| if self.new_role == self._ROLE_CANDIDATE: |
| AddMasterCandidateSshKey( |
| self, master_node, node, potential_master_candidates, feedback_fn) |
| |
| return result |
| |
| |
| class LUNodePowercycle(NoHooksLU): |
| """Powercycles a node. |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| |
| if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force: |
| raise errors.OpPrereqError("The node is the master and the force" |
| " parameter was not set", |
| errors.ECODE_INVAL) |
| |
| def ExpandNames(self): |
| """Locking for PowercycleNode. |
| |
| This is a last-resort option and shouldn't block on other |
| jobs. Therefore, we grab no locks. |
| |
| """ |
| self.needed_locks = {} |
| |
| def Exec(self, feedback_fn): |
| """Reboots a node. |
| |
| """ |
| default_hypervisor = self.cfg.GetHypervisorType() |
| hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor] |
| result = self.rpc.call_node_powercycle(self.op.node_uuid, |
| default_hypervisor, |
| hvparams) |
| result.Raise("Failed to schedule the reboot") |
| return result.payload |
| |
| |
| def _GetNodeInstancesInner(cfg, fn): |
| return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] |
| |
| |
| def _GetNodePrimaryInstances(cfg, node_uuid): |
| """Returns primary instances on a node. |
| |
| """ |
| return _GetNodeInstancesInner(cfg, |
| lambda inst: node_uuid == inst.primary_node) |
| |
| |
| def _GetNodeSecondaryInstances(cfg, node_uuid): |
| """Returns secondary instances on a node. |
| |
| """ |
| return _GetNodeInstancesInner(cfg, |
| lambda inst: node_uuid in |
| cfg.GetInstanceSecondaryNodes(inst.uuid)) |
| |
| |
| def _GetNodeInstances(cfg, node_uuid): |
| """Returns a list of all primary and secondary instances on a node. |
| |
| """ |
| |
| return _GetNodeInstancesInner(cfg, |
| lambda inst: node_uuid in |
| cfg.GetInstanceNodes(inst.uuid)) |
| |
| |
| class LUNodeEvacuate(NoHooksLU): |
| """Evacuates instances off a list of nodes. |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| CheckIAllocatorOrNode(self, "iallocator", "remote_node") |
| |
| def ExpandNames(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| |
| if self.op.remote_node is not None: |
| (self.op.remote_node_uuid, self.op.remote_node) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid, |
| self.op.remote_node) |
| assert self.op.remote_node |
| |
| if self.op.node_uuid == self.op.remote_node_uuid: |
| raise errors.OpPrereqError("Can not use evacuated node as a new" |
| " secondary node", errors.ECODE_INVAL) |
| |
| if self.op.mode != constants.NODE_EVAC_SEC: |
| raise errors.OpPrereqError("Without the use of an iallocator only" |
| " secondary instances can be evacuated", |
| errors.ECODE_INVAL) |
| |
| # Declare locks |
| self.share_locks = ShareAll() |
| self.needed_locks = { |
| locking.LEVEL_INSTANCE: [], |
| locking.LEVEL_NODEGROUP: [], |
| locking.LEVEL_NODE: [], |
| } |
| |
| # Determine nodes (via group) optimistically, needs verification once locks |
| # have been acquired |
| self.lock_nodes = self._DetermineNodes() |
| |
| def _DetermineNodes(self): |
| """Gets the list of node UUIDs to operate on. |
| |
| """ |
| if self.op.remote_node is None: |
| # Iallocator will choose any node(s) in the same group |
| group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid]) |
| else: |
| group_nodes = frozenset([self.op.remote_node_uuid]) |
| |
| # Determine nodes to be locked |
| return set([self.op.node_uuid]) | group_nodes |
| |
| def _DetermineInstances(self): |
| """Builds list of instances to operate on. |
| |
| """ |
| assert self.op.mode in constants.NODE_EVAC_MODES |
| |
| if self.op.mode == constants.NODE_EVAC_PRI: |
| # Primary instances only |
| inst_fn = _GetNodePrimaryInstances |
| assert self.op.remote_node is None, \ |
| "Evacuating primary instances requires iallocator" |
| elif self.op.mode == constants.NODE_EVAC_SEC: |
| # Secondary instances only |
| inst_fn = _GetNodeSecondaryInstances |
| else: |
| # All instances |
| assert self.op.mode == constants.NODE_EVAC_ALL |
| inst_fn = _GetNodeInstances |
| # TODO: In 2.6, change the iallocator interface to take an evacuation mode |
| # per instance |
| raise errors.OpPrereqError("Due to an issue with the iallocator" |
| " interface it is not possible to evacuate" |
| " all instances at once; specify explicitly" |
| " whether to evacuate primary or secondary" |
| " instances", |
| errors.ECODE_INVAL) |
| |
| return inst_fn(self.cfg, self.op.node_uuid) |
| |
| def DeclareLocks(self, level): |
| if level == locking.LEVEL_INSTANCE: |
| # Lock instances optimistically, needs verification once node and group |
| # locks have been acquired |
| self.needed_locks[locking.LEVEL_INSTANCE] = \ |
| set(i.name for i in self._DetermineInstances()) |
| |
| elif level == locking.LEVEL_NODEGROUP: |
| # Lock node groups for all potential target nodes optimistically, needs |
| # verification once nodes have been acquired |
| self.needed_locks[locking.LEVEL_NODEGROUP] = \ |
| self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) |
| |
| elif level == locking.LEVEL_NODE: |
| self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes |
| |
| def CheckPrereq(self): |
| # Verify locks |
| owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE) |
| owned_nodes = self.owned_locks(locking.LEVEL_NODE) |
| owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) |
| |
| need_nodes = self._DetermineNodes() |
| |
| if not owned_nodes.issuperset(need_nodes): |
| raise errors.OpPrereqError("Nodes in same group as '%s' changed since" |
| " locks were acquired, current nodes are" |
| " are '%s', used to be '%s'; retry the" |
| " operation" % |
| (self.op.node_name, |
| utils.CommaJoin(need_nodes), |
| utils.CommaJoin(owned_nodes)), |
| errors.ECODE_STATE) |
| |
| wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) |
| if owned_groups != wanted_groups: |
| raise errors.OpExecError("Node groups changed since locks were acquired," |
| " current groups are '%s', used to be '%s';" |
| " retry the operation" % |
| (utils.CommaJoin(wanted_groups), |
| utils.CommaJoin(owned_groups))) |
| |
| # Determine affected instances |
| self.instances = self._DetermineInstances() |
| self.instance_names = [i.name for i in self.instances] |
| |
| if set(self.instance_names) != owned_instance_names: |
| raise errors.OpExecError("Instances on node '%s' changed since locks" |
| " were acquired, current instances are '%s'," |
| " used to be '%s'; retry the operation" % |
| (self.op.node_name, |
| utils.CommaJoin(self.instance_names), |
| utils.CommaJoin(owned_instance_names))) |
| |
| if self.instance_names: |
| self.LogInfo("Evacuating instances from node '%s': %s", |
| self.op.node_name, |
| utils.CommaJoin(utils.NiceSort(self.instance_names))) |
| else: |
| self.LogInfo("No instances to evacuate from node '%s'", |
| self.op.node_name) |
| |
| if self.op.remote_node is not None: |
| for i in self.instances: |
| if i.primary_node == self.op.remote_node_uuid: |
| raise errors.OpPrereqError("Node %s is the primary node of" |
| " instance %s, cannot use it as" |
| " secondary" % |
| (self.op.remote_node, i.name), |
| errors.ECODE_INVAL) |
| |
| def Exec(self, feedback_fn): |
| assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) |
| |
| if not self.instance_names: |
| # No instances to evacuate |
| jobs = [] |
| |
| elif self.op.iallocator is not None: |
| # TODO: Implement relocation to other group |
| req = iallocator.IAReqNodeEvac( |
| evac_mode=self.op.mode, instances=list(self.instance_names), |
| ignore_soft_errors=self.op.ignore_soft_errors) |
| ial = iallocator.IAllocator(self.cfg, self.rpc, req) |
| |
| ial.Run(self.op.iallocator) |
| |
| if not ial.success: |
| raise errors.OpPrereqError("Can't compute node evacuation using" |
| " iallocator '%s': %s" % |
| (self.op.iallocator, ial.info), |
| errors.ECODE_NORES) |
| |
| jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True) |
| |
| elif self.op.remote_node is not None: |
| assert self.op.mode == constants.NODE_EVAC_SEC |
| jobs = [ |
| [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, |
| remote_node=self.op.remote_node, |
| disks=[], |
| mode=constants.REPLACE_DISK_CHG, |
| early_release=self.op.early_release)] |
| for instance_name in self.instance_names] |
| |
| else: |
| raise errors.ProgrammerError("No iallocator or remote node") |
| |
| return ResultWithJobs(jobs) |
| |
| |
| class LUNodeMigrate(LogicalUnit): |
| """Migrate all instances from a node. |
| |
| """ |
| HPATH = "node-migrate" |
| HTYPE = constants.HTYPE_NODE |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| pass |
| |
| def ExpandNames(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| |
| self.share_locks = ShareAll() |
| self.needed_locks = { |
| locking.LEVEL_NODE: [self.op.node_uuid], |
| } |
| |
| def BuildHooksEnv(self): |
| """Build hooks env. |
| |
    This runs on the master node.
| |
| """ |
| return { |
| "NODE_NAME": self.op.node_name, |
| "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, |
| } |
| |
| def BuildHooksNodes(self): |
| """Build hooks nodes. |
| |
| """ |
| nl = [self.cfg.GetMasterNode()] |
| return (nl, nl) |
| |
| def CheckPrereq(self): |
| pass |
| |
| def Exec(self, feedback_fn): |
    # Prepare one migration job per primary instance of the node
| jobs = [ |
| [opcodes.OpInstanceMigrate( |
| instance_name=inst.name, |
| mode=self.op.mode, |
| live=self.op.live, |
| iallocator=self.op.iallocator, |
| target_node=self.op.target_node, |
| allow_runtime_changes=self.op.allow_runtime_changes, |
| ignore_ipolicy=self.op.ignore_ipolicy)] |
| for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)] |
| |
| # TODO: Run iallocator in this opcode and pass correct placement options to |
| # OpInstanceMigrate. Since other jobs can modify the cluster between |
| # running the iallocator and the actual migration, a good consistency model |
| # will have to be found. |
| |
| assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == |
| frozenset([self.op.node_uuid])) |
| |
| return ResultWithJobs(jobs) |
| |
| |
| def _GetStorageTypeArgs(cfg, storage_type): |
| """Returns the arguments for a storage type. |
| |
| """ |
| # Special case for file storage |
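  # (file, sharedfile and gluster each take their storage directory as their
  # only argument; other storage types take no arguments)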
| |
| if storage_type == constants.ST_FILE: |
| return [[cfg.GetFileStorageDir()]] |
| elif storage_type == constants.ST_SHARED_FILE: |
| return [[cfg.GetSharedFileStorageDir()]] |
| elif storage_type == constants.ST_GLUSTER: |
| return [[cfg.GetGlusterStorageDir()]] |
| else: |
| return [] |
| |
| |
| class LUNodeModifyStorage(NoHooksLU): |
| """Logical unit for modifying a storage volume on a node. |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| |
| storage_type = self.op.storage_type |
| |
| try: |
| modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] |
| except KeyError: |
| raise errors.OpPrereqError("Storage units of type '%s' can not be" |
| " modified" % storage_type, |
| errors.ECODE_INVAL) |
| |
| diff = set(self.op.changes.keys()) - modifiable |
| if diff: |
| raise errors.OpPrereqError("The following fields can not be modified for" |
| " storage units of type '%s': %r" % |
| (storage_type, list(diff)), |
| errors.ECODE_INVAL) |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| """ |
| CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) |
| |
| def ExpandNames(self): |
| self.needed_locks = { |
| locking.LEVEL_NODE: self.op.node_uuid, |
| } |
| |
| def Exec(self, feedback_fn): |
| """Computes the list of nodes and their attributes. |
| |
| """ |
| st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) |
| result = self.rpc.call_storage_modify(self.op.node_uuid, |
| self.op.storage_type, st_args, |
| self.op.name, self.op.changes) |
| result.Raise("Failed to modify storage unit '%s' on %s" % |
| (self.op.name, self.op.node_name)) |
| |
| |
| def _CheckOutputFields(fields, selected): |
| """Checks whether all selected fields are valid according to fields. |
| |
  @type fields: L{utils.FieldSet}
  @param fields: the set of valid output fields
  @type selected: L{utils.FieldSet}
  @param selected: the output fields selected by the user
| |
| """ |
| delta = fields.NonMatching(selected) |
| if delta: |
| raise errors.OpPrereqError("Unknown output fields selected: %s" |
| % ",".join(delta), errors.ECODE_INVAL) |
| |
| |
| class LUNodeQueryvols(NoHooksLU): |
| """Logical unit for getting volumes on node(s). |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS, |
| constants.VF_VG, constants.VF_NAME, |
| constants.VF_SIZE, constants.VF_INSTANCE), |
| self.op.output_fields) |
| |
| def ExpandNames(self): |
| self.share_locks = ShareAll() |
| |
| if self.op.nodes: |
| self.needed_locks = { |
| locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], |
| } |
| else: |
| self.needed_locks = { |
| locking.LEVEL_NODE: locking.ALL_SET, |
| } |
| |
| def Exec(self, feedback_fn): |
| """Computes the list of nodes and their attributes. |
| |
| """ |
| node_uuids = self.owned_locks(locking.LEVEL_NODE) |
| volumes = self.rpc.call_node_volumes(node_uuids) |
| |
| ilist = self.cfg.GetAllInstancesInfo() |
| vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values()) |
| |
| output = [] |
| for node_uuid in node_uuids: |
| nresult = volumes[node_uuid] |
| if nresult.offline: |
| continue |
| msg = nresult.fail_msg |
| if msg: |
| self.LogWarning("Can't compute volume data on node %s: %s", |
| self.cfg.GetNodeName(node_uuid), msg) |
| continue |
| |
| node_vols = sorted(nresult.payload, |
| key=operator.itemgetter(constants.VF_DEV)) |
| |
| for vol in node_vols: |
| node_output = [] |
| for field in self.op.output_fields: |
| if field == constants.VF_NODE: |
| val = self.cfg.GetNodeName(node_uuid) |
| elif field == constants.VF_PHYS: |
| val = vol[constants.VF_DEV] |
| elif field == constants.VF_VG: |
| val = vol[constants.VF_VG] |
| elif field == constants.VF_NAME: |
| val = vol[constants.VF_NAME] |
| elif field == constants.VF_SIZE: |
| val = int(float(vol[constants.VF_SIZE])) |
| elif field == constants.VF_INSTANCE: |
| inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" + |
| vol[constants.VF_NAME]), None) |
| if inst is not None: |
| val = inst.name |
| else: |
| val = "-" |
| else: |
| raise errors.ParameterError(field) |
| node_output.append(str(val)) |
| |
| output.append(node_output) |
| |
| return output |
| |
| |
| class LUNodeQueryStorage(NoHooksLU): |
| """Logical unit for getting information on storage units on node(s). |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS), |
| self.op.output_fields) |
| |
| def ExpandNames(self): |
| self.share_locks = ShareAll() |
| |
| if self.op.nodes: |
| self.needed_locks = { |
| locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0], |
| } |
| else: |
| self.needed_locks = { |
| locking.LEVEL_NODE: locking.ALL_SET, |
| } |
| |
| def _DetermineStorageType(self): |
| """Determines the default storage type of the cluster. |
| |
| """ |
| enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates |
| default_storage_type = \ |
| constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]] |
| return default_storage_type |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| """ |
| if self.op.storage_type: |
| CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) |
| self.storage_type = self.op.storage_type |
| else: |
| self.storage_type = self._DetermineStorageType() |
| supported_storage_types = constants.STS_REPORT_NODE_STORAGE |
| if self.storage_type not in supported_storage_types: |
| raise errors.OpPrereqError( |
| "Storage reporting for storage type '%s' is not supported. Please" |
| " use the --storage-type option to specify one of the supported" |
| " storage types (%s) or set the default disk template to one that" |
| " supports storage reporting." % |
| (self.storage_type, utils.CommaJoin(supported_storage_types))) |
| |
| def Exec(self, feedback_fn): |
| """Computes the list of nodes and their attributes. |
| |
| """ |
| if self.op.storage_type: |
| self.storage_type = self.op.storage_type |
| else: |
| self.storage_type = self._DetermineStorageType() |
| |
| self.node_uuids = self.owned_locks(locking.LEVEL_NODE) |
| |
| # Always get name to sort by |
| if constants.SF_NAME in self.op.output_fields: |
| fields = self.op.output_fields[:] |
| else: |
| fields = [constants.SF_NAME] + self.op.output_fields |
| |
| # Never ask for node or type as it's only known to the LU |
| for extra in [constants.SF_NODE, constants.SF_TYPE]: |
| while extra in fields: |
| fields.remove(extra) |
| |
| field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) |
| name_idx = field_idx[constants.SF_NAME] |
| |
| st_args = _GetStorageTypeArgs(self.cfg, self.storage_type) |
| data = self.rpc.call_storage_list(self.node_uuids, |
| self.storage_type, st_args, |
| self.op.name, fields) |
| |
| result = [] |
| |
| for node_uuid in utils.NiceSort(self.node_uuids): |
| node_name = self.cfg.GetNodeName(node_uuid) |
| nresult = data[node_uuid] |
| if nresult.offline: |
| continue |
| |
| msg = nresult.fail_msg |
| if msg: |
| self.LogWarning("Can't get storage data from node %s: %s", |
| node_name, msg) |
| continue |
| |
| rows = dict([(row[name_idx], row) for row in nresult.payload]) |
| |
| for name in utils.NiceSort(rows.keys()): |
| row = rows[name] |
| |
| out = [] |
| |
| for field in self.op.output_fields: |
| if field == constants.SF_NODE: |
| val = node_name |
| elif field == constants.SF_TYPE: |
| val = self.storage_type |
| elif field in field_idx: |
| val = row[field_idx[field]] |
| else: |
| raise errors.ParameterError(field) |
| |
| out.append(val) |
| |
| result.append(out) |
| |
| return result |
| |
| |
| class LUNodeRemove(LogicalUnit): |
| """Logical unit for removing a node. |
| |
| """ |
| HPATH = "node-remove" |
| HTYPE = constants.HTYPE_NODE |
| |
| def BuildHooksEnv(self): |
| """Build hooks env. |
| |
| """ |
| return { |
| "OP_TARGET": self.op.node_name, |
| "NODE_NAME": self.op.node_name, |
| } |
| |
| def BuildHooksNodes(self): |
| """Build hooks nodes. |
| |
| This doesn't run on the target node in the pre phase as a failed |
| node would then be impossible to remove. |
| |
| """ |
| all_nodes = self.cfg.GetNodeList() |
| try: |
| all_nodes.remove(self.op.node_uuid) |
| except ValueError: |
| pass |
| return (all_nodes, all_nodes) |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| This checks: |
| - the node exists in the configuration |
| - it does not have primary or secondary instances |
| - it's not the master |
| |
| Any errors are signaled by raising errors.OpPrereqError. |
| |
| """ |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| node = self.cfg.GetNodeInfo(self.op.node_uuid) |
| assert node is not None |
| |
| masternode = self.cfg.GetMasterNode() |
| if node.uuid == masternode: |
| raise errors.OpPrereqError("Node is the master node, failover to another" |
| " node is required", errors.ECODE_INVAL) |
| |
| for _, instance in self.cfg.GetAllInstancesInfo().items(): |
| if node.uuid in self.cfg.GetInstanceNodes(instance.uuid): |
| raise errors.OpPrereqError("Instance %s is still running on the node," |
| " please remove first" % instance.name, |
| errors.ECODE_INVAL) |
| self.op.node_name = node.name |
| self.node = node |
| |
| def Exec(self, feedback_fn): |
| """Removes the node from the cluster. |
| |
| """ |
| logging.info("Stopping the node daemon and removing configs from node %s", |
| self.node.name) |
| |
| modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup |
| |
| assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ |
| "Not owning BGL" |
| |
| master_node = self.cfg.GetMasterNode() |
| potential_master_candidates = self.cfg.GetPotentialMasterCandidates() |
| if modify_ssh_setup: |
| # retrieve the list of potential master candidates before the node is |
| # removed |
| potential_master_candidate = \ |
| self.op.node_name in potential_master_candidates |
| master_candidate_uuids = self.cfg.GetMasterCandidateUuids() |
| result = self.rpc.call_node_ssh_key_remove( |
| [master_node], |
| self.node.uuid, self.op.node_name, |
| master_candidate_uuids, potential_master_candidates, |
| self.node.master_candidate, # from_authorized_keys |
| potential_master_candidate, # from_public_keys |
| True, # clear node's 'authorized_keys' |
          True, # clear node's 'ganeti_pub_keys'
| False, # no readd |
| self.op.debug, |
| self.op.verbose) |
| result[master_node].Raise( |
| "Could not remove the SSH key of node '%s' (UUID: %s)." % |
| (self.op.node_name, self.node.uuid)) |
| WarnAboutFailedSshUpdates(result, master_node, feedback_fn) |
| |
| # Promote nodes to master candidate as needed |
| AdjustCandidatePool( |
| self, [self.node.uuid], master_node, potential_master_candidates, |
| feedback_fn, modify_ssh_setup) |
| self.cfg.RemoveNode(self.node.uuid) |
| |
| # Run post hooks on the node before it's removed |
| RunPostHook(self, self.node.name) |
| |
| # we have to call this by name rather than by UUID, as the node is no longer |
| # in the config |
| result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup) |
| msg = result.fail_msg |
| if msg: |
| self.LogWarning("Errors encountered on the remote node while leaving" |
| " the cluster: %s", msg) |
| |
| cluster = self.cfg.GetClusterInfo() |
| |
| # Remove node from candidate certificate list |
| if self.node.master_candidate: |
| self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid) |
| |
| # Remove node from our /etc/hosts |
| if cluster.modify_etc_hosts: |
| master_node_uuid = self.cfg.GetMasterNode() |
| result = self.rpc.call_etc_hosts_modify(master_node_uuid, |
| constants.ETC_HOSTS_REMOVE, |
| self.node.name, None) |
| result.Raise("Can't update hosts file with new host data") |
| RedistributeAncillaryFiles(self) |
| |
| |
| class LURepairNodeStorage(NoHooksLU): |
| """Repairs the volume group on a node. |
| |
| """ |
| REQ_BGL = False |
| |
| def CheckArguments(self): |
| (self.op.node_uuid, self.op.node_name) = \ |
| ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name) |
| |
| storage_type = self.op.storage_type |
| |
| if (constants.SO_FIX_CONSISTENCY not in |
| constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): |
| raise errors.OpPrereqError("Storage units of type '%s' can not be" |
| " repaired" % storage_type, |
| errors.ECODE_INVAL) |
| |
| def ExpandNames(self): |
| self.needed_locks = { |
| locking.LEVEL_NODE: [self.op.node_uuid], |
| } |
| |
| def _CheckFaultyDisks(self, instance, node_uuid): |
| """Ensure faulty disks abort the opcode or at least warn.""" |
| try: |
| if FindFaultyInstanceDisks(self.cfg, self.rpc, instance, |
| node_uuid, True): |
| raise errors.OpPrereqError("Instance '%s' has faulty disks on" |
| " node '%s'" % |
| (instance.name, |
| self.cfg.GetNodeName(node_uuid)), |
| errors.ECODE_STATE) |
    except errors.OpPrereqError as err:
| if self.op.ignore_consistency: |
| self.LogWarning(str(err.args[0])) |
| else: |
| raise |
| |
| def CheckPrereq(self): |
| """Check prerequisites. |
| |
| """ |
| CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type) |
| |
| # Check whether any instance on this node has faulty disks |
| for inst in _GetNodeInstances(self.cfg, self.op.node_uuid): |
| if not inst.disks_active: |
| continue |
| check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid)) |
| check_nodes.discard(self.op.node_uuid) |
| for inst_node_uuid in check_nodes: |
| self._CheckFaultyDisks(inst, inst_node_uuid) |
| |
| def Exec(self, feedback_fn): |
| feedback_fn("Repairing storage unit '%s' on %s ..." % |
| (self.op.name, self.op.node_name)) |
| |
| st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) |
| result = self.rpc.call_storage_execute(self.op.node_uuid, |
| self.op.storage_type, st_args, |
| self.op.name, |
| constants.SO_FIX_CONSISTENCY) |
| result.Raise("Failed to repair storage unit '%s' on %s" % |
| (self.op.name, self.op.node_name)) |