| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2011 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Script for testing ganeti.utils.process""" |
| |
| import unittest |
| import tempfile |
| import shutil |
| import os |
| import stat |
| import time |
| import select |
| import signal |
| |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import errors |
| |
| import testutils |
| |
| |
| class TestIsProcessAlive(unittest.TestCase): |
| """Testing case for IsProcessAlive""" |
| |
| def testExists(self): |
| mypid = os.getpid() |
| self.assert_(utils.IsProcessAlive(mypid), "can't find myself running") |
| |
| def testNotExisting(self): |
| pid_non_existing = os.fork() |
| if pid_non_existing == 0: |
| os._exit(0) |
| elif pid_non_existing < 0: |
| raise SystemError("can't fork") |
| os.waitpid(pid_non_existing, 0) |
| self.assertFalse(utils.IsProcessAlive(pid_non_existing), |
| "nonexisting process detected") |
| |
| |
| class TestGetProcStatusPath(unittest.TestCase): |
| def test(self): |
| self.assert_("/1234/" in utils.process._GetProcStatusPath(1234)) |
| self.assertNotEqual(utils.process._GetProcStatusPath(1), |
| utils.process._GetProcStatusPath(2)) |
| |
| |
| class TestIsProcessHandlingSignal(unittest.TestCase): |
| def setUp(self): |
| self.tmpdir = tempfile.mkdtemp() |
| |
| def tearDown(self): |
| shutil.rmtree(self.tmpdir) |
| |
| def testParseSigsetT(self): |
| parse_sigset_t_fn = utils.process._ParseSigsetT |
| self.assertEqual(len(parse_sigset_t_fn("0")), 0) |
| self.assertEqual(parse_sigset_t_fn("1"), set([1])) |
| self.assertEqual(parse_sigset_t_fn("1000a"), set([2, 4, 17])) |
| self.assertEqual(parse_sigset_t_fn("810002"), set([2, 17, 24, ])) |
| self.assertEqual(parse_sigset_t_fn("0000000180000202"), |
| set([2, 10, 32, 33])) |
| self.assertEqual(parse_sigset_t_fn("0000000180000002"), |
| set([2, 32, 33])) |
| self.assertEqual(parse_sigset_t_fn("0000000188000002"), |
| set([2, 28, 32, 33])) |
| self.assertEqual(parse_sigset_t_fn("000000004b813efb"), |
| set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17, |
| 24, 25, 26, 28, 31])) |
| self.assertEqual(parse_sigset_t_fn("ffffff"), set(range(1, 25))) |
| |
| def testGetProcStatusField(self): |
| for field in ["SigCgt", "Name", "FDSize"]: |
| for value in ["", "0", "cat", " 1234 KB"]: |
| pstatus = "\n".join([ |
| "VmPeak: 999 kB", |
| "%s: %s" % (field, value), |
| "TracerPid: 0", |
| ]) |
| result = utils.process._GetProcStatusField(pstatus, field) |
| self.assertEqual(result, value.strip()) |
| |
| def test(self): |
| sp = utils.PathJoin(self.tmpdir, "status") |
| |
| utils.WriteFile(sp, data="\n".join([ |
| "Name: bash", |
| "State: S (sleeping)", |
| "SleepAVG: 98%", |
| "Pid: 22250", |
| "PPid: 10858", |
| "TracerPid: 0", |
| "SigBlk: 0000000000010000", |
| "SigIgn: 0000000000384004", |
| "SigCgt: 000000004b813efb", |
| "CapEff: 0000000000000000", |
| ])) |
| |
| self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) |
| |
| def testNoSigCgt(self): |
| sp = utils.PathJoin(self.tmpdir, "status") |
| |
| utils.WriteFile(sp, data="\n".join([ |
| "Name: bash", |
| ])) |
| |
| self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal, |
| 1234, 10, status_path=sp) |
| |
| def testNoSuchFile(self): |
| sp = utils.PathJoin(self.tmpdir, "notexist") |
| |
| self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp)) |
| |
| @staticmethod |
| def _TestRealProcess(): |
| signal.signal(signal.SIGUSR1, signal.SIG_DFL) |
| if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1): |
| raise Exception("SIGUSR1 is handled when it should not be") |
| |
| signal.signal(signal.SIGUSR1, lambda signum, frame: None) |
| if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1): |
| raise Exception("SIGUSR1 is not handled when it should be") |
| |
| signal.signal(signal.SIGUSR1, signal.SIG_IGN) |
| if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1): |
| raise Exception("SIGUSR1 is not handled when it should be") |
| |
| signal.signal(signal.SIGUSR1, signal.SIG_DFL) |
| if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1): |
| raise Exception("SIGUSR1 is handled when it should not be") |
| |
| return True |
| |
| def testRealProcess(self): |
| self.assert_(utils.RunInSeparateProcess(self._TestRealProcess)) |
| |
| |
| class _PostforkProcessReadyHelper: |
| """A helper to use with C{postfork_fn} in RunCmd. |
| |
| It makes sure a process has reached a certain state by reading from a fifo. |
| |
| @ivar write_fd: The fd number to write to |
| |
| """ |
| def __init__(self, timeout): |
| """Initialize the helper. |
| |
| @param fifo_dir: The dir where we can create the fifo |
| @param timeout: The time in seconds to wait before giving up |
| |
| """ |
| self.timeout = timeout |
| (self.read_fd, self.write_fd) = os.pipe() |
| |
| def Ready(self, pid): |
| """Waits until the process is ready. |
| |
| @param pid: The pid of the process |
| |
| """ |
| (read_ready, _, _) = select.select([self.read_fd], [], [], self.timeout) |
| |
| if not read_ready: |
| # We hit the timeout |
| raise AssertionError("Timeout %d reached while waiting for process %d" |
| " to become ready" % (self.timeout, pid)) |
| |
| def Cleanup(self): |
| """Cleans up the helper. |
| |
| """ |
| os.close(self.read_fd) |
| os.close(self.write_fd) |
| |
| |
| class TestRunCmd(testutils.GanetiTestCase): |
| """Testing case for the RunCmd function""" |
| |
| def setUp(self): |
| testutils.GanetiTestCase.setUp(self) |
| self.magic = time.ctime() + " ganeti test" |
| self.fname = self._CreateTempFile() |
| self.fifo_tmpdir = tempfile.mkdtemp() |
| self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo") |
| os.mkfifo(self.fifo_file) |
| |
| # If the process is not ready after 20 seconds we have bigger issues |
| self.proc_ready_helper = _PostforkProcessReadyHelper(20) |
| |
| def tearDown(self): |
| self.proc_ready_helper.Cleanup() |
| shutil.rmtree(self.fifo_tmpdir) |
| testutils.GanetiTestCase.tearDown(self) |
| |
| def testOk(self): |
| """Test successful exit code""" |
| result = utils.RunCmd("/bin/sh -c 'exit 0'") |
| self.assertEqual(result.exit_code, 0) |
| self.assertEqual(result.output, "") |
| |
| def testFail(self): |
| """Test fail exit code""" |
| result = utils.RunCmd("/bin/sh -c 'exit 1'") |
| self.assertEqual(result.exit_code, 1) |
| self.assertEqual(result.output, "") |
| |
| def testStdout(self): |
| """Test standard output""" |
| cmd = 'echo -n "%s"' % self.magic |
| result = utils.RunCmd("/bin/sh -c '%s'" % cmd) |
| self.assertEqual(result.stdout, self.magic) |
| result = utils.RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) |
| self.assertEqual(result.output, "") |
| self.assertFileContent(self.fname, self.magic) |
| |
| def testStderr(self): |
| """Test standard error""" |
| cmd = 'echo -n "%s"' % self.magic |
| result = utils.RunCmd("/bin/sh -c '%s' 1>&2" % cmd) |
| self.assertEqual(result.stderr, self.magic) |
| result = utils.RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname) |
| self.assertEqual(result.output, "") |
| self.assertFileContent(self.fname, self.magic) |
| |
| def testCombined(self): |
| """Test combined output""" |
| cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic) |
| expected = "A" + self.magic + "B" + self.magic |
| result = utils.RunCmd("/bin/sh -c '%s'" % cmd) |
| self.assertEqual(result.output, expected) |
| result = utils.RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname) |
| self.assertEqual(result.output, "") |
| self.assertFileContent(self.fname, expected) |
| |
| def testSignal(self): |
| """Test signal""" |
| result = utils.RunCmd(["python", "-c", |
| "import os; os.kill(os.getpid(), 15)"]) |
| self.assertEqual(result.signal, 15) |
| self.assertEqual(result.output, "") |
| |
| def testTimeoutClean(self): |
| cmd = ("trap 'exit 0' TERM; echo >&%d; read < %s" % |
| (self.proc_ready_helper.write_fd, self.fifo_file)) |
| result = utils.RunCmd(["/bin/sh", "-c", cmd], timeout=0.2, |
| noclose_fds=[self.proc_ready_helper.write_fd], |
| postfork_fn=self.proc_ready_helper.Ready) |
| self.assertEqual(result.exit_code, 0) |
| |
| def testTimeoutKill(self): |
| cmd = ["/bin/sh", "-c", "trap '' TERM; echo >&%d; read < %s" % |
| (self.proc_ready_helper.write_fd, self.fifo_file)] |
| timeout = 0.2 |
| (out, err, status, ta) = \ |
| utils.process._RunCmdPipe(cmd, {}, False, "/", False, |
| timeout, [self.proc_ready_helper.write_fd], |
| None, |
| _linger_timeout=0.2, |
| postfork_fn=self.proc_ready_helper.Ready) |
| self.assert_(status < 0) |
| self.assertEqual(-status, signal.SIGKILL) |
| |
| def testTimeoutOutputAfterTerm(self): |
| cmd = ("trap 'echo sigtermed; exit 1' TERM; echo >&%d; read < %s" % |
| (self.proc_ready_helper.write_fd, self.fifo_file)) |
| result = utils.RunCmd(["/bin/sh", "-c", cmd], timeout=0.2, |
| noclose_fds=[self.proc_ready_helper.write_fd], |
| postfork_fn=self.proc_ready_helper.Ready) |
| self.assert_(result.failed) |
| self.assertEqual(result.stdout, "sigtermed\n") |
| |
| def testListRun(self): |
| """Test list runs""" |
| result = utils.RunCmd(["true"]) |
| self.assertEqual(result.signal, None) |
| self.assertEqual(result.exit_code, 0) |
| result = utils.RunCmd(["/bin/sh", "-c", "exit 1"]) |
| self.assertEqual(result.signal, None) |
| self.assertEqual(result.exit_code, 1) |
| result = utils.RunCmd(["echo", "-n", self.magic]) |
| self.assertEqual(result.signal, None) |
| self.assertEqual(result.exit_code, 0) |
| self.assertEqual(result.stdout, self.magic) |
| |
| def testFileEmptyOutput(self): |
| """Test file output""" |
| result = utils.RunCmd(["true"], output=self.fname) |
| self.assertEqual(result.signal, None) |
| self.assertEqual(result.exit_code, 0) |
| self.assertFileContent(self.fname, "") |
| |
| def testLang(self): |
| """Test locale environment""" |
| old_env = os.environ.copy() |
| try: |
| os.environ["LANG"] = "en_US.UTF-8" |
| os.environ["LC_ALL"] = "en_US.UTF-8" |
| result = utils.RunCmd(["locale"]) |
| for line in result.output.splitlines(): |
| key, value = line.split("=", 1) |
| # Ignore these variables, they're overridden by LC_ALL |
| if key == "LANG" or key == "LANGUAGE": |
| continue |
| self.failIf(value and value != "C" and value != '"C"', |
| "Variable %s is set to the invalid value '%s'" % (key, value)) |
| finally: |
| os.environ = old_env |
| |
| def testDefaultCwd(self): |
| """Test default working directory""" |
| self.failUnlessEqual(utils.RunCmd(["pwd"]).stdout.strip(), "/") |
| |
| def testCwd(self): |
| """Test default working directory""" |
| self.failUnlessEqual(utils.RunCmd(["pwd"], cwd="/").stdout.strip(), "/") |
| self.failUnlessEqual(utils.RunCmd(["pwd"], cwd="/tmp").stdout.strip(), |
| "/tmp") |
| cwd = os.getcwd() |
| self.failUnlessEqual(utils.RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd) |
| |
| def testResetEnv(self): |
| """Test environment reset functionality""" |
| self.failUnlessEqual(utils.RunCmd(["env"], reset_env=True).stdout.strip(), |
| "") |
| self.failUnlessEqual(utils.RunCmd(["env"], reset_env=True, |
| env={"FOO": "bar",}).stdout.strip(), |
| "FOO=bar") |
| |
| def testNoFork(self): |
| """Test that nofork raise an error""" |
| self.assertFalse(utils.process._no_fork) |
| utils.DisableFork() |
| try: |
| self.assertTrue(utils.process._no_fork) |
| self.assertRaises(errors.ProgrammerError, utils.RunCmd, ["true"]) |
| finally: |
| utils.process._no_fork = False |
| self.assertFalse(utils.process._no_fork) |
| |
| def testWrongParams(self): |
| """Test wrong parameters""" |
| self.assertRaises(errors.ProgrammerError, utils.RunCmd, ["true"], |
| output="/dev/null", interactive=True) |
| |
| def testNocloseFds(self): |
| """Test selective fd retention (noclose_fds)""" |
| temp = open(self.fname, "r+") |
| try: |
| temp.write("test") |
| temp.seek(0) |
| cmd = "read -u %d; echo $REPLY" % temp.fileno() |
| result = utils.RunCmd(["/bin/bash", "-c", cmd]) |
| self.assertEqual(result.stdout.strip(), "") |
| temp.seek(0) |
| result = utils.RunCmd(["/bin/bash", "-c", cmd], |
| noclose_fds=[temp.fileno()]) |
| self.assertEqual(result.stdout.strip(), "test") |
| finally: |
| temp.close() |
| |
| def testNoInputRead(self): |
| testfile = testutils.TestDataFilename("cert1.pem") |
| |
| result = utils.RunCmd(["cat"], timeout=10.0) |
| self.assertFalse(result.failed) |
| self.assertEqual(result.stderr, "") |
| self.assertEqual(result.stdout, "") |
| |
| def testInputFileHandle(self): |
| testfile = testutils.TestDataFilename("cert1.pem") |
| |
| result = utils.RunCmd(["cat"], input_fd=open(testfile, "r")) |
| self.assertFalse(result.failed) |
| self.assertEqual(result.stdout, utils.ReadFile(testfile)) |
| self.assertEqual(result.stderr, "") |
| |
| def testInputNumericFileDescriptor(self): |
| testfile = testutils.TestDataFilename("cert2.pem") |
| |
| fh = open(testfile, "r") |
| try: |
| result = utils.RunCmd(["cat"], input_fd=fh.fileno()) |
| finally: |
| fh.close() |
| |
| self.assertFalse(result.failed) |
| self.assertEqual(result.stdout, utils.ReadFile(testfile)) |
| self.assertEqual(result.stderr, "") |
| |
| def testInputWithCloseFds(self): |
| testfile = testutils.TestDataFilename("cert1.pem") |
| |
| temp = open(self.fname, "r+") |
| try: |
| temp.write("test283523367") |
| temp.seek(0) |
| |
| result = utils.RunCmd(["/bin/bash", "-c", |
| ("cat && read -u %s; echo $REPLY" % |
| temp.fileno())], |
| input_fd=open(testfile, "r"), |
| noclose_fds=[temp.fileno()]) |
| self.assertFalse(result.failed) |
| self.assertEqual(result.stdout.strip(), |
| utils.ReadFile(testfile) + "test283523367") |
| self.assertEqual(result.stderr, "") |
| finally: |
| temp.close() |
| |
| def testOutputAndInteractive(self): |
| self.assertRaises(errors.ProgrammerError, utils.RunCmd, |
| [], output=self.fname, interactive=True) |
| |
| def testOutputAndInput(self): |
| self.assertRaises(errors.ProgrammerError, utils.RunCmd, |
| [], output=self.fname, input_fd=open(self.fname)) |
| |
| |
| class TestRunParts(testutils.GanetiTestCase): |
| """Testing case for the RunParts function""" |
| |
| def setUp(self): |
| self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp") |
| |
| def tearDown(self): |
| shutil.rmtree(self.rundir) |
| |
| def testEmpty(self): |
| """Test on an empty dir""" |
| self.failUnlessEqual(utils.RunParts(self.rundir, reset_env=True), []) |
| |
| def testSkipWrongName(self): |
| """Test that wrong files are skipped""" |
| fname = os.path.join(self.rundir, "00test.dot") |
| utils.WriteFile(fname, data="") |
| os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
| relname = os.path.basename(fname) |
| self.failUnlessEqual(utils.RunParts(self.rundir, reset_env=True), |
| [(relname, constants.RUNPARTS_SKIP, None)]) |
| |
| def testSkipNonExec(self): |
| """Test that non executable files are skipped""" |
| fname = os.path.join(self.rundir, "00test") |
| utils.WriteFile(fname, data="") |
| relname = os.path.basename(fname) |
| self.failUnlessEqual(utils.RunParts(self.rundir, reset_env=True), |
| [(relname, constants.RUNPARTS_SKIP, None)]) |
| |
| def testError(self): |
| """Test error on a broken executable""" |
| fname = os.path.join(self.rundir, "00test") |
| utils.WriteFile(fname, data="") |
| os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
| (relname, status, error) = utils.RunParts(self.rundir, reset_env=True)[0] |
| self.failUnlessEqual(relname, os.path.basename(fname)) |
| self.failUnlessEqual(status, constants.RUNPARTS_ERR) |
| self.failUnless(error) |
| |
| def testSorted(self): |
| """Test executions are sorted""" |
| files = [] |
| files.append(os.path.join(self.rundir, "64test")) |
| files.append(os.path.join(self.rundir, "00test")) |
| files.append(os.path.join(self.rundir, "42test")) |
| |
| for fname in files: |
| utils.WriteFile(fname, data="") |
| |
| results = utils.RunParts(self.rundir, reset_env=True) |
| |
| for fname in sorted(files): |
| self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0]) |
| |
| def testOk(self): |
| """Test correct execution""" |
| fname = os.path.join(self.rundir, "00test") |
| utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao") |
| os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
| (relname, status, runresult) = \ |
| utils.RunParts(self.rundir, reset_env=True)[0] |
| self.failUnlessEqual(relname, os.path.basename(fname)) |
| self.failUnlessEqual(status, constants.RUNPARTS_RUN) |
| self.failUnlessEqual(runresult.stdout, "ciao") |
| |
| def testRunFail(self): |
| """Test correct execution, with run failure""" |
| fname = os.path.join(self.rundir, "00test") |
| utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1") |
| os.chmod(fname, stat.S_IREAD | stat.S_IEXEC) |
| (relname, status, runresult) = \ |
| utils.RunParts(self.rundir, reset_env=True)[0] |
| self.failUnlessEqual(relname, os.path.basename(fname)) |
| self.failUnlessEqual(status, constants.RUNPARTS_RUN) |
| self.failUnlessEqual(runresult.exit_code, 1) |
| self.failUnless(runresult.failed) |
| |
| def testRunMix(self): |
| files = [] |
| files.append(os.path.join(self.rundir, "00test")) |
| files.append(os.path.join(self.rundir, "42test")) |
| files.append(os.path.join(self.rundir, "64test")) |
| files.append(os.path.join(self.rundir, "99test")) |
| |
| files.sort() |
| |
| # 1st has errors in execution |
| utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1") |
| os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC) |
| |
| # 2nd is skipped |
| utils.WriteFile(files[1], data="") |
| |
| # 3rd cannot execute properly |
| utils.WriteFile(files[2], data="") |
| os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC) |
| |
| # 4th execs |
| utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao") |
| os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC) |
| |
| results = utils.RunParts(self.rundir, reset_env=True) |
| |
| (relname, status, runresult) = results[0] |
| self.failUnlessEqual(relname, os.path.basename(files[0])) |
| self.failUnlessEqual(status, constants.RUNPARTS_RUN) |
| self.failUnlessEqual(runresult.exit_code, 1) |
| self.failUnless(runresult.failed) |
| |
| (relname, status, runresult) = results[1] |
| self.failUnlessEqual(relname, os.path.basename(files[1])) |
| self.failUnlessEqual(status, constants.RUNPARTS_SKIP) |
| self.failUnlessEqual(runresult, None) |
| |
| (relname, status, runresult) = results[2] |
| self.failUnlessEqual(relname, os.path.basename(files[2])) |
| self.failUnlessEqual(status, constants.RUNPARTS_ERR) |
| self.failUnless(runresult) |
| |
| (relname, status, runresult) = results[3] |
| self.failUnlessEqual(relname, os.path.basename(files[3])) |
| self.failUnlessEqual(status, constants.RUNPARTS_RUN) |
| self.failUnlessEqual(runresult.output, "ciao") |
| self.failUnlessEqual(runresult.exit_code, 0) |
| self.failUnless(not runresult.failed) |
| |
| def testMissingDirectory(self): |
| nosuchdir = utils.PathJoin(self.rundir, "no/such/directory") |
| self.assertEqual(utils.RunParts(nosuchdir), []) |
| |
| |
| class TestStartDaemon(testutils.GanetiTestCase): |
| def setUp(self): |
| self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test") |
| self.tmpfile = os.path.join(self.tmpdir, "test") |
| |
| def tearDown(self): |
| shutil.rmtree(self.tmpdir) |
| |
| def testShell(self): |
| utils.StartDaemon("echo Hello World > %s" % self.tmpfile) |
| self._wait(self.tmpfile, 60.0, "Hello World") |
| |
| def testShellOutput(self): |
| utils.StartDaemon("echo Hello World", output=self.tmpfile) |
| self._wait(self.tmpfile, 60.0, "Hello World") |
| |
| def testNoShellNoOutput(self): |
| utils.StartDaemon(["pwd"]) |
| |
| def testNoShellNoOutputTouch(self): |
| testfile = os.path.join(self.tmpdir, "check") |
| self.failIf(os.path.exists(testfile)) |
| utils.StartDaemon(["touch", testfile]) |
| self._wait(testfile, 60.0, "") |
| |
| def testNoShellOutput(self): |
| utils.StartDaemon(["pwd"], output=self.tmpfile) |
| self._wait(self.tmpfile, 60.0, "/") |
| |
| def testNoShellOutputCwd(self): |
| utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd()) |
| self._wait(self.tmpfile, 60.0, os.getcwd()) |
| |
| def testShellEnv(self): |
| utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile, |
| env={ "GNT_TEST_VAR": "Hello World", }) |
| self._wait(self.tmpfile, 60.0, "Hello World") |
| |
| def testNoShellEnv(self): |
| utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile, |
| env={ "GNT_TEST_VAR": "Hello World", }) |
| self._wait(self.tmpfile, 60.0, "Hello World") |
| |
| def testOutputFd(self): |
| fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT) |
| try: |
| utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd()) |
| finally: |
| os.close(fd) |
| self._wait(self.tmpfile, 60.0, os.getcwd()) |
| |
| def testPid(self): |
| pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile) |
| self._wait(self.tmpfile, 60.0, str(pid)) |
| |
| def testPidFile(self): |
| pidfile = os.path.join(self.tmpdir, "pid") |
| checkfile = os.path.join(self.tmpdir, "abort") |
| |
| pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile, |
| output=self.tmpfile) |
| try: |
| fd = os.open(pidfile, os.O_RDONLY) |
| try: |
| # Check file is locked |
| self.assertRaises(errors.LockError, utils.LockFile, fd) |
| |
| pidtext = os.read(fd, 100) |
| finally: |
| os.close(fd) |
| |
| self.assertEqual(int(pidtext.strip()), pid) |
| |
| self.assert_(utils.IsProcessAlive(pid)) |
| finally: |
| # No matter what happens, kill daemon |
| utils.KillProcess(pid, timeout=5.0, waitpid=False) |
| self.failIf(utils.IsProcessAlive(pid)) |
| |
| self.assertEqual(utils.ReadFile(self.tmpfile), "") |
| |
| def _wait(self, path, timeout, expected): |
| # Due to the asynchronous nature of daemon processes, polling is necessary. |
| # A timeout makes sure the test doesn't hang forever. |
| def _CheckFile(): |
| if not (os.path.isfile(path) and |
| utils.ReadFile(path).strip() == expected): |
| raise utils.RetryAgain() |
| |
| try: |
| utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout) |
| except utils.RetryTimeout: |
| self.fail("Apparently the daemon didn't run in %s seconds and/or" |
| " didn't write the correct output" % timeout) |
| |
| def testError(self): |
| self.assertRaises(errors.OpExecError, utils.StartDaemon, |
| ["./does-NOT-EXIST/here/0123456789"]) |
| self.assertRaises(errors.OpExecError, utils.StartDaemon, |
| ["./does-NOT-EXIST/here/0123456789"], |
| output=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
| self.assertRaises(errors.OpExecError, utils.StartDaemon, |
| ["./does-NOT-EXIST/here/0123456789"], |
| cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
| self.assertRaises(errors.OpExecError, utils.StartDaemon, |
| ["./does-NOT-EXIST/here/0123456789"], |
| output=os.path.join(self.tmpdir, "DIR/NOT/EXIST")) |
| |
| fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT) |
| try: |
| self.assertRaises(errors.ProgrammerError, utils.StartDaemon, |
| ["./does-NOT-EXIST/here/0123456789"], |
| output=self.tmpfile, output_fd=fd) |
| finally: |
| os.close(fd) |
| |
| |
| class RunInSeparateProcess(unittest.TestCase): |
| def test(self): |
| for exp in [True, False]: |
| def _child(): |
| return exp |
| |
| self.assertEqual(exp, utils.RunInSeparateProcess(_child)) |
| |
| def testArgs(self): |
| for arg in [0, 1, 999, "Hello World", (1, 2, 3)]: |
| def _child(carg1, carg2): |
| return carg1 == "Foo" and carg2 == arg |
| |
| self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg)) |
| |
| def testPid(self): |
| parent_pid = os.getpid() |
| |
| def _check(): |
| return os.getpid() == parent_pid |
| |
| self.failIf(utils.RunInSeparateProcess(_check)) |
| |
| def testSignal(self): |
| def _kill(): |
| os.kill(os.getpid(), signal.SIGTERM) |
| |
| self.assertRaises(errors.GenericError, |
| utils.RunInSeparateProcess, _kill) |
| |
| def testException(self): |
| def _exc(): |
| raise errors.GenericError("This is a test") |
| |
| self.assertRaises(errors.GenericError, |
| utils.RunInSeparateProcess, _exc) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |