| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2010 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Import/export daemon. |
| |
| """ |
| |
| # pylint: disable=C0103 |
| # C0103: Invalid name import-export |
| |
| import errno |
| import logging |
| import optparse |
| import os |
| import select |
| import signal |
| import subprocess |
| import sys |
| import time |
| import math |
| |
| from ganeti import constants |
| from ganeti import cli |
| from ganeti import utils |
| from ganeti import errors |
| from ganeti import serializer |
| from ganeti import objects |
| from ganeti import impexpd |
| from ganeti import netutils |
| |
| |
#: Number of most recent output lines retained in the status file
MAX_RECENT_OUTPUT_LINES = 20

#: Minimum number of seconds between two non-forced status file updates
MIN_UPDATE_INTERVAL = 5.0

#: Default number of seconds to wait for a connection to be established
DEFAULT_CONNECT_TIMEOUT = 60

#: Interval, in seconds, at which dd(1) is asked for statistics
DD_STATISTICS_INTERVAL = 5.0

#: Length, in seconds, of the window used for the throughput calculation
DD_THROUGHPUT_INTERVAL = 60.0

#: Number of dd(1) statistics samples fitting into the throughput window
#: (rounded up)
DD_THROUGHPUT_SAMPLES = int(math.ceil(DD_THROUGHPUT_INTERVAL /
                                      float(DD_STATISTICS_INTERVAL)))


# Parsed command line options; None until ParseOptions has been called
options = None
| |
| |
def SetupLogging():
  """Sets up logging to stderr for the daemon and for child output.

  The root logger's level follows the global C{options}: everything is
  logged if C{debug} is set, informational messages and above if C{verbose}
  is set, and only errors otherwise. A second logger is returned for the
  output of the child process; it shares the stderr handler with the root
  logger.

  @rtype: logging.Logger
  @return: Logger to be used for the child process' output

  """
  handler = logging.StreamHandler()
  handler.setLevel(logging.NOTSET)
  handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

  root = logging.getLogger("")
  root.addHandler(handler)

  # Verbosity of the root logger is controlled by the command line options
  if options.debug:
    root_level = logging.NOTSET
  elif options.verbose:
    root_level = logging.INFO
  else:
    root_level = logging.ERROR

  root.setLevel(root_level)

  # The logger for child output is instantiated directly (not obtained via
  # logging.getLogger) and has its own level, independent of the root logger
  output_logger = logging.Logger("child output")
  output_logger.setLevel(logging.NOTSET)
  output_logger.addHandler(handler)

  return output_logger
| |
| |
| class StatusFile: |
| """Status file manager. |
| |
| """ |
| def __init__(self, path): |
| """Initializes class. |
| |
| """ |
| self._path = path |
| self._data = objects.ImportExportStatus(ctime=time.time(), |
| mtime=None, |
| recent_output=[]) |
| |
| def AddRecentOutput(self, line): |
| """Adds a new line of recent output. |
| |
| """ |
| self._data.recent_output.append(line) |
| |
| # Remove old lines |
| del self._data.recent_output[:-MAX_RECENT_OUTPUT_LINES] |
| |
| def SetListenPort(self, port): |
| """Sets the port the daemon is listening on. |
| |
| @type port: int |
| @param port: TCP/UDP port |
| |
| """ |
| assert isinstance(port, (int, long)) and 0 < port < (2 ** 16) |
| self._data.listen_port = port |
| |
| def GetListenPort(self): |
| """Returns the port the daemon is listening on. |
| |
| """ |
| return self._data.listen_port |
| |
| def SetConnected(self): |
| """Sets the connected flag. |
| |
| """ |
| self._data.connected = True |
| |
| def GetConnected(self): |
| """Determines whether the daemon is connected. |
| |
| """ |
| return self._data.connected |
| |
| def SetProgress(self, mbytes, throughput, percent, eta): |
| """Sets how much data has been transferred so far. |
| |
| @type mbytes: number |
| @param mbytes: Transferred amount of data in MiB. |
| @type throughput: float |
| @param throughput: MiB/second |
| @type percent: number |
| @param percent: Percent processed |
| @type eta: number |
| @param eta: Expected number of seconds until done |
| |
| """ |
| self._data.progress_mbytes = mbytes |
| self._data.progress_throughput = throughput |
| self._data.progress_percent = percent |
| self._data.progress_eta = eta |
| |
| def SetExitStatus(self, exit_status, error_message): |
| """Sets the exit status and an error message. |
| |
| """ |
| # Require error message when status isn't 0 |
| assert exit_status == 0 or error_message |
| |
| self._data.exit_status = exit_status |
| self._data.error_message = error_message |
| |
| def ExitStatusIsSuccess(self): |
| """Returns whether the exit status means "success". |
| |
| """ |
| return not bool(self._data.error_message) |
| |
| def Update(self, force): |
| """Updates the status file. |
| |
| @type force: bool |
| @param force: Write status file in any case, not only when minimum interval |
| is expired |
| |
| """ |
| if not (force or |
| self._data.mtime is None or |
| time.time() > (self._data.mtime + MIN_UPDATE_INTERVAL)): |
| return |
| |
| logging.debug("Updating status file %s", self._path) |
| |
| self._data.mtime = time.time() |
| utils.WriteFile(self._path, |
| data=serializer.DumpJson(self._data.ToDict()), |
| mode=0400) |
| |
| |
def ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd,
                   dd_pid_read_fd, exp_size_read_fd, status_file, child_logger,
                   signal_notify, signal_handler, mode):
  """Handles the child processes' output.

  Polls the child's stderr, the four pipe read ends and the signal
  notification file descriptor. Data read from the child is handed to the
  line splitters of an L{impexpd.ChildIOProcessor}. The loop ends once only
  the signal notification descriptor is left, or when the child didn't exit
  within C{constants.CHILD_LINGER_TIMEOUT} after it was asked to terminate
  (by a forwarded signal or because an import wasn't connected within
  C{options.connect_timeout}).

  @param child: Child process; its C{stderr} is read and C{Kill} is used to
    terminate it on connection timeout
  @type socat_stderr_read_fd: int
  @param socat_stderr_read_fd: File descriptor to read socat's stderr from
  @type dd_stderr_read_fd: int
  @param dd_stderr_read_fd: File descriptor to read dd's stderr from
  @type dd_pid_read_fd: int
  @param dd_pid_read_fd: File descriptor to read dd's PID from
  @type exp_size_read_fd: int
  @param exp_size_read_fd: File descriptor to read the expected size from
  @type status_file: L{StatusFile}
  @param status_file: Status file manager
  @param child_logger: Logger passed on to the child I/O processor
  @param signal_notify: File-like object (C{fileno}, C{read}) which becomes
    readable when a signal was received
  @param signal_handler: Signal handler for SIGTERM and/or SIGINT
  @param mode: Daemon mode (C{constants.IEM_IMPORT} or
    C{constants.IEM_EXPORT})
  @rtype: bool
  @return: False if the function had started waiting for the child to exit
    (signal or connection timeout), True otherwise

  """
  assert not (signal_handler.signum - set([signal.SIGTERM, signal.SIGINT])), \
         "Other signals are not handled in this function"

  # Buffer size 0 is important, otherwise .read() with a specified length
  # might buffer data while poll(2) won't mark its file descriptor as
  # readable again.
  socat_stderr_read = os.fdopen(socat_stderr_read_fd, "r", 0)
  dd_stderr_read = os.fdopen(dd_stderr_read_fd, "r", 0)
  dd_pid_read = os.fdopen(dd_pid_read_fd, "r", 0)
  exp_size_read = os.fdopen(exp_size_read_fd, "r", 0)

  tp_samples = DD_THROUGHPUT_SAMPLES

  # A custom size is only known once the child reports it
  if options.exp_size == constants.IE_CUSTOM_SIZE:
    exp_size = None
  else:
    exp_size = options.exp_size

  child_io_proc = impexpd.ChildIOProcessor(options.debug, status_file,
                                           child_logger, tp_samples,
                                           exp_size)
  try:
    # Maps file descriptor numbers to (source file, line splitter); the
    # signal notification descriptor has no splitter
    fdmap = {
      child.stderr.fileno():
        (child.stderr, child_io_proc.GetLineSplitter(impexpd.PROG_OTHER)),
      socat_stderr_read.fileno():
        (socat_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_SOCAT)),
      dd_pid_read.fileno():
        (dd_pid_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD_PID)),
      dd_stderr_read.fileno():
        (dd_stderr_read, child_io_proc.GetLineSplitter(impexpd.PROG_DD)),
      exp_size_read.fileno():
        (exp_size_read, child_io_proc.GetLineSplitter(impexpd.PROG_EXP_SIZE)),
      signal_notify.fileno(): (signal_notify, None),
      }

    poller = select.poll()
    for fd in fdmap:
      utils.SetNonblockFlag(fd, True)
      poller.register(fd, select.POLLIN)

    # The connection timeout is only enforced for imports
    if options.connect_timeout and mode == constants.IEM_IMPORT:
      listen_timeout = utils.RunningTimeout(options.connect_timeout, True)
    else:
      listen_timeout = None

    exit_timeout = None
    dd_stats_timeout = None

    while True:
      # Break out of loop if only signal notify FD is left
      if len(fdmap) == 1 and signal_notify.fileno() in fdmap:
        break

      # Poll timeout in milliseconds, None meaning "no timeout"
      timeout = None

      if listen_timeout and not exit_timeout:
        assert mode == constants.IEM_IMPORT and options.connect_timeout
        if status_file.GetConnected():
          listen_timeout = None
        elif listen_timeout.Remaining() < 0:
          errmsg = ("Child process didn't establish connection in time"
                    " (%0.0fs), sending SIGTERM" % options.connect_timeout)
          logging.error(errmsg)
          status_file.AddRecentOutput(errmsg)
          status_file.Update(True)

          child.Kill(signal.SIGTERM)
          exit_timeout = \
            utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          # Next block will calculate timeout
        else:
          # Not yet connected, check again in a second
          timeout = 1000

      if exit_timeout:
        timeout = exit_timeout.Remaining() * 1000
        if timeout < 0:
          logging.info("Child process didn't exit in time")
          break

      if (not dd_stats_timeout) or dd_stats_timeout.Remaining() < 0:
        notify_status = child_io_proc.NotifyDd()
        if notify_status:
          # Schedule next notification
          dd_stats_timeout = utils.RunningTimeout(DD_STATISTICS_INTERVAL, True)
        else:
          # Try again soon (dd isn't ready yet)
          dd_stats_timeout = utils.RunningTimeout(1.0, True)

      if dd_stats_timeout:
        dd_timeout = max(0, dd_stats_timeout.Remaining() * 1000)

        # Wake up for whichever deadline comes first
        if timeout is None:
          timeout = dd_timeout
        else:
          timeout = min(timeout, dd_timeout)

      for fd, event in utils.RetryOnSignal(poller.poll, timeout):
        # Readable data is processed first; hang-ups and errors are only
        # acted upon once there is nothing left to read
        if event & (select.POLLIN | select.POLLPRI):
          (from_, to) = fdmap[fd]

          # Read up to 1 KB of data
          data = from_.read(1024)
          if data:
            if to:
              to.write(data)
            elif fd == signal_notify.fileno():
              # Signal handling
              if signal_handler.called:
                signal_handler.Clear()
                if exit_timeout:
                  logging.info("Child process still has about %0.2f seconds"
                               " to exit", exit_timeout.Remaining())
                else:
                  logging.info("Giving child process %0.2f seconds to exit",
                               constants.CHILD_LINGER_TIMEOUT)
                  exit_timeout = \
                    utils.RunningTimeout(constants.CHILD_LINGER_TIMEOUT, True)
          else:
            # Empty read means end of file, stop watching this descriptor
            poller.unregister(fd)
            del fdmap[fd]

        elif event & (select.POLLNVAL | select.POLLHUP |
                      select.POLLERR):
          poller.unregister(fd)
          del fdmap[fd]

    child_io_proc.FlushAll()

    # If there was a timeout calculator, we were waiting for the child to
    # finish, e.g. due to a signal
    return not bool(exit_timeout)
  finally:
    child_io_proc.CloseAll()
| |
| |
| def ParseOptions(): |
| """Parses the options passed to the program. |
| |
| @return: Arguments to program |
| |
| """ |
| global options # pylint: disable=W0603 |
| |
| parser = optparse.OptionParser(usage=("%%prog <status-file> {%s|%s}" % |
| (constants.IEM_IMPORT, |
| constants.IEM_EXPORT))) |
| parser.add_option(cli.DEBUG_OPT) |
| parser.add_option(cli.VERBOSE_OPT) |
| parser.add_option("--key", dest="key", action="store", type="string", |
| help="RSA key file") |
| parser.add_option("--cert", dest="cert", action="store", type="string", |
| help="X509 certificate file") |
| parser.add_option("--ca", dest="ca", action="store", type="string", |
| help="X509 CA file") |
| parser.add_option("--bind", dest="bind", action="store", type="string", |
| help="Bind address") |
| parser.add_option("--ipv4", dest="ipv4", action="store_true", |
| help="Use IPv4 only") |
| parser.add_option("--ipv6", dest="ipv6", action="store_true", |
| help="Use IPv6 only") |
| parser.add_option("--host", dest="host", action="store", type="string", |
| help="Remote hostname") |
| parser.add_option("--port", dest="port", action="store", type="int", |
| help="Remote port") |
| parser.add_option("--connect-retries", dest="connect_retries", action="store", |
| type="int", default=0, |
| help=("How many times the connection should be retried" |
| " (export only)")) |
| parser.add_option("--connect-timeout", dest="connect_timeout", action="store", |
| type="int", default=DEFAULT_CONNECT_TIMEOUT, |
| help="Timeout for connection to be established (seconds)") |
| parser.add_option("--compress", dest="compress", action="store", |
| type="choice", help="Compression method", |
| metavar="[%s]" % "|".join(constants.IEC_ALL), |
| choices=list(constants.IEC_ALL), default=constants.IEC_GZIP) |
| parser.add_option("--expected-size", dest="exp_size", action="store", |
| type="string", default=None, |
| help="Expected import/export size (MiB)") |
| parser.add_option("--magic", dest="magic", action="store", |
| type="string", default=None, help="Magic string") |
| parser.add_option("--cmd-prefix", dest="cmd_prefix", action="store", |
| type="string", help="Command prefix") |
| parser.add_option("--cmd-suffix", dest="cmd_suffix", action="store", |
| type="string", help="Command suffix") |
| |
| (options, args) = parser.parse_args() |
| |
| if len(args) != 2: |
| # Won't return |
| parser.error("Expected exactly two arguments") |
| |
| (status_file_path, mode) = args |
| |
| if mode not in (constants.IEM_IMPORT, |
| constants.IEM_EXPORT): |
| # Won't return |
| parser.error("Invalid mode: %s" % mode) |
| |
| # Normalize and check parameters |
| if options.host is not None and not netutils.IPAddress.IsValid(options.host): |
| try: |
| options.host = netutils.Hostname.GetNormalizedName(options.host) |
| except errors.OpPrereqError, err: |
| parser.error("Invalid hostname '%s': %s" % (options.host, err)) |
| |
| if options.port is not None: |
| options.port = utils.ValidateServiceName(options.port) |
| |
| if (options.exp_size is not None and |
| options.exp_size != constants.IE_CUSTOM_SIZE): |
| try: |
| options.exp_size = int(options.exp_size) |
| except (ValueError, TypeError), err: |
| # Won't return |
| parser.error("Invalid value for --expected-size: %s (%s)" % |
| (options.exp_size, err)) |
| |
| if not (options.magic is None or constants.IE_MAGIC_RE.match(options.magic)): |
| parser.error("Magic must match regular expression %s" % |
| constants.IE_MAGIC_RE.pattern) |
| |
| if options.ipv4 and options.ipv6: |
| parser.error("Can only use one of --ipv4 and --ipv6") |
| |
| return (status_file_path, mode) |
| |
| |
| class ChildProcess(subprocess.Popen): |
| def __init__(self, env, cmd, noclose_fds): |
| """Initializes this class. |
| |
| """ |
| self._noclose_fds = noclose_fds |
| |
| # Not using close_fds because doing so would also close the socat stderr |
| # pipe, which we still need. |
| subprocess.Popen.__init__(self, cmd, env=env, shell=False, close_fds=False, |
| stderr=subprocess.PIPE, stdout=None, stdin=None, |
| preexec_fn=self._ChildPreexec) |
| self._SetProcessGroup() |
| |
| def _ChildPreexec(self): |
| """Called before child executable is execve'd. |
| |
| """ |
| # Move to separate process group. By sending a signal to its process group |
| # we can kill the child process and all grandchildren. |
| os.setpgid(0, 0) |
| |
| # Close almost all file descriptors |
| utils.CloseFDs(noclose_fds=self._noclose_fds) |
| |
| def _SetProcessGroup(self): |
| """Sets the child's process group. |
| |
| """ |
| assert self.pid, "Can't be called in child process" |
| |
| # Avoid race condition by setting child's process group (as good as |
| # possible in Python) before sending signals to child. For an |
| # explanation, see preexec function for child. |
| try: |
| os.setpgid(self.pid, self.pid) |
| except EnvironmentError, err: |
| # If the child process was faster we receive EPERM or EACCES |
| if err.errno not in (errno.EPERM, errno.EACCES): |
| raise |
| |
| def Kill(self, signum): |
| """Sends signal to child process. |
| |
| """ |
| logging.info("Sending signal %s to child process", signum) |
| utils.IgnoreProcessNotFound(os.killpg, self.pid, signum) |
| |
| def ForceQuit(self): |
| """Ensure child process is no longer running. |
| |
| """ |
| # Final check if child process is still alive |
| if utils.RetryOnSignal(self.poll) is None: |
| logging.error("Child process still alive, sending SIGKILL") |
| self.Kill(signal.SIGKILL) |
| utils.RetryOnSignal(self.wait) |
| |
| |
| def main(): |
| """Main function. |
| |
| """ |
| # Option parsing |
| (status_file_path, mode) = ParseOptions() |
| |
| # Configure logging |
| child_logger = SetupLogging() |
| |
| status_file = StatusFile(status_file_path) |
| try: |
| try: |
| # Pipe to receive socat's stderr output |
| (socat_stderr_read_fd, socat_stderr_write_fd) = os.pipe() |
| |
| # Pipe to receive dd's stderr output |
| (dd_stderr_read_fd, dd_stderr_write_fd) = os.pipe() |
| |
| # Pipe to receive dd's PID |
| (dd_pid_read_fd, dd_pid_write_fd) = os.pipe() |
| |
| # Pipe to receive size predicted by export script |
| (exp_size_read_fd, exp_size_write_fd) = os.pipe() |
| |
| # Get child process command |
| cmd_builder = impexpd.CommandBuilder(mode, options, socat_stderr_write_fd, |
| dd_stderr_write_fd, dd_pid_write_fd) |
| cmd = cmd_builder.GetCommand() |
| |
| # Prepare command environment |
| cmd_env = os.environ.copy() |
| |
| if options.exp_size == constants.IE_CUSTOM_SIZE: |
| cmd_env["EXP_SIZE_FD"] = str(exp_size_write_fd) |
| |
| logging.debug("Starting command %r", cmd) |
| |
| # Start child process |
| child = ChildProcess(cmd_env, cmd, |
| [socat_stderr_write_fd, dd_stderr_write_fd, |
| dd_pid_write_fd, exp_size_write_fd]) |
| try: |
| |
| def _ForwardSignal(signum, _): |
| """Forwards signals to child process. |
| |
| """ |
| child.Kill(signum) |
| |
| signal_wakeup = utils.SignalWakeupFd() |
| try: |
| # TODO: There is a race condition between starting the child and |
| # handling the signals here. While there might be a way to work around |
| # it by registering the handlers before starting the child and |
| # deferring sent signals until the child is available, doing so can be |
| # complicated. |
| signal_handler = utils.SignalHandler([signal.SIGTERM, signal.SIGINT], |
| handler_fn=_ForwardSignal, |
| wakeup=signal_wakeup) |
| try: |
| # Close child's side |
| utils.RetryOnSignal(os.close, socat_stderr_write_fd) |
| utils.RetryOnSignal(os.close, dd_stderr_write_fd) |
| utils.RetryOnSignal(os.close, dd_pid_write_fd) |
| utils.RetryOnSignal(os.close, exp_size_write_fd) |
| |
| if ProcessChildIO(child, socat_stderr_read_fd, dd_stderr_read_fd, |
| dd_pid_read_fd, exp_size_read_fd, |
| status_file, child_logger, |
| signal_wakeup, signal_handler, mode): |
| # The child closed all its file descriptors and there was no |
| # signal |
| # TODO: Implement timeout instead of waiting indefinitely |
| utils.RetryOnSignal(child.wait) |
| finally: |
| signal_handler.Reset() |
| finally: |
| signal_wakeup.Reset() |
| finally: |
| child.ForceQuit() |
| |
| if child.returncode == 0: |
| errmsg = None |
| elif child.returncode < 0: |
| errmsg = "Exited due to signal %s" % (-child.returncode, ) |
| else: |
| errmsg = "Exited with status %s" % (child.returncode, ) |
| |
| status_file.SetExitStatus(child.returncode, errmsg) |
| except Exception, err: # pylint: disable=W0703 |
| logging.exception("Unhandled error occurred") |
| status_file.SetExitStatus(constants.EXIT_FAILURE, |
| "Unhandled error occurred: %s" % (err, )) |
| |
| if status_file.ExitStatusIsSuccess(): |
| sys.exit(constants.EXIT_SUCCESS) |
| |
| sys.exit(constants.EXIT_FAILURE) |
| finally: |
| status_file.Update(True) |
| |
| |
| if __name__ == "__main__": |
| main() |