# blob: 1e910bdb56f57b3c777d43a81e5dc36bd798959a
#
#
# Copyright (C) 2010 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Classes and functions for import/export daemon.
"""
import os
import re
import socket
import logging
import signal
import errno
import time
from cStringIO import StringIO
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import netutils
from ganeti import compat
#: Used to recognize point at which socat(1) starts to listen on its socket.
#: The local address is required for the remote peer to connect (in particular
#: the port number). Captures the address family, the address and the port.
LISTENING_RE = re.compile(r"^listening on\s+"
                          r"AF=(?P<family>\d+)\s+"
                          r"(?P<address>.+):(?P<port>\d+)$", re.I)

#: Used to recognize point at which socat(1) is sending data over the wire
TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
                              re.I)

# Severity markers compared against the level field of socat(1) log lines
SOCAT_LOG_DEBUG = "D"
SOCAT_LOG_INFO = "I"
SOCAT_LOG_NOTICE = "N"
SOCAT_LOG_WARNING = "W"
SOCAT_LOG_ERROR = "E"
SOCAT_LOG_FATAL = "F"

#: socat(1) log levels which are not forwarded unless debugging is enabled
SOCAT_LOG_IGNORE = compat.UniqueFrozenset([
  SOCAT_LOG_DEBUG,
  SOCAT_LOG_INFO,
  SOCAT_LOG_NOTICE,
  ])

#: Used to parse GNU dd(1) statistics (captures the number of bytes copied and
#: the elapsed seconds)
DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
                        r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)

#: Used to ignore "N+N records in/out" on dd(1)'s stderr
DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)

#: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
#: unavailable and SIGUSR1 is used instead)
DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1)

#: Buffer size: at most this many bytes are transferred at once (used both as
#: dd(1)'s block size and as socat(1)'s buffer size)
BUFSIZE = 1024 * 1024

# Common options for socat; these are added to every network address built by
# CommandBuilder, for listening as well as for connecting
SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"]
SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1",
                      "cipher=%s" % constants.OPENSSL_CIPHERS]

if constants.SOCAT_USE_COMPRESS:
  # Disables all compression by OpenSSL. Only supported in patched versions
  # of socat (as of November 2010). See INSTALL for more information.
  SOCAT_OPENSSL_OPTS.append("compress=none")

#: Maximum length of a single socat option; CommandBuilder refuses longer ones
SOCAT_OPTION_MAXLEN = 400

# Identifiers for the sources of child output handled by ChildIOProcessor
(PROG_OTHER,
 PROG_SOCAT,
 PROG_DD,
 PROG_DD_PID,
 PROG_EXP_SIZE) = range(1, 6)

#: All known output sources; one line splitter is created for each of them
PROG_ALL = compat.UniqueFrozenset([
  PROG_OTHER,
  PROG_SOCAT,
  PROG_DD,
  PROG_DD_PID,
  PROG_EXP_SIZE,
  ])
class CommandBuilder(object):
  """Builds the shell commands executed by the import/export daemon.

  """
  def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd):
    """Remembers the parameters needed to assemble the commands.

    @param mode: Daemon mode (import or export)
    @param opts: Options object
    @type socat_stderr_fd: int
    @param socat_stderr_fd: File descriptor socat should write its stderr to
    @type dd_stderr_fd: int
    @param dd_stderr_fd: File descriptor dd should write its stderr to
    @type dd_pid_fd: int
    @param dd_pid_fd: File descriptor the child should write dd's PID to

    """
    self._mode = mode
    self._opts = opts

    self._dd_pid_fd = dd_pid_fd
    self._dd_stderr_fd = dd_stderr_fd
    self._socat_stderr_fd = socat_stderr_fd

    # A magic value is optional, but if given it must be well-formed
    magic = self._opts.magic
    assert magic is None or constants.IE_MAGIC_RE.match(magic)

  @staticmethod
  def GetBashCommand(cmd):
    """Wraps a shell snippet into an argument list invoking Bash.

    @param cmd: Shell code passed to Bash's "-c" option
    @rtype: list
    @return: Argument list; Bash runs with "errexit" and "pipefail" enabled

    """
    bash_prefix = ["bash", "-o", "errexit", "-o", "pipefail", "-c"]
    return bash_prefix + [cmd]

  def _GetSocatCommand(self):
    """Assembles the argument list for socat.

    @rtype: list
    @return: socat executable followed by its arguments
    @raise errors.GenericError: On an invalid mode, or if an option is too
      long or contains a comma

    """
    opts = self._opts

    # Options shared by the listening and the connecting address
    shared_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS
    shared_opts = shared_opts + [
      "key=%s" % opts.key,
      "cert=%s" % opts.cert,
      "cafile=%s" % opts.ca,
      ]

    if opts.bind is not None:
      shared_opts.append("bind=%s" % opts.bind)

    assert not (opts.ipv4 and opts.ipv6)

    if opts.ipv4:
      shared_opts.append("pf=ipv4")
    elif opts.ipv6:
      shared_opts.append("pf=ipv6")

    if self._mode == constants.IEM_IMPORT:
      # Port 0 is used when no port was requested
      listen_port = opts.port
      if listen_port is None:
        listen_port = 0

      source = ["OPENSSL-LISTEN:%s" % listen_port, "reuseaddr"]
      # Retry to listen if connection wasn't established successfully, up to
      # 100 times a second. Note that this still leaves room for DoS attacks.
      source.extend(["forever", "intervall=0.01"])
      source.extend(shared_opts)

      sink = ["stdout"]

    elif self._mode == constants.IEM_EXPORT:
      # IPv6 addresses are enclosed in brackets to keep them apart from the
      # port
      peer = opts.host
      if peer and netutils.IP6Address.IsValid(peer):
        peer = "[%s]" % peer

      source = ["stdin"]

      sink = ["OPENSSL:%s:%s" % (peer, opts.port)]
      # How long to wait per connection attempt
      sink.append("connect-timeout=%s" % opts.connect_timeout)
      # Retry a few times before giving up to connect (once per second)
      sink.extend(["retry=%s" % opts.connect_retries, "intervall=1"])
      sink.extend(shared_opts)

    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # Options are joined using commas below, hence none may contain one
    for option in source + sink:
      if len(option) > SOCAT_OPTION_MAXLEN:
        raise errors.GenericError("Socat option longer than %s"
                                  " characters: %r" %
                                  (SOCAT_OPTION_MAXLEN, option))
      if "," in option:
        raise errors.GenericError("Comma not allowed in socat option"
                                  " value: %r" % option)

    argv = [constants.SOCAT_PATH]
    # Log to stderr
    argv.append("-ls")
    # Log level
    argv.extend(["-d", "-d"])
    # Buffer size
    argv.append("-b%s" % BUFSIZE)
    # Unidirectional mode, the first address is only used for reading, and the
    # second address is only used for writing
    argv.append("-u")
    argv.append(",".join(source))
    argv.append(",".join(sink))

    return argv

  def _GetMagicCommand(self):
    """Builds the shell snippet reading or writing the magic value.

    @rtype: string or None
    @return: Shell code checking (import) or emitting (export) the magic
      value; None if no magic value is configured
    @raise errors.GenericError: On an invalid mode

    """
    if not self._opts.magic:
      return None

    # Prefix to ensure magic isn't interpreted as option to "echo"
    token = "M=%s" % self._opts.magic

    out = StringIO()

    if self._mode == constants.IEM_IMPORT:
      fragments = [
        "{ ",
        utils.ShellQuoteArgs(["read", "-n", str(len(token)), "magic"]),
        " && ",
        "if test \"$magic\" != %s; then" % utils.ShellQuote(token),
        " echo %s >&2;" % utils.ShellQuote("Magic value mismatch"),
        " exit 1;",
        "fi;",
        " }",
        ]
    elif self._mode == constants.IEM_EXPORT:
      fragments = [utils.ShellQuoteArgs(["echo", "-E", "-n", token])]
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    for fragment in fragments:
      out.write(fragment)

    return out.getvalue()

  def _GetDdCommand(self):
    """Builds the shell snippet running dd, used for measuring throughput.

    @rtype: string
    @return: Shell code starting dd in the background, reporting its PID and
      waiting for it; preceded by the magic value command if there is one

    """
    magic_cmd = self._GetMagicCommand()

    fragments = []

    if magic_cmd:
      fragments.extend(["{ ", magic_cmd, " && "])

    fragments.append("{ ")
    # Setting LC_ALL since we want to parse the output and explicitly
    # redirecting stdin, as the background process (dd) would have
    # /dev/null as stdin otherwise
    fragments.append("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
                     (BUFSIZE, self._dd_stderr_fd))
    # Send PID to daemon
    fragments.append(" echo $pid >&%d;" % self._dd_pid_fd)
    # And wait for dd
    fragments.append(" wait $pid;")
    fragments.append(" }")

    if magic_cmd:
      fragments.append(" }")

    out = StringIO()
    for fragment in fragments:
      out.write(fragment)

    return out.getvalue()

  def _GetTransportCommand(self):
    """Builds the command for the transport part of the daemon.

    @rtype: list
    @return: Bash invocation running the socat/dd pipeline, with gzip or
      gunzip in between if compression is enabled
    @raise errors.GenericError: On an invalid mode

    """
    socat_cmd = utils.ShellQuoteArgs(self._GetSocatCommand())
    socat_cmd = "%s 2>&%d" % (socat_cmd, self._socat_stderr_fd)

    dd_cmd = self._GetDdCommand()

    compress = self._opts.compress
    assert compress in constants.IEC_ALL
    use_gzip = (compress == constants.IEC_GZIP)

    if self._mode == constants.IEM_IMPORT:
      # Network -> (decompression) -> dd
      stages = [socat_cmd]
      if use_gzip:
        stages.append("gunzip -c")
      stages.append(dd_cmd)
    elif self._mode == constants.IEM_EXPORT:
      # dd -> (compression) -> network
      stages = [dd_cmd]
      if use_gzip:
        stages.append("gzip -c")
      stages.append(socat_cmd)
    else:
      raise errors.GenericError("Invalid mode '%s'" % self._mode)

    # TODO: Run transport as separate user
    # The transport uses its own shell to simplify running it as a separate user
    # in the future.
    pipeline = " | ".join(stages)
    return self.GetBashCommand(pipeline)

  def GetCommand(self):
    """Builds the complete child process command.

    @rtype: list
    @return: Bash invocation running the transport command, surrounded by the
      configured command prefix and suffix (if any)

    """
    transport_cmd = self._GetTransportCommand()

    pieces = []

    if self._opts.cmd_prefix:
      pieces.extend([self._opts.cmd_prefix, " "])

    pieces.append(utils.ShellQuoteArgs(transport_cmd))

    if self._opts.cmd_suffix:
      pieces.extend([" ", self._opts.cmd_suffix])

    out = StringIO()
    for piece in pieces:
      out.write(piece)

    return self.GetBashCommand(out.getvalue())
def _VerifyListening(family, address, port):
  """Verify address given as listening address by socat.

  @type family: int
  @param family: Address family; only C{socket.AF_INET} and
    C{socket.AF_INET6} are accepted
  @type address: string
  @param address: Textual address; for IPv6 it may be enclosed in a single
    pair of square brackets
  @param port: Port number; returned unchanged
  @rtype: tuple
  @return: Tuple of normalized textual address and port
  @raise errors.GenericError: If the address family is not supported or the
    address is not valid for the family

  """
  if family not in (socket.AF_INET, socket.AF_INET6):
    raise errors.GenericError("Address family %r not supported" % family)

  if (family == socket.AF_INET6 and address.startswith("[") and
      address.endswith("]")):
    # Remove exactly one pair of brackets. Stripping all leading and trailing
    # brackets would let malformed input such as "[[::1]]" pass verification.
    address = address[1:-1]

  try:
    packed_address = socket.inet_pton(family, address)
  except socket.error:
    raise errors.GenericError("Invalid address %r for family %s" %
                              (address, family))

  # Converting back and forth yields the normalized textual form
  return (socket.inet_ntop(family, packed_address), port)
class ChildIOProcessor(object):
  """Interprets the output of the daemon's child processes.

  """
  def __init__(self, debug, status_file, logger, throughput_samples, exp_size):
    """Sets up the line splitters and the progress bookkeeping.

    @param debug: Whether lines which would otherwise be dropped are forwarded
    @param status_file: Status file object updated according to child output
    @param logger: Logger receiving forwarded lines
    @param throughput_samples: Number of most recent dd(1) samples kept for
      calculating the throughput
    @param exp_size: Expected size of transferred data (compared against
      values in MiB, so presumably in MiB as well)

    """
    self._debug = debug
    self._status_file = status_file
    self._logger = logger

    # One line splitter per output source, all feeding into _ProcessOutput
    splitters = {}
    for prog in PROG_ALL:
      splitters[prog] = utils.LineSplitter(self._ProcessOutput, prog)
    self._splitter = splitters

    self._dd_pid = None
    self._dd_ready = False
    self._dd_tp_samples = throughput_samples
    self._dd_progress = []

    # Expected size of transferred data
    self._exp_size = exp_size

  def GetLineSplitter(self, prog):
    """Looks up the line splitter of a program.

    @param prog: Program identifier
    @return: Line splitter registered for the program

    """
    return self._splitter[prog]

  def FlushAll(self):
    """Flushes every line splitter.

    """
    for splitter in self._splitter.values():
      splitter.flush()

  def CloseAll(self):
    """Closes every line splitter and forgets about them.

    """
    for splitter in self._splitter.values():
      splitter.close()

    self._splitter.clear()

  def NotifyDd(self):
    """Tells dd(1) to write statistics.

    @rtype: bool
    @return: False if dd's PID is unknown or dd doesn't handle the signal
      yet, True otherwise

    """
    pid = self._dd_pid

    if pid is None:
      # Can't notify
      return False

    if not self._dd_ready:
      # There's a race condition between starting the program and sending
      # signals. The signal handler is only registered after some time, so we
      # have to check whether the program is ready. If it isn't, sending a
      # signal will invoke the default handler (and usually abort the program).
      handling = utils.IsProcessHandlingSignal(pid, DD_INFO_SIGNAL)
      if not handling:
        logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
        return False

      logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
      self._dd_ready = True

    logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, pid)
    try:
      os.kill(pid, DD_INFO_SIGNAL)
    except EnvironmentError as err:
      if err.errno == errno.ESRCH:
        # Process no longer exists
        logging.debug("dd exited")
        self._dd_pid = None
      else:
        raise

    return True

  def _ProcessOutput(self, line, prog):
    """Takes care of child process output.

    @type line: string
    @param line: Child output line
    @type prog: number
    @param prog: Program from which the line originates

    """
    force_update = False
    # Lines from sources without special handling are forwarded as they are
    forward_line = line

    if prog == PROG_SOCAT:
      fields = line.split(None, 4)

      if len(fields) != 5:
        # Unexpected format, forward the whole line
        forward_line = "socat: %s" % line
      else:
        # The first three fields are not used
        level = fields[3]
        msg = fields[4]

        force_update = self._ProcessSocatOutput(self._status_file, level, msg)

        if self._debug or (level and level not in SOCAT_LOG_IGNORE):
          forward_line = "socat: %s %s" % (level, msg)
        else:
          forward_line = None

    elif prog == PROG_DD:
      (should_forward, force_update) = self._ProcessDdOutput(line)

      forward_line = None
      if should_forward or self._debug:
        forward_line = "dd: %s" % line

    elif prog == PROG_DD_PID:
      if self._dd_pid:
        raise RuntimeError("dd PID reported more than once")

      logging.debug("Received dd PID %r", line)
      self._dd_pid = int(line)
      forward_line = None

    elif prog == PROG_EXP_SIZE:
      logging.debug("Received predicted size %r", line)
      forward_line = None

      # An empty or unparseable line leaves the expected size unknown
      exp_size = None
      if line:
        try:
          exp_size = utils.BytesToMebibyte(int(line))
        except (ValueError, TypeError) as err:
          logging.error("Failed to convert predicted size %r to number: %s",
                        line, err)

      self._exp_size = exp_size

    if forward_line:
      self._logger.info(forward_line)
      self._status_file.AddRecentOutput(forward_line)

    self._status_file.Update(force_update)

  @staticmethod
  def _ProcessSocatOutput(status_file, level, msg):
    """Interprets socat log output.

    @param status_file: Status file object
    @param level: Log level of the message
    @param msg: Log message
    @rtype: bool
    @return: Whether the listening port or the connection state was recorded
      in the status file

    """
    if level != SOCAT_LOG_NOTICE:
      return False

    if status_file.GetListenPort() is None:
      # TODO: Maybe implement timeout to not listen forever
      listening = LISTENING_RE.match(msg)
      if listening:
        verified = _VerifyListening(int(listening.group("family")),
                                    listening.group("address"),
                                    int(listening.group("port")))
        status_file.SetListenPort(verified[1])
        return True

    if not status_file.GetConnected():
      if TRANSFER_LOOP_RE.match(msg):
        logging.debug("Connection established")
        status_file.SetConnected()
        return True

    return False

  def _ProcessDdOutput(self, line):
    """Interprets a line of dd(1)'s output.

    @param line: Line written by dd(1)
    @rtype: tuple
    @return: Tuple of two booleans; whether the line should be forwarded and
      whether a status file update should be forced

    """
    stats = DD_INFO_RE.match(line)
    if stats:
      seconds = float(stats.group("seconds"))
      mbytes = utils.BytesToMebibyte(int(stats.group("bytes")))
      self._UpdateDdProgress(seconds, mbytes)
      return (False, True)

    if DD_STDERR_IGNORE.match(line):
      # Ignore
      return (False, False)

    # Forward line
    return (True, False)

  def _UpdateDdProgress(self, seconds, mbytes):
    """Updates the internal status variables for dd(1) progress.

    @type seconds: float
    @param seconds: Timestamp of this update
    @type mbytes: float
    @param mbytes: Total number of MiB transferred so far

    """
    history = self._dd_progress

    # Add latest sample and remove old ones
    history.append((seconds, mbytes))
    del history[:-self._dd_tp_samples]

    # Calculate throughput
    throughput = _CalcThroughput(history)

    # Calculate percent and ETA; both need the expected size
    percent = None
    eta = None

    expected = self._exp_size
    if expected is not None:
      if expected != 0:
        done = (100.0 * mbytes) / expected
        percent = max(0, min(100, done))

      if throughput:
        remaining = float(expected - mbytes)
        eta = max(0, remaining / throughput)

    self._status_file.SetProgress(mbytes, throughput, percent, eta)
def _CalcThroughput(samples):
  """Calculates the throughput in MiB/second.

  @type samples: sequence
  @param samples: List of samples, each consisting of a (timestamp, mbytes)
    tuple
  @rtype: float or None
  @return: Throughput in MiB/second; None if it can't be calculated (fewer
    than two samples, or no time elapsed between the first and last sample)

  """
  if len(samples) < 2:
    # Can't calculate throughput
    return None

  (start_time, start_mbytes) = samples[0]
  (end_time, end_mbytes) = samples[-1]

  elapsed = float(end_time) - start_time
  if elapsed == 0:
    # First and last sample carry the same timestamp; dividing would raise
    # ZeroDivisionError, so report the throughput as unknown instead
    return None

  return (float(end_mbytes) - start_mbytes) / elapsed