blob: 4391a4f71b66c6dd71da47601d55795954036be8 [file] [log] [blame]
#
#
# Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Module for the LUXI protocol
This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.
The module is also used by the master daemon.
"""
from ganeti import constants
from ganeti import pathutils
from ganeti import objects
import ganeti.rpc.client as cl
from ganeti.rpc.errors import RequestError
from ganeti.rpc.transport import Transport
__all__ = [
# classes:
"Client"
]
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
REQ_QUERY = constants.LUXI_REQ_QUERY
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
REQ_QUERY_FILTERS = constants.LUXI_REQ_QUERY_FILTERS
REQ_REPLACE_FILTER = constants.LUXI_REQ_REPLACE_FILTER
REQ_DELETE_FILTER = constants.LUXI_REQ_DELETE_FILTER
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
REQ_ALL = constants.LUXI_REQ_ALL
DEF_RWTO = constants.LUXI_DEF_RWTO
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
class Client(cl.AbstractClient):
"""High-level client implementation.
This uses a backing Transport-like class on top of which it
implements data serialization/deserialization.
"""
def __init__(self, address=None, timeouts=None, transport=Transport):
"""Constructor for the Client class.
Arguments are the same as for L{AbstractClient}.
"""
super(Client, self).__init__(timeouts, transport)
# Override the version of the protocol:
self.version = constants.LUXI_VERSION
# Store the socket address
if address is None:
address = pathutils.QUERY_SOCKET
self.address = address
self._InitTransport()
def _GetAddress(self):
return self.address
def SetQueueDrainFlag(self, drain_flag):
return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
def SetWatcherPause(self, until):
return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
def PickupJob(self, job):
return self.CallMethod(REQ_PICKUP_JOB, (job,))
def SubmitJob(self, ops):
ops_state = [op.__getstate__()
if not isinstance(op, objects.ConfigObject)
else op.ToDict(_with_private=True) for op in ops]
return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
def SubmitJobToDrainedQueue(self, ops):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
def SubmitManyJobs(self, jobs):
jobs_state = []
for ops in jobs:
jobs_state.append([op.__getstate__() for op in ops])
return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
@staticmethod
def _PrepareJobId(request_name, job_id):
try:
return int(job_id)
except ValueError:
raise RequestError("Invalid parameter passed to %s as job id: "
" expected integer, got value %s" %
(request_name, job_id))
def CancelJob(self, job_id, kill=False):
job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
return self.CallMethod(REQ_CANCEL_JOB, (job_id, kill))
def ArchiveJob(self, job_id):
job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
def ChangeJobPriority(self, job_id, priority):
job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
def AutoArchiveJobs(self, age):
timeout = (DEF_RWTO - 1) / 2
return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
def WaitForJobChangeOnce(self, job_id, fields,
prev_job_info, prev_log_serial,
timeout=WFJC_TIMEOUT):
"""Waits for changes on a job.
@param job_id: Job ID
@type fields: list
@param fields: List of field names to be observed
@type prev_job_info: None or list
@param prev_job_info: Previously received job information
@type prev_log_serial: None or int/long
@param prev_log_serial: Highest log serial number previously received
@type timeout: int/float
@param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
be capped to that value)
"""
assert timeout >= 0, "Timeout can not be negative"
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info,
prev_log_serial,
min(WFJC_TIMEOUT, timeout)))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
while True:
result = self.WaitForJobChangeOnce(job_id, fields,
prev_job_info, prev_log_serial)
if result != constants.JOB_NOTCHANGED:
break
return result
def Query(self, what, fields, qfilter):
"""Query for resources/items.
@param what: One of L{constants.QR_VIA_LUXI}
@type fields: List of strings
@param fields: List of requested fields
@type qfilter: None or list
@param qfilter: Query filter
@rtype: L{objects.QueryResponse}
"""
result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
return objects.QueryResponse.FromDict(result)
def QueryFields(self, what, fields):
"""Query for available fields.
@param what: One of L{constants.QR_VIA_LUXI}
@type fields: None or list of strings
@param fields: List of requested fields
@rtype: L{objects.QueryFieldsResponse}
"""
result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
return objects.QueryFieldsResponse.FromDict(result)
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
def QueryFilters(self, uuids, fields):
return self.CallMethod(REQ_QUERY_FILTERS, (uuids, fields))
def ReplaceFilter(self, uuid, priority, predicates, action, reason):
return self.CallMethod(REQ_REPLACE_FILTER,
(uuid, priority, predicates, action, reason))
def DeleteFilter(self, uuid):
return self.CallMethod(REQ_DELETE_FILTER, (uuid, ))
def QueryInstances(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
def QueryNodes(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
def QueryGroups(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
def QueryNetworks(self, names, fields, use_locking):
return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
def QueryExports(self, nodes, use_locking):
return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
def QueryClusterInfo(self):
return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
def QueryTags(self, kind, name):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))