#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Logical units dealing with backup operations."""
import OpenSSL
import logging
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import masterd
from ganeti import utils
from ganeti.cmdlib.base import NoHooksLU, LogicalUnit
from ganeti.cmdlib.common import CheckNodeOnline, ExpandNodeUuidAndName
from ganeti.cmdlib.instance_helpervm import RunWithHelperVM
from ganeti.cmdlib.instance_storage import StartInstanceDisks, \
ShutdownInstanceDisks
from ganeti.cmdlib.instance_utils import GetClusterDomainSecret, \
BuildInstanceHookEnvByObject, CheckNodeNotDrained, RemoveInstance, \
CheckCompressionTool
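

# These logical units sit behind the "gnt-backup" commands. As a rough usage
# sketch (hedged: the exact opcode fields are defined in ganeti.opcodes and
# job submission normally goes through ganeti.cli or a luxi client), an
# intra-cluster export could be requested along these lines:
#
#   op = opcodes.OpBackupExport(instance_name="inst1.example.com",
#                               mode=constants.EXPORT_MODE_LOCAL,
#                               target_node="node2.example.com",
#                               shutdown=True)
#   job_id = luxi.Client().SubmitJob([op])
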
class LUBackupPrepare(NoHooksLU):
"""Prepares an instance for an export and returns useful information.
"""
REQ_BGL = False
def ExpandNames(self):
self._ExpandAndLockInstance()
def CheckPrereq(self):
"""Check prerequisites.
"""
self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
CheckNodeOnline(self, self.instance.primary_node)
self._cds = GetClusterDomainSecret()
def Exec(self, feedback_fn):
"""Prepares an instance for an export.
"""
if self.op.mode == constants.EXPORT_MODE_REMOTE:
salt = utils.GenerateSecret(8)
feedback_fn("Generating X509 certificate on %s" %
self.cfg.GetNodeName(self.instance.primary_node))
result = self.rpc.call_x509_cert_create(self.instance.primary_node,
constants.RIE_CERT_VALIDITY)
result.Raise("Can't create X509 key and certificate on %s" %
self.cfg.GetNodeName(result.node))
(name, cert_pem) = result.payload
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert_pem)
return {
"handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
"x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
salt),
"x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
}
return None
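
  # The dictionary returned above is what the destination cluster consumes
  # when preparing a remote import. A hedged sketch of the verifying side
  # (the authoritative helpers live in masterd.instance and utils):
  #
  #   masterd.instance.CheckRemoteExportHandshake(dest_cds,
  #                                               result["handshake"])
  #   (name, hmac_digest, salt) = result["x509_key_name"]
  #   utils.VerifySha1Hmac(dest_cds, name, hmac_digest, salt=salt)
  #
  # Both checks can only succeed if source and destination share the same
  # cluster domain secret.
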
class LUBackupExport(LogicalUnit):
"""Export an instance to an image in the cluster.
"""
HPATH = "instance-export"
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
def CheckArguments(self):
"""Check the arguments.
"""
self.x509_key_name = self.op.x509_key_name
self.dest_x509_ca_pem = self.op.destination_x509_ca
if self.op.mode == constants.EXPORT_MODE_REMOTE:
if not self.x509_key_name:
raise errors.OpPrereqError("Missing X509 key name for encryption",
errors.ECODE_INVAL)
if not self.dest_x509_ca_pem:
raise errors.OpPrereqError("Missing destination X509 CA",
errors.ECODE_INVAL)
    if self.op.zero_free_space and not self.op.compress:
      raise errors.OpPrereqError("Zeroing free space does not make sense"
                                 " unless compression is used",
                                 errors.ECODE_INVAL)

    if self.op.zero_free_space and not self.op.shutdown:
      raise errors.OpPrereqError("Zeroing free space cannot be used unless"
                                 " the instance is shut down",
                                 errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
# In case we are zeroing, a node lock is required as we will be creating and
# destroying a disk - allocations should be stopped, but not on the entire
# cluster
if self.op.zero_free_space:
self.recalculate_locks = {locking.LEVEL_NODE: constants.LOCKS_REPLACE}
self._LockInstancesNodes(primary_only=True)
# Lock all nodes for local exports
if self.op.mode == constants.EXPORT_MODE_LOCAL:
(self.op.target_node_uuid, self.op.target_node) = \
ExpandNodeUuidAndName(self.cfg, self.op.target_node_uuid,
self.op.target_node)
# FIXME: lock only instance primary and destination node
#
      # Sad but true: for now we have to lock all nodes, as we don't know
      # where the previous export might be, and in this LU we search for it
      # and remove it from its current node. In the future we could fix
      # this by:
      #  - making a tasklet to search (share-lock all), then create the new
      #    one, then remove the old one afterwards
# - removing the removal operation altogether
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
"""Last minute lock declaration."""
# All nodes are locked anyway, so nothing to do here.
def BuildHooksEnv(self):
"""Build hooks env.
This will run on the master, primary node and target node.
"""
env = {
"EXPORT_MODE": self.op.mode,
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
"SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
# TODO: Generic function for boolean env variables
"REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
}
env.update(BuildInstanceHookEnvByObject(
self, self.instance,
secondary_nodes=self.secondary_nodes, disks=self.inst_disks))
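
    # For illustration (hedged: the full variable set comes from
    # BuildInstanceHookEnvByObject, and the hooks runner prefixes all names
    # with "GANETI_"), a local-mode export hook might see values such as
    # GANETI_EXPORT_MODE=local, GANETI_EXPORT_NODE=node2.example.com and
    # GANETI_EXPORT_DO_SHUTDOWN=True.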
return env
def BuildHooksNodes(self):
"""Build hooks nodes.
"""
nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
if self.op.mode == constants.EXPORT_MODE_LOCAL:
nl.append(self.op.target_node_uuid)
return (nl, nl)
def CheckPrereq(self):
"""Check prerequisites.
This checks that the instance and node names are valid.
"""
self.instance = self.cfg.GetInstanceInfoByName(self.op.instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
CheckNodeOnline(self, self.instance.primary_node)
if (self.op.remove_instance and
self.instance.admin_state == constants.ADMINST_UP and
not self.op.shutdown):
raise errors.OpPrereqError("Can not remove instance without shutting it"
" down before", errors.ECODE_STATE)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self.dst_node = self.cfg.GetNodeInfo(self.op.target_node_uuid)
assert self.dst_node is not None
CheckNodeOnline(self, self.dst_node.uuid)
CheckNodeNotDrained(self, self.dst_node.uuid)
self._cds = None
self.dest_disk_info = None
self.dest_x509_ca = None
elif self.op.mode == constants.EXPORT_MODE_REMOTE:
self.dst_node = None
if len(self.op.target_node) != len(self.instance.disks):
raise errors.OpPrereqError(("Received destination information for %s"
" disks, but instance %s has %s disks") %
(len(self.op.target_node),
self.op.instance_name,
len(self.instance.disks)),
errors.ECODE_INVAL)
cds = GetClusterDomainSecret()
# Check X509 key name
try:
(key_name, hmac_digest, hmac_salt) = self.x509_key_name
except (TypeError, ValueError), err:
raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
errors.ECODE_INVAL)
if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
raise errors.OpPrereqError("HMAC for X509 key name is wrong",
errors.ECODE_INVAL)
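
      # The (key_name, hmac_digest, hmac_salt) triple verified here is the
      # one generated by LUBackupPrepare.Exec above, roughly:
      #
      #   salt = utils.GenerateSecret(8)
      #   hmac_digest = utils.Sha1Hmac(cds, key_name, salt=salt)
      #
      # so the check only passes if both operations used the same cluster
      # domain secret.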
# Load and verify CA
try:
(cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
except OpenSSL.crypto.Error, err:
raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
(err, ), errors.ECODE_INVAL)
(errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
if errcode is not None:
raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
(msg, ), errors.ECODE_INVAL)
self.dest_x509_ca = cert
# Verify target information
disk_info = []
for idx, disk_data in enumerate(self.op.target_node):
try:
(host, port, magic) = \
masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
except errors.GenericError, err:
raise errors.OpPrereqError("Target info for disk %s: %s" %
(idx, err), errors.ECODE_INVAL)
disk_info.append((host, port, magic))
assert len(disk_info) == len(self.op.target_node)
self.dest_disk_info = disk_info
else:
raise errors.ProgrammerError("Unhandled export mode %r" %
self.op.mode)
# Check prerequisites for zeroing
if self.op.zero_free_space:
# Check that user shutdown detection has been enabled
hvparams = self.cfg.GetClusterInfo().FillHV(self.instance)
if self.instance.hypervisor == constants.HT_KVM and \
not hvparams.get(constants.HV_KVM_USER_SHUTDOWN, False):
raise errors.OpPrereqError("Instance shutdown detection must be "
"enabled for zeroing to work",
errors.ECODE_INVAL)
# Check that the instance is set to boot from the disk
if constants.HV_BOOT_ORDER in hvparams and \
hvparams[constants.HV_BOOT_ORDER] != constants.HT_BO_DISK:
raise errors.OpPrereqError("Booting from disk must be set for zeroing "
"to work", errors.ECODE_INVAL)
# Check that the zeroing image is set
if not self.cfg.GetZeroingImage():
raise errors.OpPrereqError("A zeroing image must be set for zeroing to"
" work", errors.ECODE_INVAL)
if self.op.zeroing_timeout_fixed is None:
self.op.zeroing_timeout_fixed = constants.HELPER_VM_STARTUP
if self.op.zeroing_timeout_per_mib is None:
self.op.zeroing_timeout_per_mib = constants.ZEROING_TIMEOUT_PER_MIB
else:
if (self.op.zeroing_timeout_fixed is not None or
self.op.zeroing_timeout_per_mib is not None):
raise errors.OpPrereqError("Zeroing timeout options can only be used"
" only with the --zero-free-space option",
errors.ECODE_INVAL)
if self.op.long_sleep and not self.op.shutdown:
raise errors.OpPrereqError("The long sleep option only makes sense when"
" the instance can be shut down.",
errors.ECODE_INVAL)
self.secondary_nodes = \
self.cfg.GetInstanceSecondaryNodes(self.instance.uuid)
self.inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
# Check if the compression tool is whitelisted
CheckCompressionTool(self, self.op.compress)
def _CleanupExports(self, feedback_fn):
"""Removes exports of current instance from all other nodes.
If an instance in a cluster with nodes A..D was exported to node C, its
exports will be removed from the nodes A, B and D.
"""
assert self.op.mode != constants.EXPORT_MODE_REMOTE
node_uuids = self.cfg.GetNodeList()
node_uuids.remove(self.dst_node.uuid)
    # On one-node clusters the node list will be empty after the removal;
    # if we proceeded, the backup would be removed, because OpBackupQuery
    # substitutes an empty list with the full cluster node list.
iname = self.instance.name
if node_uuids:
feedback_fn("Removing old exports for instance %s" % iname)
exportlist = self.rpc.call_export_list(node_uuids)
for node_uuid in exportlist:
if exportlist[node_uuid].fail_msg:
continue
if iname in exportlist[node_uuid].payload:
msg = self.rpc.call_export_remove(node_uuid, iname).fail_msg
if msg:
self.LogWarning("Could not remove older export for instance %s"
" on node %s: %s", iname,
self.cfg.GetNodeName(node_uuid), msg)
def _InstanceDiskSizeSum(self):
"""Calculates the size of all the disks of the instance used in this LU.
@rtype: int
@return: Size of the disks in MiB
"""
inst_disks = self.cfg.GetInstanceDisks(self.instance.uuid)
return sum([d.size for d in inst_disks])
def ZeroFreeSpace(self, feedback_fn):
"""Zeroes the free space on a shutdown instance.
@type feedback_fn: function
@param feedback_fn: Function used to log progress
"""
assert self.op.zeroing_timeout_fixed is not None
assert self.op.zeroing_timeout_per_mib is not None
zeroing_image = self.cfg.GetZeroingImage()
# Calculate the sum prior to adding the temporary disk
instance_disks_size_sum = self._InstanceDiskSizeSum()
timeout = self.op.zeroing_timeout_fixed + \
self.op.zeroing_timeout_per_mib * instance_disks_size_sum
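
    # Worked example (hedged: the real defaults live in constants.py): with
    # a fixed timeout of, say, 300s and 0.1s per MiB, zeroing a 10 GiB
    # (10240 MiB) instance would be allowed 300 + 0.1 * 10240 = 1324
    # seconds before the helper VM is considered to have failed.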
RunWithHelperVM(self, self.instance, zeroing_image,
self.op.shutdown_timeout, timeout,
log_prefix="Zeroing free disk space",
feedback_fn=feedback_fn)
def StartInstance(self, feedback_fn, src_node_uuid):
"""Send the node instructions to start the instance.
@raise errors.OpExecError: If the instance didn't start up.
"""
assert self.instance.disks_active
feedback_fn("Starting instance %s" % self.instance.name)
result = self.rpc.call_instance_start(src_node_uuid,
(self.instance, None, None),
False, self.op.reason)
msg = result.fail_msg
if msg:
feedback_fn("Failed to start instance: %s" % msg)
ShutdownInstanceDisks(self, self.instance)
raise errors.OpExecError("Could not start instance: %s" % msg)
def TrySnapshot(self):
"""Returns true if there is a reason to prefer a snapshot."""
return (not self.op.remove_instance and
self.instance.admin_state == constants.ADMINST_UP)
def DoReboot(self):
"""Returns true iff the instance needs to be started after transfer."""
return (self.op.shutdown and
self.instance.admin_state == constants.ADMINST_UP and
not self.op.remove_instance)
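
  # How the flags above combine in Exec() below:
  #   snapshots taken,  shutdown requested, instance kept -> restart before
  #     the transfer
  #   snapshots failed, shutdown requested, instance kept -> transfer first,
  #     restart afterwards (only allowed with --long-sleep)
  #   snapshots failed, no shutdown (live export)         -> abort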
def Exec(self, feedback_fn):
"""Export an instance to an image in the cluster.
"""
assert self.op.mode in constants.EXPORT_MODES
src_node_uuid = self.instance.primary_node
if self.op.shutdown:
      # shut down the instance, but not the disks
feedback_fn("Shutting down instance %s" % self.instance.name)
result = self.rpc.call_instance_shutdown(src_node_uuid, self.instance,
self.op.shutdown_timeout,
self.op.reason)
# TODO: Maybe ignore failures if ignore_remove_failures is set
result.Raise("Could not shutdown instance %s on"
" node %s" % (self.instance.name,
self.cfg.GetNodeName(src_node_uuid)))
if self.op.zero_free_space:
self.ZeroFreeSpace(feedback_fn)
activate_disks = not self.instance.disks_active
if activate_disks:
# Activate the instance disks if we're exporting a stopped instance
feedback_fn("Activating disks for %s" % self.instance.name)
StartInstanceDisks(self, self.instance, None)
self.instance = self.cfg.GetInstanceInfo(self.instance.uuid)
try:
helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
self.instance)
snapshots_available = False
if self.TrySnapshot():
snapshots_available = helper.CreateSnapshots()
if not snapshots_available:
if not self.op.shutdown:
raise errors.OpExecError(
"Not all disks could be snapshotted, and you requested a live "
"export; aborting"
)
if not self.op.long_sleep:
raise errors.OpExecError(
"Not all disks could be snapshotted, and you did not allow the "
"instance to remain offline for a longer time through the "
"--long-sleep option; aborting"
)
try:
if self.DoReboot() and snapshots_available:
self.StartInstance(feedback_fn, src_node_uuid)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
(fin_resu, dresults) = helper.LocalExport(self.dst_node,
self.op.compress)
elif self.op.mode == constants.EXPORT_MODE_REMOTE:
connect_timeout = constants.RIE_CONNECT_TIMEOUT
timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
(key_name, _, _) = self.x509_key_name
dest_ca_pem = \
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
self.dest_x509_ca)
(fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
key_name, dest_ca_pem,
self.op.compress,
timeouts)
if self.DoReboot() and not snapshots_available:
self.StartInstance(feedback_fn, src_node_uuid)
finally:
helper.Cleanup()
# Check for backwards compatibility
assert len(dresults) == len(self.instance.disks)
assert compat.all(isinstance(i, bool) for i in dresults), \
"Not all results are boolean: %r" % dresults
finally:
if activate_disks:
feedback_fn("Deactivating disks for %s" % self.instance.name)
ShutdownInstanceDisks(self, self.instance)
if not (compat.all(dresults) and fin_resu):
failures = []
if not fin_resu:
failures.append("export finalization")
if not compat.all(dresults):
fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
if not dsk)
failures.append("disk export: disk(s) %s" % fdsk)
raise errors.OpExecError("Export failed, errors in %s" %
utils.CommaJoin(failures))
    # At this point the export was successful; we can clean up and finish
# Remove instance if requested
if self.op.remove_instance:
feedback_fn("Removing instance %s" % self.instance.name)
RemoveInstance(self, feedback_fn, self.instance,
self.op.ignore_remove_failures)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self._CleanupExports(feedback_fn)
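
    # The return value mirrors what LocalExport/RemoteExport produced:
    # fin_resu is the overall finalization status, while dresults holds one
    # boolean per instance disk, e.g. (True, [True, True]) for a fully
    # successful export of a two-disk instance.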
return fin_resu, dresults


class LUBackupRemove(NoHooksLU):
"""Remove exports related to the named instance.
"""
REQ_BGL = False
def ExpandNames(self):
self.needed_locks = {
# We need all nodes to be locked in order for RemoveExport to work, but
# we don't need to lock the instance itself, as nothing will happen to it
# (and we can remove exports also for a removed instance)
locking.LEVEL_NODE: locking.ALL_SET,
}
def Exec(self, feedback_fn):
"""Remove any export.
"""
(_, inst_name) = self.cfg.ExpandInstanceName(self.op.instance_name)
# If the instance was not found we'll try with the name that was passed in.
# This will only work if it was an FQDN, though.
fqdn_warn = False
if not inst_name:
fqdn_warn = True
inst_name = self.op.instance_name
locked_nodes = self.owned_locks(locking.LEVEL_NODE)
exportlist = self.rpc.call_export_list(locked_nodes)
found = False
for node_uuid in exportlist:
msg = exportlist[node_uuid].fail_msg
if msg:
self.LogWarning("Failed to query node %s (continuing): %s",
self.cfg.GetNodeName(node_uuid), msg)
continue
if inst_name in exportlist[node_uuid].payload:
found = True
result = self.rpc.call_export_remove(node_uuid, inst_name)
msg = result.fail_msg
if msg:
logging.error("Could not remove export for instance %s"
" on node %s: %s", inst_name,
self.cfg.GetNodeName(node_uuid), msg)
if fqdn_warn and not found:
feedback_fn("Export not found. If trying to remove an export belonging"
" to a deleted instance please use its Fully Qualified"
" Domain Name.")