blob: c942803cdd74241bcb473f19adb30e62ad4a0c35 [file] [log] [blame]
{-# LANGUAGE FlexibleContexts #-}
{-| Implementation of the Ganeti Query2 server.
-}
{-
Copyright (C) 2012, 2013, 2014 Google Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-}
module Ganeti.Query.Server
( main
, checkMain
, prepMain
) where
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Lens ((.~))
import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Error (MonadError)
import Control.Monad.IO.Class
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import qualified Data.ByteString.UTF8 as UTF8
import qualified Data.Set as Set (toList)
import Data.IORef
import Data.List (intersperse)
import Data.Maybe (fromMaybe)
import qualified Text.JSON as J
import Text.JSON (encode, showJSON, JSValue(..))
import System.Info (arch)
import System.Directory
import System.Posix.Process (getProcessID)
import System.Posix.Signals as P
import qualified Ganeti.Constants as C
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
import Ganeti.Errors
import qualified Ganeti.Path as Path
import Ganeti.Daemon
import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
import Ganeti.Objects
import Ganeti.Objects.Lens (configFiltersL)
import qualified Ganeti.Config as Config
import qualified Ganeti.Compat as Compat
import Ganeti.ConfigReader
import Ganeti.BasicTypes
import Ganeti.JQueue
import Ganeti.JQScheduler
import Ganeti.JSON (TimeAsDoubleJSON(..), alterContainerL, lookupContainer)
import Ganeti.Locking.Locks (ClientId(..), ClientType(ClientOther))
import Ganeti.Logging
import Ganeti.Luxi
import qualified Ganeti.Query.Language as Qlang
import qualified Ganeti.Query.Cluster as QCluster
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile )
import Ganeti.Rpc
import qualified Ganeti.Query.Exec as Exec
import Ganeti.Query.Query
import Ganeti.Query.Filter (makeSimpleFilter)
import Ganeti.THH.HsRPC (runRpcClient, RpcClientMonad)
import Ganeti.Types
import qualified Ganeti.UDSServer as U (Handler(..), listener)
import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
, safeRenameFile, newUUID, isUUID )
import Ganeti.Utils.Monad (orM)
import Ganeti.Utils.MVarLock
import qualified Ganeti.Version as Version
import Ganeti.WConfd.Client ( getWConfdClient, withLockedConfig, writeConfig
, cleanupLocks)
-- | Creates a `ClientId` that identifies the current luxi
-- (process, thread).
--
-- This means that this `ClientId` will be different for each request
-- handled by luxid.
makeLuxidClientId :: JQStatus -> IO ClientId
makeLuxidClientId status = do
pid <- getProcessID
tid <- myThreadId
return ClientId
{ ciIdentifier = ClientOther $ "luxid-" ++ show tid
, ciLockFile = jqLivelock status
, ciPid = pid
}
-- | Creates a connection to WConfd and locks the config, allowing
-- to run some WConfd RPC commands given the locked config.
--
-- This is needed when luxid wants to change the config.
--
-- Example:
--
-- > cid <- makeLuxidClientId ...
-- > withLockedWconfdConfig cid $ \lockedCfg -> do
-- > -- some (IO) action that needs to be run inside having the lock
-- > writeConfig cid (updateConfig lockedCfg)
withLockedWconfdConfig
:: (MonadBase IO m, MonadError GanetiException m)
=> ClientId
-> (ConfigData -> RpcClientMonad a)
-> m a
withLockedWconfdConfig cid f = do
wconfdClient <- liftBase $ getWConfdClient =<< Path.defaultWConfdSocket
runRpcClient (withLockedConfig cid False f) wconfdClient
-- | Helper for classic queries.
handleQuery :: [Qlang.ItemType -> Qlang.FilterField] -- ^ Fields to put into
-- the query
-> ConfigData -- ^ Cluster config
-> Qlang.ItemType -- ^ Query type
-> [Either String Integer] -- ^ Requested names
-- (empty means all)
-> [String] -- ^ Requested fields
-> Bool -- ^ Whether to do sync queries or not
-> IO (GenericResult GanetiException JSValue)
handleQuery _ _ _ _ _ True =
return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
handleQuery filterFields cfg qkind names fields _ = do
let simpleNameFilter field = makeSimpleFilter (field qkind) names
flt = Qlang.OrFilter $ map simpleNameFilter filterFields
qr <- query cfg True (Qlang.Query qkind fields flt)
return $ showJSON <$> (qr >>= queryCompat)
-- | Helper for classic queries.
-- Queries `name` and `uuid` fields.
handleClassicQuery :: ConfigData -- ^ Cluster config
-> Qlang.ItemType -- ^ Query type
-> [Either String Integer] -- ^ Requested names
-- (empty means all)
-> [String] -- ^ Requested fields
-> Bool -- ^ Whether to do sync queries or not
-> IO (GenericResult GanetiException JSValue)
handleClassicQuery = handleQuery [nameField, uuidField]
-- | Like `handleClassicQuery`, but filters only by UUID.
handleUuidQuery :: ConfigData -- ^ Cluster config
-> Qlang.ItemType -- ^ Query type
-> [Either String Integer] -- ^ Requested names
-- (empty means all)
-> [String] -- ^ Requested fields
-> Bool -- ^ Whether to do sync queries or not
-> IO (GenericResult GanetiException JSValue)
handleUuidQuery = handleQuery [uuidField]
-- | Minimal wrapper to handle the missing config case.
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
-> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper _ _ (Bad msg) _ =
return . Bad . ConfigurationError $
"I do not have access to a valid configuration, cannot\
\ process queries: " ++ msg
handleCallWrapper qlock qstat (Ok config) op = handleCall qlock qstat config op
-- | Actual luxi operation handler.
handleCall :: Lock -> JQStatus
-> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
let cluster = configCluster cdata
master = QCluster.clusterMasterNodeName cdata
hypervisors = clusterEnabledHypervisors cluster
diskTemplates = clusterEnabledDiskTemplates cluster
def_hv = case hypervisors of
x:_ -> showJSON x
[] -> JSNull
bits = show (Compat.finiteBitSize (0::Int)) ++ "bits"
arch_tuple = [bits, arch]
obj = [ ("software_version", showJSON C.releaseVersion)
, ("protocol_version", showJSON C.protocolVersion)
, ("config_version", showJSON C.configVersion)
, ("os_api_version", showJSON . maximum .
Set.toList . ConstantUtils.unFrozenSet $
C.osApiVersions)
, ("export_version", showJSON C.exportVersion)
, ("vcs_version", showJSON Version.version)
, ("architecture", showJSON arch_tuple)
, ("name", showJSON $ clusterClusterName cluster)
, ("master", showJSON (case master of
Ok name -> name
_ -> undefined))
, ("default_hypervisor", def_hv)
, ("enabled_hypervisors", showJSON hypervisors)
, ("hvparams", showJSON $ clusterHvparams cluster)
, ("os_hvp", showJSON $ clusterOsHvp cluster)
, ("beparams", showJSON $ clusterBeparams cluster)
, ("osparams", showJSON $ clusterOsparams cluster)
, ("ipolicy", showJSON $ clusterIpolicy cluster)
, ("nicparams", showJSON $ clusterNicparams cluster)
, ("ndparams", showJSON $ clusterNdparams cluster)
, ("diskparams", showJSON $ clusterDiskparams cluster)
, ("candidate_pool_size",
showJSON $ clusterCandidatePoolSize cluster)
, ("max_running_jobs",
showJSON $ clusterMaxRunningJobs cluster)
, ("max_tracked_jobs",
showJSON $ clusterMaxTrackedJobs cluster)
, ("mac_prefix", showJSON $ clusterMacPrefix cluster)
, ("master_netdev", showJSON $ clusterMasterNetdev cluster)
, ("master_netmask", showJSON $ clusterMasterNetmask cluster)
, ("use_external_mip_script",
showJSON $ clusterUseExternalMipScript cluster)
, ("volume_group_name",
maybe JSNull showJSON (clusterVolumeGroupName cluster))
, ("drbd_usermode_helper",
maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
, ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
, ("shared_file_storage_dir",
showJSON $ clusterSharedFileStorageDir cluster)
, ("gluster_storage_dir",
showJSON $ clusterGlusterStorageDir cluster)
, ("maintain_node_health",
showJSON $ clusterMaintainNodeHealth cluster)
, ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
, ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
, ("uuid", showJSON $ clusterUuid cluster)
, ("tags", showJSON $ clusterTags cluster)
, ("uid_pool", showJSON $ clusterUidPool cluster)
, ("default_iallocator",
showJSON $ clusterDefaultIallocator cluster)
, ("default_iallocator_params",
showJSON $ clusterDefaultIallocatorParams cluster)
, ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
, ("primary_ip_version",
showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
, ("prealloc_wipe_disks",
showJSON $ clusterPreallocWipeDisks cluster)
, ("hidden_os", showJSON $ clusterHiddenOs cluster)
, ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
, ("enabled_disk_templates", showJSON diskTemplates)
, ("install_image", showJSON $ clusterInstallImage cluster)
, ("instance_communication_network",
showJSON (clusterInstanceCommunicationNetwork cluster))
, ("zeroing_image", showJSON $ clusterZeroingImage cluster)
, ("compression_tools",
showJSON $ clusterCompressionTools cluster)
, ("enabled_user_shutdown",
showJSON $ clusterEnabledUserShutdown cluster)
, ("enabled_data_collectors",
showJSON . fmap dataCollectorActive
$ clusterDataCollectors cluster)
, ("data_collector_interval",
showJSON . fmap dataCollectorInterval
$ clusterDataCollectors cluster)
, ("modify_ssh_setup",
showJSON $ clusterModifySshSetup cluster)
, ("ssh_key_type", showJSON $ clusterSshKeyType cluster)
, ("ssh_key_bits", showJSON $ clusterSshKeyBits cluster)
]
in case master of
Ok _ -> return . Ok . J.makeObj $ obj
Bad ex -> return $ Bad ex
handleCall _ _ cfg (QueryTags kind name) = do
let tags = case kind of
TagKindCluster -> Ok . clusterTags $ configCluster cfg
TagKindGroup -> groupTags <$> Config.getGroup cfg name
TagKindNode -> nodeTags <$> Config.getNode cfg name
TagKindInstance -> instTags <$> Config.getInstance cfg name
TagKindNetwork -> networkTags <$> Config.getNetwork cfg name
return (J.showJSON <$> tags)
handleCall _ _ cfg (Query qkind qfields qfilter) = do
result <- query cfg True (Qlang.Query qkind qfields qfilter)
return $ J.showJSON <$> result
handleCall _ _ _ (QueryFields qkind qfields) = do
let result = queryFields (Qlang.QueryFields qkind qfields)
return $ J.showJSON <$> result
handleCall _ _ cfg (QueryNodes names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
(map Left names) fields lock
handleCall _ _ cfg (QueryInstances names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
(map Left names) fields lock
handleCall _ _ cfg (QueryGroups names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
(map Left names) fields lock
handleCall _ _ cfg (QueryJobs names fields) =
handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
(map (Right . fromIntegral . fromJobId) names) fields False
handleCall _ _ cfg (QueryFilters uuids fields) =
handleUuidQuery cfg (Qlang.ItemTypeLuxi Qlang.QRFilter)
(map Left uuids) fields False
handleCall _ status _ (ReplaceFilter mUuid priority predicates action
reason) =
-- Handles both adding new filter and changing existing ones.
runResultT $ do
-- Check that uuid `String` is actually a UUID.
uuid <- case mUuid of
Nothing -> liftIO newUUID -- Request to add a new filter
Just u -- Request to edit an existing filter
| isUUID u -> return u
| otherwise -> fail "Unable to parse UUID"
timestamp <- liftIO $ reasonTrailTimestamp <$> currentTimestamp
let luxidReason = ("luxid", "", timestamp)
-- Ask WConfd to change the config for us.
cid <- liftIO $ makeLuxidClientId status
withLockedWconfdConfig cid $ \lockedCfg -> do
-- Reading the latest JobID inside the Wconfd lock to really get the
-- most recent one (locking may block us for some time).
serial <- liftIO readSerialFromDisk
case serial of
Bad err -> fail $ "AddFilter: reading current JobId failed: " ++ err
Ok watermark -> do
let rule = FilterRule { frWatermark = watermark
, frPriority = priority
, frPredicates = predicates
, frAction = action
, frReasonTrail = reason ++ [luxidReason]
, frUuid = UTF8.fromString uuid
}
writeConfig cid
. (configFiltersL . alterContainerL (UTF8.fromString uuid)
.~ Just rule)
$ lockedCfg
-- Return UUID of added/replaced filter.
return $ showJSON uuid
handleCall _ status cfg (DeleteFilter uuid) = runResultT $ do
-- Check if filter exists.
_ <- lookupContainer
(failError $ "Filter rule with UUID " ++ uuid ++ " does not exist")
(UTF8.fromString uuid)
(configFilters cfg)
-- Ask WConfd to change the config for us.
cid <- liftIO $ makeLuxidClientId status
withLockedWconfdConfig cid $ \lockedCfg ->
writeConfig cid
. (configFiltersL . alterContainerL (UTF8.fromString uuid) .~ Nothing)
$ lockedCfg
return JSNull
handleCall _ _ cfg (QueryNetworks names fields lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
(map Left names) fields lock
handleCall _ _ cfg (QueryConfigValues fields) = do
let clusterProperty fn = showJSON . fn . configCluster $ cfg
let params = [ ("cluster_name", return $ clusterProperty clusterClusterName)
, ("watcher_pause", liftM (maybe JSNull showJSON)
QCluster.isWatcherPaused)
, ("master_node", return . genericResult (const JSNull) showJSON
$ QCluster.clusterMasterNodeName cfg)
, ("drain_flag", liftM (showJSON . not) isQueueOpen)
, ("modify_ssh_setup",
return $ clusterProperty clusterModifySshSetup)
, ("ssh_key_type", return $ clusterProperty clusterSshKeyType)
, ("ssh_key_bits", return $ clusterProperty clusterSshKeyBits)
] :: [(String, IO JSValue)]
let answer = map (fromMaybe (return JSNull) . flip lookup params) fields
answerEval <- sequence answer
return . Ok . showJSON $ answerEval
handleCall _ _ cfg (QueryExports nodes lock) =
handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRExport)
(map Left nodes) ["node", "export"] lock
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
jid <- mkResultT $ allocateJobId (Config.getMasterCandidates cfg) qlock
ts <- liftIO currentTimestamp
job <- liftM (extendJobReasonTrail . setReceivedTimestamp ts)
$ queuedJobFromOpCodes jid ops
qDir <- liftIO queueDir
_ <- writeAndReplicateJob cfg qDir job
_ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
return . showJSON . fromJobId $ jid
handleCall qlock qstat cfg (SubmitJob ops) =
do
open <- isQueueOpen
if not open
then return . Bad . GenericError $ "Queue drained"
else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
handleCall qlock qstat cfg (SubmitManyJobs lops) =
do
open <- isQueueOpen
if not open
then return . Bad . GenericError $ "Queue drained"
else do
let mcs = Config.getMasterCandidates cfg
result_jobids <- allocateJobIds mcs qlock (length lops)
case result_jobids of
Bad s -> return . Bad . GenericError $ s
Ok jids -> do
ts <- currentTimestamp
jobs <- liftM (map $ extendJobReasonTrail . setReceivedTimestamp ts)
$ zipWithM queuedJobFromOpCodes jids lops
qDir <- queueDir
write_results <- mapM (writeJobToDisk qDir) jobs
let annotated_results = zip write_results jobs
succeeded = map snd $ filter (isOk . fst) annotated_results
when (any isBad write_results) . logWarning
$ "Writing some jobs failed " ++ show annotated_results
replicateManyJobs qDir mcs succeeded
_ <- forkIO $ enqueueNewJobs qstat succeeded
return . Ok . JSArray
. map (\(res, job) ->
if isOk res
then showJSON (True, fromJobId $ qjId job)
else showJSON (False, genericResult id (const "") res))
$ annotated_results
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) =
waitForJobChange jid prev_job tmout $ computeJobUpdate cfg jid fields prev_log
handleCall _ _ cfg (SetWatcherPause time) = do
let mcs = Config.getMasterOrCandidates cfg
_ <- executeRpcCall mcs $ RpcCallSetWatcherPause time
return . Ok . maybe JSNull showJSON $ fmap TimeAsDoubleJSON time
handleCall _ _ cfg (SetDrainFlag value) = do
let mcs = Config.getMasterCandidates cfg
fpath <- jobQueueDrainFile
if value
then writeFile fpath ""
else removeFile fpath
_ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
return . Ok . showJSON $ True
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
let jName = (++) "job " . show $ fromJobId jid
maybeJob <- setJobPriority qstat jid prio
case maybeJob of
Bad s -> return . Ok $ showJSON (False, s)
Ok (Just job) -> runResultT $ do
let mcs = Config.getMasterCandidates cfg
qDir <- liftIO queueDir
liftIO $ replicateManyJobs qDir mcs [job]
return $ showJSON (True, "Priorities of pending opcodes for "
++ jName ++ " have been changed"
++ " to " ++ show prio)
Ok Nothing -> do
logDebug $ jName ++ " started, will signal"
fmap showJSON <$> tellJobPriority (jqLivelock qstat) jid prio
handleCall _ qstat cfg (CancelJob jid kill) = do
let jName = (++) "job " . show $ fromJobId jid
dequeueResult <- dequeueJob qstat jid
case dequeueResult of
Ok True ->
let jobFileFailed = (,) False
. (++) ("Dequeued " ++ jName
++ ", but failed to mark as cancelled: ")
jobFileSucceeded _ = (True, "Dequeued " ++ jName)
in liftM (Ok . showJSON . genericResult jobFileFailed jobFileSucceeded)
. runResultT $ do
logDebug $ jName ++ " dequeued, marking as canceled"
qDir <- liftIO queueDir
(job, _) <- ResultT $ loadJobFromDisk qDir True jid
now <- liftIO currentTimestamp
let job' = cancelQueuedJob now job
writeAndReplicateJob cfg qDir job'
Ok False -> do
logDebug $ jName ++ " not queued; trying to cancel directly"
result <- fmap showJSON <$> cancelJob kill (jqLivelock qstat) jid
when kill . void . forkIO $ do
_ <- orM
. intersperse (threadDelay C.luxidJobDeathDelay >> return False)
. replicate C.luxidJobDeathDetectionRetries
$ cleanupIfDead qstat jid
wconfdsocket <- Path.defaultWConfdSocket
wconfdclient <- getWConfdClient wconfdsocket
void . runResultT $ runRpcClient cleanupLocks wconfdclient
return result
Bad s -> return . Ok . showJSON $ (False, s)
handleCall qlock _ cfg (ArchiveJob jid) =
-- By adding a layer of MaybeT, we can prematurely end a computation
-- using 'mzero' or other 'MonadPlus' primitive and return 'Ok False'.
runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
qDir <- liftIO queueDir
let mcs = Config.getMasterCandidates cfg
live = liveJobFile qDir jid
archive = archivedJobFile qDir jid
withLock qlock $ do
(job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
`orElse` mzero
guard $ jobFinalized job
lift . withErrorT JobQueueError
. annotateError "Archiving failed in an unexpected way"
. mkResultT $ safeRenameFile queueDirPermissions live archive
_ <- liftIO . executeRpcCall mcs
$ RpcCallJobqueueRename [(live, archive)]
return True
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
qDir <- queueDir
resultJids <- getJobIDs [qDir]
case resultJids of
Bad s -> return . Bad . JobQueueError $ show s
Ok jids -> do
result <- withLock qlock
. archiveJobs cfg age timeout
$ sortJobIDs jids
return . Ok $ showJSON result
handleCall _ _ _ (PickupJob _) =
return . Bad
$ GenericError "Luxi call 'PickupJob' is for internal use only"
{-# ANN handleCall "HLint: ignore Too strict if" #-}
-- | Special-case handler for WaitForJobChange RPC call for fields == ["status"]
-- that doesn't require the use of ConfigData
handleWaitForJobChangeStatus :: JobId -> JSValue -> JSValue -> Int
-> IO (ErrorResult JSValue)
handleWaitForJobChangeStatus jid prev_job prev_log tmout =
waitForJobChange jid prev_job tmout $ computeJobUpdateStatus jid prev_log
-- | Common WaitForJobChange functionality shared between handleCall and
-- handleWaitForJobChangeStatus
waitForJobChange :: JobId -> JSValue -> Int -> IO (JSValue, JSValue)
-> IO (ErrorResult JSValue)
waitForJobChange jid prev_job tmout compute_fn = do
qDir <- queueDir
-- verify if the job is finalized, and return immediately in this case
jobresult <- loadJobFromDisk qDir False jid
case jobresult of
Bad s -> return . Bad $ JobLost s
Ok (job, _) | not (jobFinalized job) -> do
let jobfile = liveJobFile qDir jid
answer <- watchFile jobfile (min tmout C.luxiWfjcTimeout)
(prev_job, JSArray []) compute_fn
return . Ok $ showJSON answer
_ -> liftM (Ok . showJSON) compute_fn
-- | Query the status of a job and return the requested fields
-- and the logs newer than the given log number.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
-> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
let sjid = show $ fromJobId jid
logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
let fromJSArray (JSArray xs) = xs
fromJSArray _ = []
let logFilter JSNull (JSArray _) = True
logFilter (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
logFilter _ _ = False
let filterLogs n logs = JSArray (filter (logFilter n) (logs >>= fromJSArray))
jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
[Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
let (rfields, rlogs) = case jobQuery of
Ok (JSArray [JSArray (JSArray logs : answer)]) ->
(answer, filterLogs prev_log logs)
_ -> (map (const JSNull) fields, JSArray [])
logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
return (JSArray rfields, rlogs)
-- | A version of computeJobUpdate hardcoded to only return logs and the status
-- field. By hardcoding this we avoid using the luxi Query infrastructure and
-- the ConfigData value it requires.
computeJobUpdateStatus :: JobId -> JSValue -> IO (JSValue, JSValue)
computeJobUpdateStatus jid prev_log = do
qdir <- queueDir
loadResult <- loadJobFromDisk qdir True jid
let sjid = show $ fromJobId jid
logDebug $ "Inspecting status of job " ++ sjid
let (rfields, rlogs) = case loadResult of
Ok (job, _) -> (J.JSArray [status], newlogs)
where status = showJSON $ calcJobStatus job -- like "status" jobField
oplogs = map qoLog (qjOps job) -- like "oplog" jobField
newer = case J.readJSON prev_log of
J.Ok n -> (\(idx, _time, _type, _msg) -> n < idx)
_ -> const True
newlogs = showJSON $ concatMap (filter newer) oplogs
_ -> (JSArray[JSNull], JSArray [])
logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
return (rfields, rlogs)
type LuxiConfig = (Lock, JQStatus, ConfigReader)
luxiExec
:: LuxiConfig
-> LuxiOp
-> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args =
case args of
-- Special case WaitForJobChange handling to avoid passing a ConfigData to
-- a potentially long-lived thread. ConfigData uses lots of heap, and
-- multiple handler threads retaining different versions of ConfigData
-- increases luxi's memory use for concurrent jobs that modify config.
WaitForJobChange jid fields prev_job prev_log tmout
| fields == ["status"] -> do
result <- handleWaitForJobChangeStatus jid prev_job prev_log tmout
return (True, result)
_ -> do
cfg <- creader
result <- handleCallWrapper qlock qstat cfg args
return (True, result)
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler cfg = U.Handler { U.hParse = decodeLuxiCall
, U.hInputLogShort = strOfOp
, U.hInputLogLong = show
, U.hExec = luxiExec cfg
}
-- | Type alias for prepMain results
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
-- | Activate the master IP address.
activateMasterIP :: IO (Result ())
activateMasterIP = runResultT $ do
liftIO $ logDebug "Activating master IP address"
conf_file <- liftIO Path.clusterConfFile
config <- mkResultT $ Config.loadConfig conf_file
let mnp = Config.getMasterNetworkParameters config
masters = Config.getMasterNodes config
ems = clusterUseExternalMipScript $ configCluster config
liftIO . logDebug $ "Master IP params: " ++ show mnp
res <- liftIO . executeRpcCall masters $ RpcCallNodeActivateMasterIp mnp ems
_ <- liftIO $ logRpcErrors res
liftIO $ logDebug "finished activating master IP address"
return ()
-- | Check function for luxid.
checkMain :: CheckFn ()
checkMain = handleMasterVerificationOptions
-- | Prepare function for luxid.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
Exec.isForkSupported
>>= flip exitUnless "The daemon must be compiled without -threaded"
socket_path <- Path.defaultQuerySocket
cleanupSocket socket_path
s <- describeError "binding to the Luxi socket"
Nothing (Just socket_path) $ getLuxiServer True socket_path
cref <- newIORef (Bad "Configuration not yet loaded")
jq <- emptyJQStatus cref
return (s, cref, jq)
-- | Main function.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
-- Subscribe to config udpates. If the config changes, write new config and
-- check if the changes should trigger the scheduler.
initConfigReader $ \newConfig -> do
runScheduler <- atomicModifyIORef cref $ \oldConfig ->
case (oldConfig, newConfig) of
(Ok old, Ok new) -> (newConfig, configChangeNeedsRescheduling old new)
_ -> (newConfig, True) -- no old or new config, schedule
when runScheduler (updateStatusAndScheduleSomeJobs jq)
let creader = readIORef cref
qlockFile <- jobQueueLockFile
_ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
qlock <- newLock
_ <- P.installHandler P.sigCHLD P.Ignore Nothing
_ <- forkIO . void $ activateMasterIP
initJQScheduler jq
finally
(forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
(closeServer server >> removeFile qlockFile)