blob: 8e61805e132193afb6d2a4e24d3938d485aadd57 [file] [log] [blame]
#
#
# Copyright (C) 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Module implementing executing of a job as a separate process
The complete protocol for initializing a job is described in the Haskell
module Ganeti.Query.Exec
"""
import contextlib
import logging
import os
import signal
import sys
import time
from ganeti import mcpu
from ganeti.server import masterd
from ganeti.rpc import transport
from ganeti import serializer
from ganeti import utils
from ganeti import pathutils
from ganeti.utils import livelock
from ganeti.jqueue import _JobProcessor
def _GetMasterInfo():
  """Fetch the job id, livelock name and secret params from the master

  Three empty calls are issued, in that order, over a transport built on
  file descriptors 0 and 1; the transport is closed before returning.

  @rtype: (int, string, json encoding of a list of dicts)

  """
  logging.debug("Opening transport over stdin/out")
  stdio_transport = transport.FdTransport((0, 1))
  with contextlib.closing(stdio_transport) as channel:
    # First reply: the job id
    logging.debug("Reading job id from the master process")
    raw_job_id = channel.Call("")
    job_id = int(raw_job_id)
    logging.debug("Got job id %d", job_id)

    # Second reply: the name of the livelock
    logging.debug("Reading the livelock name from the master process")
    raw_livelock = channel.Call("")
    livelock_name = livelock.LiveLockName(raw_livelock)
    logging.debug("Got livelock %s", livelock_name)

    # Third reply: the secret parameters, returned undecoded and never logged
    logging.debug("Reading secret parameters from the master process")
    secret_params = channel.Call("")
    logging.debug("Got secret parameters.")

  return (job_id, livelock_name, secret_params)
def RestorePrivateValueWrapping(json):
  """Re-wrap json-decoded secret values in private containers.

  Each element of the decoded sequence is turned into a
  C{serializer.PrivateDict}; a C{None} element yields an empty one.

  @param json: the json-decoded value to protect.
  @return: a new list holding one C{serializer.PrivateDict} per element

  """
  return [serializer.PrivateDict() if entry is None
          else serializer.PrivateDict(entry)
          for entry in json]
def main():
  """Run a single job in this process, as instructed by the master process.

  The debug level is read from the C{GNT_DEBUG} environment variable; the
  job id, the livelock and the serialized secret parameters are obtained
  via L{_GetMasterInfo}. Signal handlers are installed (SIGTERM requests
  cancellation of the job, SIGHUP sets C{mcpu.sighupReceived}, SIGUSR1
  requests a priority change), then the job is loaded from disk and
  processed until the job processor reports C{FINISHED}.

  Any exception raised while running the job is logged, the livelock file
  is removed afterwards and the process finally exits with status 0.

  @raise KeyError: if C{GNT_DEBUG} is not set in the environment

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name, secret_params_serialized) = _GetMasterInfo()

  secret_params = ""
  if secret_params_serialized:
    secret_params_json = serializer.LoadJson(secret_params_serialized)
    secret_params = RestorePrivateValueWrapping(secret_params_json)

  # Set up logging again, this time under a name that includes the job id
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # Single-element lists, so that the nested handlers can flip the flags
    # by item assignment
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived)
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)

    job.SetPid(os.getpid())

    if secret_params:
      # The n-th entry of the secret parameters belongs to the n-th opcode;
      # it is only set on opcodes whose input has an "osparams_secret" field
      for (idx, secret) in enumerate(secret_params):
        op_input = job.ops[idx].input
        if hasattr(op_input, "osparams_secret"):
          op_input.osparams_secret = secret

    execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
    proc = _JobProcessor(context.jobqueue, execfun, job)
    result = _JobProcessor.DEFER
    while result != _JobProcessor.FINISHED:
      result = proc()
      if result == _JobProcessor.WAITDEP and not cancel[0]:
        # Normally, the scheduler should avoid starting a job where the
        # dependencies are not yet finalised. So warn, but wait and continue.
        logging.warning("Got started despite a dependency not yet finished")
        time.sleep(5)
      if cancel[0]:
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        # Continue with a freshly loaded job object and a new processor
        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
        proc = _JobProcessor(context.jobqueue, execfun, job)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
        cancel[0] = False
      if prio_change[0]:
        logging.debug("Received priority-change request")
        try:
          # The new priority is read from a per-job message file, which is
          # removed once it has been read
          fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
          new_prio = int(utils.ReadFile(fname))
          utils.RemoveFile(fname)
          logging.debug("Changing priority of job %d to %d", job_id, new_prio)
          r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
          # Continue with a freshly loaded job object and a new processor
          job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
          proc = _JobProcessor(context.jobqueue, execfun, job)
          logging.debug("Result of changing priority of %d to %d: %s", job_id,
                        new_prio, r)
        except Exception: # pylint: disable=W0703
          # Any of the steps above may have failed, not only the read; keep
          # going with the job, but record the traceback so the actual cause
          # is not lost
          logging.warning("Informed of priority change, but could not"
                          " read new priority", exc_info=True)
        prio_change[0] = False

  except Exception: # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    logging.debug("Removing livelock file %s", livelock_name.GetPath())
    os.remove(livelock_name.GetPath())

  sys.exit(0)
# Run the job only when this module is executed as a script
if __name__ == '__main__':
  main()