| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2010, 2011, 2012 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| """Tool to move instances from one cluster to another. |
| |
| """ |
| |
| # pylint: disable=C0103 |
| # C0103: Invalid name move-instance |
| |
| import os |
| import sys |
| import time |
| import logging |
| import optparse |
| import threading |
| |
| from ganeti import cli |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import workerpool |
| from ganeti import objects |
| from ganeti import compat |
| from ganeti import rapi |
| |
| import ganeti.rapi.client # pylint: disable=W0611 |
| import ganeti.rapi.client_utils |
| from ganeti.rapi.client import UsesRapiClient |
| |
| |
# Command line options describing how to reach the source cluster's RAPI
# server.

# TCP port of the source cluster's RAPI server
SRC_RAPI_PORT_OPT = cli.cli_option(
  "--src-rapi-port", action="store", type="int",
  dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
  help=("Source cluster RAPI port (defaults to %s)" %
        constants.DEFAULT_RAPI_PORT))

# CA certificate used for the connection to the source cluster
SRC_CA_FILE_OPT = cli.cli_option(
  "--src-ca-file", action="store", type="string",
  dest="src_ca_file",
  help=("File containing source cluster Certificate"
        " Authority (CA) in PEM format"))

# User name for the source cluster
SRC_USERNAME_OPT = cli.cli_option(
  "--src-username", action="store", type="string",
  dest="src_username", default=None,
  help="Source cluster username")

# File holding the password for the source cluster
SRC_PASSWORD_FILE_OPT = cli.cli_option(
  "--src-password-file", action="store", type="string",
  dest="src_password_file",
  help="File containing source cluster password")
| |
# TCP port of the destination cluster's RAPI server.
#
# The option deliberately has no default value: RapiClientFactory uses the
# source cluster's RAPI port whenever this option evaluates to false. With a
# non-zero default that fallback, which the help text promises, could never be
# taken.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))
| |
# Command line options describing the destination cluster and the placement
# of the moved instance.

# CA certificate used for the connection to the destination cluster
DEST_CA_FILE_OPT = cli.cli_option(
  "--dest-ca-file", action="store", type="string",
  dest="dest_ca_file",
  help=("File containing destination cluster Certificate"
        " Authority (CA) in PEM format (defaults to source"
        " cluster CA)"))

# User name for the destination cluster
DEST_USERNAME_OPT = cli.cli_option(
  "--dest-username", action="store", type="string",
  dest="dest_username", default=None,
  help=("Destination cluster username (defaults to"
        " source cluster username)"))

# File holding the password for the destination cluster
DEST_PASSWORD_FILE_OPT = cli.cli_option(
  "--dest-password-file", action="store", type="string",
  dest="dest_password_file",
  help=("File containing destination cluster password"
        " (defaults to source cluster password)"))

# New name of the instance on the destination cluster
DEST_INSTANCE_NAME_OPT = cli.cli_option(
  "--dest-instance-name", action="store", type="string",
  dest="dest_instance_name",
  help=("Instance name on destination cluster (only"
        " when moving exactly one instance)"))

# Primary node of the instance on the destination cluster
DEST_PRIMARY_NODE_OPT = cli.cli_option(
  "--dest-primary-node", action="store", type="string",
  dest="dest_primary_node",
  help=("Primary node on destination cluster (only"
        " when moving exactly one instance)"))

# Secondary node of the instance on the destination cluster
DEST_SECONDARY_NODE_OPT = cli.cli_option(
  "--dest-secondary-node", action="store", type="string",
  dest="dest_secondary_node",
  help=("Secondary node on destination cluster (only"
        " when moving exactly one instance)"))

# Number of worker threads, i.e. of moves running at the same time
PARALLEL_OPT = cli.cli_option(
  "-p", "--parallel", action="store", type="int", default=1,
  dest="parallel", metavar="<number>",
  help="Number of instances to be moved simultaneously")
| |
| |
class Error(Exception):
  """Base class for the errors raised by this tool.

  """


class Abort(Error):
  """Raised to abort an import/export that is in progress.

  """
| |
| |
class RapiClientFactory:
  """Creates RAPI clients for the source and the destination cluster.

  All connection settings are resolved once, in the constructor; destination
  settings which were not given fall back to the ones of the source cluster.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning new client for source cluster
  @ivar GetDestClient: Callable returning new client for destination cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Resolves the connection settings and prepares the client callables.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if not options.dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)

    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # A missing user name is passed to the client as an empty string
    src_username = options.src_username
    if src_username is None:
      src_username = ""

    if not options.src_password_file:
      logging.debug("Source has no password")
      src_password = None
    else:
      logging.debug("Reading '%s' for source password",
                    options.src_password_file)
      src_password = utils.ReadOneLineFile(options.src_password_file,
                                           strict=True)

    def _MakeSourceClient():
      # The port is looked up on "options" every time a client is requested
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_username,
                                          password=src_password)

    self.GetSourceClient = _MakeSourceClient

    # Destination settings default to their source counterparts
    dest_rapi_port = options.dest_rapi_port or options.src_rapi_port

    dest_username = options.dest_username
    if dest_username is None:
      dest_username = src_username

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_rapi_port)
    logging.debug("Destination username is '%s'", dest_username)

    if not options.dest_password_file:
      logging.debug("Using source password for destination")
      dest_password = src_password
    else:
      logging.debug("Reading '%s' for destination password",
                    options.dest_password_file)
      dest_password = utils.ReadOneLineFile(options.dest_password_file,
                                            strict=True)

    def _MakeDestClient():
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_rapi_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_username,
                                          password=dest_password)

    self.GetDestClient = _MakeDestClient
| |
| |
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job polling callbacks used while an instance is being moved.

  Log messages carrying remote import information are passed to a callback
  instead of being logged; all other messages are logged. Whenever the polled
  job is reported as unchanged, the abort check is run.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Stores the callbacks.

    @type abort_check_fn: callable
    @param abort_check_fn: Function to check whether move is aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information

    """
    cli.JobPollReportCbBase.__init__(self)
    self._remote_import_fn = remote_import_fn
    self._abort_check_fn = abort_check_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Processes one log message of the polled job.

    @raise RuntimeError: If remote import information arrives although no
                         callback for it was registered

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      # Ordinary message, just write it to the log
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    handler = self._remote_import_fn
    if not handler:
      raise RuntimeError("Received unexpected remote import information")

    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    handler(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Runs the abort check when the job has not changed in a while.

    @raise Abort: Passed on from the abort check function

    """
    try:
      # The check raises L{Abort} if the other thread told us to stop
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
| |
| |
class InstanceMove(object):
  """Description and final status of the move of a single instance.

  @ivar error_message: None until an error message is stored for the move

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, dest_iallocator,
               hvparams, beparams, osparams, nics):
    """Stores the parameters of the move.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: dict or None
    @param nics: NICs to override

    """
    # Outcome of the move; nothing has gone wrong yet
    self.error_message = None

    # Instance names on both clusters
    (self.src_instance_name, self.dest_instance_name) = \
      (src_instance_name, dest_instance_name)

    # Placement on the destination cluster
    (self.dest_pnode, self.dest_snode) = (dest_pnode, dest_snode)
    self.dest_iallocator = dest_iallocator

    # Parameter overrides
    (self.hvparams, self.beparams, self.osparams) = \
      (hvparams, beparams, osparams)
    self.nics = nics
| |
| |
class MoveRuntime(object):
  """Class to keep track of instance move.

  One object is shared by the source and the destination thread of a move.
  Both threads exchange information through its attributes and wake each
  other using the two condition variables, which share a single lock.

  @ivar src_error_message: None while the source side has not finished; after
    that None on success or a non-empty string on failure
  @ivar dest_error_message: Same as C{src_error_message}, for the destination

  """
  def __init__(self, move):
    """Initializes this class.

    @type move: L{InstanceMove}

    """
    self.move = move

    # Thread synchronization
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Source information
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Destination information
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Wrapper to catch errors and abort threads.

    Calls C{fn(*args)} and stores the outcome in the C{<prefix>_error_message}
    attribute: None on success, "Aborted" if L{Abort} was raised, otherwise a
    description of the exception. Afterwards all threads waiting on either
    condition are woken up.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function

    """
    assert prefix in ("dest", "src")

    try:
      # Call inner function
      fn(*args)

      errmsg = None
    except Abort:
      errmsg = "Aborted"
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      # Exceptions without arguments (e.g. a failed "assert") convert to an
      # empty string, which code testing the message for truth would mistake
      # for success; use the exception's class name in that case
      errmsg = str(err) or err.__class__.__name__

    setattr(self, "%s_error_message" % prefix, errmsg)

    # Wake up waiting threads so that they re-evaluate their conditions
    with self.lock:
      self.source_to_dest.notify_all()
      self.dest_to_source.notify_all()

  def CheckAbort(self):
    """Check whether thread should be aborted.

    @raise Abort: When thread should be aborted

    """
    if not (self.src_error_message is None and
            self.dest_error_message is None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Waits for a condition to become true.

    Blocks as long as C{check_fn} returns a true value, i.e. the function
    must tell whether waiting is still necessary.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Function called with this object as its only argument;
      waiting ends once it returns a false value
    @raise Abort: When an error was recorded for either side in the meantime

    """
    with cond:
      while check_fn(self):
        self.CheckAbort()
        cond.wait()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Wrapper for polling a job.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type job_id: string
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
                             information
    @return: Whatever C{rapi.client_utils.PollJob} returns

    """
    return rapi.client_utils.PollJob(cl, job_id,
                                     MoveJobPollReportCb(self.CheckAbort,
                                                         remote_import_fn))
| |
| |
class MoveDestExecutor(object):
  """Destination side of an instance move.

  All work is done by the constructor, which returns once the import job on
  the destination cluster has been polled to its end.

  """
  def __init__(self, dest_client, mrt):
    """Destination side of an instance move.

    Waits for the source thread to publish the instance and export
    information, then creates the instance in remote-import mode and polls
    the job. Remote import information received while polling is handed to
    the source thread.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)
    # The overrides must be passed in the order hypervisor, backend, OS
    # parameters and NICs
    job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                  mrt.move.dest_pnode, mrt.move.dest_snode,
                                  mrt.move.dest_iallocator,
                                  mrt.src_instinfo, mrt.src_expinfo,
                                  mrt.move.hvparams, mrt.move.beparams,
                                  mrt.move.osparams, mrt.move.nics)
    mrt.PollJob(dest_client, job_id,
                remote_import_fn=lambda impinfo:
                  self._SetImportInfo(mrt, impinfo))

    logging.info("Import successful")

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    with mrt.dest_to_source:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notify_all()

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
                      override_hvparams, override_beparams, override_osparams,
                      override_nics):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: list of dicts or None
    @param override_nics: NICs to override; entry N is applied on top of the
      source instance's NIC N
    @raise Error: When there are more NIC overrides than NICs
    @return: Job ID

    """
    disk_template = instance["disk_template"]

    disks = []
    for idisk in instance["disks"]:
      odisk = {
        constants.IDISK_SIZE: idisk["size"],
        constants.IDISK_MODE: idisk["mode"],
        # NOTE(review): a disk without a name is passed on with the literal
        # name "None" -- confirm that this is what the destination expects
        constants.IDISK_NAME: str(idisk.get("name")),
        }
      spindles = idisk.get("spindles")
      if spindles is not None:
        odisk[constants.IDISK_SPINDLES] = spindles
      disks.append(odisk)

    nics = [{
      constants.INIC_IP: ip,
      constants.INIC_MAC: mac,
      constants.INIC_MODE: mode,
      constants.INIC_LINK: link,
      constants.INIC_NETWORK: network,
      constants.INIC_NAME: nic_name
      } for nic_name, _, ip, mac, mode, link, network, _ in instance["nics"]]

    # As documented, every override may be None
    override_nics = override_nics or []

    if len(override_nics) > len(nics):
      raise Error("Can not create new NICs")

    for idx, (nic, override) in enumerate(zip(nics, override_nics)):
      nics[idx] = objects.FillDict(nic, override)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    inst_beparams = instance["be_instance"] or {}
    inst_hvparams = instance["hv_instance"] or {}
    inst_osparams = instance["os_instance"] or {}

    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=instance["os"],
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams or {}),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams or {}),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams or {}))
| |
| |
class MoveSourceExecutor(object):
  """Source side of an instance move.

  All work is done by the constructor, which returns once the export job on
  the source cluster has been polled to its end.

  """
  def __init__(self, src_client, mrt):
    """Source side of an instance move.

    Checks the instance, prepares the export and publishes both pieces of
    information for the destination thread. Once the destination thread has
    provided its import information, the remote export is run.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @raise Error: If the instance is missing, uses a file-based disk template
      or can not be exported

    """
    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, mrt.move.src_instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
                                     mrt.move.src_instance_name)
    if (instinfo["disk_template"] in
        [constants.DT_FILE, constants.DT_SHARED_FILE]):
      raise Error("Inter-cluster move of file-based instances is not"
                  " supported.")

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
                                  mrt.move.src_instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Hand information to destination thread
    with mrt.source_to_dest:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notify_all()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
                         expinfo["x509_key_name"], mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: If the RAPI server reports the instance as not found; other
      RAPI errors are passed on unchanged

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      if err.code == rapi.client.HTTP_NOT_FOUND:
        raise Error("Instance %s not found (%s)" % (name, str(err)))
      raise

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The only value of the dictionary which is the first element of
      the job result

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    result = poll_job_fn(cl, job_id)

    # Exactly one instance was asked for, hence exactly one entry is expected
    instances = result[0]
    assert len(instances) == 1
    return list(instances.values())[0]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: First element of the job result

    """
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    return poll_job_fn(cl, job_id)[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, impinfo):
    """Exports instance from source cluster.

    The export job is told to shut the instance down and to remove it.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @param impinfo: Import information from destination cluster
    @raise Error: If the job result reports a failure

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"])
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    failed_disks = [str(idx) for idx, result in enumerate(dresults)
                    if not result]
    if failed_disks:
      raise Error("Export failed for disks %s" %
                  utils.CommaJoin(failed_disks))

    if not fin_resu:
      # Every disk is reported as exported, so listing disks would yield an
      # empty and misleading message. Presumably "fin_resu" is the result of
      # finalizing the export -- TODO confirm
      raise Error("Export finalization failed")
| |
| |
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker which runs complete instance moves.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The destination side runs in a thread of its own while the source side
    runs in the calling worker thread. The outcome is stored in
    C{move.error_message}: None on success, otherwise a description of the
    failure. Exceptions are not passed on to the caller.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      # Both clients are created before the destination thread is started.
      # If creating the source client failed afterwards, no error would be
      # recorded and the destination thread would wait forever for
      # information from the source, blocking the join() below.
      dest_client = rapi_factory.GetDestClient()
      src_client = rapi_factory.GetSourceClient()

      logging.debug("Starting destination thread")
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                     target=mrt.HandleErrors,
                                     args=("dest", MoveDestExecutor,
                                           dest_client, mrt, ))
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor, src_client, mrt)
      finally:
        dest_thread.join()

      # Compare with None on purpose: an empty message is still an error
      if (mrt.src_error_message is None and
          mrt.dest_error_message is None):
        move.error_message = None
      else:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      # Exceptions without arguments convert to an empty string; make sure
      # the failure stays visible
      move.error_message = str(err) or err.__class__.__name__
| |
| |
def CheckRapiSetup(rapi_factory):
  """Verifies the RAPI setup by asking both clusters for their version.

  The source cluster is queried first, then the destination cluster. Errors
  raised by the clients are not handled here.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  source = rapi_factory.GetSourceClient()
  logging.info("Connecting to source RAPI server")
  source_version = source.GetVersion()
  logging.info("Source cluster RAPI version: %s", source_version)

  destination = rapi_factory.GetDestClient()
  logging.info("Connecting to destination RAPI server")
  destination_version = destination.GetVersion()
  logging.info("Destination cluster RAPI version: %s", destination_version)
| |
| |
def ParseOptions():
  """Builds the option parser and parses the program's command line.

  @return: Tuple of option parser, parsed options and remaining arguments

  """
  usage = ("%prog [--debug|--verbose]"
           " <source-cluster> <dest-cluster>"
           " <instance...>")

  parser = optparse.OptionParser(usage=usage,
                                 prog=os.path.basename(sys.argv[0]))

  # Options are registered in this order
  all_options = (
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    PARALLEL_OPT,
    )

  for opt in all_options:
    parser.add_option(opt)

  (options, args) = parser.parse_args()

  return (parser, options, args)
| |
| |
def CheckOptions(parser, options, args):
  """Checks options and arguments for validity.

  Problems are reported through C{parser.error}. The first two entries are
  removed from C{args}; C{options.nics} is replaced by its parsed form when
  NICs were given for a single instance.

  @param parser: Option parser, used for reporting errors
  @param options: Parsed program options
  @type args: list
  @param args: Positional arguments: source cluster, destination cluster and
    at least one instance name
  @return: Tuple of source cluster name, destination cluster name and list of
    instance names

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  src_cluster_name = args.pop(0)
  dest_cluster_name = args.pop(0)
  instance_names = args

  assert len(instance_names) > 0

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  dest_node_given = bool(options.dest_primary_node or
                         options.dest_secondary_node)

  # Only the combination of both is a conflict. The case of neither being
  # given is handled below, where a fitting message is available.
  if options.iallocator and dest_node_given:
    parser.error("Destination node and iallocator options exclude each other")

  if len(instance_names) == 1:
    # Moving one instance only
    if not (options.iallocator or dest_node_given):
      parser.error("An iallocator or the destination node is required")

    if options.hvparams:
      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

    if options.beparams:
      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

    if options.nics:
      options.nics = cli.ParseNicOption(options.nics)
  else:
    # Moving more than one instance
    if (options.dest_instance_name or options.dest_primary_node or
        options.dest_secondary_node or options.hvparams or
        options.beparams or options.osparams or options.nics):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")

    if not options.iallocator:
      parser.error("An iallocator must be specified for moving more than one"
                   " instance")

  return (src_cluster_name, dest_cluster_name, instance_names)
| |
| |
@UsesRapiClient
def main():
  """Main routine.

  Parses and checks the command line, verifies that both RAPI servers can be
  reached, runs all instance moves using a pool of worker threads and logs
  the result of every move. The program exits with
  C{constants.EXIT_FAILURE} if at least one move failed and with
  C{constants.EXIT_SUCCESS} otherwise.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  assert (len(instance_names) == 1 or
          not (options.dest_primary_node or options.dest_secondary_node))
  assert len(instance_names) == 1 or options.iallocator
  assert (len(instance_names) > 1 or options.iallocator or
          options.dest_primary_node or options.dest_secondary_node)
  assert (len(instance_names) == 1 or
          not (options.hvparams or options.beparams or options.osparams or
               options.nics))

  # Prepare list of instance moves
  moves = []
  for src_instance_name in instance_names:
    if options.dest_instance_name:
      assert len(instance_names) == 1
      # Rename instance
      dest_instance_name = options.dest_instance_name
    else:
      dest_instance_name = src_instance_name

    moves.append(InstanceMove(src_instance_name, dest_instance_name,
                              options.dest_primary_node,
                              options.dest_secondary_node,
                              options.iallocator, options.hvparams,
                              options.beparams, options.osparams,
                              options.nics))

  assert len(moves) == len(instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()

  finally:
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  any_failed = False

  for move in moves:
    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
    else:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    # Only None stands for success; an empty error message is a failure, too
    if move.error_message is None:
      msg = "Success"
    else:
      any_failed = True
      msg = "Failed (%s)" % move.error_message

    logging.info("%s: %s", name, msg)

  if any_failed:
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)


# Run the tool only when executed as a program, not when imported
if __name__ == "__main__":
  main()