#!/usr/bin/python
#
# Copyright (C) 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.jqueue"""
import os
import sys
import unittest
import tempfile
import shutil
import errno
import itertools
import random
import operator
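# Depending on the installed version, pyinotify is either a package exposing
# a pyinotify submodule or a flat module; try the packaged layout first.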
try:
# pylint: disable=E0611
from pyinotify import pyinotify
except ImportError:
import pyinotify
from ganeti import constants
from ganeti import utils
from ganeti import errors
from ganeti import jqueue
from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
from ganeti import query
from ganeti import workerpool
import testutils
class _FakeJob:
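  """Minimal fake job used by the job change checker tests."""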
def __init__(self, job_id, status):
self.id = job_id
self.writable = False
self._status = status
self._log = []
def SetStatus(self, status):
self._status = status
def AddLogEntry(self, msg):
self._log.append((len(self._log), msg))
def CalcStatus(self):
return self._status
def GetInfo(self, fields):
result = []
for name in fields:
if name == "status":
result.append(self._status)
else:
raise Exception("Unknown field")
return result
def GetLogEntries(self, newer_than):
assert newer_than is None or newer_than >= 0
if newer_than is None:
return self._log
return self._log[newer_than:]
class TestJobChangesChecker(unittest.TestCase):
def testStatus(self):
job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
checker = jqueue._JobChangesChecker(["status"], None, None)
self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
job.SetStatus(constants.JOB_STATUS_RUNNING)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
job.SetStatus(constants.JOB_STATUS_SUCCESS)
self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
# job.id is used by checker
self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
checker = jqueue._JobChangesChecker(["status"],
[constants.JOB_STATUS_QUEUED], None)
self.assert_(checker(job) is None)
job.SetStatus(constants.JOB_STATUS_RUNNING)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
for status in constants.JOBS_FINALIZED:
job = _FakeJob(2178711, status)
checker = jqueue._JobChangesChecker(["status"], [status], None)
      # The job is already in a final status, so no further changes can occur;
      # the checker should report the current state immediately
self.assertEqual(checker(job), ([status], []))
def testLog(self):
job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
checker = jqueue._JobChangesChecker(["status"], None, None)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
job.AddLogEntry("Hello World")
(job_info, log_entries) = checker(job)
self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
self.assertEqual(log_entries, [[0, "Hello World"]])
checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
self.assert_(checker2(job) is None)
job.AddLogEntry("Foo Bar")
job.SetStatus(constants.JOB_STATUS_ERROR)
(job_info, log_entries) = checker2(job)
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
self.assertEqual(log_entries, [[1, "Foo Bar"]])
checker3 = jqueue._JobChangesChecker(["status"], None, None)
(job_info, log_entries) = checker3(job)
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
class TestJobChangesWaiter(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.filename = utils.PathJoin(self.tmpdir, "job-1")
utils.WriteFile(self.filename, data="")
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _EnsureNotifierClosed(self, notifier):
try:
os.fstat(notifier._fd)
except EnvironmentError, err:
self.assertEqual(err.errno, errno.EBADF)
else:
self.fail("File descriptor wasn't closed")
def testClose(self):
for wait in [False, True]:
waiter = jqueue._JobFileChangesWaiter(self.filename)
try:
if wait:
waiter.Wait(0.001)
finally:
waiter.Close()
# Ensure file descriptor was closed
self._EnsureNotifierClosed(waiter._notifier)
def testChangingFile(self):
waiter = jqueue._JobFileChangesWaiter(self.filename)
try:
self.assertFalse(waiter.Wait(0.1))
utils.WriteFile(self.filename, data="changed")
self.assert_(waiter.Wait(60))
finally:
waiter.Close()
self._EnsureNotifierClosed(waiter._notifier)
def testChangingFile2(self):
waiter = jqueue._JobChangesWaiter(self.filename)
try:
self.assertFalse(waiter._filewaiter)
self.assert_(waiter.Wait(0.1))
self.assert_(waiter._filewaiter)
# File waiter is now used, but there have been no changes
self.assertFalse(waiter.Wait(0.1))
utils.WriteFile(self.filename, data="changed")
self.assert_(waiter.Wait(60))
finally:
waiter.Close()
self._EnsureNotifierClosed(waiter._filewaiter._notifier)
class _FailingWatchManager(pyinotify.WatchManager):
"""Subclass of L{pyinotify.WatchManager} which always fails to register.
"""
def add_watch(self, filename, mask):
assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
return {
filename: -1,
}
class TestWaitForJobChangesHelper(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
utils.WriteFile(self.filename, data="")
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
return _FakeJob(2614226563, constants.JOB_STATUS_WAITING)
def _LoadLostJob(self):
return None
def testNoChanges(self):
wfjc = jqueue._WaitForJobChangesHelper()
# No change
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
[constants.JOB_STATUS_WAITING], None, 0.1),
constants.JOB_NOTCHANGED)
# No previous information
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
["status"], None, None, 1.0),
([constants.JOB_STATUS_WAITING], []))
def testLostJob(self):
wfjc = jqueue._WaitForJobChangesHelper()
self.assert_(wfjc(self.filename, self._LoadLostJob,
["status"], None, None, 1.0) is None)
def testNonExistentFile(self):
wfjc = jqueue._WaitForJobChangesHelper()
filename = utils.PathJoin(self.tmpdir, "does-not-exist")
self.assertFalse(os.path.exists(filename))
result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
_waiter_cls=compat.partial(jqueue._JobChangesWaiter,
_waiter_cls=NotImplemented))
self.assertTrue(result is None)
def testInotifyError(self):
jobfile_waiter_cls = \
compat.partial(jqueue._JobFileChangesWaiter,
_inotify_wm_cls=_FailingWatchManager)
jobchange_waiter_cls = \
compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
wfjc = jqueue._WaitForJobChangesHelper()
# Test if failing to watch a job file (e.g. due to
# fs.inotify.max_user_watches being too low) raises errors.InotifyError
self.assertRaises(errors.InotifyError, wfjc,
self.filename, self._LoadWaitingJob,
["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
_waiter_cls=jobchange_waiter_cls)
class TestEncodeOpError(unittest.TestCase):
def test(self):
encerr = jqueue._EncodeOpError(errors.LockError("Test 1"))
self.assert_(isinstance(encerr, tuple))
self.assertRaises(errors.LockError, errors.MaybeRaise, encerr)
encerr = jqueue._EncodeOpError(errors.GenericError("Test 2"))
self.assert_(isinstance(encerr, tuple))
self.assertRaises(errors.GenericError, errors.MaybeRaise, encerr)
encerr = jqueue._EncodeOpError(NotImplementedError("Foo"))
self.assert_(isinstance(encerr, tuple))
self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
encerr = jqueue._EncodeOpError("Hello World")
self.assert_(isinstance(encerr, tuple))
self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
class TestQueuedOpCode(unittest.TestCase):
def testDefaults(self):
def _Check(op):
self.assertFalse(hasattr(op.input, "dry_run"))
self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
self.assertFalse(op.log)
self.assert_(op.start_timestamp is None)
self.assert_(op.exec_timestamp is None)
self.assert_(op.end_timestamp is None)
self.assert_(op.result is None)
self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
_Check(op1)
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
_Check(op2)
self.assertEqual(op1.Serialize(), op2.Serialize())
def testPriority(self):
def _Check(op):
assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
"Default priority equals high priority; test can't work"
self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
inpop = opcodes.OpTagsGet(priority=constants.OP_PRIO_HIGH)
op1 = jqueue._QueuedOpCode(inpop)
_Check(op1)
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
_Check(op2)
self.assertEqual(op1.Serialize(), op2.Serialize())
class TestQueuedJob(unittest.TestCase):
def testNoOpCodes(self):
self.assertRaises(errors.GenericError, jqueue._QueuedJob,
None, 1, [], False)
def testDefaults(self):
job_id = 4260
ops = [
opcodes.OpTagsGet(),
opcodes.OpTestDelay(),
]
def _Check(job):
self.assertTrue(job.writable)
self.assertEqual(job.id, job_id)
self.assertEqual(job.log_serial, 0)
self.assert_(job.received_timestamp)
self.assert_(job.start_timestamp is None)
self.assert_(job.end_timestamp is None)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assert_(repr(job).startswith("<"))
self.assertEqual(len(job.ops), len(ops))
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
for (inp, op) in zip(ops, job.ops)))
self.assertRaises(errors.OpPrereqError, job.GetInfo,
["unknown-field"])
self.assertEqual(job.GetInfo(["summary"]),
[[op.input.Summary() for op in job.ops]])
self.assertFalse(job.archived)
job1 = jqueue._QueuedJob(None, job_id, ops, True)
_Check(job1)
job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False)
_Check(job2)
self.assertEqual(job1.Serialize(), job2.Serialize())
def testWritable(self):
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
self.assertFalse(job.writable)
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
self.assertTrue(job.writable)
def testArchived(self):
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
self.assertFalse(job.archived)
newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True)
self.assertTrue(newjob.archived)
newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False)
self.assertFalse(newjob2.archived)
def testPriority(self):
job_id = 4283
ops = [
opcodes.OpTagsGet(priority=constants.OP_PRIO_DEFAULT),
opcodes.OpTestDelay(),
]
def _Check(job):
self.assertEqual(job.id, job_id)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(repr(job).startswith("<"))
job = jqueue._QueuedJob(None, job_id, ops, True)
_Check(job)
self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Increase first
job.ops[0].priority -= 1
_Check(job)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
# Mark opcode as finished
job.ops[0].status = constants.OP_STATUS_SUCCESS
_Check(job)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
# Increase second
job.ops[1].priority -= 10
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
# Test increasing first
job.ops[0].status = constants.OP_STATUS_RUNNING
job.ops[0].priority -= 19
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
def _JobForPriority(self, job_id):
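    """Returns a four-opcode job at default priority for ChangePriority tests."""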
ops = [
opcodes.OpTagsGet(),
opcodes.OpTestDelay(),
opcodes.OpTagsGet(),
opcodes.OpTestDelay(),
]
job = jqueue._QueuedJob(None, job_id, ops, True)
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
return job
def testChangePriorityAllQueued(self):
job = self._JobForPriority(24984)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in job.ops))
result = job.ChangePriority(-10)
self.assertEqual(job.CalcPriority(), -10)
self.assertTrue(compat.all(op.priority == -10 for op in job.ops))
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(result,
(True, ("Priorities of pending opcodes for job 24984 have"
" been changed to -10")))
def testChangePriorityAllFinished(self):
job = self._JobForPriority(16405)
for (idx, op) in enumerate(job.ops):
if idx > 2:
op.status = constants.OP_STATUS_ERROR
else:
op.status = constants.OP_STATUS_SUCCESS
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
result = job.ChangePriority(-10)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_ERROR,
])
self.assertEqual(result, (False, "Job 16405 is finished"))
def testChangePriorityCancelling(self):
job = self._JobForPriority(31572)
for (idx, op) in enumerate(job.ops):
if idx > 1:
op.status = constants.OP_STATUS_CANCELING
else:
op.status = constants.OP_STATUS_SUCCESS
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELING)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
result = job.ChangePriority(5)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_CANCELING,
constants.OP_STATUS_CANCELING,
])
self.assertEqual(result, (False, "Job 31572 is cancelling"))
def testChangePriorityFirstRunning(self):
job = self._JobForPriority(1716215889)
for (idx, op) in enumerate(job.ops):
if idx == 0:
op.status = constants.OP_STATUS_RUNNING
else:
op.status = constants.OP_STATUS_QUEUED
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
result = job.ChangePriority(7)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
[constants.OP_PRIO_DEFAULT, 7, 7, 7])
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
constants.OP_STATUS_RUNNING,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED,
])
self.assertEqual(result,
(True, ("Priorities of pending opcodes for job"
" 1716215889 have been changed to 7")))
def testChangePriorityLastRunning(self):
job = self._JobForPriority(1308)
for (idx, op) in enumerate(job.ops):
if idx == (len(job.ops) - 1):
op.status = constants.OP_STATUS_RUNNING
else:
op.status = constants.OP_STATUS_SUCCESS
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
result = job.ChangePriority(-3)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertTrue(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_RUNNING,
])
self.assertEqual(result, (False, "Job 1308 had no pending opcodes"))
def testChangePrioritySecondOpcodeRunning(self):
job = self._JobForPriority(27701)
self.assertEqual(len(job.ops), 4)
job.ops[0].status = constants.OP_STATUS_SUCCESS
job.ops[1].status = constants.OP_STATUS_RUNNING
job.ops[2].status = constants.OP_STATUS_QUEUED
job.ops[3].status = constants.OP_STATUS_QUEUED
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
result = job.ChangePriority(-19)
self.assertEqual(job.CalcPriority(), -19)
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
[constants.OP_PRIO_DEFAULT, constants.OP_PRIO_DEFAULT,
-19, -19])
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
self.assertEqual(map(operator.attrgetter("status"), job.ops), [
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_RUNNING,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED,
])
self.assertEqual(result,
(True, ("Priorities of pending opcodes for job"
" 27701 have been changed to -19")))
def testChangePriorityWithInconsistentJob(self):
job = self._JobForPriority(30097)
self.assertEqual(len(job.ops), 4)
    # This job is invalid (it has two opcodes marked as running) and makes the
    # call fail because an unprocessed opcode precedes a running one (which
    # should never happen in reality)
job.ops[0].status = constants.OP_STATUS_SUCCESS
job.ops[1].status = constants.OP_STATUS_RUNNING
job.ops[2].status = constants.OP_STATUS_QUEUED
job.ops[3].status = constants.OP_STATUS_RUNNING
self.assertRaises(AssertionError, job.ChangePriority, 19)
def testCalcStatus(self):
def _Queued(ops):
# The default status is "queued"
self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in ops))
def _Waitlock1(ops):
ops[0].status = constants.OP_STATUS_WAITING
def _Waitlock2(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
ops[1].status = constants.OP_STATUS_SUCCESS
ops[2].status = constants.OP_STATUS_WAITING
def _Running(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
ops[1].status = constants.OP_STATUS_RUNNING
for op in ops[2:]:
op.status = constants.OP_STATUS_QUEUED
def _Canceling1(ops):
ops[0].status = constants.OP_STATUS_SUCCESS
ops[1].status = constants.OP_STATUS_SUCCESS
for op in ops[2:]:
op.status = constants.OP_STATUS_CANCELING
def _Canceling2(ops):
for op in ops:
op.status = constants.OP_STATUS_CANCELING
def _Canceled(ops):
for op in ops:
op.status = constants.OP_STATUS_CANCELED
def _Error1(ops):
for idx, op in enumerate(ops):
if idx > 3:
op.status = constants.OP_STATUS_ERROR
else:
op.status = constants.OP_STATUS_SUCCESS
def _Error2(ops):
for op in ops:
op.status = constants.OP_STATUS_ERROR
def _Success(ops):
for op in ops:
op.status = constants.OP_STATUS_SUCCESS
tests = {
constants.JOB_STATUS_QUEUED: [_Queued],
constants.JOB_STATUS_WAITING: [_Waitlock1, _Waitlock2],
constants.JOB_STATUS_RUNNING: [_Running],
constants.JOB_STATUS_CANCELING: [_Canceling1, _Canceling2],
constants.JOB_STATUS_CANCELED: [_Canceled],
constants.JOB_STATUS_ERROR: [_Error1, _Error2],
constants.JOB_STATUS_SUCCESS: [_Success],
}
def _NewJob():
job = jqueue._QueuedJob(None, 1,
[opcodes.OpTestDelay() for _ in range(10)],
True)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in job.ops))
return job
for status in constants.JOB_STATUS_ALL:
sttests = tests[status]
assert sttests
for fn in sttests:
job = _NewJob()
fn(job.ops)
self.assertEqual(job.CalcStatus(), status)
class _FakeDependencyManager:
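  """Fake dependency manager; expected results are queued via AddCheckResult."""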
def __init__(self):
self._checks = []
self._notifications = []
self._waiting = set()
def AddCheckResult(self, job, dep_job_id, dep_status, result):
self._checks.append((job, dep_job_id, dep_status, result))
def CountPendingResults(self):
return len(self._checks)
def CountWaitingJobs(self):
return len(self._waiting)
def GetNextNotification(self):
return self._notifications.pop(0)
def JobWaiting(self, job):
return job in self._waiting
def CheckAndRegister(self, job, dep_job_id, dep_status):
(exp_job, exp_dep_job_id, exp_dep_status, result) = self._checks.pop(0)
assert exp_job == job
assert exp_dep_job_id == dep_job_id
assert exp_dep_status == dep_status
(result_status, _) = result
if result_status == jqueue._JobDependencyManager.WAIT:
self._waiting.add(job)
elif result_status == jqueue._JobDependencyManager.CONTINUE:
self._waiting.remove(job)
return result
def NotifyWaiters(self, job_id):
self._notifications.append(job_id)
class _DisabledFakeDependencyManager:
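  """Dependency manager stand-in for tests which must not check dependencies."""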
def JobWaiting(self, _):
return False
def CheckAndRegister(self, *args):
assert False, "Should not be called"
def NotifyWaiters(self, _):
pass
class _FakeQueueForProc:
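  """Fake job queue for the job processor tests, recording updates and submitted jobs."""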
def __init__(self, depmgr=None):
self._acquired = False
self._updates = []
self._submitted = []
self._accepting_jobs = True
self._submit_count = itertools.count(1000)
if depmgr:
self.depmgr = depmgr
else:
self.depmgr = _DisabledFakeDependencyManager()
def IsAcquired(self):
return self._acquired
def GetNextUpdate(self):
return self._updates.pop(0)
def GetNextSubmittedJob(self):
return self._submitted.pop(0)
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
def release(self):
assert self._acquired
self._acquired = False
def UpdateJobUnlocked(self, job, replicate=True):
assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
def SubmitManyJobs(self, jobs):
assert not self._acquired, "Lock acquired while submitting jobs"
job_ids = [self._submit_count.next() for _ in jobs]
self._submitted.extend(zip(job_ids, jobs))
return job_ids
def StopAcceptingJobs(self):
self._accepting_jobs = False
def AcceptingJobsUnlocked(self):
return self._accepting_jobs
class _FakeExecOpCodeForProc:
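  """Fake opcode executor calling the optional before/after hooks around NotifyStart."""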
def __init__(self, queue, before_start, after_start):
self._queue = queue
self._before_start = before_start
self._after_start = after_start
def __call__(self, op, cbs, timeout=None):
assert isinstance(op, opcodes.OpTestDummy)
assert not self._queue.IsAcquired(), \
"Queue lock not released when executing opcode"
if self._before_start:
self._before_start(timeout, cbs.CurrentPriority())
cbs.NotifyStart()
if self._after_start:
self._after_start(op, cbs)
# Check again after the callbacks
assert not self._queue.IsAcquired()
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
return cbs.SubmitManyJobs(op.submit_jobs)
return op.result
class _JobProcessorTestUtils:
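  """Helpers shared by the job processor test cases."""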
def _CreateJob(self, queue, job_id, ops):
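    """Creates a queued job and verifies its initial state."""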
job = jqueue._QueuedJob(queue, job_id, ops, True)
self.assertFalse(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(len(ops), len(job.ops))
self.assert_(compat.all(op.input == inp
for (op, inp) in zip(job.ops, ops)))
self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
return job
class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _GenericCheckJob(self, job):
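    """Checks the timestamp-related fields of a processed job."""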
assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
for op in job.ops)
self.assertEqual(job.GetInfo(["opstart", "opexec", "opend"]),
[[op.start_timestamp for op in job.ops],
[op.exec_timestamp for op in job.ops],
[op.end_timestamp for op in job.ops]])
self.assertEqual(job.GetInfo(["received_ts", "start_ts", "end_ts"]),
[job.received_timestamp,
job.start_timestamp,
job.end_timestamp])
self.assert_(job.start_timestamp)
self.assert_(job.end_timestamp)
self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
def testSuccess(self):
queue = _FakeQueueForProc()
for (job_id, opcount) in [(25351, 1), (6637, 3),
(24644, 10), (32207, 100)]:
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(opcount)]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
self.assertEqual(job.GetInfo(["opstatus"]),
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
self.assert_(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testOpcodeError(self):
queue = _FakeQueueForProc()
testdata = [
(17077, 1, 0, 0),
(1782, 5, 2, 2),
(18179, 10, 9, 9),
(4744, 10, 3, 8),
(23816, 100, 39, 45),
]
for (job_id, opcount, failfrom, failto) in testdata:
# Prepare opcodes
ops = [opcodes.OpTestDummy(result="Res%s" % i,
fail=(failfrom <= i and
i <= failto))
for i in range(opcount)]
# Create job
job = self._CreateJob(queue, str(job_id), ops)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
# queued to waitlock
self.assertEqual(queue.GetNextUpdate(), (job, True))
# waitlock to running
self.assertEqual(queue.GetNextUpdate(), (job, True))
# Opcode result
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx in (failfrom, len(ops) - 1):
# Last opcode
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check job status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
# Check opcode status
data = zip(job.ops,
job.GetInfo(["opstatus"])[0],
job.GetInfo(["opresult"])[0])
for idx, (op, opstatus, opresult) in enumerate(data):
if idx < failfrom:
assert not op.input.fail
self.assertEqual(opstatus, constants.OP_STATUS_SUCCESS)
self.assertEqual(opresult, op.input.result)
elif idx <= failto:
assert op.input.fail
self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
else:
assert not op.input.fail
self.assertEqual(opstatus, constants.OP_STATUS_ERROR)
self.assertRaises(errors.OpExecError, errors.MaybeRaise, opresult)
self.assert_(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops[:failfrom]))
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileInQueue(self):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 17045
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
# Mark as cancelled
(success, _) = job.Cancel()
self.assert_(success)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(job.start_timestamp)
self.assertTrue(job.end_timestamp)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
for op in job.ops))
# Serialize to check for differences
before_proc = job.Serialize()
# Simulate processor called in workerpool
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertFalse(job.start_timestamp)
self.assertTrue(job.end_timestamp)
self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
for op in job.ops))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
# Must not have changed or written
self.assertEqual(before_proc, job.Serialize())
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileWaitlockInQueue(self):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 8645
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
job.ops[0].status = constants.OP_STATUS_WAITING
assert len(job.ops) == 5
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelling
(success, _) = job.Cancel()
self.assert_(success)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
for op in job.ops))
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertFalse(job.start_timestamp)
self.assert_(job.end_timestamp)
self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
for op in job.ops))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
def testCancelWhileWaitlock(self):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 11009
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
self.assert_(success)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
for op in job.ops))
self.assertRaises(IndexError, queue.GetNextUpdate)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assert_(job.start_timestamp)
self.assert_(job.end_timestamp)
self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
def _TestCancelWhileSomething(self, cb):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 24314
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
def _BeforeStart(timeout, priority):
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
(success, _) = job.Cancel()
self.assert_(success)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
for op in job.ops))
cb(queue)
def _AfterStart(op, cbs):
self.fail("Should not reach this")
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assert_(job.start_timestamp)
self.assert_(job.end_timestamp)
self.assertFalse(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_CANCELED for _ in job.ops],
["Job canceled by request" for _ in job.ops]])
return queue
def testCancelWhileWaitlockWithTimeout(self):
def fn(_):
# Fake an acquire attempt timing out
raise mcpu.LockAcquireTimeout()
self._TestCancelWhileSomething(fn)
def testCancelDuringQueueShutdown(self):
queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
self.assertFalse(queue.AcceptingJobsUnlocked())
def testCancelWhileRunning(self):
    # Tests canceling a job that has both finished and unprocessed opcodes
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(3)]
# Create job
job_id = 28492
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED],
["Res0", None, None]])
# Mark as cancelled
(success, _) = job.Cancel()
self.assert_(success)
# Try processing another opcode (this will actually cancel the job)
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_CANCELED,
constants.OP_STATUS_CANCELED],
["Res0", "Job canceled by request",
"Job canceled by request"]])
def _TestQueueShutdown(self, queue, opexec, job, runcount):
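    """Simulates a queue shutdown and checks the job remains queued."""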
self.assertTrue(queue.AcceptingJobsUnlocked())
# Simulate shutdown
queue.StopAcceptingJobs()
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
self.assertFalse(job.cur_opctx)
self.assertTrue(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops[:runcount]))
self.assertFalse(job.ops[runcount].end_timestamp)
self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
for op in job.ops[(runcount + 1):]))
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[(([constants.OP_STATUS_SUCCESS] * runcount) +
([constants.OP_STATUS_QUEUED] *
(len(job.ops) - runcount))),
(["Res%s" % i for i in range(runcount)] +
([None] * (len(job.ops) - runcount)))])
# Must have been written and replicated
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
def testQueueShutdownWhileRunning(self):
# Tests shutting down the queue while a job is running
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(3)]
# Create job
job_id = 2718211587
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assertRaises(IndexError, queue.GetNextUpdate)
# Run one opcode
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED],
["Res0", None, None]])
self.assertFalse(job.cur_opctx)
# Writes for waiting, running and result
for _ in range(3):
self.assertEqual(queue.GetNextUpdate(), (job, True))
# Run second opcode
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED],
["Res0", "Res1", None]])
self.assertFalse(job.cur_opctx)
# Writes for waiting, running and result
for _ in range(3):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self._TestQueueShutdown(queue, opexec, job, 2)
def testQueueShutdownWithLockTimeout(self):
# Tests shutting down while a lock acquire times out
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(3)]
# Create job
job_id = 1304231178
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
acquire_timeout = False
def _BeforeStart(timeout, priority):
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
if acquire_timeout:
raise mcpu.LockAcquireTimeout()
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
self.assertRaises(IndexError, queue.GetNextUpdate)
# Run one opcode
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED],
["Res0", None, None]])
self.assertFalse(job.cur_opctx)
# Writes for waiting, running and result
for _ in range(3):
self.assertEqual(queue.GetNextUpdate(), (job, True))
    # The lock acquires for the next opcode should time out
acquire_timeout = True
self._TestQueueShutdown(queue, opexec, job, 1)
def testQueueShutdownWhileInQueue(self):
# This should never happen in reality (no new jobs are started by the
# workerpool once a shutdown has been initiated), but it's better to test
# the job processor for this scenario
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 2031
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in job.ops))
opexec = _FakeExecOpCodeForProc(queue, None, None)
self._TestQueueShutdown(queue, opexec, job, 0)
def testQueueShutdownWhileWaitlockInQueue(self):
queue = _FakeQueueForProc()
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(5)]
# Create job
job_id = 53125685
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
job.ops[0].status = constants.OP_STATUS_WAITING
assert len(job.ops) == 5
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, queue.GetNextUpdate)
opexec = _FakeExecOpCodeForProc(queue, None, None)
self._TestQueueShutdown(queue, opexec, job, 0)
def testPartiallyRun(self):
# Tests calling the processor on a job that's been partially run before the
# program was restarted
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(queue, None, None)
for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(10)]
# Create job
job = self._CreateJob(queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
for _ in range(successcount):
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus"]),
[[constants.OP_STATUS_SUCCESS
for _ in range(successcount)] +
[constants.OP_STATUS_QUEUED
for _ in range(len(ops) - successcount)]])
self.assert_(job.ops_iter)
# Serialize and restore (simulates program restart)
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
self.assertFalse(newjob.ops_iter)
self._TestPartial(newjob, successcount)
def _TestPartial(self, job, successcount):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(queue, None, None)
for remaining in reversed(range(len(job.ops) - successcount)):
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if remaining == 0:
# Last opcode
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
self.assertEqual(job.GetInfo(["opstatus"]),
[[constants.OP_STATUS_SUCCESS for _ in job.ops]])
self.assert_(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
ops = [opcodes.OpTestDummy(result="result", fail=False)]
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Create job
job = self._CreateJob(queue, 9571, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
job.ops[0].status = constants.OP_STATUS_RUNNING
assert len(job.ops) == 1
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
# Calling on running job must fail
self.assertRaises(errors.ProgrammerError,
jqueue._JobProcessor(queue, opexec, job))
def testLogMessages(self):
# Tests the "Feedback" callback function
queue = _FakeQueueForProc()
messages = {
1: [
(None, "Hello"),
(None, "World"),
(constants.ELOG_MESSAGE, "there"),
],
4: [
(constants.ELOG_JQUEUE_TEST, (1, 2, 3)),
(constants.ELOG_JQUEUE_TEST, ("other", "type")),
],
}
ops = [opcodes.OpTestDummy(result="Logtest%s" % i, fail=False,
messages=messages.get(i, []))
for i in range(5)]
# Create job
job = self._CreateJob(queue, 29386, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertRaises(AssertionError, cbs.Feedback,
"too", "many", "arguments")
for (log_type, msg) in op.messages:
self.assertRaises(IndexError, queue.GetNextUpdate)
if log_type:
cbs.Feedback(log_type, msg)
else:
cbs.Feedback(msg)
# Check for job update without replication
self.assertEqual(queue.GetNextUpdate(), (job, False))
self.assertRaises(IndexError, queue.GetNextUpdate)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for remaining in reversed(range(len(job.ops))):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if remaining == 0:
# Last opcode
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
logmsgcount = sum(len(m) for m in messages.values())
self._CheckLogMessages(job, logmsgcount)
# Serialize and restore (simulates program restart)
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False)
self._CheckLogMessages(newjob, logmsgcount)
# Check each message
prevserial = -1
for idx, oplog in enumerate(job.GetInfo(["oplog"])[0]):
for (serial, timestamp, log_type, msg) in oplog:
(exptype, expmsg) = messages.get(idx).pop(0)
if exptype:
self.assertEqual(log_type, exptype)
else:
self.assertEqual(log_type, constants.ELOG_MESSAGE)
self.assertEqual(expmsg, msg)
self.assert_(serial > prevserial)
prevserial = serial
def _CheckLogMessages(self, job, count):
# Check serial
self.assertEqual(job.log_serial, count)
# No filter
self.assertEqual(job.GetLogEntries(None),
[entry for entries in job.GetInfo(["oplog"])[0] if entries
for entry in entries])
# Filter with serial
assert count > 3
self.assert_(job.GetLogEntries(3))
self.assertEqual(job.GetLogEntries(3),
[entry for entries in job.GetInfo(["oplog"])[0] if entries
for entry in entries][3:])
# No log message after highest serial
self.assertFalse(job.GetLogEntries(count))
self.assertFalse(job.GetLogEntries(count + 3))
def testSubmitManyJobs(self):
queue = _FakeQueueForProc()
job_id = 15656
ops = [
opcodes.OpTestDummy(result="Res0", fail=False,
submit_jobs=[]),
opcodes.OpTestDummy(result="Res1", fail=False,
submit_jobs=[
[opcodes.OpTestDummy(result="r1j0", fail=False)],
]),
opcodes.OpTestDummy(result="Res2", fail=False,
submit_jobs=[
[opcodes.OpTestDummy(result="r2j0o0", fail=False),
opcodes.OpTestDummy(result="r2j0o1", fail=False),
opcodes.OpTestDummy(result="r2j0o2", fail=False),
opcodes.OpTestDummy(result="r2j0o3", fail=False)],
[opcodes.OpTestDummy(result="r2j1", fail=False)],
[opcodes.OpTestDummy(result="r2j3o0", fail=False),
opcodes.OpTestDummy(result="r2j3o1", fail=False)],
]),
]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertRaises(IndexError, queue.GetNextUpdate)
for idx, submitted_ops in enumerate(job_ops
for op in ops
for job_ops in op.submit_jobs):
self.assertEqual(queue.GetNextSubmittedJob(),
(1000 + idx, submitted_ops))
self.assertRaises(IndexError, queue.GetNextSubmittedJob)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[[], [1000], [1001, 1002, 1003]]])
self.assertEqual(job.GetInfo(["opstatus"]),
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependency(self):
depmgr = _FakeDependencyManager()
queue = _FakeQueueForProc(depmgr=depmgr)
self.assertEqual(queue.depmgr, depmgr)
prev_job_id = 22113
prev_job_id2 = 28102
job_id = 29929
ops = [
opcodes.OpTestDummy(result="Res0", fail=False,
depends=[
[prev_job_id2, None],
[prev_job_id, None],
]),
opcodes.OpTestDummy(result="Res1", fail=False),
]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
if attempt == 0 or attempt > 5:
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
counter = itertools.count()
while True:
attempt = counter.next()
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
if attempt < 2:
depmgr.AddCheckResult(job, prev_job_id2, None,
(jqueue._JobDependencyManager.WAIT, "wait2"))
elif attempt == 2:
depmgr.AddCheckResult(job, prev_job_id2, None,
(jqueue._JobDependencyManager.CONTINUE, "cont"))
# The processor will ask for the next dependency immediately
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.WAIT, "wait"))
elif attempt < 5:
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.WAIT, "wait"))
elif attempt == 5:
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.CONTINUE, "cont"))
if attempt == 2:
self.assertEqual(depmgr.CountPendingResults(), 2)
elif attempt > 5:
self.assertEqual(depmgr.CountPendingResults(), 0)
else:
self.assertEqual(depmgr.CountPendingResults(), 1)
result = jqueue._JobProcessor(queue, opexec, job)()
if attempt == 0 or attempt >= 5:
# Job should only be updated if there was an actual change
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(depmgr.CountPendingResults())
if attempt < 5:
# Simulate waiting for other job
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
self.assertEqual(job.GetInfo(["opstatus"]),
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
self._GenericCheckJob(job)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(depmgr.CountPendingResults())
self.assertFalse(depmgr.CountWaitingJobs())
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependencyCancel(self):
depmgr = _FakeDependencyManager()
queue = _FakeQueueForProc(depmgr=depmgr)
self.assertEqual(queue.depmgr, depmgr)
prev_job_id = 13623
job_id = 30876
ops = [
opcodes.OpTestDummy(result="Res0", fail=False),
opcodes.OpTestDummy(result="Res1", fail=False,
depends=[
[prev_job_id, None],
]),
opcodes.OpTestDummy(result="Res2", fail=False),
]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
if attempt == 0 or attempt > 5:
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
counter = itertools.count()
while True:
attempt = counter.next()
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
if attempt == 0:
# This will handle the first opcode
pass
elif attempt < 4:
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.WAIT, "wait"))
elif attempt == 4:
# Other job was cancelled
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.CANCEL, "cancel"))
if attempt == 0:
self.assertEqual(depmgr.CountPendingResults(), 0)
else:
self.assertEqual(depmgr.CountPendingResults(), 1)
result = jqueue._JobProcessor(queue, opexec, job)()
if attempt <= 1 or attempt >= 4:
# Job should only be updated if there was an actual change
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(depmgr.CountPendingResults())
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_CANCELED,
constants.OP_STATUS_CANCELED],
["Res0", "Job canceled by request",
"Job canceled by request"]])
self._GenericCheckJob(job)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
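# Same setup as above, but the dependency finishes with an unexpected status
# (WRONGSTATUS); the dependent job must then fail with JOB_STATUS_ERROR.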
def testJobDependencyWrongstatus(self):
depmgr = _FakeDependencyManager()
queue = _FakeQueueForProc(depmgr=depmgr)
self.assertEqual(queue.depmgr, depmgr)
prev_job_id = 9741
job_id = 11763
ops = [
opcodes.OpTestDummy(result="Res0", fail=False),
opcodes.OpTestDummy(result="Res1", fail=False,
depends=[
[prev_job_id, None],
]),
opcodes.OpTestDummy(result="Res2", fail=False),
]
# Create job
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
if attempt == 0 or attempt > 5:
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
counter = itertools.count()
while True:
attempt = counter.next()
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
if attempt == 0:
# This will handle the first opcode
pass
elif attempt < 4:
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.WAIT, "wait"))
elif attempt == 4:
# Other job failed
depmgr.AddCheckResult(job, prev_job_id, None,
(jqueue._JobDependencyManager.WRONGSTATUS, "w"))
if attempt == 0:
self.assertEqual(depmgr.CountPendingResults(), 0)
else:
self.assertEqual(depmgr.CountPendingResults(), 1)
result = jqueue._JobProcessor(queue, opexec, job)()
if attempt <= 1 or attempt >= 4:
# Job should only be updated if there was an actual change
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(depmgr.CountPendingResults())
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
continue
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
self.assertEqual(job.GetInfo(["opstatus"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_ERROR,
constants.OP_STATUS_ERROR]])
(opresult, ) = job.GetInfo(["opresult"])
self.assertEqual(len(opresult), len(ops))
self.assertEqual(opresult[0], "Res0")
self.assertTrue(errors.GetEncodedError(opresult[1]))
self.assertTrue(errors.GetEncodedError(opresult[2]))
self._GenericCheckJob(job)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
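# Tests for jqueue._EvaluateJobProcessorResult: FINISHED notifies the
# dependency manager, DEFER raises workerpool.DeferTask with the job's
# priority, WAITDEP does nothing and any other value is a programmer error.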
class TestEvaluateJobProcessorResult(unittest.TestCase):
def testFinished(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(30953)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.FINISHED)
self.assertEqual(depmgr.GetNextNotification(), job.id)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testDefer(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(11326, priority=5463)
try:
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.DEFER)
except workerpool.DeferTask, err:
self.assertEqual(err.priority, 5463)
else:
self.fail("Didn't raise exception")
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testWaitdep(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(21317)
jqueue._EvaluateJobProcessorResult(depmgr, job,
jqueue._JobProcessor.WAITDEP)
self.assertRaises(IndexError, depmgr.GetNextNotification)
def testOther(self):
depmgr = _FakeDependencyManager()
job = _IdOnlyFakeJob(5813)
self.assertRaises(errors.ProgrammerError,
jqueue._EvaluateJobProcessorResult,
depmgr, job, "Other result")
self.assertRaises(IndexError, depmgr.GetNextNotification)
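# Fake timeout strategy returning a scripted list of timeouts; once the list
# is exhausted it returns None (blocking lock acquisition).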
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts
self.attempts = 0
self.last_timeout = None
def NextAttempt(self):
self.attempts += 1
if self.timeouts:
timeout = self.timeouts.pop(0)
else:
timeout = None
self.last_timeout = timeout
return timeout
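# Exercises lock acquisition timeouts: _BeforeStart simulates failed acquires
# by raising mcpu.LockAcquireTimeout until the scripted retries are used up;
# _NewTimeoutStrategy checks that the opcode priority was increased whenever
# a new strategy is requested for the same opcode.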
class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def setUp(self):
self.queue = _FakeQueueForProc()
self.job = None
self.curop = None
self.opcounter = None
self.timeout_strategy = None
self.retries = 0
self.prev_tsop = None
self.prev_prio = None
self.prev_status = None
self.lock_acq_prio = None
self.gave_lock = None
self.done_lock_before_blocking = False
def _BeforeStart(self, timeout, priority):
job = self.job
# If status has changed, job must've been written
if self.prev_status != self.job.ops[self.curop].status:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
ts = self.timeout_strategy
self.assert_(timeout is None or isinstance(timeout, (int, float)))
self.assertEqual(timeout, ts.last_timeout)
self.assertEqual(priority, job.ops[self.curop].priority)
self.gave_lock = True
self.lock_acq_prio = priority
if (self.curop == 3 and
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
# Give locks before running into blocking acquire
assert self.retries == 7
self.retries = 0
self.done_lock_before_blocking = True
return
if self.retries > 0:
self.assert_(timeout is not None)
self.retries -= 1
self.gave_lock = False
raise mcpu.LockAcquireTimeout()
if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
assert self.retries == 0, "Didn't exhaust all retries at highest priority"
assert not ts.timeouts
self.assert_(timeout is None)
def _AfterStart(self, op, cbs):
job = self.job
# Setting to "running" requires an update
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
def _NextOpcode(self):
self.curop = self.opcounter.next()
self.prev_prio = self.job.ops[self.curop].priority
self.prev_status = self.job.ops[self.curop].status
def _NewTimeoutStrategy(self):
job = self.job
self.assertEqual(self.retries, 0)
if self.prev_tsop == self.curop:
# Still on the same opcode, priority must've been increased
self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
if self.curop == 1:
# Normal retry
timeouts = range(10, 31, 10)
self.retries = len(timeouts) - 1
elif self.curop == 2:
# Let this run into a blocking acquire
timeouts = range(11, 61, 12)
self.retries = len(timeouts)
elif self.curop == 3:
# Wait for priority to increase, but give lock before blocking acquire
timeouts = range(12, 100, 14)
self.retries = len(timeouts)
self.assertFalse(self.done_lock_before_blocking)
elif self.curop == 4:
self.assert_(self.done_lock_before_blocking)
# Timeouts, but no need to retry
timeouts = range(10, 31, 10)
self.retries = 0
elif self.curop == 5:
# Normal retry
timeouts = range(19, 100, 11)
self.retries = len(timeouts)
else:
timeouts = []
self.retries = 0
assert len(job.ops) == 10
assert self.retries <= len(timeouts)
ts = _FakeTimeoutStrategy(timeouts)
self.timeout_strategy = ts
self.prev_tsop = self.curop
self.prev_prio = job.ops[self.curop].priority
return ts
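# Ten dummy opcodes, all of which succeed once their scripted lock acquires
# finally go through.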
def testTimeout(self):
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(10)]
# Create job
job_id = 15801
job = self._CreateJob(self.queue, job_id, ops)
self.job = job
self.opcounter = itertools.count(0)
opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
self._AfterStart)
tsf = self._NewTimeoutStrategy
self.assertFalse(self.done_lock_before_blocking)
while True:
proc = jqueue._JobProcessor(self.queue, opexec, job,
_timeout_strategy_factory=tsf)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
if self.curop is not None:
self.prev_status = self.job.ops[self.curop].status
self.lock_acq_prio = None
result = proc(_nextop_fn=self._NextOpcode)
assert self.curop is not None
# Input priority should never be set or modified
self.assertFalse(compat.any(hasattr(op.input, "priority")
for op in job.ops))
if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
if result == jqueue._JobProcessor.FINISHED:
self.assertFalse(job.cur_opctx)
break
self.assertEqual(result, jqueue._JobProcessor.DEFER)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
job.start_timestamp)
if self.gave_lock:
# Opcode finished, but job not yet done
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
else:
# Did not get locks
self.assert_(job.cur_opctx)
self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt)
self.assertFalse(job.ops[self.curop].exec_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# If priority has changed since acquiring locks, the job must've been
# updated
if self.lock_acq_prio != job.ops[self.curop].priority:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertEqual(self.curop, len(job.ops) - 1)
self.assertEqual(self.job, job)
self.assertEqual(self.opcounter.next(), len(job.ops))
self.assert_(self.done_lock_before_blocking)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
self.assertEqual(job.GetInfo(["opstatus"]),
[len(job.ops) * [constants.OP_STATUS_SUCCESS]])
self.assert_(compat.all(op.start_timestamp and op.end_timestamp
for op in job.ops))
# Calling the processor on a finished job should be a no-op
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
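# Verifies that changing a job's priority only affects opcodes that haven't
# run yet and that the executor sees the updated priority for later opcodes.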
class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
def setUp(self):
self.queue = _FakeQueueForProc()
self.opexecprio = []
def _BeforeStart(self, timeout, priority):
self.assertFalse(self.queue.IsAcquired())
self.opexecprio.append(priority)
def testChangePriorityWhileRunning(self):
# Tests changing the priority on a job while it has finished opcodes
# (successful) and more, unprocessed ones
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
for i in range(3)]
# Create job
job_id = 3499
job = self._CreateJob(self.queue, job_id, ops)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart, None)
# Run first opcode
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED,
constants.OP_STATUS_QUEUED],
["Res0", None, None]])
self.assertEqual(self.opexecprio.pop(0), constants.OP_PRIO_DEFAULT)
self.assertRaises(IndexError, self.opexecprio.pop, 0)
# Change priority
self.assertEqual(job.ChangePriority(-10),
(True,
("Priorities of pending opcodes for job 3499 have"
" been changed to -10")))
self.assertEqual(job.CalcPriority(), -10)
# Process second opcode
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
self.assertEqual(self.opexecprio.pop(0), -10)
self.assertRaises(IndexError, self.opexecprio.pop, 0)
# Check status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.CalcPriority(), -10)
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_QUEUED],
["Res0", "Res1", None]])
# Change priority once more
self.assertEqual(job.ChangePriority(5),
(True,
("Priorities of pending opcodes for job 3499 have"
" been changed to 5")))
self.assertEqual(job.CalcPriority(), 5)
# Process third opcode
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertEqual(self.opexecprio.pop(0), 5)
self.assertRaises(IndexError, self.opexecprio.pop, 0)
# Check status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
self.assertEqual(job.GetInfo(["id"]), [job_id])
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
[[constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS,
constants.OP_STATUS_SUCCESS],
["Res0", "Res1", "Res2"]])
self.assertEqual(map(operator.attrgetter("priority"), job.ops),
[constants.OP_PRIO_DEFAULT, -10, 5])
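# Minimal job stub exposing only a (string) job ID and an optional priority.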
class _IdOnlyFakeJob:
def __init__(self, job_id, priority=NotImplemented):
self.id = str(job_id)
self._priority = priority
def CalcPriority(self):
return self._priority
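# Tests _JobDependencyManager with a fake status lookup (_GetStatus pops
# pre-recorded (job_id, status) pairs) and a fake re-queue function.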
class TestJobDependencyManager(unittest.TestCase):
def setUp(self):
self._status = []
self._queue = []
self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
def _GetStatus(self, job_id):
(exp_job_id, result) = self._status.pop(0)
self.assertEqual(exp_job_id, job_id)
return result
def _Enqueue(self, jobs):
self.assertFalse(self.jdm._lock.is_owned(),
msg=("Must not own manager lock while re-adding jobs"
" (potential deadlock)"))
self._queue.append(jobs)
def testNotFinalizedThenCancel(self):
job = _IdOnlyFakeJob(17697)
job_id = str(28625)
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/28625", None, None, [("job", [job.id])])
])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.CANCEL)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testNotFinalizedThenQueued(self):
# This can happen on a queue shutdown
job = _IdOnlyFakeJob(1320)
job_id = str(22971)
for i in range(5):
if i > 2:
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
else:
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/22971", None, None, [("job", [job.id])])
])
def testRequireCancel(self):
job = _IdOnlyFakeJob(5278)
job_id = str(9610)
dep_status = [constants.JOB_STATUS_CANCELED]
self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/9610", None, None, [("job", [job.id])])
])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireError(self):
job = _IdOnlyFakeJob(21459)
job_id = str(25519)
dep_status = [constants.JOB_STATUS_ERROR]
self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self._status.append((job_id, constants.JOB_STATUS_ERROR))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireMultiple(self):
dep_status = list(constants.JOBS_FINALIZED)
for end_status in dep_status:
job = _IdOnlyFakeJob(21343)
job_id = str(14609)
self._status.append((job_id, constants.JOB_STATUS_WAITING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/14609", None, None, [("job", [job.id])])
])
self._status.append((job_id, end_status))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testNotify(self):
job = _IdOnlyFakeJob(8227)
job_id = str(4113)
self._status.append((job_id, constants.JOB_STATUS_RUNNING))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.jdm.NotifyWaiters(job_id)
self.assertFalse(self._status)
self.assertFalse(self.jdm._waiters)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertEqual(self._queue, [set([job])])
def testWrongStatus(self):
job = _IdOnlyFakeJob(10102)
job_id = str(1271)
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self._status.append((job_id, constants.JOB_STATUS_ERROR))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.WRONGSTATUS)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
def testCorrectStatus(self):
job = _IdOnlyFakeJob(24273)
job_id = str(23885)
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
def testFinalizedRightAway(self):
job = _IdOnlyFakeJob(224)
job_id = str(3081)
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, {
job_id: set(),
})
# Force cleanup
self.jdm.NotifyWaiters("0")
self.assertFalse(self.jdm._waiters)
self.assertFalse(self._status)
self.assertFalse(self._queue)
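# Registers many waiters for a handful of dependency jobs, then finalizes
# the dependencies in random order, checking the bookkeeping after each step.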
def testMultipleWaiting(self):
# Use a deterministic random generator
rnd = random.Random(21402)
job_ids = map(str, rnd.sample(range(1, 10000), 150))
waiters = dict((job_ids.pop(),
set(map(_IdOnlyFakeJob,
[job_ids.pop()
for _ in range(rnd.randint(1, 20))])))
for _ in range(10))
# Ensure there are no duplicate job IDs
assert not utils.FindDuplicates(waiters.keys() +
[job.id
for jobs in waiters.values()
for job in jobs])
# Register all jobs as waiters
for job_id, job in [(job_id, job)
for (job_id, jobs) in waiters.items()
for job in jobs]:
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, waiters)
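# Normalize the pending entries of the GetLockInfo output to sets so the
# comparison below doesn't depend on ordering.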
def _MakeSet((name, mode, owner_names, pending)):
return (name, mode, owner_names,
[(pendmode, set(pend)) for (pendmode, pend) in pending])
def _CheckLockInfo():
info = self.jdm.GetLockInfo([query.LQ_PENDING])
self.assertEqual(sorted(map(_MakeSet, info)), sorted([
("job/%s" % job_id, None, None,
[("job", set([job.id for job in jobs]))])
for job_id, jobs in waiters.items()
if jobs
]))
_CheckLockInfo()
# Notify in random order
for job_id in rnd.sample(waiters, len(waiters)):
# Remove from pending waiter list
jobs = waiters.pop(job_id)
for job in jobs:
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
_CheckLockInfo()
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
assert not waiters
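# A job listing itself as a dependency must be rejected with ERROR.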
def testSelfDependency(self):
job = _IdOnlyFakeJob(18937)
self._status.append((job.id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job.id, [])
self.assertEqual(result, self.jdm.ERROR)
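# If looking up the dependency's status raises errors.JobLost, registration
# must fail with ERROR and no waiter may be left behind.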
def testJobDisappears(self):
job = _IdOnlyFakeJob(30540)
job_id = str(23769)
def _FakeStatus(_):
raise errors.JobLost("#msg#")
jdm = jqueue._JobDependencyManager(_FakeStatus, None)
(result, _) = jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.ERROR)
self.assertFalse(jdm.JobWaiting(job))
self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
if __name__ == "__main__":
testutils.GanetiTestProgram()