#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Logical units dealing with storage of instances."""
import itertools
import logging
import os
import time
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
import ganeti.masterd.instance
_DISK_TEMPLATE_NAME_PREFIX = {
constants.DT_PLAIN: "",
constants.DT_RBD: ".rbd",
constants.DT_EXT: ".ext",
constants.DT_FILE: ".file",
constants.DT_SHARED_FILE: ".sharedfile",
}
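# Note how these prefixes are used below: for a hypothetical first RBD disk
# (base_index 0), GenerateDiskTemplate builds the suffix ".rbd.disk0", which
# _GenerateUniqueNames expands to "<unique-id>.rbd.disk0"; DT_PLAIN maps to
# the empty prefix, so plain LVs get the bare "<unique-id>.disk0" form.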
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
excl_stor):
"""Create a single block device on a given node.
This will not recurse over children of the device, so they must be
created in advance.
@param lu: the lu on whose behalf we execute
@param node_uuid: the node on which to create the device
@type instance: L{objects.Instance}
@param instance: the instance which owns the device
@type device: L{objects.Disk}
@param device: the device to create
@param info: the extra 'metadata' we should attach to the device
(this will be represented as a LVM tag)
@type force_open: boolean
@param force_open: this parameter will be passed to the
L{backend.BlockdevCreate} function where it specifies
whether we run on primary or not, and it affects both
the child assembly and the device's own Open() execution
@type excl_stor: boolean
@param excl_stor: Whether exclusive_storage is active for the node
"""
lu.cfg.SetDiskID(device, node_uuid)
result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
instance.name, force_open, info,
excl_stor)
result.Raise("Can't create block device %s on"
" node %s for instance %s" % (device,
lu.cfg.GetNodeName(node_uuid),
instance.name))
if device.physical_id is None:
device.physical_id = result.payload
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
info, force_open, excl_stor):
"""Create a tree of block devices on a given node.
If this device type has to be created on secondaries, create it and
all its children.
If not, just recurse to children keeping the same 'force' value.
@attention: The device has to be annotated already.
@param lu: the lu on whose behalf we execute
@param node_uuid: the node on which to create the device
@type instance: L{objects.Instance}
@param instance: the instance which owns the device
@type device: L{objects.Disk}
@param device: the device to create
@type force_create: boolean
@param force_create: whether to force creation of this device; this
will be changed to True whenever we find a device which has
CreateOnSecondary() attribute
@param info: the extra 'metadata' we should attach to the device
(this will be represented as a LVM tag)
@type force_open: boolean
@param force_open: this parameter will be passed to the
L{backend.BlockdevCreate} function where it specifies
whether we run on primary or not, and it affects both
the child assembly and the device's own Open() execution
@type excl_stor: boolean
@param excl_stor: Whether exclusive_storage is active for the node
@return: list of created devices
"""
created_devices = []
try:
if device.CreateOnSecondary():
force_create = True
if device.children:
for child in device.children:
devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
force_create, info, force_open, excl_stor)
created_devices.extend(devs)
if not force_create:
return created_devices
CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
excl_stor)
# The device has been completely created, so there is no point in keeping
# its subdevices in the list. We just add the device itself instead.
created_devices = [(node_uuid, device)]
return created_devices
except errors.DeviceCreationError as e:
e.created_devices.extend(created_devices)
raise
except errors.OpExecError as e:
raise errors.DeviceCreationError(str(e), created_devices)
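# A sketch of the error-accumulation contract above: if creation fails
# halfway through the tree, the raised DeviceCreationError carries every
# (node_uuid, device) pair created so far, so callers such as CreateDisks
# below can pass that list straight to _UndoCreateDisks for cleanup.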
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
"""Whether exclusive_storage is in effect for the given node.
@type cfg: L{config.ConfigWriter}
@param cfg: The cluster configuration
@type node_uuid: string
@param node_uuid: The node UUID
@rtype: bool
@return: The effective value of exclusive_storage
@raise errors.OpPrereqError: if no node exists with the given UUID
"""
ni = cfg.GetNodeInfo(node_uuid)
if ni is None:
raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
errors.ECODE_NOENT)
return IsExclusiveStorageEnabledNode(cfg, ni)
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
force_open):
"""Wrapper around L{_CreateBlockDevInner}.
This method annotates the root device first.
"""
(disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
force_open, excl_stor)
def _UndoCreateDisks(lu, disks_created):
"""Undo the work performed by L{CreateDisks}.
This function is called in case of an error to undo the work of
L{CreateDisks}.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@param disks_created: the result returned by L{CreateDisks}
"""
for (node_uuid, disk) in disks_created:
lu.cfg.SetDiskID(disk, node_uuid)
result = lu.rpc.call_blockdev_remove(node_uuid, disk)
result.Warn("Failed to remove newly-created disk %s on node %s" %
(disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
"""Create all disks for an instance.
This abstracts away some work from AddInstance.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should create
@type to_skip: list
@param to_skip: list of indices to skip
@type target_node_uuid: string
@param target_node_uuid: if passed, overrides the target node for creation
@type disks: list of L{objects.Disk}
@param disks: the disks to create; if not specified, all the disks of the
instance are created
@return: information about the created disks, to be used to call
L{_UndoCreateDisks}
@raise errors.OpPrereqError: in case of error
"""
info = GetInstanceInfoText(instance)
if target_node_uuid is None:
pnode_uuid = instance.primary_node
all_node_uuids = instance.all_nodes
else:
pnode_uuid = target_node_uuid
all_node_uuids = [pnode_uuid]
if disks is None:
disks = instance.disks
CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
if instance.disk_template in constants.DTS_FILEBASED:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
result.Raise("Failed to create directory '%s' on"
" node %s" % (file_storage_dir,
lu.cfg.GetNodeName(pnode_uuid)))
disks_created = []
for idx, device in enumerate(disks):
if to_skip and idx in to_skip:
continue
logging.info("Creating disk %s for instance '%s'", idx, instance.name)
for node_uuid in all_node_uuids:
f_create = node_uuid == pnode_uuid
try:
_CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
f_create)
disks_created.append((node_uuid, device))
except errors.DeviceCreationError as e:
logging.warning("Creating disk %s for instance '%s' failed",
idx, instance.name)
disks_created.extend(e.created_devices)
_UndoCreateDisks(lu, disks_created)
raise errors.OpExecError(e.message)
return disks_created
def ComputeDiskSizePerVG(disk_template, disks):
"""Compute disk size requirements in the volume group
"""
def _compute(disks, payload):
"""Universal algorithm.
"""
vgs = {}
for disk in disks:
vgs[disk[constants.IDISK_VG]] = \
vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
return vgs
# Required free disk space as a function of disk and swap space
req_size_dict = {
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
if disk_template not in req_size_dict:
raise errors.ProgrammerError("Disk template '%s' size requirement"
" is unknown" % disk_template)
return req_size_dict[disk_template]
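# A minimal example of the computation above (hypothetical input): two
# 1024 MiB disks in volume group "xenvg" under DT_DRBD8 would yield
#   {"xenvg": 2 * (1024 + constants.DRBD_META_SIZE)}
# since each disk carries its own DRBD metadata volume in the same VG.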
def ComputeDisks(op, default_vg):
"""Computes the instance disks.
@param op: The instance opcode
@param default_vg: The default_vg to assume
@return: The computed disks
"""
disks = []
for disk in op.disks:
mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
if mode not in constants.DISK_ACCESS_SET:
raise errors.OpPrereqError("Invalid disk access mode '%s'" %
mode, errors.ECODE_INVAL)
size = disk.get(constants.IDISK_SIZE, None)
if size is None:
raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
try:
size = int(size)
except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
ext_provider = disk.get(constants.IDISK_PROVIDER, None)
if ext_provider and op.disk_template != constants.DT_EXT:
raise errors.OpPrereqError("The '%s' option is only valid for the %s"
" disk template, not %s" %
(constants.IDISK_PROVIDER, constants.DT_EXT,
op.disk_template), errors.ECODE_INVAL)
data_vg = disk.get(constants.IDISK_VG, default_vg)
name = disk.get(constants.IDISK_NAME, None)
if name is not None and name.lower() == constants.VALUE_NONE:
name = None
new_disk = {
constants.IDISK_SIZE: size,
constants.IDISK_MODE: mode,
constants.IDISK_VG: data_vg,
constants.IDISK_NAME: name,
}
for key in [
constants.IDISK_METAVG,
constants.IDISK_ADOPT,
constants.IDISK_SPINDLES,
]:
if key in disk:
new_disk[key] = disk[key]
# For extstorage, demand the `provider' option and add any
# additional parameters (ext-params) to the dict
if op.disk_template == constants.DT_EXT:
if ext_provider:
new_disk[constants.IDISK_PROVIDER] = ext_provider
for key in disk:
if key not in constants.IDISK_PARAMS:
new_disk[key] = disk[key]
else:
raise errors.OpPrereqError("Missing provider for template '%s'" %
constants.DT_EXT, errors.ECODE_INVAL)
disks.append(new_disk)
return disks
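# A sketch of the normalization performed above, with hypothetical values:
# an opcode entry such as {constants.IDISK_SIZE: "1024"} comes out as
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# i.e. the size is coerced to int and mode/VG/name receive defaults.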
def CheckRADOSFreeSpace():
"""Compute disk size requirements inside the RADOS cluster.
"""
# For the RADOS cluster we assume there is always enough space.
pass
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
assert len(vgnames) == len(names) == 2
port = lu.cfg.AllocatePort()
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
logical_id=(vgnames[0], names[0]),
params={})
dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
size=constants.DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]),
params={})
dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
logical_id=(primary_uuid, secondary_uuid, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
iv_name=iv_name, params={})
drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
return drbd_dev
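# The resulting device tree, for reference (sizes are illustrative):
#
#   DRBD8 device (size)
#     +- LV names[0] on vgnames[0]  (data, size)
#     +- LV names[1] on vgnames[1]  (meta, constants.DRBD_META_SIZE)
#
# The DRBD logical_id packs everything both peers need to attach:
# (primary_uuid, secondary_uuid, port, p_minor, s_minor, shared_secret).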
def GenerateDiskTemplate(
lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
disk_info, file_storage_dir, file_driver, base_index,
feedback_fn, full_disk_params):
"""Generate the entire disk layout for a given template type.
"""
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
if template_name == constants.DT_DISKLESS:
pass
elif template_name == constants.DT_DRBD8:
if len(secondary_node_uuids) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node_uuid = secondary_node_uuids[0]
minors = lu.cfg.AllocateDRBDMinor(
[primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
(drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
full_disk_params)
drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_data")
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
data_vg = disk.get(constants.IDISK_VG, vgname)
meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
disk[constants.IDISK_SIZE],
[data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
minors[idx * 2], minors[idx * 2 + 1])
disk_dev.mode = disk[constants.IDISK_MODE]
disk_dev.name = disk.get(constants.IDISK_NAME, None)
disks.append(disk_dev)
else:
if secondary_node_uuids:
raise errors.ProgrammerError("Wrong template configuration")
name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
if name_prefix is None:
names = None
else:
names = _GenerateUniqueNames(lu, ["%s.disk%s" %
(name_prefix, base_index + i)
for i in range(disk_count)])
if template_name == constants.DT_PLAIN:
def logical_id_fn(idx, _, disk):
vg = disk.get(constants.IDISK_VG, vgname)
return (vg, names[idx])
elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
# use the lambda's own index parameter instead of relying on late
# binding of the enumeration variable further below
logical_id_fn = \
lambda idx, disk_index, disk: (file_driver,
"%s/%s" % (file_storage_dir,
names[idx]))
elif template_name == constants.DT_BLOCK:
logical_id_fn = \
lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
disk[constants.IDISK_ADOPT])
elif template_name == constants.DT_RBD:
logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
elif template_name == constants.DT_EXT:
def logical_id_fn(idx, _, disk):
provider = disk.get(constants.IDISK_PROVIDER, None)
if provider is None:
raise errors.ProgrammerError("Disk template is %s, but '%s' is"
" not found", constants.DT_EXT,
constants.IDISK_PROVIDER)
return (provider, names[idx])
else:
raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
dev_type = template_name
for idx, disk in enumerate(disk_info):
params = {}
# Only for the Ext template add disk_info to params
if template_name == constants.DT_EXT:
params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
for key in disk:
if key not in constants.IDISK_PARAMS:
params[key] = disk[key]
disk_index = idx + base_index
size = disk[constants.IDISK_SIZE]
feedback_fn("* disk %s, size %s" %
(disk_index, utils.FormatUnit(size, "h")))
disk_dev = objects.Disk(dev_type=dev_type, size=size,
logical_id=logical_id_fn(idx, disk_index, disk),
iv_name="disk/%d" % disk_index,
mode=disk[constants.IDISK_MODE],
params=params,
spindles=disk.get(constants.IDISK_SPINDLES))
disk_dev.name = disk.get(constants.IDISK_NAME, None)
disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
disks.append(disk_dev)
return disks
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
"""Check the presence of the spindle options with exclusive_storage.
@type diskdict: dict
@param diskdict: disk parameters
@type es_flag: bool
@param es_flag: the effective value of the exclusive_storage flag
@type required: bool
@param required: whether spindles are required or just optional
@raise errors.OpPrereqError: when spindles are given and they should not be
"""
if (not es_flag and constants.IDISK_SPINDLES in diskdict and
diskdict[constants.IDISK_SPINDLES] is not None):
raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
" when exclusive storage is not active",
errors.ECODE_INVAL)
if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
diskdict[constants.IDISK_SPINDLES] is None)):
raise errors.OpPrereqError("You must specify spindles in instance disks"
" when exclusive storage is active",
errors.ECODE_INVAL)
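# Summarizing the rules above with a hypothetical disk dict:
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, True)
# raises OpPrereqError (spindles given without exclusive storage), while the
# same dict passes with es_flag=True; an empty dict with es_flag=True and
# required=True raises because spindles are then mandatory.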
class LUInstanceRecreateDisks(LogicalUnit):
"""Recreate an instance's missing disks.
"""
HPATH = "instance-recreate-disks"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
_MODIFYABLE = compat.UniqueFrozenset([
constants.IDISK_SIZE,
constants.IDISK_MODE,
constants.IDISK_SPINDLES,
])
# New or changed disk parameters may have different semantics
assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
constants.IDISK_ADOPT,
# TODO: Implement support for changing VG while recreating
constants.IDISK_VG,
constants.IDISK_METAVG,
constants.IDISK_PROVIDER,
constants.IDISK_NAME,
]))
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
# FIXME
# The allocator should actually run in "relocate" mode, but current
# allocators don't support relocating all the nodes of an instance at
# the same time. As a workaround we use "allocate" mode, but this is
# suboptimal for two reasons:
# - The instance name passed to the allocator is present in the list of
# existing instances, so there could be a conflict within the
# internal structures of the allocator. This doesn't happen with the
# current allocators, but it's a liability.
# - The allocator counts the resources used by the instance twice: once
# because the instance exists already, and once because it tries to
# allocate a new instance.
# The allocator could choose some of the nodes on which the instance is
# running, but that's not a problem. If the instance nodes are broken,
# they should already be marked as drained or offline, and hence
# skipped by the allocator. If instance disks have been lost for other
# reasons, then recreating the disks on the same nodes should be fine.
disk_template = self.instance.disk_template
spindle_use = be_full[constants.BE_SPINDLE_USE]
disks = [{
constants.IDISK_SIZE: d.size,
constants.IDISK_MODE: d.mode,
constants.IDISK_SPINDLES: d.spindles,
} for d in self.instance.disks]
req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
disk_template=disk_template,
tags=list(self.instance.GetTags()),
os=self.instance.os,
nics=[{}],
vcpus=be_full[constants.BE_VCPUS],
memory=be_full[constants.BE_MAXMEM],
spindle_use=spindle_use,
disks=disks,
hypervisor=self.instance.hypervisor,
node_whitelist=None)
ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
assert req.RequiredNodes() == len(self.instance.all_nodes)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
" %s" % (self.op.iallocator, ial.info),
errors.ECODE_NORES)
(self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
self.op.instance_name, self.op.iallocator,
utils.CommaJoin(self.op.nodes))
def CheckArguments(self):
if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
# Normalize and convert deprecated list of disk indices
self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
if duplicates:
raise errors.OpPrereqError("Some disks have been specified more than"
" once: %s" % utils.CommaJoin(duplicates),
errors.ECODE_INVAL)
# We don't want _CheckIAllocatorOrNode selecting the default iallocator
# when neither iallocator nor nodes are specified
if self.op.iallocator or self.op.nodes:
CheckIAllocatorOrNode(self, "iallocator", "nodes")
for (idx, params) in self.op.disks:
utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
unsupported = frozenset(params.keys()) - self._MODIFYABLE
if unsupported:
raise errors.OpPrereqError("Parameters for disk %s try to change"
" unmodifyable parameter(s): %s" %
(idx, utils.CommaJoin(unsupported)),
errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
if self.op.nodes:
(self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
else:
self.needed_locks[locking.LEVEL_NODE] = []
if self.op.iallocator:
# iallocator will select a new node in the same group
self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
if level == locking.LEVEL_NODEGROUP:
assert self.op.iallocator is not None
assert not self.op.nodes
assert not self.needed_locks[locking.LEVEL_NODEGROUP]
self.share_locks[locking.LEVEL_NODEGROUP] = 1
# Lock the primary group used by the instance optimistically; this
# requires going via the node before it's locked, requiring
# verification later on
self.needed_locks[locking.LEVEL_NODEGROUP] = \
self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
elif level == locking.LEVEL_NODE:
# If an allocator is used, then we lock all the nodes in the current
# instance group, as we don't know yet which ones will be selected;
# if we replace the nodes without using an allocator, locks are
# already declared in ExpandNames; otherwise, we need to lock all the
# instance nodes for disk re-creation
if self.op.iallocator:
assert not self.op.nodes
assert not self.needed_locks[locking.LEVEL_NODE]
assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
# Lock member nodes of the group of the primary node
for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
self.needed_locks[locking.LEVEL_NODE].extend(
self.cfg.GetNodeGroup(group_uuid).members)
assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
elif not self.op.nodes:
self._LockInstancesNodes(primary_only=False)
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
CopyLockList(self.needed_locks[locking.LEVEL_NODE])
def BuildHooksEnv(self):
"""Build hooks env.
This runs on master, primary and secondary nodes of the instance.
"""
return BuildInstanceHookEnvByObject(self, self.instance)
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster and is not running.
"""
instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
assert instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
if self.op.node_uuids:
if len(self.op.node_uuids) != len(instance.all_nodes):
raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
" %d replacement nodes were specified" %
(instance.name, len(instance.all_nodes),
len(self.op.node_uuids)),
errors.ECODE_INVAL)
assert instance.disk_template != constants.DT_DRBD8 or \
len(self.op.node_uuids) == 2
assert instance.disk_template != constants.DT_PLAIN or \
len(self.op.node_uuids) == 1
primary_node = self.op.node_uuids[0]
else:
primary_node = instance.primary_node
if not self.op.iallocator:
CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
# Verify if node group locks are still correct
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
if owned_groups:
# Node group locks are acquired only for the primary node (and only
# when the allocator is used)
CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
primary_only=True)
# if we replace nodes *and* the old primary is offline, we don't
# check the instance state
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
msg="cannot recreate disks")
if self.op.disks:
self.disks = dict(self.op.disks)
else:
self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
maxidx = max(self.disks.keys())
if maxidx >= len(instance.disks):
raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
errors.ECODE_INVAL)
if ((self.op.node_uuids or self.op.iallocator) and
sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
errors.ECODE_INVAL)
self.instance = instance
if self.op.iallocator:
self._RunAllocator()
# Release unneeded node and node resource locks
ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
if self.op.node_uuids:
node_uuids = self.op.node_uuids
else:
node_uuids = instance.all_nodes
excl_stor = compat.any(
rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
)
for new_params in self.disks.values():
CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
def Exec(self, feedback_fn):
"""Recreate the disks.
"""
assert (self.owned_locks(locking.LEVEL_NODE) ==
self.owned_locks(locking.LEVEL_NODE_RES))
to_skip = []
mods = [] # keeps track of needed changes
for idx, disk in enumerate(self.instance.disks):
try:
changes = self.disks[idx]
except KeyError:
# Disk should not be recreated
to_skip.append(idx)
continue
# update secondaries for disks, if needed
if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
# need to update the nodes and minors
assert len(self.op.node_uuids) == 2
assert len(disk.logical_id) == 6 # otherwise disk internals
# have changed
(_, _, old_port, _, _, old_secret) = disk.logical_id
new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
self.instance.uuid)
new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
new_minors[0], new_minors[1], old_secret)
assert len(disk.logical_id) == len(new_id)
else:
new_id = None
mods.append((idx, new_id, changes))
# now that we have passed all asserts above, we can apply the mods
# in a single run (to avoid partial changes)
for idx, new_id, changes in mods:
disk = self.instance.disks[idx]
if new_id is not None:
assert disk.dev_type == constants.DT_DRBD8
disk.logical_id = new_id
if changes:
disk.Update(size=changes.get(constants.IDISK_SIZE, None),
mode=changes.get(constants.IDISK_MODE, None),
spindles=changes.get(constants.IDISK_SPINDLES, None))
# change primary node, if needed
if self.op.node_uuids:
self.instance.primary_node = self.op.node_uuids[0]
self.LogWarning("Changing the instance's nodes, you will have to"
" remove any disks left on the older nodes manually")
if self.op.node_uuids:
self.cfg.Update(self.instance, feedback_fn)
# All touched nodes must be locked
mylocks = self.owned_locks(locking.LEVEL_NODE)
assert mylocks.issuperset(frozenset(self.instance.all_nodes))
new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
# TODO: Release node locks before wiping, or explain why it's not possible
if self.cfg.GetClusterInfo().prealloc_wipe_disks:
wipedisks = [(idx, disk, 0)
for (idx, disk) in enumerate(self.instance.disks)
if idx not in to_skip]
WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
cleanup=new_disks)
def _PerformNodeInfoCall(lu, node_uuids, vg):
"""Prepares the input and performs a node info call.
@type lu: C{LogicalUnit}
@param lu: a logical unit from which we get configuration data
@type node_uuids: list of string
@param node_uuids: list of node UUIDs to perform the call for
@type vg: string
@param vg: the volume group's name
"""
lvm_storage_units = [(constants.ST_LVM_VG, vg)]
storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
node_uuids)
hvname = lu.cfg.GetHypervisorType()
hvparams = lu.cfg.GetClusterInfo().hvparams
nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
[(hvname, hvparams[hvname])])
return nodeinfo
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
"""Checks the vg capacity for a given node.
@type node_info: tuple (_, list of dicts, _)
@param node_info: the result of the node info call for one node
@type node_name: string
@param node_name: the name of the node
@type vg: string
@param vg: volume group name
@type requested: int
@param requested: the amount of disk in MiB to check for
@raise errors.OpPrereqError: if the node doesn't have enough disk,
or we cannot check the node
"""
(_, space_info, _) = node_info
lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
space_info, constants.ST_LVM_VG)
if not lvm_vg_info:
raise errors.OpPrereqError("Can't retrieve storage information for LVM")
vg_free = lvm_vg_info.get("storage_free", None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on node"
" %s for vg %s, result was '%s'" %
(node_name, vg, vg_free), errors.ECODE_ENVIRON)
if requested > vg_free:
raise errors.OpPrereqError("Not enough disk space on target node %s"
" vg %s: required %d MiB, available %d MiB" %
(node_name, vg, requested, vg_free),
errors.ECODE_NORES)
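# For reference, the node_info payload unpacked above is a 3-tuple whose
# middle element, space_info, is a list of dicts, e.g. (hypothetical values):
#   [{"type": constants.ST_LVM_VG, "name": "xenvg",
#     "storage_free": 10240, "storage_size": 20480}]
# from which LookupSpaceInfoByStorageType picks the LVM VG entry.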
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
"""Checks if nodes have enough free disk space in the specified VG.
This function checks if all given nodes have the needed amount of
free disk. In case any node has less disk or we cannot get the
information from the node, this function raises an OpPrereqError
exception.
@type lu: C{LogicalUnit}
@param lu: a logical unit from which we get configuration data
@type node_uuids: C{list}
@param node_uuids: the list of node UUIDs to check
@type vg: C{str}
@param vg: the volume group to check
@type requested: C{int}
@param requested: the amount of disk in MiB to check for
@raise errors.OpPrereqError: if the node doesn't have enough disk,
or we cannot check the node
"""
nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
for node in node_uuids:
node_name = lu.cfg.GetNodeName(node)
info = nodeinfo[node]
info.Raise("Cannot get current information from node %s" % node_name,
prereq=True, ecode=errors.ECODE_ENVIRON)
_CheckVgCapacityForNode(node_name, info.payload, vg, requested)
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
"""Checks if nodes have enough free disk space in all the VGs.
This function checks if all given nodes have the needed amount of
free disk. In case any node has less disk or we cannot get the
information from the node, this function raises an OpPrereqError
exception.
@type lu: C{LogicalUnit}
@param lu: a logical unit from which we get configuration data
@type node_uuids: C{list}
@param node_uuids: the list of node UUIDs to check
@type req_sizes: C{dict}
@param req_sizes: the hash of vg and corresponding amount of disk in
MiB to check for
@raise errors.OpPrereqError: if the node doesn't have enough disk,
or we cannot check the node
"""
for vg, req_size in req_sizes.items():
_CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
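# Example (hypothetical sizes): a req_sizes hash as produced by
# ComputeDiskSizePerVG, e.g. {"xenvg": 2048, "metavg": 256}, results in one
# free-space check per volume group on every node in node_uuids.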
def _DiskSizeInBytesToMebibytes(lu, size):
"""Converts a disk size in bytes to mebibytes.
Warns and rounds up if the size isn't an even multiple of 1 MiB.
"""
(mib, remainder) = divmod(size, 1024 * 1024)
if remainder != 0:
lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
" to not overwrite existing data (%s bytes will not be"
" wiped)", (1024 * 1024) - remainder)
mib += 1
return mib
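# A quick sanity check of the rounding above (hypothetical sizes):
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024)     == 1024
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 1) == 1025 (+ warning)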
def _CalcEta(time_taken, written, total_size):
"""Calculates the ETA based on size written and total size.
@param time_taken: The time taken so far
@param written: amount written so far
@param total_size: The total size of data to be written
@return: The remaining time in seconds
"""
avg_time = time_taken / float(written)
return (total_size - written) * avg_time
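# Worked example with hypothetical numbers: if 512 MiB of a 2048 MiB disk
# were written in 30 seconds, the average is 30.0 / 512 seconds per MiB, so
# _CalcEta(30.0, 512, 2048) == (2048 - 512) * (30.0 / 512) == 90.0 seconds.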
def WipeDisks(lu, instance, disks=None):
"""Wipes instance disks.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should create
@type disks: None or list of tuple of (number, L{objects.Disk}, number)
@param disks: Disk details; tuple contains disk index, disk object and the
start offset
"""
node_uuid = instance.primary_node
node_name = lu.cfg.GetNodeName(node_uuid)
if disks is None:
disks = [(idx, disk, 0)
for (idx, disk) in enumerate(instance.disks)]
for (_, device, _) in disks:
lu.cfg.SetDiskID(device, node_uuid)
logging.info("Pausing synchronization of disks of instance '%s'",
instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
(map(compat.snd, disks),
instance),
True)
result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("Pausing synchronization of disk %s of instance '%s'"
" failed", idx, instance.name)
try:
for (idx, device, offset) in disks:
# The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
# MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
wipe_chunk_size = \
int(min(constants.MAX_WIPE_CHUNK,
device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
size = device.size
last_output = 0
start_time = time.time()
if offset == 0:
info_text = ""
else:
info_text = (" (from %s to %s)" %
(utils.FormatUnit(offset, "h"),
utils.FormatUnit(size, "h")))
lu.LogInfo("* Wiping disk %s%s", idx, info_text)
logging.info("Wiping disk %d for instance %s on node %s using"
" chunk size %s", idx, instance.name, node_name,
wipe_chunk_size)
while offset < size:
wipe_size = min(wipe_chunk_size, size - offset)
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
offset, wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
now = time.time()
offset += wipe_size
if now - last_output >= 60:
eta = _CalcEta(now - start_time, offset, size)
lu.LogInfo(" - done: %.1f%% ETA: %s",
offset / float(size) * 100, utils.FormatSeconds(eta))
last_output = now
finally:
logging.info("Resuming synchronization of disks for instance '%s'",
instance.name)
result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
(map(compat.snd, disks),
instance),
False)
if result.fail_msg:
lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
node_name, result.fail_msg)
else:
for idx, success in enumerate(result.payload):
if not success:
lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
" failed", idx, instance.name)
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
"""Wrapper for L{WipeDisks} that handles errors.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance whose disks we should wipe
@param disks: see L{WipeDisks}
@param cleanup: the result returned by L{CreateDisks}, used for cleanup in
case of error
@raise errors.OpPrereqError: in case of failure
"""
try:
WipeDisks(lu, instance, disks=disks)
except errors.OpExecError:
logging.warning("Wiping disks for instance '%s' failed",
instance.name)
_UndoCreateDisks(lu, cleanup)
raise
def ExpandCheckDisks(instance, disks):
"""Return the instance disks selected by the disks list
@type disks: list of L{objects.Disk} or None
@param disks: selected disks
@rtype: list of L{objects.Disk}
@return: selected instance disks to act on
"""
if disks is None:
return instance.disks
else:
if not set(disks).issubset(instance.disks):
raise errors.ProgrammerError("Can only act on disks belonging to the"
" target instance: expected a subset of %r,"
" got %r" % (instance.disks, disks))
return disks
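# Usage sketch: ExpandCheckDisks(instance, None) returns all of
# instance.disks, while passing any disk object not belonging to the
# instance raises ProgrammerError; the callers below rely on this to
# validate user-selected disk subsets before acting on them.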
def WaitForSync(lu, instance, disks=None, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
"""
if not instance.disks or (disks is not None and not disks):
return True
disks = ExpandCheckDisks(instance, disks)
if not oneshot:
lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
node_uuid = instance.primary_node
node_name = lu.cfg.GetNodeName(node_uuid)
for dev in disks:
lu.cfg.SetDiskID(dev, node_uuid)
# TODO: Convert to utils.Retry
retries = 0
degr_retries = 10 # in seconds, as we sleep 1 second each time
while True:
max_time = 0
done = True
cumul_degraded = False
rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
retries += 1
if retries >= 10:
raise errors.RemoteError("Can't contact node %s for mirror data,"
" aborting." % node_name)
time.sleep(6)
continue
rstats = rstats.payload
retries = 0
for i, mstat in enumerate(rstats):
if mstat is None:
lu.LogWarning("Can't compute data for node %s/%s",
node_name, disks[i].iv_name)
continue
cumul_degraded = (cumul_degraded or
(mstat.is_degraded and mstat.sync_percent is None))
if mstat.sync_percent is not None:
done = False
if mstat.estimated_time is not None:
rem_time = ("%s remaining (estimated)" %
utils.FormatSeconds(mstat.estimated_time))
max_time = mstat.estimated_time
else:
rem_time = "no time estimate"
lu.LogInfo("- device %s: %5.2f%% done, %s",
disks[i].iv_name, mstat.sync_percent, rem_time)
# if we're done but degraded, let's do a few small retries, to
# make sure we see a stable and not transient situation; therefore
# we force restart of the loop
if (done or oneshot) and cumul_degraded and degr_retries > 0:
logging.info("Degraded disks found, %d retries left", degr_retries)
degr_retries -= 1
time.sleep(1)
continue
if done or oneshot:
break
time.sleep(min(60, max_time))
if done:
lu.LogInfo("Instance %s's disks are in sync", instance.name)
return not cumul_degraded
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
"""Shutdown block devices of an instance.
This does the shutdown on all nodes of the instance.
If ignore_primary is true, errors on the primary node are ignored;
otherwise they make the function return failure.
"""
lu.cfg.MarkInstanceDisksInactive(instance.uuid)
all_result = True
disks = ExpandCheckDisks(instance, disks)
for disk in disks:
for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node_uuid)
result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
if ((node_uuid == instance.primary_node and not ignore_primary) or
(node_uuid != instance.primary_node and not result.offline)):
all_result = False
return all_result
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
"""Shutdown block devices of an instance.
This function checks if an instance is running, before calling
_ShutdownInstanceDisks.
"""
CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
ShutdownInstanceDisks(lu, instance, disks=disks)
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
ignore_size=False):
"""Prepare the block devices for an instance.
This sets up the block devices on all nodes.
@type lu: L{LogicalUnit}
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance for whose disks we assemble
@type disks: list of L{objects.Disk} or None
@param disks: which disks to assemble (or all, if None)
@type ignore_secondaries: boolean
@param ignore_secondaries: if true, errors on secondary nodes
won't result in an error return from the function
@type ignore_size: boolean
@param ignore_size: if true, the current known size of the disk
will not be used during the disk activation, useful for cases
when the size is wrong
@return: False if the operation failed, otherwise a list of
(host, instance_visible_name, node_visible_name)
with the mapping from node devices to instance devices
"""
device_info = []
disks_ok = True
disks = ExpandCheckDisks(instance, disks)
# With the two-pass mechanism we try to reduce the window of
# opportunity for the race condition of switching DRBD to primary
# before handshaking has occurred, but we do not eliminate it
# The proper fix would be to wait (with some limits) until the
# connection has been made and drbd transitions from WFConnection
# into any other network-connected state (Connected, SyncTarget,
# SyncSource, etc.)
# mark instance disks as active before doing actual work, so watcher does
# not try to shut them down erroneously
lu.cfg.MarkInstanceDisksActive(instance.uuid)
# 1st pass, assemble on all nodes in secondary mode
for idx, inst_disk in enumerate(disks):
for node_uuid, node_disk in inst_disk.ComputeNodeTree(
instance.primary_node):
if ignore_size:
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node_uuid)
result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
instance.name, False, idx)
msg = result.fail_msg
if msg:
is_offline_secondary = (node_uuid in instance.secondary_nodes and
result.offline)
lu.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
# FIXME: race condition on drbd migration to primary
# 2nd pass, do only the primary node
for idx, inst_disk in enumerate(disks):
dev_path = None
for node_uuid, node_disk in inst_disk.ComputeNodeTree(
instance.primary_node):
if node_uuid != instance.primary_node:
continue
if ignore_size:
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node_uuid)
result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
instance.name, True, idx)
msg = result.fail_msg
if msg:
lu.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=True, pass=2): %s",
inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
disks_ok = False
else:
dev_path = result.payload
device_info.append((lu.cfg.GetNodeName(instance.primary_node),
inst_disk.iv_name, dev_path))
# leave the disks configured for the primary node
# this is a workaround that would be better fixed by
# improving the logical/physical id handling
for disk in disks:
lu.cfg.SetDiskID(disk, instance.primary_node)
if not disks_ok:
lu.cfg.MarkInstanceDisksInactive(instance.uuid)
return disks_ok, device_info
def StartInstanceDisks(lu, instance, force):
"""Start the disks of an instance.
"""
disks_ok, _ = AssembleInstanceDisks(lu, instance,
ignore_secondaries=force)
if not disks_ok:
ShutdownInstanceDisks(lu, instance)
if force is not None and not force:
lu.LogWarning("",
hint=("If the message above refers to a secondary node,"
" you can retry the operation using '--force'"))
raise errors.OpExecError("Disk consistency error")
class LUInstanceGrowDisk(LogicalUnit):
"""Grow a disk of an instance.
"""
HPATH = "disk-grow"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
CopyLockList(self.needed_locks[locking.LEVEL_NODE])
def BuildHooksEnv(self):
"""Build hooks env.
This runs on the master, the primary and all the secondaries.
"""
env = {
"DISK": self.op.disk,
"AMOUNT": self.op.amount,
"ABSOLUTE": self.op.absolute,
}
env.update(BuildInstanceHookEnvByObject(self, self.instance))
return env
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
node_uuids = list(self.instance.all_nodes)
for node_uuid in node_uuids:
CheckNodeOnline(self, node_uuid)
self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
if self.instance.disk_template not in constants.DTS_GROWABLE:
raise errors.OpPrereqError("Instance's disk layout does not support"
" growing", errors.ECODE_INVAL)
self.disk = self.instance.FindDisk(self.op.disk)
if self.op.absolute:
self.target = self.op.amount
self.delta = self.target - self.disk.size
if self.delta < 0:
raise errors.OpPrereqError("Requested size (%s) is smaller than "
"current disk size (%s)" %
(utils.FormatUnit(self.target, "h"),
utils.FormatUnit(self.disk.size, "h")),
errors.ECODE_STATE)
else:
self.delta = self.op.amount
self.target = self.disk.size + self.delta
if self.delta < 0:
raise errors.OpPrereqError("Requested increment (%s) is negative" %
utils.FormatUnit(self.delta, "h"),
errors.ECODE_INVAL)
self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
def _CheckDiskSpace(self, node_uuids, req_vgspace):
template = self.instance.disk_template
if (template not in constants.DTS_NO_FREE_SPACE_CHECK and
not any(self.node_es_flags.values())):
# TODO: check the free disk space for file-based disks once that
# feature is supported
# With exclusive storage we need to do something smarter than just looking
# at free space, which, in the end, is basically a dry run. So we rely on
# the dry run performed in Exec() instead.
CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
def Exec(self, feedback_fn):
"""Execute disk grow.
"""
assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
assert (self.owned_locks(locking.LEVEL_NODE) ==
self.owned_locks(locking.LEVEL_NODE_RES))
wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
(self.op.disk, self.instance.name,
utils.FormatUnit(self.delta, "h"),
utils.FormatUnit(self.target, "h")))
# First run all grow ops in dry-run mode
for node_uuid in self.instance.all_nodes:
self.cfg.SetDiskID(self.disk, node_uuid)
result = self.rpc.call_blockdev_grow(node_uuid,
(self.disk, self.instance),
self.delta, True, True,
self.node_es_flags[node_uuid])
result.Raise("Dry-run grow request failed to node %s" %
self.cfg.GetNodeName(node_uuid))
if wipe_disks:
# Get disk size from primary node for wiping
self.cfg.SetDiskID(self.disk, self.instance.primary_node)
result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
[self.disk])
result.Raise("Failed to retrieve disk size from node '%s'" %
self.instance.primary_node)
(disk_dimensions, ) = result.payload
if disk_dimensions is None:
raise errors.OpExecError("Failed to retrieve disk size from primary"
" node '%s'" % self.instance.primary_node)
(disk_size_in_bytes, _) = disk_dimensions
old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
assert old_disk_size >= self.disk.size, \
("Retrieved disk size too small (got %s, should be at least %s)" %
(old_disk_size, self.disk.size))
else:
old_disk_size = None
# We know that (as far as we can test) operations across different
# nodes will succeed, time to run it for real on the backing storage
for node_uuid in self.instance.all_nodes:
self.cfg.SetDiskID(self.disk, node_uuid)
result = self.rpc.call_blockdev_grow(node_uuid,
(self.disk, self.instance),
self.delta, False, True,
self.node_es_flags[node_uuid])
result.Raise("Grow request failed to node %s" %
self.cfg.GetNodeName(node_uuid))
# And now execute it for logical storage, on the primary node
node_uuid = self.instance.primary_node
self.cfg.SetDiskID(self.disk, node_uuid)
result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
self.delta, False, False,
self.node_es_flags[node_uuid])
result.Raise("Grow request failed to node %s" %
self.cfg.GetNodeName(node_uuid))
self.disk.RecordGrow(self.delta)
self.cfg.Update(self.instance, feedback_fn)
# Changes have been recorded, release node lock
ReleaseLocks(self, locking.LEVEL_NODE)
# Downgrade lock while waiting for sync
self.glm.downgrade(locking.LEVEL_INSTANCE)
assert wipe_disks ^ (old_disk_size is None)
if wipe_disks:
assert self.instance.disks[self.op.disk] == self.disk
# Wipe newly added disk space
WipeDisks(self, self.instance,
disks=[(self.op.disk, self.disk, old_disk_size)])
if self.op.wait_for_sync:
disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
if disk_abort:
self.LogWarning("Disk syncing has not returned a good status; check"
" the instance")
if not self.instance.disks_active:
_SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
elif not self.instance.disks_active:
self.LogWarning("Not shutting down the disk even if the instance is"
" not supposed to be running because no wait for"
" sync mode was requested")
assert self.owned_locks(locking.LEVEL_NODE_RES)
assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
class LUInstanceReplaceDisks(LogicalUnit):
"""Replace the disks of an instance.
"""
HPATH = "mirrors-replace"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
def CheckArguments(self):
"""Check arguments.
"""
if self.op.mode == constants.REPLACE_DISK_CHG:
if self.op.remote_node is None and self.op.iallocator is None:
raise errors.OpPrereqError("When changing the secondary either an"
" iallocator script must be used or the"
" new node given", errors.ECODE_INVAL)
else:
CheckIAllocatorOrNode(self, "iallocator", "remote_node")
elif self.op.remote_node is not None or self.op.iallocator is not None:
# Not replacing the secondary
raise errors.OpPrereqError("The iallocator and new node options can"
" only be used when changing the"
" secondary node", errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
assert locking.LEVEL_NODE not in self.needed_locks
assert locking.LEVEL_NODE_RES not in self.needed_locks
assert locking.LEVEL_NODEGROUP not in self.needed_locks
assert self.op.iallocator is None or self.op.remote_node is None, \
"Conflicting options"
if self.op.remote_node is not None:
(self.op.remote_node_uuid, self.op.remote_node) = \
ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
self.op.remote_node)
# Warning: do not remove the locking of the new secondary here
# unless DRBD8Dev.AddChildren is changed to work in parallel;
# currently it doesn't since parallel invocations of
# FindUnusedMinor will conflict
self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
else:
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
if self.op.iallocator is not None:
# iallocator will select a new node in the same group
self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
self.needed_locks[locking.LEVEL_NODE_RES] = []
self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node_uuid,
self.op.disks, self.op.early_release,
self.op.ignore_ipolicy)
self.tasklets = [self.replacer]
def DeclareLocks(self, level):
if level == locking.LEVEL_NODEGROUP:
assert self.op.remote_node_uuid is None
assert self.op.iallocator is not None
assert not self.needed_locks[locking.LEVEL_NODEGROUP]
self.share_locks[locking.LEVEL_NODEGROUP] = 1
# Lock all groups used by instance optimistically; this requires going
# via the node before it's locked, requiring verification later on
self.needed_locks[locking.LEVEL_NODEGROUP] = \
self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
elif level == locking.LEVEL_NODE:
if self.op.iallocator is not None:
assert self.op.remote_node_uuid is None
assert not self.needed_locks[locking.LEVEL_NODE]
assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
# Lock member nodes of all locked groups
self.needed_locks[locking.LEVEL_NODE] = \
[node_uuid
for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
else:
assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
self._LockInstancesNodes()
elif level == locking.LEVEL_NODE_RES:
# Reuse node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
self.needed_locks[locking.LEVEL_NODE]
def BuildHooksEnv(self):
"""Build hooks env.
This runs on the master, the primary and all the secondaries.
"""
instance = self.replacer.instance
env = {
"MODE": self.op.mode,
"NEW_SECONDARY": self.op.remote_node,
"OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
}
env.update(BuildInstanceHookEnvByObject(self, instance))
return env
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
instance = self.replacer.instance
nl = [
self.cfg.GetMasterNode(),
instance.primary_node,
]
if self.op.remote_node_uuid is not None:
nl.append(self.op.remote_node_uuid)
return nl, nl
def CheckPrereq(self):
"""Check prerequisites.
"""
assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
self.op.iallocator is None)
# Verify if node group locks are still correct
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
if owned_groups:
CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
return LogicalUnit.CheckPrereq(self)
class LUInstanceActivateDisks(NoHooksLU):
"""Bring up an instance's disks.
"""
REQ_BGL = False
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
CheckNodeOnline(self, self.instance.primary_node)
def Exec(self, feedback_fn):
"""Activate the disks.
"""
disks_ok, disks_info = \
AssembleInstanceDisks(self, self.instance,
ignore_size=self.op.ignore_size)
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
if self.op.wait_for_sync:
if not WaitForSync(self, self.instance):
self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
raise errors.OpExecError("Some disks of the instance are degraded!")
return disks_info
class LUInstanceDeactivateDisks(NoHooksLU):
"""Shutdown an instance's disks.
"""
REQ_BGL = False
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
def Exec(self, feedback_fn):
"""Deactivate the disks
"""
if self.op.force:
ShutdownInstanceDisks(self, self.instance)
else:
_SafeShutdownInstanceDisks(self, self.instance)
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
ldisk=False):
"""Check that mirrors are not degraded.
@attention: The device has to be annotated already.
The ldisk parameter, if True, will change the test from the
is_degraded attribute (which represents overall non-ok status for
the device(s)) to the ldisk (representing the local storage status).
"""
lu.cfg.SetDiskID(dev, node_uuid)
result = True
if on_primary or dev.AssembleOnSecondary():
rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't find disk on node %s: %s",
lu.cfg.GetNodeName(node_uuid), msg)
result = False
elif not rstats.payload:
lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
result = False
else:
if ldisk:
result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
else:
result = result and not rstats.payload.is_degraded
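# Recurse into the child devices; note that the ldisk flag is not
# propagated, so children are checked for overall degradation only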
if dev.children:
for child in dev.children:
result = result and _CheckDiskConsistencyInner(lu, instance, child,
node_uuid, on_primary)
return result
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
"""Wrapper around L{_CheckDiskConsistencyInner}.
"""
(disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
ldisk=ldisk)
def _BlockdevFind(lu, node_uuid, dev, instance):
"""Wrapper around call_blockdev_find to annotate diskparams.
@param lu: A reference to the lu object
@param node_uuid: The node to call out
@param dev: The device to find
@param instance: The instance object the device belongs to
@returns The result of the rpc call
"""
(disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
return lu.rpc.call_blockdev_find(node_uuid, disk)
def _GenerateUniqueNames(lu, exts):
"""Generate a suitable LV name.
This will generate a logical volume name for the given instance.
"""
results = []
for val in exts:
new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
results.append("%s%s" % (new_id, val))
return results
class TLReplaceDisks(Tasklet):
"""Replaces disks for an instance.
Note: Locking is not within the scope of this class.
"""
def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
remote_node_uuid, disks, early_release, ignore_ipolicy):
"""Initializes this class.
"""
Tasklet.__init__(self, lu)
# Parameters
self.instance_uuid = instance_uuid
self.instance_name = instance_name
self.mode = mode
self.iallocator_name = iallocator_name
self.remote_node_uuid = remote_node_uuid
self.disks = disks
self.early_release = early_release
self.ignore_ipolicy = ignore_ipolicy
# Runtime data
self.instance = None
self.new_node_uuid = None
self.target_node_uuid = None
self.other_node_uuid = None
self.remote_node_info = None
self.node_secondary_ip = None
@staticmethod
def _RunAllocator(lu, iallocator_name, instance_uuid,
relocate_from_node_uuids):
"""Compute a new secondary node using an IAllocator.
"""
req = iallocator.IAReqRelocate(
inst_uuid=instance_uuid,
relocate_from_node_uuids=list(relocate_from_node_uuids))
ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
ial.Run(iallocator_name)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
" %s" % (iallocator_name, ial.info),
errors.ECODE_NORES)
remote_node_name = ial.result[0]
remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
if remote_node is None:
raise errors.OpPrereqError("Node %s not found in configuration" %
remote_node_name, errors.ECODE_NOENT)
lu.LogInfo("Selected new secondary for instance '%s': %s",
instance_uuid, remote_node_name)
return remote_node.uuid
def _FindFaultyDisks(self, node_uuid):
"""Wrapper for L{FindFaultyInstanceDisks}.
"""
return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
node_uuid, True)
def _CheckDisksActivated(self, instance):
"""Checks if the instance disks are activated.
@param instance: The instance to check disks
@return: True if they are activated, False otherwise
"""
node_uuids = instance.all_nodes
for idx, dev in enumerate(instance.disks):
for node_uuid in node_uuids:
self.lu.LogInfo("Checking disk/%d on %s", idx,
self.cfg.GetNodeName(node_uuid))
self.cfg.SetDiskID(dev, node_uuid)
result = _BlockdevFind(self, node_uuid, dev, instance)
if result.offline:
continue
elif result.fail_msg or not result.payload:
return False
return True
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.instance_name
if self.instance.disk_template != constants.DT_DRBD8:
raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
" instances", errors.ECODE_INVAL)
if len(self.instance.secondary_nodes) != 1:
raise errors.OpPrereqError("The instance has a strange layout,"
" expected one secondary but found %d" %
len(self.instance.secondary_nodes),
errors.ECODE_FAULT)
secondary_node_uuid = self.instance.secondary_nodes[0]
if self.iallocator_name is None:
remote_node_uuid = self.remote_node_uuid
else:
remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
self.instance.uuid,
self.instance.secondary_nodes)
if remote_node_uuid is None:
self.remote_node_info = None
else:
assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
"Remote node '%s' is not locked" % remote_node_uuid
self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
assert self.remote_node_info is not None, \
"Cannot retrieve locked node %s" % remote_node_uuid
if remote_node_uuid == self.instance.primary_node:
raise errors.OpPrereqError("The specified node is the primary node of"
" the instance", errors.ECODE_INVAL)
if remote_node_uuid == secondary_node_uuid:
raise errors.OpPrereqError("The specified node is already the"
" secondary node of the instance",
errors.ECODE_INVAL)
if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
constants.REPLACE_DISK_CHG):
raise errors.OpPrereqError("Cannot specify disks to be replaced",
errors.ECODE_INVAL)
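# In "auto" mode the disks to replace are discovered by probing both
# nodes for faulty disks; faulty disks on both nodes at once cannot be
# repaired automatically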
if self.mode == constants.REPLACE_DISK_AUTO:
if not self._CheckDisksActivated(self.instance):
raise errors.OpPrereqError("Please run activate-disks on instance %s"
" first" % self.instance_name,
errors.ECODE_STATE)
faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
if faulty_primary and faulty_secondary:
raise errors.OpPrereqError("Instance %s has faulty disks on more than"
" one node and can not be repaired"
" automatically" % self.instance_name,
errors.ECODE_STATE)
if faulty_primary:
self.disks = faulty_primary
self.target_node_uuid = self.instance.primary_node
self.other_node_uuid = secondary_node_uuid
check_nodes = [self.target_node_uuid, self.other_node_uuid]
elif faulty_secondary:
self.disks = faulty_secondary
self.target_node_uuid = secondary_node_uuid
self.other_node_uuid = self.instance.primary_node
check_nodes = [self.target_node_uuid, self.other_node_uuid]
else:
self.disks = []
check_nodes = []
else:
# Non-automatic modes
if self.mode == constants.REPLACE_DISK_PRI:
self.target_node_uuid = self.instance.primary_node
self.other_node_uuid = secondary_node_uuid
check_nodes = [self.target_node_uuid, self.other_node_uuid]
elif self.mode == constants.REPLACE_DISK_SEC:
self.target_node_uuid = secondary_node_uuid
self.other_node_uuid = self.instance.primary_node
check_nodes = [self.target_node_uuid, self.other_node_uuid]
elif self.mode == constants.REPLACE_DISK_CHG:
self.new_node_uuid = remote_node_uuid
self.other_node_uuid = self.instance.primary_node
self.target_node_uuid = secondary_node_uuid
check_nodes = [self.new_node_uuid, self.other_node_uuid]
CheckNodeNotDrained(self.lu, remote_node_uuid)
CheckNodeVmCapable(self.lu, remote_node_uuid)
old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
assert old_node_info is not None
if old_node_info.offline and not self.early_release:
# doesn't make sense to delay the release
self.early_release = True
self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
" early-release mode", secondary_node_uuid)
else:
raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
self.mode)
# If not specified all disks should be replaced
if not self.disks:
self.disks = range(len(self.instance.disks))
# TODO: This is ugly, but right now we can't distinguish between
# internally submitted opcodes and external ones. We should fix that.
if self.remote_node_info:
# We change the node; let's verify it still meets the instance policy
new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
cluster = self.cfg.GetClusterInfo()
ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
new_group_info)
CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
self.remote_node_info, self.cfg,
ignore=self.ignore_ipolicy)
for node_uuid in check_nodes:
CheckNodeOnline(self.lu, node_uuid)
touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
self.other_node_uuid,
self.target_node_uuid]
if node_uuid is not None)
# Release unneeded node and node resource locks
ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
# Release any owned node group
ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
# Check whether disks are valid
for disk_idx in self.disks:
self.instance.FindDisk(disk_idx)
# Get secondary node IP addresses
self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
in self.cfg.GetMultiNodeInfo(touched_nodes))
def Exec(self, feedback_fn):
"""Execute disk replacement.
This dispatches the disk replacement to the appropriate handler.
"""
if __debug__:
# Verify owned locks before starting operation
owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
assert set(owned_nodes) == set(self.node_secondary_ip), \
("Incorrect node locks, owning %s, expected %s" %
(owned_nodes, self.node_secondary_ip.keys()))
assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
self.lu.owned_locks(locking.LEVEL_NODE_RES))
assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
assert list(owned_instances) == [self.instance_name], \
"Instance '%s' not locked" % self.instance_name
assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
"Should not own any node group lock at this point"
if not self.disks:
feedback_fn("No disks need replacement for instance '%s'" %
self.instance.name)
return
feedback_fn("Replacing disk(s) %s for instance '%s'" %
(utils.CommaJoin(self.disks), self.instance.name))
feedback_fn("Current primary node: %s" %
self.cfg.GetNodeName(self.instance.primary_node))
feedback_fn("Current seconary node: %s" %
utils.CommaJoin(self.cfg.GetNodeNames(
self.instance.secondary_nodes)))
activate_disks = not self.instance.disks_active
# Activate the instance disks if we're replacing them on a down instance
if activate_disks:
StartInstanceDisks(self.lu, self.instance, True)
try:
# Should we replace the secondary node?
if self.new_node_uuid is not None:
fn = self._ExecDrbd8Secondary
else:
fn = self._ExecDrbd8DiskOnly
result = fn(feedback_fn)
finally:
# Deactivate the instance disks if we're replacing them on a
# down instance
if activate_disks:
_SafeShutdownInstanceDisks(self.lu, self.instance)
assert not self.lu.owned_locks(locking.LEVEL_NODE)
if __debug__:
# Verify owned locks
owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
nodes = frozenset(self.node_secondary_ip)
assert ((self.early_release and not owned_nodes) or
(not self.early_release and not (set(owned_nodes) - nodes))), \
("Not owning the correct locks, early_release=%s, owned=%r,"
" nodes=%r" % (self.early_release, owned_nodes, nodes))
return result
def _CheckVolumeGroup(self, node_uuids):
self.lu.LogInfo("Checking volume groups")
vgname = self.cfg.GetVGName()
# Make sure volume group exists on all involved nodes
results = self.rpc.call_vg_list(node_uuids)
if not results:
raise errors.OpExecError("Can't list volume groups on the nodes")
for node_uuid in node_uuids:
res = results[node_uuid]
res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
if vgname not in res.payload:
raise errors.OpExecError("Volume group '%s' not found on node %s" %
(vgname, self.cfg.GetNodeName(node_uuid)))
def _CheckDisksExistence(self, node_uuids):
# Check disk existence
for idx, dev in enumerate(self.instance.disks):
if idx not in self.disks:
continue
for node_uuid in node_uuids:
self.lu.LogInfo("Checking disk/%d on %s", idx,
self.cfg.GetNodeName(node_uuid))
self.cfg.SetDiskID(dev, node_uuid)
result = _BlockdevFind(self, node_uuid, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
if not msg:
msg = "disk not found"
if not self._CheckDisksActivated(self.instance):
extra_hint = ("\nDisks seem to be not properly activated. Try"
" running activate-disks on the instance before"
" using replace-disks.")
else:
extra_hint = ""
raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
(idx, self.cfg.GetNodeName(node_uuid), msg,
extra_hint))
def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
for idx, dev in enumerate(self.instance.disks):
if idx not in self.disks:
continue
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, self.cfg.GetNodeName(node_uuid)))
if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(self.cfg.GetNodeName(node_uuid),
self.instance.name))
def _CreateNewStorage(self, node_uuid):
"""Create new storage on the primary or secondary node.
This is only used for same-node replaces, not for changing the
secondary node, hence we don't want to modify the existing disk.
"""
iv_names = {}
disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
self.lu.LogInfo("Adding storage on %s for disk/%d",
self.cfg.GetNodeName(node_uuid), idx)
self.cfg.SetDiskID(dev, node_uuid)
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
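# A DRBD8 disk has exactly two LV children: the data volume and the
# metadata volume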
(data_disk, meta_disk) = dev.children
vg_data = data_disk.logical_id[0]
lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
logical_id=(vg_data, names[0]),
params=data_disk.params)
vg_meta = meta_disk.logical_id[0]
lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
size=constants.DRBD_META_SIZE,
logical_id=(vg_meta, names[1]),
params=meta_disk.params)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
try:
_CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
GetInstanceInfoText(self.instance), False,
excl_stor)
except errors.DeviceCreationError, e:
raise errors.OpExecError("Can't create block device: %s" % e.message)
return iv_names
def _CheckDevices(self, node_uuid, iv_names):
for name, (dev, _, _) in iv_names.iteritems():
self.cfg.SetDiskID(dev, node_uuid)
result = _BlockdevFind(self, node_uuid, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
if not msg:
msg = "disk not found"
raise errors.OpExecError("Can't find DRBD device %s: %s" %
(name, msg))
if result.payload.is_degraded:
raise errors.OpExecError("DRBD device %s is degraded!" % name)
def _RemoveOldStorage(self, node_uuid, iv_names):
for name, (_, old_lvs, _) in iv_names.iteritems():
self.lu.LogInfo("Remove logical volumes for %s", name)
for lv in old_lvs:
self.cfg.SetDiskID(lv, node_uuid)
msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
if msg:
self.lu.LogWarning("Can't remove old LV: %s", msg,
hint="remove unused LVs manually")
def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
"""Replace a disk on the primary or secondary for DRBD 8.
The algorithm for replace is quite complicated:
1. for each disk to be replaced:
   1. create new LVs on the target node with unique names
   2. detach old LVs from the drbd device
   3. rename old LVs to name_replaced.<time_t>
   4. rename new LVs to old LVs
   5. attach the new LVs (with the old names now) to the drbd device
2. wait for sync across all devices
3. for each modified disk:
   1. remove old LVs (which have the name name_replaced.<time_t>)
Failures are not very well handled.
"""
steps_total = 6
# Step: check device activation
self.lu.LogStep(1, steps_total, "Check device existence")
self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
# Step: check other node consistency
self.lu.LogStep(2, steps_total, "Check peer consistency")
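# The peer node keeps its data and acts as the sync source once the
# target node's LVs are replaced, so it must not be degraded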
self._CheckDisksConsistency(
self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
False)
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
iv_names = self._CreateNewStorage(self.target_node_uuid)
# Step: for each lv, detach+rename*2+attach
self.lu.LogStep(4, steps_total, "Changing drbd configuration")
for dev, old_lvs, new_lvs in iv_names.itervalues():
self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
old_lvs)
result.Raise("Can't detach drbd from local storage on node"
" %s for device %s" %
(self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
#dev.children = []
#cfg.Update(instance)
# ok, we created the new LVs, so now we know we have the needed
# storage; as such, we proceed on the target node to rename
# old_lv to _old, and new_lv to old_lv; note that we rename LVs
# using the assumption that logical_id == physical_id (which in
# turn is the unique_id on that node)
# FIXME(iustin): use a better name for the replaced LVs
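# ren_fn builds the rename target for an LV: same volume group, old
# name plus a timestamped "_replaced-<time>" suffix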
temp_suffix = int(time.time())
ren_fn = lambda d, suff: (d.physical_id[0],
d.physical_id[1] + "_replaced-%s" % suff)
# Build the rename list based on what LVs exist on the node
rename_old_to_new = []
for to_ren in old_lvs:
result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
if not result.fail_msg and result.payload:
# device exists
rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
self.lu.LogInfo("Renaming the old LVs on the target node")
result = self.rpc.call_blockdev_rename(self.target_node_uuid,
rename_old_to_new)
result.Raise("Can't rename old LVs on node %s" %
self.cfg.GetNodeName(self.target_node_uuid))
# Now we rename the new LVs to the old LVs
self.lu.LogInfo("Renaming the new LVs on the target node")
rename_new_to_old = [(new, old.physical_id)
for old, new in zip(old_lvs, new_lvs)]
result = self.rpc.call_blockdev_rename(self.target_node_uuid,
rename_new_to_old)
result.Raise("Can't rename new LVs on node %s" %
self.cfg.GetNodeName(self.target_node_uuid))
# Intermediate steps of in-memory modifications
for old, new in zip(old_lvs, new_lvs):
new.logical_id = old.logical_id
self.cfg.SetDiskID(new, self.target_node_uuid)
# We need to modify old_lvs so that removal later removes the
# right LVs, not the newly added ones; note that old_lvs is a
# copy here
for disk in old_lvs:
disk.logical_id = ren_fn(disk, temp_suffix)
self.cfg.SetDiskID(disk, self.target_node_uuid)
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s",
self.cfg.GetNodeName(self.target_node_uuid))
result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
(dev, self.instance), new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
new_lv).fail_msg
if msg2:
self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
hint=("cleanup manually the unused logical"
"volumes"))
raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
cstep = itertools.count(5)
if self.early_release:
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node_uuid, iv_names)
# TODO: Check if releasing locks early still makes sense
ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
else:
# Release all resource locks except those used by the instance
ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
keep=self.node_secondary_ip.keys())
# Release all node locks while waiting for sync
ReleaseLocks(self.lu, locking.LEVEL_NODE)
# TODO: Can the instance lock be downgraded here? Take the optional disk
# shutdown in the caller into consideration.
# Wait for sync
# This can fail as the old devices are degraded and WaitForSync
# does a combined result over all disks, so we don't check its return value
self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
WaitForSync(self.lu, self.instance)
# Check all devices manually
self._CheckDevices(self.instance.primary_node, iv_names)
# Step: remove old storage
if not self.early_release:
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node_uuid, iv_names)
def _ExecDrbd8Secondary(self, feedback_fn):
"""Replace the secondary node for DRBD 8.
The algorithm for replace is quite complicated:
- for all disks of the instance:
  - create new LVs on the new node with same names
  - shutdown the drbd device on the old secondary
  - disconnect the drbd network on the primary
  - create the drbd device on the new secondary
  - network attach the drbd on the primary, using an artifice:
    the drbd code for Attach() will connect to the network if it
    finds a device which is connected to the correct local disks but
    not network enabled
- wait for sync across all devices
- remove all disks from the old secondary
Failures are not very well handled.
"""
steps_total = 6
pnode = self.instance.primary_node
# Step: check device activation
self.lu.LogStep(1, steps_total, "Check device existence")
self._CheckDisksExistence([self.instance.primary_node])
self._CheckVolumeGroup([self.instance.primary_node])
# Step: check other node consistency
self.lu.LogStep(2, steps_total, "Check peer consistency")
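# The primary is the only remaining data source, so its local storage
# (ldisk) must be fully consistent before the secondary is replaced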
self._CheckDisksConsistency(self.instance.primary_node, True, True)
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
self.new_node_uuid)
for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.cfg.GetNodeName(self.new_node_uuid), idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
try:
_CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
new_lv, True, GetInstanceInfoText(self.instance),
False, excl_stor)
except errors.DeviceCreationError, e:
raise errors.OpExecError("Can't create block device: %s" % e.message)
# Step 4: drbd minors and drbd setup changes
# after this, we must manually remove the drbd minors on both the
# error and the success paths
self.lu.LogStep(4, steps_total, "Changing drbd configuration")
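# One new minor is needed per instance disk, all on the new node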
minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
for _ in self.instance.disks],
self.instance.uuid)
logging.debug("Allocated minors %r", minors)
iv_names = {}
for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
(self.cfg.GetNodeName(self.new_node_uuid), idx))
# create new devices on new_node; note that we create two IDs:
# one without port, so the drbd will be activated without
# networking information on the new node at this stage, and one
# with network, for the later activation in step 4
(o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
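# The DRBD8 logical_id is (node_A, node_B, port, minor_A, minor_B,
# secret); keep the primary's minor and allocate a fresh one for the
# new secondary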
if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
assert self.instance.primary_node == o_node2, "Three-node instance?"
p_minor = o_minor2
new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
p_minor, new_minor, o_secret)
new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
p_minor, new_minor, o_secret)
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
logical_id=new_alone_id,
children=dev.children,
size=dev.size,
params={})
(anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
self.cfg)
try:
CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
anno_new_drbd,
GetInstanceInfoText(self.instance), False,
excl_stor)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.uuid)
raise
# We have new devices, shutdown the drbd on the old secondary
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
self.cfg.SetDiskID(dev, self.target_node_uuid)
msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
(dev, self.instance)).fail_msg
if msg:
self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
"node: %s" % (idx, msg),
hint=("Please cleanup this device manually as"
" soon as possible"))
self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
self.instance.disks)[pnode]
msg = result.fail_msg
if msg:
# detaches didn't succeed (unlikely)
self.cfg.ReleaseDRBDMinors(self.instance.uuid)
raise errors.OpExecError("Can't detach the disks from the network on"
" old node: %s" % (msg,))
# if we managed to detach at least one, we update all the disks of
# the instance to point to the new secondary
self.lu.LogInfo("Updating instance configuration")
for dev, _, new_logical_id in iv_names.itervalues():
dev.logical_id = new_logical_id
self.cfg.SetDiskID(dev, self.instance.primary_node)
self.cfg.Update(self.instance, feedback_fn)
# Release all node locks (the configuration has been updated)
ReleaseLocks(self.lu, locking.LEVEL_NODE)
# and now perform the drbd attach
self.lu.LogInfo("Attaching primary drbds to new secondary"
" (standalone => connected)")
result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
self.new_node_uuid],
self.node_secondary_ip,
(self.instance.disks, self.instance),
self.instance.name,
False)
for to_node, to_result in result.items():
msg = to_result.fail_msg
if msg:
self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
self.cfg.GetNodeName(to_node), msg,
hint=("please do a gnt-instance info to see the"
" status of disks"))
cstep = itertools.count(5)
if self.early_release:
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node_uuid, iv_names)
# TODO: Check if releasing locks early still makes sense
ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
else:
# Release all resource locks except those used by the instance
ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
keep=self.node_secondary_ip.keys())
# TODO: Can the instance lock be downgraded here? Take the optional disk
# shutdown in the caller into consideration.
# Wait for sync
# This can fail as the old devices are degraded and WaitForSync
# does a combined result over all disks, so we don't check its return value
self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
WaitForSync(self.lu, self.instance)
# Check all devices manually
self._CheckDevices(self.instance.primary_node, iv_names)
# Step: remove old storage
if not self.early_release:
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node_uuid, iv_names)