blob: 167be3dabde631b50fa4a1b41b735bcd0b3ddb30 [file] [log] [blame]
#
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test logical units."""
import logging
import shutil
import socket
import tempfile
import time
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti.masterd import iallocator
from ganeti.cmdlib.base import NoHooksLU
from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \
GetWantedInstances
class TestSocketWrapper(object):
  """Utility class that opens a domain socket and cleans up as needed.

  The socket is created inside a private temporary directory. Both the
  directory path and the socket object are kept on the instance (C{tmpdir}
  and C{sock}) so that L{Destroy} can release them; they are C{None}
  whenever no socket is currently held.

  """
  def __init__(self):
    """Constructor cleaning up variables to be used.

    """
    self.tmpdir = None
    self.sock = None

  def Create(self, max_connections=1):
    """Creates a bound and ready socket, cleaning up in case of failure.

    On failure the temporary directory is removed, the instance is reset to
    its initial (empty) state and the original exception is re-raised.

    @type max_connections: int
    @param max_connections: The number of max connections allowed for the
                            socket.

    @rtype: tuple of socket, string
    @return: The socket object and the path to reach it with.

    """
    # Using a temporary directory as there's no easy way to create temporary
    # sockets without writing a custom loop around tempfile.mktemp and
    # socket.bind
    self.tmpdir = tempfile.mkdtemp()
    try:
      tmpsock = utils.PathJoin(self.tmpdir, "sock")
      logging.debug("Creating temporary socket at %s", tmpsock)
      self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      try:
        self.sock.bind(tmpsock)
        self.sock.listen(max_connections)
      except:
        # Bare except on purpose: close on any failure, then re-raise
        self.sock.close()
        raise
    except:
      # Bare except on purpose: clean up on any failure, then re-raise.
      # Forget the stale references first so a later Destroy() reports the
      # "must be created" error instead of failing on a missing directory.
      stale_dir = self.tmpdir
      self.tmpdir = None
      self.sock = None
      shutil.rmtree(stale_dir)
      raise

    return self.sock, tmpsock

  def Destroy(self):
    """Destroys the socket and performs all necessary cleanup.

    After this call the wrapper is back in its initial state, so calling
    L{Destroy} again without a new L{Create} raises the same exception as
    calling it on a fresh instance.

    @raise Exception: if no socket has been successfully created

    """
    if self.tmpdir is None or self.sock is None:
      raise Exception("A socket must be created successfully before attempting "
                      "its destruction")

    # Detach the resources before releasing them, so the wrapper never keeps
    # references to a closed socket or a removed directory
    sock, tmpdir = self.sock, self.tmpdir
    self.sock = None
    self.tmpdir = None

    try:
      sock.close()
    finally:
      shutil.rmtree(tmpdir)
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any, adds the master node when
    C{on_master} is set, and requests node locks for the resulting nodes
    unless the opcode asked for C{no_locks}.

    @raise errors.OpPrereqError: if the requested duration is not positive

    """
    self.needed_locks = {}

    if self.op.duration <= 0:
      raise errors.OpPrereqError("Duration must be greater than zero")

    self.op.on_node_uuids = []
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      (self.op.on_node_uuids, self.op.on_nodes) = \
        GetWantedNodes(self, self.op.on_nodes)

    master_uuid = self.cfg.GetMasterNode()
    if self.op.on_master and master_uuid not in self.op.on_node_uuids:
      self.op.on_node_uuids.append(master_uuid)

    # Only request node locks when the caller did not opt out; previously the
    # lock list was set unconditionally, which made "no_locks" a no-op
    if not self.op.no_locks:
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_node_uuids

  def _InterruptibleDelay(self):
    """Delays but provides the mechanisms necessary to interrupt the delay as
    needed.

    A temporary socket is created and its path is sent to the client via the
    log. A client connecting to the socket and then sending data (or closing
    the connection) before the duration elapses interrupts the delay.

    @rtype: bool
    @return: True if the delay was interrupted, False if it ran to completion

    """
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # Everything after Create() is inside the try block, so the socket and
      # its temporary directory are released even if logging fails
      self.Log(constants.ELOG_DELAY_TEST, (path,))
      sock.settimeout(self.op.duration)
      start = time.time()
      try:
        (conn, _) = sock.accept()
      except socket.timeout:
        # If we timed out, all is well
        return False
    finally:
      # Destroys the original socket, but the new connection is still usable
      socket_wrapper.Destroy()

    try:
      # Change to remaining time
      time_to_go = self.op.duration - (time.time() - start)
      self.Log(constants.ELOG_MESSAGE,
               "Received connection, time to go is %d" % time_to_go)
      if time_to_go < 0:
        time_to_go = 0
      # pylint: disable=E1101
      # Instance of '_socketobject' has no ... member
      conn.settimeout(time_to_go)
      conn.recv(1)
      # pylint: enable=E1101
    except socket.timeout:
      # A second timeout can occur if no data is sent
      return False
    finally:
      conn.close()

    self.Log(constants.ELOG_MESSAGE,
             "Interrupted, time spent waiting: %d" % (time.time() - start))

    # Reaching this point means we were interrupted
    return True

  def _UninterruptibleDelay(self):
    """Delays without allowing interruptions.

    If any nodes were selected, the delay is performed via RPC on those
    nodes; otherwise it is performed locally.

    @raise errors.OpExecError: if the local delay reports a failure

    """
    if self.op.on_node_uuids:
      result = self.rpc.call_test_delay(self.op.on_node_uuids, self.op.duration)
      for node_uuid, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" %
                          self.cfg.GetNodeName(node_uuid))
    else:
      if not utils.TestDelay(self.op.duration)[0]:
        raise errors.OpExecError("Error during master delay test")

  def _TestDelay(self):
    """Do the actual sleep.

    @rtype: bool
    @return: Whether the delay was interrupted

    """
    if self.op.interruptible:
      return self._InterruptibleDelay()

    self._UninterruptibleDelay()
    return False

  def Exec(self, feedback_fn):
    """Execute the test delay opcode, with the wanted repetitions.

    A repeat count of zero runs the delay exactly once; otherwise the delay
    is run up to C{repeat} times, stopping early if it gets interrupted.

    """
    if self.op.repeat == 0:
      self._TestDelay()
    else:
      top_value = self.op.repeat - 1
      for i in range(self.op.repeat):
        self.LogInfo("Test delay iteration %d/%d", i, top_value)
        # Break in case of interruption
        if self._TestDelay():
          break
class LUTestJqueue(NoHooksLU):
  """Utility LU to test some aspects of the job queue.

  """
  REQ_BGL = False

  # Must be lower than default timeout for WaitForJobChange to see whether it
  # notices changed jobs
  _CLIENT_CONNECT_TIMEOUT = 20.0
  _CLIENT_CONFIRM_TIMEOUT = 60.0

  @classmethod
  def _NotifyUsingSocket(cls, cb, errcls):
    """Opens a Unix socket and waits for another program to connect.

    The socket path is handed to C{cb}; the method then waits for a client to
    connect and afterwards for it to send data or close the connection.

    @type cb: callable
    @param cb: Callback to send socket name to client
    @type errcls: class
    @param errcls: Exception class to use for errors
    @raise errcls: if the client does not connect or confirm in time

    """
    socket_wrapper = TestSocketWrapper()
    sock, path = socket_wrapper.Create()

    try:
      # The callback runs inside the try block so that the socket and its
      # temporary directory are released even if the notification fails
      cb(path)

      try:
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
        (conn, _) = sock.accept()
      except socket.error as err:
        raise errcls("Client didn't connect in time (%s)" % err)
    finally:
      socket_wrapper.Destroy()

    # Wait for client to close
    try:
      try:
        # pylint: disable=E1101
        # Instance of '_socketobject' has no ... member
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
        conn.recv(1)
      except socket.error as err:
        raise errcls("Client failed to confirm notification (%s)" % err)
    finally:
      conn.close()

  def _SendNotification(self, test, arg, sockname):
    """Sends a notification to the client.

    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)
    @type sockname: string
    @param sockname: Socket path

    """
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))

  def _Notify(self, prereq, test, arg):
    """Notifies the client of a test.

    @type prereq: bool
    @param prereq: Whether this is a prereq-phase test
    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)

    """
    # Failures are reported with the error class matching the current phase
    if prereq:
      errcls = errors.OpPrereqError
    else:
      errcls = errors.OpExecError

    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
                                                  test, arg),
                                   errcls)

  def CheckArguments(self):
    """Counts the invocation and resets the ExpandNames call counter.

    """
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
    self.expandnames_calls = 0

  def ExpandNames(self):
    """Verifies the call order, optionally notifies the client, sets locks.

    @raise errors.ProgrammerError: if CheckArguments was not called before

    """
    checkargs_calls = getattr(self, "checkargs_calls", 0)
    if checkargs_calls < 1:
      raise errors.ProgrammerError("CheckArguments was not called")

    self.expandnames_calls += 1

    if self.op.notify_waitlock:
      self._Notify(True, constants.JQT_EXPANDNAMES, None)

    self.LogInfo("Expanding names")

    # Get lock on master node (just to get a lock, not for a particular reason)
    self.needed_locks = {
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
      }

  def Exec(self, feedback_fn):
    """Runs the requested notifications and log messages.

    @raise errors.ProgrammerError: if ExpandNames was not called before
    @raise errors.OpExecError: if the opcode requested a failure
    @rtype: bool
    @return: True, unless a failure was requested

    """
    if self.expandnames_calls < 1:
      raise errors.ProgrammerError("ExpandNames was not called")

    if self.op.notify_exec:
      self._Notify(False, constants.JQT_EXEC, None)

    self.LogInfo("Executing")

    if self.op.log_messages:
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
      for idx, msg in enumerate(self.op.log_messages):
        self.LogInfo("Sending log message %s", idx + 1)
        feedback_fn(constants.JQT_MSGPREFIX + msg)
        # Report how many test messages have been sent
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)

    if self.op.fail:
      raise errors.OpExecError("Opcode failure was requested")

    return True
class LUTestOsParams(NoHooksLU):
  """Utility LU to test secret OS parameter transmission.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Declares that this LU needs no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reports the unwrapped secret OS parameters via the feedback function.

    @raise errors.OpExecError: if the opcode carries no secret parameters

    """
    secret_params = self.op.osparams_secret
    if not secret_params:
      raise errors.OpExecError("Opcode needs secret parameters")

    feedback_fn("Secret OS parameters: %s" % secret_params.Unprivate())
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    @raise errors.OpPrereqError: if the parameters are invalid for the
        requested mode or direction

    """
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
      # Allocation tests must use a name not yet known to the configuration
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            constants.IDISK_SIZE not in row or
            not isinstance(row[constants.IDISK_SIZE], int) or
            constants.IDISK_MODE not in row or
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      (self.inst_uuid, self.op.name) = ExpandInstanceUuidAndName(self.cfg, None,
                                                                 self.op.name)
      self.relocate_from_node_uuids = \
        list(self.cfg.GetInstanceSecondaryNodes(self.inst_uuid))
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
                          constants.IALLOCATOR_MODE_NODE_EVAC):
      if not self.op.instances:
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if self.op.iallocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)

  def _BuildInstanceAllocRequest(self, name):
    """Builds a single instance allocation request from the opcode fields.

    @type name: string
    @param name: Name of the instance to be allocated
    @return: the L{iallocator.IAReqInstanceAlloc} request object

    """
    return iallocator.IAReqInstanceAlloc(name=name,
                                         memory=self.op.memory,
                                         disks=self.op.disks,
                                         disk_template=self.op.disk_template,
                                         group_name=self.op.group_name,
                                         os=self.op.os,
                                         tags=self.op.tags,
                                         nics=self.op.nics,
                                         vcpus=self.op.vcpus,
                                         spindle_use=self.op.spindle_use,
                                         hypervisor=self.op.hypervisor,
                                         node_whitelist=None)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    Builds the request matching the opcode's mode and either returns the
    generated allocator input text (direction "in") or runs the named
    allocator without validation and returns its output text.

    @raise errors.ProgrammerError: if the mode is not handled here
    @return: the allocator input or output text

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      req = self._BuildInstanceAllocRequest(self.op.name)
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      req = iallocator.IAReqRelocate(
            inst_uuid=self.inst_uuid,
            relocate_from_node_uuids=list(self.relocate_from_node_uuids))
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
                                        target_groups=self.op.target_groups)
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
                                     evac_mode=self.op.evac_mode,
                                     ignore_soft_errors=False)
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
      # One request per instance, named by appending the index to the name
      insts = [self._BuildInstanceAllocRequest("%s%s" % (self.op.name, idx))
               for idx in range(self.op.count)]
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
    else:
      # The mode must be interpolated into the message; passing it as a
      # second argument would leave the "%s" placeholder unformatted
      raise errors.ProgrammerError("Uncatched mode %s in"
                                   " LUTestAllocator.Exec" % self.op.mode)

    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.iallocator, validate=False)
      result = ial.out_text
    return result