blob: b0b8a3eecc10cd0b2d093d30afa985c2e8b9528b [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to move instances from one cluster to another.
"""
# pylint: disable=C0103
# C0103: Invalid name move-instance
import os
import sys
import time
import logging
import optparse
import threading
from ganeti import cli
from ganeti import constants
from ganeti import utils
from ganeti import workerpool
from ganeti import objects
from ganeti import compat
from ganeti import rapi
import ganeti.rapi.client # pylint: disable=W0611
import ganeti.rapi.client_utils
from ganeti.rapi.client import UsesRapiClient
# Command line options describing how to reach the source cluster and how to
# authenticate against its RAPI server
SRC_RAPI_PORT_OPT = cli.cli_option(
  "--src-rapi-port",
  action="store",
  type="int",
  dest="src_rapi_port",
  default=constants.DEFAULT_RAPI_PORT,
  help=("Source cluster RAPI port (defaults to %s)" %
        constants.DEFAULT_RAPI_PORT),
  )

SRC_CA_FILE_OPT = cli.cli_option(
  "--src-ca-file",
  action="store",
  type="string",
  dest="src_ca_file",
  help=("File containing source cluster Certificate"
        " Authority (CA) in PEM format"),
  )

SRC_USERNAME_OPT = cli.cli_option(
  "--src-username",
  action="store",
  type="string",
  dest="src_username",
  default=None,
  help="Source cluster username",
  )

SRC_PASSWORD_FILE_OPT = cli.cli_option(
  "--src-password-file",
  action="store",
  type="string",
  dest="src_password_file",
  help="File containing source cluster password",
  )
# Command line options describing how to reach the destination cluster. Every
# one of them is documented as falling back to the corresponding source
# cluster setting when not given.

# The default must be None (not the global default RAPI port): only an unset
# value lets the port selection code fall back to the source cluster's port as
# promised by the help text.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))
# Command line options controlling where the instance ends up on the
# destination cluster; usable only when a single instance is moved
DEST_INSTANCE_NAME_OPT = cli.cli_option(
  "--dest-instance-name",
  action="store",
  type="string",
  dest="dest_instance_name",
  help=("Instance name on destination cluster (only"
        " when moving exactly one instance)"),
  )

DEST_PRIMARY_NODE_OPT = cli.cli_option(
  "--dest-primary-node",
  action="store",
  type="string",
  dest="dest_primary_node",
  help=("Primary node on destination cluster (only"
        " when moving exactly one instance)"),
  )

DEST_SECONDARY_NODE_OPT = cli.cli_option(
  "--dest-secondary-node",
  action="store",
  type="string",
  dest="dest_secondary_node",
  help=("Secondary node on destination cluster (only"
        " when moving exactly one instance)"),
  )

# Number of moves running at the same time
PARALLEL_OPT = cli.cli_option(
  "-p", "--parallel",
  action="store",
  type="int",
  default=1,
  dest="parallel",
  metavar="<number>",
  help="Number of instances to be moved simultaneously",
  )
class Error(Exception):
  """Base class for the errors raised by this tool.

  """
class Abort(Error):
  """Exception used to abort a running import/export.

  """
class RapiClientFactory:
  """Builds RAPI clients for the two clusters taking part in a move.

  Settings not given for the destination cluster (CA file, port, username,
  password) are taken from the source cluster. Password files are read while
  the factory is being constructed.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Resolves the connection settings and prepares the client callables.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate

    # Certificate authorities
    src_ca_file = options.src_ca_file
    logging.debug("Using '%s' as source CA", src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=src_ca_file)

    dest_ca_file = options.dest_ca_file
    if not dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", dest_ca_file)
      dest_curl_config = rapi.client.GenericCurlConfig(cafile=dest_ca_file)

    # Source cluster credentials
    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    src_username = options.src_username
    if src_username is None:
      src_username = ""

    src_password = None
    if options.src_password_file:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)
    else:
      logging.debug("Source has no password")

    def _NewSourceClient():
      """Creates a new client for the source cluster.

      """
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _NewSourceClient

    # Destination cluster credentials, falling back to the source's values
    dest_rapi_port = options.dest_rapi_port or options.src_rapi_port

    dest_username = options.dest_username
    if dest_username is None:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if not options.dest_password_file:
      logging.debug("Using source password for destination")
      dest_password = src_password
    else:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)

    def _NewDestClient():
      """Creates a new client for the destination cluster.

      """
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _NewDestClient
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance move is in progress.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Stores the callbacks used while polling.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._remote_import_fn = remote_import_fn
    self._abort_check_fn = abort_check_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Processes one log message of the polled job.

    Remote import information is handed to the registered callback; every
    other message is written to the log.

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      # Ordinary message, just show it
      when = time.ctime(utils.MergeTime(timestamp))
      text = cli.FormatLogMessage(log_type, log_msg)
      logging.info("[%s] %s", when, text)
      return

    logging.debug("Received remote import information")

    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Used as an opportunity to notice that the other side of the move failed.

    """
    try:
      # The check raises Abort if the other thread told us to stop
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
class InstanceMove(object):
  """Description and outcome of a single instance move.

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               hvparams, beparams, osparams, nics):
    """Records what is to be moved and where to.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override

    """
    # Instance names
    (self.src_instance_name, self.dest_instance_name) = \
      (src_instance_name, dest_instance_name)

    # Placement on the destination cluster
    (self.dest_pnode, self.dest_snode, self.dest_iallocator) = \
      (dest_pnode, dest_snode, dest_iallocator)

    # Parameter overrides
    (self.hvparams, self.beparams, self.osparams, self.nics) = \
      (hvparams, beparams, osparams, nics)

    # No error recorded when the object is created
    self.error_message = None
class MoveRuntime(object):
  """Class to keep track of instance move.

  Holds the state shared between the source and the destination thread of one
  move. Both condition variables use the same lock.

  """
  def __init__(self, move):
    """Initializes this class.

    @type move: L{InstanceMove}

    """
    self.move = move

    # Thread synchronization
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Source information
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Destination information
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Wrapper to catch errors and abort threads.

    The outcome is stored in C{src_error_message} or C{dest_error_message}
    (C{None} if C{fn} returned normally) and all threads waiting on either
    condition are woken up afterwards.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function
    @param args: Positional arguments passed to C{fn}

    """
    assert prefix in ("dest", "src")

    try:
      # Call inner function
      fn(*args)

      errmsg = None
    except Abort:
      errmsg = "Aborted"
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      # Some exceptions (e.g. a failed assertion without message) have an
      # empty string representation. An empty message would later be mistaken
      # for "no error" by truthiness checks, hence fall back to the class name.
      errmsg = str(err) or err.__class__.__name__

    setattr(self, "%s_error_message" % prefix, errmsg)

    # Wake up the other thread so it notices the result
    self.lock.acquire()
    try:
      self.source_to_dest.notifyAll()
      self.dest_to_source.notifyAll()
    finally:
      self.lock.release()

  def CheckAbort(self):
    """Check whether thread should be aborted.

    @raise Abort: When thread should be aborted

    """
    if not (self.src_error_message is None and
            self.dest_error_message is None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Waits for a condition to become true.

    Waiting continues for as long as C{check_fn} returns a true value; before
    each wait it is checked whether the move was aborted.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Function called with this object; must return a false
                     value once waiting can stop
    @raise Abort: When either thread recorded an error

    """
    cond.acquire()
    try:
      while check_fn(self):
        self.CheckAbort()
        cond.wait()
    finally:
      cond.release()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Wrapper for polling a job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type job_id: string
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information
    @return: Whatever L{rapi.client_utils.PollJob} returns

    """
    return rapi.client_utils.PollJob(cl, job_id,
                                     MoveJobPollReportCb(self.CheckAbort,
                                                         remote_import_fn))
class MoveDestExecutor(object):
  """Destination side of an instance move.

  All work is done while the object is constructed.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits for the source thread to provide instance and export information,
    then creates the instance in remote-import mode and polls the job.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # The overrides must be passed in the order hvparams, beparams, osparams
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
                                  mrt.move.dest_iallocator,
                                  mrt.src_instinfo, mrt.src_expinfo,
                                  mrt.move.hvparams, mrt.move.beparams,
                                  mrt.move.osparams, mrt.move.nics)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=compat.partial(self._SetImportInfo, mrt))

    logging.info("Import successful")

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    mrt.dest_to_source.acquire()
    try:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notifyAll()
    finally:
      mrt.dest_to_source.release()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
                      override_hvparams, override_beparams, override_osparams,
                      override_nics):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: list of dicts or None
    @param override_nics: NICs to override; the n-th entry is applied to the
                          n-th NIC of the source instance
    @raise Error: When more NIC overrides than existing NICs are given
    @return: Job ID

    """
    disk_template = instance["disk_template"]

    disks = []
    for idisk in instance["disks"]:
      odisk = {
        constants.IDISK_SIZE: idisk["size"],
        constants.IDISK_MODE: idisk["mode"],
        # NOTE(review): a disk without name is passed as the string "None";
        # presumably the server treats that as "no name" - confirm
        constants.IDISK_NAME: str(idisk.get("name")),
        }
      spindles = idisk.get("spindles")
      if spindles is not None:
        odisk[constants.IDISK_SPINDLES] = spindles
      disks.append(odisk)

    nics = [{
      constants.INIC_IP: ip,
      constants.INIC_MAC: mac,
      constants.INIC_MODE: mode,
      constants.INIC_LINK: link,
      constants.INIC_NETWORK: network,
      constants.INIC_NAME: nic_name
      } for nic_name, _, ip, mac, mode, link, network, _ in instance["nics"]]

    # The overrides are documented as optional, hence None must not be passed
    # to len() or used as a dictionary
    if override_nics:
      if len(override_nics) > len(nics):
        raise Error("Can not create new NICs")

      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
        nics[idx] = objects.FillDict(nic, override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    inst_beparams = instance["be_instance"]
    if not inst_beparams:
      inst_beparams = {}

    inst_hvparams = instance["hv_instance"]
    if not inst_hvparams:
      inst_hvparams = {}

    inst_osparams = instance["os_instance"]
    if not inst_osparams:
      inst_osparams = {}

    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams or {}),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams or {}),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams or {}))
class MoveSourceExecutor(object):
  """Source side of an instance move.

  All work is done while the object is constructed.

  """
  def __init__(self, src_client, mrt):
    """Runs the source side of an instance move.

    Collects instance and export information, passes it to the destination
    thread, waits for the import information and then starts the export.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    instance_name = mrt.move.src_instance_name

    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob, instance_name)

    unsupported_templates = (constants.DT_FILE, constants.DT_SHARED_FILE)
    if instinfo["disk_template"] in unsupported_templates:
      raise Error("Inter-cluster move of file-based instances is not"
                  " supported.")

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob, instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Publish the collected data and wake up the destination thread
    with mrt.source_to_dest:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notifyAll()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Makes sure the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      if err.code != rapi.client.HTTP_NOT_FOUND:
        # Anything but "not found" is passed on unchanged
        raise
      raise Error("Instance %s not found (%s)" % (name, str(err)))

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Fetches detailed (static) instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name

    """
    job_result = poll_job_fn(cl, cl.GetInstanceInfo(name, static=True))

    # The first opcode result is a dictionary with exactly one entry
    info_by_name = job_result[0]
    assert len(info_by_name.keys()) == 1
    only_key = info_by_name.keys()[0]
    return info_by_name[only_key]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Submits the export preparation job and returns its result.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name

    """
    prepare_job = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    job_result = poll_job_fn(cl, prepare_job)
    return job_result[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
    """Runs the remote export and verifies the result of every disk.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @param impinfo: Import information from destination cluster

    """
    export_job = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                                   impinfo["disks"], shutdown=True,
                                   remove_instance=True,
                                   x509_key_name=x509_key_name,
                                   destination_x509_ca=impinfo["x509_ca"])
    (fin_resu, dresults) = poll_job_fn(cl, export_job)[0]

    if fin_resu and compat.all(dresults):
      # Finalization and all disk transfers succeeded
      return

    failed_disks = (str(idx) for idx, result in enumerate(dresults)
                    if not result)
    raise Error("Export failed for disks %s" % utils.CommaJoin(failed_disks))
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker executing instance moves.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The source side runs in the worker's thread while a separate thread is
    started for the destination side. The outcome is stored in
    C{move.error_message} (C{None} on success); exceptions are not
    propagated.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      # Both clients are created before the destination thread is started. If
      # creating the source client failed afterwards, nobody would record an
      # error or notify the destination thread, which would then wait for
      # source information forever (and so would the join below).
      dest_client = rapi_factory.GetDestClient()
      src_client = rapi_factory.GetSourceClient()

      logging.debug("Starting destination thread")
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                     target=mrt.HandleErrors,
                                     args=("dest", MoveDestExecutor,
                                           dest_client, mrt, ))
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor, src_client, mrt)
      finally:
        dest_thread.join()

      # Same test as used for aborting threads: anything but None is an error,
      # even an empty message
      if (mrt.src_error_message is not None or
          mrt.dest_error_message is not None):
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
      else:
        move.error_message = None
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      # Never store an empty message, it would be reported as success
      move.error_message = str(err) or err.__class__.__name__
def CheckRapiSetup(rapi_factory):
  """Verifies the RAPI setup by asking both clusters for their version.

  The source cluster is contacted first, then the destination cluster.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  source = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  source_version = source.GetVersion()
  logging.info("Source cluster RAPI version: %s", source_version)

  destination = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  destination_version = destination.GetVersion()
  logging.info("Destination cluster RAPI version: %s", destination_version)
def ParseOptions():
  """Builds the option parser and parses the command line.

  @return: Tuple of (parser, options, positional arguments)

  """
  usage = ("%prog [--debug|--verbose]"
           " <source-cluster> <dest-cluster>"
           " <instance...>")
  parser = optparse.OptionParser(usage=usage,
                                 prog=os.path.basename(sys.argv[0]))

  # Options are registered in the order they are listed here
  all_options = [
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    PARALLEL_OPT,
    ]
  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. The two cluster names are
  removed from C{args} in place; for a single instance C{options.nics} is
  replaced by its parsed form.

  @param parser: Option parser, used for reporting errors
  @param options: Parsed program options
  @type args: list
  @param args: Positional arguments (source cluster, destination cluster,
               instance names)
  @return: Tuple of (source cluster name, destination cluster name, list of
           instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  dest_node_given = bool(options.dest_primary_node or
                         options.dest_secondary_node)

  # Only the combination of both is a conflict. The case of neither being
  # given is handled below, where a message naming the actual problem is
  # shown.
  if options.iallocator and dest_node_given:
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) == 1:
    # Moving one instance only
    if not (options.iallocator or dest_node_given):
      parser.error("An iallocator or the destination node is required")

    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

    if not options.iallocator:
      parser.error("An iallocator must be specified for moving more than one"
                   " instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
@UsesRapiClient
def main():
  """Entry point: parses the command line, runs all moves, reports results.

  The process always terminates through C{sys.exit}; the exit code signals a
  failure if at least one move recorded an error message.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)
  num_instances = len(instance_names)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  # Re-check the guarantees given by the option validation
  assert (num_instances == 1 or
          not (options.dest_primary_node or options.dest_secondary_node))
  assert num_instances == 1 or options.iallocator
  assert (num_instances > 1 or options.iallocator or
          options.dest_primary_node or options.dest_secondary_node)
  assert (num_instances == 1 or
          not (options.hvparams or options.beparams or options.osparams or
               options.nics))

  # Describe every move to be made
  moves = []
  for src_instance_name in instance_names:
    # The name is kept unless a new one was requested
    dest_instance_name = src_instance_name
    if options.dest_instance_name:
      assert num_instances == 1
      dest_instance_name = options.dest_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.iallocator, options.hvparams,
                              options.beparams, options.osparams,
                              options.nics))

  assert len(moves) == num_instances

  # Run the moves in a pool of workers
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Block until every move is done
    wp.Quiesce()
  finally:
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name != move.src_instance_name:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
    else:
      name = move.src_instance_name

    msg = "Success"
    if move.error_message:
      msg = "Failed (%s)" % move.error_message

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    exit_code = constants.EXIT_FAILURE
  else:
    exit_code = constants.EXIT_SUCCESS

  sys.exit(exit_code)
# Run the tool only when this file is executed as a script
if __name__ == "__main__":
  main()