From f38e45c2e0ea15c1882308299fbe24f6c46b8243 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 1 Apr 2020 15:53:56 +0200 Subject: [PATCH] nixos/google-oslogin: improve mock server some slightly better error handling for nonexistent users, less parsing of URLs and query strings by hand. --- nixos/tests/google-oslogin/server.py | 77 ++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 20 deletions(-) diff --git a/nixos/tests/google-oslogin/server.py b/nixos/tests/google-oslogin/server.py index bfc527cb97d3..eb0c77982d01 100644 --- a/nixos/tests/google-oslogin/server.py +++ b/nixos/tests/google-oslogin/server.py @@ -7,24 +7,27 @@ import hashlib import base64 from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse, parse_qs from typing import Dict SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY'] -def w(msg): +def w(msg: bytes): sys.stderr.write(f"{msg}\n") sys.stderr.flush() -def gen_fingerprint(pubkey): +def gen_fingerprint(pubkey: str): decoded_key = base64.b64decode(pubkey.encode("ascii").split()[1]) return hashlib.sha256(decoded_key).hexdigest() -def gen_email(username): + +def gen_email(username: str): """username seems to be a 21 characters long number string, so mimic that in a reproducible way""" return str(int(hashlib.sha256(username.encode()).hexdigest(), 16))[0:21] + def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict: snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey) # seems to be a 21 characters long numberstring, so mimic that in a reproducible way @@ -56,7 +59,8 @@ def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoi class ReqHandler(BaseHTTPRequestHandler): - def _send_json_ok(self, data): + + def _send_json_ok(self, data: dict): self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() @@ -64,29 +68,62 @@ class ReqHandler(BaseHTTPRequestHandler): w(out) self.wfile.write(out) + def _send_json_success(self, success=True): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + out = json.dumps({"success": success}).encode() + w(out) + self.wfile.write(out) + + def _send_404(self): + self.send_response(404) + self.end_headers() + def do_GET(self): p = str(self.path) - # mockuser and mockadmin are allowed to login, both use the same snakeoil public key - if p == '/computeMetadata/v1/oslogin/users?username=mockuser' \ - or p == '/computeMetadata/v1/oslogin/users?uid=1009719690': - self._send_json_ok(gen_mockuser(username='mockuser', uid='1009719690', gid='1009719690', - home_directory='/home/mockuser', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) - elif p == '/computeMetadata/v1/oslogin/users?username=mockadmin' \ - or p == '/computeMetadata/v1/oslogin/users?uid=1009719691': - self._send_json_ok(gen_mockuser(username='mockadmin', uid='1009719691', gid='1009719691', - home_directory='/home/mockadmin', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) + pu = urlparse(p) + params = parse_qs(pu.query) - # mockuser is allowed to login - elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockuser')}&policy=login": - self._send_json_ok({'success': True}) + # users endpoint + if pu.path == "/computeMetadata/v1/oslogin/users": + # mockuser and mockadmin are allowed to login, both use the same snakeoil public key + if params.get('username') == ['mockuser'] or params.get('uid') == ["1009719690"]: + username = "mockuser" + uid = "1009719690" + elif params.get('username') == ['mockadmin'] or params.get('uid') == ["1009719691"]: + username = "mockadmin" + uid = "1009719691" + else: + self._send_404() + return - # mockadmin may also become root - elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=login" or p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=adminLogin": - self._send_json_ok({'success': True}) + self._send_json_ok(gen_mockuser(username=username, uid=uid, gid=uid, home_directory=f"/home/{username}", snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) + return + + # authorize endpoint + elif pu.path == "/computeMetadata/v1/oslogin/authorize": + # is user allowed to login? + if params.get("policy") == ["login"]: + # mockuser and mockadmin are allowed to login + if params.get('email') == [gen_email("mockuser")] or params.get('email') == [gen_email("mockadmin")]: + self._send_json_success() + return + self._send_json_success(False) + return + # is user allowed to become root? + elif params.get("policy") == ["adminLogin"]: + # only mockadmin is allowed to become admin + self._send_json_success((params['email'] == [gen_email("mockadmin")])) + return + # send 404 for other policies + else: + self._send_404() + return else: sys.stderr.write(f"Unhandled path: {p}\n") sys.stderr.flush() - self.send_response(501) + self.send_response(404) self.end_headers() self.wfile.write(b'')