| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2010, 2012 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| """Tool to merge two or more clusters together. |
| |
| The clusters have to run the same version of Ganeti! |
| |
| """ |
| |
| # pylint: disable=C0103 |
| # C0103: Invalid name cluster-merge |
| |
| import logging |
| import os |
| import optparse |
| import shutil |
| import sys |
| import tempfile |
| |
| from ganeti import cli |
| from ganeti import config |
| from ganeti import constants |
| from ganeti import errors |
| from ganeti import ssh |
| from ganeti import utils |
| from ganeti import pathutils |
| from ganeti import compat |
| |
| |
# Strategies for node groups that carry the same name on both sides
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Prefix of the identifier passed as second argument to the ConfigWriter
# Add* calls made during the merge
_CLUSTERMERGE_ECID = "clustermerge-ecid"
# Strategies for restarting instances after the merge
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Strategies for cluster parameters whose values differ
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# The help text used to contain a "same name" fragment copied over from the
# --groups option; it has nothing to do with restarting instances.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# Note the inverted logic: passing the flag stores False in "stop_instances"
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
| |
| |
def Flatten(unflattened_list):
  """Flattens arbitrarily nested lists into a single list.

  Only C{list} instances are descended into; any other value (tuples and
  strings included) is copied to the result unchanged, in order.

  @param unflattened_list: An iterable whose items may themselves be lists
  @return: A new flat list

  """
  result = []

  for element in unflattened_list:
    if not isinstance(element, list):
      result.append(element)
      continue
    # Nested list: splice its flattened content in at this position
    result += Flatten(element)

  return result
| |
| |
class MergerData(object):
  """Plain record with everything known about one cluster being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Stores the given values as attributes of the same name.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config; C{None} until
                        the config has been fetched

    """
    (self.cluster, self.master_node) = (cluster, master_node)
    (self.nodes, self.instances) = (nodes, instances)
    (self.key_path, self.config_path) = (key_path, config_path)
| |
| |
| class Merger(object): |
| """Handling the merge. |
| |
| """ |
| RUNNING_STATUSES = compat.UniqueFrozenset([ |
| constants.INSTST_RUNNING, |
| constants.INSTST_ERRORUP, |
| ]) |
| |
def __init__(self, clusters, pause_period, groups, restart, params,
             stop_instances):
  """Initialize object with sane defaults and infos required.

  @param clusters: The list of clusters to merge in
  @param pause_period: The time watcher shall be disabled for
  @param groups: How to handle group conflicts
  @param restart: How to handle instance restart
  @param params: How to handle cluster parameters with differing values
  @param stop_instances: Indicates whether the instances must be stopped
                         (True) or if the Merger must only check if no
                         instances are running on the mergee clusters (False)
  @raise NotImplementedError: if the "up" restart strategy is requested

  """
  # Reject the unsupported strategy before anything is created on disk: when
  # the constructor raises, the caller has no object to call Cleanup() on
  # and a temporary directory made earlier would never be removed.
  if restart == _RESTART_UP:
    raise NotImplementedError("Restart strategy '%s' is not implemented" %
                              restart)

  self.merger_data = []
  self.clusters = clusters
  self.pause_period = pause_period
  self.groups = groups
  self.restart = restart
  self.params = params
  self.stop_instances = stop_instances
  (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
  self.ssh_runner = ssh.SshRunner(self.cluster_name)
  # Created last so that a failure of the calls above can not leak it
  self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
| |
def Setup(self):
  """Sets up our end so we can do the merger.

  For every cluster to merge, the remote ssh private key is fetched into the
  work directory and then used to collect the names of the nodes that are not
  offline, the instance names and the name of the master node. One
  L{MergerData} entry per cluster is appended to C{self.merger_data}.

  @raise errors.CommandError: if this cluster is listed among the mergees
  @raise errors.RemoteError: for errors in communication/grabbing

  """
  (remote_path, _, _) = ssh.GetUserFiles("root")

  if self.cluster_name in self.clusters:
    raise errors.CommandError("Cannot merge cluster %s with itself" %
                              self.cluster_name)

  for cluster in self.clusters:
    # Fetch remotes private key
    result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                          ask_key=False)
    if result.failed:
      raise errors.RemoteError("There was an error while grabbing ssh private"
                               " key from %s. Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))

    key_path = utils.PathJoin(self.work_dir, cluster)
    # "0o600" is understood by Python 2.6+ and Python 3; the old "0600"
    # spelling is a syntax error on Python 3
    utils.WriteFile(key_path, mode=0o600, data=result.stdout)

    result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                          " --no-headers --separator=,", private_key=key_path)
    if result.failed:
      raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                               " Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))
    nodes = []
    for line in result.stdout.splitlines():
      fields = line.split(",")
      # Ignore blank or malformed lines instead of failing with IndexError
      if len(fields) < 2:
        continue
      # Only nodes whose "offline" column reads "N" are kept
      if fields[1] == "N":
        nodes.append(fields[0])

    result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                          private_key=key_path)
    if result.failed:
      raise errors.RemoteError("Unable to retrieve list of instances from"
                               " %s. Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))
    instances = result.stdout.splitlines()

    path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                          constants.SS_MASTER_NODE)
    result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
    if result.failed:
      raise errors.RemoteError("Unable to retrieve the master node name from"
                               " %s. Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))
    master_node = result.stdout.strip()

    self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                       master_node))
| |
def _PrepareAuthorizedKeys(self):
  """Appends our public key to authorized_keys on every merging node.

  The key is sent through a shell here-document, authenticating with the
  private key fetched for the node's cluster; up to three attempts are made
  per node.

  @raise errors.RemoteError: if the key can not be added on a node

  """
  (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
  pub_key = utils.ReadFile(pub_key_file)
  # The command is the same for every node, so build it only once
  append_cmd = "cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)

  for data in self.merger_data:
    for node in data.nodes:
      outcome = self._RunCmd(node, append_cmd, private_key=data.key_path,
                             max_attempts=3)
      if not outcome.failed:
        continue

      raise errors.RemoteError("Unable to add our public key to %s in %s."
                               " Fail reason: %s; output: %s" %
                               (node, data.cluster, outcome.fail_reason,
                                outcome.output))
| |
| def _RunCmd(self, hostname, command, user="root", use_cluster_key=False, |
| strict_host_check=False, private_key=None, batch=True, |
| ask_key=False, max_attempts=1): |
| """Wrapping SshRunner.Run with default parameters. |
| |
| For explanation of parameters see L{ganeti.ssh.SshRunner.Run}. |
| |
| """ |
| for _ in range(max_attempts): |
| result = self.ssh_runner.Run(hostname=hostname, command=command, |
| user=user, use_cluster_key=use_cluster_key, |
| strict_host_check=strict_host_check, |
| private_key=private_key, batch=batch, |
| ask_key=ask_key) |
| if not result.failed: |
| break |
| |
| return result |
| |
| def _CheckRunningInstances(self): |
| """Checks if on the clusters to be merged there are running instances |
| |
| @rtype: boolean |
| @return: True if there are running instances, False otherwise |
| |
| """ |
| for cluster in self.clusters: |
| result = self._RunCmd(cluster, "gnt-instance list -o status") |
| if self.RUNNING_STATUSES.intersection(result.output.splitlines()): |
| return True |
| |
| return False |
| |
| def _StopMergingInstances(self): |
| """Stop instances on merging clusters. |
| |
| """ |
| for cluster in self.clusters: |
| result = self._RunCmd(cluster, "gnt-instance shutdown --all" |
| " --force-multiple") |
| |
| if result.failed: |
| raise errors.RemoteError("Unable to stop instances on %s." |
| " Fail reason: %s; output: %s" % |
| (cluster, result.fail_reason, result.output)) |
| |
| def _DisableWatcher(self): |
| """Disable watch on all merging clusters, including ourself. |
| |
| """ |
| for cluster in ["localhost"] + self.clusters: |
| result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" % |
| self.pause_period) |
| |
| if result.failed: |
| raise errors.RemoteError("Unable to pause watcher on %s." |
| " Fail reason: %s; output: %s" % |
| (cluster, result.fail_reason, result.output)) |
| |
| def _RemoveMasterIps(self): |
| """Removes the master IPs from the master nodes of each cluster. |
| |
| """ |
| for data in self.merger_data: |
| result = self._RunCmd(data.master_node, |
| "gnt-cluster deactivate-master-ip --yes") |
| |
| if result.failed: |
| raise errors.RemoteError("Unable to remove master IP on %s." |
| " Fail reason: %s; output: %s" % |
| (data.master_node, |
| result.fail_reason, |
| result.output)) |
| |
def _StopDaemons(self):
  """Runs the daemon utility's "stop-all" on every merging node.

  Up to three attempts are made per node.

  @raise errors.RemoteError: if the daemons can not be stopped on a node

  """
  stop_cmd = "%s stop-all" % pathutils.DAEMON_UTIL

  for cluster_data in self.merger_data:
    for node_name in cluster_data.nodes:
      outcome = self._RunCmd(node_name, stop_cmd, max_attempts=3)
      if not outcome.failed:
        continue

      raise errors.RemoteError("Unable to stop daemons on %s."
                               " Fail reason: %s; output: %s." %
                               (node_name, outcome.fail_reason,
                                outcome.output))
| |
def _FetchRemoteConfig(self):
  """Copies the config of every merging cluster into the work directory.

  The file is read over ssh from the host named like the cluster and stored
  as C{<cluster>_config.data}; the local path is recorded in the
  C{config_path} attribute of the cluster's L{MergerData}. This has to
  happen before the configs can be merged.

  @raise errors.RemoteError: if a remote config can not be read

  """
  fetch_cmd = "cat %s" % pathutils.CLUSTER_CONF_FILE

  for data in self.merger_data:
    outcome = self._RunCmd(data.cluster, fetch_cmd)
    if outcome.failed:
      raise errors.RemoteError("Unable to retrieve remote config on %s."
                               " Fail reason: %s; output %s" %
                               (data.cluster, outcome.fail_reason,
                                outcome.output))

    local_name = "%s_config.data" % data.cluster
    data.config_path = utils.PathJoin(self.work_dir, local_name)
    utils.WriteFile(data.config_path, data=outcome.stdout)
| |
| # R0201: Method could be a function |
def _KillMasterDaemon(self): # pylint: disable=R0201
  """Stops the local master daemons through the daemon utility.

  @raise errors.CommandError: If unable to kill

  """
  stop_cmd = [pathutils.DAEMON_UTIL, "stop-master"]
  outcome = utils.RunCmd(stop_cmd)
  if not outcome.failed:
    return

  raise errors.CommandError("Unable to stop master daemons."
                            " Fail reason: %s; output: %s" %
                            (outcome.fail_reason, outcome.output))
| |
def _MergeConfig(self):
  """Merges the fetched config of every merging cluster into our own.

  For each cluster the cluster-level parameters are checked and merged, the
  node groups are added, and then all nodes and instances are added to the
  local configuration. Nodes are added offline; DRBD disks get a port newly
  allocated from the local configuration.

  """
  local_cfg = config.ConfigWriter(offline=True)
  # Running number appended to the ID of each Add* call; it needs to be
  # unique over the whole config merge
  serial = 0

  for data in self.merger_data:
    remote_cfg = config.ConfigWriter(data.config_path, accept_foreign=True)
    self._MergeClusterConfigs(local_cfg, remote_cfg)
    self._MergeNodeGroups(local_cfg, remote_cfg)

    for node_name in remote_cfg.GetNodeList():
      node = remote_cfg.GetNodeInfo(node_name)
      # Offline the node, it will be reonlined later at node readd
      node.master_candidate = False
      node.drained = False
      node.offline = True
      local_cfg.AddNode(node, "%s%s" % (_CLUSTERMERGE_ECID, str(serial)))
      serial += 1

    for inst_name in remote_cfg.GetInstanceList():
      inst = remote_cfg.GetInstanceInfo(inst_name)

      # Update the DRBD port assignments
      # This is a little bit hackish
      for disk in inst.disks:
        if disk.dev_type not in constants.DTS_DRBD:
          continue

        new_port = local_cfg.AllocatePort()

        # The IDs are rebuilt as tuples after patching the port entries
        lid = list(disk.logical_id)
        lid[2] = new_port
        disk.logical_id = tuple(lid)

        pid = list(disk.physical_id)
        pid[1] = pid[3] = new_port
        disk.physical_id = tuple(pid)

      local_cfg.AddInstance(inst, "%s%s" % (_CLUSTERMERGE_ECID, str(serial)))
      serial += 1
| |
def _MergeClusterConfigs(self, my_config, other_config):
  """Checks that all relevant cluster parameters are compatible

  Every difference found is logged. A difference counts as an error if the
  parameter conflict strategy is "strict" or, for the parameters listed in
  C{check_params_strict} and the set of enabled hypervisors, always.
  Besides checking, the reserved LVs of both clusters are united, disk
  wiping is enabled if the clusters disagree on it, and OS parameters only
  known to the other cluster are copied over.

  @param my_config: ConfigWriter of this cluster; its cluster object is
                    modified in place
  @param other_config: ConfigWriter of the cluster that is merged in
  @raise errors.ConfigurationError: if at least one difference counted as
                                    an error

  """
  my_cluster = my_config.GetClusterInfo()
  other_cluster = other_config.GetClusterInfo()
  other_name = other_cluster.cluster_name
  params_strict = (self.params == _PARAMS_STRICT)
  err_count = 0

  #
  # Generic checks
  #
  check_params = [
    "beparams",
    "default_iallocator",
    "drbd_usermode_helper",
    "hidden_os",
    "maintain_node_health",
    "master_netdev",
    "ndparams",
    "nicparams",
    "primary_ip_family",
    "tags",
    "uid_pool",
    ]
  # Differences in these are errors regardless of the conflict strategy
  check_params_strict = [
    "volume_group_name",
    ]
  if (my_cluster.IsFileStorageEnabled() or
      other_cluster.IsFileStorageEnabled()):
    check_params_strict.append("file_storage_dir")
  if (my_cluster.IsSharedFileStorageEnabled() or
      other_cluster.IsSharedFileStorageEnabled()):
    check_params_strict.append("shared_file_storage_dir")
  check_params.extend(check_params_strict)

  for param_name in check_params:
    my_param = getattr(my_cluster, param_name)
    other_param = getattr(other_cluster, param_name)
    if my_param != other_param:
      logging.error("The value (%s) of the cluster parameter %s on %s"
                    " differs to this cluster's value (%s)",
                    other_param, param_name, other_name, my_param)
      if params_strict or param_name in check_params_strict:
        err_count += 1

  #
  # Custom checks
  #

  # Check default hypervisor (the first enabled one on each side)
  my_defhyp = my_cluster.enabled_hypervisors[0]
  other_defhyp = other_cluster.enabled_hypervisors[0]
  if my_defhyp != other_defhyp:
    logging.warning("The default hypervisor (%s) differs on %s, new"
                    " instances will be created with this cluster's"
                    " default hypervisor (%s)", other_defhyp,
                    other_name, my_defhyp)

  if (set(my_cluster.enabled_hypervisors) !=
      set(other_cluster.enabled_hypervisors)):
    logging.error("The set of enabled hypervisors (%s) on %s differs to"
                  " this cluster's set (%s)",
                  other_cluster.enabled_hypervisors,
                  other_name, my_cluster.enabled_hypervisors)
    err_count += 1

  # Check hypervisor params for hypervisors we care about
  for hyp in my_cluster.enabled_hypervisors:
    # The other cluster may lack this hypervisor or one of its parameters;
    # report that as a differing (None) value instead of dying with a
    # KeyError before all differences have been logged
    other_hvparams = other_cluster.hvparams.get(hyp, {})
    for (param, my_value) in my_cluster.hvparams[hyp].items():
      other_value = other_hvparams.get(param, None)
      if my_value != other_value:
        logging.error("The value (%s) of the %s parameter of the %s"
                      " hypervisor on %s differs to this cluster's parameter"
                      " (%s)",
                      other_value, param, hyp, other_name, my_value)
        if params_strict:
          err_count += 1

  # Check os hypervisor params for hypervisors we care about
  # (a set union instead of "keys() + keys()", which needs list-type keys)
  for os_name in set(my_cluster.os_hvp) | set(other_cluster.os_hvp):
    for hyp in my_cluster.enabled_hypervisors:
      my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
      other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
      if my_os_hvp != other_os_hvp:
        logging.error("The OS parameters (%s) for the %s OS for the %s"
                      " hypervisor on %s differs to this cluster's parameters"
                      " (%s)",
                      other_os_hvp, os_name, hyp, other_name, my_os_hvp)
        if params_strict:
          err_count += 1

  #
  # Warnings
  #
  if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
    logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                    " this cluster's value (%s) will take precedence",
                    other_cluster.modify_etc_hosts,
                    other_name,
                    my_cluster.modify_etc_hosts)

  if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
    logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                    " this cluster's value (%s) will take precedence",
                    other_cluster.modify_ssh_setup,
                    other_name,
                    my_cluster.modify_ssh_setup)

  #
  # Actual merging
  #
  my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                     other_cluster.reserved_lvs))

  if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
    logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                    " cluster's value (%s). The least permissive value (%s)"
                    " will be used", other_cluster.prealloc_wipe_disks,
                    other_name,
                    my_cluster.prealloc_wipe_disks, True)
    my_cluster.prealloc_wipe_disks = True

  for os_, osparams in other_cluster.osparams.items():
    if os_ not in my_cluster.osparams:
      # Only known to the other cluster, take it over as it is
      my_cluster.osparams[os_] = osparams
    elif my_cluster.osparams[os_] != osparams:
      logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                    " this cluster's parameters (%s)",
                    osparams, os_, other_name,
                    my_cluster.osparams[os_])
      if params_strict:
        err_count += 1

  if err_count:
    raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                    " values, please fix and re-run" %
                                    other_name)
| |
| # R0201: Method could be a function |
| def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201 |
| if os_name in cluster.os_hvp: |
| return cluster.os_hvp[os_name].get(hyp, None) |
| else: |
| return None |
| |
| # R0201: Method could be a function |
def _MergeNodeGroups(self, my_config, other_config):
  """Adds foreign node groups

  Groups of the other cluster whose name is already used locally are
  handled according to C{self.groups}: with "rename" the remote group gets
  the remote cluster name appended, with "merge" its nodes are moved into
  the local group of the same name. All remaining remote groups are then
  added to the local configuration.

  ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

  @param my_config: ConfigWriter of this cluster
  @param other_config: ConfigWriter of the cluster that is merged in
  @raise errors.CommandError: if there are conflicting group names and no
                              strategy was given

  """
  # pylint: disable=R0201
  logging.info("Node group conflict strategy: %s", self.groups)

  my_grp_names = set(grp.name
                     for grp in my_config.GetAllNodeGroupsInfo().values())
  # A real list is needed: merged groups are removed from it below and a
  # dictionary view (Python 3) has no remove()
  other_grps = list(other_config.GetAllNodeGroupsInfo().values())

  # Check for node group naming conflicts:
  conflicts = [grp for grp in other_grps if grp.name in my_grp_names]

  if conflicts:
    conflict_names = utils.CommaJoin([g.name for g in conflicts])
    logging.info("Node groups in both local and remote cluster: %s",
                 conflict_names)

    # User hasn't specified how to handle conflicts
    if not self.groups:
      raise errors.CommandError("The following node group(s) are in both"
                                " clusters, and no merge strategy has been"
                                " supplied (see the --groups option): %s" %
                                conflict_names)

    # User wants to rename conflicts
    elif self.groups == _GROUPS_RENAME:
      for grp in conflicts:
        new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
        logging.info("Renaming remote node group from %s to %s"
                     " to resolve conflict", grp.name, new_name)
        grp.name = new_name

    # User wants to merge conflicting groups
    elif self.groups == _GROUPS_MERGE:
      for other_grp in conflicts:
        logging.info("Merging local and remote '%s' groups", other_grp.name)

        # Access to a protected member of a client class
        # pylint: disable=W0212
        # The local group is the same for all nodes, look it up only once
        my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

        # Iterate over a copy of the member list; presumably it is changed
        # when a node is removed from the group
        for node_name in other_grp.members[:]:
          node = other_config.GetNodeInfo(node_name)
          other_config._UnlockedRemoveNodeFromGroup(node)
          my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
          node.group = my_grp_uuid

        # Remove from list of groups to add
        other_grps.remove(other_grp)

  for grp in other_grps:
    #TODO: handle node group conflicts
    my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
| |
| # R0201: Method could be a function |
def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
  """Starts the local master daemons through the daemon utility.

  @param no_vote: Whether "--no-voting --yes-do-it" is handed to the start
                  command via the EXTRA_MASTERD_ARGS environment variable;
                  default: False
  @raise errors.CommandError: If unable to start daemon.

  """
  if no_vote:
    extra_env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
  else:
    extra_env = {}

  start_cmd = [pathutils.DAEMON_UTIL, "start-master"]
  outcome = utils.RunCmd(start_cmd, env=extra_env)
  if not outcome.failed:
    return

  raise errors.CommandError("Couldn't start ganeti master."
                            " Fail reason: %s; output: %s" %
                            (outcome.fail_reason, outcome.output))
| |
def _ReaddMergedNodesAndRedist(self):
  """Re-adds every merged node, then redistributes the configuration.

  A node that can not be re-added is only logged as an error; the remaining
  nodes and the redistribution are still attempted.

  @raise errors.CommandError: If the configuration redistribution fails

  """
  readd_prefix = ["gnt-node", "add", "--readd", "--no-ssh-key-check"]

  for cluster_data in self.merger_data:
    for node_name in cluster_data.nodes:
      logging.info("Readding node %s", node_name)
      readd = utils.RunCmd(readd_prefix + [node_name])
      if readd.failed:
        logging.error("%s failed to be readded. Reason: %s, output: %s",
                      node_name, readd.fail_reason, readd.output)

  redist = utils.RunCmd(["gnt-cluster", "redist-conf"])
  if not redist.failed:
    return

  raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                            " output: %s" % (redist.fail_reason,
                                             redist.output))
| |
| # R0201: Method could be a function |
def _StartupAllInstances(self): # pylint: disable=R0201
  """Starts all instances by running gnt-instance locally.

  @raise errors.CommandError: If the startup command fails

  """
  startup_cmd = ["gnt-instance", "startup", "--all", "--force-multiple"]
  outcome = utils.RunCmd(startup_cmd)
  if not outcome.failed:
    return

  raise errors.CommandError("Unable to start all instances."
                            " Fail reason: %s; output: %s" %
                            (outcome.fail_reason, outcome.output))
| |
| # R0201: Method could be a function |
| # TODO: make this overridable, for some verify errors |
def _VerifyCluster(self): # pylint: disable=R0201
  """Runs gnt-cluster verify locally to check the cluster health.

  @raise errors.CommandError: If the verification command fails

  """
  outcome = utils.RunCmd(["gnt-cluster", "verify"])
  if not outcome.failed:
    return

  raise errors.CommandError("Verification of cluster failed."
                            " Fail reason: %s; output: %s" %
                            (outcome.fail_reason, outcome.output))
| |
def Merge(self):
  """Does the actual merge.

  It runs all the steps in the right order and updates the user about steps
  taken. Also it keeps track of rollback_steps to undo everything.

  If a step fails, the error and the manual rollback steps collected so far
  are logged and the error is raised again, so that the caller can report
  the failure instead of mistaking the merge for a success.

  @raise errors.GenericError: if any of the steps fails

  """
  rbsteps = []
  try:
    logging.info("Pre cluster verification")
    self._VerifyCluster()

    logging.info("Prepare authorized_keys")
    rbsteps.append("Remove our key from authorized_keys on nodes:"
                   " %(nodes)s")
    self._PrepareAuthorizedKeys()

    rbsteps.append("Start all instances again on the merging"
                   " clusters: %(clusters)s")
    if self.stop_instances:
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()
    logging.info("Checking that no instances are running on the mergees")
    instances_running = self._CheckRunningInstances()
    if instances_running:
      raise errors.CommandError("Some instances are still running on the"
                                " mergees")
    logging.info("Disable watcher")
    self._DisableWatcher()
    logging.info("Merging config")
    self._FetchRemoteConfig()
    logging.info("Removing master IPs on mergee master nodes")
    self._RemoveMasterIps()
    logging.info("Stop daemons on merging nodes")
    self._StopDaemons()

    logging.info("Stopping master daemon")
    self._KillMasterDaemon()

    rbsteps.append("Restore %s from another master candidate"
                   " and restart master daemon" %
                   pathutils.CLUSTER_CONF_FILE)
    self._MergeConfig()
    self._StartMasterDaemon(no_vote=True)

    # Point of no return, delete rbsteps
    del rbsteps[:]

    logging.warning("We are at the point of no return. Merge can not easily"
                    " be undone after this point.")
    logging.info("Readd nodes")
    self._ReaddMergedNodesAndRedist()

    logging.info("Merge done, restart master daemon normally")
    self._KillMasterDaemon()
    self._StartMasterDaemon()

    if self.restart == _RESTART_ALL:
      logging.info("Starting instances again")
      self._StartupAllInstances()
    else:
      logging.info("Not starting instances again")
    logging.info("Post cluster verification")
    self._VerifyCluster()
  except errors.GenericError as e:
    logging.exception(e)

    if rbsteps:
      nodes = Flatten([data.nodes for data in self.merger_data])
      # Values for the %(clusters)s and %(nodes)s placeholders of the steps
      info = {
        "clusters": self.clusters,
        "nodes": nodes,
        }
      logging.critical("In order to rollback do the following:")
      for step in rbsteps:
        logging.critical("  * %s", step % info)
    else:
      logging.critical("Nothing to rollback.")

    # TODO: Keep track of steps done for a flawless resume?

    # Swallowing the error here would make the caller report success
    raise
| |
| def Cleanup(self): |
| """Clean up our environment. |
| |
| This cleans up remote private keys and configs and after that |
| deletes the temporary directory. |
| |
| """ |
| shutil.rmtree(self.work_dir) |
| |
| |
def main():
  """Main routine.

  Parses the command line, then sets up and runs the merge of the given
  clusters into the local one. The merger's work directory is removed in
  any case.

  @return: constants.EXIT_SUCCESS, or constants.EXIT_FAILURE if the setup or
           the merge raised a L{errors.GenericError}

  """
  program = os.path.basename(sys.argv[0])

  # optparse itself substitutes "%prog"; the string is not %-formatted here,
  # so a doubled percent sign would show up verbatim in the usage message
  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  utils.SetupToolLogging(options.debug, options.verbose)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    cluster_merger.Setup()
    cluster_merger.Merge()
  except errors.GenericError as e:
    logging.exception(e)
    return constants.EXIT_FAILURE
  finally:
    # Runs on success, on handled errors and on unexpected exceptions alike
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())