#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Logical units dealing with instance migration an failover."""
import logging
import time
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import utils
from ganeti.cmdlib.base import LogicalUnit, Tasklet
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, \
CheckIAllocatorOrNode, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_storage import CheckDiskConsistency, \
ExpandCheckDisks, ShutdownInstanceDisks, AssembleInstanceDisks
from ganeti.cmdlib.instance_utils import BuildInstanceHookEnvByObject, \
CheckTargetNodeIPolicy, ReleaseLocks, CheckNodeNotDrained, \
CopyLockList, CheckNodeFreeMemory, CheckInstanceBridgesExist
import ganeti.masterd.instance
def _ExpandNamesForMigration(lu):
"""Expands names for use with L{TLMigrateInstance}.
@type lu: L{LogicalUnit}
"""
if lu.op.target_node is not None:
(lu.op.target_node_uuid, lu.op.target_node) = \
ExpandNodeUuidAndName(lu.cfg, lu.op.target_node_uuid, lu.op.target_node)
lu.needed_locks[locking.LEVEL_NODE] = []
lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
lu.needed_locks[locking.LEVEL_NODE_RES] = []
lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
# The node allocation lock is actually only needed for externally replicated
# instances (e.g. sharedfile or RBD) and if an iallocator is used.
lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
def _DeclareLocksForMigration(lu, level):
"""Declares locks for L{TLMigrateInstance}.
@type lu: L{LogicalUnit}
@param level: Lock level
"""
if level == locking.LEVEL_NODE_ALLOC:
assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
instance = lu.cfg.GetInstanceInfo(lu.op.instance_uuid)
# Node locks are already declared here rather than at LEVEL_NODE as we need
# the instance object anyway to declare the node allocation lock.
if instance.disk_template in constants.DTS_EXT_MIRROR:
if lu.op.target_node is None:
lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
else:
lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
lu.op.target_node_uuid]
del lu.recalculate_locks[locking.LEVEL_NODE]
else:
lu._LockInstancesNodes() # pylint: disable=W0212
elif level == locking.LEVEL_NODE:
# Node locks are declared together with the node allocation lock
assert (lu.needed_locks[locking.LEVEL_NODE] or
lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
lu.needed_locks[locking.LEVEL_NODE_RES] = \
CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
class LUInstanceFailover(LogicalUnit):
"""Failover an instance.
"""
HPATH = "instance-failover"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
def CheckArguments(self):
"""Check the arguments.
"""
self.iallocator = getattr(self.op, "iallocator", None)
self.target_node = getattr(self.op, "target_node", None)
def ExpandNames(self):
self._ExpandAndLockInstance()
_ExpandNamesForMigration(self)
self._migrater = \
TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
self.op.cleanup, True, False,
self.op.ignore_consistency, True,
self.op.shutdown_timeout, self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
_DeclareLocksForMigration(self, level)
def BuildHooksEnv(self):
"""Build hooks env.
This runs on master, primary and secondary nodes of the instance.
"""
instance = self._migrater.instance
source_node_uuid = instance.primary_node
env = {
"IGNORE_CONSISTENCY": self.op.ignore_consistency,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
"OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
"NEW_PRIMARY": self.op.target_node,
"FAILOVER_CLEANUP": self.op.cleanup,
}
if instance.disk_template in constants.DTS_INT_MIRROR:
env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
else:
env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
env.update(BuildInstanceHookEnvByObject(self, instance))
return env
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
instance = self._migrater.instance
nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
return (nl, nl + [instance.primary_node])
class LUInstanceMigrate(LogicalUnit):
"""Migrate an instance.
This is migration without shutting down, in contrast to failover,
which is done with a shutdown.
"""
HPATH = "instance-migrate"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
def ExpandNames(self):
self._ExpandAndLockInstance()
_ExpandNamesForMigration(self)
self._migrater = \
TLMigrateInstance(self, self.op.instance_uuid, self.op.instance_name,
self.op.cleanup, False, self.op.allow_failover, False,
self.op.allow_runtime_changes,
constants.DEFAULT_SHUTDOWN_TIMEOUT,
self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
_DeclareLocksForMigration(self, level)
def BuildHooksEnv(self):
"""Build hooks env.
This runs on master, primary and secondary nodes of the instance.
"""
instance = self._migrater.instance
source_node_uuid = instance.primary_node
env = BuildInstanceHookEnvByObject(self, instance)
env.update({
"MIGRATE_LIVE": self._migrater.live,
"MIGRATE_CLEANUP": self.op.cleanup,
"OLD_PRIMARY": self.cfg.GetNodeName(source_node_uuid),
"NEW_PRIMARY": self.op.target_node,
"ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
})
if instance.disk_template in constants.DTS_INT_MIRROR:
env["OLD_SECONDARY"] = self.cfg.GetNodeName(instance.secondary_nodes[0])
env["NEW_SECONDARY"] = self.cfg.GetNodeName(source_node_uuid)
else:
env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
return env
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
instance = self._migrater.instance
snode_uuids = list(instance.secondary_nodes)
nl = [self.cfg.GetMasterNode(), instance.primary_node] + snode_uuids
return (nl, nl)
class TLMigrateInstance(Tasklet):
"""Tasklet class for instance migration.
@type live: boolean
@ivar live: whether the migration will be done live or non-live;
this variable is initialized only after CheckPrereq has run
@type cleanup: boolean
@ivar cleanup: Whether we are cleaning up after a failed migration
@type iallocator: string
@ivar iallocator: The iallocator used to determine target_node
@type target_node_uuid: string
@ivar target_node_uuid: If given, the target node UUID to reallocate the
instance to
@type failover: boolean
@ivar failover: Whether operation results in failover or migration
@type fallback: boolean
@ivar fallback: Whether fallback to failover is allowed if migration is not
possible
@type ignore_consistency: boolean
@ivar ignore_consistency: Whether we should ignore consistency between source
and target node
@type shutdown_timeout: int
@ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
@type ignore_ipolicy: bool
@ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
"""
# Constants
_MIGRATION_POLL_INTERVAL = 1 # seconds
_MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
fallback, ignore_consistency, allow_runtime_changes,
shutdown_timeout, ignore_ipolicy):
"""Initializes this class.
"""
Tasklet.__init__(self, lu)
# Parameters
self.instance_uuid = instance_uuid
self.instance_name = instance_name
self.cleanup = cleanup
self.live = False # will be overridden later
self.failover = failover
self.fallback = fallback
self.ignore_consistency = ignore_consistency
self.shutdown_timeout = shutdown_timeout
self.ignore_ipolicy = ignore_ipolicy
self.allow_runtime_changes = allow_runtime_changes
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance is in the cluster.
"""
(self.instance_uuid, self.instance_name) = \
ExpandInstanceUuidAndName(self.lu.cfg, self.instance_uuid,
self.instance_name)
self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
assert self.instance is not None
cluster = self.cfg.GetClusterInfo()
if (not self.cleanup and
not self.instance.admin_state == constants.ADMINST_UP and
not self.failover and self.fallback):
self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
" switching to failover")
self.failover = True
if self.instance.disk_template not in constants.DTS_MIRRORED:
if self.failover:
text = "failovers"
else:
text = "migrations"
raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
" %s" % (self.instance.disk_template, text),
errors.ECODE_STATE)
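# Externally mirrored disks (e.g. sharedfile or RBD) can be moved to any
# node, so the target has to come either from the opcode or from an
# iallocator run.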
if self.instance.disk_template in constants.DTS_EXT_MIRROR:
CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
if self.lu.op.iallocator:
assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
self._RunAllocator()
else:
# We set self.target_node_uuid as it is required by
# BuildHooksEnv
self.target_node_uuid = self.lu.op.target_node_uuid
# Check that the target node is correct in terms of instance policy
nodeinfo = self.cfg.GetNodeInfo(self.target_node_uuid)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
group_info)
CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
self.cfg, ignore=self.ignore_ipolicy)
# self.target_node_uuid is already populated, either directly or by the
# iallocator run
target_node_uuid = self.target_node_uuid
if self.target_node_uuid == self.instance.primary_node:
raise errors.OpPrereqError(
"Cannot migrate instance %s to its primary (%s)" %
(self.instance.name,
self.cfg.GetNodeName(self.instance.primary_node)),
errors.ECODE_STATE)
if len(self.lu.tasklets) == 1:
# It is safe to release locks only when we're the only tasklet
# in the LU
ReleaseLocks(self.lu, locking.LEVEL_NODE,
keep=[self.instance.primary_node, self.target_node_uuid])
ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
else:
assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
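# Internally mirrored (DRBD) instances can only move to their current
# secondary node; an explicit target node or an iallocator request would
# be meaningless here and is rejected below.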
secondary_node_uuids = self.instance.secondary_nodes
if not secondary_node_uuids:
raise errors.ConfigurationError("No secondary node but using"
" %s disk template" %
self.instance.disk_template)
target_node_uuid = secondary_node_uuids[0]
if self.lu.op.iallocator or \
(self.lu.op.target_node_uuid and
self.lu.op.target_node_uuid != target_node_uuid):
if self.failover:
text = "failed over"
else:
text = "migrated"
raise errors.OpPrereqError("Instances with disk template %s cannot"
" be %s to arbitrary nodes"
" (neither an iallocator nor a target"
" node can be passed)" %
(self.instance.disk_template, text),
errors.ECODE_INVAL)
nodeinfo = self.cfg.GetNodeInfo(target_node_uuid)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
group_info)
CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, nodeinfo,
self.cfg, ignore=self.ignore_ipolicy)
i_be = cluster.FillBE(self.instance)
# check memory requirements on the secondary node
if (not self.cleanup and
(not self.failover or
self.instance.admin_state == constants.ADMINST_UP)):
self.tgt_free_mem = CheckNodeFreeMemory(
self.lu, target_node_uuid,
"migrating instance %s" % self.instance.name,
i_be[constants.BE_MINMEM], self.instance.hypervisor,
self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])
else:
self.lu.LogInfo("Not checking memory on the secondary node as"
" instance will not be started")
# check if failover must be forced instead of migration
if (not self.cleanup and not self.failover and
i_be[constants.BE_ALWAYS_FAILOVER]):
self.lu.LogInfo("Instance configured to always failover; fallback"
" to failover")
self.failover = True
# check bridge existence
CheckInstanceBridgesExist(self.lu, self.instance,
node_uuid=target_node_uuid)
if not self.cleanup:
CheckNodeNotDrained(self.lu, target_node_uuid)
if not self.failover:
result = self.rpc.call_instance_migratable(self.instance.primary_node,
self.instance)
if result.fail_msg and self.fallback:
self.lu.LogInfo("Can't migrate, instance offline, fallback to"
" failover")
self.failover = True
else:
result.Raise("Can't migrate, please use failover",
prereq=True, ecode=errors.ECODE_STATE)
assert not (self.failover and self.cleanup)
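# Decide between live and non-live migration: the 'live' and 'mode'
# parameters are mutually exclusive, and if neither is given the
# hypervisor's default migration mode is used.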
if not self.failover:
if self.lu.op.live is not None and self.lu.op.mode is not None:
raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
" parameters are accepted",
errors.ECODE_INVAL)
if self.lu.op.live is not None:
if self.lu.op.live:
self.lu.op.mode = constants.HT_MIGRATION_LIVE
else:
self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
# reset the 'live' parameter to None so that repeated
# invocations of CheckPrereq do not raise an exception
self.lu.op.live = None
elif self.lu.op.mode is None:
# read the default value from the hypervisor
i_hv = cluster.FillHV(self.instance, skip_globals=False)
self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
else:
# Failover is never live
self.live = False
if not (self.failover or self.cleanup):
remote_info = self.rpc.call_instance_info(
self.instance.primary_node, self.instance.name,
self.instance.hypervisor, cluster.hvparams[self.instance.hypervisor])
remote_info.Raise("Error checking instance on node %s" %
self.cfg.GetNodeName(self.instance.primary_node))
instance_running = bool(remote_info.payload)
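# Remember the instance's current runtime memory; _ExecMigration compares
# it with the free memory on the target node and, if runtime changes are
# allowed, balloons the instance down to fit.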
if instance_running:
self.current_mem = int(remote_info.payload["memory"])
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
# FIXME: add a self.ignore_ipolicy option
req = iallocator.IAReqRelocate(
inst_uuid=self.instance_uuid,
relocate_from_node_uuids=[self.instance.primary_node])
ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.lu.op.iallocator)
if not ial.success:
raise errors.OpPrereqError("Can't compute nodes using"
" iallocator '%s': %s" %
(self.lu.op.iallocator, ial.info),
errors.ECODE_NORES)
self.target_node_uuid = self.cfg.GetNodeInfoByName(ial.result[0]).uuid
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
self.instance_name, self.lu.op.iallocator,
utils.CommaJoin(ial.result))
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
This uses our own step-based rpc call.
"""
self.feedback_fn("* wait until resync is done")
all_done = False
while not all_done:
all_done = True
result = self.rpc.call_drbd_wait_sync(self.all_node_uuids,
self.nodes_ip,
(self.instance.disks,
self.instance))
min_percent = 100
for node_uuid, nres in result.items():
nres.Raise("Cannot resync disks on node %s" %
self.cfg.GetNodeName(node_uuid))
node_done, node_percent = nres.payload
all_done = all_done and node_done
if node_percent is not None:
min_percent = min(min_percent, node_percent)
if not all_done:
if min_percent < 100:
self.feedback_fn(" - progress: %.1f%%" % min_percent)
time.sleep(2)
def _EnsureSecondary(self, node_uuid):
"""Demote a node to secondary.
"""
self.feedback_fn("* switching node %s to secondary mode" %
self.cfg.GetNodeName(node_uuid))
for dev in self.instance.disks:
self.cfg.SetDiskID(dev, node_uuid)
result = self.rpc.call_blockdev_close(node_uuid, self.instance.name,
self.instance.disks)
result.Raise("Cannot change disk to secondary on node %s" %
self.cfg.GetNodeName(node_uuid))
def _GoStandalone(self):
"""Disconnect from the network.
"""
self.feedback_fn("* changing into standalone mode")
result = self.rpc.call_drbd_disconnect_net(self.all_node_uuids,
self.nodes_ip,
self.instance.disks)
for node_uuid, nres in result.items():
nres.Raise("Cannot disconnect disks node %s" %
self.cfg.GetNodeName(node_uuid))
def _GoReconnect(self, multimaster):
"""Reconnect to the network.
"""
if multimaster:
msg = "dual-master"
else:
msg = "single-master"
self.feedback_fn("* changing disks into %s mode" % msg)
result = self.rpc.call_drbd_attach_net(self.all_node_uuids, self.nodes_ip,
(self.instance.disks, self.instance),
self.instance.name, multimaster)
for node_uuid, nres in result.items():
nres.Raise("Cannot change disks config on node %s" %
self.cfg.GetNodeName(node_uuid))
def _ExecCleanup(self):
"""Try to cleanup after a failed migration.
The cleanup is done by:
- check that the instance is running only on one node
(and update the config if needed)
- change disks on its secondary node to secondary
- wait until disks are fully synchronized
- disconnect from the network
- change disks into single-master mode
- wait again until disks are fully synchronized
"""
# check running on only one node
self.feedback_fn("* checking where the instance actually runs"
" (if this hangs, the hypervisor might be in"
" a bad state)")
cluster_hvparams = self.cfg.GetClusterInfo().hvparams
ins_l = self.rpc.call_instance_list(self.all_node_uuids,
[self.instance.hypervisor],
cluster_hvparams)
for node_uuid, result in ins_l.items():
result.Raise("Can't contact node %s" % node_uuid)
runningon_source = self.instance.name in \
ins_l[self.source_node_uuid].payload
runningon_target = self.instance.name in \
ins_l[self.target_node_uuid].payload
if runningon_source and runningon_target:
raise errors.OpExecError("Instance seems to be running on two nodes,"
" or the hypervisor is confused; you will have"
" to ensure manually that it runs only on one"
" and restart this operation")
if not (runningon_source or runningon_target):
raise errors.OpExecError("Instance does not seem to be running at all;"
" in this case it's safer to repair by"
" running 'gnt-instance stop' to ensure disk"
" shutdown, and then restarting it")
if runningon_target:
# the migration has actually succeeded, we need to update the config
self.feedback_fn("* instance running on secondary node (%s),"
" updating config" %
self.cfg.GetNodeName(self.target_node_uuid))
self.instance.primary_node = self.target_node_uuid
self.cfg.Update(self.instance, self.feedback_fn)
demoted_node_uuid = self.source_node_uuid
else:
self.feedback_fn("* instance confirmed to be running on its"
" primary node (%s)" %
self.cfg.GetNodeName(self.source_node_uuid))
demoted_node_uuid = self.target_node_uuid
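# For internally mirrored (DRBD) disks, demote the node that is no longer
# the primary and re-establish a clean single-master connection.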
if self.instance.disk_template in constants.DTS_INT_MIRROR:
self._EnsureSecondary(demoted_node_uuid)
try:
self._WaitUntilSync()
except errors.OpExecError:
# we ignore errors here, since if the device is standalone, it
# won't be able to sync
pass
self._GoStandalone()
self._GoReconnect(False)
self._WaitUntilSync()
self.feedback_fn("* done")
def _RevertDiskStatus(self):
"""Try to revert the disk status after a failed migration.
"""
if self.instance.disk_template in constants.DTS_EXT_MIRROR:
return
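# For DRBD-based instances, demote the target node again and reconnect the
# disks in single-master mode, waiting until they are back in sync.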
try:
self._EnsureSecondary(self.target_node_uuid)
self._GoStandalone()
self._GoReconnect(False)
self._WaitUntilSync()
except errors.OpExecError, err:
self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
" please try to recover the instance manually;"
" error '%s'" % str(err))
def _AbortMigration(self):
"""Call the hypervisor code to abort a started migration.
"""
abort_result = self.rpc.call_instance_finalize_migration_dst(
self.target_node_uuid, self.instance, self.migration_info,
False)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on target node %s: %s",
self.cfg.GetNodeName(self.target_node_uuid), abort_msg)
# Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
abort_result = self.rpc.call_instance_finalize_migration_src(
self.source_node_uuid, self.instance, False, self.live)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on source node %s: %s",
self.cfg.GetNodeName(self.source_node_uuid), abort_msg)
def _ExecMigration(self):
"""Migrate an instance.
The migration is done by:
- change the disks into dual-master mode
- wait until disks are fully synchronized again
- migrate the instance
- change disks on the new secondary node (the old primary) to secondary
- wait until disks are fully synchronized
- change disks into single-master mode
"""
# Check for hypervisor version mismatch and warn the user.
hvspecs = [(self.instance.hypervisor,
self.cfg.GetClusterInfo().hvparams[self.instance.hypervisor])]
nodeinfo = self.rpc.call_node_info(
[self.source_node_uuid, self.target_node_uuid], None, hvspecs)
for ninfo in nodeinfo.values():
ninfo.Raise("Unable to retrieve node information from node '%s'" %
ninfo.node)
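# The third element of each node_info payload is the list of per-hypervisor
# information, with one entry per hypervisor requested in hvspecs.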
(_, _, (src_info, )) = nodeinfo[self.source_node_uuid].payload
(_, _, (dst_info, )) = nodeinfo[self.target_node_uuid].payload
if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
(constants.HV_NODEINFO_KEY_VERSION in dst_info)):
src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
if src_version != dst_version:
self.feedback_fn("* warning: hypervisor version mismatch between"
" source (%s) and target (%s) node" %
(src_version, dst_version))
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(self.instance.disks):
if not CheckDiskConsistency(self.lu, self.instance, dev,
self.target_node_uuid,
False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
" aborting migration" % idx)
if self.current_mem > self.tgt_free_mem:
if not self.allow_runtime_changes:
raise errors.OpExecError("Memory ballooning not allowed and not enough"
" free memory to fit instance %s on target"
" node %s (have %dMB, need %dMB)" %
(self.instance.name,
self.cfg.GetNodeName(self.target_node_uuid),
self.tgt_free_mem, self.current_mem))
self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
rpcres = self.rpc.call_instance_balloon_memory(self.instance.primary_node,
self.instance,
self.tgt_free_mem)
rpcres.Raise("Cannot modify instance runtime memory")
# First get the migration information from the remote node
result = self.rpc.call_migration_info(self.source_node_uuid, self.instance)
msg = result.fail_msg
if msg:
log_err = ("Failed fetching source migration information from %s: %s" %
(self.cfg.GetNodeName(self.source_node_uuid), msg))
logging.error(log_err)
raise errors.OpExecError(log_err)
self.migration_info = migration_info = result.payload
if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
# Then switch the disks to master/master mode
self._EnsureSecondary(self.target_node_uuid)
self._GoStandalone()
self._GoReconnect(True)
self._WaitUntilSync()
self.feedback_fn("* preparing %s to accept the instance" %
self.cfg.GetNodeName(self.target_node_uuid))
# This fills the physical_id slot that may be missing on newly created disks
for disk in self.instance.disks:
self.cfg.SetDiskID(disk, self.target_node_uuid)
result = self.rpc.call_accept_instance(self.target_node_uuid,
self.instance,
migration_info,
self.nodes_ip[self.target_node_uuid])
msg = result.fail_msg
if msg:
logging.error("Instance pre-migration failed, trying to revert"
" disk status: %s", msg)
self.feedback_fn("Pre-migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
(self.instance.name, msg))
self.feedback_fn("* migrating instance to %s" %
self.cfg.GetNodeName(self.target_node_uuid))
cluster = self.cfg.GetClusterInfo()
result = self.rpc.call_instance_migrate(
self.source_node_uuid, cluster.cluster_name, self.instance,
self.nodes_ip[self.target_node_uuid], self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration failed, trying to revert"
" disk status: %s", msg)
self.feedback_fn("Migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
raise errors.OpExecError("Could not migrate instance %s: %s" %
(self.instance.name, msg))
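# Poll the source node for the migration status every
# _MIGRATION_POLL_INTERVAL seconds, reporting RAM transfer progress at most
# every _MIGRATION_FEEDBACK_INTERVAL seconds, until the migration leaves the
# active state or fails.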
self.feedback_fn("* starting memory transfer")
last_feedback = time.time()
while True:
result = self.rpc.call_instance_get_migration_status(
self.source_node_uuid, self.instance)
msg = result.fail_msg
ms = result.payload # MigrationStatus instance
if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
logging.error("Instance migration failed, trying to revert"
" disk status: %s", msg)
self.feedback_fn("Migration failed, aborting")
self._AbortMigration()
self._RevertDiskStatus()
if not msg:
msg = "hypervisor returned failure"
raise errors.OpExecError("Could not migrate instance %s: %s" %
(self.instance.name, msg))
if result.payload.status != constants.HV_MIGRATION_ACTIVE:
self.feedback_fn("* memory transfer complete")
break
if (utils.TimeoutExpired(last_feedback,
self._MIGRATION_FEEDBACK_INTERVAL) and
ms.transferred_ram is not None):
mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
last_feedback = time.time()
time.sleep(self._MIGRATION_POLL_INTERVAL)
result = self.rpc.call_instance_finalize_migration_src(
self.source_node_uuid, self.instance, True, self.live)
msg = result.fail_msg
if msg:
logging.error("Instance migration succeeded, but finalization failed"
" on the source node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
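# The hypervisor-level migration has succeeded; make the target node the
# new primary in the cluster configuration.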
self.instance.primary_node = self.target_node_uuid
# distribute new instance config to the other nodes
self.cfg.Update(self.instance, self.feedback_fn)
result = self.rpc.call_instance_finalize_migration_dst(
self.target_node_uuid, self.instance, migration_info, True)
msg = result.fail_msg
if msg:
logging.error("Instance migration succeeded, but finalization failed"
" on the target node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
self._EnsureSecondary(self.source_node_uuid)
self._WaitUntilSync()
self._GoStandalone()
self._GoReconnect(False)
self._WaitUntilSync()
# If the instance's disk template is `rbd' or `ext' and there was a
# successful migration, unmap the device from the source node.
if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
disks = ExpandCheckDisks(self.instance, self.instance.disks)
self.feedback_fn("* unmapping instance's disks from %s" %
self.cfg.GetNodeName(self.source_node_uuid))
for disk in disks:
result = self.rpc.call_blockdev_shutdown(self.source_node_uuid,
(disk, self.instance))
msg = result.fail_msg
if msg:
logging.error("Migration was successful, but couldn't unmap the"
" block device %s on source node %s: %s",
disk.iv_name,
self.cfg.GetNodeName(self.source_node_uuid), msg)
logging.error("You need to unmap the device %s manually on %s",
disk.iv_name,
self.cfg.GetNodeName(self.source_node_uuid))
self.feedback_fn("* done")
def _ExecFailover(self):
"""Failover an instance.
The failover is done by shutting it down on its present node and
starting it on the secondary.
"""
primary_node = self.cfg.GetNodeInfo(self.instance.primary_node)
source_node_uuid = self.instance.primary_node
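# Disk consistency is only checked while the instance's disks are active; a
# degraded disk aborts the failover unless consistency is explicitly ignored
# or the primary node is offline.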
if self.instance.disks_active:
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(self.instance.disks):
# for drbd, these are drbd over lvm
if not CheckDiskConsistency(self.lu, self.instance, dev,
self.target_node_uuid, False):
if primary_node.offline:
self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
" target node %s" %
(primary_node.name, idx,
self.cfg.GetNodeName(self.target_node_uuid)))
elif not self.ignore_consistency:
raise errors.OpExecError("Disk %s is degraded on target node,"
" aborting failover" % idx)
else:
self.feedback_fn("* not checking disk consistency as instance is not"
" running")
self.feedback_fn("* shutting down instance on source node")
logging.info("Shutting down instance %s on node %s",
self.instance.name, self.cfg.GetNodeName(source_node_uuid))
result = self.rpc.call_instance_shutdown(source_node_uuid, self.instance,
self.shutdown_timeout,
self.lu.op.reason)
msg = result.fail_msg
if msg:
if self.ignore_consistency or primary_node.offline:
self.lu.LogWarning("Could not shutdown instance %s on node %s,"
" proceeding anyway; please make sure node"
" %s is down; error details: %s",
self.instance.name,
self.cfg.GetNodeName(source_node_uuid),
self.cfg.GetNodeName(source_node_uuid), msg)
else:
raise errors.OpExecError("Could not shutdown instance %s on"
" node %s: %s" %
(self.instance.name,
self.cfg.GetNodeName(source_node_uuid), msg))
self.feedback_fn("* deactivating the instance's disks on source node")
if not ShutdownInstanceDisks(self.lu, self.instance, ignore_primary=True):
raise errors.OpExecError("Can't shut down the instance's disks")
self.instance.primary_node = self.target_node_uuid
# distribute new instance config to the other nodes
self.cfg.Update(self.instance, self.feedback_fn)
# Only start the instance if it's marked as up
if self.instance.admin_state == constants.ADMINST_UP:
self.feedback_fn("* activating the instance's disks on target node %s" %
self.cfg.GetNodeName(self.target_node_uuid))
logging.info("Starting instance %s on node %s", self.instance.name,
self.cfg.GetNodeName(self.target_node_uuid))
disks_ok, _ = AssembleInstanceDisks(self.lu, self.instance,
ignore_secondaries=True)
if not disks_ok:
ShutdownInstanceDisks(self.lu, self.instance)
raise errors.OpExecError("Can't activate the instance's disks")
self.feedback_fn("* starting the instance on the target node %s" %
self.cfg.GetNodeName(self.target_node_uuid))
result = self.rpc.call_instance_start(self.target_node_uuid,
(self.instance, None, None), False,
self.lu.op.reason)
msg = result.fail_msg
if msg:
ShutdownInstanceDisks(self.lu, self.instance)
raise errors.OpExecError("Could not start instance %s on node %s: %s" %
(self.instance.name,
self.cfg.GetNodeName(self.target_node_uuid),
msg))
def Exec(self, feedback_fn):
"""Perform the migration.
"""
self.feedback_fn = feedback_fn
self.source_node_uuid = self.instance.primary_node
# FIXME: if we implement migrate-to-any in DRBD, this needs fixing
if self.instance.disk_template in constants.DTS_INT_MIRROR:
self.target_node_uuid = self.instance.secondary_nodes[0]
# Otherwise self.target_node_uuid has been populated either
# directly, or through an iallocator.
self.all_node_uuids = [self.source_node_uuid, self.target_node_uuid]
self.nodes_ip = dict((uuid, node.secondary_ip) for (uuid, node)
in self.cfg.GetMultiNodeInfo(self.all_node_uuids))
if self.failover:
feedback_fn("Failover instance %s" % self.instance.name)
self._ExecFailover()
else:
feedback_fn("Migrating instance %s" % self.instance.name)
if self.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()