| #!/usr/bin/python |
| # |
| |
| # Copyright (C) 2012 Google Inc. |
| # |
| # This program is free software; you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation; either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, but |
| # WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| # General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
| # 02110-1301, USA. |
| |
| |
| """Script for testing ganeti.server.rapi""" |
| |
| import re |
| import unittest |
| import random |
| import mimetools |
| import base64 |
| from cStringIO import StringIO |
| |
| from ganeti import constants |
| from ganeti import utils |
| from ganeti import compat |
| from ganeti import errors |
| from ganeti import serializer |
| from ganeti import rapi |
| from ganeti import http |
| from ganeti import objects |
| |
| import ganeti.rapi.baserlib |
| import ganeti.rapi.testutils |
| import ganeti.rapi.rlib2 |
| import ganeti.http.auth |
| |
| import testutils |
| |
| |
| class TestRemoteApiHandler(unittest.TestCase): |
| @staticmethod |
| def _LookupWrongUser(_): |
| return None |
| |
| def _Test(self, method, path, headers, reqbody, |
| user_fn=NotImplemented, luxi_client=NotImplemented, |
| reqauth=False): |
| rm = rapi.testutils._RapiMock(user_fn, luxi_client, reqauth=reqauth) |
| |
| (resp_code, resp_headers, resp_body) = \ |
| rm.FetchResponse(path, method, http.ParseHeaders(StringIO(headers)), |
| reqbody) |
| |
| self.assertTrue(resp_headers[http.HTTP_DATE]) |
| self.assertEqual(resp_headers[http.HTTP_CONNECTION], "close") |
| self.assertEqual(resp_headers[http.HTTP_CONTENT_TYPE], http.HTTP_APP_JSON) |
| self.assertEqual(resp_headers[http.HTTP_SERVER], http.HTTP_GANETI_VERSION) |
| |
| return (resp_code, resp_headers, serializer.LoadJson(resp_body)) |
| |
| def testRoot(self): |
| (code, _, data) = self._Test(http.HTTP_GET, "/", "", None) |
| self.assertEqual(code, http.HTTP_OK) |
| self.assertTrue(data is None) |
| |
| def testRootReqAuth(self): |
| (code, _, _) = self._Test(http.HTTP_GET, "/", "", None, reqauth=True) |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| def testVersion(self): |
| (code, _, data) = self._Test(http.HTTP_GET, "/version", "", None) |
| self.assertEqual(code, http.HTTP_OK) |
| self.assertEqual(data, constants.RAPI_VERSION) |
| |
| def testSlashTwo(self): |
| (code, _, data) = self._Test(http.HTTP_GET, "/2", "", None) |
| self.assertEqual(code, http.HTTP_OK) |
| self.assertTrue(data is None) |
| |
| def testFeatures(self): |
| (code, _, data) = self._Test(http.HTTP_GET, "/2/features", "", None) |
| self.assertEqual(code, http.HTTP_OK) |
| self.assertEqual(set(data), set(rapi.rlib2.ALL_FEATURES)) |
| |
| def testPutInstances(self): |
| (code, _, data) = self._Test(http.HTTP_PUT, "/2/instances", "", None) |
| self.assertEqual(code, http.HttpNotImplemented.code) |
| self.assertTrue(data["message"].startswith("Method PUT is unsupported")) |
| |
| def testPostInstancesNoAuth(self): |
| (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", "", None) |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| def testRequestWithUnsupportedMediaType(self): |
| for fn in [lambda s: s, lambda s: s.upper(), lambda s: s.title()]: |
| headers = rapi.testutils._FormatHeaders([ |
| "%s: %s" % (http.HTTP_CONTENT_TYPE, fn("un/supported/media/type")), |
| ]) |
| (code, _, data) = self._Test(http.HTTP_GET, "/", headers, "body") |
| self.assertEqual(code, http.HttpUnsupportedMediaType.code) |
| self.assertEqual(data["message"], "Unsupported Media Type") |
| |
| def testRequestWithInvalidJsonData(self): |
| body = "_this/is/no'valid.json" |
| self.assertRaises(Exception, serializer.LoadJson, body) |
| |
| headers = rapi.testutils._FormatHeaders([ |
| "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON), |
| ]) |
| |
| (code, _, data) = self._Test(http.HTTP_GET, "/", headers, body) |
| self.assertEqual(code, http.HttpBadRequest.code) |
| self.assertEqual(data["message"], "Unable to parse JSON data") |
| |
| def testUnsupportedAuthScheme(self): |
| headers = rapi.testutils._FormatHeaders([ |
| "%s: %s" % (http.HTTP_AUTHORIZATION, "Unsupported scheme"), |
| ]) |
| |
| (code, _, _) = self._Test(http.HTTP_POST, "/2/instances", headers, "") |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| def testIncompleteBasicAuth(self): |
| headers = rapi.testutils._FormatHeaders([ |
| "%s: Basic" % http.HTTP_AUTHORIZATION, |
| ]) |
| |
| (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "") |
| self.assertEqual(code, http.HttpBadRequest.code) |
| self.assertEqual(data["message"], |
| "Basic authentication requires credentials") |
| |
| def testInvalidBasicAuth(self): |
| for auth in ["!invalid=base!64.", base64.b64encode(" "), |
| base64.b64encode("missingcolonchar")]: |
| headers = rapi.testutils._FormatHeaders([ |
| "%s: Basic %s" % (http.HTTP_AUTHORIZATION, auth), |
| ]) |
| |
| (code, _, data) = self._Test(http.HTTP_POST, "/2/instances", headers, "") |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| @staticmethod |
| def _MakeAuthHeaders(username, password, correct_password): |
| if correct_password: |
| pw = password |
| else: |
| pw = "wrongpass" |
| |
| return rapi.testutils._FormatHeaders([ |
| "%s: Basic %s" % (http.HTTP_AUTHORIZATION, |
| base64.b64encode("%s:%s" % (username, pw))), |
| "%s: %s" % (http.HTTP_CONTENT_TYPE, http.HTTP_APP_JSON), |
| ]) |
| |
| def testQueryAuth(self): |
| username = "admin" |
| password = "2046920054" |
| |
| header_fn = compat.partial(self._MakeAuthHeaders, username, password) |
| |
| def _LookupUserNoWrite(name): |
| if name == username: |
| return http.auth.PasswordFileUser(name, password, []) |
| else: |
| return None |
| |
| for access in [rapi.RAPI_ACCESS_WRITE, rapi.RAPI_ACCESS_READ]: |
| def _LookupUserWithWrite(name): |
| if name == username: |
| return http.auth.PasswordFileUser(name, password, [ |
| access, |
| ]) |
| else: |
| return None |
| |
| for qr in constants.QR_VIA_RAPI: |
| # The /2/query resource has somewhat special rules for authentication as |
| # it can be used to retrieve critical information |
| path = "/2/query/%s" % qr |
| |
| for method in rapi.baserlib._SUPPORTED_METHODS: |
| # No authorization |
| (code, _, _) = self._Test(method, path, "", "") |
| |
| if method in (http.HTTP_DELETE, http.HTTP_POST): |
| self.assertEqual(code, http.HttpNotImplemented.code) |
| continue |
| |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| # Incorrect user |
| (code, _, _) = self._Test(method, path, header_fn(True), "", |
| user_fn=self._LookupWrongUser) |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| # User has no write access, but the password is correct |
| (code, _, _) = self._Test(method, path, header_fn(True), "", |
| user_fn=_LookupUserNoWrite) |
| self.assertEqual(code, http.HttpForbidden.code) |
| |
| # Wrong password and no write access |
| (code, _, _) = self._Test(method, path, header_fn(False), "", |
| user_fn=_LookupUserNoWrite) |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| # Wrong password with write access |
| (code, _, _) = self._Test(method, path, header_fn(False), "", |
| user_fn=_LookupUserWithWrite) |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| |
| # Prepare request information |
| if method == http.HTTP_PUT: |
| reqpath = path |
| body = serializer.DumpJson({ |
| "fields": ["name"], |
| }) |
| elif method == http.HTTP_GET: |
| reqpath = "%s?fields=name" % path |
| body = "" |
| else: |
| self.fail("Unknown method '%s'" % method) |
| |
| # User has write access, password is correct |
| (code, _, data) = self._Test(method, reqpath, header_fn(True), body, |
| user_fn=_LookupUserWithWrite, |
| luxi_client=_FakeLuxiClientForQuery) |
| self.assertEqual(code, http.HTTP_OK) |
| self.assertTrue(objects.QueryResponse.FromDict(data)) |
| |
| def testConsole(self): |
| path = "/2/instances/inst1.example.com/console" |
| |
| for method in rapi.baserlib._SUPPORTED_METHODS: |
| for reqauth in [False, True]: |
| # No authorization |
| (code, _, _) = self._Test(method, path, "", "", reqauth=reqauth) |
| |
| if method == http.HTTP_GET or reqauth: |
| self.assertEqual(code, http.HttpUnauthorized.code) |
| else: |
| self.assertEqual(code, http.HttpNotImplemented.code) |
| |
| |
| class _FakeLuxiClientForQuery: |
| def __init__(self, *args, **kwargs): |
| pass |
| |
| def Query(self, *args): |
| return objects.QueryResponse(fields=[]) |
| |
| |
| if __name__ == "__main__": |
| testutils.GanetiTestProgram() |