| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2006, 2007, 2010, 2011 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| """Run an executable on a list of hosts. |
| |
| Script to serially run an executable on a list of hosts via ssh |
| with password auth as root. If the provided log dir does not yet |
| exist, it will try to create it. |
| |
| Implementation: |
| - the main process spawns up to batch_size children, which: |
| - connects to the remote host via ssh as root |
| - uploads the executable with a random name to /tmp via sftp |
| - chmod 500s it |
| - via ssh: chdirs into the upload directory and runs the script |
| - deletes it |
| - writes status messages and all output to one logfile per host |
| - the main process then gathers the status of the children and |
| reports the success/failure ratio |
| - entire script can be aborted with Ctrl-C |
| |
| Security considerations: |
| - the root password for the remote hosts is stored in memory for the |
| runtime of the script |
| - the executable to be run on the remote host is handled the following way: |
| - try to create a random directory with permissions 700 on the |
| remote host, abort further processing on this host if this fails |
| - upload the executable to a random filename in that directory |
| - set executable permissions to 500 |
| - run the executable |
| - delete the executable and the directory on the remote host |
| |
| """ |
| |
| # pylint: disable=C0103 |
| # C0103: Invalid name ganeti-listrunner |
| |
| import errno |
| import optparse |
| import getpass |
| import logging |
| import os |
| import random |
| import select |
| import socket |
| import sys |
| import time |
| import traceback |
| |
| try: |
| import paramiko |
| except ImportError: |
| print >> sys.stderr, \ |
| ("The \"paramiko\" module could not be imported. Install it from your" |
| " distribution's repository. The package is usually named" |
| " \"python-paramiko\".") |
| sys.exit(1) |
| |
| |
# Common prefix of the scratch directory created on every remote host.
REMOTE_PATH_BASE = "/tmp/listrunner"

# Usage synopsis passed to optparse when building the option parser.
USAGE = " ".join([
    "%prog -l logdir {-c command | -x /path/to/file} [-b batch_size]",
    "{-f hostfile|-h hosts} [-u username]",
    "[-p password_file | -A]",
])
| |
| |
def LogDirUseable(logdir):
    """Ensure log file directory is available and usable.

    Creates C{logdir} if it does not exist yet, then verifies that a file
    can be created, written and removed inside it.

    @param logdir: directory that will hold the per-host logfiles
    @return: True if the directory exists (or was created) and a test file
        could be written and deleted in it, False otherwise

    """
    try:
        os.mkdir(logdir)
    except OSError as err:
        # A pre-existing path is fine (the write test below decides whether
        # it is usable); any other failure means we cannot log there, which
        # callers expect to be reported as False rather than a traceback.
        if err.errno != errno.EEXIST:
            return False

    testfile = os.path.join(logdir, "test-%s-%s.deleteme" %
                            (random.random(), random.random()))
    try:
        # Plain append mode; the context manager closes the handle even if
        # the write fails.
        with open(testfile, "a") as logtest:
            logtest.write("log file writeability test\n")
        os.unlink(testfile)
    except (OSError, IOError):
        # Best-effort removal of a half-written test file; the directory is
        # reported as unusable either way.
        try:
            os.unlink(testfile)
        except OSError:
            pass
        return False

    return True
| |
| |
def GetTimeStamp(timestamp=None):
    """Format a time tuple as an ISO8601 timestamp string.

    @param timestamp: a time.localtime()-style tuple; when omitted (None)
        the current local time is used
    @return: string of the form YYYY-MM-DDTHH:MM:SS

    """
    when = time.localtime() if timestamp is None else timestamp
    return time.strftime("%Y-%m-%dT%H:%M:%S", when)
| |
| |
def PingByTcp(target, port, timeout=10, live_port_needed=False, source=None):
    """Simple ping implementation using TCP connect(2).

    Try to do a TCP connect(2) from an optional source IP to the
    specified target IP and the specified target port.

    @param target: address (or name) of the host to probe
    @param port: TCP port to connect to
    @param timeout: connect timeout in seconds (default 10)
    @param live_port_needed: if true, the remote end must accept the
        connection; otherwise a refused connection (ECONNREFUSED) also
        counts as the host being alive
    @param source: optional source address to bind to; if None the kernel
        selects the source address. A bind failure with EADDRNOTAVAIL makes
        the ping fail, other bind failures are ignored
    @return: True if the target is considered reachable, False otherwise

    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if source is not None:
            try:
                sock.bind((source, 0))
            except socket.error as err:
                # Compare the errno attribute, not the exception object.
                if err.errno == errno.EADDRNOTAVAIL:
                    return False

        sock.settimeout(timeout)

        try:
            sock.connect((target, port))
        except socket.timeout:
            return False
        except socket.error as err:
            # A refused connection still proves the host itself is up.
            return (not live_port_needed) and err.errno == errno.ECONNREFUSED
        return True
    finally:
        # Release the descriptor on every path, including failures.
        sock.close()
| |
| |
def GetHosts(hostsfile):
    """Return list of hosts from hostfile.

    Reads the hostslist file and returns a list of hosts.
    Expects the hostslist file to contain one hostname per line.
    Surrounding whitespace (including the line terminator) is removed and
    empty lines are skipped, so they never turn into bogus hostnames.

    @param hostsfile: path of the file to read
    @return: list of hostnames in file order

    Exits the program with status 2 if the file cannot be opened.

    """
    try:
        datafile = open(hostsfile, "r")
    except IOError as msg:
        print("Failed to open hosts file %s: %s" % (hostsfile, msg))
        sys.exit(2)

    with datafile:
        stripped = (line.strip() for line in datafile)
        return [host for host in stripped if host]
| |
| |
def WriteLog(message, logfile):
    """Writes message, terminated by newline, to logfile.

    The message is prefixed with the current timestamp and appended to the
    file. Not being able to log is treated as critical: on any failure to
    open or write the file, the problem and the lost message are printed
    and the process exits with status 1.

    @param message: text to log
    @param logfile: path of the logfile to append to

    """
    try:
        handle = open(logfile, "a")
    except IOError as msg:
        print("failed to open log file %s: %s" % (logfile, msg))
        print("log message was: %s" % message)
        sys.exit(1)  # no being able to log is critical

    try:
        # The context manager closes the handle even when the write fails;
        # keeping the path in "logfile" lets the error message name the file.
        with handle:
            handle.write("%s %s\n" % (GetTimeStamp(), message))
    except IOError as msg:
        print("failed to write to logfile %s: %s" % (logfile, msg))
        print("log message was: %s" % message)
        sys.exit(1)  # no being able to log is critical
| |
| |
def GetAgentKeys():
    """Fetch the keys offered by an ssh agent.

    @return: list of the agent's keys, or an empty list if talking to the
        agent raised paramiko.SSHException

    """
    try:
        agent_keys = list(paramiko.Agent().get_keys())
    except paramiko.SSHException:
        agent_keys = []
    return agent_keys
| |
| |
def SetupSshConnection(host, username, password, use_agent, logfile):
    """Setup the ssh connection used for all later steps.

    This function sets up the ssh connection that will be used both
    for upload and remote command execution. Every agent key (if
    C{use_agent} is set) and then the password (if given) is tried in turn
    until one of them authenticates.

    @param host: name or address of the host to connect to (port 22)
    @param username: SSH username
    @param password: SSH password, or None to skip password authentication
    @param use_agent: whether keys from an SSH agent should be tried
    @param logfile: path of the per-host logfile
    @return: a paramiko.Transport object with an already logged in session
        on success, False on failure

    """
    # check if target is willing to talk to us at all
    if not PingByTcp(host, 22, live_port_needed=True):
        WriteLog("ERROR: FAILURE_NOT_REACHABLE", logfile)
        print(" - ERROR: host not reachable on 22/tcp")
        return False

    keys = GetAgentKeys() if use_agent else []
    attempts = [("key %d" % idx, {"pkey": key})
                for idx, key in enumerate(keys)]
    if password is not None:
        attempts.append(("password", {"password": password}))

    # deal with logging out of paramiko.transport
    handler = None

    for desc, kwargs in attempts:
        transport = None
        try:
            transport = paramiko.Transport((host, 22))

            # only try to setup the logging handler once
            if not handler:
                handler = logging.StreamHandler()
                handler.setLevel(logging.ERROR)
                log = logging.getLogger(transport.get_log_channel())
                log.addHandler(handler)

            transport.connect(username=username, **kwargs)  # pylint: disable=W0142
            WriteLog("ssh connection established using %s" % desc, logfile)
            # FIXME: opening the sftp/shell channels immediately after the
            # session is set up has been seen to fail sometimes; a disabled
            # time.sleep(1) used to live here to give sshd some time.
            return transport
        except (socket.gaierror, socket.error, paramiko.SSHException):
            # Drop the half-open transport before trying the next method so
            # failed attempts do not pile up open connections.
            if transport is not None:
                transport.close()
            continue

    methods = ", ".join(desc for desc, _ in attempts)
    WriteLog("ERROR: FAILURE_CONNECTION_SETUP (tried %s) " % methods, logfile)
    WriteLog("aborted", logfile)
    print(" - ERROR: connection setup failed (tried %s)" % methods)

    return False
| |
| |
def UploadFiles(connection, executable, filelist, logfile):
    """Uploads the specified files via sftp.

    Uploads the specified files to a random, freshly created directory
    (mode 0700) whose name starts with REMOTE_PATH_BASE. All uploaded files
    are chmod 0400 after upload, with the exception of executable, which is
    chmod 0500.

    @param connection: logged-in paramiko transport
    @param executable: local path of the file that must end up executable
    @param filelist: local paths of all files to upload
    @param logfile: path of the per-host logfile
    @return: the absolute path to the remote upload directory on success,
        False if an IOError occurred

    """
    remote_dir = "%s.%s-%s" % (REMOTE_PATH_BASE,
                               random.random(), random.random())

    sftp = None
    try:
        try:
            sftp = paramiko.SFTPClient.from_transport(connection)
            sftp.mkdir(remote_dir, mode=0o700)
            for item in filelist:
                remote_file = "%s/%s" % (remote_dir, os.path.basename(item))
                WriteLog("uploading %s to remote %s" % (item, remote_file),
                         logfile)
                sftp.put(item, remote_file)
                if item == executable:
                    sftp.chmod(remote_file, 0o500)
                else:
                    sftp.chmod(remote_file, 0o400)
        finally:
            # Close the sftp channel on the failure path as well.
            if sftp is not None:
                sftp.close()
    except IOError as err:
        WriteLog("ERROR: FAILURE_UPLOAD: %s" % err, logfile)
        return False

    return remote_dir
| |
| |
def CleanupRemoteDir(connection, upload_dir, filelist, logfile):
    """Cleans out and removes the remote work directory.

    Removes every file of C{filelist} (by basename) from C{upload_dir} and
    then removes the directory itself.

    @param connection: logged-in paramiko transport
    @param upload_dir: remote directory the files were uploaded to
    @param filelist: local paths of the files that were uploaded
    @param logfile: path of the per-host logfile
    @return: True on success, False if an IOError occurred

    """
    sftp = None
    try:
        try:
            sftp = paramiko.SFTPClient.from_transport(connection)
            for item in filelist:
                fullpath = "%s/%s" % (upload_dir, os.path.basename(item))
                WriteLog("removing remote %s" % fullpath, logfile)
                sftp.remove(fullpath)
            sftp.rmdir(upload_dir)
        finally:
            # Close the sftp channel on the failure path as well.
            if sftp is not None:
                sftp.close()
    except IOError as err:
        WriteLog("ERROR: FAILURE_CLEANUP: %s" % err, logfile)
        return False

    return True
| |
| |
def RunRemoteCommand(connection, command, logfile):
    """Execute the command via ssh on the remote host.

    The command is run with stderr redirected into stdout; everything it
    prints is collected and written line by line to the logfile.

    @param connection: logged-in paramiko transport
    @param command: shell command line to execute remotely
    @param logfile: path of the per-host logfile
    @return: False if the command could not be started, True otherwise

    NOTE(review): the remote exit status is not inspected, and a receive
    error only ends the read loop; both cases are still logged as SUCCESS
    and reported as True. Kept as-is because callers may rely on it.

    """
    session = connection.open_session()
    try:
        session.setblocking(0)

        # the following dance is needed because paramiko changed APIs:
        # from returning True/False for success to always returning None
        # and throwing an exception in case of problems.
        # And I want to support both the old and the new API.
        started = True  # being optimistic here, I know
        message = None
        try:
            if session.exec_command("%s 2>&1" % command) is False:
                started = False
        except paramiko.SSHException as err:
            started = False
            message = err

        if not started:
            WriteLog("ERROR: FAILURE_COMMAND_EXECUTION: %s" % message, logfile)
            return False

        ### Read when data is available
        chunks = []
        while True:
            # Block until the channel is readable.
            select.select([session], [], [])
            try:
                data = session.recv(1024)
            except socket.timeout as err:
                data = None
                WriteLog("FAILED: socket.timeout %s" % err, logfile)
            except socket.error as err:
                data = None
                WriteLog("FAILED: socket.error %s" % err, logfile)
            if not data:
                break
            chunks.append(data)
            # Short pause between reads (select used as a 0.1s sleep).
            select.select([], [], [], .1)
        output = "".join(chunks)

        WriteLog("SUCCESS: command output follows", logfile)
        for line in output.splitlines():
            WriteLog("output = %s" % line, logfile)
        WriteLog("command execution completed", logfile)

        return True
    finally:
        # Close the channel on every path, including start failures.
        session.close()
| |
| |
def HostWorker(logdir, username, password, use_agent, hostname,
               executable, exec_args, command, filelist):
    """Per-host worker.

    This function does not return - it's the main code of the childs,
    which exit at the end of this function. The exit code 0 or 1 will be
    interpreted by the parent.

    If an executable is given it is uploaded first; when that upload fails
    no command is run on the host and the worker reports failure.

    @param logdir: the directory where the logfiles must be created
    @param username: SSH username
    @param password: SSH password
    @param use_agent: whether we should instead use an agent
    @param hostname: the hostname to connect to
    @param executable: the executable to upload, if not None
    @param exec_args: Additional arguments for executable
    @param command: the command to run
    @param filelist: auxiliary files to upload

    """
    # in the child/worker process
    logfile = "%s/%s.log" % (logdir, hostname)
    print("%s - starting" % hostname)
    result = 0  # optimism, I know
    try:
        connection = SetupSshConnection(hostname, username,
                                        password, use_agent, logfile)
        if connection is not False:
            upload_dir = None
            run_command = True
            if executable is not None:
                print(" %s: uploading files" % hostname)
                upload_dir = UploadFiles(connection, executable,
                                         filelist, logfile)
                if upload_dir is False:
                    # Without the upload directory there is nothing to run
                    # and nothing we could reliably clean up.
                    print(" %s: file upload failed, check log for details" %
                          hostname)
                    result = 1
                    run_command = False
                else:
                    command = ("cd %s && ./%s" %
                               (upload_dir, os.path.basename(executable)))
                    if exec_args:
                        command += " %s" % exec_args
            if run_command:
                print(" %s: executing remote command" % hostname)
                cmd_result = RunRemoteCommand(connection, command, logfile)
                if cmd_result is True:
                    print(" %s: remote command execution successful" %
                          hostname)
                else:
                    print(" %s: remote command execution failed,"
                          " check log for details" % hostname)
                    result = 1
                if executable is not None:
                    print(" %s: cleaning up remote work dir" % hostname)
                    cln_result = CleanupRemoteDir(connection, upload_dir,
                                                  filelist, logfile)
                    if cln_result is False:
                        print(" %s: remote work dir cleanup failed, check"
                              " log for details" % hostname)
                        result = 1
            connection.close()
        else:
            print(" %s: connection setup failed, skipping" % hostname)
            result = 1
    except KeyboardInterrupt:
        print(" %s: received KeyboardInterrupt, aborting" % hostname)
        WriteLog("ERROR: ABORT_KEYBOARD_INTERRUPT", logfile)
        result = 1
    except Exception as err:
        result = 1
        trace = traceback.format_exc()
        msg = "ERROR: UNHANDLED_EXECPTION_ERROR: %s\nTrace: %s" % (err, trace)
        WriteLog(msg, logfile)
        print(" %s: %s" % (hostname, msg))
    # and exit with exit code 0 or 1, so the parent can compute statistics
    sys.exit(result)
| |
| |
def LaunchWorker(child_pids, logdir, username, password, use_agent, hostname,
                 executable, exec_args, command, filelist):
    """Fork a child process that runs HostWorker for one host.

    Takes the same arguments as HostWorker plus C{child_pids}, a dictionary
    mapping child pid to hostname, in which the parent records the newly
    forked child. A trailing newline is removed from C{hostname} first.

    """
    target = hostname.rstrip("\n")
    child = os.fork()
    if child == 0:
        # forked worker: handle this single host
        HostWorker(logdir, username, password, use_agent, target,
                   executable, exec_args, command, filelist)
    else:
        # controller: only remember which pid serves which host
        child_pids[child] = target
| |
| |
def ParseOptions():
    """Parses the command line options.

    In case of command line errors, it will show the usage and exit the
    program. Besides the mutual-exclusion checks between options, the
    batch size is required to be at least 1, since a smaller value would
    never process the host list properly.

    @return: the options in a tuple: (logdir, executable, exec_args,
        hostfile, hostlist, command, use_agent, auxfiles, username,
        password, batch_size)

    """
    # resolve because original used -h for hostfile, which conflicts
    # with -h for help
    parser = optparse.OptionParser(usage="\n%s" % USAGE,
                                   conflict_handler="resolve")

    parser.add_option("-l", dest="logdir", default=None,
                      help="directory to write logfiles to")
    parser.add_option("-x", dest="executable", default=None,
                      help="executable to run on remote host(s)")
    parser.add_option("-f", dest="hostfile", default=None,
                      help="hostlist file (one host per line)")
    parser.add_option("-h", dest="hostlist", default=None, metavar="HOSTS",
                      help="comma-separated list of hosts or single hostname")
    parser.add_option("-a", dest="auxfiles", action="append", default=[],
                      help="optional auxiliary file to upload"
                      " (can be given multiple times)",
                      metavar="FILE")
    parser.add_option("-c", dest="command", default=None,
                      help="shell command to run on remote host(s)")
    parser.add_option("-b", dest="batch_size", default=15, type="int",
                      help="batch-size, how many hosts to process"
                      " in parallel [15]")
    parser.add_option("-u", dest="username", default="root",
                      help="username used to connect [root]")
    parser.add_option("-p", dest="password", default=None,
                      help="password used to authenticate (when not"
                      " using an agent)")
    parser.add_option("-A", dest="use_agent", default=False,
                      action="store_true",
                      help="instead of password, use keys from an SSH agent")
    parser.add_option("--args", dest="exec_args", default=None,
                      help="Arguments to be passed to executable (-x)")

    opts, args = parser.parse_args()

    if opts.executable and opts.command:
        parser.error("Options -x and -c conflict with each other")
    if not (opts.executable or opts.command):
        parser.error("One of -x and -c must be given")
    if opts.command and opts.exec_args:
        parser.error("Can't specify arguments when using custom command")
    if not opts.logdir:
        parser.error("Option -l is required")
    if opts.hostfile and opts.hostlist:
        parser.error("Options -f and -h conflict with each other")
    if not (opts.hostfile or opts.hostlist):
        parser.error("One of -f or -h must be given")
    if opts.batch_size < 1:
        # 0 would launch no worker at all, negative values slice the host
        # list in surprising ways.
        parser.error("Batch size (-b) must be at least 1")
    if args:
        parser.error("This program doesn't take any arguments, passed in: %s" %
                     ", ".join(args))

    return (opts.logdir, opts.executable, opts.exec_args,
            opts.hostfile, opts.hostlist,
            opts.command, opts.use_agent, opts.auxfiles, opts.username,
            opts.password, opts.batch_size)
| |
| |
def main():
    """Run the requested command or executable on all hosts.

    Parses the command line, checks the log directory, obtains the password
    (from the password file, or interactively, unless an agent is used) and
    builds the host list. Up to batch_size worker processes are kept running
    at a time; each finished worker is replaced by one for the next pending
    host. Finally the number of successful and failed hosts is printed and
    the program exits with status 0.

    """
    (logdir, executable, exec_args, hostfile, hostlist,
     command, use_agent, auxfiles, username,
     password, batch_size) = ParseOptions()

    ### Unbuffered sys.stdout
    sys.stdout = os.fdopen(1, "w", 0)

    if LogDirUseable(logdir) is False:
        print("ERROR: cannot create logfiles in dir %s, aborting" % logdir)
        sys.exit(1)

    if not use_agent:
        if password:
            # -p names a file whose first line holds the password
            try:
                with open(password) as fh:
                    pwvalue = fh.readline().strip()
            except IOError as e:
                print("error: can not read in from password file %s: %s" %
                      (password, e))
                sys.exit(1)
            password = pwvalue
        else:
            password = getpass.getpass("%s's password for all nodes: " %
                                       username)

    if hostfile:
        hosts = GetHosts(hostfile)
    else:
        hosts = hostlist.split(",")
    # commandline robustness: ignore surrounding whitespace and drop empty
    # entries (e.g. from "a,,b" or a trailing comma)
    hosts = [host.strip() for host in hosts if host.strip()]

    successes = failures = 0

    filelist = auxfiles[:]
    filelist.append(executable)

    # initial batch
    batch = hosts[:batch_size]
    hosts = hosts[batch_size:]
    child_pids = {}
    for hostname in batch:
        LaunchWorker(child_pids, logdir, username, password, use_agent,
                     hostname, executable, exec_args, command, filelist)

    # every finished child frees a slot for the next pending host
    while child_pids:
        pid, status = os.wait()
        hostname = child_pids.pop(pid, "<unknown host>")
        print(" %s: done (in parent)" % hostname)
        if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
            successes += 1
        else:
            failures += 1
        if hosts:
            LaunchWorker(child_pids, logdir, username, password, use_agent,
                         hosts.pop(0), executable, exec_args, command,
                         filelist)

    print("")
    print("All done, %s successful and %s failed hosts" %
          (successes, failures))

    sys.exit(0)
| |
| |
# Script entry point: Ctrl-C in the controller yields a short message and
# exit status 1 instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Received KeyboardInterrupt, aborting")
        sys.exit(1)