#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Logical units dealing with nodes."""
import logging
import operator
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
import ganeti.rpc.node as rpc
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
MergeAndVerifyHvState, MergeAndVerifyDiskState, \
IsExclusiveStorageEnabledNode, CheckNodePVs, \
RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
FindFaultyInstanceDisks, CheckStorageTypeEnabled, GetClientCertDigest, \
AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
EnsureKvmdOnNodes, WarnAboutFailedSshUpdates, AddMasterCandidateSshKey
def _DecideSelfPromotion(lu, exceptions=None):
"""Decide whether I should promote myself as a master candidate.
"""
cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
# the new node will increase mc_max by one, so:
mc_should = min(mc_should + 1, cp_size)
return mc_now < mc_should
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
"""Ensure that a node has the given secondary ip.
@type lu: L{LogicalUnit}
@param lu: the LU on behalf of which we make the check
@type node: L{objects.Node}
@param node: the node to check
@type secondary_ip: string
@param secondary_ip: the ip to check
@type prereq: boolean
@param prereq: whether to throw a prerequisite or an execute error
@raise errors.OpPrereqError: if the node doesn't have the ip,
and prereq=True
@raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
"""
# this can be called with a new node, which has no UUID yet, so perform the
# RPC call using its name
result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
result.Raise("Failure checking secondary ip on node %s" % node.name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
if not result.payload:
msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
" please fix and re-run this command" % secondary_ip)
if prereq:
raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
else:
raise errors.OpExecError(msg)
class LUNodeAdd(LogicalUnit):
"""Logical unit for adding node to the cluster.
"""
HPATH = "node-add"
HTYPE = constants.HTYPE_NODE
_NFLAGS = ["master_capable", "vm_capable"]
def CheckArguments(self):
self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
# validate/normalize the node name
self.hostname = netutils.GetHostname(name=self.op.node_name,
family=self.primary_ip_family)
self.op.node_name = self.hostname.name
if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
raise errors.OpPrereqError("Cannot readd the master node",
errors.ECODE_STATE)
if self.op.readd and self.op.group:
raise errors.OpPrereqError("Cannot pass a node group when a node is"
" being readded", errors.ECODE_INVAL)
def BuildHooksEnv(self):
"""Build hooks env.
This will run on all nodes before, and on all nodes + the new node after.
"""
return {
"OP_TARGET": self.op.node_name,
"NODE_NAME": self.op.node_name,
"NODE_PIP": self.op.primary_ip,
"NODE_SIP": self.op.secondary_ip,
"MASTER_CAPABLE": str(self.op.master_capable),
"VM_CAPABLE": str(self.op.vm_capable),
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
hook_nodes = self.cfg.GetNodeList()
new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
if new_node_info is not None:
# Exclude added node
hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
# add the new node as post hook node by name; it does not have a UUID yet
return (hook_nodes, hook_nodes)
def PreparePostHookNodes(self, post_hook_node_uuids):
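# by the time the post-phase hooks run, the new node is in the configuration
# and has a UUID, so it can be appended to the hook node list here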
return post_hook_node_uuids + [self.new_node.uuid]
def HooksAbortCallBack(self, phase, feedback_fn, exception):
"""Cleans up if the hooks fail.
This function runs the actions necessary to bring the cluster back into a
clean state. This is needed if, for example, the hooks of this
operation failed and left the node in an inconsistent state.
"""
if phase == constants.HOOKS_PHASE_PRE:
feedback_fn("Pre operation hook failed. Rolling back preparations.")
master_node = self.cfg.GetMasterNodeInfo().name
remove_result = self.rpc.call_node_ssh_key_remove_light(
[master_node],
self.op.node_name)
remove_result[master_node].Raise(
"Error removing SSH key of node '%s'." % self.op.node_name)
def CheckPrereq(self):
"""Check prerequisites.
This checks:
- the new node is not already in the config
- it is resolvable
- its parameters (single/dual homed) match the cluster
Any errors are signaled by raising errors.OpPrereqError.
"""
node_name = self.hostname.name
self.op.primary_ip = self.hostname.ip
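# the secondary IP must always be IPv4; if none was given, fall back to the
# primary IP, which only works when the primary itself is IPv4 (single-homed)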
if self.op.secondary_ip is None:
if self.primary_ip_family == netutils.IP6Address.family:
raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
" IPv4 address must be given as secondary",
errors.ECODE_INVAL)
self.op.secondary_ip = self.op.primary_ip
secondary_ip = self.op.secondary_ip
if not netutils.IP4Address.IsValid(secondary_ip):
raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
" address" % secondary_ip, errors.ECODE_INVAL)
existing_node_info = self.cfg.GetNodeInfoByName(node_name)
if not self.op.readd and existing_node_info is not None:
raise errors.OpPrereqError("Node %s is already in the configuration" %
node_name, errors.ECODE_EXISTS)
elif self.op.readd and existing_node_info is None:
raise errors.OpPrereqError("Node %s is not in the configuration" %
node_name, errors.ECODE_NOENT)
self.changed_primary_ip = False
for existing_node in self.cfg.GetAllNodesInfo().values():
if self.op.readd and node_name == existing_node.name:
if existing_node.secondary_ip != secondary_ip:
raise errors.OpPrereqError("Readded node doesn't have the same IP"
" address configuration as before",
errors.ECODE_INVAL)
if existing_node.primary_ip != self.op.primary_ip:
self.changed_primary_ip = True
continue
if (existing_node.primary_ip == self.op.primary_ip or
existing_node.secondary_ip == self.op.primary_ip or
existing_node.primary_ip == secondary_ip or
existing_node.secondary_ip == secondary_ip):
raise errors.OpPrereqError("New node ip address(es) conflict with"
" existing node %s" % existing_node.name,
errors.ECODE_NOTUNIQUE)
# After this 'if' block, None is no longer a valid value for the
# _capable op attributes
if self.op.readd:
assert existing_node_info is not None, \
"Can't retrieve locked node %s" % node_name
for attr in self._NFLAGS:
if getattr(self.op, attr) is None:
setattr(self.op, attr, getattr(existing_node_info, attr))
else:
for attr in self._NFLAGS:
if getattr(self.op, attr) is None:
setattr(self.op, attr, True)
if self.op.readd and not self.op.vm_capable:
pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
if pri or sec:
raise errors.OpPrereqError("Node %s being re-added with vm_capable"
" flag set to false, but it already holds"
" instances" % node_name,
errors.ECODE_STATE)
# check that the type of the node (single versus dual homed) is the
# same as for the master
myself = self.cfg.GetMasterNodeInfo()
master_singlehomed = myself.secondary_ip == myself.primary_ip
newbie_singlehomed = secondary_ip == self.op.primary_ip
if master_singlehomed != newbie_singlehomed:
if master_singlehomed:
raise errors.OpPrereqError("The master has no secondary ip but the"
" new node has one",
errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("The master has a secondary ip but the"
" new node doesn't have one",
errors.ECODE_INVAL)
# checks reachability
if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("Node not reachable by ping",
errors.ECODE_ENVIRON)
if not newbie_singlehomed:
# check reachability from my secondary ip to newbie's secondary ip
if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
source=myself.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to node daemon port",
errors.ECODE_ENVIRON)
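# when readding, exclude the node itself from the master candidate
# statistics so it does not influence its own promotion decision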
if self.op.readd:
exceptions = [existing_node_info.uuid]
else:
exceptions = []
if self.op.master_capable:
self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
else:
self.master_candidate = False
self.node_group = None
if self.op.readd:
self.new_node = existing_node_info
self.node_group = existing_node_info.group
else:
self.node_group = self.cfg.LookupNodeGroup(self.op.group)
self.new_node = objects.Node(name=node_name,
primary_ip=self.op.primary_ip,
secondary_ip=secondary_ip,
master_candidate=self.master_candidate,
offline=False, drained=False,
group=self.node_group, ndparams={})
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
"node", "cluster or group")
if self.op.hv_state:
self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
if self.op.disk_state:
self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
# TODO: If we need to have multiple DnsOnlyRunner we probably should make
# it a property on the base class.
rpcrunner = rpc.DnsOnlyRunner()
result = rpcrunner.call_version([node_name])[node_name]
result.Raise("Can't get version information from node %s" % node_name,
prereq=True, ecode=errors.ECODE_ENVIRON)
if constants.PROTOCOL_VERSION == result.payload:
logging.info("Communication to node %s fine, sw version %s match",
node_name, result.payload)
else:
raise errors.OpPrereqError("Version mismatch master version %s,"
" node version %s" %
(constants.PROTOCOL_VERSION, result.payload),
errors.ECODE_ENVIRON)
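# if the cluster uses LVM (a volume group is configured), verify the new
# node's physical volumes before accepting it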
vg_name = self.cfg.GetVGName()
if vg_name is not None:
vparams = {constants.NV_PVLIST: [vg_name]}
excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
cname = self.cfg.GetClusterName()
result = rpcrunner.call_node_verify_light(
[node_name], vparams, cname,
self.cfg.GetClusterInfo().hvparams,
)[node_name]
(errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
if errmsgs:
raise errors.OpPrereqError("Checks on node PVs failed: %s" %
"; ".join(errmsgs), errors.ECODE_ENVIRON)
def _InitOpenVSwitch(self):
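# fill the new node's parameters with group and cluster defaults to see
# whether OpenvSwitch has to be configured on it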
filled_ndparams = self.cfg.GetClusterInfo().FillND(
self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
ovs = filled_ndparams.get(constants.ND_OVS, None)
ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
if ovs:
if not ovs_link:
self.LogInfo("No physical interface for OpenvSwitch was given."
" OpenvSwitch will not have an outside connection. This"
" might not be what you want.")
result = self.rpc.call_node_configure_ovs(
self.new_node.name, ovs_name, ovs_link)
result.Raise("Failed to initialize OpenVSwitch on new node")
def _SshUpdate(self, new_node_uuid, new_node_name, is_master_candidate,
is_potential_master_candidate, rpcrunner, readd, feedback_fn):
"""Update the SSH setup of all nodes after adding a new node.
@type readd: boolean
@param readd: whether or not this node is readded
"""
potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
master_node = self.cfg.GetMasterNode()
if readd:
# clear previous keys
master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
remove_result = rpcrunner.call_node_ssh_key_remove(
[master_node],
new_node_uuid, new_node_name,
master_candidate_uuids,
potential_master_candidates,
True, # from authorized keys
True, # from public keys
False, # clear authorized keys
True, # clear public keys
True, # it's a readd
self.op.debug,
self.op.verbose)
remove_result[master_node].Raise(
"Could not remove SSH keys of node %s before readding,"
" (UUID: %s)." % (new_node_name, new_node_uuid))
WarnAboutFailedSshUpdates(remove_result, master_node, feedback_fn)
result = rpcrunner.call_node_ssh_key_add(
[master_node], new_node_uuid, new_node_name,
potential_master_candidates,
is_master_candidate, is_potential_master_candidate,
is_potential_master_candidate, self.op.debug, self.op.verbose)
result[master_node].Raise("Could not update the node's SSH setup.")
WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
"""
assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
"Not owning BGL"
# We are adding a new node, so we assume it is powered
self.new_node.powered = True
# for re-adds, reset the offline/drained/master-candidate flags;
# we need to reset here, otherwise offline would prevent RPC calls
# later in the procedure; this also means that if the re-add
# fails, we are left with a non-offlined, broken node
if self.op.readd:
self.new_node.offline = False
self.new_node.drained = False
self.LogInfo("Readding a node, the offline/drained flags were reset")
# if we demote the node, we do cleanup later in the procedure
self.new_node.master_candidate = self.master_candidate
if self.changed_primary_ip:
self.new_node.primary_ip = self.op.primary_ip
# copy the master/vm_capable flags
for attr in self._NFLAGS:
setattr(self.new_node, attr, getattr(self.op, attr))
# notify the user about any possible mc promotion
if self.new_node.master_candidate:
self.LogInfo("Node will be a master candidate")
if self.op.ndparams:
self.new_node.ndparams = self.op.ndparams
else:
self.new_node.ndparams = {}
if self.op.hv_state:
self.new_node.hv_state_static = self.new_hv_state
if self.op.disk_state:
self.new_node.disk_state_static = self.new_disk_state
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
master_node = self.cfg.GetMasterNode()
result = self.rpc.call_etc_hosts_modify(
master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
self.hostname.ip)
result.Raise("Can't update hosts file with new host data")
if self.new_node.secondary_ip != self.new_node.primary_ip:
_CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
False)
node_verifier_uuids = [self.cfg.GetMasterNode()]
node_verify_param = {
constants.NV_NODELIST: ([self.new_node.name], {}, []),
# TODO: do a node-net-test as well?
}
result = self.rpc.call_node_verify(
node_verifier_uuids, node_verify_param,
self.cfg.GetClusterName(),
self.cfg.GetClusterInfo().hvparams)
for verifier in node_verifier_uuids:
result[verifier].Raise("Cannot communicate with node %s" % verifier)
nl_payload = result[verifier].payload[constants.NV_NODELIST]
if nl_payload:
for failed in nl_payload:
feedback_fn("ssh/hostname verification failed"
" (checking from %s): %s" %
(verifier, nl_payload[failed]))
raise errors.OpExecError("ssh/hostname verification failed")
self._InitOpenVSwitch()
if self.op.readd:
RedistributeAncillaryFiles(self)
# make sure we redistribute the config
self.cfg.Update(self.new_node, feedback_fn)
# and make sure the new node will not have old files around
if not self.new_node.master_candidate:
result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
result.Warn("Node failed to demote itself from master candidate status",
self.LogWarning)
else:
self.cfg.AddNode(self.new_node, self.proc.GetECId())
RedistributeAncillaryFiles(self)
# We create a new certificate even if the node is readded
digest = GetClientCertDigest(self, self.new_node.uuid)
if self.new_node.master_candidate:
self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
else:
self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
# Update SSH setup of all nodes
if self.op.node_setup:
# FIXME: so far, all nodes are considered potential master candidates
self._SshUpdate(self.new_node.uuid, self.new_node.name,
self.new_node.master_candidate, True,
self.rpc, self.op.readd, feedback_fn)
class LUNodeSetParams(LogicalUnit):
"""Modifies the parameters of a node.
@cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
to the node role (as _ROLE_*)
@cvar _R2F: a dictionary from node role to tuples of flags
@cvar _FLAGS: a list of attribute names corresponding to the flags
"""
HPATH = "node-modify"
HTYPE = constants.HTYPE_NODE
REQ_BGL = False
(_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
_F2R = {
(True, False, False): _ROLE_CANDIDATE,
(False, True, False): _ROLE_DRAINED,
(False, False, True): _ROLE_OFFLINE,
(False, False, False): _ROLE_REGULAR,
}
_R2F = dict((v, k) for k, v in _F2R.items())
_FLAGS = ["master_candidate", "drained", "offline"]
def CheckArguments(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
self.op.master_capable, self.op.vm_capable,
self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
self.op.disk_state]
if all_mods.count(None) == len(all_mods):
raise errors.OpPrereqError("Please pass at least one modification",
errors.ECODE_INVAL)
if all_mods.count(True) > 1:
raise errors.OpPrereqError("Can't set the node into more than one"
" state at the same time",
errors.ECODE_INVAL)
# Boolean value that tells us whether we might be demoting from MC
self.might_demote = (self.op.master_candidate is False or
self.op.offline is True or
self.op.drained is True or
self.op.master_capable is False)
if self.op.secondary_ip:
if not netutils.IP4Address.IsValid(self.op.secondary_ip):
raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
" address" % self.op.secondary_ip,
errors.ECODE_INVAL)
self.lock_all = self.op.auto_promote and self.might_demote
self.lock_instances = self.op.secondary_ip is not None
def _InstanceFilter(self, instance):
"""Filter for getting affected instances.
"""
disks = self.cfg.GetInstanceDisks(instance.uuid)
any_mirrored = utils.AnyDiskOfType(disks, constants.DTS_INT_MIRROR)
return (any_mirrored and
self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
def ExpandNames(self):
if self.lock_all:
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
}
else:
self.needed_locks = {
locking.LEVEL_NODE: self.op.node_uuid,
}
# Since modifying a node can have severe effects on currently running
# operations the resource lock is at least acquired in shared mode
self.needed_locks[locking.LEVEL_NODE_RES] = \
self.needed_locks[locking.LEVEL_NODE]
# Get all locks except nodes in shared mode; they are not used for anything
# but read-only access
self.share_locks = ShareAll()
self.share_locks[locking.LEVEL_NODE] = 0
self.share_locks[locking.LEVEL_NODE_RES] = 0
if self.lock_instances:
self.needed_locks[locking.LEVEL_INSTANCE] = \
self.cfg.GetInstanceNames(
self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
def BuildHooksEnv(self):
"""Build hooks env.
This runs on the master node.
"""
return {
"OP_TARGET": self.op.node_name,
"MASTER_CANDIDATE": str(self.op.master_candidate),
"OFFLINE": str(self.op.offline),
"DRAINED": str(self.op.drained),
"MASTER_CAPABLE": str(self.op.master_capable),
"VM_CAPABLE": str(self.op.vm_capable),
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This only checks the instance list against the existing names.
"""
node = self.cfg.GetNodeInfo(self.op.node_uuid)
if self.lock_instances:
affected_instances = \
self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
# Verify instance locks
owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
wanted_instance_names = frozenset([inst.name for inst in
affected_instances.values()])
if wanted_instance_names - owned_instance_names:
raise errors.OpPrereqError("Instances affected by changing node %s's"
" secondary IP address have changed since"
" locks were acquired, wanted '%s', have"
" '%s'; retry the operation" %
(node.name,
utils.CommaJoin(wanted_instance_names),
utils.CommaJoin(owned_instance_names)),
errors.ECODE_STATE)
else:
affected_instances = None
if (self.op.master_candidate is not None or
self.op.drained is not None or
self.op.offline is not None):
# we can't change the master's node flags
if node.uuid == self.cfg.GetMasterNode():
raise errors.OpPrereqError("The master role can be changed"
" only via master-failover",
errors.ECODE_INVAL)
if self.op.master_candidate and not node.master_capable:
raise errors.OpPrereqError("Node %s is not master capable, cannot make"
" it a master candidate" % node.name,
errors.ECODE_STATE)
if self.op.vm_capable is False:
(ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
if ipri or isec:
raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
" the vm_capable flag" % node.name,
errors.ECODE_STATE)
if node.master_candidate and self.might_demote and not self.lock_all:
assert not self.op.auto_promote, "auto_promote set but lock_all not"
# check if after removing the current node, we're missing master
# candidates
(mc_remaining, mc_should, _) = \
self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
" pass auto promote option to allow"
" promotion (--auto-promote or RAPI"
" auto_promote=True)", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
self.old_role = old_role = self._F2R[old_flags]
# Check for ineffective changes
for attr in self._FLAGS:
if getattr(self.op, attr) is False and getattr(node, attr) is False:
self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
setattr(self.op, attr, None)
# Past this point, any flag change to False means a transition
# away from the respective state, as only real changes are kept
# TODO: We might query the real power state if it supports OOB
if SupportsOob(self.cfg, node):
if self.op.offline is False and not (node.powered or
self.op.powered is True):
raise errors.OpPrereqError(("Node %s needs to be turned on before its"
" offline status can be reset") %
self.op.node_name, errors.ECODE_STATE)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
" as it does not support out-of-band"
" handling") % self.op.node_name,
errors.ECODE_STATE)
# If we're being deofflined/drained, we'll MC ourself if needed
if (self.op.drained is False or self.op.offline is False or
(self.op.master_capable and not node.master_capable)):
if _DecideSelfPromotion(self):
self.op.master_candidate = True
self.LogInfo("Auto-promoting node to master candidate")
# If we're no longer master capable, we'll demote ourselves from MC
if self.op.master_capable is False and node.master_candidate:
if self.op.node_uuid == self.cfg.GetMasterNode():
raise errors.OpPrereqError("Master must remain master capable",
errors.ECODE_STATE)
self.LogInfo("Demoting from master candidate")
self.op.master_candidate = False
# Compute new role
assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
if self.op.master_candidate:
new_role = self._ROLE_CANDIDATE
elif self.op.drained:
new_role = self._ROLE_DRAINED
elif self.op.offline:
new_role = self._ROLE_OFFLINE
elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
# False is still in new flags, which means we're un-setting (the
# only) True flag
new_role = self._ROLE_REGULAR
else: # no new flags, nothing, keep old role
new_role = old_role
self.new_role = new_role
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
result = self.rpc.call_version([node.uuid])[node.uuid]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
(node.name, result.fail_msg),
errors.ECODE_STATE)
else:
self.LogWarning("Transitioning node from offline to online state"
" without using re-add. Please make sure the node"
" is healthy!")
# When changing the secondary ip, verify if this is a single-homed to
# multi-homed transition or vice versa, and apply the relevant
# restrictions.
if self.op.secondary_ip:
# Ok even without locking, because this can't be changed by any LU
master = self.cfg.GetMasterNodeInfo()
master_singlehomed = master.secondary_ip == master.primary_ip
if master_singlehomed and self.op.secondary_ip != node.primary_ip:
if self.op.force and node.uuid == master.uuid:
self.LogWarning("Transitioning from single-homed to multi-homed"
" cluster; all nodes will require a secondary IP"
" address")
else:
raise errors.OpPrereqError("Changing the secondary ip on a"
" single-homed cluster requires the"
" --force option to be passed, and the"
" target node to be the master",
errors.ECODE_INVAL)
elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
if self.op.force and node.uuid == master.uuid:
self.LogWarning("Transitioning from multi-homed to single-homed"
" cluster; secondary IP addresses will have to be"
" removed")
else:
raise errors.OpPrereqError("Cannot set the secondary IP to be the"
" same as the primary IP on a multi-homed"
" cluster, unless the --force option is"
" passed, and the target node is the"
" master", errors.ECODE_INVAL)
assert not (set([inst.name for inst in affected_instances.values()]) -
self.owned_locks(locking.LEVEL_INSTANCE))
if node.offline:
if affected_instances:
msg = ("Cannot change secondary IP address: offline node has"
" instances (%s) configured to use it" %
utils.CommaJoin(
[inst.name for inst in affected_instances.values()]))
raise errors.OpPrereqError(msg, errors.ECODE_STATE)
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
for instance in affected_instances.values():
CheckInstanceState(self, instance, INSTANCE_DOWN,
msg="cannot change secondary ip")
_CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
if master.uuid != node.uuid:
# check reachability from master secondary ip to new secondary ip
if not netutils.TcpPing(self.op.secondary_ip,
constants.DEFAULT_NODED_PORT,
source=master.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to node daemon port",
errors.ECODE_ENVIRON)
if self.op.ndparams:
new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
"node", "cluster or group")
self.new_ndparams = new_ndparams
if self.op.hv_state:
self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
node.hv_state_static)
if self.op.disk_state:
self.new_disk_state = \
MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
def Exec(self, feedback_fn):
"""Modifies a node.
"""
node = self.cfg.GetNodeInfo(self.op.node_uuid)
result = []
if self.op.ndparams:
node.ndparams = self.new_ndparams
if self.op.powered is not None:
node.powered = self.op.powered
if self.op.hv_state:
node.hv_state_static = self.new_hv_state
if self.op.disk_state:
node.disk_state_static = self.new_disk_state
for attr in ["master_capable", "vm_capable"]:
val = getattr(self.op, attr)
if val is not None:
setattr(node, attr, val)
result.append((attr, str(val)))
if self.op.secondary_ip:
node.secondary_ip = self.op.secondary_ip
result.append(("secondary_ip", self.op.secondary_ip))
# this will trigger configuration file update, if needed
self.cfg.Update(node, feedback_fn)
master_node = self.cfg.GetMasterNode()
potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
if self.new_role != self.old_role:
new_flags = self._R2F[self.new_role]
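# record every flag that actually changed so it shows up in the job result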
for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
if of != nf:
result.append((desc, str(nf)))
(node.master_candidate, node.drained, node.offline) = new_flags
self.cfg.Update(node, feedback_fn)
# Tell the node to demote itself, if no longer MC and not offline.
# This must be done only after the configuration is updated so that
# it's ensured the node won't receive any further configuration updates.
if self.old_role == self._ROLE_CANDIDATE and \
self.new_role != self._ROLE_OFFLINE:
msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
if msg:
self.LogWarning("Node failed to demote itself: %s", msg)
# we locked all nodes, we adjust the CP before updating this node
if self.lock_all:
AdjustCandidatePool(
self, [node.uuid], master_node, potential_master_candidates,
feedback_fn, modify_ssh_setup)
# if node gets promoted, grant RPC privileges
if self.new_role == self._ROLE_CANDIDATE:
AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
# if node is demoted, revoke RPC privileges
if self.old_role == self._ROLE_CANDIDATE:
RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)
EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])
# this will trigger job queue propagation or cleanup if the mc
# flag changed
if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
if modify_ssh_setup:
if self.old_role == self._ROLE_CANDIDATE:
master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
ssh_result = self.rpc.call_node_ssh_key_remove(
[master_node],
node.uuid, node.name,
master_candidate_uuids, potential_master_candidates,
True, # remove node's key from all nodes' authorized_keys file
False, # currently, all nodes are potential master candidates
False, # do not clear node's 'authorized_keys'
False, # do not clear node's 'ganeti_pub_keys'
False, # no readd
self.op.debug,
self.op.verbose)
ssh_result[master_node].Raise(
"Could not adjust the SSH setup after demoting node '%s'"
" (UUID: %s)." % (node.name, node.uuid))
WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)
if self.new_role == self._ROLE_CANDIDATE:
AddMasterCandidateSshKey(
self, master_node, node, potential_master_candidates, feedback_fn)
return result
class LUNodePowercycle(NoHooksLU):
"""Powercycles a node.
"""
REQ_BGL = False
def CheckArguments(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
raise errors.OpPrereqError("The node is the master and the force"
" parameter was not set",
errors.ECODE_INVAL)
def ExpandNames(self):
"""Locking for PowercycleNode.
This is a last-resort option and shouldn't block on other
jobs. Therefore, we grab no locks.
"""
self.needed_locks = {}
def Exec(self, feedback_fn):
"""Reboots a node.
"""
default_hypervisor = self.cfg.GetHypervisorType()
hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
result = self.rpc.call_node_powercycle(self.op.node_uuid,
default_hypervisor,
hvparams)
result.Raise("Failed to schedule the reboot")
return result.payload
def _GetNodeInstancesInner(cfg, fn):
return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
def _GetNodePrimaryInstances(cfg, node_uuid):
"""Returns primary instances on a node.
"""
return _GetNodeInstancesInner(cfg,
lambda inst: node_uuid == inst.primary_node)
def _GetNodeSecondaryInstances(cfg, node_uuid):
"""Returns secondary instances on a node.
"""
return _GetNodeInstancesInner(cfg,
lambda inst: node_uuid in
cfg.GetInstanceSecondaryNodes(inst.uuid))
def _GetNodeInstances(cfg, node_uuid):
"""Returns a list of all primary and secondary instances on a node.
"""
return _GetNodeInstancesInner(cfg,
lambda inst: node_uuid in
cfg.GetInstanceNodes(inst.uuid))
class LUNodeEvacuate(NoHooksLU):
"""Evacuates instances off a list of nodes.
"""
REQ_BGL = False
def CheckArguments(self):
CheckIAllocatorOrNode(self, "iallocator", "remote_node")
def ExpandNames(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
if self.op.remote_node is not None:
(self.op.remote_node_uuid, self.op.remote_node) = \
ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
self.op.remote_node)
assert self.op.remote_node
if self.op.node_uuid == self.op.remote_node_uuid:
raise errors.OpPrereqError("Can not use evacuated node as a new"
" secondary node", errors.ECODE_INVAL)
if self.op.mode != constants.NODE_EVAC_SEC:
raise errors.OpPrereqError("Without the use of an iallocator only"
" secondary instances can be evacuated",
errors.ECODE_INVAL)
# Declare locks
self.share_locks = ShareAll()
self.needed_locks = {
locking.LEVEL_INSTANCE: [],
locking.LEVEL_NODEGROUP: [],
locking.LEVEL_NODE: [],
}
# Determine nodes (via group) optimistically, needs verification once locks
# have been acquired
self.lock_nodes = self._DetermineNodes()
def _DetermineNodes(self):
"""Gets the list of node UUIDs to operate on.
"""
if self.op.remote_node is None:
# Iallocator will choose any node(s) in the same group
group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
else:
group_nodes = frozenset([self.op.remote_node_uuid])
# Determine nodes to be locked
return set([self.op.node_uuid]) | group_nodes
def _DetermineInstances(self):
"""Builds list of instances to operate on.
"""
assert self.op.mode in constants.NODE_EVAC_MODES
if self.op.mode == constants.NODE_EVAC_PRI:
# Primary instances only
inst_fn = _GetNodePrimaryInstances
assert self.op.remote_node is None, \
"Evacuating primary instances requires iallocator"
elif self.op.mode == constants.NODE_EVAC_SEC:
# Secondary instances only
inst_fn = _GetNodeSecondaryInstances
else:
# All instances
assert self.op.mode == constants.NODE_EVAC_ALL
inst_fn = _GetNodeInstances
# TODO: In 2.6, change the iallocator interface to take an evacuation mode
# per instance
raise errors.OpPrereqError("Due to an issue with the iallocator"
" interface it is not possible to evacuate"
" all instances at once; specify explicitly"
" whether to evacuate primary or secondary"
" instances",
errors.ECODE_INVAL)
return inst_fn(self.cfg, self.op.node_uuid)
def DeclareLocks(self, level):
if level == locking.LEVEL_INSTANCE:
# Lock instances optimistically, needs verification once node and group
# locks have been acquired
self.needed_locks[locking.LEVEL_INSTANCE] = \
set(i.name for i in self._DetermineInstances())
elif level == locking.LEVEL_NODEGROUP:
# Lock node groups for all potential target nodes optimistically, needs
# verification once nodes have been acquired
self.needed_locks[locking.LEVEL_NODEGROUP] = \
self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
elif level == locking.LEVEL_NODE:
self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
def CheckPrereq(self):
# Verify locks
owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
owned_nodes = self.owned_locks(locking.LEVEL_NODE)
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
need_nodes = self._DetermineNodes()
if not owned_nodes.issuperset(need_nodes):
raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
" locks were acquired, current nodes are"
" are '%s', used to be '%s'; retry the"
" operation" %
(self.op.node_name,
utils.CommaJoin(need_nodes),
utils.CommaJoin(owned_nodes)),
errors.ECODE_STATE)
wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
if owned_groups != wanted_groups:
raise errors.OpExecError("Node groups changed since locks were acquired,"
" current groups are '%s', used to be '%s';"
" retry the operation" %
(utils.CommaJoin(wanted_groups),
utils.CommaJoin(owned_groups)))
# Determine affected instances
self.instances = self._DetermineInstances()
self.instance_names = [i.name for i in self.instances]
if set(self.instance_names) != owned_instance_names:
raise errors.OpExecError("Instances on node '%s' changed since locks"
" were acquired, current instances are '%s',"
" used to be '%s'; retry the operation" %
(self.op.node_name,
utils.CommaJoin(self.instance_names),
utils.CommaJoin(owned_instance_names)))
if self.instance_names:
self.LogInfo("Evacuating instances from node '%s': %s",
self.op.node_name,
utils.CommaJoin(utils.NiceSort(self.instance_names)))
else:
self.LogInfo("No instances to evacuate from node '%s'",
self.op.node_name)
if self.op.remote_node is not None:
for i in self.instances:
if i.primary_node == self.op.remote_node_uuid:
raise errors.OpPrereqError("Node %s is the primary node of"
" instance %s, cannot use it as"
" secondary" %
(self.op.remote_node, i.name),
errors.ECODE_INVAL)
def Exec(self, feedback_fn):
assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
if not self.instance_names:
# No instances to evacuate
jobs = []
elif self.op.iallocator is not None:
# TODO: Implement relocation to other group
req = iallocator.IAReqNodeEvac(
evac_mode=self.op.mode, instances=list(self.instance_names),
ignore_soft_errors=self.op.ignore_soft_errors)
ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
if not ial.success:
raise errors.OpPrereqError("Can't compute node evacuation using"
" iallocator '%s': %s" %
(self.op.iallocator, ial.info),
errors.ECODE_NORES)
jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
elif self.op.remote_node is not None:
assert self.op.mode == constants.NODE_EVAC_SEC
jobs = [
[opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
remote_node=self.op.remote_node,
disks=[],
mode=constants.REPLACE_DISK_CHG,
early_release=self.op.early_release)]
for instance_name in self.instance_names]
else:
raise errors.ProgrammerError("No iallocator or remote node")
return ResultWithJobs(jobs)
class LUNodeMigrate(LogicalUnit):
"""Migrate all instances from a node.
"""
HPATH = "node-migrate"
HTYPE = constants.HTYPE_NODE
REQ_BGL = False
def CheckArguments(self):
pass
def ExpandNames(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
self.share_locks = ShareAll()
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_uuid],
}
def BuildHooksEnv(self):
"""Build hooks env.
This runs on the master, the primary and all the secondaries.
"""
return {
"NODE_NAME": self.op.node_name,
"ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
}
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
nl = [self.cfg.GetMasterNode()]
return (nl, nl)
def CheckPrereq(self):
pass
def Exec(self, feedback_fn):
# Prepare jobs for migration instances
jobs = [
[opcodes.OpInstanceMigrate(
instance_name=inst.name,
mode=self.op.mode,
live=self.op.live,
iallocator=self.op.iallocator,
target_node=self.op.target_node,
allow_runtime_changes=self.op.allow_runtime_changes,
ignore_ipolicy=self.op.ignore_ipolicy)]
for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
# TODO: Run iallocator in this opcode and pass correct placement options to
# OpInstanceMigrate. Since other jobs can modify the cluster between
# running the iallocator and the actual migration, a good consistency model
# will have to be found.
assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
frozenset([self.op.node_uuid]))
return ResultWithJobs(jobs)
def _GetStorageTypeArgs(cfg, storage_type):
"""Returns the arguments for a storage type.
"""
# Special case for file storage
if storage_type == constants.ST_FILE:
return [[cfg.GetFileStorageDir()]]
elif storage_type == constants.ST_SHARED_FILE:
return [[cfg.GetSharedFileStorageDir()]]
elif storage_type == constants.ST_GLUSTER:
return [[cfg.GetGlusterStorageDir()]]
else:
return []
class LUNodeModifyStorage(NoHooksLU):
"""Logical unit for modifying a storage volume on a node.
"""
REQ_BGL = False
def CheckArguments(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
storage_type = self.op.storage_type
try:
modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
except KeyError:
raise errors.OpPrereqError("Storage units of type '%s' can not be"
" modified" % storage_type,
errors.ECODE_INVAL)
diff = set(self.op.changes.keys()) - modifiable
if diff:
raise errors.OpPrereqError("The following fields can not be modified for"
" storage units of type '%s': %r" %
(storage_type, list(diff)),
errors.ECODE_INVAL)
def CheckPrereq(self):
"""Check prerequisites.
"""
CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: self.op.node_uuid,
}
def Exec(self, feedback_fn):
"""Computes the list of nodes and their attributes.
"""
st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
result = self.rpc.call_storage_modify(self.op.node_uuid,
self.op.storage_type, st_args,
self.op.name, self.op.changes)
result.Raise("Failed to modify storage unit '%s' on %s" %
(self.op.name, self.op.node_name))
def _CheckOutputFields(fields, selected):
"""Checks whether all selected fields are valid according to fields.
@type fields: L{utils.FieldSet}
@param fields: the set of valid output fields
@type selected: list of strings
@param selected: the output fields requested by the operation
"""
delta = fields.NonMatching(selected)
if delta:
raise errors.OpPrereqError("Unknown output fields selected: %s"
% ",".join(delta), errors.ECODE_INVAL)
class LUNodeQueryvols(NoHooksLU):
"""Logical unit for getting volumes on node(s).
"""
REQ_BGL = False
def CheckArguments(self):
_CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
constants.VF_VG, constants.VF_NAME,
constants.VF_SIZE, constants.VF_INSTANCE),
self.op.output_fields)
def ExpandNames(self):
self.share_locks = ShareAll()
if self.op.nodes:
self.needed_locks = {
locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
}
else:
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
}
def Exec(self, feedback_fn):
"""Computes the list of nodes and their attributes.
"""
node_uuids = self.owned_locks(locking.LEVEL_NODE)
volumes = self.rpc.call_node_volumes(node_uuids)
ilist = self.cfg.GetAllInstancesInfo()
vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())
output = []
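# build one output row per volume, skipping nodes that are offline or that
# failed to report their volumes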
for node_uuid in node_uuids:
nresult = volumes[node_uuid]
if nresult.offline:
continue
msg = nresult.fail_msg
if msg:
self.LogWarning("Can't compute volume data on node %s: %s",
self.cfg.GetNodeName(node_uuid), msg)
continue
node_vols = sorted(nresult.payload,
key=operator.itemgetter(constants.VF_DEV))
for vol in node_vols:
node_output = []
for field in self.op.output_fields:
if field == constants.VF_NODE:
val = self.cfg.GetNodeName(node_uuid)
elif field == constants.VF_PHYS:
val = vol[constants.VF_DEV]
elif field == constants.VF_VG:
val = vol[constants.VF_VG]
elif field == constants.VF_NAME:
val = vol[constants.VF_NAME]
elif field == constants.VF_SIZE:
val = int(float(vol[constants.VF_SIZE]))
elif field == constants.VF_INSTANCE:
inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
vol[constants.VF_NAME]), None)
if inst is not None:
val = inst.name
else:
val = "-"
else:
raise errors.ParameterError(field)
node_output.append(str(val))
output.append(node_output)
return output
class LUNodeQueryStorage(NoHooksLU):
"""Logical unit for getting information on storage units on node(s).
"""
REQ_BGL = False
def CheckArguments(self):
_CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
self.op.output_fields)
def ExpandNames(self):
self.share_locks = ShareAll()
if self.op.nodes:
self.needed_locks = {
locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
}
else:
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
}
def _DetermineStorageType(self):
"""Determines the default storage type of the cluster.
"""
enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
default_storage_type = \
constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
return default_storage_type
def CheckPrereq(self):
"""Check prerequisites.
"""
if self.op.storage_type:
CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
self.storage_type = self.op.storage_type
else:
self.storage_type = self._DetermineStorageType()
supported_storage_types = constants.STS_REPORT_NODE_STORAGE
if self.storage_type not in supported_storage_types:
raise errors.OpPrereqError(
"Storage reporting for storage type '%s' is not supported. Please"
" use the --storage-type option to specify one of the supported"
" storage types (%s) or set the default disk template to one that"
" supports storage reporting." %
(self.storage_type, utils.CommaJoin(supported_storage_types)))
def Exec(self, feedback_fn):
"""Computes the list of nodes and their attributes.
"""
if self.op.storage_type:
self.storage_type = self.op.storage_type
else:
self.storage_type = self._DetermineStorageType()
self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
# Always get name to sort by
if constants.SF_NAME in self.op.output_fields:
fields = self.op.output_fields[:]
else:
fields = [constants.SF_NAME] + self.op.output_fields
# Never ask for node or type as it's only known to the LU
for extra in [constants.SF_NODE, constants.SF_TYPE]:
while extra in fields:
fields.remove(extra)
field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
name_idx = field_idx[constants.SF_NAME]
st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
data = self.rpc.call_storage_list(self.node_uuids,
self.storage_type, st_args,
self.op.name, fields)
result = []
for node_uuid in utils.NiceSort(self.node_uuids):
node_name = self.cfg.GetNodeName(node_uuid)
nresult = data[node_uuid]
if nresult.offline:
continue
msg = nresult.fail_msg
if msg:
self.LogWarning("Can't get storage data from node %s: %s",
node_name, msg)
continue
rows = dict([(row[name_idx], row) for row in nresult.payload])
for name in utils.NiceSort(rows.keys()):
row = rows[name]
out = []
for field in self.op.output_fields:
if field == constants.SF_NODE:
val = node_name
elif field == constants.SF_TYPE:
val = self.storage_type
elif field in field_idx:
val = row[field_idx[field]]
else:
raise errors.ParameterError(field)
out.append(val)
result.append(out)
return result
class LUNodeRemove(LogicalUnit):
"""Logical unit for removing a node.
"""
HPATH = "node-remove"
HTYPE = constants.HTYPE_NODE
def BuildHooksEnv(self):
"""Build hooks env.
"""
return {
"OP_TARGET": self.op.node_name,
"NODE_NAME": self.op.node_name,
}
def BuildHooksNodes(self):
"""Build hooks nodes.
This doesn't run on the target node in the pre phase as a failed
node would then be impossible to remove.
"""
all_nodes = self.cfg.GetNodeList()
try:
all_nodes.remove(self.op.node_uuid)
except ValueError:
pass
return (all_nodes, all_nodes)
def CheckPrereq(self):
"""Check prerequisites.
This checks:
- the node exists in the configuration
- it does not have primary or secondary instances
- it's not the master
Any errors are signaled by raising errors.OpPrereqError.
"""
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
node = self.cfg.GetNodeInfo(self.op.node_uuid)
assert node is not None
masternode = self.cfg.GetMasterNode()
if node.uuid == masternode:
raise errors.OpPrereqError("Node is the master node, failover to another"
" node is required", errors.ECODE_INVAL)
for _, instance in self.cfg.GetAllInstancesInfo().items():
if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
raise errors.OpPrereqError("Instance %s is still running on the node,"
" please remove first" % instance.name,
errors.ECODE_INVAL)
self.op.node_name = node.name
self.node = node
def Exec(self, feedback_fn):
"""Removes the node from the cluster.
"""
logging.info("Stopping the node daemon and removing configs from node %s",
self.node.name)
modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
"Not owning BGL"
master_node = self.cfg.GetMasterNode()
potential_master_candidates = self.cfg.GetPotentialMasterCandidates()
if modify_ssh_setup:
# retrieve the list of potential master candidates before the node is
# removed
potential_master_candidate = \
self.op.node_name in potential_master_candidates
master_candidate_uuids = self.cfg.GetMasterCandidateUuids()
result = self.rpc.call_node_ssh_key_remove(
[master_node],
self.node.uuid, self.op.node_name,
master_candidate_uuids, potential_master_candidates,
self.node.master_candidate, # from_authorized_keys
potential_master_candidate, # from_public_keys
True, # clear node's 'authorized_keys'
True, # clear node's 'ganeti_public_keys'
False, # no readd
self.op.debug,
self.op.verbose)
result[master_node].Raise(
"Could not remove the SSH key of node '%s' (UUID: %s)." %
(self.op.node_name, self.node.uuid))
WarnAboutFailedSshUpdates(result, master_node, feedback_fn)
# Promote nodes to master candidate as needed
AdjustCandidatePool(
self, [self.node.uuid], master_node, potential_master_candidates,
feedback_fn, modify_ssh_setup)
self.cfg.RemoveNode(self.node.uuid)
# Run post hooks on the node before it's removed
RunPostHook(self, self.node.name)
# we have to call this by name rather than by UUID, as the node is no longer
# in the config
result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
msg = result.fail_msg
if msg:
self.LogWarning("Errors encountered on the remote node while leaving"
" the cluster: %s", msg)
cluster = self.cfg.GetClusterInfo()
# Remove node from candidate certificate list
if self.node.master_candidate:
self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)
# Remove node from our /etc/hosts
if cluster.modify_etc_hosts:
master_node_uuid = self.cfg.GetMasterNode()
result = self.rpc.call_etc_hosts_modify(master_node_uuid,
constants.ETC_HOSTS_REMOVE,
self.node.name, None)
result.Raise("Can't update hosts file with new host data")
RedistributeAncillaryFiles(self)
class LURepairNodeStorage(NoHooksLU):
"""Repairs the volume group on a node.
"""
REQ_BGL = False
def CheckArguments(self):
(self.op.node_uuid, self.op.node_name) = \
ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
storage_type = self.op.storage_type
if (constants.SO_FIX_CONSISTENCY not in
constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
raise errors.OpPrereqError("Storage units of type '%s' can not be"
" repaired" % storage_type,
errors.ECODE_INVAL)
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_uuid],
}
def _CheckFaultyDisks(self, instance, node_uuid):
"""Ensure faulty disks abort the opcode or at least warn."""
try:
if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
node_uuid, True):
raise errors.OpPrereqError("Instance '%s' has faulty disks on"
" node '%s'" %
(instance.name,
self.cfg.GetNodeName(node_uuid)),
errors.ECODE_STATE)
except errors.OpPrereqError as err:
if self.op.ignore_consistency:
self.LogWarning(str(err.args[0]))
else:
raise
def CheckPrereq(self):
"""Check prerequisites.
"""
CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
# Check whether any instance on this node has faulty disks
for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
if not inst.disks_active:
continue
check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
check_nodes.discard(self.op.node_uuid)
for inst_node_uuid in check_nodes:
self._CheckFaultyDisks(inst, inst_node_uuid)
def Exec(self, feedback_fn):
feedback_fn("Repairing storage unit '%s' on %s ..." %
(self.op.name, self.op.node_name))
st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
result = self.rpc.call_storage_execute(self.op.node_uuid,
self.op.storage_type, st_args,
self.op.name,
constants.SO_FIX_CONSISTENCY)
result.Raise("Failed to repair storage unit '%s' on %s" %
(self.op.name, self.op.node_name))