blob: e4685fe7ccba0c82ef88e4a28623a5630678eb63 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2010, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the bdev module"""
import os
import random
import unittest
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import objects
from ganeti import utils
from ganeti.storage import bdev
import testutils
class TestRADOSBlockDevice(testutils.GanetiTestCase):
def setUp(self):
"""Set up input data"""
testutils.GanetiTestCase.setUp(self)
self.plain_output_old_ok = \
testutils.ReadTestData("bdev-rbd/plain_output_old_ok.txt")
self.plain_output_old_no_matches = \
testutils.ReadTestData("bdev-rbd/plain_output_old_no_matches.txt")
self.plain_output_old_extra_matches = \
testutils.ReadTestData("bdev-rbd/plain_output_old_extra_matches.txt")
self.plain_output_old_empty = \
testutils.ReadTestData("bdev-rbd/plain_output_old_empty.txt")
self.plain_output_new_ok = \
testutils.ReadTestData("bdev-rbd/plain_output_new_ok.txt")
self.plain_output_new_no_matches = \
testutils.ReadTestData("bdev-rbd/plain_output_new_no_matches.txt")
self.plain_output_new_extra_matches = \
testutils.ReadTestData("bdev-rbd/plain_output_new_extra_matches.txt")
# This file is completely empty, and as such it's not shipped.
self.plain_output_new_empty = ""
self.json_output_ok = testutils.ReadTestData("bdev-rbd/json_output_ok.txt")
self.json_output_no_matches = \
testutils.ReadTestData("bdev-rbd/json_output_no_matches.txt")
self.json_output_extra_matches = \
testutils.ReadTestData("bdev-rbd/json_output_extra_matches.txt")
self.json_output_empty = \
testutils.ReadTestData("bdev-rbd/json_output_empty.txt")
self.output_invalid = testutils.ReadTestData("bdev-rbd/output_invalid.txt")
self.volume_name = "d7ab910a-4933-4ffe-88d0-faf2ce31390a.rbd.disk0"
def test_ParseRbdShowmappedJson(self):
parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedJson
self.assertEqual(parse_function(self.json_output_ok, self.volume_name),
"/dev/rbd3")
self.assertEqual(parse_function(self.json_output_empty, self.volume_name),
None)
self.assertEqual(parse_function(self.json_output_no_matches,
self.volume_name), None)
self.assertRaises(errors.BlockDeviceError, parse_function,
self.json_output_extra_matches, self.volume_name)
self.assertRaises(errors.BlockDeviceError, parse_function,
self.output_invalid, self.volume_name)
def test_ParseRbdShowmappedPlain(self):
parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedPlain
self.assertEqual(parse_function(self.plain_output_new_ok,
self.volume_name), "/dev/rbd3")
self.assertEqual(parse_function(self.plain_output_old_ok,
self.volume_name), "/dev/rbd3")
self.assertEqual(parse_function(self.plain_output_new_empty,
self.volume_name), None)
self.assertEqual(parse_function(self.plain_output_old_empty,
self.volume_name), None)
self.assertEqual(parse_function(self.plain_output_new_no_matches,
self.volume_name), None)
self.assertEqual(parse_function(self.plain_output_old_no_matches,
self.volume_name), None)
self.assertRaises(errors.BlockDeviceError, parse_function,
self.plain_output_new_extra_matches, self.volume_name)
self.assertRaises(errors.BlockDeviceError, parse_function,
self.plain_output_old_extra_matches, self.volume_name)
self.assertRaises(errors.BlockDeviceError, parse_function,
self.output_invalid, self.volume_name)
class TestExclusiveStoragePvs(unittest.TestCase):
"""Test cases for functions dealing with LVM PV and exclusive storage"""
# Allowance for rounding
_EPS = 1e-4
_MARGIN = constants.PART_MARGIN + constants.PART_RESERVED + _EPS
@staticmethod
def _GenerateRandomPvInfo(rnd, name, vg):
# Granularity is .01 MiB
size = rnd.randint(1024 * 100, 10 * 1024 * 1024 * 100)
if rnd.choice([False, True]):
free = float(rnd.randint(0, size)) / 100.0
else:
free = float(size) / 100.0
size = float(size) / 100.0
attr = "a-"
return objects.LvmPvInfo(name=name, vg_name=vg, size=size, free=free,
attributes=attr)
def testGetStdPvSize(self):
"""Test cases for bdev.LogicalVolume._GetStdPvSize()"""
rnd = random.Random(9517)
for _ in range(0, 50):
# Identical volumes
pvi = self._GenerateRandomPvInfo(rnd, "disk", "myvg")
onesize = bdev.LogicalVolume._GetStdPvSize([pvi])
self.assertTrue(onesize <= pvi.size)
self.assertTrue(onesize > pvi.size * (1 - self._MARGIN))
for length in range(2, 10):
n_size = bdev.LogicalVolume._GetStdPvSize([pvi] * length)
self.assertEqual(onesize, n_size)
# Mixed volumes
for length in range(1, 10):
pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")
for _ in range(0, length)]
std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
self.assertTrue(compat.all(std_size <= pvi.size for pvi in pvlist))
self.assertTrue(compat.any(std_size > pvi.size * (1 - self._MARGIN)
for pvi in pvlist))
pvlist.append(pvlist[0])
p1_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
self.assertEqual(std_size, p1_size)
def testComputeNumPvs(self):
"""Test cases for bdev.LogicalVolume._ComputeNumPvs()"""
rnd = random.Random(8067)
for _ in range(0, 1000):
pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")]
lv_size = float(rnd.randint(10 * 100, 1024 * 1024 * 100)) / 100.0
num_pv = bdev.LogicalVolume._ComputeNumPvs(lv_size, pvlist)
std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
self.assertTrue(num_pv >= 1)
self.assertTrue(num_pv * std_size >= lv_size)
self.assertTrue((num_pv - 1) * std_size < lv_size * (1 + self._EPS))
def testGetEmptyPvNames(self):
"""Test cases for bdev.LogicalVolume._GetEmptyPvNames()"""
rnd = random.Random(21126)
for _ in range(0, 100):
num_pvs = rnd.randint(1, 20)
pvlist = [self._GenerateRandomPvInfo(rnd, "disk%d" % n, "myvg")
for n in range(0, num_pvs)]
for num_req in range(1, num_pvs + 2):
epvs = bdev.LogicalVolume._GetEmptyPvNames(pvlist, num_req)
epvs_set = compat.UniqueFrozenset(epvs)
if len(epvs) > 1:
self.assertEqual(len(epvs), len(epvs_set))
for pvi in pvlist:
if pvi.name in epvs_set:
self.assertEqual(pvi.size, pvi.free)
else:
# There should be no remaining empty PV when less than the
# requeste number of PVs has been returned
self.assertTrue(len(epvs) == num_req or pvi.free != pvi.size)
class TestLogicalVolume(unittest.TestCase):
"""Tests for bdev.LogicalVolume."""
def testParseLvInfoLine(self):
"""Tests for LogicalVolume._ParseLvInfoLine."""
broken_lines = [
" toomuch#-wi-ao#253#3#4096.00#2#/dev/abc(20)",
" -wi-ao#253#3#4096.00#/dev/abc(20)",
" -wi-a#253#3#4096.00#2#/dev/abc(20)",
" -wi-ao#25.3#3#4096.00#2#/dev/abc(20)",
" -wi-ao#twenty#3#4096.00#2#/dev/abc(20)",
" -wi-ao#253#3.1#4096.00#2#/dev/abc(20)",
" -wi-ao#253#three#4096.00#2#/dev/abc(20)",
" -wi-ao#253#3#four#2#/dev/abc(20)",
" -wi-ao#253#3#4096..00#2#/dev/abc(20)",
" -wi-ao#253#3#4096.00#2.0#/dev/abc(20)",
" -wi-ao#253#3#4096.00#two#/dev/abc(20)",
]
for broken in broken_lines:
self.assertRaises(errors.BlockDeviceError,
bdev.LogicalVolume._ParseLvInfoLine, broken, "#")
# Examples of good lines from "lvs":
# -wi-ao|253|3|4096.00|2|/dev/sdb(144),/dev/sdc(0)
# -wi-a-|253|4|4096.00|1|/dev/sdb(208)
true_out = [
("-wi-ao", 253, 3, 4096.00, 2, ["/dev/abc"]),
("-wi-a-", 253, 7, 4096.00, 4, ["/dev/abc"]),
("-ri-a-", 253, 4, 4.00, 5, ["/dev/abc", "/dev/def"]),
("-wc-ao", 15, 18, 4096.00, 32, ["/dev/abc", "/dev/def", "/dev/ghi0"]),
]
for exp in true_out:
for sep in "#;|":
pvs = ",".join("%s(%s)" % (d, i * 12) for (i, d) in enumerate(exp[-1]))
lvs_line = (sep.join((" %s", "%d", "%d", "%.2f", "%d", "%s")) %
(exp[0:-1] + (pvs,)))
parsed = bdev.LogicalVolume._ParseLvInfoLine(lvs_line, sep)
self.assertEqual(parsed, exp)
@staticmethod
def _FakeRunCmd(success, stdout):
if success:
exit_code = 0
else:
exit_code = 1
return lambda cmd: utils.RunResult(exit_code, None, stdout, "", cmd,
utils.process._TIMEOUT_NONE, 5)
def testGetLvInfo(self):
"""Tests for LogicalVolume._GetLvInfo."""
self.assertRaises(errors.BlockDeviceError, bdev.LogicalVolume._GetLvInfo,
"fake_path",
_run_cmd=self._FakeRunCmd(False, "Fake error msg"))
self.assertRaises(errors.BlockDeviceError, bdev.LogicalVolume._GetLvInfo,
"fake_path", _run_cmd=self._FakeRunCmd(True, ""))
self.assertRaises(errors.BlockDeviceError, bdev.LogicalVolume._GetLvInfo,
"fake_path", _run_cmd=self._FakeRunCmd(True, "BadStdOut"))
good_line = " -wi-ao|253|3|4096.00|2|/dev/abc(20)"
fake_cmd = self._FakeRunCmd(True, good_line)
good_res = bdev.LogicalVolume._GetLvInfo("fake_path", _run_cmd=fake_cmd)
# If the same line is repeated, the result should be the same
for lines in [
[good_line] * 2,
[good_line] * 3,
]:
fake_cmd = self._FakeRunCmd(True, "\n".join(lines))
same_res = bdev.LogicalVolume._GetLvInfo("fake_path", fake_cmd)
self.assertEqual(same_res, good_res)
# Complex multi-line examples
one_line = " -wi-ao|253|3|4096.00|2|/dev/sda(20),/dev/sdb(50),/dev/sdc(0)"
fake_cmd = self._FakeRunCmd(True, one_line)
one_res = bdev.LogicalVolume._GetLvInfo("fake_path", _run_cmd=fake_cmd)
# These should give the same results
for multi_lines in [
(" -wi-ao|253|3|4096.00|2|/dev/sda(30),/dev/sdb(50)\n"
" -wi-ao|253|3|4096.00|2|/dev/sdb(200),/dev/sdc(300)"),
(" -wi-ao|253|3|4096.00|2|/dev/sda(0)\n"
" -wi-ao|253|3|4096.00|2|/dev/sdb(20)\n"
" -wi-ao|253|3|4096.00|2|/dev/sdc(30)"),
(" -wi-ao|253|3|4096.00|2|/dev/sda(20)\n"
" -wi-ao|253|3|4096.00|2|/dev/sdb(50),/dev/sdc(0)"),
]:
fake_cmd = self._FakeRunCmd(True, multi_lines)
multi_res = bdev.LogicalVolume._GetLvInfo("fake_path", _run_cmd=fake_cmd)
self.assertEqual(multi_res, one_res)
if __name__ == "__main__":
testutils.GanetiTestProgram()