blob: 926b70516b4829d0df5d342c5fdc1261efe70b79 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tool to merge two or more clusters together.
The clusters have to run the same version of Ganeti!
"""
# pylint: disable=C0103
# C0103: Invalid name cluster-merge
import logging
import os
import optparse
import shutil
import sys
import tempfile
from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
from ganeti import pathutils
from ganeti import compat
# Strategies for node groups that exist under the same name on both the
# local and a remote cluster (accepted values of the --groups option).
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Identifier (or identifier prefix) handed to the ConfigWriter calls that add
# nodes, instances and node groups while merging configurations.
_CLUSTERMERGE_ECID = "clustermerge-ecid"
# Instance restart strategies (accepted values of the --restart option).
# Note that Merger.__init__ raises NotImplementedError for _RESTART_UP.
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Strategies for cluster parameters whose values differ between clusters
# (accepted values of the --parameter-conflicts option).
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
# Number of seconds the watcher is paused on all involved clusters.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))

# Strategy for node groups carrying the same name on both clusters; there is
# no default, so the merge aborts if a conflict is found and none was given.
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))

# Strategy for cluster parameters whose values differ between the clusters.
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# Strategy for restarting instances once the merge is done. The help text
# used to contain a stray "same name" copied from the --groups option.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# "store_false" on a default of True: passing the flag turns off stopping of
# instances, in which case the merger only checks that none is running.
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
def Flatten(unflattened_list):
  """Flattens arbitrarily nested lists into a single flat list.

  Only C{list} instances are descended into; any other element (tuples and
  strings included) is copied to the result unchanged. The relative order of
  the elements is preserved.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flat = []
  # Stack of iterators: the innermost list currently being walked is on top
  pending = [iter(unflattened_list)]
  while pending:
    for element in pending[-1]:
      if isinstance(element, list):
        # Descend; the outer iterator is resumed once this one is exhausted
        pending.append(iter(element))
        break
      flat.append(element)
    else:
      # Innermost list fully consumed
      pending.pop()
  return flat
class MergerData(object):
  """Plain record describing one cluster that is being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Stores the given values as attributes of the same name.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    # Identity of the remote cluster and how to reach it
    self.cluster = cluster
    self.master_node = master_node
    self.key_path = key_path
    # What lives on the remote cluster
    self.nodes = nodes
    self.instances = instances
    # Not known at construction time unless passed explicitly
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  Typical usage is L{Setup}, then L{Merge}, and finally L{Cleanup} (the
  latter regardless of whether the former two succeeded).

  """
  # Instance states that are treated as "running" by _CheckRunningInstances
  RUNNING_STATUSES = compat.UniqueFrozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])

  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle cluster parameter conflicts
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)
    @raise NotImplementedError: if C{restart} is the (unsupported) "up"
                                strategy

    """
    # Reject unsupported modes before anything is created on disk; a failing
    # constructor must not leave a temporary directory behind
    if restart == _RESTART_UP:
      raise NotImplementedError("Restart strategy '%s' is not implemented" %
                                _RESTART_UP)

    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances

    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)

    # Created last, for the same reason the restart check comes first
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed: for every
    remote cluster the root ssh private key, the list of online nodes, the
    list of instances and the name of the master node. The result is stored
    as one L{MergerData} per cluster in C{self.merger_data}.

    @raise errors.CommandError: if the local cluster is among the clusters
                                to merge
    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # No key of the remote cluster is available yet, hence no batch mode
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      # "0o600" is accepted by Python 2.6+ and 3.x, the old "0600" is not
      utils.WriteFile(key_path, mode=0o600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      # Keep only nodes whose "offline" column is "N", i.e. online nodes
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: if the key can't be added on some node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)
    # The here-document terminator must start on a line of its own; without a
    # trailing newline it would be glued to the key and never be recognized
    if not pub_key.endswith("\n"):
      pub_key += "\n"

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: how often the command is tried before giving up;
                         values below one are treated as one
    @return: the result of the first successful attempt or, if all attempts
             failed, of the last one

    """
    result = None
    # Always run at least once so that a result can be returned
    for _ in range(max(1, max_attempts)):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result

  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise
    @raise errors.RemoteError: if the instance status can't be retrieved

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      # A failed command yields no status lines at all; without this check
      # it would be mistaken for "nothing is running"
      if result.failed:
        raise errors.RemoteError("Unable to retrieve instance status from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      statuses = [line.strip() for line in result.output.splitlines()]
      if self.RUNNING_STATUSES.intersection(statuses):
        return True

    return False

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: if the shutdown command fails on a cluster

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: if the watcher can't be paused somewhere

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: if deactivating a master IP fails

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")

      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node,
                                  result.fail_reason,
                                  result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: if the daemons can't be stopped on a node
                               (after three attempts)

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config. The configuration of
    each cluster is written to a file in the work directory and its path is
    recorded in the C{config_path} attribute of the cluster's data object.

    @raise errors.RemoteError: if a configuration can't be retrieved

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          pathutils.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    For every fetched remote configuration the cluster parameters are
    checked, the node groups are merged and then all nodes (marked offline)
    and all instances (with newly allocated DRBD ports) are added to the
    local configuration.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.DTS_DRBD:
            port = my_config.AllocatePort()

            # The third element of the logical ID is replaced by the new port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Differences are logged. Whether a difference counts as an error depends
    on the parameter and on the parameter conflict strategy given to the
    constructor. A few settings (reserved LVs, disk wiping, OS parameters
    unknown locally) are merged into the local cluster object.

    @param my_config: configuration of the local cluster
    @param other_config: configuration of the cluster being merged in
    @raise errors.ConfigurationError: if at least one difference was counted
                                      as an error

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # Differences in these are errors regardless of the chosen strategy
    check_params_strict = [
      "volume_group_name",
      ]
    if my_cluster.IsFileStorageEnabled() or \
        other_cluster.IsFileStorageEnabled():
      check_params_strict.append("file_storage_dir")
    if my_cluster.IsSharedFileStorageEnabled() or \
        other_cluster.IsSharedFileStorageEnabled():
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    params_strict = (self.params == _PARAMS_STRICT)

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      # The other cluster may lack the hypervisor or single parameters; that
      # is reported as a difference instead of failing with a KeyError
      other_hvp = other_cluster.hvparams.get(hyp, {})
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_hvp.get(param, None)
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp) | set(other_cluster.os_hvp):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Returns the per-OS hypervisor parameters of a cluster.

    @param cluster: cluster object whose C{os_hvp} is consulted
    @param os_name: name of the OS
    @param hyp: name of the hypervisor
    @return: the value stored for the OS and hypervisor or None if either of
             them is unknown

    """
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    Groups whose name exists on both clusters are handled according to the
    group strategy given to the constructor: with "rename" the remote group
    gets the remote cluster's name appended, with "merge" its nodes are moved
    into the local group of the same name.

    @param my_config: configuration of the local cluster
    @param other_config: configuration of the cluster being merged in
    @raise errors.CommandError: if there are conflicts, but no strategy

    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grp_names = set(grp.name
                       for grp in my_config.GetAllNodeGroupsInfo().values())
    # A real list is needed as merged groups are removed from it below
    other_grps = list(other_config.GetAllNodeGroupsInfo().values())

    # Check for node group naming conflicts:
    conflicts = [grp for grp in other_grps if grp.name in my_grp_names]

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # The target group is the same for all nodes of the remote group
          # Access to a protected member of a client class
          # pylint: disable=W0212
          my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

          # Iterate over a copy, the member list shrinks while nodes are moved
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    A node that fails to be readded is only logged, the remaining nodes are
    still processed.

    @raise errors.CommandError: If the configuration can't be redistributed.

    """
    for data in self.merger_data:
      for node in data.nodes:
        logging.info("Readding node %s", node)
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    @raise errors.GenericError: if any step fails; the error and the manual
                                rollback steps (if any) are logged before the
                                exception is passed on to the caller

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError as err:
      logging.exception(err)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

      # Don't swallow the error; otherwise the caller would treat a failed
      # merge as a success
      raise

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)
def main():
  """Main routine.

  Parses the command line, merges the clusters named as positional arguments
  into the local one and removes the merger's work directory afterwards.

  @return: L{constants.EXIT_FAILURE} if setting up or merging raised a
           L{errors.GenericError}, L{constants.EXIT_SUCCESS} otherwise

  """
  program = os.path.basename(sys.argv[0])

  # optparse expands "%prog" itself; as the string is not %-formatted here, a
  # doubled percent sign would show up literally in the usage message
  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  utils.SetupToolLogging(options.debug, options.verbose)

  if not args:
    parser.error("No clusters specified")

  # Cluster names given more than once are merged only once
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError as err:
      logging.exception(err)
      return constants.EXIT_FAILURE
  finally:
    # Runs on success, on failure and on unexpected exceptions alike
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())