| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Tool to restart erroneously downed virtual machines. |
| |
| This program and set of classes implement a watchdog to restart |
| virtual machines in a Ganeti cluster that have crashed or been killed |
| by a node reboot. Run from cron or similar. |
| |
| """ |
| |
| import os |
| import os.path |
| import sys |
| import time |
| import logging |
| import operator |
| import errno |
| from optparse import OptionParser |
| |
| from ganeti import utils |
| from ganeti import wconfd |
| from ganeti import constants |
| from ganeti import compat |
| from ganeti import errors |
| from ganeti import opcodes |
| from ganeti import cli |
| import ganeti.rpc.errors as rpcerr |
| from ganeti import rapi |
| from ganeti import netutils |
| from ganeti import qlang |
| from ganeti import ssconf |
| from ganeti import ht |
| from ganeti import pathutils |
| |
| import ganeti.rapi.client # pylint: disable=W0611 |
| from ganeti.rapi.client import UsesRapiClient |
| |
| from ganeti.watcher import nodemaint |
| from ganeti.watcher import state |
| |
| |
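| #: Maximum number of restart/cleanup attempts per instance before giving up |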
| MAXTRIES = 5 |
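| #: Instance states that trigger a restart attempt by the watcher |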
| BAD_STATES = compat.UniqueFrozenset([ |
| constants.INSTST_ERRORDOWN, |
| ]) |
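| #: Instance states the watcher cannot fix (the node itself is down or offline) |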
| HELPLESS_STATES = compat.UniqueFrozenset([ |
| constants.INSTST_NODEDOWN, |
| constants.INSTST_NODEOFFLINE, |
| ]) |
| NOTICE = "NOTICE" |
| ERROR = "ERROR" |
| |
| #: Number of seconds to wait between starting child processes for node groups |
| CHILD_PROCESS_DELAY = 1.0 |
| |
| #: How many seconds to wait for instance status file lock |
| INSTANCE_STATUS_LOCK_TIMEOUT = 10.0 |
| |
| |
| class NotMasterError(errors.GenericError): |
| """Exception raised when this host is not the master.""" |
| |
| |
| def ShouldPause(): |
| """Check whether we should pause. |
| |
| """ |
| return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)) |
| |
| |
| def StartNodeDaemons(): |
| """Start all the daemons that should be running on all nodes. |
| |
| """ |
| # on master or not, try to start the node daemon |
| utils.EnsureDaemon(constants.NODED) |
| # start confd as well. On non-candidates it will be in disabled mode. |
| utils.EnsureDaemon(constants.CONFD) |
| # start mond as well: all nodes need monitoring |
| if constants.ENABLE_MOND: |
| utils.EnsureDaemon(constants.MOND) |
| # start kvmd, which will quit if not needed to run |
| utils.EnsureDaemon(constants.KVMD) |
| |
| |
| def RunWatcherHooks(): |
| """Run the watcher hooks. |
| |
| """ |
| hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR, |
| constants.HOOKS_NAME_WATCHER) |
| if not os.path.isdir(hooks_dir): |
| return |
| |
| try: |
| results = utils.RunParts(hooks_dir) |
| except Exception, err: # pylint: disable=W0703 |
| logging.exception("RunParts %s failed: %s", hooks_dir, err) |
| return |
| |
| for (relname, status, runresult) in results: |
| if status == constants.RUNPARTS_SKIP: |
| logging.debug("Watcher hook %s: skipped", relname) |
| elif status == constants.RUNPARTS_ERR: |
| logging.warning("Watcher hook %s: error (%s)", relname, runresult) |
| elif status == constants.RUNPARTS_RUN: |
| if runresult.failed: |
| logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)", |
| relname, runresult.exit_code, runresult.output) |
| else: |
| logging.debug("Watcher hook %s: success (output: %s)", relname, |
| runresult.output) |
| else: |
| raise errors.ProgrammerError("Unknown status %s returned by RunParts", |
| status) |
| |
| |
| class Instance(object): |
| """Abstraction for a Virtual Machine instance. |
| |
| """ |
| def __init__(self, name, status, config_state, config_state_source, |
| disks_active, snodes, disk_template): |
| self.name = name |
| self.status = status |
| self.config_state = config_state |
| self.config_state_source = config_state_source |
| self.disks_active = disks_active |
| self.snodes = snodes |
| self.disk_template = disk_template |
| |
| def Restart(self, cl): |
| """Encapsulates the start of an instance. |
| |
| """ |
| op = opcodes.OpInstanceStartup(instance_name=self.name, force=False) |
| op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, |
| "Restarting instance %s" % self.name, |
| utils.EpochNano())] |
| cli.SubmitOpCode(op, cl=cl) |
| |
| def ActivateDisks(self, cl): |
| """Encapsulates the activation of all disks of an instance. |
| |
| """ |
| op = opcodes.OpInstanceActivateDisks(instance_name=self.name) |
| op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, |
| "Activating disks for instance %s" % self.name, |
| utils.EpochNano())] |
| cli.SubmitOpCode(op, cl=cl) |
| |
| def NeedsCleanup(self): |
| """Determines whether the instance needs cleanup. |
| |
| Determines whether the instance needs cleanup after having been |
| shut down by the user. |
| |
| @rtype: bool |
| @return: True if the instance needs cleanup, False otherwise. |
| |
| """ |
| return self.status == constants.INSTST_USERDOWN and \ |
| self.config_state != constants.ADMINST_DOWN |
| |
| |
| class Node(object): |
| """Data container representing a cluster node. |
| |
| """ |
| def __init__(self, name, bootid, offline, secondaries): |
| """Initializes this class. |
| |
| """ |
| self.name = name |
| self.bootid = bootid |
| self.offline = offline |
| self.secondaries = secondaries |
| |
| |
| def _CleanupInstance(cl, notepad, inst, locks): |
| n = notepad.NumberOfCleanupAttempts(inst.name) |
| |
| if inst.name in locks: |
| logging.info("Not cleaning up instance '%s', instance is locked", |
| inst.name) |
| return |
| |
| if n > MAXTRIES: |
| logging.warning("Not cleaning up instance '%s', retries exhausted", |
| inst.name) |
| return |
| |
| logging.info("Instance '%s' was shut down by the user, cleaning up instance", |
| inst.name) |
| op = opcodes.OpInstanceShutdown(instance_name=inst.name, |
| admin_state_source=constants.USER_SOURCE) |
| |
| op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, |
| "Cleaning up instance %s" % inst.name, |
| utils.EpochNano())] |
| try: |
| cli.SubmitOpCode(op, cl=cl) |
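| # The shutdown succeeded; clear any recorded cleanup attempts for this instance |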
| if notepad.NumberOfCleanupAttempts(inst.name): |
| notepad.RemoveInstance(inst.name) |
| except Exception: # pylint: disable=W0703 |
| logging.exception("Error while cleaning up instance '%s'", inst.name) |
| notepad.RecordCleanupAttempt(inst.name) |
| |
| |
| def _CheckInstances(cl, notepad, instances, locks): |
| """Make a pass over the list of instances, restarting downed ones. |
| |
| """ |
| notepad.MaintainInstanceList(instances.keys()) |
| |
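| # Names of instances for which a restart was submitted in this run; used by |
| # _CheckDisks to avoid redundant disk activation |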
| started = set() |
| |
| for inst in instances.values(): |
| if inst.NeedsCleanup(): |
| _CleanupInstance(cl, notepad, inst, locks) |
| elif inst.status in BAD_STATES: |
| n = notepad.NumberOfRestartAttempts(inst.name) |
| |
| if n > MAXTRIES: |
| logging.warning("Not restarting instance '%s', retries exhausted", |
| inst.name) |
| continue |
| |
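| # Exactly at the limit: record one final attempt so the counter moves past |
| # MAXTRIES and the instance is skipped on future runs |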
| if n == MAXTRIES: |
| notepad.RecordRestartAttempt(inst.name) |
| logging.error("Could not restart instance '%s' after %s attempts," |
| " giving up", inst.name, MAXTRIES) |
| continue |
| |
| try: |
| logging.info("Restarting instance '%s' (attempt #%s)", |
| inst.name, n + 1) |
| inst.Restart(cl) |
| except Exception: # pylint: disable=W0703 |
| logging.exception("Error while restarting instance '%s'", inst.name) |
| else: |
| started.add(inst.name) |
| |
| notepad.RecordRestartAttempt(inst.name) |
| |
| else: |
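| # Instance is no longer in a bad state; clear any recorded restart attempts |
| # and, unless its node is down or offline, report the restart as successful |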
| if notepad.NumberOfRestartAttempts(inst.name): |
| notepad.RemoveInstance(inst.name) |
| if inst.status not in HELPLESS_STATES: |
| logging.info("Restart of instance '%s' succeeded", inst.name) |
| |
| return started |
| |
| |
| def _CheckDisks(cl, notepad, nodes, instances, started): |
| """Check all nodes for restarted ones. |
| |
| """ |
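| # Nodes whose boot ID differs from the last recorded one (typically rebooted) |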
| check_nodes = [] |
| |
| for node in nodes.values(): |
| old = notepad.GetNodeBootID(node.name) |
| if not node.bootid: |
| # Bad node, not returning a boot id |
| if not node.offline: |
| logging.debug("Node '%s' missing boot ID, skipping secondary checks", |
| node.name) |
| continue |
| |
| if old != node.bootid: |
| # Node's boot ID has changed, probably through a reboot |
| check_nodes.append(node) |
| |
| if check_nodes: |
| # Activate disks for all instances with any of the checked nodes as a |
| # secondary node. |
| for node in check_nodes: |
| for instance_name in node.secondaries: |
| try: |
| inst = instances[instance_name] |
| except KeyError: |
| logging.info("Can't find instance '%s', maybe it was ignored", |
| instance_name) |
| continue |
| |
| if not inst.disks_active: |
| logging.info("Skipping disk activation for instance '%s': its disks" |
| " are not marked active", inst.name) |
| continue |
| |
| if inst.name in started: |
| # we already tried to start the instance, which should have |
| # activated its drives (if they can be activated at all) |
| logging.debug("Skipping disk activation for instance '%s' as" |
| " it was already started", inst.name) |
| continue |
| |
| try: |
| logging.info("Activating disks for instance '%s'", inst.name) |
| inst.ActivateDisks(cl) |
| except Exception: # pylint: disable=W0703 |
| logging.exception("Error while activating disks for instance '%s'", |
| inst.name) |
| |
| # Keep changed boot IDs |
| for node in check_nodes: |
| notepad.SetNodeBootID(node.name, node.bootid) |
| |
| |
| def _CheckForOfflineNodes(nodes, instance): |
| """Checks if the given instance has any secondary node in offline status. |
| |
| @param instance: The instance object |
| @return: True if any of the secondaries is offline, False otherwise |
| |
| """ |
| return compat.any(nodes[node_name].offline for node_name in instance.snodes) |
| |
| |
| def _GetPendingVerifyDisks(cl, uuid): |
| """Checks if there are any currently running or pending group verify-disks |
| jobs and, if so, returns their ids. |
| |
| """ |
| qfilter = qlang.MakeSimpleFilter("status", |
| frozenset([constants.JOB_STATUS_RUNNING, |
| constants.JOB_STATUS_QUEUED, |
| constants.JOB_STATUS_WAITING])) |
| qresult = cl.Query(constants.QR_JOB, ["id", "summary"], qfilter) |
| |
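| # Each row holds (status, value) pairs for the id and summary fields; the |
| # summary is a one-element list such as ["GROUP_VERIFY_DISKS(<uuid>)"] |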
| ids = [jobid for ((_, jobid), (_, (job, ))) in qresult.data |
| if job == ("GROUP_VERIFY_DISKS(%s)" % uuid)] |
| return ids |
| |
| |
| def _VerifyDisks(cl, uuid, nodes, instances): |
| """Run a per-group "gnt-cluster verify-disks". |
| |
| """ |
| |
| existing_jobs = _GetPendingVerifyDisks(cl, uuid) |
| if existing_jobs: |
| logging.info("There are verify disks jobs already pending (%s), skipping " |
| "VerifyDisks step for %s.", |
| utils.CommaJoin(existing_jobs), uuid) |
| return |
| |
| op = opcodes.OpGroupVerifyDisks( |
| group_name=uuid, priority=constants.OP_PRIO_LOW) |
| op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, |
| "Verifying disks of group %s" % uuid, |
| utils.EpochNano())] |
| job_id = cl.SubmitJob([op]) |
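| # The job wraps a single opcode; its result is a tuple whose middle element |
| # lists the instances reported to have offline disks |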
| ((_, offline_disk_instances, _), ) = \ |
| cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) |
| cl.ArchiveJob(job_id) |
| |
| if not offline_disk_instances: |
| # nothing to do |
| logging.debug("Verify-disks reported no offline disks, nothing to do") |
| return |
| |
| logging.debug("Will activate disks for instance(s) %s", |
| utils.CommaJoin(offline_disk_instances)) |
| |
| # We submit only one job, and wait for it. Not optimal, but this puts less |
| # load on the job queue. |
| job = [] |
| for name in offline_disk_instances: |
| try: |
| inst = instances[name] |
| except KeyError: |
| logging.info("Can't find instance '%s', maybe it was ignored", name) |
| continue |
| |
| if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst): |
| logging.info("Skipping instance '%s' because it is in a helpless state" |
| " or has offline secondaries", name) |
| continue |
| |
| op = opcodes.OpInstanceActivateDisks(instance_name=name) |
| op.reason = [(constants.OPCODE_REASON_SRC_WATCHER, |
| "Activating disks for instance %s" % name, |
| utils.EpochNano())] |
| job.append(op) |
| |
| if job: |
| job_id = cli.SendJob(job, cl=cl) |
| |
| try: |
| cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug) |
| except Exception: # pylint: disable=W0703 |
| logging.exception("Error while activating disks") |
| |
| |
| def IsRapiResponding(hostname): |
| """Connects to the RAPI port and does a simple test. |
| |
| Connects to the RAPI port of hostname and does a simple test. At this time, |
| the test is GetVersion. |
| |
| If RAPI responds with error code "401 Unauthorized", the test is successful, |
| because the aim of this function is to assess whether RAPI is responding, not |
| whether it accepts our credentials. |
| |
| @type hostname: string |
| @param hostname: hostname of the node to connect to. |
| @rtype: bool |
| @return: Whether RAPI is working properly |
| |
| """ |
| curl_config = rapi.client.GenericCurlConfig() |
| rapi_client = rapi.client.GanetiRapiClient(hostname, |
| curl_config_fn=curl_config) |
| try: |
| master_version = rapi_client.GetVersion() |
| except rapi.client.CertificateError, err: |
| logging.warning("RAPI certificate error: %s", err) |
| return False |
| except rapi.client.GanetiApiError, err: |
| if err.code == 401: |
| # Unauthorized, but RAPI is alive and responding |
| return True |
| else: |
| logging.warning("RAPI error: %s", err) |
| return False |
| else: |
| logging.debug("Reported RAPI version %s", master_version) |
| return master_version == constants.RAPI_VERSION |
| |
| |
| def IsWconfdResponding(): |
| """Probes an echo RPC to WConfD. |
| |
| """ |
| probe_string = "ganeti watcher probe %d" % time.time() |
| |
| try: |
| result = wconfd.Client().Echo(probe_string) |
| except Exception, err: # pylint: disable=W0703 |
| logging.warning("WConfd connection error: %s", err) |
| return False |
| |
| if result != probe_string: |
| logging.warning("WConfd echo('%s') returned '%s'", probe_string, result) |
| return False |
| |
| return True |
| |
| |
| def ParseOptions(): |
| """Parse the command line options. |
| |
| @return: (options, args) as from OptionParser.parse_args() |
| |
| """ |
| parser = OptionParser(description="Ganeti cluster watcher", |
| usage="%prog [-d]", |
| version="%%prog (ganeti) %s" % |
| constants.RELEASE_VERSION) |
| |
| parser.add_option(cli.DEBUG_OPT) |
| parser.add_option(cli.NODEGROUP_OPT) |
| parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600, |
| help="Autoarchive jobs older than this age (default" |
| " 6 hours)") |
| parser.add_option("--ignore-pause", dest="ignore_pause", default=False, |
| action="store_true", help="Ignore cluster pause setting") |
| parser.add_option("--wait-children", dest="wait_children", |
| action="store_true", help="Wait for child processes") |
| parser.add_option("--no-wait-children", dest="wait_children", |
| action="store_false", |
| help="Don't wait for child processes") |
| parser.add_option("--no-verify-disks", dest="no_verify_disks", default=False, |
| action="store_true", help="Do not verify disk status") |
| parser.add_option("--rapi-ip", dest="rapi_ip", |
| default=constants.IP4_ADDRESS_LOCALHOST, |
| help="Use this IP to talk to RAPI.") |
| # See the optparse documentation for why the wait_children default is set |
| # via set_defaults instead of on the individual options |
| parser.set_defaults(wait_children=True) |
| options, args = parser.parse_args() |
| options.job_age = cli.ParseTimespec(options.job_age) |
| |
| if args: |
| parser.error("No arguments expected") |
| |
| return (options, args) |
| |
| |
| def _WriteInstanceStatus(filename, data): |
| """Writes the per-group instance status file. |
| |
| The entries are sorted. |
| |
| @type filename: string |
| @param filename: Path to instance status file |
| @type data: list of tuple; (instance name as string, status as string) |
| @param data: Instance name and status |
| |
| """ |
| logging.debug("Updating instance status file '%s' with %s instances", |
| filename, len(data)) |
| |
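| # Each entry becomes a "<name> <status>" line, sorted by instance name |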
| utils.WriteFile(filename, |
| data="".join(map(compat.partial(operator.mod, "%s %s\n"), |
| sorted(data)))) |
| |
| |
| def _UpdateInstanceStatus(filename, instances): |
| """Writes an instance status file from L{Instance} objects. |
| |
| @type filename: string |
| @param filename: Path to status file |
| @type instances: list of L{Instance} |
| |
| """ |
| _WriteInstanceStatus(filename, [(inst.name, inst.status) |
| for inst in instances]) |
| |
| |
| def _ReadInstanceStatus(filename): |
| """Reads an instance status file. |
| |
| @type filename: string |
| @param filename: Path to status file |
| @rtype: tuple; (None or number, list of lists containing instance name and |
| status) |
| @return: File's mtime and instance status contained in the file; mtime is |
| C{None} if file can't be read |
| |
| """ |
| logging.debug("Reading per-group instance status from '%s'", filename) |
| |
| statcb = utils.FileStatHelper() |
| try: |
| content = utils.ReadFile(filename, preread=statcb) |
| except EnvironmentError, err: |
| if err.errno == errno.ENOENT: |
| logging.error("Can't read '%s', does not exist (yet)", filename) |
| else: |
| logging.exception("Unable to read '%s', ignoring", filename) |
| return (None, None) |
| else: |
| return (statcb.st.st_mtime, [line.split(None, 1) |
| for line in content.splitlines()]) |
| |
| |
| def _MergeInstanceStatus(filename, pergroup_filename, groups): |
| """Merges all per-group instance status files into a global one. |
| |
| @type filename: string |
| @param filename: Path to global instance status file |
| @type pergroup_filename: string |
| @param pergroup_filename: Path to per-group status files, must contain "%s" |
| to be replaced with group UUID |
| @type groups: sequence |
| @param groups: UUIDs of known groups |
| |
| """ |
| # Lock global status file in exclusive mode |
| lock = utils.FileLock.Open(filename) |
| try: |
| lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT) |
| except errors.LockError, err: |
| # All per-group processes will lock and update the file. None of them |
| # should take longer than 10 seconds (the value of |
| # INSTANCE_STATUS_LOCK_TIMEOUT). |
| logging.error("Can't acquire lock on instance status file '%s', not" |
| " updating: %s", filename, err) |
| return |
| |
| logging.debug("Acquired exclusive lock on '%s'", filename) |
| |
| data = {} |
| |
| # Load instance status from all groups |
| for group_uuid in groups: |
| (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid) |
| |
| if mtime is not None: |
| for (instance_name, status) in instdata: |
| data.setdefault(instance_name, []).append((mtime, status)) |
| |
| # Select last update based on file mtime |
| inststatus = [(instance_name, sorted(status, reverse=True)[0][1]) |
| for (instance_name, status) in data.items()] |
| |
| # Write the global status file. Don't touch file after it's been |
| # updated--there is no lock anymore. |
| _WriteInstanceStatus(filename, inststatus) |
| |
| |
| def GetLuxiClient(try_restart): |
| """Tries to connect to the luxi daemon. |
| |
| @type try_restart: bool |
| @param try_restart: Whether to attempt to restart the master daemon |
| |
| """ |
| try: |
| return cli.GetClient() |
| except errors.OpPrereqError, err: |
| # cli.GetClient raises OpPrereqError when the current node is not the master |
| raise NotMasterError("Not on master node (%s)" % err) |
| |
| except (rpcerr.NoMasterError, rpcerr.TimeoutError), err: |
| if not try_restart: |
| raise |
| |
| logging.warning("Luxi daemon seems to be down (%s), trying to restart", |
| err) |
| |
| if not utils.EnsureDaemon(constants.LUXID): |
| raise errors.GenericError("Can't start the master daemon") |
| |
| # Retry the connection |
| return cli.GetClient() |
| |
| |
| def _StartGroupChildren(cl, wait): |
| """Starts a new instance of the watcher for every node group. |
| |
| """ |
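| # The global watcher is invoked without a node group option; the children |
| # spawned below add it themselves |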
| assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME) |
| for arg in sys.argv) |
| |
| result = cl.QueryGroups([], ["name", "uuid"], False) |
| |
| children = [] |
| |
| for (idx, (name, uuid)) in enumerate(result): |
| args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid] |
| |
| if idx > 0: |
| # Let's not kill the system |
| time.sleep(CHILD_PROCESS_DELAY) |
| |
| logging.debug("Spawning child for group '%s' (%s), arguments %s", |
| name, uuid, args) |
| |
| try: |
| # TODO: Should utils.StartDaemon be used instead? |
| pid = os.spawnv(os.P_NOWAIT, args[0], args) |
| except Exception: # pylint: disable=W0703 |
| logging.exception("Failed to start child for group '%s' (%s)", |
| name, uuid) |
| else: |
| logging.debug("Started with PID %s", pid) |
| children.append(pid) |
| |
| if wait: |
| for pid in children: |
| logging.debug("Waiting for child PID %s", pid) |
| try: |
| result = utils.RetryOnSignal(os.waitpid, pid, 0) |
| except EnvironmentError, err: |
| result = str(err) |
| |
| logging.debug("Child PID %s exited with status %s", pid, result) |
| |
| |
| def _ArchiveJobs(cl, age): |
| """Archives old jobs. |
| |
| """ |
| (arch_count, left_count) = cl.AutoArchiveJobs(age) |
| logging.debug("Archived %s jobs, left %s", arch_count, left_count) |
| |
| |
| def _CheckMaster(cl): |
| """Ensures current host is master node. |
| |
| """ |
| (master, ) = cl.QueryConfigValues(["master_node"]) |
| if master != netutils.Hostname.GetSysName(): |
| raise NotMasterError("This is not the master node") |
| |
| |
| @UsesRapiClient |
| def _GlobalWatcher(opts): |
| """Main function for global watcher. |
| |
| At the end child processes are spawned for every node group. |
| |
| """ |
| StartNodeDaemons() |
| RunWatcherHooks() |
| |
| # Run node maintenance in all cases, even if master, so that old masters can |
| # be properly cleaned up |
| if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602 |
| nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602 |
| |
| try: |
| client = GetLuxiClient(True) |
| except NotMasterError: |
| # Don't proceed on non-master nodes |
| return constants.EXIT_SUCCESS |
| |
| # we are on master now |
| utils.EnsureDaemon(constants.RAPI) |
| utils.EnsureDaemon(constants.WCONFD) |
| utils.EnsureDaemon(constants.MAINTD) |
| |
| # If RAPI isn't responding to queries, try one restart |
| logging.debug("Attempting to talk to remote API on %s", |
| opts.rapi_ip) |
| if not IsRapiResponding(opts.rapi_ip): |
| logging.warning("Couldn't get answer from remote API, restarting daemon") |
| utils.StopDaemon(constants.RAPI) |
| utils.EnsureDaemon(constants.RAPI) |
| logging.debug("Second attempt to talk to remote API") |
| if not IsRapiResponding(opts.rapi_ip): |
| logging.fatal("RAPI is not responding") |
| logging.debug("Successfully talked to remote API") |
| |
| # If WConfD isn't responding to queries, try one restart |
| logging.debug("Attempting to talk to WConfD") |
| if not IsWconfdResponding(): |
| logging.warning("WConfD not responsive, restarting daemon") |
| utils.StopDaemon(constants.WCONFD) |
| utils.EnsureDaemon(constants.WCONFD) |
| logging.debug("Second attempt to talk to WConfD") |
| if not IsWconfdResponding(): |
| logging.fatal("WConfD is not responding") |
| |
| _CheckMaster(client) |
| _ArchiveJobs(client, opts.job_age) |
| |
| # Spawn child processes for all node groups |
| _StartGroupChildren(client, opts.wait_children) |
| |
| return constants.EXIT_SUCCESS |
| |
| |
| def _GetGroupData(qcl, uuid): |
| """Retrieves instances and nodes per node group. |
| |
| """ |
| locks = qcl.Query(constants.QR_LOCK, ["name", "mode"], None) |
| |
| prefix = "instance/" |
| prefix_len = len(prefix) |
| |
| locked_instances = set() |
| |
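| # Each lock row holds (status, value) pairs for the name and mode fields; a |
| # non-empty mode means the lock is in use |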
| for [[_, name], [_, lock]] in locks.data: |
| if name.startswith(prefix) and lock: |
| locked_instances.add(name[prefix_len:]) |
| |
| queries = [ |
| (constants.QR_INSTANCE, |
| ["name", "status", "admin_state", "admin_state_source", "disks_active", |
| "snodes", "pnode.group.uuid", "snodes.group.uuid", "disk_template"], |
| [qlang.OP_EQUAL, "pnode.group.uuid", uuid]), |
| (constants.QR_NODE, |
| ["name", "bootid", "offline"], |
| [qlang.OP_EQUAL, "group.uuid", uuid]), |
| ] |
| |
| results = [] |
| for what, fields, qfilter in queries: |
| results.append(qcl.Query(what, fields, qfilter)) |
| |
| results_data = map(operator.attrgetter("data"), results) |
| |
| # Ensure results are tuples with two values |
| assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data)) |
| |
| # Extract values ignoring result status |
| (raw_instances, raw_nodes) = [[map(compat.snd, values) |
| for values in res] |
| for res in results_data] |
| |
| secondaries = {} |
| instances = [] |
| |
| # Load all instances |
| for (name, status, config_state, config_state_source, disks_active, snodes, |
| pnode_group_uuid, snodes_group_uuid, disk_template) in raw_instances: |
| if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid): |
| logging.error("Ignoring split instance '%s', primary group %s, secondary" |
| " groups %s", name, pnode_group_uuid, |
| utils.CommaJoin(snodes_group_uuid)) |
| else: |
| instances.append(Instance(name, status, config_state, config_state_source, |
| disks_active, snodes, disk_template)) |
| |
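| # Remember which instances use this node as a secondary |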
| for node in snodes: |
| secondaries.setdefault(node, set()).add(name) |
| |
| # Load all nodes |
| nodes = [Node(name, bootid, offline, secondaries.get(name, set())) |
| for (name, bootid, offline) in raw_nodes] |
| |
| return (dict((node.name, node) for node in nodes), |
| dict((inst.name, inst) for inst in instances), |
| locked_instances) |
| |
| |
| def _LoadKnownGroups(): |
| """Returns a list of all node groups known by L{ssconf}. |
| |
| """ |
| groups = ssconf.SimpleStore().GetNodegroupList() |
| |
| result = list(line.split(None, 1)[0] for line in groups |
| if line.strip()) |
| |
| if not compat.all(map(utils.UUID_RE.match, result)): |
| raise errors.GenericError("Ssconf contains invalid group UUID") |
| |
| return result |
| |
| |
| def _GroupWatcher(opts): |
| """Main function for per-group watcher process. |
| |
| """ |
| group_uuid = opts.nodegroup.lower() |
| |
| if not utils.UUID_RE.match(group_uuid): |
| raise errors.GenericError("Node group parameter (%s) must be given a UUID," |
| " got '%s'" % |
| (cli.NODEGROUP_OPT_NAME, group_uuid)) |
| |
| logging.info("Watcher for node group '%s'", group_uuid) |
| |
| known_groups = _LoadKnownGroups() |
| |
| # Check if node group is known |
| if group_uuid not in known_groups: |
| raise errors.GenericError("Node group '%s' is not known by ssconf" % |
| group_uuid) |
| |
| # Group UUID has been verified and should not contain any dangerous |
| # characters |
| state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid |
| inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid |
| |
| logging.debug("Using state file %s", state_path) |
| |
| # Group watcher file lock |
| statefile = state.OpenStateFile(state_path) # pylint: disable=E0602 |
| if not statefile: |
| return constants.EXIT_FAILURE |
| |
| notepad = state.WatcherState(statefile) # pylint: disable=E0602 |
| try: |
| # Connect to master daemon |
| client = GetLuxiClient(False) |
| |
| _CheckMaster(client) |
| |
| (nodes, instances, locks) = _GetGroupData(client, group_uuid) |
| |
| # Update per-group instance status file |
| _UpdateInstanceStatus(inst_status_path, instances.values()) |
| |
| _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE, |
| pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE, |
| known_groups) |
| |
| started = _CheckInstances(client, notepad, instances, locks) |
| _CheckDisks(client, notepad, nodes, instances, started) |
| except Exception, err: |
| logging.info("Not updating status file due to failure: %s", err) |
| raise |
| else: |
| # Save changes for next run |
| notepad.Save(state_path) |
| notepad.Close() |
| |
| # Check if the nodegroup only has ext storage type |
| only_ext = compat.all(i.disk_template == constants.DT_EXT |
| for i in instances.values()) |
| |
| # We skip verification for the current node group if it contains only |
| # instances with external storage. An interface exists through which |
| # external storage providers could implement disk verification, but the |
| # current ExtStorageDevice does not provide an API for this yet. |
| # |
| # This check needs to be revisited if ES_ACTION_VERIFY on ExtStorageDevice |
| # is implemented. |
| if not opts.no_verify_disks and not only_ext: |
| _VerifyDisks(client, group_uuid, nodes, instances) |
| |
| return constants.EXIT_SUCCESS |
| |
| |
| def Main(): |
| """Main function. |
| |
| """ |
| (options, _) = ParseOptions() |
| |
| utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0], |
| debug=options.debug, stderr_logging=options.debug) |
| |
| if ShouldPause() and not options.ignore_pause: |
| logging.debug("Pause has been set, exiting") |
| return constants.EXIT_SUCCESS |
| |
| # Try to acquire global watcher lock in shared mode |
| lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE) |
| try: |
| lock.Shared(blocking=False) |
| except (EnvironmentError, errors.LockError), err: |
| logging.error("Can't acquire lock on %s: %s", |
| pathutils.WATCHER_LOCK_FILE, err) |
| return constants.EXIT_SUCCESS |
| |
| if options.nodegroup is None: |
| fn = _GlobalWatcher |
| else: |
| # Per-nodegroup watcher |
| fn = _GroupWatcher |
| |
| try: |
| return fn(options) |
| except (SystemExit, KeyboardInterrupt): |
| raise |
| except NotMasterError: |
| logging.debug("Not master, exiting") |
| return constants.EXIT_NOTMASTER |
| except errors.ResolverError, err: |
| logging.error("Cannot resolve hostname '%s', exiting", err.args[0]) |
| return constants.EXIT_NODESETUP_ERROR |
| except errors.JobQueueFull: |
| logging.error("Job queue is full, can't query cluster state") |
| except errors.JobQueueDrainError: |
| logging.error("Job queue is drained, can't maintain cluster state") |
| except Exception, err: |
| logging.exception(str(err)) |
| return constants.EXIT_FAILURE |
| |
| return constants.EXIT_SUCCESS |