Merge branch 'stable-2.16' into stable-2.17
* stable-2.16
Fix some trivial pep8/pylint errors
Make executeRpcCall only compute rpcCallData once
Remove storage unit selection from rpcCallData
Only import needed functions from Ganeti.JSON
Special case WaitForJobChange to reduce heap use
* stable-2.15
Cleanup more pylint/pep8/apidoc errors
KVM: handle gracefully too old/too new psutil versions
Fix small typo in opcode tests
Fix line-too long errors
Manually update import lists to fix compile errors, and fix a trivial
pep8 whitespace warning.
Signed-off-by: Brian Foley <bpfoley@google.com>
Reviewed-by: Viktor Bachraty <vbachraty@google.com>
diff --git a/INSTALL b/INSTALL
index e33ab96..95f3019 100644
--- a/INSTALL
+++ b/INSTALL
@@ -42,7 +42,9 @@
- `Paramiko <http://www.lag.net/paramiko/>`_, if you want to use
``ganeti-listrunner``
- `psutil Python module <https://github.com/giampaolo/psutil>`_,
- optional python package for supporting CPU pinning under KVM
+ optional Python package for supporting CPU pinning under KVM; only
+ versions 2.x.x are supported. Versions from 2.0.0 up to (but excluding)
+ 2.2.0 had a number of file handle leaks, so at least 2.2.0 is advised
- `fdsend Python module <https://gitorious.org/python-fdsend>`_,
optional Python package for supporting NIC hotplugging under KVM
- `qemu-img <http://qemu.org/>`_, if you want to use ``ovfconverter``
diff --git a/lib/backend.py b/lib/backend.py
index 9ea1c50..e597410 100644
--- a/lib/backend.py
+++ b/lib/backend.py
@@ -1104,6 +1104,7 @@
return result
+
def VerifyNodeNetTest(my_name, test_config):
"""Verify nodes are reachable.
@@ -1146,6 +1147,7 @@
" and ".join(fail))
return result
+
def VerifyMasterIP(my_name, test_config):
"""Verify master IP is reachable.
diff --git a/lib/cli.py b/lib/cli.py
index 0139a24..1a1815c 100644
--- a/lib/cli.py
+++ b/lib/cli.py
@@ -747,7 +747,7 @@
timer += constants.CLI_WFJC_FREQUENCY
else:
result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
- prev_logmsg_serial, timeout=update_freq)
+ prev_logmsg_serial, timeout=update_freq)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
@@ -906,6 +906,7 @@
"""
return self.cl.CancelJob(job_id)
+
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
def __init__(self, feedback_fn):
"""Initializes this class.
diff --git a/lib/cmdlib/cluster/verify.py b/lib/cmdlib/cluster/verify.py
index ef286c1..76809b6 100644
--- a/lib/cmdlib/cluster/verify.py
+++ b/lib/cmdlib/cluster/verify.py
@@ -109,8 +109,8 @@
@type error_descriptor: tuple (string, string, string)
@param error_descriptor: triplet describing the error (object_type,
code, description)
- @type obj_name: string
- @param obj_name: name of object (instance, node ..) the error relates to
+ @type object_name: string
+ @param object_name: name of object (instance, node ..) the error relates to
@type message_list: list of strings
@param message_list: body of error messages
@type log_type: string
@@ -133,13 +133,15 @@
log_type, error_code, object_type, object_name, msg))
else:
if not object_name:
- object_name = ""
+ object_name = ""
for msg in message_list:
prefixed_list.append(" - %s: %s %s: %s" % (
log_type, object_type, object_name, msg))
# Report messages via the feedback_fn
- self._feedback_fn(constants.ELOG_MESSAGE_LIST, prefixed_list) # pylint: disable=E1101,C0301
+ # pylint: disable=E1101
+ self._feedback_fn(constants.ELOG_MESSAGE_LIST, prefixed_list)
+ # pylint: enable=E1101
# do not mark the operation as failed for WARN cases only
if log_type == self.ETYPE_ERROR:
@@ -781,8 +783,8 @@
if constants.NV_MASTERIP not in nresult:
self._ErrorMsg(constants.CV_ENODENET, ninfo.name,
- "node hasn't returned node master IP reachability data")
- elif nresult[constants.NV_MASTERIP] == False: # be explicit, could be None
+ "node hasn't returned node master IP reachability data")
+ elif nresult[constants.NV_MASTERIP] == False: # be explicit, could be None
if ninfo.uuid == self.master_node:
msg = "the master node cannot reach the master IP (not configured?)"
else:
diff --git a/lib/http/server.py b/lib/http/server.py
index 8b3a4ee..9a4563e 100644
--- a/lib/http/server.py
+++ b/lib/http/server.py
@@ -423,7 +423,8 @@
try:
http.Handshake(sock, self.WRITE_TIMEOUT)
except http.HttpSessionHandshakeUnexpectedEOF:
- logging.debug("Unexpected EOF from %s:%s" % (client_addr[0], client_addr[1]))
+ logging.debug("Unexpected EOF from %s:%s",
+ client_addr[0], client_addr[1])
# Ignore rest
return
@@ -609,8 +610,9 @@
t_setup = time.time()
self.request_executor(self, self.handler, connection, client_addr)
t_end = time.time()
- logging.debug("Request from %s:%s executed in: %.4f [setup: %.4f] [workers: %d]" % (
- client_addr[0], client_addr[1], t_end - t_start, t_setup - t_start, len(self._children)))
+ logging.debug("Request from %s:%s executed in: %.4f [setup: %.4f] "
+ "[workers: %d]", client_addr[0], client_addr[1],
+ t_end - t_start, t_setup - t_start, len(self._children))
except Exception: # pylint: disable=W0703
logging.exception("Error while handling request from %s:%s",
diff --git a/lib/hypervisor/hv_kvm/__init__.py b/lib/hypervisor/hv_kvm/__init__.py
index 1e89c82..580606b 100644
--- a/lib/hypervisor/hv_kvm/__init__.py
+++ b/lib/hypervisor/hv_kvm/__init__.py
@@ -45,7 +45,17 @@
from bitarray import bitarray
try:
import psutil # pylint: disable=F0401
+ if psutil.version_info < (2, 0, 0):
+ # The psutil version seems too old, we ignore it
+ psutil_err = "too old (2.x.x needed, %s found)" % psutil.__version__
+ psutil = None
+ elif psutil.version_info >= (3,):
+ psutil_err = "too new (2.x.x needed, %s found)" % psutil.__version__
+ psutil = None
+ else:
+ psutil_err = "<no error>"
except ImportError:
+ psutil_err = "not found"
psutil = None
try:
import fdsend # pylint: disable=F0401
@@ -885,11 +895,14 @@
"""
if psutil is None:
- raise errors.HypervisorError("psutil Python package not"
- " found; cannot use CPU pinning under KVM")
+ raise errors.HypervisorError("psutil Python package %s"
+ "; cannot use CPU pinning"
+ " under KVM" % psutil_err)
target_process = psutil.Process(process_id)
if cpus == constants.CPU_PINNING_OFF:
+ # we checked this at import time
+ # pylint: disable=E1101
target_process.set_cpu_affinity(range(psutil.cpu_count()))
else:
target_process.set_cpu_affinity(cpus)
diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py
index 751e6ef..8514fcb 100644
--- a/lib/rapi/rlib2.py
+++ b/lib/rapi/rlib2.py
@@ -200,6 +200,7 @@
return inst
+
def _CheckIfConnectionDropped(sock):
"""Utility function to monitor the state of an open connection.
diff --git a/lib/storage/bdev.py b/lib/storage/bdev.py
index 1f95004..7c1897c 100644
--- a/lib/storage/bdev.py
+++ b/lib/storage/bdev.py
@@ -510,7 +510,6 @@
logging.warning("lvs command returned an empty output, the LV cache will"
"be empty!")
return {}
- info = {}
return dict([LogicalVolume._ParseLvInfoLine(line, sep) for line in out])
def Attach(self, lv_info=None):
diff --git a/src/Ganeti/Confd/Server.hs b/src/Ganeti/Confd/Server.hs
index 3ff0393..a2ec0a9 100644
--- a/src/Ganeti/Confd/Server.hs
+++ b/src/Ganeti/Confd/Server.hs
@@ -58,7 +58,7 @@
import Ganeti.BasicTypes
import Ganeti.Errors
import Ganeti.Daemon
-import Ganeti.JSON
+import Ganeti.JSON (containerFromList, fromContainer)
import Ganeti.Objects
import Ganeti.Confd.Types
import Ganeti.Confd.Utils
diff --git a/src/Ganeti/Confd/Utils.hs b/src/Ganeti/Confd/Utils.hs
index 4e43642..ba5585f 100644
--- a/src/Ganeti/Confd/Utils.hs
+++ b/src/Ganeti/Confd/Utils.hs
@@ -59,7 +59,7 @@
import Ganeti.Hash
import qualified Ganeti.Constants as C
import qualified Ganeti.Path as Path
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult)
import Ganeti.Utils
-- | Type-adjusted max clock skew constant.
diff --git a/src/Ganeti/Config.hs b/src/Ganeti/Config.hs
index f67aff1..5687b54 100644
--- a/src/Ganeti/Config.hs
+++ b/src/Ganeti/Config.hs
@@ -103,7 +103,7 @@
import qualified Ganeti.Constants as C
import qualified Ganeti.ConstantUtils as CU
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult, fromContainer, GenericContainer(..))
import Ganeti.Objects
import Ganeti.Types
import qualified Ganeti.Utils.MultiMap as MM
diff --git a/src/Ganeti/DataCollectors/Lv.hs b/src/Ganeti/DataCollectors/Lv.hs
index 4846988..27a80cc 100644
--- a/src/Ganeti/DataCollectors/Lv.hs
+++ b/src/Ganeti/DataCollectors/Lv.hs
@@ -61,7 +61,7 @@
import Ganeti.Confd.ClientFunctions
import Ganeti.DataCollectors.CLI
import Ganeti.DataCollectors.Types
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult)
import Ganeti.Objects
import Ganeti.Storage.Lvm.LVParser
import Ganeti.Storage.Lvm.Types
diff --git a/src/Ganeti/HTools/Backend/IAlloc.hs b/src/Ganeti/HTools/Backend/IAlloc.hs
index e431947..e40c3d7 100644
--- a/src/Ganeti/HTools/Backend/IAlloc.hs
+++ b/src/Ganeti/HTools/Backend/IAlloc.hs
@@ -65,7 +65,7 @@
import Ganeti.HTools.CLI
import Ganeti.HTools.Loader
import Ganeti.HTools.Types
-import Ganeti.JSON
+import Ganeti.JSON (maybeFromObj, JSRecord, tryFromObj, toArray, asObjectList, readEitherString, fromJResult, fromObj, fromObjWithDefault, asJSObject, emptyContainer)
import Ganeti.Types ( EvacMode(ChangePrimary, ChangeSecondary)
, adminStateFromRaw, AdminState(..))
import Ganeti.Utils
diff --git a/src/Ganeti/HTools/Backend/Luxi.hs b/src/Ganeti/HTools/Backend/Luxi.hs
index be81a3c..639d74d 100644
--- a/src/Ganeti/HTools/Backend/Luxi.hs
+++ b/src/Ganeti/HTools/Backend/Luxi.hs
@@ -51,8 +51,9 @@
import qualified Ganeti.HTools.Group as Group
import qualified Ganeti.HTools.Node as Node
import qualified Ganeti.HTools.Instance as Instance
-import Ganeti.JSON
-import Ganeti.Objects as O
+import Ganeti.JSON (fromJVal, tryFromObj, arrayMaybeFromJVal,
+ getKeysFromContainer, Container)
+import Ganeti.Objects (PartialNicParams)
{-# ANN module "HLint: ignore Eta reduce" #-}
diff --git a/src/Ganeti/HTools/Backend/MonD.hs b/src/Ganeti/HTools/Backend/MonD.hs
index d7d8b4c..be420a5 100644
--- a/src/Ganeti/HTools/Backend/MonD.hs
+++ b/src/Ganeti/HTools/Backend/MonD.hs
@@ -78,7 +78,7 @@
import Ganeti.HTools.Loader (ClusterData(..))
import Ganeti.HTools.Types
import Ganeti.HTools.CLI
-import Ganeti.JSON
+import Ganeti.JSON (fromJVal, tryFromObj, JSRecord, loadJSArray, maybeParseMap)
import Ganeti.Logging.Lifted (logWarning)
import Ganeti.Utils (exitIfBad)
diff --git a/src/Ganeti/HTools/Backend/Rapi.hs b/src/Ganeti/HTools/Backend/Rapi.hs
index 03d158c..218411c 100644
--- a/src/Ganeti/HTools/Backend/Rapi.hs
+++ b/src/Ganeti/HTools/Backend/Rapi.hs
@@ -53,7 +53,7 @@
import Ganeti.BasicTypes
import Ganeti.HTools.Loader
import Ganeti.HTools.Types
-import Ganeti.JSON
+import Ganeti.JSON (loadJSArray, JSRecord, tryFromObj, fromJVal, maybeFromObj, fromJResult, tryArrayMaybeFromObj, readEitherString, fromObjWithDefault, asJSObject, emptyContainer)
import qualified Ganeti.HTools.Group as Group
import qualified Ganeti.HTools.Node as Node
import qualified Ganeti.HTools.Instance as Instance
diff --git a/src/Ganeti/JQScheduler/Filtering.hs b/src/Ganeti/JQScheduler/Filtering.hs
index 34e55bb..c031415 100644
--- a/src/Ganeti/JQScheduler/Filtering.hs
+++ b/src/Ganeti/JQScheduler/Filtering.hs
@@ -56,7 +56,7 @@
import Ganeti.JQScheduler.Types
import Ganeti.JQueue (QueuedJob(..))
import Ganeti.JQueue.Lens
-import Ganeti.JSON
+import Ganeti.JSON (nestedAccessByKeyDotted)
import Ganeti.Objects (FilterRule(..), FilterAction(..), FilterPredicate(..),
filterRuleOrder)
import Ganeti.OpCodes (OpCode)
diff --git a/src/Ganeti/JQueue.hs b/src/Ganeti/JQueue.hs
index a3f9da2..736fce6 100644
--- a/src/Ganeti/JQueue.hs
+++ b/src/Ganeti/JQueue.hs
@@ -122,7 +122,7 @@
import Ganeti.Errors (ErrorResult, ResultG)
import Ganeti.JQueue.Lens (qoInputL, validOpCodeL)
import Ganeti.JQueue.Objects
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult, fromObjWithDefault)
import Ganeti.Logging
import Ganeti.Luxi
import Ganeti.Objects (ConfigData, Node)
diff --git a/src/Ganeti/JSON.hs b/src/Ganeti/JSON.hs
index a666ba2..86323ba 100644
--- a/src/Ganeti/JSON.hs
+++ b/src/Ganeti/JSON.hs
@@ -549,4 +549,4 @@
-- | Maybe obtain a map from a JSON object.
maybeParseMap :: J.JSON a => J.JSValue -> Maybe (Map.Map String a)
-maybeParseMap = liftM fromContainer . readContainer <=< asJSObject
+maybeParseMap = liftM fromContainer . readContainer <=< asJSObject
diff --git a/src/Ganeti/Luxi.hs b/src/Ganeti/Luxi.hs
index 4439cef..831e859 100644
--- a/src/Ganeti/Luxi.hs
+++ b/src/Ganeti/Luxi.hs
@@ -73,7 +73,7 @@
import Ganeti.BasicTypes
import Ganeti.Constants
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult, fromJVal, fromObj, Tuple5(..), MaybeForJSON(..), TimeAsDoubleJSON(..))
import Ganeti.UDSServer
import Ganeti.Objects
import Ganeti.OpParams (pTagsObject)
diff --git a/src/Ganeti/Objects.hs b/src/Ganeti/Objects.hs
index 4b92561..065aaa8 100644
--- a/src/Ganeti/Objects.hs
+++ b/src/Ganeti/Objects.hs
@@ -130,7 +130,7 @@
import qualified AutoConf
import qualified Ganeti.Constants as C
import qualified Ganeti.ConstantUtils as ConstantUtils
-import Ganeti.JSON
+import Ganeti.JSON (DictObject(..), Container, emptyContainer, GenericContainer)
import Ganeti.Objects.BitArray (BitArray)
import Ganeti.Objects.Disk
import Ganeti.Objects.Maintenance
diff --git a/src/Ganeti/Objects/BitArray.hs b/src/Ganeti/Objects/BitArray.hs
index f121ba4..62b45c4 100644
--- a/src/Ganeti/Objects/BitArray.hs
+++ b/src/Ganeti/Objects/BitArray.hs
@@ -63,7 +63,7 @@
import qualified Text.JSON as J
import Ganeti.BasicTypes
-import Ganeti.JSON
+import Ganeti.JSON (readEitherString)
-- | A fixed-size, space-efficient array of bits.
data BitArray = BitArray
diff --git a/src/Ganeti/Objects/Disk.hs b/src/Ganeti/Objects/Disk.hs
index 4e95749..f6b3cbb 100644
--- a/src/Ganeti/Objects/Disk.hs
+++ b/src/Ganeti/Objects/Disk.hs
@@ -83,7 +83,7 @@
showsPrec _ (LogicalVolume g v) =
showString g . showString "/" . showString v
--- | Check the constraints for a VG/LV names (except the \@\/dev\/\@ check).
+-- | Check the constraints for VG\/LV names (except the @\/dev\/@ check).
instance Validatable LogicalVolume where
validate (LogicalVolume g v) = do
let vgn = "Volume group name"
diff --git a/src/Ganeti/OpParams.hs b/src/Ganeti/OpParams.hs
index a965df3..83fd2c4 100644
--- a/src/Ganeti/OpParams.hs
+++ b/src/Ganeti/OpParams.hs
@@ -324,7 +324,7 @@
import Ganeti.THH
import Ganeti.THH.Field
import Ganeti.Utils
-import Ganeti.JSON
+import Ganeti.JSON (GenericContainer)
import Ganeti.Types
import qualified Ganeti.Query.Language as Qlang
diff --git a/src/Ganeti/Query/Common.hs b/src/Ganeti/Query/Common.hs
index 6f8e126..31c40d6 100644
--- a/src/Ganeti/Query/Common.hs
+++ b/src/Ganeti/Query/Common.hs
@@ -67,7 +67,7 @@
import qualified Ganeti.Constants as C
import Ganeti.Config
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (GenericContainer(..), fromContainer, TimeAsDoubleJSON(..))
import Ganeti.Objects
import Ganeti.Rpc
import Ganeti.Query.Language
diff --git a/src/Ganeti/Query/Filter.hs b/src/Ganeti/Query/Filter.hs
index cddc4d8..0d36ff2 100644
--- a/src/Ganeti/Query/Filter.hs
+++ b/src/Ganeti/Query/Filter.hs
@@ -84,7 +84,7 @@
import Ganeti.Query.Language
import Ganeti.Query.Types
import Ganeti.Utils.Monad (anyM, allM)
-import Ganeti.JSON
+import Ganeti.JSON (fromJVal)
-- | Compiles a filter based on field names to one based on getters.
compileFilter :: FieldMap a b
diff --git a/src/Ganeti/Query/Instance.hs b/src/Ganeti/Query/Instance.hs
index 4d2e660..8ea2c0f 100644
--- a/src/Ganeti/Query/Instance.hs
+++ b/src/Ganeti/Query/Instance.hs
@@ -59,7 +59,7 @@
import qualified Ganeti.Constants as C
import qualified Ganeti.ConstantUtils as C
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (MaybeForJSON(..), fromContainer)
import Ganeti.Objects
import Ganeti.Query.Common
import Ganeti.Query.Language
diff --git a/src/Ganeti/Query/Network.hs b/src/Ganeti/Query/Network.hs
index 1fda614..3ef0803 100644
--- a/src/Ganeti/Query/Network.hs
+++ b/src/Ganeti/Query/Network.hs
@@ -47,7 +47,7 @@
import Data.Maybe (fromMaybe, mapMaybe)
import Data.List (find, intercalate)
-import Ganeti.JSON
+import Ganeti.JSON (fromContainer)
import Ganeti.Network
import Ganeti.Objects
import qualified Ganeti.Objects.BitArray as BA
diff --git a/src/Ganeti/Query/Node.hs b/src/Ganeti/Query/Node.hs
index 698e209..f431ade 100644
--- a/src/Ganeti/Query/Node.hs
+++ b/src/Ganeti/Query/Node.hs
@@ -43,13 +43,12 @@
import Data.List (intercalate)
import Data.Maybe
-import qualified Data.Map as Map
import qualified Text.JSON as J
import Ganeti.Config
import Ganeti.Common
import Ganeti.Objects
-import Ganeti.JSON
+import Ganeti.JSON (jsonHead)
import Ganeti.Rpc
import Ganeti.Types
import Ganeti.Query.Language
@@ -306,10 +305,10 @@
let hvs = [getDefaultHypervisorSpec cfg |
queryDomainRequired hypervisorFields fields]
good_nodes = nodesWithValidConfig cfg nodes
- storage_units = if queryDomainRequired storageFields fields
- then getStorageUnitsOfNodes cfg good_nodes
- else Map.fromList
- (map (\n -> (uuidOf n, [])) good_nodes)
- rpcres <- executeRpcCall good_nodes (RpcCallNodeInfo storage_units hvs)
+ storage_units n = if queryDomainRequired storageFields fields
+ then getStorageUnitsOfNode cfg n
+ else []
+ rpcres <- executeRpcCalls
+ [(n, RpcCallNodeInfo (storage_units n) hvs) | n <- good_nodes]
return $ fillUpList (fillPairFromMaybe rpcResultNodeBroken pickPairUnique)
nodes rpcres
diff --git a/src/Ganeti/Query/Query.hs b/src/Ganeti/Query/Query.hs
index 7ccc4db..ace0f8f 100644
--- a/src/Ganeti/Query/Query.hs
+++ b/src/Ganeti/Query/Query.hs
@@ -82,7 +82,7 @@
import Ganeti.Config
import Ganeti.Errors
import Ganeti.JQueue
-import Ganeti.JSON
+import Ganeti.JSON (Container, GenericContainer(..))
import Ganeti.Locking.Allocation (OwnerState, LockRequest(..), OwnerState(..))
import Ganeti.Locking.Locks (GanetiLocks, ClientId, lockName)
import Ganeti.Logging
diff --git a/src/Ganeti/Query/Server.hs b/src/Ganeti/Query/Server.hs
index 89c8b60..aefe129 100644
--- a/src/Ganeti/Query/Server.hs
+++ b/src/Ganeti/Query/Server.hs
@@ -458,19 +458,8 @@
else showJSON (False, genericResult id (const "") res))
$ annotated_results
-handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
- let compute_fn = computeJobUpdate cfg jid fields prev_log
- qDir <- queueDir
- -- verify if the job is finalized, and return immediately in this case
- jobresult <- loadJobFromDisk qDir False jid
- case jobresult of
- Bad s -> return . Bad $ JobLost s
- Ok (job, _) | not (jobFinalized job) -> do
- let jobfile = liveJobFile qDir jid
- answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
- (prev_job, JSArray []) compute_fn
- return . Ok $ showJSON answer
- _ -> liftM (Ok . showJSON) compute_fn
+handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) =
+ waitForJobChange jid prev_job tmout $ computeJobUpdate cfg jid fields prev_log
handleCall _ _ cfg (SetWatcherPause time) = do
let mcs = Config.getMasterOrCandidates cfg
@@ -569,6 +558,30 @@
{-# ANN handleCall "HLint: ignore Too strict if" #-}
+-- | Special-case handler for WaitForJobChange RPC call for fields == ["status"]
+-- that doesn't require the use of ConfigData
+handleWaitForJobChangeStatus :: JobId -> JSValue -> JSValue -> Int
+ -> IO (ErrorResult JSValue)
+handleWaitForJobChangeStatus jid prev_job prev_log tmout =
+ waitForJobChange jid prev_job tmout $ computeJobUpdateStatus jid prev_log
+
+-- | Common WaitForJobChange functionality shared between handleCall and
+-- handleWaitForJobChangeStatus
+waitForJobChange :: JobId -> JSValue -> Int -> IO (JSValue, JSValue)
+ -> IO (ErrorResult JSValue)
+waitForJobChange jid prev_job tmout compute_fn = do
+ qDir <- queueDir
+ -- verify if the job is finalized, and return immediately in this case
+ jobresult <- loadJobFromDisk qDir False jid
+ case jobresult of
+ Bad s -> return . Bad $ JobLost s
+ Ok (job, _) | not (jobFinalized job) -> do
+ let jobfile = liveJobFile qDir jid
+ answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
+ (prev_job, JSArray []) compute_fn
+ return . Ok $ showJSON answer
+ _ -> liftM (Ok . showJSON) compute_fn
+
-- | Query the status of a job and return the requested fields
-- and the logs newer than the given log number.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
@@ -591,6 +604,26 @@
logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
return (JSArray rfields, rlogs)
+-- | A version of computeJobUpdate hardcoded to only return logs and the status
+-- field. By hardcoding this we avoid using the luxi Query infrastructure and
+-- the ConfigData value it requires.
+computeJobUpdateStatus :: JobId -> JSValue -> IO (JSValue, JSValue)
+computeJobUpdateStatus jid prev_log = do
+ qdir <- queueDir
+ loadResult <- loadJobFromDisk qdir True jid
+ let sjid = show $ fromJobId jid
+ logDebug $ "Inspecting status of job " ++ sjid
+ let (rfields, rlogs) = case loadResult of
+ Ok (job, _) -> (J.JSArray [status], newlogs)
+ where status = showJSON $ calcJobStatus job -- like "status" jobField
+ oplogs = map qoLog (qjOps job) -- like "oplog" jobField
+ newer = case J.readJSON prev_log of
+ J.Ok n -> (\(idx, _time, _type, _msg) -> n < idx)
+ _ -> const True
+ newlogs = showJSON $ concatMap (filter newer) oplogs
+ _ -> (JSArray[JSNull], JSArray [])
+ logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
+ return (rfields, rlogs)
type LuxiConfig = (Lock, JQStatus, ConfigReader)
@@ -598,10 +631,20 @@
:: LuxiConfig
-> LuxiOp
-> IO (Bool, GenericResult GanetiException JSValue)
-luxiExec (qlock, qstat, creader) args = do
- cfg <- creader
- result <- handleCallWrapper qlock qstat cfg args
- return (True, result)
+luxiExec (qlock, qstat, creader) args =
+ case args of
+ -- Special case WaitForJobChange handling to avoid passing a ConfigData to
+ -- a potentially long-lived thread. ConfigData uses lots of heap, and
+ -- multiple handler threads retaining different versions of ConfigData
+ -- increases luxi's memory use for concurrent jobs that modify config.
+ WaitForJobChange jid fields prev_job prev_log tmout
+ | fields == ["status"] -> do
+ result <- handleWaitForJobChangeStatus jid prev_job prev_log tmout
+ return (True, result)
+ _ -> do
+ cfg <- creader
+ result <- handleCallWrapper qlock qstat cfg args
+ return (True, result)
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler cfg = U.Handler { U.hParse = decodeLuxiCall
diff --git a/src/Ganeti/Rpc.hs b/src/Ganeti/Rpc.hs
index ddd947c..838575f 100644
--- a/src/Ganeti/Rpc.hs
+++ b/src/Ganeti/Rpc.hs
@@ -111,7 +111,8 @@
import Control.Monad
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Map as Map
-import Data.Maybe (fromMaybe, mapMaybe)
+import Data.List (zipWith4)
+import Data.Maybe (mapMaybe)
import qualified Text.JSON as J
import Text.JSON.Pretty (pp_value)
import qualified Data.ByteString.Base64.Lazy as Base64
@@ -128,7 +129,7 @@
import Ganeti.Codec
import Ganeti.Curl.Multi
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (ArrayObject(..), GenericContainer(..))
import Ganeti.Logging
import Ganeti.Objects
import Ganeti.Runtime
@@ -179,8 +180,8 @@
-- | Calculate the timeout value for the call execution.
rpcCallTimeout :: a -> Int
-- | Prepare arguments of the call to be send as POST.
- rpcCallData :: Node -> a -> String
- rpcCallData _ = J.encode . J.JSArray . toJSArray
+ rpcCallData :: a -> String
+ rpcCallData = J.encode . J.JSArray . toJSArray
-- | Whether we accept offline nodes when making a call.
rpcCallAcceptOffline :: a -> Bool
@@ -213,12 +214,12 @@
-- | Create HTTP request for a given node provided it is online,
-- otherwise create empty response.
-prepareHttpRequest :: (RpcCall a) => Int -> [CurlOption] -> Node -> a
- -> ERpcError HttpClientRequest
-prepareHttpRequest port opts node call
+prepareHttpRequest :: (RpcCall a) => Int -> [CurlOption] -> Node
+ -> String -> a -> ERpcError HttpClientRequest
+prepareHttpRequest port opts node reqdata call
| rpcCallAcceptOffline call || not (nodeOffline node) =
Right HttpClientRequest { requestUrl = prepareUrl port node call
- , requestData = rpcCallData node call
+ , requestData = reqdata
, requestOpts = opts ++ curlOpts
}
| otherwise = Left OfflineNodeError
@@ -275,9 +276,13 @@
. liftM (fromIntegral . servicePort)
$ getServiceByName C.noded "tcp"
--- | Execute multiple RPC calls in parallel
+-- | Execute multiple distinct RPC calls in parallel
executeRpcCalls :: (Rpc a b) => [(Node, a)] -> IO [(Node, ERpcError b)]
-executeRpcCalls nodeCalls = do
+executeRpcCalls = executeRpcCalls' . map (\(n, c) -> (n, c, rpcCallData c))
+
+-- | Execute multiple RPC calls in parallel
+executeRpcCalls' :: (Rpc a b) => [(Node, a, String)] -> IO [(Node, ERpcError b)]
+executeRpcCalls' nodeCalls = do
port <- getNodedPort
cert_file <- P.nodedCertFile
client_cert_file_name <- P.nodedClientCertFile
@@ -287,16 +292,16 @@
let client_cert_file = if client_file_exists
then client_cert_file_name
else cert_file
- (nodes, calls) = unzip nodeCalls
+ (nodes, calls, datas) = unzip3 nodeCalls
opts = map (getOptionsForCall cert_file client_cert_file) calls
- opts_urls = zipWith3 (\n c o ->
- case prepareHttpRequest port o n c of
+ opts_urls = zipWith4 (\n c d o ->
+ case prepareHttpRequest port o n d c of
Left v -> Left v
Right request ->
Right (CurlPostFields [requestData request]:
requestOpts request,
requestUrl request)
- ) nodes calls opts
+ ) nodes calls datas opts
-- split the opts_urls list; we don't want to pass the
-- failed-already nodes to Curl
let (lefts, rights, trail) = splitEithers opts_urls
@@ -311,8 +316,10 @@
return pairedList
-- | Execute an RPC call for many nodes in parallel.
+-- NB this computes the RPC call payload string only once.
executeRpcCall :: (Rpc a b) => [Node] -> a -> IO [(Node, ERpcError b)]
-executeRpcCall nodes call = executeRpcCalls . zip nodes $ repeat call
+executeRpcCall nodes call = executeRpcCalls' [(n, call, rpc_data) | n <- nodes]
+ where rpc_data = rpcCallData call
-- | Helper function that is used to read dictionaries of values.
sanitizeDictResults :: [(String, J.Result a)] -> ERpcError [(String, a)]
@@ -431,7 +438,7 @@
rpcCallName _ = "all_instances_info"
rpcCallTimeout _ = rpcTimeoutToRaw Urgent
rpcCallAcceptOffline _ = False
- rpcCallData _ call = J.encode (
+ rpcCallData call = J.encode (
map fst $ rpcCallAllInstInfoHypervisors call,
GenericContainer . Map.fromList $ rpcCallAllInstInfoHypervisors call)
@@ -486,7 +493,7 @@
rpcCallName _ = "instance_console_info"
rpcCallTimeout _ = rpcTimeoutToRaw Urgent
rpcCallAcceptOffline _ = False
- rpcCallData _ call = J.encode .
+ rpcCallData call = J.encode .
GenericContainer $ Map.fromList (rpcCallInstConsInfoInstanceInfo call)
instance Rpc RpcCallInstanceConsoleInfo RpcResultInstanceConsoleInfo where
@@ -522,7 +529,7 @@
-- | Returns node information
$(buildObject "RpcCallNodeInfo" "rpcCallNodeInfo"
- [ simpleField "storage_units" [t| Map.Map String [StorageUnit] |]
+ [ simpleField "storage_units" [t| [StorageUnit] |]
, simpleField "hypervisors" [t| [ (Hypervisor, HvParams) ] |]
])
@@ -557,10 +564,8 @@
rpcCallName _ = "node_info"
rpcCallTimeout _ = rpcTimeoutToRaw Urgent
rpcCallAcceptOffline _ = False
- rpcCallData n call = J.encode
- ( fromMaybe (error $ "Programmer error: missing parameter for node named "
- ++ nodeName n)
- $ Map.lookup (uuidOf n) (rpcCallNodeInfoStorageUnits call)
+ rpcCallData call = J.encode
+ ( rpcCallNodeInfoStorageUnits call
, rpcCallNodeInfoHypervisors call
)
@@ -582,7 +587,7 @@
rpcCallName _ = "version"
rpcCallTimeout _ = rpcTimeoutToRaw Urgent
rpcCallAcceptOffline _ = True
- rpcCallData _ = J.encode
+ rpcCallData = J.encode
instance Rpc RpcCallVersion RpcResultVersion where
rpcResultFill _ res = fromJSValueToRes res RpcResultVersion
@@ -651,7 +656,7 @@
rpcCallName _ = "export_list"
rpcCallTimeout _ = rpcTimeoutToRaw Fast
rpcCallAcceptOffline _ = False
- rpcCallData _ = J.encode
+ rpcCallData = J.encode
instance Rpc RpcCallExportList RpcResultExportList where
rpcResultFill _ res = fromJSValueToRes res RpcResultExportList
@@ -671,7 +676,7 @@
rpcCallName _ = "jobqueue_update"
rpcCallTimeout _ = rpcTimeoutToRaw Fast
rpcCallAcceptOffline _ = False
- rpcCallData _ call = J.encode
+ rpcCallData call = J.encode
( rpcCallJobqueueUpdateFileName call
, toCompressed $ rpcCallJobqueueUpdateContent call
)
diff --git a/src/Ganeti/Storage/Drbd/Types.hs b/src/Ganeti/Storage/Drbd/Types.hs
index a60c510..1fb9073 100644
--- a/src/Ganeti/Storage/Drbd/Types.hs
+++ b/src/Ganeti/Storage/Drbd/Types.hs
@@ -54,7 +54,7 @@
import Text.JSON
import Text.Printf
-import Ganeti.JSON
+import Ganeti.JSON (optFieldsToObj, optionalJSField)
--TODO: consider turning deviceInfos into an IntMap
-- | Data type contaning all the data about the status of DRBD.
diff --git a/src/Ganeti/Storage/Utils.hs b/src/Ganeti/Storage/Utils.hs
index 9cfdd7e..f7191f3 100644
--- a/src/Ganeti/Storage/Utils.hs
+++ b/src/Ganeti/Storage/Utils.hs
@@ -33,7 +33,7 @@
-}
module Ganeti.Storage.Utils
- ( getStorageUnitsOfNodes
+ ( getStorageUnitsOfNode
, nodesWithValidConfig
) where
@@ -45,7 +45,6 @@
import Control.Monad
import Data.List (nub)
import Data.Maybe
-import qualified Data.Map as M
-- | Get the cluster's default storage unit for a given disk template
getDefaultStorageKey :: ConfigData -> DiskTemplate -> Maybe StorageKey
@@ -94,8 +93,3 @@
let clusterSUs = getClusterStorageUnitRaws cfg
es = fromJust (getExclusiveStorage cfg n)
in map (addParamsToStorageUnit es) clusterSUs
-
--- | Get the storage unit map for all nodes
-getStorageUnitsOfNodes :: ConfigData -> [Node] -> M.Map String [StorageUnit]
-getStorageUnitsOfNodes cfg ns =
- M.fromList (map (\n -> (uuidOf n, getStorageUnitsOfNode cfg n)) ns)
diff --git a/src/Ganeti/THH.hs b/src/Ganeti/THH.hs
index 4cb610e..4bc7e88 100644
--- a/src/Ganeti/THH.hs
+++ b/src/Ganeti/THH.hs
@@ -104,7 +104,7 @@
import qualified Text.JSON as JSON
import Text.JSON.Pretty (pp_value)
-import Ganeti.JSON
+import Ganeti.JSON (readJSONWithDesc, fromObj, DictObject(..), ArrayObject(..), maybeFromObj, mkUsedKeys, showJSONtoDict, readJSONfromDict, branchOnField, addField, allUsedKeys)
import Ganeti.PartialParams
import Ganeti.PyValue
import Ganeti.THH.PyType
@@ -310,15 +310,16 @@
-- | Checks that a given field is not optional (for object types or
-- fields which should not allow this case).
checkNonOptDef :: (Monad m) => Field -> m ()
-checkNonOptDef (Field { fieldIsOptional = OptionalOmitNull
- , fieldName = name }) =
- fail $ "Optional field " ++ (T.unpack name) ++ " used in parameter declaration"
-checkNonOptDef (Field { fieldIsOptional = OptionalSerializeNull
- , fieldName = name }) =
- fail $ "Optional field " ++ (T.unpack name) ++ " used in parameter declaration"
-checkNonOptDef (Field { fieldDefault = (Just _), fieldName = name }) =
- fail $ "Default field " ++ (T.unpack name) ++ " used in parameter declaration"
-checkNonOptDef _ = return ()
+checkNonOptDef field
+ | fieldIsOptional field == OptionalOmitNull = failWith kOpt
+ | fieldIsOptional field == OptionalSerializeNull = failWith kOpt
+ | isJust (fieldDefault field) = failWith kDef
+ | otherwise = return ()
+ where failWith kind = fail $ kind ++ " field " ++ name
+ ++ " used in parameter declaration"
+ name = T.unpack (fieldName field)
+ kOpt = "Optional"
+ kDef = "Default"
-- | Construct a function that parses a field value. If the field has
-- a custom 'fieldRead', it's applied to @o@ and used. Otherwise
@@ -328,8 +329,9 @@
-> Q Exp -- ^ The resulting function that parses a JSON message
parseFn field o =
let fnType = [t| JSON.JSValue -> JSON.Result $(fieldType field) |]
- expr = maybe [| readJSONWithDesc $(stringE . T.unpack $ fieldName field) |]
- (`appE` o) (fieldRead field)
+ expr = maybe
+ [| readJSONWithDesc $(stringE . T.unpack $ fieldName field) |]
+ (`appE` o) (fieldRead field)
in sigE expr fnType
-- | Produces the expression that will de-serialise a given
@@ -806,7 +808,7 @@
genOpConsFields :: OpCodeConstructor -> Clause
genOpConsFields (cname, _, _, fields, _) =
let op_id = deCamelCase cname
- fieldnames f = (T.unpack . fieldName $ f):(map T.unpack $ fieldExtraKeys f)
+ fieldnames f = map T.unpack $ fieldName f:fieldExtraKeys f
fvals = map (LitE . StringL) . sort . nub $ concatMap fieldnames fields
in Clause [LitP (StringL op_id)] (NormalB $ ListE fvals) []
diff --git a/src/Ganeti/THH/Field.hs b/src/Ganeti/THH/Field.hs
index 6047ca4..9b444e9 100644
--- a/src/Ganeti/THH/Field.hs
+++ b/src/Ganeti/THH/Field.hs
@@ -57,7 +57,7 @@
import System.Posix.Types (FileMode, ProcessID)
import System.Time (ClockTime(..))
-import Ganeti.JSON
+import Ganeti.JSON (TimeAsDoubleJSON(..))
import Ganeti.THH
-- * Internal functions
diff --git a/src/Ganeti/THH/RPC.hs b/src/Ganeti/THH/RPC.hs
index 1cb63ec..25388df 100644
--- a/src/Ganeti/THH/RPC.hs
+++ b/src/Ganeti/THH/RPC.hs
@@ -55,7 +55,7 @@
import Ganeti.BasicTypes
import Ganeti.Errors
-import Ganeti.JSON
+import Ganeti.JSON (fromJResultE, fromJVal)
import Ganeti.THH.Types
import qualified Ganeti.UDSServer as US
diff --git a/src/Ganeti/Types.hs b/src/Ganeti/Types.hs
index 437e62a..8da06d4 100644
--- a/src/Ganeti/Types.hs
+++ b/src/Ganeti/Types.hs
@@ -201,7 +201,7 @@
import System.Time (ClockTime)
import qualified Ganeti.ConstantUtils as ConstantUtils
-import Ganeti.JSON
+import Ganeti.JSON (Container, HasStringRepr(..))
import qualified Ganeti.THH as THH
import Ganeti.Utils
diff --git a/src/Ganeti/UDSServer.hs b/src/Ganeti/UDSServer.hs
index 8478e2d..7008d08 100644
--- a/src/Ganeti/UDSServer.hs
+++ b/src/Ganeti/UDSServer.hs
@@ -97,7 +97,7 @@
import Ganeti.BasicTypes
import Ganeti.Errors (GanetiException(..), ErrorResult)
-import Ganeti.JSON
+import Ganeti.JSON (fromJResult, fromJVal, fromJResultE, fromObj)
import Ganeti.Logging
import Ganeti.THH
import Ganeti.Utils
diff --git a/src/Ganeti/WConfd/Ssconf.hs b/src/Ganeti/WConfd/Ssconf.hs
index b8c83c0..3900578 100644
--- a/src/Ganeti/WConfd/Ssconf.hs
+++ b/src/Ganeti/WConfd/Ssconf.hs
@@ -53,7 +53,7 @@
import Ganeti.BasicTypes
import Ganeti.Config
import Ganeti.Constants
-import Ganeti.JSON
+import Ganeti.JSON (fromContainer, lookupContainer)
import Ganeti.Objects
import Ganeti.Ssconf
import Ganeti.Utils
diff --git a/test/hs/Test/Ganeti/OpCodes.hs b/test/hs/Test/Ganeti/OpCodes.hs
index 4d2167e..603c394 100644
--- a/test/hs/Test/Ganeti/OpCodes.hs
+++ b/test/hs/Test/Ganeti/OpCodes.hs
@@ -638,7 +638,7 @@
extra_hs = hs_ops \\ py_ops
HUnit.assertBool ("Missing OpCodes from the Haskell code:\n" ++
unlines extra_py) (null extra_py)
- HUnit.assertBool ("Extra OpCodes in the Haskell code code:\n" ++
+ HUnit.assertBool ("Extra OpCodes in the Haskell code:\n" ++
unlines extra_hs) (null extra_hs)
-- | Custom HUnit test case that forks a Python process and checks
diff --git a/test/hs/Test/Ganeti/Rpc.hs b/test/hs/Test/Ganeti/Rpc.hs
index 86b3ece..bdb83ac 100644
--- a/test/hs/Test/Ganeti/Rpc.hs
+++ b/test/hs/Test/Ganeti/Rpc.hs
@@ -73,13 +73,6 @@
num_storage_units <- choose (0, 5)
vectorOf num_storage_units genStorageUnit
-genStorageUnitMap :: Gen (Map.Map String [StorageUnit])
-genStorageUnitMap = do
- num_nodes <- choose (0,5)
- node_uuids <- vectorOf num_nodes genName
- storage_units_list <- vectorOf num_nodes genStorageUnits
- return $ Map.fromList (zip node_uuids storage_units_list)
-
-- FIXME: Generate more interesting hvparams
-- | Generate Hvparams
genHvParams :: Gen Objects.HvParams
@@ -101,7 +94,7 @@
arbitrary = Rpc.RpcCallInstanceList <$> arbitrary
instance Arbitrary Rpc.RpcCallNodeInfo where
- arbitrary = Rpc.RpcCallNodeInfo <$> genStorageUnitMap <*> genHvSpecs
+ arbitrary = Rpc.RpcCallNodeInfo <$> genStorageUnits <*> genHvSpecs
-- | Generates per-instance console info params for the 'InstanceConsoleInfo'
-- call.
diff --git a/test/py/mocks.py b/test/py/mocks.py
index 406caca..b48125b 100644
--- a/test/py/mocks.py
+++ b/test/py/mocks.py
@@ -113,6 +113,7 @@
class FakeContext(object):
"""Fake context object"""
+ # pylint: disable=W0613
def __init__(self):
self.cfg = FakeConfig()
@@ -124,9 +125,10 @@
def GetRpc(self, cfg):
return None
- def GetWConfdContext(self, _ec_id):
+ def GetWConfdContext(self, ec_id):
return (None, None, None)
+
class FakeGetentResolver(object):
"""Fake runtime.GetentResolver"""
@@ -154,6 +156,7 @@
def LookupGid(self, gid):
return "group%s" % gid
+
class FakeLU(object):
HPATH = "fake-lu"
HTYPE = None
@@ -161,7 +164,7 @@
def __init__(self, processor, op, cfg, rpc_runner, prereq_err):
self.proc = processor
self.cfg = cfg
- self.op = op
+ self.op = op
self.rpc = rpc_runner
self.prereq_err = prereq_err
@@ -170,7 +173,7 @@
self.dont_collate_locks = dict.fromkeys(locking.LEVELS, False)
self.add_locks = {}
- self.LogWarning = processor.LogWarning
+ self.LogWarning = processor.LogWarning # pylint: disable=C0103
def CheckArguments(self):
pass
@@ -184,7 +187,6 @@
def CheckPrereq(self):
if self.prereq_err:
raise self.prereq_err
- pass
def Exec(self, feedback_fn):
pass
@@ -196,7 +198,9 @@
return {}
def PreparePostHookNodes(self, post_hook_node_uuids):
+ # pylint: disable=W0613
return []
def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
+ # pylint: disable=W0613
return lu_result
diff --git a/test/py/testutils_ssh.py b/test/py/testutils_ssh.py
index fa3b35d..e700e47 100644
--- a/test/py/testutils_ssh.py
+++ b/test/py/testutils_ssh.py
@@ -564,7 +564,7 @@
uuid=old_node_data.uuid,
key=new_key,
is_potential_master_candidate=old_node_data
- .is_potential_master_candidate,
+ .is_potential_master_candidate,
is_master_candidate=old_node_data.is_master_candidate,
is_master=old_node_data.is_master)