| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2010, 2011, 2012 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
"""Tool to move instances from one cluster to another.

Both clusters are driven through their Remote API (RAPI). For every instance
the source cluster prepares a remote export, the destination cluster creates
the instance in remote-import mode, and the source cluster then exports the
instance to the destination. The export is requested with shutdown and
instance removal enabled, i.e. the instance is taken off the source cluster
as part of the move. Several instances can be moved in parallel (see
--parallel).

"""
| |
| # pylint: disable=C0103 |
| # C0103: Invalid name move-instance |
| |
| import os |
| import sys |
| import time |
| import logging |
| import optparse |
| import random |
| import threading |
| |
| from ganeti import cli |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import workerpool |
| from ganeti import objects |
| from ganeti import compat |
| from ganeti import rapi |
| from ganeti import errors |
| |
| import ganeti.rapi.client # pylint: disable=W0611 |
| import ganeti.rapi.client_utils |
| from ganeti.rapi.client import UsesRapiClient |
| |
| |
# Command line options specific to this tool. Options shared with other
# Ganeti tools (debug, verbose, iallocator, parameters, ...) come from cli.

SRC_RAPI_PORT_OPT = \
  cli.cli_option("--src-rapi-port", action="store", type="int",
                 dest="src_rapi_port", default=constants.DEFAULT_RAPI_PORT,
                 help=("Source cluster RAPI port (defaults to %s)" %
                       constants.DEFAULT_RAPI_PORT))

SRC_CA_FILE_OPT = \
  cli.cli_option("--src-ca-file", action="store", type="string",
                 dest="src_ca_file",
                 help=("File containing source cluster Certificate"
                       " Authority (CA) in PEM format"))

SRC_USERNAME_OPT = \
  cli.cli_option("--src-username", action="store", type="string",
                 dest="src_username", default=None,
                 help="Source cluster username")

SRC_PASSWORD_FILE_OPT = \
  cli.cli_option("--src-password-file", action="store", type="string",
                 dest="src_password_file",
                 help="File containing source cluster password")

# Deliberately no default port here: when the option is not given the value
# stays None and the destination falls back to the source cluster's RAPI
# port, as the help text promises. A non-empty default would make that
# fallback unreachable and silently ignore --src-rapi-port for the
# destination.
DEST_RAPI_PORT_OPT = \
  cli.cli_option("--dest-rapi-port", action="store", type="int",
                 dest="dest_rapi_port", default=None,
                 help=("Destination cluster RAPI port (defaults to source"
                       " cluster RAPI port)"))

DEST_CA_FILE_OPT = \
  cli.cli_option("--dest-ca-file", action="store", type="string",
                 dest="dest_ca_file",
                 help=("File containing destination cluster Certificate"
                       " Authority (CA) in PEM format (defaults to source"
                       " cluster CA)"))

DEST_USERNAME_OPT = \
  cli.cli_option("--dest-username", action="store", type="string",
                 dest="dest_username", default=None,
                 help=("Destination cluster username (defaults to"
                       " source cluster username)"))

DEST_PASSWORD_FILE_OPT = \
  cli.cli_option("--dest-password-file", action="store", type="string",
                 dest="dest_password_file",
                 help=("File containing destination cluster password"
                       " (defaults to source cluster password)"))

DEST_INSTANCE_NAME_OPT = \
  cli.cli_option("--dest-instance-name", action="store", type="string",
                 dest="dest_instance_name",
                 help=("Instance name on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_PRIMARY_NODE_OPT = \
  cli.cli_option("--dest-primary-node", action="store", type="string",
                 dest="dest_primary_node",
                 help=("Primary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_SECONDARY_NODE_OPT = \
  cli.cli_option("--dest-secondary-node", action="store", type="string",
                 dest="dest_secondary_node",
                 help=("Secondary node on destination cluster (only"
                       " when moving exactly one instance)"))

DEST_DISK_TEMPLATE_OPT = \
  cli.cli_option("--dest-disk-template", action="store", type="string",
                 dest="dest_disk_template", default=None,
                 help="Disk template to use on destination cluster")

COMPRESS_OPT = \
  cli.cli_option("--compress", action="store", type="string",
                 dest="compress", default="none",
                 help="Compression mode to use during the move (this mode has"
                      " to be supported by both clusters)")

PARALLEL_OPT = \
  cli.cli_option("-p", "--parallel", action="store", type="int", default=1,
                 dest="parallel", metavar="<number>",
                 help="Number of instances to be moved simultaneously")

# The two opportunistic options have no default on purpose: None means "not
# given", which the option checks distinguish from explicit values; the
# actual defaults are applied when the moves are set up.
OPPORTUNISTIC_TRIES_OPT = \
  cli.cli_option("--opportunistic-tries", action="store", type="int",
                 dest="opportunistic_tries", metavar="<number>",
                 help="Number of opportunistic instance creation attempts"
                      " before a normal creation is performed. An opportunistic"
                      " attempt will use the iallocator with all the nodes"
                      " currently unlocked, failing if not enough nodes are"
                      " available. Even though it will succeed (or fail) more"
                      " quickly, it can result in suboptimal instance"
                      " placement")

OPPORTUNISTIC_DELAY_OPT = \
  cli.cli_option("--opportunistic-delay", action="store", type="int",
                 dest="opportunistic_delay", metavar="<number>",
                 help="The delay between successive opportunistic instance"
                      " creation attempts, in seconds")
| |
| |
class Error(Exception):
  """Base exception for failures detected by this tool.

  """
| |
| |
class Abort(Error):
  """Raised to stop one side of a move because the other side failed.

  """
| |
| |
class RapiClientFactory(object):
  """Creates RAPI clients for the source and the destination cluster.

  Destination settings that were not given explicitly (CA file, RAPI port,
  username, password) fall back to the corresponding source settings.

  @ivar src_cluster_name: Source cluster name
  @ivar dest_cluster_name: Destination cluster name
  @ivar GetSourceClient: Callable returning a new client for the source
    cluster
  @ivar GetDestClient: Callable returning a new client for the destination
    cluster

  """
  def __init__(self, options, src_cluster_name, dest_cluster_name):
    """Initializes this class.

    Password files are read once, here; every client created afterwards
    reuses the passwords read at this point.

    @param options: Program options
    @type src_cluster_name: string
    @param src_cluster_name: Source cluster name
    @type dest_cluster_name: string
    @param dest_cluster_name: Destination cluster name

    """
    self.src_cluster_name = src_cluster_name
    self.dest_cluster_name = dest_cluster_name

    # TODO: Implement timeouts for RAPI connections
    # TODO: Support for using system default paths for verifying SSL certificate
    logging.debug("Using '%s' as source CA", options.src_ca_file)
    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)

    if not options.dest_ca_file:
      logging.debug("Using source CA for destination")
      dest_curl_config = src_curl_config
    else:
      logging.debug("Using '%s' as destination CA", options.dest_ca_file)
      dest_curl_config = \
        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)

    # --- Source cluster ---
    logging.debug("Source RAPI server is %s:%s",
                  src_cluster_name, options.src_rapi_port)
    logging.debug("Source username is '%s'", options.src_username)

    # A missing username is passed to the RAPI client as an empty string
    src_user = options.src_username
    if src_user is None:
      src_user = ""

    src_pw_file = options.src_password_file
    if not src_pw_file:
      logging.debug("Source has no password")
      src_pw = None
    else:
      logging.debug("Reading '%s' for source password", src_pw_file)
      src_pw = utils.ReadOneLineFile(src_pw_file, strict=True)

    def _NewSourceClient():
      # The port is looked up on every call, like the options object itself
      return rapi.client.GanetiRapiClient(src_cluster_name,
                                          port=options.src_rapi_port,
                                          curl_config_fn=src_curl_config,
                                          username=src_user,
                                          password=src_pw)

    self.GetSourceClient = _NewSourceClient

    # --- Destination cluster ---
    dest_port = options.dest_rapi_port or options.src_rapi_port

    dest_user = options.dest_username
    if dest_user is None:
      dest_user = src_user

    logging.debug("Destination RAPI server is %s:%s",
                  dest_cluster_name, dest_port)
    logging.debug("Destination username is '%s'", dest_user)

    dest_pw_file = options.dest_password_file
    if not dest_pw_file:
      logging.debug("Using source password for destination")
      dest_pw = src_pw
    else:
      logging.debug("Reading '%s' for destination password", dest_pw_file)
      dest_pw = utils.ReadOneLineFile(dest_pw_file, strict=True)

    def _NewDestClient():
      return rapi.client.GanetiRapiClient(dest_cluster_name,
                                          port=dest_port,
                                          curl_config_fn=dest_curl_config,
                                          username=dest_user,
                                          password=dest_pw)

    self.GetDestClient = _NewDestClient
| |
| |
class MoveJobPollReportCb(cli.JobPollReportCbBase):
  """Job poll reporter used while an instance move waits for a job.

  Log messages of the polled job are logged locally, except for remote import
  information, which is handed to a callback instead. While the job is idle,
  the move gets a chance to abort.

  """
  def __init__(self, abort_check_fn, remote_import_fn):
    """Initializes this class.

    @type abort_check_fn: callable
    @param abort_check_fn: Called while the job is idle; expected to raise
      L{Abort} when the move should be aborted
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback receiving remote import information
      reported by the job; C{None} when no such information is expected

    """
    cli.JobPollReportCbBase.__init__(self)
    self._abort_check_fn = abort_check_fn
    self._remote_import_fn = remote_import_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message of the polled job.

    @raise RuntimeError: If remote import information arrives although no
      callback was given

    """
    if log_type != constants.ELOG_REMOTE_IMPORT:
      # Ordinary message: just log it with its original timestamp
      logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
                   cli.FormatLogMessage(log_type, log_msg))
      return

    logging.debug("Received remote import information")

    if not self._remote_import_fn:
      raise RuntimeError("Received unexpected remote import information")

    # NOTE(review): these sanity checks disappear under "python -O"
    assert "x509_ca" in log_msg
    assert "disks" in log_msg

    self._remote_import_fn(log_msg)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Used as an opportunity to find out whether the other side of the move
    has failed in the meantime.

    @raise Abort: If the move has been aborted

    """
    try:
      self._abort_check_fn()
    except Abort:
      logging.warning("Aborting despite job %s still running", job_id)
      raise
| |
| |
class InstanceMove(object):
  """Parameters and outcome of a single instance move.

  @ivar error_message: C{None} until a failure is recorded for this move

  """
  def __init__(self, src_instance_name, dest_instance_name,
               dest_pnode, dest_snode, compress, dest_iallocator,
               dest_disk_template, hvparams,
               beparams, osparams, nics, opportunistic_tries,
               opportunistic_delay):
    """Initializes this class.

    @type src_instance_name: string
    @param src_instance_name: Instance name on source cluster
    @type dest_instance_name: string
    @param dest_instance_name: Instance name on destination cluster
    @type dest_pnode: string or None
    @param dest_pnode: Name of primary node on destination cluster
    @type dest_snode: string or None
    @param dest_snode: Name of secondary node on destination cluster
    @type compress: string
    @param compress: Compression mode to use (has to be supported on both
      clusters)
    @type dest_iallocator: string or None
    @param dest_iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type hvparams: dict or None
    @param hvparams: Hypervisor parameters to override
    @type beparams: dict or None
    @param beparams: Backend parameters to override
    @type osparams: dict or None
    @param osparams: OS parameters to override
    @type nics: list of dicts or None
    @param nics: Per-NIC parameter overrides, applied to the NICs in order
    @type opportunistic_tries: int or None
    @param opportunistic_tries: Number of opportunistic creation attempts to
      perform; C{None} means none
    @type opportunistic_delay: int or None
    @param opportunistic_delay: Delay between successive creation attempts, in
      seconds; C{None} selects
      C{constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL}

    """
    self.src_instance_name = src_instance_name
    self.dest_instance_name = dest_instance_name
    self.dest_pnode = dest_pnode
    self.dest_snode = dest_snode
    self.compress = compress
    self.dest_iallocator = dest_iallocator
    self.dest_disk_template = dest_disk_template
    self.hvparams = hvparams
    self.beparams = beparams
    self.osparams = osparams
    self.nics = nics

    # Unspecified opportunistic settings fall back to their defaults
    self.opportunistic_tries = \
      (0 if opportunistic_tries is None else opportunistic_tries)

    if opportunistic_delay is None:
      opportunistic_delay = constants.DEFAULT_OPPORTUNISTIC_RETRY_INTERVAL
    self.opportunistic_delay = opportunistic_delay

    self.error_message = None
| |
| |
class MoveRuntime(object):
  """State shared by the source and destination threads of one move.

  Both condition variables are built on one lock, so holding either of them
  excludes the other.

  """
  def __init__(self, move):
    """Initializes this class.

    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    self.move = move

    # Thread synchronization; both conditions share self.lock
    self.lock = threading.Lock()
    self.source_to_dest = threading.Condition(self.lock)
    self.dest_to_source = threading.Condition(self.lock)

    # Published by the source side
    self.src_error_message = None
    self.src_expinfo = None
    self.src_instinfo = None

    # Published by the destination side
    self.dest_error_message = None
    self.dest_impinfo = None

  def HandleErrors(self, prefix, fn, *args):
    """Runs C{fn(*args)}, records its outcome and wakes up all waiters.

    The outcome is stored in C{<prefix>_error_message}: C{None} on success,
    C{"Aborted"} if L{Abort} was raised, otherwise the exception's text.
    Exceptions are not propagated.

    @type prefix: string
    @param prefix: Variable name prefix ("src" or "dest")
    @type fn: callable
    @param fn: Function

    """
    assert prefix in ("dest", "src")

    errmsg = None
    try:
      fn(*args)
    except Abort:
      errmsg = "Aborted"
    except Exception as err:
      logging.exception("Caught unhandled exception")
      errmsg = str(err)

    setattr(self, "%s_error_message" % prefix, errmsg)

    # Whichever side is waiting must re-check its condition (and the errors)
    with self.lock:
      self.source_to_dest.notify_all()
      self.dest_to_source.notify_all()

  def CheckAbort(self):
    """Aborts the calling thread if either side has recorded an error.

    @raise Abort: When thread should be aborted

    """
    if (self.src_error_message is not None or
        self.dest_error_message is not None):
      logging.info("Aborting")
      raise Abort()

  def Wait(self, cond, check_fn):
    """Blocks while C{check_fn(self)} is true, aborting on errors.

    @type cond: threading.Condition
    @param cond: Threading condition
    @type check_fn: callable
    @param check_fn: Called with this object; waiting continues while it
      returns a true value

    """
    with cond:
      while check_fn(self):
        self.CheckAbort()
        cond.wait()

  def PollJob(self, cl, job_id, remote_import_fn=None):
    """Polls a job until it finishes, allowing the move to abort meanwhile.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type job_id: string
    @param job_id: Job ID
    @type remote_import_fn: callable or None
    @param remote_import_fn: Callback for reporting received remote import
      information

    @return: opreturn of the move job
    @raise errors.JobLost: If job can't be found
    @raise errors.OpExecError: If job didn't succeed

    @see: L{ganeti.rapi.client_utils.PollJob}

    """
    reporter = MoveJobPollReportCb(self.CheckAbort, remote_import_fn)
    return rapi.client_utils.PollJob(cl, job_id, reporter)
| |
| |
class MoveDestExecutor(object):
  """Destination side of an instance move.

  All the work happens in the constructor. It waits for the source side to
  publish the instance and export information in the shared L{MoveRuntime}.
  It then creates the instance on the destination cluster in remote-import
  mode and polls the creation job until it has finished.

  """
  def __init__(self, dest_client, mrt):
    """Runs the destination side of an instance move.

    Creation-job errors are passed on. The one exception is a "resources
    temporarily unavailable" rejection of an opportunistic attempt, which
    leads to another attempt after a short delay.

    @type dest_client: L{rapi.client.GanetiRapiClient}
    @param dest_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.debug("Waiting for instance information to become available")
    mrt.Wait(mrt.source_to_dest,
             lambda mrt: mrt.src_instinfo is None or mrt.src_expinfo is None)

    logging.info("Creating instance %s in remote-import mode",
                 mrt.move.dest_instance_name)

    # Depending on whether opportunistic tries are enabled, we may have to
    # make multiple creation attempts. The last one is never opportunistic
    # and blocks until completion or failure.
    creation_attempts = [True] * mrt.move.opportunistic_tries + [False]

    # Initiate the RNG for the variations of the retry delay
    random.seed()

    for is_attempt_opportunistic in creation_attempts:
      job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                    mrt.move.dest_pnode, mrt.move.dest_snode,
                                    mrt.move.compress,
                                    mrt.move.dest_iallocator,
                                    mrt.move.dest_disk_template,
                                    mrt.src_instinfo, mrt.src_expinfo,
                                    mrt.move.hvparams, mrt.move.beparams,
                                    mrt.move.osparams, mrt.move.nics,
                                    is_attempt_opportunistic)

      try:
        # Returning from PollJob means the import completed successfully
        mrt.PollJob(dest_client, job_id,
                    remote_import_fn=compat.partial(self._SetImportInfo, mrt))
        logging.info("Import successful")
        return
      except errors.OpPrereqError as err:
        # Any exception in the non-opportunistic creation is to be passed on,
        # as well as exceptions apart from resources temporarily unavailable
        if not (is_attempt_opportunistic and
                MoveDestExecutor._IsTempNoResources(err)):
          raise

      delay_to_use = MoveDestExecutor._VaryDelay(mrt)
      logging.info("Opportunistic attempt unsuccessful, waiting %.2f seconds"
                   " before another creation attempt is made",
                   delay_to_use)
      time.sleep(delay_to_use)

  @staticmethod
  def _IsTempNoResources(err):
    """Checks whether an error reports temporarily unavailable resources.

    The error code is optional in the exception arguments, so a missing code
    counts as "some other error" instead of raising an IndexError.

    @type err: L{errors.OpPrereqError}
    @param err: Error raised for a failed creation attempt
    @rtype: bool

    """
    return (len(err.args) > 1 and
            err.args[1] == rapi.client.ECODE_TEMP_NORES)

  @staticmethod
  def _VaryDelay(mrt):
    """Varies the opportunistic delay by up to +/-15%.

    Randomizing the delay avoids having several moves retry in lockstep.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @return: Delay in seconds (float)

    """
    MAX_VARIATION = 0.15
    variation_factor = (1.0 + random.uniform(-MAX_VARIATION, MAX_VARIATION))
    return mrt.move.opportunistic_delay * variation_factor

  @staticmethod
  def _SetImportInfo(mrt, impinfo):
    """Sets the remote import information and notifies source thread.

    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information
    @param impinfo: Remote import information

    """
    with mrt.dest_to_source:
      mrt.dest_impinfo = impinfo
      mrt.dest_to_source.notify_all()

  @staticmethod
  def _GetDisks(instance):
    """Builds the disk definitions for the new instance.

    @type instance: dict
    @param instance: Instance details from source cluster
    @rtype: list of dicts

    """
    disks = []
    for idisk in instance["disks"]:
      odisk = {
        constants.IDISK_SIZE: idisk["size"],
        constants.IDISK_MODE: idisk["mode"],
        # NOTE(review): a missing name becomes the string "None" here;
        # presumably the destination treats it as "no name" -- confirm
        constants.IDISK_NAME: str(idisk.get("name")),
        }
      spindles = idisk.get("spindles")
      if spindles is not None:
        odisk[constants.IDISK_SPINDLES] = spindles
      disks.append(odisk)
    return disks

  @staticmethod
  def _GetNics(instance, override_nics):
    """Builds the NIC definitions for the new instance.

    @type instance: dict
    @param instance: Instance details from source cluster
    @type override_nics: list of dicts or None
    @param override_nics: Per-NIC parameter overrides, applied to the NICs in
      order; C{None} or an empty list means no overrides
    @rtype: list of dicts
    @raise Error: If the NIC information has an unexpected format, or if more
      overrides than existing NICs are given

    """
    try:
      nics = [{
        constants.INIC_IP: ip,
        constants.INIC_MAC: mac,
        constants.INIC_MODE: mode,
        constants.INIC_LINK: link,
        constants.INIC_VLAN: vlan,
        constants.INIC_NETWORK: network,
        constants.INIC_NAME: nic_name
        } for nic_name, _, ip, mac, mode, link, vlan, network, _
        in instance["nics"]]
    except ValueError:
      raise Error("Received NIC information does not match expected format; "
                  "Do the versions of this tool and the source cluster match?")

    if not override_nics:
      return nics

    if len(override_nics) > len(nics):
      raise Error("Can not create new NICs")

    # NICs beyond the last override are kept unchanged
    for idx, override in enumerate(override_nics):
      nics[idx] = objects.FillDict(nics[idx], override)

    return nics

  @staticmethod
  def _CreateInstance(cl, name, pnode, snode, compress, iallocator,
                      dest_disk_template, instance, expinfo, override_hvparams,
                      override_beparams, override_osparams, override_nics,
                      is_attempt_opportunistic):
    """Starts the instance creation in remote import mode.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @type pnode: string or None
    @param pnode: Name of primary node on destination cluster
    @type snode: string or None
    @param snode: Name of secondary node on destination cluster
    @type compress: string
    @param compress: Compression mode to use
    @type iallocator: string or None
    @param iallocator: Name of iallocator to use
    @type dest_disk_template: string or None
    @param dest_disk_template: Disk template to use instead of the original one
    @type instance: dict
    @param instance: Instance details from source cluster
    @type expinfo: dict
    @param expinfo: Prepared export information from source cluster
    @type override_hvparams: dict or None
    @param override_hvparams: Hypervisor parameters to override
    @type override_beparams: dict or None
    @param override_beparams: Backend parameters to override
    @type override_osparams: dict or None
    @param override_osparams: OS parameters to override
    @type override_nics: list of dicts or None
    @param override_nics: Per-NIC parameter overrides
    @type is_attempt_opportunistic: bool
    @param is_attempt_opportunistic: Whether to use opportunistic locking or not
    @return: Job ID

    """
    if dest_disk_template:
      disk_template = dest_disk_template
    else:
      disk_template = instance["disk_template"]

    disks = MoveDestExecutor._GetDisks(instance)
    nics = MoveDestExecutor._GetNics(instance, override_nics)
    os_type = instance.get("os", None)

    # TODO: Should this be the actual up/down status? (run_state)
    start = (instance["config_state"] == "up")

    assert len(disks) == len(instance["disks"])
    assert len(nics) == len(instance["nics"])

    inst_beparams = instance.get("be_instance", {})
    inst_hvparams = instance.get("hv_instance", {})
    inst_osparams = instance.get("os_instance", {})

    # Overrides are documented as optional; normalize None to an empty dict
    # so FillDict always merges two dictionaries
    return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                             name, disk_template, disks, nics,
                             os=os_type,
                             pnode=pnode,
                             snode=snode,
                             start=start,
                             ip_check=False,
                             iallocator=iallocator,
                             hypervisor=instance["hypervisor"],
                             source_handshake=expinfo["handshake"],
                             source_x509_ca=expinfo["x509_ca"],
                             compress=compress,
                             source_instance_name=instance["name"],
                             beparams=objects.FillDict(inst_beparams,
                                                       override_beparams or {}),
                             hvparams=objects.FillDict(inst_hvparams,
                                                       override_hvparams or {}),
                             osparams=objects.FillDict(inst_osparams,
                                                       override_osparams or {}),
                             opportunistic_locking=is_attempt_opportunistic)
| |
| |
class MoveSourceExecutor(object):
  """Source side of an instance move.

  All the work happens in the constructor. The instance is looked up, its
  details are retrieved and a remote export is prepared. That information is
  handed to the destination thread. Once the destination's import
  information is available, the instance is exported, with shutdown and
  removal of the source instance requested.

  """
  def __init__(self, src_client, mrt):
    """Runs the source side of an instance move.

    @type src_client: L{rapi.client.GanetiRapiClient}
    @param src_client: RAPI client
    @type mrt: L{MoveRuntime}
    @param mrt: Instance move runtime information

    """
    logging.info("Checking whether instance exists")
    self._CheckInstance(src_client, mrt.move.src_instance_name)

    logging.info("Retrieving instance information from source cluster")
    instinfo = self._GetInstanceInfo(src_client, mrt.PollJob,
                                     mrt.move.src_instance_name)

    logging.info("Preparing export on source cluster")
    expinfo = self._PrepareExport(src_client, mrt.PollJob,
                                  mrt.move.src_instance_name)
    assert "handshake" in expinfo
    assert "x509_key_name" in expinfo
    assert "x509_ca" in expinfo

    # Hand information to destination thread
    with mrt.source_to_dest:
      mrt.src_instinfo = instinfo
      mrt.src_expinfo = expinfo
      mrt.source_to_dest.notify_all()

    logging.info("Waiting for destination information to become available")
    mrt.Wait(mrt.dest_to_source, lambda mrt: mrt.dest_impinfo is None)

    logging.info("Starting remote export on source cluster")
    self._ExportInstance(src_client, mrt.PollJob, mrt.move.src_instance_name,
                         expinfo["x509_key_name"], mrt.move.compress,
                         mrt.dest_impinfo)

    logging.info("Export successful")

  @staticmethod
  def _CheckInstance(cl, name):
    """Checks whether the instance exists on the source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type name: string
    @param name: Instance name
    @raise Error: If the instance was not found

    """
    try:
      cl.GetInstance(name)
    except rapi.client.GanetiApiError as err:
      if err.code == rapi.client.HTTP_NOT_FOUND:
        raise Error("Instance %s not found (%s)" % (name, str(err)))
      raise

  @staticmethod
  def _GetInstanceInfo(cl, poll_job_fn, name):
    """Retrieves detailed instance information from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @return: The details of the single instance in the job's first result

    """
    job_id = cl.GetInstanceInfo(name, static=True)
    result = poll_job_fn(cl, job_id)
    # The first result is a mapping with exactly one entry (presumably keyed
    # by instance name); only its value is of interest
    assert len(result[0]) == 1
    # list() keeps this working where dict views are not indexable
    return list(result[0].values())[0]

  @staticmethod
  def _PrepareExport(cl, poll_job_fn, name):
    """Prepares export on source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name

    """
    job_id = cl.PrepareExport(name, constants.EXPORT_MODE_REMOTE)
    return poll_job_fn(cl, job_id)[0]

  @staticmethod
  def _ExportInstance(cl, poll_job_fn, name, x509_key_name, compress, impinfo):
    """Exports instance from source cluster.

    @type cl: L{rapi.client.GanetiRapiClient}
    @param cl: RAPI client
    @type poll_job_fn: callable
    @param poll_job_fn: Function to poll for job result
    @type name: string
    @param name: Instance name
    @param x509_key_name: Source X509 key
    @type compress: string
    @param compress: Compression mode to use
    @param impinfo: Import information from destination cluster
    @raise Error: If the export of any disk, or the export as a whole, was
      reported as failed

    """
    job_id = cl.ExportInstance(name, constants.EXPORT_MODE_REMOTE,
                               impinfo["disks"], shutdown=True,
                               remove_instance=True,
                               x509_key_name=x509_key_name,
                               destination_x509_ca=impinfo["x509_ca"],
                               compress=compress)
    (fin_resu, dresults) = poll_job_fn(cl, job_id)[0]

    failed_disks = [str(idx) for (idx, result) in enumerate(dresults)
                    if not result]
    if failed_disks:
      raise Error("Export failed for disks %s" % utils.CommaJoin(failed_disks))

    # All disks succeeded, but the overall result may still be a failure;
    # don't report that with an empty disk list
    if not fin_resu:
      raise Error("Export failed (overall result reported as unsuccessful)")
| |
| |
class MoveSourceWorker(workerpool.BaseWorker):
  """Worker executing instance moves.

  The source side of each move runs in the worker thread itself. The
  destination side runs in an additional thread started for the move.

  """
  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
    """Executes an instance move.

    The outcome is stored in C{move.error_message}, which is C{None} on
    success. Exceptions are not propagated.

    @type rapi_factory: L{RapiClientFactory}
    @param rapi_factory: RAPI client factory
    @type move: L{InstanceMove}
    @param move: Instance move information

    """
    try:
      logging.info("Preparing to move %s from cluster %s to %s as %s",
                   move.src_instance_name, rapi_factory.src_cluster_name,
                   rapi_factory.dest_cluster_name, move.dest_instance_name)

      mrt = MoveRuntime(move)

      # Create both clients before the destination thread is started. If
      # creating the source client failed after that point, the failure
      # would happen outside HandleErrors: the destination thread would
      # never be woken up and join() below would block forever.
      dest_client = rapi_factory.GetDestClient()
      src_client = rapi_factory.GetSourceClient()

      logging.debug("Starting destination thread")
      dest_thread = threading.Thread(name="DestFor%s" % self.getName(),
                                     target=mrt.HandleErrors,
                                     args=("dest", MoveDestExecutor,
                                           dest_client, mrt, ))
      dest_thread.start()
      try:
        mrt.HandleErrors("src", MoveSourceExecutor, src_client, mrt)
      finally:
        dest_thread.join()

      if mrt.src_error_message or mrt.dest_error_message:
        move.error_message = ("Source error: %s, destination error: %s" %
                              (mrt.src_error_message, mrt.dest_error_message))
      else:
        move.error_message = None
    except Exception as err: # pylint: disable=W0703
      logging.exception("Caught unhandled exception")
      move.error_message = str(err)
| |
| |
def CheckRapiSetup(rapi_factory):
  """Verifies that both RAPI servers answer by asking for their versions.

  The source cluster is queried first, then the destination. Errors raised
  by the clients are propagated to the caller.

  @type rapi_factory: L{RapiClientFactory}
  @param rapi_factory: RAPI client factory

  """
  checks = [
    (rapi_factory.GetSourceClient,
     "Connecting to source RAPI server",
     "Source cluster RAPI version: %s"),
    (rapi_factory.GetDestClient,
     "Connecting to destination RAPI server",
     "Destination cluster RAPI version: %s"),
    ]

  for (make_client, connect_msg, version_msg) in checks:
    client = make_client()
    logging.info(connect_msg)
    logging.info(version_msg, client.GetVersion())
| |
| |
def ParseOptions():
  """Parses the command line of this tool.

  @rtype: tuple
  @return: C{(parser, options, args)}, where C{args} holds the positional
    arguments

  """
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
                                        " <source-cluster> <dest-cluster>"
                                        " <instance...>"),
                                 prog=os.path.basename(sys.argv[0]))

  # Generic options shared with other tools, then the tool-specific ones
  parser.add_options([
    cli.DEBUG_OPT,
    cli.VERBOSE_OPT,
    cli.IALLOCATOR_OPT,
    cli.BACKEND_OPT,
    cli.HVOPTS_OPT,
    cli.OSPARAMS_OPT,
    cli.NET_OPT,
    SRC_RAPI_PORT_OPT,
    SRC_CA_FILE_OPT,
    SRC_USERNAME_OPT,
    SRC_PASSWORD_FILE_OPT,
    DEST_RAPI_PORT_OPT,
    DEST_CA_FILE_OPT,
    DEST_USERNAME_OPT,
    DEST_PASSWORD_FILE_OPT,
    DEST_INSTANCE_NAME_OPT,
    DEST_PRIMARY_NODE_OPT,
    DEST_SECONDARY_NODE_OPT,
    DEST_DISK_TEMPLATE_OPT,
    COMPRESS_OPT,
    PARALLEL_OPT,
    OPPORTUNISTIC_TRIES_OPT,
    OPPORTUNISTIC_DELAY_OPT,
    ])

  options, args = parser.parse_args()
  return (parser, options, args)
| |
| |
def _CheckAllocatorOptions(parser, options):
  """Checks the iallocator-related options for consistency.

  @param parser: Option parser, used to report errors via C{parser.error}
  @param options: Parsed program options

  """
  if options.iallocator and (options.dest_primary_node or
                             options.dest_secondary_node):
    parser.error("Destination node and iallocator options exclude each other")

  # opportunistic_tries is None when the option was not given. Comparing
  # None with an integer only works by accident on Python 2, so check it
  # explicitly. Negative values are rejected by the opportunistic option
  # checks, not here.
  tries = options.opportunistic_tries
  if not options.iallocator and tries is not None and tries > 0:
    parser.error("Opportunistic instance creation can only be used with an"
                 " iallocator")
| |
| |
def _CheckOpportunisticLockingOptions(parser, options):
  """Validates --opportunistic-tries and --opportunistic-delay.

  The delay may only be given together with the number of tries. Options
  that were not given are left untouched; their defaults are filled in
  later, when the moves are set up.

  @param parser: Option parser, used to report errors via C{parser.error}
  @param options: Parsed program options

  """
  tries = options.opportunistic_tries
  delay = options.opportunistic_delay

  if tries is None:
    if delay is not None:
      parser.error("Opportunistic delay can only be specified when opportunistic"
                   " tries are used")
    return

  if tries < 0:
    parser.error("Number of opportunistic creation attempts must be >= 0")

  if delay is not None and delay <= 0:
    parser.error("The delay between two successive creation attempts must"
                 " be greater than zero")
| |
| |
def _CheckInstanceOptions(parser, options, instance_names):
  """Validates options that only make sense when moving a single instance.

  With exactly one instance, the hypervisor and backend parameters are
  type-checked and the --net values are parsed (C{options.nics} is replaced
  in place). With any other number of instances, giving any per-instance
  option is an error.

  @param parser: Option parser, used to report errors via C{parser.error}
  @param options: Parsed program options
  @type instance_names: list
  @param instance_names: Names of the instances to move

  """
  if len(instance_names) != 1:
    per_instance_values = (options.dest_instance_name,
                           options.dest_primary_node,
                           options.dest_secondary_node,
                           options.hvparams,
                           options.beparams,
                           options.osparams,
                           options.nics)
    if any(per_instance_values):
      parser.error("The options --dest-instance-name, --dest-primary-node,"
                   " --dest-secondary-node, --hypervisor-parameters,"
                   " --backend-parameters, --os-parameters and --net can"
                   " only be used when moving exactly one instance")
    return

  # Exactly one instance is being moved
  if options.hvparams:
    utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)

  if options.beparams:
    utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)

  if options.nics:
    options.nics = cli.ParseNicOption(options.nics)
| |
| |
def CheckOptions(parser, options, args):
  """Validates the command line options and positional arguments.

  The first two positional arguments are the source and destination cluster
  names. They are removed from C{args} in place, and the remaining list is
  the set of instances to move.

  @param parser: option parser; problems are reported via C{parser.error}
  @param options: parsed command line options
  @type args: list
  @param args: positional arguments (modified in place)
  @rtype: tuple
  @return: (source cluster name, destination cluster name, instance names)

  """
  if len(args) < 3:
    parser.error("Not enough arguments")

  # Left-to-right evaluation: the first pop yields the source cluster
  (src_cluster_name, dest_cluster_name) = (args.pop(0), args.pop(0))
  instance_names = args

  assert instance_names

  # TODO: Remove once using system default paths for SSL certificate
  # verification is implemented
  if not options.src_ca_file:
    parser.error("Missing source cluster CA file")

  if options.parallel < 1:
    parser.error("Number of simultaneous moves must be >= 1")

  for check_fn in (_CheckAllocatorOptions, _CheckOpportunisticLockingOptions):
    check_fn(parser, options)
  _CheckInstanceOptions(parser, options, instance_names)

  return (src_cluster_name, dest_cluster_name, instance_names)
| |
| |
def DestClusterHasDefaultIAllocator(rapi_factory):
  """Looks up the default iallocator of the destination cluster.

  @param rapi_factory: factory providing the destination cluster RAPI client
  @return: C{False} if the cluster information has no C{default_iallocator}
      entry, otherwise the stored value (only its truth value is meaningful
      as an answer to "has a default iallocator")

  """
  cluster_info = rapi_factory.GetDestClient().GetInfo()

  if "default_iallocator" not in cluster_info:
    return False

  return cluster_info["default_iallocator"]
| |
| |
def ExitWithError(message):
  """Writes an error message to standard error and terminates.

  @type message: string
  @param message: description of the error
  @raise SystemExit: always, with C{constants.EXIT_FAILURE}

  """
  error_line = "".join(["move-instance: error: ", message, "\n"])
  sys.stderr.write(error_line)
  sys.exit(constants.EXIT_FAILURE)
| |
| |
| def _PrepareListOfInstanceMoves(options, instance_names): |
| moves = [] |
| for src_instance_name in instance_names: |
| if options.dest_instance_name: |
| assert len(instance_names) == 1 |
| # Rename instance |
| dest_instance_name = options.dest_instance_name |
| else: |
| dest_instance_name = src_instance_name |
| |
| moves.append(InstanceMove(src_instance_name, dest_instance_name, |
| options.dest_primary_node, |
| options.dest_secondary_node, |
| options.compress, |
| options.iallocator, |
| options.dest_disk_template, |
| options.hvparams, |
| options.beparams, |
| options.osparams, |
| options.nics, |
| options.opportunistic_tries, |
| options.opportunistic_delay)) |
| |
| assert len(moves) == len(instance_names) |
| return moves |
| |
| |
@UsesRapiClient
def main():
  """Main routine.

  Parses and validates the command line, checks the RAPI setup for both
  clusters and moves all requested instances using a worker pool of
  C{options.parallel} workers. A per-instance result summary is logged at
  the end. The process exits with C{constants.EXIT_FAILURE} if any move
  reported an error, and with C{constants.EXIT_SUCCESS} otherwise.

  """
  (parser, options, args) = ParseOptions()

  utils.SetupToolLogging(options.debug, options.verbose, threadname=True)

  (src_cluster_name, dest_cluster_name, instance_names) = \
    CheckOptions(parser, options, args)

  logging.info("Source cluster: %s", src_cluster_name)
  logging.info("Destination cluster: %s", dest_cluster_name)
  logging.info("Instances to be moved: %s", utils.CommaJoin(instance_names))

  rapi_factory = RapiClientFactory(options, src_cluster_name, dest_cluster_name)

  CheckRapiSetup(rapi_factory)

  # An explicitly requested iallocator wins; the destination cluster is only
  # queried for its default iallocator if none was given on the command line
  has_iallocator = options.iallocator or \
                   DestClusterHasDefaultIAllocator(rapi_factory)

  if len(instance_names) > 1 and not has_iallocator:
    ExitWithError("When moving multiple instances, an iallocator must be "
                  "used. None was provided and the target cluster does not "
                  "have a default iallocator.")
  if (len(instance_names) == 1 and not (has_iallocator or
      options.dest_primary_node or options.dest_secondary_node)):
    ExitWithError("Target cluster does not have a default iallocator, "
                  "please specify either destination nodes or an iallocator.")

  moves = _PrepareListOfInstanceMoves(options, instance_names)

  # Start workerpool
  wp = workerpool.WorkerPool("Move", options.parallel, MoveSourceWorker)
  try:
    # Add instance moves to workerpool
    for move in moves:
      wp.AddTask((rapi_factory, move))

    # Wait for all moves to finish
    wp.Quiesce()

  finally:
    # Always stop the workers, even if queueing or waiting failed
    wp.TerminateWorkers()

  # There should be no threads running at this point, hence not using locks
  # anymore

  logging.info("Instance move results:")

  for move in moves:
    if move.dest_instance_name == move.src_instance_name:
      name = move.src_instance_name
    else:
      name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)

    if move.error_message:
      msg = "Failed (%s)" % move.error_message
    else:
      msg = "Success"

    logging.info("%s: %s", name, msg)

  if compat.any(move.error_message for move in moves):
    sys.exit(constants.EXIT_FAILURE)

  sys.exit(constants.EXIT_SUCCESS)
| |
| |
if __name__ == "__main__":
  # Only run the tool when executed as a script, not when imported
  main()