blob: 517694c7208e1d1922a3e7d9d0e13fc9a585f85f [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.utils.log"""
import os
import unittest
import logging
import tempfile
import shutil
import threading
from cStringIO import StringIO
from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
import testutils
class TestLogHandler(unittest.TestCase):
def testNormal(self):
tmpfile = tempfile.NamedTemporaryFile()
handler = utils.log._ReopenableLogHandler(tmpfile.name)
handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger = logging.Logger("TestLogger")
logger.addHandler(handler)
self.assertEqual(len(logger.handlers), 1)
logger.error("Test message ERROR")
logger.info("Test message INFO")
logger.removeHandler(handler)
self.assertFalse(logger.handlers)
handler.close()
self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)
def testReopen(self):
tmpfile = tempfile.NamedTemporaryFile()
tmpfile2 = tempfile.NamedTemporaryFile()
handler = utils.log._ReopenableLogHandler(tmpfile.name)
self.assertFalse(utils.ReadFile(tmpfile.name))
self.assertFalse(utils.ReadFile(tmpfile2.name))
logger = logging.Logger("TestLoggerReopen")
logger.addHandler(handler)
for _ in range(3):
logger.error("Test message ERROR")
handler.flush()
self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
before_id = utils.GetFileID(tmpfile.name)
handler.RequestReopen()
self.assertTrue(handler._reopen)
self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
before_id))
# Rename only after requesting reopen
os.rename(tmpfile.name, tmpfile2.name)
assert not os.path.exists(tmpfile.name)
# Write another message, should reopen
for _ in range(4):
logger.info("Test message INFO")
# Flag must be reset
self.assertFalse(handler._reopen)
self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
before_id))
logger.removeHandler(handler)
self.assertFalse(logger.handlers)
handler.close()
self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)
def testConsole(self):
for (console, check) in [(None, False),
(tempfile.NamedTemporaryFile(), True),
(self._FailingFile(os.devnull), False)]:
# Create a handler which will fail when handling errors
cls = utils.log._LogErrorsToConsole(self._FailingHandler)
# Instantiate handler with file which will fail when writing,
# provoking a write to the console
handler = cls(console, self._FailingFile(os.devnull))
logger = logging.Logger("TestLogger")
logger.addHandler(handler)
self.assertEqual(len(logger.handlers), 1)
# Provoke write
logger.error("Test message ERROR")
# Take everything apart
logger.removeHandler(handler)
self.assertFalse(logger.handlers)
handler.close()
if console and check:
console.flush()
# Check console output
consout = utils.ReadFile(console.name)
self.assertTrue("Cannot log message" in consout)
self.assertTrue("Test message ERROR" in consout)
class _FailingFile(file):
def write(self, _):
raise Exception
class _FailingHandler(logging.StreamHandler):
def handleError(self, _):
raise Exception
class TestSetupLogging(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
def testSimple(self):
logfile = utils.PathJoin(self.tmpdir, "basic.log")
logger = logging.Logger("TestLogger")
self.assertTrue(callable(utils.SetupLogging(logfile, "test",
console_logging=False,
syslog=constants.SYSLOG_NO,
stderr_logging=False,
multithreaded=False,
root_logger=logger)))
self.assertEqual(utils.ReadFile(logfile), "")
logger.error("This is a test")
# Ensure SetupLogging used custom logger
logging.error("This message should not show up in the test log file")
self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
def testReopen(self):
logfile = utils.PathJoin(self.tmpdir, "reopen.log")
logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
logger = logging.Logger("TestLogger")
reopen_fn = utils.SetupLogging(logfile, "test",
console_logging=False,
syslog=constants.SYSLOG_NO,
stderr_logging=False,
multithreaded=False,
root_logger=logger)
self.assertTrue(callable(reopen_fn))
self.assertEqual(utils.ReadFile(logfile), "")
logger.error("This is a test")
self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))
os.rename(logfile, logfile2)
assert not os.path.exists(logfile)
# Notify logger to reopen on the next message
reopen_fn()
assert not os.path.exists(logfile)
# Provoke actual reopen
logger.error("First message")
self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))
class TestSetupToolLogging(unittest.TestCase):
def test(self):
error_name = logging.getLevelName(logging.ERROR)
warn_name = logging.getLevelName(logging.WARNING)
info_name = logging.getLevelName(logging.INFO)
debug_name = logging.getLevelName(logging.DEBUG)
for debug in [False, True]:
for verbose in [False, True]:
logger = logging.Logger("TestLogger")
buf = StringIO()
utils.SetupToolLogging(debug, verbose, _root_logger=logger, _stream=buf)
logger.error("level=error")
logger.warning("level=warning")
logger.info("level=info")
logger.debug("level=debug")
lines = buf.getvalue().splitlines()
self.assertTrue(compat.all(line.count(":") == 3 for line in lines))
messages = [line.split(":", 3)[-1].strip() for line in lines]
if debug:
self.assertEqual(messages, [
"%s level=error" % error_name,
"%s level=warning" % warn_name,
"%s level=info" % info_name,
"%s level=debug" % debug_name,
])
elif verbose:
self.assertEqual(messages, [
"%s level=error" % error_name,
"%s level=warning" % warn_name,
"%s level=info" % info_name,
])
else:
self.assertEqual(messages, [
"level=error",
"level=warning",
])
def testThreadName(self):
thread_name = threading.currentThread().getName()
for enable_threadname in [False, True]:
logger = logging.Logger("TestLogger")
buf = StringIO()
utils.SetupToolLogging(True, True, threadname=enable_threadname,
_root_logger=logger, _stream=buf)
logger.debug("test134042376")
lines = buf.getvalue().splitlines()
self.assertEqual(len(lines), 1)
if enable_threadname:
self.assertTrue((" %s " % thread_name) in lines[0])
else:
self.assertTrue(thread_name not in lines[0])
if __name__ == "__main__":
testutils.GanetiTestProgram()