blob: ebe2f0fe27f2a6e15dbb6c3150d7890889d96958 [file] [log] [blame]
{-# LANGUAGE TemplateHaskell #-}
{-| Implementation of the Ganeti LUXI interface.
-}
{-
Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
-}
module Ganeti.Luxi
( LuxiOp(..)
, LuxiReq(..)
, Client
, JobId
, fromJobId
, makeJobId
, RecvResult(..)
, strOfOp
, getClient
, getServer
, acceptClient
, closeClient
, closeServer
, callMethod
, submitManyJobs
, queryJobsStatus
, buildCall
, buildResponse
, validateCall
, decodeCall
, recvMsg
, recvMsgExt
, sendMsg
, allLuxiCalls
) where
import Control.Exception (catch)
import Data.IORef
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.UTF8 as UTF8
import qualified Data.ByteString.Lazy.UTF8 as UTF8L
import Data.Word (Word8)
import Control.Monad
import Text.JSON (encodeStrict, decodeStrict)
import qualified Text.JSON as J
import Text.JSON.Pretty (pp_value)
import Text.JSON.Types
import System.Directory (removeFile)
import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
import System.IO.Error (isEOFError)
import System.Posix.Files
import System.Timeout
import qualified Network.Socket as S
import Ganeti.BasicTypes
import Ganeti.Constants
import Ganeti.Errors
import Ganeti.JSON
import Ganeti.OpParams (pTagsObject)
import Ganeti.OpCodes
import Ganeti.Runtime
import qualified Ganeti.Query.Language as Qlang
import Ganeti.THH
import Ganeti.Types
import Ganeti.Utils
-- * Utility functions
-- | Wrapper over System.Timeout.timeout that fails in the IO monad.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action = do
result <- timeout (secs * 1000000) action
case result of
Nothing -> fail $ "Timeout in " ++ descr
Just v -> return v
-- * Generic protocol functionality
-- | Result of receiving a message from the socket.
data RecvResult = RecvConnClosed -- ^ Connection closed
| RecvError String -- ^ Any other error
| RecvOk String -- ^ Successfull receive
deriving (Show, Eq)
-- | Currently supported Luxi operations and JSON serialization.
$(genLuxiOp "LuxiOp"
[ (luxiReqQuery,
[ simpleField "what" [t| Qlang.ItemType |]
, simpleField "fields" [t| [String] |]
, simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
])
, (luxiReqQueryFields,
[ simpleField "what" [t| Qlang.ItemType |]
, simpleField "fields" [t| [String] |]
])
, (luxiReqQueryNodes,
[ simpleField "names" [t| [String] |]
, simpleField "fields" [t| [String] |]
, simpleField "lock" [t| Bool |]
])
, (luxiReqQueryGroups,
[ simpleField "names" [t| [String] |]
, simpleField "fields" [t| [String] |]
, simpleField "lock" [t| Bool |]
])
, (luxiReqQueryNetworks,
[ simpleField "names" [t| [String] |]
, simpleField "fields" [t| [String] |]
, simpleField "lock" [t| Bool |]
])
, (luxiReqQueryInstances,
[ simpleField "names" [t| [String] |]
, simpleField "fields" [t| [String] |]
, simpleField "lock" [t| Bool |]
])
, (luxiReqQueryJobs,
[ simpleField "ids" [t| [JobId] |]
, simpleField "fields" [t| [String] |]
])
, (luxiReqQueryExports,
[ simpleField "nodes" [t| [String] |]
, simpleField "lock" [t| Bool |]
])
, (luxiReqQueryConfigValues,
[ simpleField "fields" [t| [String] |] ]
)
, (luxiReqQueryClusterInfo, [])
, (luxiReqQueryTags,
[ pTagsObject ])
, (luxiReqSubmitJob,
[ simpleField "job" [t| [MetaOpCode] |] ]
)
, (luxiReqSubmitManyJobs,
[ simpleField "ops" [t| [[MetaOpCode]] |] ]
)
, (luxiReqWaitForJobChange,
[ simpleField "job" [t| JobId |]
, simpleField "fields" [t| [String]|]
, simpleField "prev_job" [t| JSValue |]
, simpleField "prev_log" [t| JSValue |]
, simpleField "tmout" [t| Int |]
])
, (luxiReqArchiveJob,
[ simpleField "job" [t| JobId |] ]
)
, (luxiReqAutoArchiveJobs,
[ simpleField "age" [t| Int |]
, simpleField "tmout" [t| Int |]
])
, (luxiReqCancelJob,
[ simpleField "job" [t| JobId |] ]
)
, (luxiReqChangeJobPriority,
[ simpleField "job" [t| JobId |]
, simpleField "priority" [t| Int |] ]
)
, (luxiReqSetDrainFlag,
[ simpleField "flag" [t| Bool |] ]
)
, (luxiReqSetWatcherPause,
[ simpleField "duration" [t| Double |] ]
)
])
$(makeJSONInstance ''LuxiReq)
-- | List of all defined Luxi calls.
$(genAllConstr (drop 3) ''LuxiReq "allLuxiCalls")
-- | The serialisation of LuxiOps into strings in messages.
$(genStrOfOp ''LuxiOp "strOfOp")
-- | Type holding the initial (unparsed) Luxi call.
data LuxiCall = LuxiCall LuxiReq JSValue
-- | The end-of-message separator.
eOM :: Word8
eOM = 3
-- | The end-of-message encoded as a ByteString.
bEOM :: B.ByteString
bEOM = B.singleton eOM
-- | Valid keys in the requests and responses.
data MsgKeys = Method
| Args
| Success
| Result
-- | The serialisation of MsgKeys into strings in messages.
$(genStrOfKey ''MsgKeys "strOfKey")
-- | Luxi client encapsulation.
data Client = Client { socket :: Handle -- ^ The socket of the client
, rbuf :: IORef B.ByteString -- ^ Already received buffer
}
-- | Connects to the master daemon and returns a luxi Client.
getClient :: String -> IO Client
getClient path = do
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
withTimeout luxiDefCtmo "creating luxi connection" $
S.connect s (S.SockAddrUnix path)
rf <- newIORef B.empty
h <- S.socketToHandle s ReadWriteMode
return Client { socket=h, rbuf=rf }
-- | Creates and returns a server endpoint.
getServer :: Bool -> FilePath -> IO S.Socket
getServer setOwner path = do
s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
S.bindSocket s (S.SockAddrUnix path)
when setOwner $ do
setOwnerAndGroupFromNames path GanetiLuxid $ ExtraGroup DaemonsGroup
setFileMode path $ fromIntegral luxiSocketPerms
S.listen s 5 -- 5 is the max backlog
return s
-- | Closes a server endpoint.
-- FIXME: this should be encapsulated into a nicer type.
closeServer :: FilePath -> S.Socket -> IO ()
closeServer path sock = do
S.sClose sock
removeFile path
-- | Accepts a client
acceptClient :: S.Socket -> IO Client
acceptClient s = do
-- second return is the address of the client, which we ignore here
(client_socket, _) <- S.accept s
new_buffer <- newIORef B.empty
handle <- S.socketToHandle client_socket ReadWriteMode
return Client { socket=handle, rbuf=new_buffer }
-- | Closes the client socket.
closeClient :: Client -> IO ()
closeClient = hClose . socket
-- | Sends a message over a luxi transport.
sendMsg :: Client -> String -> IO ()
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
let encoded = UTF8L.fromString buf
handle = socket s
BL.hPut handle encoded
B.hPut handle bEOM
hFlush handle
-- | Given a current buffer and the handle, it will read from the
-- network until we get a full message, and it will return that
-- message and the leftover buffer contents.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
nbuf <- withTimeout luxiDefRwto "reading luxi response" $ do
_ <- hWaitForInput handle (-1)
B.hGetNonBlocking handle 4096
let (msg, remaining) = B.break (eOM ==) nbuf
newbuf = B.append obuf msg
if B.null remaining
then recvUpdate handle newbuf
else return (newbuf, B.tail remaining)
-- | Waits for a message over a luxi transport.
recvMsg :: Client -> IO String
recvMsg s = do
cbuf <- readIORef $ rbuf s
let (imsg, ibuf) = B.break (eOM ==) cbuf
(msg, nbuf) <-
if B.null ibuf -- if old buffer didn't contain a full message
then recvUpdate (socket s) cbuf -- then we read from network
else return (imsg, B.tail ibuf) -- else we return data from our buffer
writeIORef (rbuf s) nbuf
return $ UTF8.toString msg
-- | Extended wrapper over recvMsg.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s =
Control.Exception.catch (liftM RecvOk (recvMsg s)) $ \e ->
return $ if isEOFError e
then RecvConnClosed
else RecvError (show e)
-- | Serialize a request to String.
buildCall :: LuxiOp -- ^ The method
-> String -- ^ The serialized form
buildCall lo =
let ja = [ (strOfKey Method, J.showJSON $ strOfOp lo)
, (strOfKey Args, opToArgs lo)
]
jo = toJSObject ja
in encodeStrict jo
-- | Serialize the response to String.
buildResponse :: Bool -- ^ Success
-> JSValue -- ^ The arguments
-> String -- ^ The serialized form
buildResponse success args =
let ja = [ (strOfKey Success, JSBool success)
, (strOfKey Result, args)]
jo = toJSObject ja
in encodeStrict jo
-- | Check that luxi request contains the required keys and parse it.
validateCall :: String -> Result LuxiCall
validateCall s = do
arr <- fromJResult "parsing top-level luxi message" $
decodeStrict s::Result (JSObject JSValue)
let aobj = fromJSObject arr
call <- fromObj aobj (strOfKey Method)::Result LuxiReq
args <- fromObj aobj (strOfKey Args)
return (LuxiCall call args)
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
--
-- This is currently hand-coded until we make it more uniform so that
-- it can be generated using TH.
decodeCall :: LuxiCall -> Result LuxiOp
decodeCall (LuxiCall call args) =
case call of
ReqQueryJobs -> do
(jids, jargs) <- fromJVal args
jids' <- case jids of
JSNull -> return []
_ -> fromJVal jids
return $ QueryJobs jids' jargs
ReqQueryInstances -> do
(names, fields, locking) <- fromJVal args
return $ QueryInstances names fields locking
ReqQueryNodes -> do
(names, fields, locking) <- fromJVal args
return $ QueryNodes names fields locking
ReqQueryGroups -> do
(names, fields, locking) <- fromJVal args
return $ QueryGroups names fields locking
ReqQueryClusterInfo ->
return QueryClusterInfo
ReqQueryNetworks -> do
(names, fields, locking) <- fromJVal args
return $ QueryNetworks names fields locking
ReqQuery -> do
(what, fields, qfilter) <- fromJVal args
return $ Query what fields qfilter
ReqQueryFields -> do
(what, fields) <- fromJVal args
fields' <- case fields of
JSNull -> return []
_ -> fromJVal fields
return $ QueryFields what fields'
ReqSubmitJob -> do
[ops1] <- fromJVal args
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
return $ SubmitJob ops2
ReqSubmitManyJobs -> do
[ops1] <- fromJVal args
ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) ops1
return $ SubmitManyJobs ops2
ReqWaitForJobChange -> do
(jid, fields, pinfo, pidx, wtmout) <-
-- No instance for 5-tuple, code copied from the
-- json sources and adapted
fromJResult "Parsing WaitForJobChange message" $
case args of
JSArray [a, b, c, d, e] ->
(,,,,) `fmap`
J.readJSON a `ap`
J.readJSON b `ap`
J.readJSON c `ap`
J.readJSON d `ap`
J.readJSON e
_ -> J.Error "Not enough values"
return $ WaitForJobChange jid fields pinfo pidx wtmout
ReqArchiveJob -> do
[jid] <- fromJVal args
return $ ArchiveJob jid
ReqAutoArchiveJobs -> do
(age, tmout) <- fromJVal args
return $ AutoArchiveJobs age tmout
ReqQueryExports -> do
(nodes, lock) <- fromJVal args
return $ QueryExports nodes lock
ReqQueryConfigValues -> do
[fields] <- fromJVal args
return $ QueryConfigValues fields
ReqQueryTags -> do
(kind, name) <- fromJVal args
item <- tagObjectFrom kind name
return $ QueryTags item
ReqCancelJob -> do
[jid] <- fromJVal args
return $ CancelJob jid
ReqChangeJobPriority -> do
(jid, priority) <- fromJVal args
return $ ChangeJobPriority jid priority
ReqSetDrainFlag -> do
[flag] <- fromJVal args
return $ SetDrainFlag flag
ReqSetWatcherPause -> do
[duration] <- fromJVal args
return $ SetWatcherPause duration
-- | Check that luxi responses contain the required keys and that the
-- call was successful.
validateResult :: String -> ErrorResult JSValue
validateResult s = do
when (UTF8.replacement_char `elem` s) $
fail "Failed to decode UTF-8, detected replacement char after decoding"
oarr <- fromJResult "Parsing LUXI response" (decodeStrict s)
let arr = J.fromJSObject oarr
status <- fromObj arr (strOfKey Success)
result <- fromObj arr (strOfKey Result)
if status
then return result
else decodeError result
-- | Try to decode an error from the server response. This function
-- will always fail, since it's called only on the error path (when
-- status is False).
decodeError :: JSValue -> ErrorResult JSValue
decodeError val =
case fromJVal val of
Ok e -> Bad e
Bad msg -> Bad $ GenericError msg
-- | Generic luxi method call.
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s = do
sendMsg s $ buildCall method
result <- recvMsg s
let rval = validateResult result
return rval
-- | Parse job submission result.
parseSubmitJobResult :: JSValue -> ErrorResult JobId
parseSubmitJobResult (JSArray [JSBool True, v]) =
case J.readJSON v of
J.Error msg -> Bad $ LuxiError msg
J.Ok v' -> Ok v'
parseSubmitJobResult (JSArray [JSBool False, JSString x]) =
Bad . LuxiError $ fromJSString x
parseSubmitJobResult v =
Bad . LuxiError $ "Unknown result from the master daemon: " ++
show (pp_value v)
-- | Specialized submitManyJobs call.
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs = do
rval <- callMethod (SubmitManyJobs jobs) s
-- map each result (status, payload) pair into a nice Result ADT
return $ case rval of
Bad x -> Bad x
Ok (JSArray r) -> mapM parseSubmitJobResult r
x -> Bad . LuxiError $
"Cannot parse response from Ganeti: " ++ show x
-- | Custom queryJobs call.
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids = do
rval <- callMethod (QueryJobs jids ["status"]) s
return $ case rval of
Bad x -> Bad x
Ok y -> case J.readJSON y::(J.Result [[JobStatus]]) of
J.Ok vals -> if any null vals
then Bad $
LuxiError "Missing job status field"
else Ok (map head vals)
J.Error x -> Bad $ LuxiError x