#!/usr/bin/python
#
# Copyright (C) 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tool to move instances from one cluster to another.
"""
# pylint: disable=C0103
# C0103: Invalid name move-instance
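
# Illustrative invocation (cluster names, file paths and the "hail" iallocator
# below are placeholders, not defaults of this tool):
#   move-instance --src-ca-file=src-ca.pem --src-username=admin \
#     --src-password-file=rapi-password --iallocator=hail \
#     src-cluster.example.com dest-cluster.example.com instance1.example.com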

import os
import sys
import time
import logging
import optparse
import random
import threading

from ganeti import cli
from ganeti import constants
from ganeti import utils
from ganeti import workerpool
from ganeti import objects
from ganeti import compat
from ganeti import rapi
from ganeti import errors
import ganeti.rapi.client # pylint: disable=W0611
import ganeti.rapi.client_utils
from ganeti.rapi.client import UsesRapiClient

SRC_RAPI_PORT_OPT = \
cli.cli_option("--src-rapi-port", action="store", type="int",
dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
help=("Source cluster RAPI port (defaults to %s)" %
constants.DEFAULT_RAPI_PORT))
SRC_CA_FILE_OPT = \
cli.cli_option("--src-ca-file", action="store", type="string",
dest="src_ca_file",
help=("File containing source cluster Certificate"
" Authority (CA) in PEM format"))
SRC_USERNAME_OPT = \
cli.cli_option("--src-username", action="store", type="string",
dest="src_username", default=None,
help="Source cluster username")
SRC_PASSWORD_FILE_OPT = \
cli.cli_option("--src-password-file", action="store", type="string",
dest="src_password_file",
help="File containing source cluster password")
DEST_RAPI_PORT_OPT = \
cli.cli_option("--dest-rapi-port", action="store", type="int",
dest="dest_rapi_port", default=constants.DEFAULT_RAPI_PORT,
help=("Destination cluster RAPI port (defaults to source"
" cluster RAPI port)"))
DEST_CA_FILE_OPT = \
cli.cli_option("--dest-ca-file", action="store", type="string",
dest="dest_ca_file",
help=("File containing destination cluster Certificate"
" Authority (CA) in PEM format (defaults to source"
" cluster CA)"))
DEST_USERNAME_OPT = \
cli.cli_option("--dest-username", action="store", type="string",
dest="dest_username", default=None,
help=("Destination cluster username (defaults to"
" source cluster username)"))
DEST_PASSWORD_FILE_OPT = \
cli.cli_option("--dest-password-file", action="store", type="string",
dest="dest_password_file",
help=("File containing destination cluster password"
" (defaults to source cluster password)"))
DEST_INSTANCE_NAME_OPT = \
cli.cli_option("--dest-instance-name", action="store", type="string",
dest="dest_instance_name",
help=("Instance name on destination cluster (only"
" when moving exactly one instance)"))
DEST_PRIMARY_NODE_OPT = \
cli.cli_option("--dest-primary-node", action="store", type="string",
dest="dest_primary_node",
help=("Primary node on destination cluster (only"
" when moving exactly one instance)"))
DEST_SECONDARY_NODE_OPT = \
cli.cli_option("--dest-secondary-node", action="store", type="string",
dest="dest_secondary_node",
help=("Secondary node on destination cluster (only"
" when moving exactly one instance)"))
DEST_DISK_TEMPLATE_OPT = \
cli.cli_option("--dest-disk-template", action="store", type="string",
dest="dest_disk_template", default=None,
help="Disk template to use on destination cluster")
COMPRESS_OPT = \
cli.cli_option("--compress", action="store", type="string",
dest="compress", default="none",
help="Compression mode to use during the move (this mode has"
" to be supported by both clusters)")
PARALLEL_OPT = \
cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
dest="parallel", metavar="<number>",
help="Number of instances to be moved simultaneously")
OPPORTUNISTIC_TRIES_OPT = \
cli.cli_option("--opportunistic-tries", action="store", type="int",
dest="opportunistic_tries", metavar="<number>",
help="Number of opportunistic instance creation attempts"
" before a normal creation is performed. An opportunistic"
" attempt will use the iallocator with all the nodes"
" currently unlocked, failing if not enough nodes are"
" available. Even though it will succeed (or fail) more"
" quickly, it can result in suboptimal instance"
" placement")
OPPORTUNISTIC_DELAY_OPT = \
cli.cli_option("--opportunistic-delay", action="store", type="int",
dest="opportunistic_delay", metavar="<number>",
help="The delay between successive opportunistic instance"
" creation attempts, in seconds")

class Error(Exception):
"""Generic error.
"""

class Abort(Error):
"""Special exception for aborting import/export.
"""

class RapiClientFactory(object):
"""Factory class for creating RAPI clients.
@ivar src_cluster_name: Source cluster name
@ivar dest_cluster_name: Destination cluster name
@ivar GetSourceClient: Callable returning new client for source cluster
@ivar GetDestClient: Callable returning new client for destination cluster
"""
def __init__(self, options, src_cluster_name, dest_cluster_name):
"""Initializes this class.
@param options: Program options
@type src_cluster_name: string
@param src_cluster_name: Source cluster name
@type dest_cluster_name: string
@param dest_cluster_name: Destination cluster name
"""
self.src_cluster_name = src_cluster_name
self.dest_cluster_name = dest_cluster_name
# TODO: Implement timeouts for RAPI connections
# TODO: Support for using system default paths for verifying SSL certificate
logging.debug("Using '%s' as source CA", options.src_ca_file)
src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
if options.dest_ca_file:
logging.debug("Using '%s' as destination CA", options.dest_ca_file)
dest_curl_config = \
rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
else:
logging.debug("Using source CA for destination")
dest_curl_config = src_curl_config
logging.debug("Source RAPI server is %s:%s",
src_cluster_name, options.src_rapi_port)
logging.debug("Source username is '%s'", options.src_username)
if options.src_username is None:
src_username = ""
else:
src_username = options.src_username
if options.src_password_file:
logging.debug("Reading '%s' for source password",
options.src_password_file)
src_password = utils.ReadOneLineFile(options.src_password_file,
strict=True)
else:
logging.debug("Source has no password")
src_password = None
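    # The factory hands out callables rather than client objects: every call to
    # GetSourceClient/GetDestClient builds a fresh GanetiRapiClient, so callers
    # (e.g. each worker thread) get their own client instead of sharing one.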
self.GetSourceClient = lambda: \
rapi.client.GanetiRapiClient(src_cluster_name,
port=options.src_rapi_port,
curl_config_fn=src_curl_config,
username=src_username,
password=src_password)
if options.dest_rapi_port:
dest_rapi_port = options.dest_rapi_port
else:
dest_rapi_port = options.src_rapi_port
if options.dest_username is None:
dest_username = src_username
else:
dest_username = options.dest_username
logging.debug("Destination RAPI server is %s:%s",
dest_cluster_name, dest_rapi_port)
logging.debug("Destination username is '%s'", dest_username)
if options.dest_password_file:
logging.debug("Reading '%s' for destination password",
options.dest_password_file)
dest_password = utils.ReadOneLineFile(options.dest_password_file,
strict=True)
else:
logging.debug("Using source password for destination")
dest_password = src_password
self.GetDestClient = lambda: \
rapi.client.GanetiRapiClient(dest_cluster_name,
port=dest_rapi_port,
curl_config_fn=dest_curl_config,
username=dest_username,
password=dest_password)

class MoveJobPollReportCb(cli.JobPollReportCbBase):
def __init__(self, abort_check_fn, remote_import_fn):
"""Initializes this class.
@type abort_check_fn: callable
@param abort_check_fn: Function to check whether move is aborted
@type remote_import_fn: callable or None
@param remote_import_fn: Callback for reporting received remote import
information
"""
cli.JobPollReportCbBase.__init__(self)
self._abort_check_fn = abort_check_fn
self._remote_import_fn = remote_import_fn
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
"""Handles a log message.
"""
if log_type == constants.ELOG_REMOTE_IMPORT:
logging.debug("Received remote import information")
if not self._remote_import_fn:
raise RuntimeError("Received unexpected remote import information")
assert "x509_ca" in log_msg
assert "disks" in log_msg
self._remote_import_fn(log_msg)
return
logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
cli.FormatLogMessage(log_type, log_msg))
def ReportNotChanged(self, job_id, status):
"""Called if a job hasn't changed in a while.
"""
try:
# Check whether we were told to abort by the other thread
self._abort_check_fn()
except Abort:
logging.warning("Aborting despite job %s still running", job_id)
raise

class InstanceMove(object):
"""Status class for instance moves.
"""
def __init__(self, src_instance_name, dest_instance_name,
dest_pnode, dest_snode, compress, dest_iallocator,
dest_disk_template, hvparams,
beparams, osparams, nics, opportunistic_tries,
opportunistic_delay):
"""Initializes this class.
@type src_instance_name: string
@param src_instance_name: Instance name on source cluster
@type dest_instance_name: string
@param dest_instance_name: Instance name on destination cluster
@type dest_pnode: string or None
@param dest_pnode: Name of primary node on destination cluster
@type dest_snode: string or None
@param dest_snode: Name of secondary node on destination cluster
    @type compress: string
@param compress: Compression mode to use (has to be supported on both
clusters)
@type dest_iallocator: string or None
@param dest_iallocator: Name of iallocator to use
@type dest_disk_template: string or None
@param dest_disk_template: Disk template to use instead of the original one
@type hvparams: dict or None
@param hvparams: Hypervisor parameters to override
@type beparams: dict or None
@param beparams: Backend parameters to override
@type osparams: dict or None
@param osparams: OS parameters to override
@type nics: dict or None
@param nics: NICs to override
@type opportunistic_tries: int or None
@param opportunistic_tries: Number of opportunistic creation attempts to
perform
@type opportunistic_delay: int or None
@param opportunistic_delay: Delay between successive creation attempts, in
seconds
"""
self.src_instance_name = src_instance_name
self.dest_instance_name = dest_instance_name
self.dest_pnode = dest_pnode
self.dest_snode = dest_snode
self.compress = compress
self.dest_iallocator = dest_iallocator
self.dest_disk_template = dest_disk_template
self.hvparams = hvparams
self.beparams = beparams
self.osparams = osparams
self.nics = nics
if opportunistic_tries is not None:
self.opportunistic_tries = opportunistic_tries
else:
self.opportunistic_tries = 0
if opportunistic_delay is not None:
self.opportunistic_delay = opportunistic_delay
else:
self.opportunistic_delay = constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
self.error_message = None

class MoveRuntime(object):
"""Class to keep track of instance move.
"""
def __init__(self, move):
"""Initializes this class.
@type move: L{InstanceMove}
"""
self.move = move
# Thread synchronization
self.lock = threading.Lock()
self.source_to_dest = threading.Condition(self.lock)
self.dest_to_source = threading.Condition(self.lock)
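    # Hand-over protocol between the two threads sharing this object: the
    # source thread publishes instance and export information and notifies
    # source_to_dest, while the destination thread publishes the remote import
    # information and notifies dest_to_source; both conditions share one lock.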
# Source information
self.src_error_message = None
self.src_expinfo = None
self.src_instinfo = None
# Destination information
self.dest_error_message = None
self.dest_impinfo = None
def HandleErrors(self, prefix, fn, *args):
"""Wrapper to catch errors and abort threads.
@type prefix: string
@param prefix: Variable name prefix ("src" or "dest")
@type fn: callable
@param fn: Function
"""
assert prefix in ("dest", "src")
try:
# Call inner function
fn(*args)
errmsg = None
except Abort:
errmsg = "Aborted"
except Exception, err:
logging.exception("Caught unhandled exception")
errmsg = str(err)
setattr(self, "%s_error_message" % prefix, errmsg)
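    # Wake up both waiters so the peer thread re-checks the error attributes
    # (via CheckAbort) and aborts instead of waiting forever.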
self.lock.acquire()
try:
self.source_to_dest.notifyAll()
self.dest_to_source.notifyAll()
finally:
self.lock.release()
def CheckAbort(self):
"""Check whether thread should be aborted.
@raise Abort: When thread should be aborted
"""
if not (self.src_error_message is None and
self.dest_error_message is None):
logging.info("Aborting")
raise Abort()
def Wait(self, cond, check_fn):
"""Waits for a condition to become true.
@type cond: threading.Condition
@param cond: Threading condition
@type check_fn: callable
@param check_fn: Function to check whether condition is true
"""
cond.acquire()
try:
while check_fn(self):
self.CheckAbort()
cond.wait()
finally:
cond.release()
def PollJob(self, cl, job_id, remote_import_fn=None):
"""Wrapper for polling a job.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type job_id: string
@param job_id: Job ID
@type remote_import_fn: callable or None
@param remote_import_fn: Callback for reporting received remote import
information
@return: opreturn of the move job
@raise errors.JobLost: If job can't be found
@raise errors.OpExecError: If job didn't succeed
@see: L{ganeti.rapi.client_utils.PollJob}
"""
return rapi.client_utils.PollJob(cl, job_id,
MoveJobPollReportCb(self.CheckAbort,
remote_import_fn))

class MoveDestExecutor(object):
def __init__(self, dest_client, mrt):
"""Destination side of an instance move.
@type dest_client: L{rapi.client.GanetiRapiClient}
@param dest_client: RAPI client
@type mrt: L{MoveRuntime}
@param mrt: Instance move runtime information
"""
logging.debug("Waiting for instance information to become available")
mrt.Wait(mrt.source_to_dest,
lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)
logging.info("Creating instance %s in remote-import mode",
mrt.move.dest_instance_name)
# Depending on whether opportunistic tries are enabled, we may have to
# make multiple creation attempts
creation_attempts = [True] * mrt.move.opportunistic_tries
# But the last one is never opportunistic, and will block until completion
# or failure
creation_attempts.append(False)
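    # For example, with --opportunistic-tries=2 this list is
    # [True, True, False]: two opportunistic attempts followed by one final,
    # non-opportunistic attempt.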
    # Seed the RNG used to vary the delay between creation attempts
random.seed()
for is_attempt_opportunistic in creation_attempts:
job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
mrt.move.dest_pnode, mrt.move.dest_snode,
mrt.move.compress,
mrt.move.dest_iallocator,
mrt.move.dest_disk_template,
mrt.src_instinfo, mrt.src_expinfo,
mrt.move.hvparams, mrt.move.beparams,
mrt.move.osparams, mrt.move.nics,
is_attempt_opportunistic
)
try:
        # The completion of this block signifies that the import has been
        # completed successfully
mrt.PollJob(dest_client, job_id,
remote_import_fn=compat.partial(self._SetImportInfo, mrt))
logging.info("Import successful")
return
except errors.OpPrereqError, err:
# Any exception in the non-opportunistic creation is to be passed on,
# as well as exceptions apart from resources temporarily unavailable
if not is_attempt_opportunistic or \
err.args[1] != rapi.client.ECODE_TEMP_NORES:
raise
delay_to_use = MoveDestExecutor._VaryDelay(mrt)
logging.info("Opportunistic attempt unsuccessful, waiting %.2f seconds"
" before another creation attempt is made",
delay_to_use)
time.sleep(delay_to_use)
@staticmethod
def _VaryDelay(mrt):
""" Varies the opportunistic delay by a small amount.
"""
MAX_VARIATION = 0.15
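    # With e.g. a 60 second opportunistic delay, the returned value is drawn
    # uniformly from the range [51s, 69s].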
variation_factor = (1.0 + random.uniform(-MAX_VARIATION, MAX_VARIATION))
return mrt.move.opportunistic_delay * variation_factor
@staticmethod
def _SetImportInfo(mrt, impinfo):
"""Sets the remote import information and notifies source thread.
@type mrt: L{MoveRuntime}
@param mrt: Instance move runtime information
@param impinfo: Remote import information
"""
mrt.dest_to_source.acquire()
try:
mrt.dest_impinfo = impinfo
mrt.dest_to_source.notifyAll()
finally:
mrt.dest_to_source.release()
@staticmethod
def _GetDisks(instance):
disks = []
for idisk in instance["disks"]:
odisk = {
constants.IDISK_SIZE: idisk["size"],
constants.IDISK_MODE: idisk["mode"],
constants.IDISK_NAME: str(idisk.get("name")),
}
spindles = idisk.get("spindles")
if spindles is not None:
odisk[constants.IDISK_SPINDLES] = spindles
disks.append(odisk)
return disks
@staticmethod
def _GetNics(instance, override_nics):
try:
nics = [{
constants.INIC_IP: ip,
constants.INIC_MAC: mac,
constants.INIC_MODE: mode,
constants.INIC_LINK: link,
constants.INIC_VLAN: vlan,
constants.INIC_NETWORK: network,
constants.INIC_NAME: nic_name
} for nic_name, _, ip, mac, mode, link, vlan, network, _
in instance["nics"]]
except ValueError:
raise Error("Received NIC information does not match expected format; "
"Do the versions of this tool and the source cluster match?")
if len(override_nics) > len(nics):
raise Error("Can not create new NICs")
if override_nics:
assert len(override_nics) <= len(nics)
for idx, (nic, override) in enumerate(zip(nics, override_nics)):
nics[idx] = objects.FillDict(nic, override)
return nics
@staticmethod
def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
dest_disk_template, instance, expinfo, override_hvparams,
override_beparams, override_osparams, override_nics,
is_attempt_opportunistic):
"""Starts the instance creation in remote import mode.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type name: string
@param name: Instance name
@type pnode: string or None
@param pnode: Name of primary node on destination cluster
@type snode: string or None
@param snode: Name of secondary node on destination cluster
@type compress: string
@param compress: Compression mode to use
@type iallocator: string or None
@param iallocator: Name of iallocator to use
@type dest_disk_template: string or None
@param dest_disk_template: Disk template to use instead of the original one
@type instance: dict
@param instance: Instance details from source cluster
@type expinfo: dict
@param expinfo: Prepared export information from source cluster
@type override_hvparams: dict or None
@param override_hvparams: Hypervisor parameters to override
@type override_beparams: dict or None
@param override_beparams: Backend parameters to override
@type override_osparams: dict or None
@param override_osparams: OS parameters to override
@type override_nics: dict or None
@param override_nics: NICs to override
@type is_attempt_opportunistic: bool
@param is_attempt_opportunistic: Whether to use opportunistic locking or not
@return: Job ID
"""
if dest_disk_template:
disk_template = dest_disk_template
else:
disk_template = instance["disk_template"]
disks = MoveDestExecutor._GetDisks(instance)
nics = MoveDestExecutor._GetNics(instance, override_nics)
os_type = instance.get("os", None)
# TODO: Should this be the actual up/down status? (run_state)
start = (instance["config_state"] == "up")
assert len(disks) == len(instance["disks"])
assert len(nics) == len(instance["nics"])
inst_beparams = instance.get("be_instance", {})
inst_hvparams = instance.get("hv_instance", {})
inst_osparams = instance.get("os_instance", {})
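    # INSTANCE_REMOTE_IMPORT asks the destination cluster to create the
    # instance and receive its disk data from the source cluster, using the
    # handshake and X509 CA prepared on the source side.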
return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
name, disk_template, disks, nics,
os=os_type,
pnode=pnode,
snode=snode,
start=start,
ip_check=False,
iallocator=iallocator,
hypervisor=instance["hypervisor"],
source_handshake=expinfo["handshake"],
source_x509_ca=expinfo["x509_ca"],
compress=compress,
source_instance_name=instance["name"],
beparams=objects.FillDict(inst_beparams,
override_beparams),
hvparams=objects.FillDict(inst_hvparams,
override_hvparams),
osparams=objects.FillDict(inst_osparams,
override_osparams),
opportunistic_locking=is_attempt_opportunistic
)

class MoveSourceExecutor(object):
def __init__(self, src_client, mrt):
"""Source side of an instance move.
@type src_client: L{rapi.client.GanetiRapiClient}
@param src_client: RAPI client
@type mrt: L{MoveRuntime}
@param mrt: Instance move runtime information
"""
logging.info("Checking whether instance exists")
self._CheckInstance(src_client, mrt.move.src_instance_name)
logging.info("Retrieving instance information from source cluster")
instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
mrt.move.src_instance_name)
logging.info("Preparing export on source cluster")
expinfo = self._PrepareExport(src_client, mrt.PollJob,
mrt.move.src_instance_name)
assert "handshake" in expinfo
assert "x509_key_name" in expinfo
assert "x509_ca" in expinfo
# Hand information to destination thread
mrt.source_to_dest.acquire()
try:
mrt.src_instinfo = instinfo
mrt.src_expinfo = expinfo
mrt.source_to_dest.notifyAll()
finally:
mrt.source_to_dest.release()
logging.info("Waiting for destination information to become available")
mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)
logging.info("Starting remote export on source cluster")
self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
expinfo["x509_key_name"], mrt.move.compress,
mrt.dest_impinfo)
logging.info("Export successful")
@staticmethod
def _CheckInstance(cl, name):
"""Checks whether the instance exists on the source cluster.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type name: string
@param name: Instance name
"""
try:
cl.GetInstance(name)
except rapi.client.GanetiApiError, err:
if err.code == rapi.client.HTTP_NOT_FOUND:
raise Error("Instance %s not found (%s)" % (name, str(err)))
raise
@staticmethod
def _GetInstanceInfo(cl, poll_job_fn, name):
"""Retrieves detailed instance information from source cluster.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type poll_job_fn: callable
@param poll_job_fn: Function to poll for job result
@type name: string
@param name: Instance name
"""
job_id = cl.GetInstanceInfo(name, static=True)
result = poll_job_fn(cl, job_id)
assert len(result[0].keys()) == 1
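    # The job result maps the instance name to its information; with a single
    # instance queried there is exactly one entry, which is returned.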
return result[0][result[0].keys()[0]]
@staticmethod
def _PrepareExport(cl, poll_job_fn, name):
"""Prepares export on source cluster.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type poll_job_fn: callable
@param poll_job_fn: Function to poll for job result
@type name: string
@param name: Instance name
"""
job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
return poll_job_fn(cl, job_id)[0]
@staticmethod
def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
"""Exports instance from source cluster.
@type cl: L{rapi.client.GanetiRapiClient}
@param cl: RAPI client
@type poll_job_fn: callable
@param poll_job_fn: Function to poll for job result
@type name: string
@param name: Instance name
@param x509_key_name: Source X509 key
@type compress: string
@param compress: Compression mode to use
@param impinfo: Import information from destination cluster
"""
job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
impinfo["disks"], shutdown=True,
remove_instance=True,
x509_key_name=x509_key_name,
destination_x509_ca=impinfo["x509_ca"],
compress=compress)
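    # The export job result is a (finalization status, per-disk results) pair;
    # the export is considered failed if finalization failed or any disk
    # transfer failed.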
(fin_resu, dresults) = poll_job_fn(cl, job_id)[0]
if not (fin_resu and compat.all(dresults)):
raise Error("Export failed for disks %s" %
utils.CommaJoin(str(idx) for idx, result
in enumerate(dresults) if not result))

class MoveSourceWorker(workerpool.BaseWorker):
def RunTask(self, rapi_factory, move): # pylint: disable=W0221
"""Executes an instance move.
@type rapi_factory: L{RapiClientFactory}
@param rapi_factory: RAPI client factory
@type move: L{InstanceMove}
@param move: Instance move information
"""
try:
logging.info("Preparing to move %s from cluster %s to %s as %s",
move.src_instance_name, rapi_factory.src_cluster_name,
rapi_factory.dest_cluster_name, move.dest_instance_name)
mrt = MoveRuntime(move)
logging.debug("Starting destination thread")
dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
target=mrt.HandleErrors,
args=("dest", MoveDestExecutor,
rapi_factory.GetDestClient(),
mrt, ))
dest_thread.start()
try:
mrt.HandleErrors("src", MoveSourceExecutor,
rapi_factory.GetSourceClient(), mrt)
finally:
dest_thread.join()
if mrt.src_error_message or mrt.dest_error_message:
move.error_message = ("Source error: %s, destination error: %s" %
(mrt.src_error_message, mrt.dest_error_message))
else:
move.error_message = None
except Exception, err: # pylint: disable=W0703
logging.exception("Caught unhandled exception")
move.error_message = str(err)

def CheckRapiSetup(rapi_factory):
"""Checks the RAPI setup by retrieving the version.
@type rapi_factory: L{RapiClientFactory}
@param rapi_factory: RAPI client factory
"""
src_client = rapi_factory.GetSourceClient()
logging.info("Connecting to source RAPI server")
logging.info("Source cluster RAPI version: %s", src_client.GetVersion())
dest_client = rapi_factory.GetDestClient()
logging.info("Connecting to destination RAPI server")
logging.info("Destination cluster RAPI version: %s", dest_client.GetVersion())

def ParseOptions():
"""Parses options passed to program.
"""
program = os.path.basename(sys.argv[0])
parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
" <source-cluster> <dest-cluster>"
" <instance...>"),
prog=program)
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.VERBOSE_OPT)
parser.add_option(cli.IALLOCATOR_OPT)
parser.add_option(cli.BACKEND_OPT)
parser.add_option(cli.HVOPTS_OPT)
parser.add_option(cli.OSPARAMS_OPT)
parser.add_option(cli.NET_OPT)
parser.add_option(SRC_RAPI_PORT_OPT)
parser.add_option(SRC_CA_FILE_OPT)
parser.add_option(SRC_USERNAME_OPT)
parser.add_option(SRC_PASSWORD_FILE_OPT)
parser.add_option(DEST_RAPI_PORT_OPT)
parser.add_option(DEST_CA_FILE_OPT)
parser.add_option(DEST_USERNAME_OPT)
parser.add_option(DEST_PASSWORD_FILE_OPT)
parser.add_option(DEST_INSTANCE_NAME_OPT)
parser.add_option(DEST_PRIMARY_NODE_OPT)
parser.add_option(DEST_SECONDARY_NODE_OPT)
parser.add_option(DEST_DISK_TEMPLATE_OPT)
parser.add_option(COMPRESS_OPT)
parser.add_option(PARALLEL_OPT)
parser.add_option(OPPORTUNISTIC_TRIES_OPT)
parser.add_option(OPPORTUNISTIC_DELAY_OPT)
(options, args) = parser.parse_args()
return (parser, options, args)

def _CheckAllocatorOptions(parser, options):
if (bool(options.iallocator) and
bool(options.dest_primary_node or options.dest_secondary_node)):
parser.error("Destination node and iallocator options exclude each other")
if not options.iallocator and (options.opportunistic_tries > 0):
parser.error("Opportunistic instance creation can only be used with an"
" iallocator")

def _CheckOpportunisticLockingOptions(parser, options):
tries_specified = options.opportunistic_tries is not None
delay_specified = options.opportunistic_delay is not None
if tries_specified:
if options.opportunistic_tries < 0:
parser.error("Number of opportunistic creation attempts must be >= 0")
if delay_specified:
if options.opportunistic_delay <= 0:
parser.error("The delay between two successive creation attempts must"
" be greater than zero")
elif delay_specified:
parser.error("Opportunistic delay can only be specified when opportunistic"
" tries are used")
else:
# The default values will be provided later
pass

def _CheckInstanceOptions(parser, options, instance_names):
if len(instance_names) == 1:
# Moving one instance only
if options.hvparams:
utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
if options.beparams:
utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
if options.nics:
options.nics = cli.ParseNicOption(options.nics)
else:
# Moving more than one instance
if (options.dest_instance_name or options.dest_primary_node or
options.dest_secondary_node or options.hvparams or
options.beparams or options.osparams or options.nics):
parser.error("The options --dest-instance-name, --dest-primary-node,"
" --dest-secondary-node, --hypervisor-parameters,"
" --backend-parameters, --os-parameters and --net can"
" only be used when moving exactly one instance")

def CheckOptions(parser, options, args):
"""Checks options and arguments for validity.
"""
if len(args) < 3:
parser.error("Not enough arguments")
src_cluster_name = args.pop(0)
dest_cluster_name = args.pop(0)
instance_names = args
assert len(instance_names) > 0
# TODO: Remove once using system default paths for SSL certificate
# verification is implemented
if not options.src_ca_file:
parser.error("Missing source cluster CA file")
if options.parallel < 1:
parser.error("Number of simultaneous moves must be >= 1")
_CheckAllocatorOptions(parser, options)
_CheckOpportunisticLockingOptions(parser, options)
_CheckInstanceOptions(parser, options, instance_names)
return (src_cluster_name, dest_cluster_name, instance_names)

def DestClusterHasDefaultIAllocator(rapi_factory):
"""Determines if a given cluster has a default iallocator.
"""
result = rapi_factory.GetDestClient().GetInfo()
ia_name = "default_iallocator"
return ia_name in result and result[ia_name]

def ExitWithError(message):
"""Exits after an error and shows a message.
"""
sys.stderr.write("move-instance: error: " + message + "\n")
sys.exit(constants.EXIT_FAILURE)

def _PrepareListOfInstanceMoves(options, instance_names):
moves = []
for src_instance_name in instance_names:
if options.dest_instance_name:
assert len(instance_names) == 1
# Rename instance
dest_instance_name = options.dest_instance_name
else:
dest_instance_name = src_instance_name
moves.append(InstanceMove(src_instance_name, dest_instance_name,
options.dest_primary_node,
options.dest_secondary_node,
options.compress,
options.iallocator,
options.dest_disk_template,
options.hvparams,
options.beparams,
options.osparams,
options.nics,
options.opportunistic_tries,
options.opportunistic_delay))
assert len(moves) == len(instance_names)
return moves

@UsesRapiClient
def main():
"""Main routine.
"""
(parser, options, args) = ParseOptions()
utils.SetupToolLogging(options.debug, options.verbose, threadname=True)
(src_cluster_name, dest_cluster_name, instance_names) = \
CheckOptions(parser, options, args)
logging.info("Source cluster: %s", src_cluster_name)
logging.info("Destination cluster: %s", dest_cluster_name)
logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))
rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)
CheckRapiSetup(rapi_factory)
has_iallocator = options.iallocator or \
DestClusterHasDefaultIAllocator(rapi_factory)
if len(instance_names) > 1 and not has_iallocator:
ExitWithError("When moving multiple nodes, an iallocator must be used. "
"None was provided and the target cluster does not have "
"a default iallocator.")
if (len(instance_names) == 1 and not (has_iallocator or
options.dest_primary_node or options.dest_secondary_node)):
ExitWithError("Target cluster does not have a default iallocator, "
"please specify either destination nodes or an iallocator.")
moves = _PrepareListOfInstanceMoves(options, instance_names)
# Start workerpool
wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
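  # Each worker processes one InstanceMove at a time, so at most
  # options.parallel moves are in flight simultaneously.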
try:
# Add instance moves to workerpool
for move in moves:
wp.AddTask((rapi_factory, move))
# Wait for all moves to finish
wp.Quiesce()
finally:
wp.TerminateWorkers()
# There should be no threads running at this point, hence not using locks
# anymore
logging.info("Instance move results:")
for move in moves:
if move.dest_instance_name == move.src_instance_name:
name = move.src_instance_name
else:
name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
if move.error_message:
msg = "Failed (%s)" % move.error_message
else:
msg = "Success"
logging.info("%s: %s", name, msg)
if compat.any(move.error_message for move in moves):
sys.exit(constants.EXIT_FAILURE)
sys.exit(constants.EXIT_SUCCESS)
if __name__ == "__main__":
main()