| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Test logical units.""" |
| |
| import logging |
| import shutil |
| import socket |
| import tempfile |
| |
| from ganeti import compat |
| from ganeti import constants |
| from ganeti import errors |
| from ganeti import locking |
| from ganeti import utils |
| from ganeti.masterd import iallocator |
| from ganeti.cmdlib.base import NoHooksLU |
| from ganeti.cmdlib.common import ExpandInstanceUuidAndName, GetWantedNodes, \ |
| GetWantedInstances |
| |
| |
class LUTestDelay(NoHooksLU):
  """Logical unit that sleeps for a configurable amount of time.

  Depending on the opcode, the sleep is executed on the master (via
  L{utils.TestDelay}), on a list of nodes (via the C{test_delay} RPC call),
  or on both.

  """
  REQ_BGL = False

  def ExpandNames(self):
    """Resolve the requested nodes and declare the node locks.

    When neither nodes nor the master are requested, no locks are declared.
    Otherwise the node-level lock list receives the UUIDs of the requested
    nodes and, if the master is requested, the value returned by
    C{self.cfg.GetMasterNode()}.

    """
    wants_nodes = bool(self.op.on_nodes)
    wants_master = bool(self.op.on_master)

    self.needed_locks = {}
    if not (wants_nodes or wants_master):
      return

    node_locks = self.needed_locks.setdefault(locking.LEVEL_NODE, [])

    if wants_nodes:
      # Resolving names this early is not appropriate for every LU; see the
      # LogicalUnit.ExpandNames docstring for details.
      (node_uuids, node_names) = GetWantedNodes(self, self.op.on_nodes)
      self.op.on_node_uuids = node_uuids
      self.op.on_nodes = node_names
      node_locks.extend(node_uuids)

    if wants_master:
      # The master needs its node lock as well
      node_locks.append(self.cfg.GetMasterNode())

  def _TestDelay(self):
    """Perform one sleep on the master and/or on the requested nodes.

    @raise errors.OpExecError: if the delay on the master fails

    """
    opcode = self.op

    if opcode.on_master and not utils.TestDelay(opcode.duration):
      raise errors.OpExecError("Error during master delay test")

    if not opcode.on_node_uuids:
      return

    rpc_results = self.rpc.call_test_delay(opcode.on_node_uuids,
                                           opcode.duration)
    for (node_uuid, node_result) in rpc_results.items():
      node_name = self.cfg.GetNodeName(node_uuid)
      node_result.Raise("Failure during rpc call to node %s" % node_name)

  def Exec(self, feedback_fn):
    """Run the delay once, or as many times as the opcode requests.

    A repeat count of zero means a single, unlogged delay; otherwise every
    iteration is announced through C{LogInfo} before it is executed.

    @param feedback_fn: feedback function (not used by this LU)

    """
    repetitions = self.op.repeat

    if repetitions == 0:
      self._TestDelay()
      return

    last_index = repetitions - 1
    for iteration in range(repetitions):
      self.LogInfo("Test delay iteration %d/%d", iteration, last_index)
      self._TestDelay()
| |
| |
class LUTestJqueue(NoHooksLU):
  """Utility LU to test some aspects of the job queue.

  At several points of its life cycle (selected through opcode flags) the LU
  publishes the path of a temporary Unix socket through the job log and then
  blocks until a client connects to that socket and confirms the
  notification.

  """
  REQ_BGL = False

  # Must be lower than default timeout for WaitForJobChange to see whether it
  # notices changed jobs
  _CLIENT_CONNECT_TIMEOUT = 20.0
  # Maximum time to wait on the accepted connection for the client's
  # confirmation
  _CLIENT_CONFIRM_TIMEOUT = 60.0

  @classmethod
  def _NotifyUsingSocket(cls, cb, errcls):
    """Opens a Unix socket and waits for another program to connect.

    The socket is created inside a fresh temporary directory, its path is
    handed to C{cb}, and the method then waits for a client to connect and
    afterwards for C{recv} on the accepted connection to return.

    @type cb: callable
    @param cb: Callback to send socket name to client
    @type errcls: class
    @param errcls: Exception class to use for errors
    @raise errcls: if the client does not connect within
      L{_CLIENT_CONNECT_TIMEOUT} or does not confirm within
      L{_CLIENT_CONFIRM_TIMEOUT}

    """
    # Using a temporary directory as there's no easy way to create temporary
    # sockets without writing a custom loop around tempfile.mktemp and
    # socket.bind
    tmpdir = tempfile.mkdtemp()
    try:
      tmpsock = utils.PathJoin(tmpdir, "sock")

      logging.debug("Creating temporary socket at %s", tmpsock)
      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      try:
        sock.bind(tmpsock)
        sock.listen(1)

        # Send details to client
        cb(tmpsock)

        # Wait for client to connect before continuing
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
        try:
          (conn, _) = sock.accept()
        except socket.error as err:
          raise errcls("Client didn't connect in time (%s)" % err)
      finally:
        # The listening socket is no longer needed, whether or not a client
        # connected
        sock.close()
    finally:
      # Remove as soon as client is connected
      shutil.rmtree(tmpdir)

    # Wait for client to close
    try:
      try:
        # pylint: disable=E1101
        # Instance of '_socketobject' has no ... member
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
        conn.recv(1)
      except socket.error as err:
        raise errcls("Client failed to confirm notification (%s)" % err)
    finally:
      conn.close()

  def _SendNotification(self, test, arg, sockname):
    """Sends a notification to the client.

    The notification is a job log entry of type
    C{constants.ELOG_JQUEUE_TEST} carrying C{(sockname, test, arg)}.

    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)
    @type sockname: string
    @param sockname: Socket path

    """
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))

  def _Notify(self, prereq, test, arg):
    """Notifies the client of a test.

    @type prereq: bool
    @param prereq: Whether this is a prereq-phase test
    @type test: string
    @param test: Test name
    @param arg: Test argument (depends on test)
    @raise errors.OpPrereqError: on notification failure if C{prereq} is true
    @raise errors.OpExecError: on notification failure otherwise

    """
    if prereq:
      errcls = errors.OpPrereqError
    else:
      errcls = errors.OpExecError

    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
                                                  test, arg),
                                   errcls)

  def CheckArguments(self):
    """Record that argument checking ran and reset the ExpandNames counter.

    """
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
    self.expandnames_calls = 0

  def ExpandNames(self):
    """Verify the call order, optionally notify the client, declare locks.

    @raise errors.ProgrammerError: if L{CheckArguments} was not called before

    """
    checkargs_calls = getattr(self, "checkargs_calls", 0)
    if checkargs_calls < 1:
      raise errors.ProgrammerError("CheckArguments was not called")

    self.expandnames_calls += 1

    if self.op.notify_waitlock:
      self._Notify(True, constants.JQT_EXPANDNAMES, None)

    self.LogInfo("Expanding names")

    # Get lock on master node (just to get a lock, not for a particular reason)
    self.needed_locks = {
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
      }

  def Exec(self, feedback_fn):
    """Run the job queue test.

    Optionally notifies the client that execution started, sends the
    requested log messages through C{feedback_fn} (notifying the client
    after each one) and finally fails if the opcode asks for it.

    @param feedback_fn: function receiving each test log message
    @return: C{True} unless a failure was requested
    @raise errors.ProgrammerError: if L{ExpandNames} was not called before
    @raise errors.OpExecError: if the opcode requests a failure

    """
    if self.expandnames_calls < 1:
      raise errors.ProgrammerError("ExpandNames was not called")

    if self.op.notify_exec:
      self._Notify(False, constants.JQT_EXEC, None)

    self.LogInfo("Executing")

    if self.op.log_messages:
      # Announce the number of messages before sending the first one
      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
      for idx, msg in enumerate(self.op.log_messages):
        self.LogInfo("Sending log message %s", idx + 1)
        feedback_fn(constants.JQT_MSGPREFIX + msg)
        # Report how many test messages have been sent
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)

    if self.op.fail:
      raise errors.OpExecError("Opcode failure was requested")

    return True
| |
| |
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU builds an iallocator request from the opcode parameters. Depending
  on the test direction it returns either the input text generated for the
  allocator or the output text obtained by running the named allocator.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    @raise errors.OpPrereqError: if a required attribute is missing, a
      parameter is malformed, the instance to allocate already exists, or
      the mode or direction is not recognised

    """
    if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
                        constants.IALLOCATOR_MODE_MULTI_ALLOC):
      for attr in ["memory", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      # An allocation test only makes sense for a name not yet in use
      (self.inst_uuid, iname) = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      # Every disk must be a dict with an integer size and a known access mode
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            constants.IDISK_SIZE not in row or
            not isinstance(row[constants.IDISK_SIZE], int) or
            constants.IDISK_MODE not in row or
            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      # Exec builds the relocation request from self.inst_uuid, hence the
      # UUID has to be stored on the LU and not only in a local variable
      (self.inst_uuid, self.op.name) = \
        ExpandInstanceUuidAndName(self.cfg, None, self.op.name)
      self.relocate_from_node_uuids = \
        list(self.cfg.GetInstanceInfo(self.inst_uuid).secondary_nodes)
    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
                          constants.IALLOCATOR_MODE_NODE_EVAC):
      if not self.op.instances:
        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
      (_, self.op.instances) = GetWantedInstances(self, self.op.instances)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      # Running an allocator requires knowing which one
      if self.op.iallocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def _BuildInstanceAllocRequest(self, name):
    """Build a single instance allocation request from the opcode.

    Used for both single and multi allocation tests so that both kinds of
    request are built from exactly the same set of parameters.

    @type name: string
    @param name: Name of the instance to allocate
    @return: the L{iallocator.IAReqInstanceAlloc} request

    """
    return iallocator.IAReqInstanceAlloc(name=name,
                                         memory=self.op.memory,
                                         disks=self.op.disks,
                                         disk_template=self.op.disk_template,
                                         os=self.op.os,
                                         tags=self.op.tags,
                                         nics=self.op.nics,
                                         vcpus=self.op.vcpus,
                                         spindle_use=self.op.spindle_use,
                                         hypervisor=self.op.hypervisor,
                                         node_whitelist=None)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    @param feedback_fn: feedback function (not used by this LU)
    @return: the allocator input text if the direction is
      C{constants.IALLOCATOR_DIR_IN}, otherwise the output text of the
      allocator named in the opcode (run without validation)
    @raise errors.ProgrammerError: if the mode is not handled

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      req = self._BuildInstanceAllocRequest(self.op.name)
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      req = iallocator.IAReqRelocate(
        inst_uuid=self.inst_uuid,
        relocate_from_node_uuids=list(self.relocate_from_node_uuids))
    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
      req = iallocator.IAReqGroupChange(instances=self.op.instances,
                                        target_groups=self.op.target_groups)
    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
      req = iallocator.IAReqNodeEvac(instances=self.op.instances,
                                     evac_mode=self.op.evac_mode)
    elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
      # One request per instance, named by appending the index to the base name
      insts = [self._BuildInstanceAllocRequest("%s%s" % (self.op.name, idx))
               for idx in range(self.op.count)]
      req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
    else:
      raise errors.ProgrammerError("Uncatched mode %s in"
                                   " LUTestAllocator.Exec" % self.op.mode)

    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.iallocator, validate=False)
      result = ial.out_text
    return result