blob: bb39b254633309a370f6a0a38ac86eef706e3dd6 [file] [log] [blame]
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to restart erroneously downed virtual machines.
This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.
"""
import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser
from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils
import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient
from ganeti.watcher import nodemaint
from ganeti.watcher import state
MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
constants.INSTST_ERRORDOWN,
])
HELPLESS_STATES = compat.UniqueFrozenset([
constants.INSTST_NODEDOWN,
constants.INSTST_NODEOFFLINE,
])
NOTICE = "NOTICE"
ERROR = "ERROR"
#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0
#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
class NotMasterError(errors.GenericError):
"""Exception raised when this host is not the master."""
def ShouldPause():
"""Check whether we should pause.
"""
return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
def StartNodeDaemons():
"""Start all the daemons that should be running on all nodes.
"""
# on master or not, try to start the node daemon
utils.EnsureDaemon(constants.NODED)
# start confd as well. On non candidates it will be in disabled mode.
if constants.ENABLE_CONFD:
utils.EnsureDaemon(constants.CONFD)
# start mond as well: all nodes need monitoring
if constants.ENABLE_MOND:
utils.EnsureDaemon(constants.MOND)
def RunWatcherHooks():
"""Run the watcher hooks.
"""
hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
constants.HOOKS_NAME_WATCHER)
if not os.path.isdir(hooks_dir):
return
try:
results = utils.RunParts(hooks_dir)
except Exception, err: # pylint: disable=W0703
logging.exception("RunParts %s failed: %s", hooks_dir, err)
return
for (relname, status, runresult) in results:
if status == constants.RUNPARTS_SKIP:
logging.debug("Watcher hook %s: skipped", relname)
elif status == constants.RUNPARTS_ERR:
logging.warning("Watcher hook %s: error (%s)", relname, runresult)
elif status == constants.RUNPARTS_RUN:
if runresult.failed:
logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
relname, runresult.exit_code, runresult.output)
else:
logging.debug("Watcher hook %s: success (output: %s)", relname,
runresult.output)
else:
raise errors.ProgrammerError("Unknown status %s returned by RunParts",
status)
class Instance(object):
"""Abstraction for a Virtual Machine instance.
"""
def __init__(self, name, status, disks_active, snodes):
self.name = name
self.status = status
self.disks_active = disks_active
self.snodes = snodes
def Restart(self, cl):
"""Encapsulates the start of an instance.
"""
op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
cli.SubmitOpCode(op, cl=cl)
def ActivateDisks(self, cl):
"""Encapsulates the activation of all disks of an instance.
"""
op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
cli.SubmitOpCode(op, cl=cl)
class Node:
"""Data container representing cluster node.
"""
def __init__(self, name, bootid, offline, secondaries):
"""Initializes this class.
"""
self.name = name
self.bootid = bootid
self.offline = offline
self.secondaries = secondaries
def _CheckInstances(cl, notepad, instances):
"""Make a pass over the list of instances, restarting downed ones.
"""
notepad.MaintainInstanceList(instances.keys())
started = set()
for inst in instances.values():
if inst.status in BAD_STATES:
n = notepad.NumberOfRestartAttempts(inst.name)
if n > MAXTRIES:
logging.warning("Not restarting instance '%s', retries exhausted",
inst.name)
continue
if n == MAXTRIES:
notepad.RecordRestartAttempt(inst.name)
logging.error("Could not restart instance '%s' after %s attempts,"
" giving up", inst.name, MAXTRIES)
continue
try:
logging.info("Restarting instance '%s' (attempt #%s)",
inst.name, n + 1)
inst.Restart(cl)
except Exception: # pylint: disable=W0703
logging.exception("Error while restarting instance '%s'", inst.name)
else:
started.add(inst.name)
notepad.RecordRestartAttempt(inst.name)
else:
if notepad.NumberOfRestartAttempts(inst.name):
notepad.RemoveInstance(inst.name)
if inst.status not in HELPLESS_STATES:
logging.info("Restart of instance '%s' succeeded", inst.name)
return started
def _CheckDisks(cl, notepad, nodes, instances, started):
"""Check all nodes for restarted ones.
"""
check_nodes = []
for node in nodes.values():
old = notepad.GetNodeBootID(node.name)
if not node.bootid:
# Bad node, not returning a boot id
if not node.offline:
logging.debug("Node '%s' missing boot ID, skipping secondary checks",
node.name)
continue
if old != node.bootid:
# Node's boot ID has changed, probably through a reboot
check_nodes.append(node)
if check_nodes:
# Activate disks for all instances with any of the checked nodes as a
# secondary node.
for node in check_nodes:
for instance_name in node.secondaries:
try:
inst = instances[instance_name]
except KeyError:
logging.info("Can't find instance '%s', maybe it was ignored",
instance_name)
continue
if not inst.disks_active:
logging.info("Skipping disk activation for instance with not"
" activated disks '%s'", inst.name)
continue
if inst.name in started:
# we already tried to start the instance, which should have
# activated its drives (if they can be at all)
logging.debug("Skipping disk activation for instance '%s' as"
" it was already started", inst.name)
continue
try:
logging.info("Activating disks for instance '%s'", inst.name)
inst.ActivateDisks(cl)
except Exception: # pylint: disable=W0703
logging.exception("Error while activating disks for instance '%s'",
inst.name)
# Keep changed boot IDs
for node in check_nodes:
notepad.SetNodeBootID(node.name, node.bootid)
def _CheckForOfflineNodes(nodes, instance):
"""Checks if given instances has any secondary in offline status.
@param instance: The instance object
@return: True if any of the secondary is offline, False otherwise
"""
return compat.any(nodes[node_name].offline for node_name in instance.snodes)
def _VerifyDisks(cl, uuid, nodes, instances):
"""Run a per-group "gnt-cluster verify-disks".
"""
job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(
group_name=uuid, priority=constants.OP_PRIO_LOW)])
((_, offline_disk_instances, _), ) = \
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
cl.ArchiveJob(job_id)
if not offline_disk_instances:
# nothing to do
logging.debug("Verify-disks reported no offline disks, nothing to do")
return
logging.debug("Will activate disks for instance(s) %s",
utils.CommaJoin(offline_disk_instances))
# We submit only one job, and wait for it. Not optimal, but this puts less
# load on the job queue.
job = []
for name in offline_disk_instances:
try:
inst = instances[name]
except KeyError:
logging.info("Can't find instance '%s', maybe it was ignored", name)
continue
if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
logging.info("Skipping instance '%s' because it is in a helpless state"
" or has offline secondaries", name)
continue
job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
if job:
job_id = cli.SendJob(job, cl=cl)
try:
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
except Exception: # pylint: disable=W0703
logging.exception("Error while activating disks")
def IsRapiResponding(hostname):
"""Connects to RAPI port and does a simple test.
Connects to RAPI port of hostname and does a simple test. At this time, the
test is GetVersion.
If RAPI responds with error code "401 Unauthorized", the test is successful,
because the aim of this function is to assess whether RAPI is responding, not
if it is accessible.
@type hostname: string
@param hostname: hostname of the node to connect to.
@rtype: bool
@return: Whether RAPI is working properly
"""
curl_config = rapi.client.GenericCurlConfig()
rapi_client = rapi.client.GanetiRapiClient(hostname,
curl_config_fn=curl_config)
try:
master_version = rapi_client.GetVersion()
except rapi.client.CertificateError, err:
logging.warning("RAPI certificate error: %s", err)
return False
except rapi.client.GanetiApiError, err:
if err.code == 401:
# Unauthorized, but RAPI is alive and responding
return True
else:
logging.warning("RAPI error: %s", err)
return False
else:
logging.debug("Reported RAPI version %s", master_version)
return master_version == constants.RAPI_VERSION
def ParseOptions():
"""Parse the command line options.
@return: (options, args) as from OptionParser.parse_args()
"""
parser = OptionParser(description="Ganeti cluster watcher",
usage="%prog [-d]",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
parser.add_option(cli.DEBUG_OPT)
parser.add_option(cli.NODEGROUP_OPT)
parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
help="Autoarchive jobs older than this age (default"
" 6 hours)")
parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
action="store_true", help="Ignore cluster pause setting")
parser.add_option("--wait-children", dest="wait_children",
action="store_true", help="Wait for child processes")
parser.add_option("--no-wait-children", dest="wait_children",
action="store_false",
help="Don't wait for child processes")
# See optparse documentation for why default values are not set by options
parser.set_defaults(wait_children=True)
options, args = parser.parse_args()
options.job_age = cli.ParseTimespec(options.job_age)
if args:
parser.error("No arguments expected")
return (options, args)
def _WriteInstanceStatus(filename, data):
"""Writes the per-group instance status file.
The entries are sorted.
@type filename: string
@param filename: Path to instance status file
@type data: list of tuple; (instance name as string, status as string)
@param data: Instance name and status
"""
logging.debug("Updating instance status file '%s' with %s instances",
filename, len(data))
utils.WriteFile(filename,
data="".join(map(compat.partial(operator.mod, "%s %s\n"),
sorted(data))))
def _UpdateInstanceStatus(filename, instances):
"""Writes an instance status file from L{Instance} objects.
@type filename: string
@param filename: Path to status file
@type instances: list of L{Instance}
"""
_WriteInstanceStatus(filename, [(inst.name, inst.status)
for inst in instances])
def _ReadInstanceStatus(filename):
"""Reads an instance status file.
@type filename: string
@param filename: Path to status file
@rtype: tuple; (None or number, list of lists containing instance name and
status)
@return: File's mtime and instance status contained in the file; mtime is
C{None} if file can't be read
"""
logging.debug("Reading per-group instance status from '%s'", filename)
statcb = utils.FileStatHelper()
try:
content = utils.ReadFile(filename, preread=statcb)
except EnvironmentError, err:
if err.errno == errno.ENOENT:
logging.error("Can't read '%s', does not exist (yet)", filename)
else:
logging.exception("Unable to read '%s', ignoring", filename)
return (None, None)
else:
return (statcb.st.st_mtime, [line.split(None, 1)
for line in content.splitlines()])
def _MergeInstanceStatus(filename, pergroup_filename, groups):
"""Merges all per-group instance status files into a global one.
@type filename: string
@param filename: Path to global instance status file
@type pergroup_filename: string
@param pergroup_filename: Path to per-group status files, must contain "%s"
to be replaced with group UUID
@type groups: sequence
@param groups: UUIDs of known groups
"""
# Lock global status file in exclusive mode
lock = utils.FileLock.Open(filename)
try:
lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
except errors.LockError, err:
# All per-group processes will lock and update the file. None of them
# should take longer than 10 seconds (the value of
# INSTANCE_STATUS_LOCK_TIMEOUT).
logging.error("Can't acquire lock on instance status file '%s', not"
" updating: %s", filename, err)
return
logging.debug("Acquired exclusive lock on '%s'", filename)
data = {}
# Load instance status from all groups
for group_uuid in groups:
(mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
if mtime is not None:
for (instance_name, status) in instdata:
data.setdefault(instance_name, []).append((mtime, status))
# Select last update based on file mtime
inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
for (instance_name, status) in data.items()]
# Write the global status file. Don't touch file after it's been
# updated--there is no lock anymore.
_WriteInstanceStatus(filename, inststatus)
def GetLuxiClient(try_restart):
"""Tries to connect to the master daemon.
@type try_restart: bool
@param try_restart: Whether to attempt to restart the master daemon
"""
try:
return cli.GetClient()
except errors.OpPrereqError, err:
# this is, from cli.GetClient, a not-master case
raise NotMasterError("Not on master node (%s)" % err)
except luxi.NoMasterError, err:
if not try_restart:
raise
logging.warning("Master daemon seems to be down (%s), trying to restart",
err)
if not utils.EnsureDaemon(constants.MASTERD):
raise errors.GenericError("Can't start the master daemon")
# Retry the connection
return cli.GetClient()
def _StartGroupChildren(cl, wait):
"""Starts a new instance of the watcher for every node group.
"""
assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
for arg in sys.argv)
result = cl.QueryGroups([], ["name", "uuid"], False)
children = []
for (idx, (name, uuid)) in enumerate(result):
args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
if idx > 0:
# Let's not kill the system
time.sleep(CHILD_PROCESS_DELAY)
logging.debug("Spawning child for group '%s' (%s), arguments %s",
name, uuid, args)
try:
# TODO: Should utils.StartDaemon be used instead?
pid = os.spawnv(os.P_NOWAIT, args[0], args)
except Exception: # pylint: disable=W0703
logging.exception("Failed to start child for group '%s' (%s)",
name, uuid)
else:
logging.debug("Started with PID %s", pid)
children.append(pid)
if wait:
for pid in children:
logging.debug("Waiting for child PID %s", pid)
try:
result = utils.RetryOnSignal(os.waitpid, pid, 0)
except EnvironmentError, err:
result = str(err)
logging.debug("Child PID %s exited with status %s", pid, result)
def _ArchiveJobs(cl, age):
"""Archives old jobs.
"""
(arch_count, left_count) = cl.AutoArchiveJobs(age)
logging.debug("Archived %s jobs, left %s", arch_count, left_count)
def _CheckMaster(cl):
"""Ensures current host is master node.
"""
(master, ) = cl.QueryConfigValues(["master_node"])
if master != netutils.Hostname.GetSysName():
raise NotMasterError("This is not the master node")
@UsesRapiClient
def _GlobalWatcher(opts):
"""Main function for global watcher.
At the end child processes are spawned for every node group.
"""
StartNodeDaemons()
RunWatcherHooks()
# Run node maintenance in all cases, even if master, so that old masters can
# be properly cleaned up
if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
try:
client = GetLuxiClient(True)
except NotMasterError:
# Don't proceed on non-master nodes
return constants.EXIT_SUCCESS
# we are on master now
utils.EnsureDaemon(constants.RAPI)
# If RAPI isn't responding to queries, try one restart
logging.debug("Attempting to talk to remote API on %s",
constants.IP4_ADDRESS_LOCALHOST)
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.warning("Couldn't get answer from remote API, restaring daemon")
utils.StopDaemon(constants.RAPI)
utils.EnsureDaemon(constants.RAPI)
logging.debug("Second attempt to talk to remote API")
if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
logging.fatal("RAPI is not responding")
logging.debug("Successfully talked to remote API")
_CheckMaster(client)
_ArchiveJobs(client, opts.job_age)
# Spawn child processes for all node groups
_StartGroupChildren(client, opts.wait_children)
return constants.EXIT_SUCCESS
def _GetGroupData(cl, uuid):
"""Retrieves instances and nodes per node group.
"""
job = [
# Get all primary instances in group
opcodes.OpQuery(what=constants.QR_INSTANCE,
fields=["name", "status", "disks_active", "snodes",
"pnode.group.uuid", "snodes.group.uuid"],
qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
use_locking=True,
priority=constants.OP_PRIO_LOW),
# Get all nodes in group
opcodes.OpQuery(what=constants.QR_NODE,
fields=["name", "bootid", "offline"],
qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
use_locking=True,
priority=constants.OP_PRIO_LOW),
]
job_id = cl.SubmitJob(job)
results = map(objects.QueryResponse.FromDict,
cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
cl.ArchiveJob(job_id)
results_data = map(operator.attrgetter("data"), results)
# Ensure results are tuples with two values
assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
# Extract values ignoring result status
(raw_instances, raw_nodes) = [[map(compat.snd, values)
for values in res]
for res in results_data]
secondaries = {}
instances = []
# Load all instances
for (name, status, disks_active, snodes, pnode_group_uuid,
snodes_group_uuid) in raw_instances:
if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
logging.error("Ignoring split instance '%s', primary group %s, secondary"
" groups %s", name, pnode_group_uuid,
utils.CommaJoin(snodes_group_uuid))
else:
instances.append(Instance(name, status, disks_active, snodes))
for node in snodes:
secondaries.setdefault(node, set()).add(name)
# Load all nodes
nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
for (name, bootid, offline) in raw_nodes]
return (dict((node.name, node) for node in nodes),
dict((inst.name, inst) for inst in instances))
def _LoadKnownGroups():
"""Returns a list of all node groups known by L{ssconf}.
"""
groups = ssconf.SimpleStore().GetNodegroupList()
result = list(line.split(None, 1)[0] for line in groups
if line.strip())
if not compat.all(map(utils.UUID_RE.match, result)):
raise errors.GenericError("Ssconf contains invalid group UUID")
return result
def _GroupWatcher(opts):
"""Main function for per-group watcher process.
"""
group_uuid = opts.nodegroup.lower()
if not utils.UUID_RE.match(group_uuid):
raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
" got '%s'" %
(cli.NODEGROUP_OPT_NAME, group_uuid))
logging.info("Watcher for node group '%s'", group_uuid)
known_groups = _LoadKnownGroups()
# Check if node group is known
if group_uuid not in known_groups:
raise errors.GenericError("Node group '%s' is not known by ssconf" %
group_uuid)
# Group UUID has been verified and should not contain any dangerous
# characters
state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
logging.debug("Using state file %s", state_path)
# Global watcher
statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
if not statefile:
return constants.EXIT_FAILURE
notepad = state.WatcherState(statefile) # pylint: disable=E0602
try:
# Connect to master daemon
client = GetLuxiClient(False)
_CheckMaster(client)
(nodes, instances) = _GetGroupData(client, group_uuid)
# Update per-group instance status file
_UpdateInstanceStatus(inst_status_path, instances.values())
_MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
known_groups)
started = _CheckInstances(client, notepad, instances)
_CheckDisks(client, notepad, nodes, instances, started)
_VerifyDisks(client, group_uuid, nodes, instances)
except Exception, err:
logging.info("Not updating status file due to failure: %s", err)
raise
else:
# Save changes for next run
notepad.Save(state_path)
return constants.EXIT_SUCCESS
def Main():
"""Main function.
"""
(options, _) = ParseOptions()
utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
debug=options.debug, stderr_logging=options.debug)
if ShouldPause() and not options.ignore_pause:
logging.debug("Pause has been set, exiting")
return constants.EXIT_SUCCESS
# Try to acquire global watcher lock in shared mode
lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
try:
lock.Shared(blocking=False)
except (EnvironmentError, errors.LockError), err:
logging.error("Can't acquire lock on %s: %s",
pathutils.WATCHER_LOCK_FILE, err)
return constants.EXIT_SUCCESS
if options.nodegroup is None:
fn = _GlobalWatcher
else:
# Per-nodegroup watcher
fn = _GroupWatcher
try:
return fn(options)
except (SystemExit, KeyboardInterrupt):
raise
except NotMasterError:
logging.debug("Not master, exiting")
return constants.EXIT_NOTMASTER
except errors.ResolverError, err:
logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
return constants.EXIT_NODESETUP_ERROR
except errors.JobQueueFull:
logging.error("Job queue is full, can't query cluster state")
except errors.JobQueueDrainError:
logging.error("Job queue is drained, can't maintain cluster state")
except Exception, err:
logging.exception(str(err))
return constants.EXIT_FAILURE
return constants.EXIT_SUCCESS