| # |
| # |
| |
| # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # 1. Redistributions of source code must retain the above copyright notice, |
| # this list of conditions and the following disclaimer. |
| # |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
| # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
| # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| |
| """Module implementing the job queue handling. |
| |
| """ |
| |
| import logging |
| import errno |
| import time |
| import weakref |
| import threading |
| import itertools |
| import operator |
| import os |
| |
| try: |
| # pylint: disable=E0611 |
| from pyinotify import pyinotify |
| except ImportError: |
| import pyinotify |
| |
| from ganeti import asyncnotifier |
| from ganeti import constants |
| from ganeti import serializer |
| from ganeti import locking |
| from ganeti import luxi |
| from ganeti import opcodes |
| from ganeti import opcodes_base |
| from ganeti import errors |
| from ganeti import mcpu |
| from ganeti import utils |
| from ganeti import jstore |
| import ganeti.rpc.node as rpc |
| from ganeti import runtime |
| from ganeti import netutils |
| from ganeti import compat |
| from ganeti import ht |
| from ganeti import query |
| from ganeti import qlang |
| from ganeti import pathutils |
| from ganeti import vcluster |
| from ganeti.cmdlib import cluster |
| |
| |
| #: Retrieves "id" attribute |
| _GetIdAttr = operator.attrgetter("id") |
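| # For example, it can be used as a sort key: sorted(jobs, key=_GetIdAttr) |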
| |
| |
| class CancelJob(Exception): |
| """Special exception to cancel a job. |
| |
| """ |
| |
| |
| def TimeStampNow(): |
| """Returns the current timestamp. |
| |
| @rtype: tuple |
| @return: the current time in the (seconds, microseconds) format |
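|       (for example, 1411234567.25 is split into C{(1411234567, 250000)}) |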
| |
| """ |
| return utils.SplitTime(time.time()) |
| |
| |
| def _CallJqUpdate(runner, names, file_name, content): |
| """Updates job queue file after virtualizing filename. |
| |
| """ |
| virt_file_name = vcluster.MakeVirtualPath(file_name) |
| return runner.call_jobqueue_update(names, virt_file_name, content) |
| |
| |
| class _QueuedOpCode(object): |
| """Encapsulates an opcode object. |
| |
| @ivar log: holds the execution log and consists of tuples |
| of the form C{(log_serial, timestamp, level, message)} |
| @ivar input: the OpCode we encapsulate |
| @ivar status: the current status |
| @ivar result: the result of the LU execution |
| @ivar start_timestamp: timestamp for the start of the execution |
| @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation |
|   @ivar end_timestamp: timestamp for the end of the execution |
| |
| """ |
| __slots__ = ["input", "status", "result", "log", "priority", |
| "start_timestamp", "exec_timestamp", "end_timestamp", |
| "__weakref__"] |
| |
| def __init__(self, op): |
| """Initializes instances of this class. |
| |
| @type op: L{opcodes.OpCode} |
| @param op: the opcode we encapsulate |
| |
| """ |
| self.input = op |
| self.status = constants.OP_STATUS_QUEUED |
| self.result = None |
| self.log = [] |
| self.start_timestamp = None |
| self.exec_timestamp = None |
| self.end_timestamp = None |
| |
| # Get initial priority (it might change during the lifetime of this opcode) |
| self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT) |
| |
| @classmethod |
| def Restore(cls, state): |
| """Restore the _QueuedOpCode from the serialized form. |
| |
| @type state: dict |
| @param state: the serialized state |
| @rtype: _QueuedOpCode |
| @return: a new _QueuedOpCode instance |
| |
| """ |
| obj = _QueuedOpCode.__new__(cls) |
| obj.input = opcodes.OpCode.LoadOpCode(state["input"]) |
| obj.status = state["status"] |
| obj.result = state["result"] |
| obj.log = state["log"] |
| obj.start_timestamp = state.get("start_timestamp", None) |
| obj.exec_timestamp = state.get("exec_timestamp", None) |
| obj.end_timestamp = state.get("end_timestamp", None) |
| obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) |
| return obj |
| |
| def Serialize(self): |
| """Serializes this _QueuedOpCode. |
| |
| @rtype: dict |
| @return: the dictionary holding the serialized state |
| |
| """ |
| return { |
| "input": self.input.__getstate__(), |
| "status": self.status, |
| "result": self.result, |
| "log": self.log, |
| "start_timestamp": self.start_timestamp, |
| "exec_timestamp": self.exec_timestamp, |
| "end_timestamp": self.end_timestamp, |
| "priority": self.priority, |
| } |
| |
| |
| class _QueuedJob(object): |
| """In-memory job representation. |
| |
| This is what we use to track the user-submitted jobs. Locking must |
| be taken care of by users of this class. |
| |
| @type queue: L{JobQueue} |
| @ivar queue: the parent queue |
| @ivar id: the job ID |
| @type ops: list |
| @ivar ops: the list of _QueuedOpCode that constitute the job |
| @type log_serial: int |
|   @ivar log_serial: holds the serial of the last log entry appended |
| @ivar received_timestamp: the timestamp for when the job was received |
|   @ivar start_timestamp: the timestamp for start of execution |
| @ivar end_timestamp: the timestamp for end of execution |
| @ivar writable: Whether the job is allowed to be modified |
| |
| """ |
| # pylint: disable=W0212 |
| __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", |
| "received_timestamp", "start_timestamp", "end_timestamp", |
| "writable", "archived", |
| "livelock", "process_id", |
| "__weakref__"] |
| |
| def AddReasons(self, pickup=False): |
|     """Extend the reason trail. |
| |
|     Add the reason for executing this job to each of its opcodes. |
| |
| """ |
| count = 0 |
| for queued_op in self.ops: |
| op = queued_op.input |
| if pickup: |
| reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP |
| else: |
| reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE |
| reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__, |
| reason_src_prefix) |
| reason_text = "job=%d;index=%d" % (self.id, count) |
| reason = getattr(op, "reason", []) |
| reason.append((reason_src, reason_text, utils.EpochNano())) |
| op.reason = reason |
| count = count + 1 |
| |
| def __init__(self, queue, job_id, ops, writable): |
| """Constructor for the _QueuedJob. |
| |
| @type queue: L{JobQueue} |
| @param queue: our parent queue |
|     @type job_id: int |
| @param job_id: our job id |
| @type ops: list |
| @param ops: the list of opcodes we hold, which will be encapsulated |
| in _QueuedOpCodes |
| @type writable: bool |
| @param writable: Whether job can be modified |
| |
| """ |
| if not ops: |
| raise errors.GenericError("A job needs at least one opcode") |
| |
| self.queue = queue |
| self.id = int(job_id) |
| self.ops = [_QueuedOpCode(op) for op in ops] |
| self.AddReasons() |
| self.log_serial = 0 |
| self.received_timestamp = TimeStampNow() |
| self.start_timestamp = None |
| self.end_timestamp = None |
| self.archived = False |
| self.livelock = None |
| self.process_id = None |
| |
| self._InitInMemory(self, writable) |
| |
| assert not self.archived, "New jobs can not be marked as archived" |
| |
| @staticmethod |
| def _InitInMemory(obj, writable): |
| """Initializes in-memory variables. |
| |
| """ |
| obj.writable = writable |
| obj.ops_iter = None |
| obj.cur_opctx = None |
| |
| def __repr__(self): |
| status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), |
| "id=%s" % self.id, |
| "ops=%s" % ",".join([op.input.Summary() for op in self.ops])] |
| |
| return "<%s at %#x>" % (" ".join(status), id(self)) |
| |
| @classmethod |
| def Restore(cls, queue, state, writable, archived): |
|     """Restore a _QueuedJob from serialized state. |
| |
| @type queue: L{JobQueue} |
| @param queue: to which queue the restored job belongs |
| @type state: dict |
| @param state: the serialized state |
| @type writable: bool |
| @param writable: Whether job can be modified |
| @type archived: bool |
| @param archived: Whether job was already archived |
|     @rtype: _QueuedJob |
|     @return: the restored _QueuedJob instance |
| |
| """ |
| obj = _QueuedJob.__new__(cls) |
| obj.queue = queue |
| obj.id = int(state["id"]) |
| obj.received_timestamp = state.get("received_timestamp", None) |
| obj.start_timestamp = state.get("start_timestamp", None) |
| obj.end_timestamp = state.get("end_timestamp", None) |
| obj.archived = archived |
| obj.livelock = state.get("livelock", None) |
| obj.process_id = state.get("process_id", None) |
| if obj.process_id is not None: |
| obj.process_id = int(obj.process_id) |
| |
| obj.ops = [] |
| obj.log_serial = 0 |
| for op_state in state["ops"]: |
| op = _QueuedOpCode.Restore(op_state) |
| for log_entry in op.log: |
| obj.log_serial = max(obj.log_serial, log_entry[0]) |
| obj.ops.append(op) |
| |
| cls._InitInMemory(obj, writable) |
| |
| return obj |
| |
| def Serialize(self): |
|     """Serialize the _QueuedJob instance. |
| |
| @rtype: dict |
| @return: the serialized state |
| |
| """ |
| return { |
| "id": self.id, |
| "ops": [op.Serialize() for op in self.ops], |
| "start_timestamp": self.start_timestamp, |
| "end_timestamp": self.end_timestamp, |
| "received_timestamp": self.received_timestamp, |
| "livelock": self.livelock, |
| "process_id": self.process_id, |
| } |
| |
| def CalcStatus(self): |
| """Compute the status of this job. |
| |
| This function iterates over all the _QueuedOpCodes in the job and |
| based on their status, computes the job status. |
| |
|     The algorithm is: |
|       - if we find a cancelled opcode, or one that finished with an |
|         error, the job status will be the same |
|       - otherwise, the last opcode with a status of: |
|           - waiting |
|           - canceling |
|           - running |
| |
|         will determine the job status |
| |
|       - otherwise, all opcodes are either queued or successful, and |
|         the job status will be the same |
| |
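|     For example, a job with opcode statuses (success, running, queued) is |
|     reported as running, while (success, error, queued) is reported as |
|     error, since a single failed opcode fails the whole job. |
| |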
| @return: the job status |
| |
| """ |
| status = constants.JOB_STATUS_QUEUED |
| |
| all_success = True |
| for op in self.ops: |
| if op.status == constants.OP_STATUS_SUCCESS: |
| continue |
| |
| all_success = False |
| |
| if op.status == constants.OP_STATUS_QUEUED: |
| pass |
| elif op.status == constants.OP_STATUS_WAITING: |
| status = constants.JOB_STATUS_WAITING |
| elif op.status == constants.OP_STATUS_RUNNING: |
| status = constants.JOB_STATUS_RUNNING |
| elif op.status == constants.OP_STATUS_CANCELING: |
| status = constants.JOB_STATUS_CANCELING |
| break |
| elif op.status == constants.OP_STATUS_ERROR: |
| status = constants.JOB_STATUS_ERROR |
| # The whole job fails if one opcode failed |
| break |
| elif op.status == constants.OP_STATUS_CANCELED: |
|         status = constants.JOB_STATUS_CANCELED |
| break |
| |
| if all_success: |
| status = constants.JOB_STATUS_SUCCESS |
| |
| return status |
| |
| def CalcPriority(self): |
| """Gets the current priority for this job. |
| |
| Only unfinished opcodes are considered. When all are done, the default |
| priority is used. |
| |
| @rtype: int |
| |
| """ |
| priorities = [op.priority for op in self.ops |
| if op.status not in constants.OPS_FINALIZED] |
| |
| if not priorities: |
| # All opcodes are done, assume default priority |
| return constants.OP_PRIO_DEFAULT |
| |
| return min(priorities) |
| |
| def GetLogEntries(self, newer_than): |
| """Selectively returns the log entries. |
| |
| @type newer_than: None or int |
| @param newer_than: if this is None, return all log entries, |
| otherwise return only the log entries with serial higher |
| than this value |
| @rtype: list |
| @return: the list of the log entries selected |
| |
| """ |
| if newer_than is None: |
| serial = -1 |
| else: |
| serial = newer_than |
| |
| entries = [] |
| for op in self.ops: |
| entries.extend(filter(lambda entry: entry[0] > serial, op.log)) |
| |
| return entries |
| |
| def MarkUnfinishedOps(self, status, result): |
| """Mark unfinished opcodes with a given status and result. |
| |
|     This is a utility function for marking all running or waiting-to-be-run |
|     opcodes with a given status. Opcodes which are already finalized are |
|     not changed. |
| |
| @param status: a given opcode status |
| @param result: the opcode result |
| |
| """ |
| not_marked = True |
| for op in self.ops: |
| if op.status in constants.OPS_FINALIZED: |
| assert not_marked, "Finalized opcodes found after non-finalized ones" |
| continue |
| op.status = status |
| op.result = result |
| not_marked = False |
| |
| def Finalize(self): |
| """Marks the job as finalized. |
| |
| """ |
| self.end_timestamp = TimeStampNow() |
| |
| def Cancel(self): |
|     """Marks the job as canceled or canceling, if possible. |
| |
| @rtype: tuple; (bool, string) |
| @return: Boolean describing whether job was successfully canceled or marked |
| as canceling and a text message |
| |
| """ |
| status = self.CalcStatus() |
| |
| if status == constants.JOB_STATUS_QUEUED: |
| self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, |
| "Job canceled by request") |
| self.Finalize() |
| return (True, "Job %s canceled" % self.id) |
| |
| elif status == constants.JOB_STATUS_WAITING: |
| # The worker will notice the new status and cancel the job |
| self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None) |
| return (True, "Job %s will be canceled" % self.id) |
| |
| else: |
| logging.debug("Job %s is no longer waiting in the queue", self.id) |
| return (False, "Job %s is no longer waiting in the queue" % self.id) |
| |
| def ChangePriority(self, priority): |
| """Changes the job priority. |
| |
| @type priority: int |
| @param priority: New priority |
| @rtype: tuple; (bool, string) |
| @return: Boolean describing whether job's priority was successfully changed |
| and a text message |
| |
| """ |
| status = self.CalcStatus() |
| |
| if status in constants.JOBS_FINALIZED: |
| return (False, "Job %s is finished" % self.id) |
| elif status == constants.JOB_STATUS_CANCELING: |
|       return (False, "Job %s is being canceled" % self.id) |
| else: |
| assert status in (constants.JOB_STATUS_QUEUED, |
| constants.JOB_STATUS_WAITING, |
| constants.JOB_STATUS_RUNNING) |
| |
| changed = False |
| for op in self.ops: |
| if (op.status == constants.OP_STATUS_RUNNING or |
| op.status in constants.OPS_FINALIZED): |
| assert not changed, \ |
| ("Found opcode for which priority should not be changed after" |
| " priority has been changed for previous opcodes") |
| continue |
| |
| assert op.status in (constants.OP_STATUS_QUEUED, |
| constants.OP_STATUS_WAITING) |
| |
| changed = True |
| |
| # Set new priority (doesn't modify opcode input) |
| op.priority = priority |
| |
| if changed: |
| return (True, ("Priorities of pending opcodes for job %s have been" |
| " changed to %s" % (self.id, priority))) |
| else: |
| return (False, "Job %s had no pending opcodes" % self.id) |
| |
| def SetPid(self, pid): |
|     """Sets the job's process ID. |
| |
| @type pid: int |
| @param pid: the process ID |
| |
| """ |
| status = self.CalcStatus() |
| |
| if status in (constants.JOB_STATUS_QUEUED, |
| constants.JOB_STATUS_WAITING): |
| if self.process_id is not None: |
| logging.warning("Replacing the process id %s of job %s with %s", |
| self.process_id, self.id, pid) |
| self.process_id = pid |
| else: |
| logging.warning("Can set pid only for queued/waiting jobs") |
| |
| |
| class _OpExecCallbacks(mcpu.OpExecCbBase): |
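|   """Callbacks used while executing a single opcode. |
| |
|   Instances are passed to the opcode execution function so that status |
|   changes, log messages and priority queries of the running opcode are |
|   reflected in the owning job and written back through the queue. |
| |
|   """ |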
| |
| def __init__(self, queue, job, op): |
| """Initializes this class. |
| |
| @type queue: L{JobQueue} |
| @param queue: Job queue |
| @type job: L{_QueuedJob} |
| @param job: Job object |
| @type op: L{_QueuedOpCode} |
| @param op: OpCode |
| |
| """ |
| super(_OpExecCallbacks, self).__init__() |
| |
| assert queue, "Queue is missing" |
| assert job, "Job is missing" |
| assert op, "Opcode is missing" |
| |
| self._queue = queue |
| self._job = job |
| self._op = op |
| |
| def _CheckCancel(self): |
| """Raises an exception to cancel the job if asked to. |
| |
| """ |
| # Cancel here if we were asked to |
| if self._op.status == constants.OP_STATUS_CANCELING: |
| logging.debug("Canceling opcode") |
| raise CancelJob() |
| |
| def NotifyStart(self): |
| """Mark the opcode as running, not lock-waiting. |
| |
| This is called from the mcpu code as a notifier function, when the LU is |
| finally about to start the Exec() method. Of course, to have end-user |
| visible results, the opcode must be initially (before calling into |
| Processor.ExecOpCode) set to OP_STATUS_WAITING. |
| |
| """ |
| assert self._op in self._job.ops |
| assert self._op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING) |
| |
| # Cancel here if we were asked to |
| self._CheckCancel() |
| |
| logging.debug("Opcode is now running") |
| |
| self._op.status = constants.OP_STATUS_RUNNING |
| self._op.exec_timestamp = TimeStampNow() |
| |
| # And finally replicate the job status |
| self._queue.UpdateJobUnlocked(self._job) |
| |
| def NotifyRetry(self): |
| """Mark opcode again as lock-waiting. |
| |
| This is called from the mcpu code just after calling PrepareRetry. |
|     The opcode will now try to acquire its locks again (hopefully more of |
|     them this time). |
| |
| """ |
| self._op.status = constants.OP_STATUS_WAITING |
| logging.debug("Opcode will be retried. Back to waiting.") |
| |
| def _AppendFeedback(self, timestamp, log_type, log_msgs): |
| """Internal feedback append function, with locks |
| |
| @type timestamp: tuple (int, int) |
| @param timestamp: timestamp of the log message |
| |
| @type log_type: string |
| @param log_type: log type (one of Types.ELogType) |
| |
| @type log_msgs: any |
| @param log_msgs: log data to append |
| """ |
| |
| # This should be removed once Feedback() has a clean interface. |
| # Feedback can be called with anything, we interpret ELogMessageList as |
| # messages that have to be individually added to the log list, but pushed |
| # in a single update. Other msgtypes are only transparently passed forward. |
| if log_type == constants.ELOG_MESSAGE_LIST: |
| log_type = constants.ELOG_MESSAGE |
| else: |
| log_msgs = [log_msgs] |
| |
| for msg in log_msgs: |
| self._job.log_serial += 1 |
| self._op.log.append((self._job.log_serial, timestamp, log_type, msg)) |
| self._queue.UpdateJobUnlocked(self._job, replicate=False) |
| |
| # TODO: Cleanup calling conventions, make them explicit |
| def Feedback(self, *args): |
| """Append a log entry. |
| |
| Calling conventions: |
| arg[0]: (optional) string, message type (Types.ELogType) |
| arg[1]: data to be interpreted as a message |
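| |
|     For example, C{Feedback("hello")} uses the default ELOG_MESSAGE type, |
|     while C{Feedback(constants.ELOG_MESSAGE, "hello")} passes the message |
|     type explicitly. |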
| """ |
| assert len(args) < 3 |
| |
| # TODO: Use separate keyword arguments for a single string vs. a list. |
| if len(args) == 1: |
| log_type = constants.ELOG_MESSAGE |
| log_msg = args[0] |
| else: |
| (log_type, log_msg) = args |
| |
| # The time is split to make serialization easier and not lose |
| # precision. |
| timestamp = utils.SplitTime(time.time()) |
| self._AppendFeedback(timestamp, log_type, log_msg) |
| |
| def CurrentPriority(self): |
| """Returns current priority for opcode. |
| |
| """ |
| assert self._op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING) |
| |
| # Cancel here if we were asked to |
| self._CheckCancel() |
| |
| return self._op.priority |
| |
| def SubmitManyJobs(self, jobs): |
| """Submits jobs for processing. |
| |
| See L{JobQueue.SubmitManyJobs}. |
| |
| """ |
| # Locking is done in job queue |
| return self._queue.SubmitManyJobs(jobs) |
| |
| |
| def _EncodeOpError(err): |
| """Encodes an error which occurred while processing an opcode. |
| |
| """ |
| if isinstance(err, errors.GenericError): |
| to_encode = err |
| else: |
| to_encode = errors.OpExecError(str(err)) |
| |
| return errors.EncodeException(to_encode) |
| |
| |
| class _TimeoutStrategyWrapper: |
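|   """Lazily evaluates a timeout-providing function. |
| |
|   The wrapped function is only called when a timeout is actually needed; |
|   L{Peek} returns the next timeout without consuming it, while L{Next} |
|   consumes it and clears the cached value. |
| |
|   """ |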
| def __init__(self, fn): |
| """Initializes this class. |
| |
| """ |
| self._fn = fn |
| self._next = None |
| |
| def _Advance(self): |
| """Gets the next timeout if necessary. |
| |
| """ |
| if self._next is None: |
| self._next = self._fn() |
| |
| def Peek(self): |
| """Returns the next timeout. |
| |
| """ |
| self._Advance() |
| return self._next |
| |
| def Next(self): |
| """Returns the current timeout and advances the internal state. |
| |
| """ |
| self._Advance() |
| result = self._next |
| self._next = None |
| return result |
| |
| |
| class _OpExecContext: |
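|   """Execution context for a single opcode. |
| |
|   Groups the opcode with its index in the job, a log prefix, its summary, |
|   the not-yet-satisfied job dependencies and the lock timeout strategy |
|   used while acquiring its locks. |
| |
|   """ |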
| def __init__(self, op, index, log_prefix, timeout_strategy_factory): |
| """Initializes this class. |
| |
| """ |
| self.op = op |
| self.index = index |
| self.log_prefix = log_prefix |
| self.summary = op.input.Summary() |
| |
| # Create local copy to modify |
| if getattr(op.input, opcodes_base.DEPEND_ATTR, None): |
| self.jobdeps = op.input.depends[:] |
| else: |
| self.jobdeps = None |
| |
| self._timeout_strategy_factory = timeout_strategy_factory |
| self._ResetTimeoutStrategy() |
| |
| def _ResetTimeoutStrategy(self): |
| """Creates a new timeout strategy. |
| |
| """ |
| self._timeout_strategy = \ |
| _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) |
| |
| def CheckPriorityIncrease(self): |
| """Checks whether priority can and should be increased. |
| |
| Called when locks couldn't be acquired. |
| |
| """ |
| op = self.op |
| |
| # Exhausted all retries and next round should not use blocking acquire |
| # for locks? |
| if (self._timeout_strategy.Peek() is None and |
| op.priority > constants.OP_PRIO_HIGHEST): |
| logging.debug("Increasing priority") |
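|       # Priorities are inverted: numerically smaller values mean a higher |
|       # priority, so boosting the priority means decrementing the value |
|       # (bounded below by OP_PRIO_HIGHEST). |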
| op.priority -= 1 |
| self._ResetTimeoutStrategy() |
| return True |
| |
| return False |
| |
| def GetNextLockTimeout(self): |
| """Returns the next lock acquire timeout. |
| |
| """ |
| return self._timeout_strategy.Next() |
| |
| |
| class _JobProcessor(object): |
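|   """Processes the opcodes of one job, one opcode per invocation. |
| |
|   Calling an instance (see L{__call__}) executes at most one opcode of the |
|   job and reports whether the job is finished, should be deferred, or is |
|   waiting for a dependency. |
| |
|   """ |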
| (DEFER, |
| WAITDEP, |
| FINISHED) = range(1, 4) |
| |
| def __init__(self, queue, opexec_fn, job, |
| _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): |
| """Initializes this class. |
| |
| """ |
| self.queue = queue |
| self.opexec_fn = opexec_fn |
| self.job = job |
| self._timeout_strategy_factory = _timeout_strategy_factory |
| |
| @staticmethod |
| def _FindNextOpcode(job, timeout_strategy_factory): |
| """Locates the next opcode to run. |
| |
| @type job: L{_QueuedJob} |
| @param job: Job object |
| @param timeout_strategy_factory: Callable to create new timeout strategy |
| |
| """ |
| # Create some sort of a cache to speed up locating next opcode for future |
| # lookups |
| # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for |
| # pending and one for processed ops. |
| if job.ops_iter is None: |
| job.ops_iter = enumerate(job.ops) |
| |
| # Find next opcode to run |
| while True: |
| try: |
| (idx, op) = job.ops_iter.next() |
| except StopIteration: |
| raise errors.ProgrammerError("Called for a finished job") |
| |
| if op.status == constants.OP_STATUS_RUNNING: |
| # Found an opcode already marked as running |
| raise errors.ProgrammerError("Called for job marked as running") |
| |
| opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), |
| timeout_strategy_factory) |
| |
| if op.status not in constants.OPS_FINALIZED: |
| return opctx |
| |
| # This is a job that was partially completed before master daemon |
| # shutdown, so it can be expected that some opcodes are already |
| # completed successfully (if any did error out, then the whole job |
| # should have been aborted and not resubmitted for processing). |
| logging.info("%s: opcode %s already processed, skipping", |
| opctx.log_prefix, opctx.summary) |
| |
| @staticmethod |
| def _MarkWaitlock(job, op): |
| """Marks an opcode as waiting for locks. |
| |
| The job's start timestamp is also set if necessary. |
| |
| @type job: L{_QueuedJob} |
| @param job: Job object |
| @type op: L{_QueuedOpCode} |
| @param op: Opcode object |
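|     @rtype: bool |
|     @return: Whether anything was changed and the job should be written |
|         to disk |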
| |
| """ |
| assert op in job.ops |
| assert op.status in (constants.OP_STATUS_QUEUED, |
| constants.OP_STATUS_WAITING) |
| |
| update = False |
| |
| op.result = None |
| |
| if op.status == constants.OP_STATUS_QUEUED: |
| op.status = constants.OP_STATUS_WAITING |
| update = True |
| |
| if op.start_timestamp is None: |
| op.start_timestamp = TimeStampNow() |
| update = True |
| |
| if job.start_timestamp is None: |
| job.start_timestamp = op.start_timestamp |
| update = True |
| |
| assert op.status == constants.OP_STATUS_WAITING |
| |
| return update |
| |
| @staticmethod |
| def _CheckDependencies(queue, job, opctx): |
| """Checks if an opcode has dependencies and if so, processes them. |
| |
| @type queue: L{JobQueue} |
| @param queue: Queue object |
| @type job: L{_QueuedJob} |
| @param job: Job object |
| @type opctx: L{_OpExecContext} |
| @param opctx: Opcode execution context |
| @rtype: bool |
| @return: Whether opcode will be re-scheduled by dependency tracker |
| |
| """ |
| op = opctx.op |
| |
| result = False |
| |
| while opctx.jobdeps: |
| (dep_job_id, dep_status) = opctx.jobdeps[0] |
| |
| (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id, |
| dep_status) |
| assert ht.TNonEmptyString(depmsg), "No dependency message" |
| |
| logging.info("%s: %s", opctx.log_prefix, depmsg) |
| |
| if depresult == _JobDependencyManager.CONTINUE: |
| # Remove dependency and continue |
| opctx.jobdeps.pop(0) |
| |
| elif depresult == _JobDependencyManager.WAIT: |
| # Need to wait for notification, dependency tracker will re-add job |
| # to workerpool |
| result = True |
| break |
| |
| elif depresult == _JobDependencyManager.CANCEL: |
| # Job was cancelled, cancel this job as well |
| job.Cancel() |
| assert op.status == constants.OP_STATUS_CANCELING |
| break |
| |
| elif depresult in (_JobDependencyManager.WRONGSTATUS, |
| _JobDependencyManager.ERROR): |
| # Job failed or there was an error, this job must fail |
| op.status = constants.OP_STATUS_ERROR |
| op.result = _EncodeOpError(errors.OpExecError(depmsg)) |
| break |
| |
| else: |
| raise errors.ProgrammerError("Unknown dependency result '%s'" % |
| depresult) |
| |
| return result |
| |
| def _ExecOpCodeUnlocked(self, opctx): |
| """Processes one opcode and returns the result. |
| |
| """ |
| op = opctx.op |
| |
| assert op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING) |
| |
| # The very last check if the job was cancelled before trying to execute |
| if op.status == constants.OP_STATUS_CANCELING: |
| return (constants.OP_STATUS_CANCELING, None) |
| |
| timeout = opctx.GetNextLockTimeout() |
| |
| try: |
| # Make sure not to hold queue lock while calling ExecOpCode |
| result = self.opexec_fn(op.input, |
| _OpExecCallbacks(self.queue, self.job, op), |
| timeout=timeout) |
| except mcpu.LockAcquireTimeout: |
| assert timeout is not None, "Received timeout for blocking acquire" |
| logging.debug("Couldn't acquire locks in %0.6fs", timeout) |
| |
| assert op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING) |
| |
| # Was job cancelled while we were waiting for the lock? |
| if op.status == constants.OP_STATUS_CANCELING: |
| return (constants.OP_STATUS_CANCELING, None) |
| |
| # Stay in waitlock while trying to re-acquire lock |
| return (constants.OP_STATUS_WAITING, None) |
| except CancelJob: |
| logging.exception("%s: Canceling job", opctx.log_prefix) |
| assert op.status == constants.OP_STATUS_CANCELING |
| return (constants.OP_STATUS_CANCELING, None) |
| |
| except Exception, err: # pylint: disable=W0703 |
| logging.exception("%s: Caught exception in %s", |
| opctx.log_prefix, opctx.summary) |
| return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) |
| else: |
| logging.debug("%s: %s successful", |
| opctx.log_prefix, opctx.summary) |
| return (constants.OP_STATUS_SUCCESS, result) |
| |
| def __call__(self, _nextop_fn=None): |
| """Continues execution of a job. |
| |
| @param _nextop_fn: Callback function for tests |
| @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should |
| be deferred and C{WAITDEP} if the dependency manager |
| (L{_JobDependencyManager}) will re-schedule the job when appropriate |
| |
| """ |
| queue = self.queue |
| job = self.job |
| |
| logging.debug("Processing job %s", job.id) |
| |
| try: |
| opcount = len(job.ops) |
| |
| assert job.writable, "Expected writable job" |
| |
| # Don't do anything for finalized jobs |
| if job.CalcStatus() in constants.JOBS_FINALIZED: |
| return self.FINISHED |
| |
| # Is a previous opcode still pending? |
| if job.cur_opctx: |
| opctx = job.cur_opctx |
| job.cur_opctx = None |
| else: |
| if __debug__ and _nextop_fn: |
| _nextop_fn() |
| opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) |
| |
| op = opctx.op |
| |
| # Consistency check |
| assert compat.all(i.status in (constants.OP_STATUS_QUEUED, |
| constants.OP_STATUS_CANCELING) |
| for i in job.ops[opctx.index + 1:]) |
| |
| assert op.status in (constants.OP_STATUS_QUEUED, |
| constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING) |
| |
| assert (op.priority <= constants.OP_PRIO_LOWEST and |
| op.priority >= constants.OP_PRIO_HIGHEST) |
| |
| waitjob = None |
| |
| if op.status != constants.OP_STATUS_CANCELING: |
| assert op.status in (constants.OP_STATUS_QUEUED, |
| constants.OP_STATUS_WAITING) |
| |
| # Prepare to start opcode |
| if self._MarkWaitlock(job, op): |
| # Write to disk |
| queue.UpdateJobUnlocked(job) |
| |
| assert op.status == constants.OP_STATUS_WAITING |
| assert job.CalcStatus() == constants.JOB_STATUS_WAITING |
| assert job.start_timestamp and op.start_timestamp |
| assert waitjob is None |
| |
| # Check if waiting for a job is necessary |
| waitjob = self._CheckDependencies(queue, job, opctx) |
| |
| assert op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_CANCELING, |
| constants.OP_STATUS_ERROR) |
| |
| if not (waitjob or op.status in (constants.OP_STATUS_CANCELING, |
| constants.OP_STATUS_ERROR)): |
| logging.info("%s: opcode %s waiting for locks", |
| opctx.log_prefix, opctx.summary) |
| |
| assert not opctx.jobdeps, "Not all dependencies were removed" |
| |
| (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) |
| |
| op.status = op_status |
| op.result = op_result |
| |
| assert not waitjob |
| |
| if op.status in (constants.OP_STATUS_WAITING, |
| constants.OP_STATUS_QUEUED): |
| # waiting: Couldn't get locks in time |
| # queued: Queue is shutting down |
| assert not op.end_timestamp |
| else: |
| # Finalize opcode |
| op.end_timestamp = TimeStampNow() |
| |
| if op.status == constants.OP_STATUS_CANCELING: |
| assert not compat.any(i.status != constants.OP_STATUS_CANCELING |
| for i in job.ops[opctx.index:]) |
| else: |
| assert op.status in constants.OPS_FINALIZED |
| |
| if op.status == constants.OP_STATUS_QUEUED: |
| # Queue is shutting down |
| assert not waitjob |
| |
| finalize = False |
| |
| # Reset context |
| job.cur_opctx = None |
| |
| # In no case must the status be finalized here |
| assert job.CalcStatus() == constants.JOB_STATUS_QUEUED |
| |
| elif op.status == constants.OP_STATUS_WAITING or waitjob: |
| finalize = False |
| |
| if not waitjob and opctx.CheckPriorityIncrease(): |
| # Priority was changed, need to update on-disk file |
| queue.UpdateJobUnlocked(job) |
| |
| # Keep around for another round |
| job.cur_opctx = opctx |
| |
| assert (op.priority <= constants.OP_PRIO_LOWEST and |
| op.priority >= constants.OP_PRIO_HIGHEST) |
| |
| # In no case must the status be finalized here |
| assert job.CalcStatus() == constants.JOB_STATUS_WAITING |
| |
| else: |
| # Ensure all opcodes so far have been successful |
| assert (opctx.index == 0 or |
| compat.all(i.status == constants.OP_STATUS_SUCCESS |
| for i in job.ops[:opctx.index])) |
| |
| # Reset context |
| job.cur_opctx = None |
| |
| if op.status == constants.OP_STATUS_SUCCESS: |
| finalize = False |
| |
| elif op.status == constants.OP_STATUS_ERROR: |
| # If we get here, we cannot afford to check for any consistency |
| # any more, we just want to clean up. |
| # TODO: Actually, it wouldn't be a bad idea to start a timer |
| # here to kill the whole process. |
| to_encode = errors.OpExecError("Preceding opcode failed") |
| job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, |
| _EncodeOpError(to_encode)) |
| finalize = True |
| elif op.status == constants.OP_STATUS_CANCELING: |
| job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, |
| "Job canceled by request") |
| finalize = True |
| |
| else: |
| raise errors.ProgrammerError("Unknown status '%s'" % op.status) |
| |
| if opctx.index == (opcount - 1): |
| # Finalize on last opcode |
| finalize = True |
| |
| if finalize: |
| # All opcodes have been run, finalize job |
| job.Finalize() |
| |
| # Write to disk. If the job status is final, this is the final write |
| # allowed. Once the file has been written, it can be archived anytime. |
| queue.UpdateJobUnlocked(job) |
| |
| assert not waitjob |
| |
| if finalize: |
| logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) |
| return self.FINISHED |
| |
| assert not waitjob or queue.depmgr.JobWaiting(job) |
| |
| if waitjob: |
| return self.WAITDEP |
| else: |
| return self.DEFER |
| finally: |
| assert job.writable, "Job became read-only while being processed" |
| |
| |
| class _JobDependencyManager: |
| """Keeps track of job dependencies. |
| |
| """ |
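|   # Possible results of L{CheckAndRegister}, in declaration order: wait for |
|   # the dependency to finish, dependency lookup failed, dependency was |
|   # cancelled, dependency finished with a suitable status, or finished with |
|   # an unexpected status. |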
| (WAIT, |
| ERROR, |
| CANCEL, |
| CONTINUE, |
| WRONGSTATUS) = range(1, 6) |
| |
| def __init__(self, getstatus_fn): |
| """Initializes this class. |
| |
| """ |
| self._getstatus_fn = getstatus_fn |
| |
| self._waiters = {} |
| |
| def JobWaiting(self, job): |
| """Checks if a job is waiting. |
| |
| """ |
| return compat.any(job in jobs |
| for jobs in self._waiters.values()) |
| |
| def CheckAndRegister(self, job, dep_job_id, dep_status): |
| """Checks if a dependency job has the requested status. |
| |
| If the other job is not yet in a finalized status, the calling job will be |
| notified (re-added to the workerpool) at a later point. |
| |
| @type job: L{_QueuedJob} |
| @param job: Job object |
| @type dep_job_id: int |
| @param dep_job_id: ID of dependency job |
| @type dep_status: list |
| @param dep_status: Required status |
| |
| """ |
| assert ht.TJobId(job.id) |
| assert ht.TJobId(dep_job_id) |
| assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status) |
| |
| if job.id == dep_job_id: |
| return (self.ERROR, "Job can't depend on itself") |
| |
| # Get status of dependency job |
| try: |
| status = self._getstatus_fn(dep_job_id) |
| except errors.JobLost, err: |
| return (self.ERROR, "Dependency error: %s" % err) |
| |
| assert status in constants.JOB_STATUS_ALL |
| |
| job_id_waiters = self._waiters.setdefault(dep_job_id, set()) |
| |
| if status not in constants.JOBS_FINALIZED: |
| # Register for notification and wait for job to finish |
| job_id_waiters.add(job) |
| return (self.WAIT, |
| "Need to wait for job %s, wanted status '%s'" % |
| (dep_job_id, dep_status)) |
| |
| # Remove from waiters list |
| if job in job_id_waiters: |
| job_id_waiters.remove(job) |
| |
| if (status == constants.JOB_STATUS_CANCELED and |
| constants.JOB_STATUS_CANCELED not in dep_status): |
| return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id) |
| |
| elif not dep_status or status in dep_status: |
| return (self.CONTINUE, |
| "Dependency job %s finished with status '%s'" % |
| (dep_job_id, status)) |
| |
| else: |
| return (self.WRONGSTATUS, |
| "Dependency job %s finished with status '%s'," |
| " not one of '%s' as required" % |
| (dep_job_id, status, utils.CommaJoin(dep_status))) |
| |
| def _RemoveEmptyWaitersUnlocked(self): |
| """Remove all jobs without actual waiters. |
| |
| """ |
| for job_id in [job_id for (job_id, waiters) in self._waiters.items() |
| if not waiters]: |
| del self._waiters[job_id] |
| |
| |
| class JobQueue(object): |
| """Queue used to manage the jobs. |
| |
| """ |
| def __init__(self, context, cfg): |
| """Constructor for JobQueue. |
| |
| The constructor will initialize the job queue object and then |
| start loading the current jobs from disk, either for starting them |
|     (if they were queued) or for aborting them (if they were already |
| running). |
| |
| @type context: GanetiContext |
| @param context: the context object for access to the configuration |
| data and other ganeti objects |
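|     @param cfg: configuration object, used to get the initial list of |
|         master candidate nodes for job file replication |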
| |
| """ |
| self.context = context |
| self._memcache = weakref.WeakValueDictionary() |
| self._my_hostname = netutils.Hostname.GetSysName() |
| |
| # Get initial list of nodes |
| self._nodes = dict((n.name, n.primary_ip) |
| for n in cfg.GetAllNodesInfo().values() |
| if n.master_candidate) |
| |
| # Remove master node |
| self._nodes.pop(self._my_hostname, None) |
| |
| # Job dependencies |
| self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies) |
| |
| def _GetRpc(self, address_list): |
| """Gets RPC runner with context. |
| |
| """ |
| return rpc.JobQueueRunner(self.context, address_list) |
| |
| @staticmethod |
| def _CheckRpcResult(result, nodes, failmsg): |
| """Verifies the status of an RPC call. |
| |
|     Since we aim to keep consistency should this node (the current |
|     master) fail, we will log errors if our rpc calls fail, and |
|     especially log the case when more than half of the nodes fail. |
| |
| @param result: the data as returned from the rpc call |
| @type nodes: list |
| @param nodes: the list of nodes we made the call to |
| @type failmsg: str |
| @param failmsg: the identifier to be used for logging |
| |
| """ |
| failed = [] |
| success = [] |
| |
| for node in nodes: |
| msg = result[node].fail_msg |
| if msg: |
| failed.append(node) |
| logging.error("RPC call %s (%s) failed on node %s: %s", |
| result[node].call, failmsg, node, msg) |
| else: |
| success.append(node) |
| |
| # +1 for the master node |
| if (len(success) + 1) < len(failed): |
| # TODO: Handle failing nodes |
| logging.error("More than half of the nodes failed") |
| |
| def _GetNodeIp(self): |
| """Helper for returning the node name/ip list. |
| |
| @rtype: (list, list) |
| @return: a tuple of two lists, the first one with the node |
| names and the second one with the node addresses |
| |
| """ |
| # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"? |
| name_list = self._nodes.keys() |
| addr_list = [self._nodes[name] for name in name_list] |
| return name_list, addr_list |
| |
| def _UpdateJobQueueFile(self, file_name, data, replicate): |
| """Writes a file locally and then replicates it to all nodes. |
| |
| This function will replace the contents of a file on the local |
| node and then replicate it to all the other nodes we have. |
| |
| @type file_name: str |
| @param file_name: the path of the file to be replicated |
| @type data: str |
| @param data: the new contents of the file |
| @type replicate: boolean |
| @param replicate: whether to spread the changes to the remote nodes |
| |
| """ |
| getents = runtime.GetEnts() |
| utils.WriteFile(file_name, data=data, uid=getents.masterd_uid, |
| gid=getents.daemons_gid, |
| mode=constants.JOB_QUEUE_FILES_PERMS) |
| |
| if replicate: |
| names, addrs = self._GetNodeIp() |
| result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data) |
| self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) |
| |
| def _RenameFilesUnlocked(self, rename): |
|     """Renames files locally and then replicates the changes. |
| |
|     This function will rename files in the local queue directory |
|     and then replicate these renames to all the other nodes we have. |
| |
| @type rename: list of (old, new) |
| @param rename: List containing tuples mapping old to new names |
| |
| """ |
| # Rename them locally |
| for old, new in rename: |
| utils.RenameFile(old, new, mkdir=True) |
| |
| # ... and on all nodes |
| names, addrs = self._GetNodeIp() |
| result = self._GetRpc(addrs).call_jobqueue_rename(names, rename) |
| self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) |
| |
| @staticmethod |
| def _GetJobPath(job_id): |
| """Returns the job file for a given job id. |
| |
| @type job_id: str |
| @param job_id: the job identifier |
| @rtype: str |
| @return: the path to the job file |
| |
| """ |
| return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id) |
| |
| @staticmethod |
| def _GetArchivedJobPath(job_id): |
|     """Returns the archived job file for a given job id. |
| |
| @type job_id: str |
| @param job_id: the job identifier |
| @rtype: str |
| @return: the path to the archived job file |
| |
| """ |
| return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR, |
| jstore.GetArchiveDirectory(job_id), |
| "job-%s" % job_id) |
| |
| @staticmethod |
| def _DetermineJobDirectories(archived): |
| """Build list of directories containing job files. |
| |
| @type archived: bool |
| @param archived: Whether to include directories for archived jobs |
| @rtype: list |
| |
| """ |
| result = [pathutils.QUEUE_DIR] |
| |
| if archived: |
| archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR |
| result.extend(utils.PathJoin(archive_path, job_file) for job_file in |
| utils.ListVisibleFiles(archive_path)) |
| |
| return result |
| |
| @classmethod |
| def _GetJobIDsUnlocked(cls, sort=True, archived=False): |
| """Return all known job IDs. |
| |
| The method only looks at disk because it's a requirement that all |
| jobs are present on disk (so in the _memcache we don't have any |
| extra IDs). |
| |
| @type sort: boolean |
| @param sort: perform sorting on the returned job ids |
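|     @type archived: boolean |
|     @param archived: whether to also include archived job IDs |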
| @rtype: list |
| @return: the list of job IDs |
| |
| """ |
| jlist = [] |
| |
| for path in cls._DetermineJobDirectories(archived): |
| for filename in utils.ListVisibleFiles(path): |
| m = constants.JOB_FILE_RE.match(filename) |
| if m: |
| jlist.append(int(m.group(1))) |
| |
| if sort: |
| jlist.sort() |
| return jlist |
| |
| def _LoadJobUnlocked(self, job_id): |
| """Loads a job from the disk or memory. |
| |
| Given a job id, this will return the cached job object if |
| existing, or try to load the job from the disk. If loading from |
| disk, it will also add the job to the cache. |
| |
| @type job_id: int |
| @param job_id: the job id |
| @rtype: L{_QueuedJob} or None |
| @return: either None or the job object |
| |
| """ |
| assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!" |
| |
| job = self._memcache.get(job_id, None) |
| if job: |
| logging.debug("Found job %s in memcache", job_id) |
| assert job.writable, "Found read-only job in memcache" |
| return job |
| |
| try: |
| job = self._LoadJobFromDisk(job_id, False) |
| if job is None: |
| return job |
| except errors.JobFileCorrupted: |
| old_path = self._GetJobPath(job_id) |
| new_path = self._GetArchivedJobPath(job_id) |
| if old_path == new_path: |
| # job already archived (future case) |
| logging.exception("Can't parse job %s", job_id) |
| else: |
| # non-archived case |
| logging.exception("Can't parse job %s, will archive.", job_id) |
| self._RenameFilesUnlocked([(old_path, new_path)]) |
| return None |
| |
| assert job.writable, "Job just loaded is not writable" |
| |
| self._memcache[job_id] = job |
| logging.debug("Added job %s to the cache", job_id) |
| return job |
| |
| def _LoadJobFromDisk(self, job_id, try_archived, writable=None): |
| """Load the given job file from disk. |
| |
| Given a job file, read, load and restore it in a _QueuedJob format. |
| |
| @type job_id: int |
| @param job_id: job identifier |
| @type try_archived: bool |
| @param try_archived: Whether to try loading an archived job |
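|     @type writable: bool or None |
|     @param writable: Whether the returned job should be writable; if None, |
|         the job is writable unless it was loaded from the archive |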
| @rtype: L{_QueuedJob} or None |
| @return: either None or the job object |
| |
| """ |
| path_functions = [(self._GetJobPath, False)] |
| |
| if try_archived: |
| path_functions.append((self._GetArchivedJobPath, True)) |
| |
| raw_data = None |
| archived = None |
| |
| for (fn, archived) in path_functions: |
| filepath = fn(job_id) |
| logging.debug("Loading job from %s", filepath) |
| try: |
| raw_data = utils.ReadFile(filepath) |
| except EnvironmentError, err: |
| if err.errno != errno.ENOENT: |
| raise |
| else: |
| break |
| |
| if not raw_data: |
| logging.debug("No data available for job %s", job_id) |
| return None |
| |
| if writable is None: |
| writable = not archived |
| |
| try: |
| data = serializer.LoadJson(raw_data) |
| job = _QueuedJob.Restore(self, data, writable, archived) |
| except Exception, err: # pylint: disable=W0703 |
| raise errors.JobFileCorrupted(err) |
| |
| return job |
| |
| def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None): |
| """Load the given job file from disk. |
| |
| Given a job file, read, load and restore it in a _QueuedJob format. |
| In case of error reading the job, it gets returned as None, and the |
| exception is logged. |
| |
| @type job_id: int |
| @param job_id: job identifier |
| @type try_archived: bool |
| @param try_archived: Whether to try loading an archived job |
| @rtype: L{_QueuedJob} or None |
| @return: either None or the job object |
| |
| """ |
| try: |
| return self._LoadJobFromDisk(job_id, try_archived, writable=writable) |
| except (errors.JobFileCorrupted, EnvironmentError): |
| logging.exception("Can't load/parse job %s", job_id) |
| return None |
| |
| @classmethod |
| def SubmitManyJobs(cls, jobs): |
| """Create and store multiple jobs. |
| |
| """ |
| return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs) |
| |
| @staticmethod |
| def _ResolveJobDependencies(resolve_fn, deps): |
| """Resolves relative job IDs in dependencies. |
| |
| @type resolve_fn: callable |
| @param resolve_fn: Function to resolve a relative job ID |
| @type deps: list |
| @param deps: Dependencies |
| @rtype: tuple; (boolean, string or list) |
| @return: If successful (first tuple item), the returned list contains |
| resolved job IDs along with the requested status; if not successful, |
| the second element is an error message |
| |
| """ |
| result = [] |
| |
| for (dep_job_id, dep_status) in deps: |
| if ht.TRelativeJobId(dep_job_id): |
| assert ht.TInt(dep_job_id) and dep_job_id < 0 |
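|         # Relative job IDs are negative and refer to jobs submitted earlier |
|         # in the same request (e.g. -1 is the job just before this one); |
|         # resolve_fn translates them into absolute job IDs. |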
| try: |
| job_id = resolve_fn(dep_job_id) |
| except IndexError: |
| # Abort |
| return (False, "Unable to resolve relative job ID %s" % dep_job_id) |
| else: |
| job_id = dep_job_id |
| |
| result.append((job_id, dep_status)) |
| |
| return (True, result) |
| |
| def _GetJobStatusForDependencies(self, job_id): |
| """Gets the status of a job for dependencies. |
| |
| @type job_id: int |
| @param job_id: Job ID |
| @raise errors.JobLost: If job can't be found |
| |
| """ |
| # Not using in-memory cache as doing so would require an exclusive lock |
| |
| # Try to load from disk |
| job = self.SafeLoadJobFromDisk(job_id, True, writable=False) |
| |
| if job: |
| assert not job.writable, "Got writable job" # pylint: disable=E1101 |
| |
| if job: |
| return job.CalcStatus() |
| |
| raise errors.JobLost("Job %s not found" % job_id) |
| |
| def UpdateJobUnlocked(self, job, replicate=True): |
|     """Update a job's on-disk storage. |
| |
| After a job has been modified, this function needs to be called in |
| order to write the changes to disk and replicate them to the other |
| nodes. |
| |
| @type job: L{_QueuedJob} |
| @param job: the changed job |
| @type replicate: boolean |
| @param replicate: whether to replicate the change to remote nodes |
| |
| """ |
| if __debug__: |
| finalized = job.CalcStatus() in constants.JOBS_FINALIZED |
| assert (finalized ^ (job.end_timestamp is None)) |
| assert job.writable, "Can't update read-only job" |
| assert not job.archived, "Can't update archived job" |
| |
| filename = self._GetJobPath(job.id) |
| data = serializer.DumpJson(job.Serialize()) |
| logging.debug("Writing job %s to %s", job.id, filename) |
| self._UpdateJobQueueFile(filename, data, replicate) |
| |
| def HasJobBeenFinalized(self, job_id): |
| """Checks if a job has been finalized. |
| |
| @type job_id: int |
| @param job_id: Job identifier |
| @rtype: boolean |
| @return: True if the job has been finalized, |
| False if the timeout has been reached, |
| None if the job doesn't exist |
| |
| """ |
| job = self.SafeLoadJobFromDisk(job_id, True, writable=False) |
| if job is not None: |
| return job.CalcStatus() in constants.JOBS_FINALIZED |
| elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: |
| # FIXME: The above variable is a temporary workaround until the Python job |
| # queue is completely removed. When removing the job queue, also remove |
| # the variable from LUClusterDestroy. |
| return True |
| else: |
| return None |
| |
| def CancelJob(self, job_id): |
| """Cancels a job. |
| |
| This will only succeed if the job has not started yet. |
| |
| @type job_id: int |
| @param job_id: job ID of job to be cancelled. |
| |
| """ |
| logging.info("Cancelling job %s", job_id) |
| |
| return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel()) |
| |
| def ChangeJobPriority(self, job_id, priority): |
| """Changes a job's priority. |
| |
| @type job_id: int |
| @param job_id: ID of the job whose priority should be changed |
| @type priority: int |
| @param priority: New priority |
| |
| """ |
| logging.info("Changing priority of job %s to %s", job_id, priority) |
| |
| if priority not in constants.OP_PRIO_SUBMIT_VALID: |
| allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID) |
| raise errors.GenericError("Invalid priority %s, allowed are %s" % |
| (priority, allowed)) |
| |
| def fn(job): |
| (success, msg) = job.ChangePriority(priority) |
| return (success, msg) |
| |
| return self._ModifyJobUnlocked(job_id, fn) |
| |
| def _ModifyJobUnlocked(self, job_id, mod_fn): |
| """Modifies a job. |
| |
| @type job_id: int |
| @param job_id: Job ID |
| @type mod_fn: callable |
| @param mod_fn: Modifying function, receiving job object as parameter, |
| returning tuple of (status boolean, message string) |
| |
| """ |
| job = self._LoadJobUnlocked(job_id) |
| if not job: |
| logging.debug("Job %s not found", job_id) |
| return (False, "Job %s not found" % job_id) |
| |
| assert job.writable, "Can't modify read-only job" |
| assert not job.archived, "Can't modify archived job" |
| |
| (success, msg) = mod_fn(job) |
| |
| if success: |
| # If the job was finalized (e.g. cancelled), this is the final write |
| # allowed. The job can be archived anytime. |
| self.UpdateJobUnlocked(job) |
| |
| return (success, msg) |