blob: ad223f1d2f426cfe5efb2f34875c46cb47b40737 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Script for unittesting the netutils module"""
import os
import re
import shutil
import socket
import tempfile
import unittest
import testutils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import serializer
from ganeti import utils
def _GetSocketCredentials(path):
"""Connect to a Unix socket and return remote credentials.
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.settimeout(10)
sock.connect(path)
return netutils.GetSocketCredentials(sock)
finally:
sock.close()
class TestGetSocketCredentials(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.sockpath = utils.PathJoin(self.tmpdir, "sock")
self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.listener.settimeout(10)
self.listener.bind(self.sockpath)
self.listener.listen(1)
def tearDown(self):
self.listener.shutdown(socket.SHUT_RDWR)
self.listener.close()
shutil.rmtree(self.tmpdir)
def test(self):
(c2pr, c2pw) = os.pipe()
# Start child process
child = os.fork()
if child == 0:
try:
data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
os.write(c2pw, data)
os.close(c2pw)
os._exit(0)
finally:
os._exit(1)
os.close(c2pw)
# Wait for one connection
(conn, _) = self.listener.accept()
conn.recv(1)
conn.close()
# Wait for result
result = os.read(c2pr, 4096)
os.close(c2pr)
# Check child's exit code
(_, status) = os.waitpid(child, 0)
self.assertFalse(os.WIFSIGNALED(status))
self.assertEqual(os.WEXITSTATUS(status), 0)
# Check result
(pid, uid, gid) = serializer.LoadJson(result)
self.assertEqual(pid, os.getpid())
self.assertEqual(uid, os.getuid())
self.assertEqual(gid, os.getgid())
class TestHostname(unittest.TestCase):
"""Testing case for Hostname"""
def testUppercase(self):
data = "AbC.example.com"
self.assertEqual(netutils.Hostname.GetNormalizedName(data), data.lower())
def testTooLongName(self):
data = "a.b." + "c" * 255
self.assertRaises(errors.OpPrereqError,
netutils.Hostname.GetNormalizedName, data)
def testTrailingDot(self):
data = "a.b.c"
self.assertEqual(netutils.Hostname.GetNormalizedName(data + "."), data)
def testInvalidName(self):
data = [
"a b",
"a/b",
".a.b",
"a..b",
]
for value in data:
self.assertRaises(errors.OpPrereqError,
netutils.Hostname.GetNormalizedName, value)
def testValidName(self):
data = [
"a.b",
"a-b",
"a_b",
"a.b.c",
]
for value in data:
self.assertEqual(netutils.Hostname.GetNormalizedName(value), value)
class TestIPAddress(unittest.TestCase):
def testIsValid(self):
self.assert_(netutils.IPAddress.IsValid("0.0.0.0"))
self.assert_(netutils.IPAddress.IsValid("127.0.0.1"))
self.assert_(netutils.IPAddress.IsValid("::"))
self.assert_(netutils.IPAddress.IsValid("::1"))
def testNotIsValid(self):
self.assertFalse(netutils.IPAddress.IsValid("0"))
self.assertFalse(netutils.IPAddress.IsValid("1.1.1.256"))
self.assertFalse(netutils.IPAddress.IsValid("a:g::1"))
def testGetAddressFamily(self):
fn = netutils.IPAddress.GetAddressFamily
self.assertEqual(fn("127.0.0.1"), socket.AF_INET)
self.assertEqual(fn("10.2.0.127"), socket.AF_INET)
self.assertEqual(fn("::1"), socket.AF_INET6)
self.assertEqual(fn("2001:db8::1"), socket.AF_INET6)
self.assertRaises(errors.IPAddressError, fn, "0")
def testValidateNetmask(self):
for netmask in [0, 33]:
self.assertFalse(netutils.IP4Address.ValidateNetmask(netmask))
for netmask in [1, 32]:
self.assertTrue(netutils.IP4Address.ValidateNetmask(netmask))
for netmask in [0, 129]:
self.assertFalse(netutils.IP6Address.ValidateNetmask(netmask))
for netmask in [1, 128]:
self.assertTrue(netutils.IP6Address.ValidateNetmask(netmask))
def testGetClassFromX(self):
self.assert_(
netutils.IPAddress.GetClassFromIpVersion(constants.IP4_VERSION) ==
netutils.IP4Address)
self.assert_(
netutils.IPAddress.GetClassFromIpVersion(constants.IP6_VERSION) ==
netutils.IP6Address)
self.assert_(
netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET) ==
netutils.IP4Address)
self.assert_(
netutils.IPAddress.GetClassFromIpFamily(socket.AF_INET6) ==
netutils.IP6Address)
def testOwnLoopback(self):
# FIXME: In a pure IPv6 environment this is no longer true
self.assert_(netutils.IPAddress.Own("127.0.0.1"),
"Should own 127.0.0.1 address")
def testNotOwnAddress(self):
self.assertFalse(netutils.IPAddress.Own("2001:db8::1"),
"Should not own IP address 2001:db8::1")
self.assertFalse(netutils.IPAddress.Own("192.0.2.1"),
"Should not own IP address 192.0.2.1")
def testFamilyVersionConversions(self):
# IPAddress.GetAddressFamilyFromVersion
self.assertEqual(
netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP4_VERSION),
socket.AF_INET)
self.assertEqual(
netutils.IPAddress.GetAddressFamilyFromVersion(constants.IP6_VERSION),
socket.AF_INET6)
self.assertRaises(errors.ProgrammerError,
netutils.IPAddress.GetAddressFamilyFromVersion, 3)
# IPAddress.GetVersionFromAddressFamily
self.assertEqual(
netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET),
constants.IP4_VERSION)
self.assertEqual(
netutils.IPAddress.GetVersionFromAddressFamily(socket.AF_INET6),
constants.IP6_VERSION)
self.assertRaises(errors.ProgrammerError,
netutils.IPAddress.GetVersionFromAddressFamily, socket.AF_UNIX)
class TestIP4Address(unittest.TestCase):
def testGetIPIntFromString(self):
fn = netutils.IP4Address._GetIPIntFromString
self.assertEqual(fn("0.0.0.0"), 0)
self.assertEqual(fn("0.0.0.1"), 1)
self.assertEqual(fn("127.0.0.1"), 2130706433)
self.assertEqual(fn("192.0.2.129"), 3221226113)
self.assertEqual(fn("255.255.255.255"), 2**32 - 1)
self.assertNotEqual(fn("0.0.0.0"), 1)
self.assertNotEqual(fn("0.0.0.0"), 1)
def testIsValid(self):
self.assert_(netutils.IP4Address.IsValid("0.0.0.0"))
self.assert_(netutils.IP4Address.IsValid("127.0.0.1"))
self.assert_(netutils.IP4Address.IsValid("192.0.2.199"))
self.assert_(netutils.IP4Address.IsValid("255.255.255.255"))
def testNotIsValid(self):
self.assertFalse(netutils.IP4Address.IsValid("0"))
self.assertFalse(netutils.IP4Address.IsValid("1"))
self.assertFalse(netutils.IP4Address.IsValid("1.1.1"))
self.assertFalse(netutils.IP4Address.IsValid("255.255.255.256"))
self.assertFalse(netutils.IP4Address.IsValid("::1"))
def testInNetwork(self):
self.assert_(netutils.IP4Address.InNetwork("127.0.0.0/8", "127.0.0.1"))
def testNotInNetwork(self):
self.assertFalse(netutils.IP4Address.InNetwork("192.0.2.0/24",
"127.0.0.1"))
def testIsLoopback(self):
self.assert_(netutils.IP4Address.IsLoopback("127.0.0.1"))
def testNotIsLoopback(self):
self.assertFalse(netutils.IP4Address.IsLoopback("192.0.2.1"))
class TestIP6Address(unittest.TestCase):
def testGetIPIntFromString(self):
fn = netutils.IP6Address._GetIPIntFromString
self.assertEqual(fn("::"), 0)
self.assertEqual(fn("::1"), 1)
self.assertEqual(fn("2001:db8::1"),
42540766411282592856903984951653826561L)
self.assertEqual(fn("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), 2**128-1)
self.assertNotEqual(netutils.IP6Address._GetIPIntFromString("::2"), 1)
def testIsValid(self):
self.assert_(netutils.IP6Address.IsValid("::"))
self.assert_(netutils.IP6Address.IsValid("::1"))
self.assert_(netutils.IP6Address.IsValid("1" + (":1" * 7)))
self.assert_(netutils.IP6Address.IsValid("ffff" + (":ffff" * 7)))
self.assert_(netutils.IP6Address.IsValid("::"))
def testNotIsValid(self):
self.assertFalse(netutils.IP6Address.IsValid("0"))
self.assertFalse(netutils.IP6Address.IsValid(":1"))
self.assertFalse(netutils.IP6Address.IsValid("f" + (":f" * 6)))
self.assertFalse(netutils.IP6Address.IsValid("fffg" + (":ffff" * 7)))
self.assertFalse(netutils.IP6Address.IsValid("fffff" + (":ffff" * 7)))
self.assertFalse(netutils.IP6Address.IsValid("1" + (":1" * 8)))
self.assertFalse(netutils.IP6Address.IsValid("127.0.0.1"))
def testInNetwork(self):
self.assert_(netutils.IP6Address.InNetwork("::1/128", "::1"))
def testNotInNetwork(self):
self.assertFalse(netutils.IP6Address.InNetwork("2001:db8::1/128", "::1"))
def testIsLoopback(self):
self.assert_(netutils.IP6Address.IsLoopback("::1"))
def testNotIsLoopback(self):
self.assertFalse(netutils.IP6Address.IsLoopback("2001:db8::1"))
class _BaseTcpPingTest:
"""Base class for TcpPing tests against listen(2)ing port"""
family = None
address = None
def setUp(self):
self.listener = socket.socket(self.family, socket.SOCK_STREAM)
self.listener.bind((self.address, 0))
self.listenerport = self.listener.getsockname()[1]
self.listener.listen(1)
def tearDown(self):
self.listener.shutdown(socket.SHUT_RDWR)
del self.listener
del self.listenerport
def testTcpPingToLocalHostAccept(self):
self.assert_(netutils.TcpPing(self.address,
self.listenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True,
source=self.address,
),
"failed to connect to test listener")
self.assert_(netutils.TcpPing(self.address, self.listenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True),
"failed to connect to test listener (no source)")
class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
"""Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
family = socket.AF_INET
address = constants.IP4_ADDRESS_LOCALHOST
def setUp(self):
unittest.TestCase.setUp(self)
_BaseTcpPingTest.setUp(self)
def tearDown(self):
unittest.TestCase.tearDown(self)
_BaseTcpPingTest.tearDown(self)
class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
"""Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
family = socket.AF_INET6
address = constants.IP6_ADDRESS_LOCALHOST
def setUp(self):
unittest.TestCase.setUp(self)
_BaseTcpPingTest.setUp(self)
def tearDown(self):
unittest.TestCase.tearDown(self)
_BaseTcpPingTest.tearDown(self)
class _BaseTcpPingDeafTest:
"""Base class for TcpPing tests against non listen(2)ing port"""
family = None
address = None
def setUp(self):
self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
self.deaflistener.bind((self.address, 0))
self.deaflistenerport = self.deaflistener.getsockname()[1]
def tearDown(self):
del self.deaflistener
del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
self.assertFalse(netutils.TcpPing(self.address,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True,
source=self.address,
), # need successful connect(2)
"successfully connected to deaf listener")
self.assertFalse(netutils.TcpPing(self.address,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True,
), # need successful connect(2)
"successfully connected to deaf listener (no source)")
def testTcpPingToLocalHostNoAccept(self):
self.assert_(netutils.TcpPing(self.address,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=False,
source=self.address,
), # ECONNREFUSED is OK
"failed to ping alive host on deaf port")
self.assert_(netutils.TcpPing(self.address,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=False,
), # ECONNREFUSED is OK
"failed to ping alive host on deaf port (no source)")
class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
"""Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
family = socket.AF_INET
address = constants.IP4_ADDRESS_LOCALHOST
def setUp(self):
self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
self.deaflistener.bind((self.address, 0))
self.deaflistenerport = self.deaflistener.getsockname()[1]
def tearDown(self):
del self.deaflistener
del self.deaflistenerport
class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
"""Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
family = socket.AF_INET6
address = constants.IP6_ADDRESS_LOCALHOST
def setUp(self):
unittest.TestCase.setUp(self)
_BaseTcpPingDeafTest.setUp(self)
def tearDown(self):
unittest.TestCase.tearDown(self)
_BaseTcpPingDeafTest.tearDown(self)
class TestFormatAddress(unittest.TestCase):
"""Testcase for FormatAddress"""
def testFormatAddressUnixSocket(self):
res1 = netutils.FormatAddress(("12352", 0, 0), family=socket.AF_UNIX)
self.assertEqual(res1, "pid=12352, uid=0, gid=0")
def testFormatAddressIP4(self):
res1 = netutils.FormatAddress(("127.0.0.1", 1234), family=socket.AF_INET)
self.assertEqual(res1, "127.0.0.1:1234")
res2 = netutils.FormatAddress(("192.0.2.32", None), family=socket.AF_INET)
self.assertEqual(res2, "192.0.2.32")
def testFormatAddressIP6(self):
res1 = netutils.FormatAddress(("::1", 1234), family=socket.AF_INET6)
self.assertEqual(res1, "[::1]:1234")
res2 = netutils.FormatAddress(("::1", None), family=socket.AF_INET6)
self.assertEqual(res2, "[::1]")
res2 = netutils.FormatAddress(("2001:db8::beef", "80"),
family=socket.AF_INET6)
self.assertEqual(res2, "[2001:db8::beef]:80")
def testFormatAddressWithoutFamily(self):
res1 = netutils.FormatAddress(("127.0.0.1", 1234))
self.assertEqual(res1, "127.0.0.1:1234")
res2 = netutils.FormatAddress(("::1", 1234))
self.assertEqual(res2, "[::1]:1234")
def testInvalidFormatAddress(self):
self.assertRaises(errors.ParameterError, netutils.FormatAddress,
"127.0.0.1")
self.assertRaises(errors.ParameterError, netutils.FormatAddress,
"127.0.0.1", family=socket.AF_INET)
self.assertRaises(errors.ParameterError, netutils.FormatAddress,
("::1"), family=socket.AF_INET )
class TestIpParsing(testutils.GanetiTestCase):
"""Test the code that parses the ip command output"""
def testIp4(self):
valid_addresses = [constants.IP4_ADDRESS_ANY,
constants.IP4_ADDRESS_LOCALHOST,
"192.0.2.1", # RFC5737, IPv4 address blocks for docs
"198.51.100.1",
"203.0.113.1",
]
for addr in valid_addresses:
self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
def testIp6(self):
valid_addresses = [constants.IP6_ADDRESS_ANY,
constants.IP6_ADDRESS_LOCALHOST,
"0:0:0:0:0:0:0:1", # other form for IP6_ADDRESS_LOCALHOST
"0:0:0:0:0:0:0:0", # other form for IP6_ADDRESS_ANY
"2001:db8:85a3::8a2e:370:7334", # RFC3849 IP6 docs block
"2001:0db8:85a3:0000:0000:8a2e:0370:7334",
"0:0:0:0:0:FFFF:192.0.2.1", # IPv4-compatible IPv6
"::FFFF:192.0.2.1",
"0:0:0:0:0:0:203.0.113.1", # IPv4-mapped IPv6
"::203.0.113.1",
]
for addr in valid_addresses:
self.failUnless(re.search(netutils._IP_RE_TEXT, addr))
def testParseIpCommandOutput(self):
# IPv4-only, fake loopback interface
tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
for test_file in tests:
data = testutils.ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[4]) == 1 and addr[4][0] == "127.0.0.1" and not
addr[6])
# IPv6-only, fake loopback interface
tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
for test_file in tests:
data = testutils.ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and not addr[4])
# IPv4 and IPv6, fake loopback interface
tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
for test_file in tests:
data = testutils.ReadTestData(test_file)
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and
len(addr[4]) == 1 and addr[4][0] == "127.0.0.1")
# IPv4 and IPv6, dummy interface
data = testutils.ReadTestData("ip-addr-show-dummy0.txt")
addr = netutils._GetIpAddressesFromIpOutput(data)
self.failUnless(len(addr[6]) == 1 and
addr[6][0] == "2001:db8:85a3::8a2e:370:7334" and
len(addr[4]) == 1 and
addr[4][0] == "192.0.2.1")
class TestValidatePortNumber(unittest.TestCase):
"""Test netutils.ValidatePortNumber"""
def testPortNumberInt(self):
self.assertRaises(ValueError, lambda: \
netutils.ValidatePortNumber(500000))
self.assertEqual(netutils.ValidatePortNumber(5000), 5000)
def testPortNumberStr(self):
self.assertRaises(ValueError, lambda: \
netutils.ValidatePortNumber("pinky bunny"))
self.assertEqual(netutils.ValidatePortNumber("5000"), 5000)
if __name__ == "__main__":
testutils.GanetiTestProgram()