blob: bc724961921246d5a13d617d485bf0ea1075a6b8 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (C) 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.ovf.
"""
import optparse
import os
import os.path
import re
import shutil
import sys
import tempfile
import unittest
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
from ganeti import constants
from ganeti import errors
from ganeti import ovf
from ganeti import utils
from ganeti import pathutils
import testutils
# Directory name used as the "output_dir" option; the test cases remove it
# again (resolved against the current working directory) in tearDown
OUTPUT_DIR = "newdir"

# Values the tests expect when reading the "ganeti.ovf" fixture
GANETI_DISKS = {
  "disk_count": "1",
  "disk0_dump": "new_disk.raw",
  "disk0_size": "0",
  "disk0_ivname": "disk/0",
  }

GANETI_NETWORKS = {
  "nic_count": "1",
  "nic0_mode": "bridged",
  "nic0_ip": "none",
  "nic0_mac": "aa:00:00:d8:2c:1e",
  "nic0_link": "xen-br0",
  "nic0_network": "auto",
  }

GANETI_HYPERVISOR = {
  "hypervisor_name": "xen-pvm",
  "root-path": "/dev/sda",
  "kernel_args": "ro",
  }

GANETI_OS = {
  "os_name": "lenny-image",
  }

GANETI_BACKEND = {
  "vcpus": "1",
  "memory": "2048",
  "auto_balance": "False",
  }

GANETI_NAME = "ganeti-test-xen"
GANETI_TEMPLATE = "plain"
GANETI_TAGS = None
GANETI_VERSION = "0"
# Values the tests expect when reading the "virtualbox.ovf" fixture
VIRTUALBOX_DISKS = {
  "disk_count": "2",
  "disk0_ivname": "disk/0",
  "disk0_dump": "new_disk.raw",
  "disk0_size": "0",
  "disk1_ivname": "disk/1",
  "disk1_dump": "second_disk.raw",
  "disk1_size": "0",
  }

VIRTUALBOX_NETWORKS = {
  "nic_count": "1",
  "nic0_mode": "bridged",
  "nic0_ip": "none",
  "nic0_link": "auto",
  "nic0_mac": "auto",
  "nic0_network": "auto",
  }

VIRTUALBOX_HYPERVISOR = {
  "hypervisor_name": "auto",
  }

VIRTUALBOX_OS = {
  "os_name": None,
  }

VIRTUALBOX_BACKEND = {
  "vcpus": "1",
  "memory": "2048",
  "auto_balance": "auto",
  }

VIRTUALBOX_NAME = None
VIRTUALBOX_TEMPLATE = None
VIRTUALBOX_TAGS = None
VIRTUALBOX_VERSION = None
# Values the tests expect when reading the "empty.ovf" fixture
EMPTY_DISKS = {}
EMPTY_NETWORKS = {}

EMPTY_HYPERVISOR = {
  "hypervisor_name": "auto",
  }

EMPTY_OS = {}

EMPTY_BACKEND = {
  "vcpus": "auto",
  "memory": "auto",
  "auto_balance": "auto",
  }

EMPTY_NAME = None
EMPTY_TEMPLATE = None
EMPTY_TAGS = None
EMPTY_VERSION = None
# Values the tests expect when the instance description comes from the
# options passed to the importer rather than from the OVF file
CMDARGS_DISKS = {
  "disk_count": "1",
  "disk0_ivname": "disk/0",
  "disk0_dump": "disk0.raw",
  "disk0_size": "8",
  }

CMDARGS_NETWORKS = {
  "nic0_link": "auto",
  "nic0_mode": "bridged",
  "nic0_ip": "none",
  "nic0_mac": "auto",
  "nic_count": "1",
  "nic0_network": "auto",
  }

CMDARGS_HYPERVISOR = {
  "hypervisor_name": "xen-pvm",
  }

CMDARGS_OS = {
  "os_name": "lenny-image",
  }

CMDARGS_BACKEND = {
  "auto_balance": False,
  "vcpus": "1",
  "memory": "256",
  }

CMDARGS_NAME = "test-instance"
CMDARGS_TEMPLATE = "plain"
CMDARGS_TAGS = "test-tag-1,test-tag-2"
# Option dictionaries; each one is later turned into an options object.
# Every dictionary after the first is a copy of an earlier one with some
# keys added or overridden.
ARGS_EMPTY = {
  "output_dir": None,
  "nics": [],
  "disks": [],
  "name": "test-instance",
  "ova_package": False,
  "ext_usage": False,
  "disk_format": "cow",
  "compression": False,
  }

ARGS_EXPORT_DIR = dict(ARGS_EMPTY,
                       output_dir=OUTPUT_DIR,
                       name=None,
                       hypervisor=None,
                       os=None,
                       beparams={},
                       no_nics=False,
                       disk_template=None,
                       tags=None)

ARGS_VBOX = dict(ARGS_EXPORT_DIR,
                 output_dir=OUTPUT_DIR,
                 name="test-instance",
                 os="lenny-image",
                 hypervisor=("xen-pvm", {}),
                 osparams={},
                 disks=[])

ARGS_COMPLETE = dict(ARGS_VBOX,
                     beparams={
                       "vcpus": "1",
                       "memory": "256",
                       "auto_balance": False,
                       },
                     disks=[(0, {"size": "5mb"})],
                     nics=[("0", {"mode": "bridged"})],
                     disk_template="plain",
                     tags="test-tag-1,test-tag-2")

# Neither disks nor NICs are requested by these options
ARGS_BROKEN = dict(ARGS_EXPORT_DIR,
                   no_nics=True,
                   disk_template="diskless",
                   name="test-instance",
                   os="lenny-image",
                   osparams={})

EXP_ARGS_COMPRESSED = dict(ARGS_EXPORT_DIR, compression=True)
# Input data handed to the OVF writer in the export tests
EXP_DISKS_LIST = [
  {
    "format": "vmdk",
    "compression": "gzip",
    "virt-size": 90000,
    "real-size": 203,
    "path": "new_disk.cow.gz",
    },
  {
    "format": "cow",
    "virt-size": 15,
    "real-size": 15,
    "path": "new_disk.cow",
    },
  ]

EXP_NETWORKS_LIST = [
  {
    "mac": "aa:00:00:d8:2c:1e",
    "ip": "None",
    "link": "br0",
    "mode": "routed",
    "network": "test",
    },
  ]

EXP_PARTIAL_GANETI_DICT = {
  "hypervisor": {
    "name": "xen-kvm",
    },
  "os": {
    "name": "lenny-image",
    },
  "auto_balance": "True",
  "version": "0",
  }

EXP_GANETI_DICT = {
  "tags": None,
  "auto_balance": "False",
  "hypervisor": {
    "root-path": "/dev/sda",
    "name": "xen-pvm",
    "kernel_args": "ro",
    },
  "version": "0",
  "disk_template": None,
  "os": {
    "name": "lenny-image",
    },
  }

EXP_NAME = "xen-dev-i1"
EXP_VCPUS = 1
EXP_MEMORY = 512
# XML fragments the writer tests look for (as substrings) in the serialized
# OVF tree. Each constant is a single string assembled from adjacent literals,
# so the line breaks below carry no meaning.

# Root element of a freshly created writer, before any data is saved
EXPORT_EMPTY = ("<Envelope xml:lang=\"en-US\" xmlns=\"http://schemas.dmtf.org/"
"ovf/envelope/1\" xmlns:gnt=\"http://ganeti\" xmlns:ovf=\""
"http://schemas.dmtf.org/ovf/envelope/1\" xmlns:rasd=\""
"http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/CIM_Resource"
"AllocationSettingData\" xmlns:vssd=\"http://schemas.dmtf.org"
"/wbem/wscim/1/cim-schema/2/CIM_VirtualSystemSettingData\""
" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" />")
# Disk-related sections after saving an empty list of disks
EXPORT_DISKS_EMPTY = ("<References /><DiskSection><Info>Virtual disk"
" information</Info></DiskSection>")
# Disk-related sections after saving EXP_DISKS_LIST
EXPORT_DISKS = ("<References><File ovf:compression=\"gzip\" ovf:href=\"new_disk"
".cow.gz\" ovf:id=\"file0\" ovf:size=\"203\" /><File ovf:href="
"\"new_disk.cow\" ovf:id=\"file1\" ovf:size=\"15\" />"
"</References><DiskSection><Info>Virtual disk information"
"</Info><Disk ovf:capacity=\"90000\" ovf:diskId=\"disk0\" ovf"
":fileRef=\"file0\" ovf:format=\"http://www.vmware.com/"
"interfaces/specifications/vmdk.html#monolithicSparse\" /><Disk"
" ovf:capacity=\"15\" ovf:diskId=\"disk1\" ovf:fileRef"
"=\"file1\" ovf:format=\"http://www.gnome.org/~markmc/qcow"
"-image-format.html\" /></DiskSection>")
# Network section after saving an empty list of networks
EXPORT_NETWORKS_EMPTY = ("<NetworkSection><Info>List of logical networks</Info>"
"</NetworkSection>")
# Network section after saving EXP_NETWORKS_LIST
EXPORT_NETWORKS = ("<NetworkSection><Info>List of logical networks</Info>"
"<Network ovf:name=\"routed0\" /></NetworkSection>")
# Ganeti section produced from EXP_PARTIAL_GANETI_DICT and EXP_NETWORKS_LIST
EXPORT_GANETI_INCOMPLETE = ("<gnt:GanetiSection><gnt:Version>0</gnt:Version>"
"<gnt:AutoBalance>True</gnt:AutoBalance><gnt:"
"OperatingSystem><gnt:Name>lenny-image</gnt:Name>"
"<gnt:Parameters /></gnt:OperatingSystem><gnt:"
"Hypervisor><gnt:Name>xen-kvm</gnt:Name><gnt:"
"Parameters /></gnt:Hypervisor><gnt:Network><gnt:"
"Nic ovf:name=\"routed0\"><gnt:Mode>routed</gnt:"
"Mode><gnt:MACAddress>aa:00:00:d8:2c:1e</gnt:"
"MACAddress><gnt:IPAddress>None</gnt:IPAddress>"
"<gnt:Link>br0</gnt:Link><gnt:Net>test</gnt:Net>"
"</gnt:Nic></gnt:Network></gnt:GanetiSection>")
# Ganeti section produced from EXP_GANETI_DICT and EXP_NETWORKS_LIST
EXPORT_GANETI = ("<gnt:GanetiSection><gnt:Version>0</gnt:Version><gnt:"
"AutoBalance>False</gnt:AutoBalance><gnt:OperatingSystem>"
"<gnt:Name>lenny-image</gnt:Name><gnt:Parameters /></gnt:"
"OperatingSystem><gnt:Hypervisor><gnt:Name>xen-pvm</gnt:Name>"
"<gnt:Parameters><gnt:root-path>/dev/sda</gnt:root-path><gnt:"
"kernel_args>ro</gnt:kernel_args></gnt:Parameters></gnt:"
"Hypervisor><gnt:Network><gnt:Nic ovf:name=\"routed0\"><gnt:"
"Mode>routed</gnt:Mode><gnt:MACAddress>aa:00:00:d8:2c:1e</gnt:"
"MACAddress><gnt:IPAddress>None</gnt:IPAddress><gnt:Link>br0"
"</gnt:Link><gnt:Net>test</gnt:Net></gnt:Nic></gnt:Network>"
"</gnt:GanetiSection>")
# Disks, networks and virtual system sections after saving EXP_DISKS_LIST,
# EXP_NETWORKS_LIST and the EXP_NAME/EXP_VCPUS/EXP_MEMORY system description
EXPORT_SYSTEM = ("<References><File ovf:compression=\"gzip\" ovf:href=\"new_"
"disk.cow.gz\" ovf:id=\"file0\" ovf:size=\"203\" /><File ovf:"
"href=\"new_disk.cow\" ovf:id=\"file1\" ovf:size=\"15\" />"
"</References><DiskSection><Info>Virtual disk information"
"</Info><Disk ovf:capacity=\"90000\" ovf:diskId=\"disk0\""
" ovf:fileRef=\"file0\" ovf:format=\"http://www.vmware.com"
"/interfaces/specifications/vmdk.html#monolithicSparse\" />"
"<Disk ovf:capacity=\"15\" ovf:diskId=\"disk1\" ovf:fileRef"
"=\"file1\" ovf:format=\"http://www.gnome.org/~markmc/qcow"
"-image-format.html\" /></DiskSection><NetworkSection><Info>"
"List of logical networks</Info><Network ovf:name=\"routed0\""
" /></NetworkSection><VirtualSystem ovf:id=\"xen-dev-i1\">"
"<Info>A virtual machine</Info><Name>xen-dev-i1</Name>"
"<OperatingSystemSection ovf:id=\"0\"><Info>Installed guest"
" operating system</Info></OperatingSystemSection><Virtual"
"HardwareSection><Info>Virtual hardware requirements</Info>"
"<System><vssd:ElementName>Virtual Hardware Family"
"</vssd:ElementName><vssd:InstanceID>0</vssd:InstanceID><vssd:"
"VirtualSystemIdentifier>xen-dev-i1</vssd:VirtualSystem"
"Identifier><vssd:VirtualSystemType>ganeti-ovf</vssd:Virtual"
"SystemType></System><Item><rasd:ElementName>1 virtual CPU(s)"
"</rasd:ElementName><rasd:InstanceID>1</rasd:InstanceID><rasd:"
"ResourceType>3</rasd:ResourceType><rasd:VirtualQuantity>1"
"</rasd:VirtualQuantity></Item><Item><rasd:AllocationUnits>"
"byte * 2^20</rasd:AllocationUnits><rasd:ElementName>512MB of"
" memory</rasd:ElementName><rasd:InstanceID>2</rasd:"
"InstanceID><rasd:ResourceType>4</rasd:ResourceType><rasd:"
"VirtualQuantity>512</rasd:VirtualQuantity></Item><Item>"
"<rasd:Address>0</rasd:Address><rasd:ElementName>scsi"
"_controller0</rasd:ElementName><rasd:InstanceID>3"
"</rasd:InstanceID><rasd:ResourceSubType>lsilogic</rasd"
":ResourceSubType><rasd:ResourceType>6</rasd:ResourceType>"
"</Item><Item><rasd:ElementName>disk0</rasd:ElementName><rasd"
":HostResource>ovf:/disk/disk0</rasd:HostResource><rasd"
":InstanceID>4</rasd:InstanceID><rasd:Parent>3</rasd:Parent>"
"<rasd:ResourceType>17</rasd:ResourceType></Item><Item><rasd:"
"ElementName>disk1</rasd:ElementName><rasd:HostResource>ovf:/"
"disk/disk1</rasd:HostResource><rasd:InstanceID>5</rasd"
":InstanceID><rasd:Parent>3</rasd:Parent><rasd:ResourceType>17"
"</rasd:ResourceType></Item><Item><rasd:Address>aa:00"
":00:d8:2c:1e</rasd:Address><rasd:Connection>routed0</rasd"
":Connection><rasd:ElementName>routed0</rasd:ElementName><rasd"
":InstanceID>6</rasd:InstanceID><rasd:ResourceType>10</rasd"
":ResourceType></Item></VirtualHardwareSection>"
"</VirtualSystem>")
def _GetArgs(args, with_name=False):
options = optparse.Values()
needed = args
if with_name:
needed["name"] = "test-instance"
options._update_loose(needed)
return options
# Options objects built from the argument dictionaries above; these are what
# the tests pass to the importer and exporter constructors.
OPTS_EMPTY = _GetArgs(ARGS_EMPTY)
OPTS_EXPORT_NO_NAME = _GetArgs(ARGS_EXPORT_DIR)
# Same source dictionary, but with the "name" option set to "test-instance"
OPTS_EXPORT = _GetArgs(ARGS_EXPORT_DIR, with_name=True)
# Alias used by the exporter tests
EXP_OPTS = OPTS_EXPORT_NO_NAME
EXP_OPTS_COMPRESSED = _GetArgs(EXP_ARGS_COMPRESSED)
OPTS_VBOX = _GetArgs(ARGS_VBOX)
OPTS_COMPLETE = _GetArgs(ARGS_COMPLETE)
OPTS_NONIC_NODISK = _GetArgs(ARGS_BROKEN)
def _GetFullFilename(file_name):
  """Returns the absolute path of a file in the OVF test data directory.

  @type file_name: string
  @param file_name: name relative to "test/data/ovfdata" in the source
    directory reported by testutils
  @rtype: string
  @return: absolute path; the file is not required to exist

  """
  source_dir = testutils.GetSourceDir()
  return os.path.abspath("%s/test/data/ovfdata/%s" % (source_dir, file_name))
class BetterUnitTest(unittest.TestCase):
  """Test case base class with a regexp-aware "raises" assertion.

  """
  def assertRaisesRegexp(self, exception, regexp_val, function, *args,
                         **kwargs):
    """Checks that a call raises an exception with a matching message.

    @param exception: exception class (or tuple of classes) expected
    @type regexp_val: string
    @param regexp_val: regular expression searched for in C{str(err)}
    @param function: callable to invoke
    @param args: positional arguments passed to the callable
    @param kwargs: keyword arguments passed to the callable
    @raise AssertionError: if nothing matching C{exception} is raised or
      the message does not match the regular expression

    """
    try:
      function(*args, **kwargs)
    except exception:
      # sys.exc_info() works on every Python version, unlike the
      # "except X, err" and "except X as err" spellings
      err = sys.exc_info()[1]
    else:
      # Reported outside the "try" body so that the failure can never be
      # swallowed by the handler above when "exception" is as broad as
      # Exception or AssertionError
      self.fail("Expected raising %s" % (exception, ))

    message = str(err)
    if re.search(regexp_val, message) is None:
      self.fail("Expected matching '%s', got '%s'" % (regexp_val, message))
class TestOVFImporter(BetterUnitTest):
  """Tests for ovf.OVFImporter.

  """
  def setUp(self):
    """Resolves the paths of all input fixtures; no importer exists yet.

    """
    self.non_existing_file = _GetFullFilename("not_the_file.ovf")
    self.ganeti_ovf = _GetFullFilename("ganeti.ovf")
    self.virtualbox_ovf = _GetFullFilename("virtualbox.ovf")
    self.ova_package = _GetFullFilename("ova.ova")
    self.empty_ovf = _GetFullFilename("empty.ovf")
    self.wrong_extension = _GetFullFilename("wrong_extension.ovd")
    self.wrong_ova_archive = _GetFullFilename("wrong_ova.ova")
    self.no_ovf_in_ova = _GetFullFilename("no_ovf.ova")
    self.importer = None

  def tearDown(self):
    """Cleans up the importer and removes the output directory, if any.

    """
    if self.importer:
      self.importer.Cleanup()
    del_dir = os.path.abspath(OUTPUT_DIR)
    try:
      shutil.rmtree(del_dir)
    except OSError:
      # Most tests never create the directory
      pass

  def testFileDoesNotExistError(self):
    self.assertRaisesRegexp(errors.OpPrereqError, "does not exist",
      ovf.OVFImporter, self.non_existing_file, None)

  def testWrongInputFileExtensionError(self):
    self.assertRaisesRegexp(errors.OpPrereqError,
      "Unknown file extension", ovf.OVFImporter,
      self.wrong_extension, None)

  def testOVAUnpackingDirectories(self):
    self.importer = ovf.OVFImporter(self.ova_package, OPTS_EMPTY)
    self.assertTrue(self.importer.input_dir is not None)
    self.assertEqual(self.importer.output_dir, pathutils.EXPORT_DIR)
    self.assertTrue(self.importer.temp_dir is not None)

  def testOVFUnpackingDirectories(self):
    self.importer = ovf.OVFImporter(self.virtualbox_ovf,
      OPTS_EMPTY)
    self.assertEqual(self.importer.input_dir, _GetFullFilename(""))
    self.assertEqual(self.importer.output_dir, pathutils.EXPORT_DIR)
    self.assertEqual(self.importer.temp_dir, None)

  def testOVFSetOutputDirDirectories(self):
    self.importer = ovf.OVFImporter(self.ganeti_ovf, OPTS_EXPORT)
    self.assertEqual(self.importer.input_dir, _GetFullFilename(""))
    self.assertTrue(OUTPUT_DIR in self.importer.output_dir)
    self.assertEqual(self.importer.temp_dir, None)

  def testWrongOVAArchiveError(self):
    self.assertRaisesRegexp(errors.OpPrereqError, "not a proper tar",
      ovf.OVFImporter, self.wrong_ova_archive, None)

  def testNoOVFFileInOVAPackageError(self):
    self.assertRaisesRegexp(errors.OpPrereqError, "No .ovf file",
      ovf.OVFImporter, self.no_ovf_in_ova, None)

  def testParseGanetiOvf(self):
    self.importer = ovf.OVFImporter(self.ganeti_ovf, OPTS_EXPORT_NO_NAME)
    self.importer.Parse()
    self.assertTrue("%s/ganeti-test-xen" % OUTPUT_DIR in
      self.importer.output_dir)
    self.assertEqual(self.importer.results_disk, GANETI_DISKS)
    self.assertEqual(self.importer.results_network, GANETI_NETWORKS)
    self.assertEqual(self.importer.results_hypervisor, GANETI_HYPERVISOR)
    self.assertEqual(self.importer.results_os, GANETI_OS)
    self.assertEqual(self.importer.results_backend, GANETI_BACKEND)
    self.assertEqual(self.importer.results_name, GANETI_NAME)
    self.assertEqual(self.importer.results_template, GANETI_TEMPLATE)
    self.assertEqual(self.importer.results_tags, GANETI_TAGS)
    self.assertEqual(self.importer.results_version, GANETI_VERSION)

  def testParseVirtualboxOvf(self):
    self.importer = ovf.OVFImporter(self.virtualbox_ovf, OPTS_VBOX)
    self.importer.Parse()
    self.assertTrue("%s/test-instance" % OUTPUT_DIR in self.importer.output_dir)
    self.assertEqual(self.importer.results_disk, VIRTUALBOX_DISKS)
    self.assertEqual(self.importer.results_network, VIRTUALBOX_NETWORKS)
    self.assertEqual(self.importer.results_hypervisor, CMDARGS_HYPERVISOR)
    self.assertEqual(self.importer.results_os, CMDARGS_OS)
    self.assertEqual(self.importer.results_backend, VIRTUALBOX_BACKEND)
    self.assertEqual(self.importer.results_name, CMDARGS_NAME)
    self.assertEqual(self.importer.results_template, VIRTUALBOX_TEMPLATE)
    self.assertEqual(self.importer.results_tags, VIRTUALBOX_TAGS)
    self.assertEqual(self.importer.results_version, constants.EXPORT_VERSION)

  def testParseEmptyOvf(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    self.importer.Parse()
    self.assertTrue("%s/test-instance" % OUTPUT_DIR in self.importer.output_dir)
    self.assertEqual(self.importer.results_disk, CMDARGS_DISKS)
    self.assertEqual(self.importer.results_network, CMDARGS_NETWORKS)
    self.assertEqual(self.importer.results_hypervisor, CMDARGS_HYPERVISOR)
    self.assertEqual(self.importer.results_os, CMDARGS_OS)
    self.assertEqual(self.importer.results_backend, CMDARGS_BACKEND)
    self.assertEqual(self.importer.results_name, CMDARGS_NAME)
    self.assertEqual(self.importer.results_template, CMDARGS_TEMPLATE)
    self.assertEqual(self.importer.results_tags, CMDARGS_TAGS)
    self.assertEqual(self.importer.results_version, constants.EXPORT_VERSION)

  def testParseNameOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseNameOptions()
    self.assertEqual(results, CMDARGS_NAME)

  def testParseHypervisorOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseHypervisorOptions()
    self.assertEqual(results, CMDARGS_HYPERVISOR)

  def testParseOSOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseOSOptions()
    self.assertEqual(results, CMDARGS_OS)

  def testParseBackendOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseBackendOptions()
    self.assertEqual(results, CMDARGS_BACKEND)

  def testParseTags(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseTags()
    self.assertEqual(results, CMDARGS_TAGS)

  def testParseNicOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseNicOptions()
    self.assertEqual(results, CMDARGS_NETWORKS)

  def testParseDiskOptionsFromGanetiOVF(self):
    self.importer = ovf.OVFImporter(self.ganeti_ovf, OPTS_EXPORT)
    # Created relative to the working directory; removed again in tearDown
    os.mkdir(OUTPUT_DIR)
    results = self.importer._GetDiskInfo()
    self.assertEqual(results, GANETI_DISKS)

  def testParseTemplateOptions(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    results = self.importer._ParseTemplateOptions()
    self.assertEqual(results, GANETI_TEMPLATE)

  def testParseDiskOptionsFromCmdLine(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_COMPLETE)
    # Created relative to the working directory; removed again in tearDown
    os.mkdir(OUTPUT_DIR)
    results = self.importer._ParseDiskOptions()
    self.assertEqual(results, CMDARGS_DISKS)

  def testGetDiskFormat(self):
    self.importer = ovf.OVFImporter(self.ganeti_ovf, OPTS_EXPORT)
    disks_list = self.importer.ovf_reader.GetDisksNames()
    # Raw string: "\S" is not a valid escape sequence in a plain literal
    results = [self.importer._GetDiskQemuInfo("%s/%s" %
      (self.importer.input_dir, path), r"file format: (\S+)")
      for (path, _) in disks_list]
    self.assertEqual(results, ["vmdk"])

  def testNoInstanceNameOVF(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_EXPORT_NO_NAME)
    self.assertRaisesRegexp(errors.OpPrereqError, "Name of instance",
      self.importer.Parse)

  def testErrorNoOSNameOVF(self):
    self.importer = ovf.OVFImporter(self.virtualbox_ovf, OPTS_EXPORT)
    self.assertRaisesRegexp(errors.OpPrereqError, "OS name",
      self.importer.Parse)

  def testErrorNoDiskAndNoNetwork(self):
    self.importer = ovf.OVFImporter(self.empty_ovf, OPTS_NONIC_NODISK)
    self.assertRaisesRegexp(errors.OpPrereqError,
      "Either disk specification or network"
      " description", self.importer.Parse)
class TestOVFExporter(BetterUnitTest):
  """Tests for ovf.OVFExporter.

  """
  def setUp(self):
    """Resolves the paths of the configuration fixtures.

    """
    self.exporter = None
    self.wrong_config_file = _GetFullFilename("wrong_config.ini")
    self.unsafe_path_to_disk = _GetFullFilename("unsafe_path.ini")
    self.disk_image_not_exist = _GetFullFilename("no_disk.ini")
    self.empty_config = _GetFullFilename("empty.ini")
    self.standard_export = _GetFullFilename("config.ini")
    self.no_os = _GetFullFilename("no_os.ini")
    # Several error scenarios share the "no_disk.ini" configuration
    shared_config = self.disk_image_not_exist
    self.wrong_network_mode = shared_config
    self.no_memory = shared_config
    self.no_vcpus = shared_config
    self.no_hypervisor = shared_config

  def tearDown(self):
    """Cleans up the exporter and removes the output directory, if any.

    """
    if self.exporter:
      self.exporter.Cleanup()
    output_path = os.path.abspath(OUTPUT_DIR)
    try:
      shutil.rmtree(output_path)
    except OSError:
      pass

  def _MakeExporter(self, config_path):
    """Creates an exporter with the default options and remembers it.

    """
    self.exporter = ovf.OVFExporter(config_path, EXP_OPTS)
    return self.exporter

  def testErrorWrongConfigFile(self):
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "Error when trying to read",
                            ovf.OVFExporter,
                            self.wrong_config_file, EXP_OPTS)

  def testErrorPathToTheDiskIncorrect(self):
    exporter = self._MakeExporter(self.unsafe_path_to_disk)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "contains a directory name",
                            exporter._ParseDisks)

  def testErrorDiskImageNotExist(self):
    exporter = self._MakeExporter(self.disk_image_not_exist)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "Disk image does not exist",
                            exporter._ParseDisks)

  def testParseNetworks(self):
    exporter = self._MakeExporter(self.standard_export)
    self.assertEqual(exporter._ParseNetworks(), EXP_NETWORKS_LIST)

  def testErrorWrongNetworkMode(self):
    exporter = self._MakeExporter(self.wrong_network_mode)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "Network mode nic not recognized",
                            exporter._ParseNetworks)

  def testParseVCPusMem(self):
    exporter = self._MakeExporter(self.standard_export)
    parsed_vcpus = exporter._ParseVCPUs()
    parsed_memory = exporter._ParseMemory()
    self.assertEqual(parsed_vcpus, EXP_VCPUS)
    self.assertEqual(parsed_memory, EXP_MEMORY)

  def testErrorNoVCPUs(self):
    exporter = self._MakeExporter(self.no_vcpus)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "No CPU information found",
                            exporter._ParseVCPUs)

  def testErrorNoMemory(self):
    exporter = self._MakeExporter(self.no_memory)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "No memory information found",
                            exporter._ParseMemory)

  def testParseGaneti(self):
    exporter = self._MakeExporter(self.standard_export)
    self.assertEqual(exporter._ParseGaneti(), EXP_GANETI_DICT)

  def testErrorNoHypervisor(self):
    exporter = self._MakeExporter(self.no_hypervisor)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "No hypervisor information found",
                            exporter._ParseGaneti)

  def testErrorNoOS(self):
    exporter = self._MakeExporter(self.no_os)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "No operating system information found",
                            exporter._ParseGaneti)

  def testErrorParseNoInstanceName(self):
    exporter = self._MakeExporter(self.empty_config)
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "No instance name found",
                            exporter.Parse)
class TestOVFReader(BetterUnitTest):
  """Tests for ovf.OVFReader.

  """
  def setUp(self):
    """Resolves the paths of the OVF fixtures.

    """
    self.wrong_xml_file = _GetFullFilename("wrong_xml.ovf")
    self.ganeti_ovf = _GetFullFilename("ganeti.ovf")
    self.virtualbox_ovf = _GetFullFilename("virtualbox.ovf")
    self.corrupted_ovf = _GetFullFilename("corrupted_resources.ovf")
    self.wrong_manifest_ovf = _GetFullFilename("wrong_manifest.ovf")
    self.no_disk_in_ref_ovf = _GetFullFilename("no_disk_in_ref.ovf")
    self.empty_ovf = _GetFullFilename("empty.ovf")
    self.compressed_disk = _GetFullFilename("gzip_disk.ovf")

  def tearDown(self):
    """Nothing to clean up.

    """
    pass

  # Errors while reading or verifying

  def testXMLParsingError(self):
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "Error while reading .ovf",
                            ovf.OVFReader, self.wrong_xml_file)

  def testFileInResourcesDoesNotExistError(self):
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "does not exist",
                            ovf.OVFReader, self.corrupted_ovf)

  def testWrongManifestChecksumError(self):
    verify_fn = ovf.OVFReader(self.wrong_manifest_ovf).VerifyManifest
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "does not match the value in manifest file",
                            verify_fn)

  def testGoodManifestChecksum(self):
    verified = ovf.OVFReader(self.ganeti_ovf).VerifyManifest()
    self.assertEqual(verified, None)

  # Disk names

  def testGetDisksNamesOVFCorruptedError(self):
    names_fn = ovf.OVFReader(self.no_disk_in_ref_ovf).GetDisksNames
    self.assertRaisesRegexp(errors.OpPrereqError,
                            "not found in references",
                            names_fn)

  def testGetDisksNamesVirtualbox(self):
    found = ovf.OVFReader(self.virtualbox_ovf).GetDisksNames()
    wanted = [
      ("new_disk.vmdk", None),
      ("second_disk.vmdk", None),
      ]
    # The order of the returned names is not part of the check
    self.assertEqual(sorted(found), sorted(wanted))

  def testGetDisksNamesEmpty(self):
    found = ovf.OVFReader(self.empty_ovf).GetDisksNames()
    self.assertEqual(found, [])

  def testGetDisksNamesCompressed(self):
    found = ovf.OVFReader(self.compressed_disk).GetDisksNames()
    self.assertEqual(found, [("compr_disk.vmdk.gz", "gzip")])

  # Network data

  def testGetNetworkDataGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetNetworkData()
    self.assertEqual(found, GANETI_NETWORKS)

  def testGetNetworkDataVirtualbox(self):
    found = ovf.OVFReader(self.virtualbox_ovf).GetNetworkData()
    self.assertEqual(found, VIRTUALBOX_NETWORKS)

  def testGetNetworkDataEmpty(self):
    found = ovf.OVFReader(self.empty_ovf).GetNetworkData()
    self.assertEqual(found, EMPTY_NETWORKS)

  # Hypervisor data

  def testGetHypervisorDataGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetHypervisorData()
    self.assertEqual(found, GANETI_HYPERVISOR)

  def testGetHypervisorDataEmptyOvf(self):
    found = ovf.OVFReader(self.empty_ovf).GetHypervisorData()
    self.assertEqual(found, EMPTY_HYPERVISOR)

  # Operating system data

  def testGetOSDataGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetOSData()
    self.assertEqual(found, GANETI_OS)

  def testGetOSDataEmptyOvf(self):
    found = ovf.OVFReader(self.empty_ovf).GetOSData()
    self.assertEqual(found, EMPTY_OS)

  # Backend data

  def testGetBackendDataGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetBackendData()
    self.assertEqual(found, GANETI_BACKEND)

  def testGetBackendDataVirtualbox(self):
    found = ovf.OVFReader(self.virtualbox_ovf).GetBackendData()
    self.assertEqual(found, VIRTUALBOX_BACKEND)

  def testGetBackendDataEmptyOvf(self):
    found = ovf.OVFReader(self.empty_ovf).GetBackendData()
    self.assertEqual(found, EMPTY_BACKEND)

  # Instance name

  def testGetInstanceNameGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetInstanceName()
    self.assertEqual(found, GANETI_NAME)

  def testGetInstanceNameDataEmptyOvf(self):
    found = ovf.OVFReader(self.empty_ovf).GetInstanceName()
    self.assertEqual(found, EMPTY_NAME)

  # Disk template

  def testGetDiskTemplateGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetDiskTemplate()
    self.assertEqual(found, GANETI_TEMPLATE)

  def testGetDiskTemplateEmpty(self):
    found = ovf.OVFReader(self.empty_ovf).GetDiskTemplate()
    self.assertEqual(found, EMPTY_TEMPLATE)

  # Tags

  def testGetTagsGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetTagsData()
    self.assertEqual(found, GANETI_TAGS)

  def testGetTagsEmpty(self):
    found = ovf.OVFReader(self.empty_ovf).GetTagsData()
    self.assertEqual(found, EMPTY_TAGS)

  # Version

  def testGetVersionGaneti(self):
    found = ovf.OVFReader(self.ganeti_ovf).GetVersionData()
    self.assertEqual(found, GANETI_VERSION)

  def testGetVersionEmpty(self):
    found = ovf.OVFReader(self.empty_ovf).GetVersionData()
    self.assertEqual(found, EMPTY_VERSION)
class TestOVFWriter(BetterUnitTest):
  """Tests for ovf.OVFWriter.

  Every test saves some data and then checks that the expected XML fragment
  is contained in the serialized tree.

  """
  def setUp(self):
    """Creates a fresh writer for every test.

    """
    self.writer = ovf.OVFWriter(True)

  def tearDown(self):
    """Nothing to clean up.

    """
    pass

  def _Serialize(self):
    """Returns the writer's current tree as serialized by ElementTree.

    """
    return ET.tostring(self.writer.tree)

  def testOVFWriterInit(self):
    serialized = self._Serialize()
    self.assertTrue(EXPORT_EMPTY in serialized)

  def testSaveDisksDataEmpty(self):
    self.writer.SaveDisksData([])
    serialized = self._Serialize()
    self.assertTrue(EXPORT_DISKS_EMPTY in serialized)

  def testSaveDisksData(self):
    self.writer.SaveDisksData(EXP_DISKS_LIST)
    serialized = self._Serialize()
    self.assertTrue(EXPORT_DISKS in serialized)

  def testSaveNetworkDataEmpty(self):
    self.writer.SaveNetworksData([])
    serialized = self._Serialize()
    self.assertTrue(EXPORT_NETWORKS_EMPTY in serialized)

  def testSaveNetworksData(self):
    self.writer.SaveNetworksData(EXP_NETWORKS_LIST)
    serialized = self._Serialize()
    self.assertTrue(EXPORT_NETWORKS in serialized)

  def testSaveGanetiDataIncomplete(self):
    self.writer.SaveGanetiData(EXP_PARTIAL_GANETI_DICT, EXP_NETWORKS_LIST)
    serialized = self._Serialize()
    self.assertTrue(EXPORT_GANETI_INCOMPLETE in serialized)

  def testSaveGanetiDataComplete(self):
    self.writer.SaveGanetiData(EXP_GANETI_DICT, EXP_NETWORKS_LIST)
    serialized = self._Serialize()
    self.assertTrue(EXPORT_GANETI in serialized)

  def testSaveVirtualSystem(self):
    # The expected fragment spans the disk, network and system sections
    self.writer.SaveDisksData(EXP_DISKS_LIST)
    self.writer.SaveNetworksData(EXP_NETWORKS_LIST)
    self.writer.SaveVirtualSystemData(EXP_NAME, EXP_VCPUS, EXP_MEMORY)
    serialized = self._Serialize()
    self.assertTrue(EXPORT_SYSTEM in serialized)
# Run the tests through the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()