diff options
Diffstat (limited to 'nixos/tests/google-oslogin')
-rw-r--r-- | nixos/tests/google-oslogin/default.nix | 74 | ||||
-rw-r--r-- | nixos/tests/google-oslogin/server.nix | 27 | ||||
-rwxr-xr-x | nixos/tests/google-oslogin/server.py | 135 |
3 files changed, 236 insertions, 0 deletions
diff --git a/nixos/tests/google-oslogin/default.nix b/nixos/tests/google-oslogin/default.nix new file mode 100644 index 00000000000..72c87d7153b --- /dev/null +++ b/nixos/tests/google-oslogin/default.nix @@ -0,0 +1,74 @@ +import ../make-test-python.nix ({ pkgs, ... } : +let + inherit (import ./../ssh-keys.nix pkgs) + snakeOilPrivateKey snakeOilPublicKey; + + # don't check host keys or known hosts, use the snakeoil ssh key + ssh-config = builtins.toFile "ssh.conf" '' + UserKnownHostsFile=/dev/null + StrictHostKeyChecking=no + IdentityFile=~/.ssh/id_snakeoil + ''; +in { + name = "google-oslogin"; + meta = with pkgs.lib.maintainers; { + maintainers = [ adisbladis flokli ]; + }; + + nodes = { + # the server provides both the the mocked google metadata server and the ssh server + server = (import ./server.nix pkgs); + + client = { ... }: {}; + }; + testScript = '' + MOCKUSER = "mockuser_nixos_org" + MOCKADMIN = "mockadmin_nixos_org" + start_all() + + server.wait_for_unit("mock-google-metadata.service") + server.wait_for_open_port(80) + + # mockserver should return a non-expired ssh key for both mockuser and mockadmin + server.succeed( + f'${pkgs.google-guest-oslogin}/bin/google_authorized_keys {MOCKUSER} | grep -q "${snakeOilPublicKey}"' + ) + server.succeed( + f'${pkgs.google-guest-oslogin}/bin/google_authorized_keys {MOCKADMIN} | grep -q "${snakeOilPublicKey}"' + ) + + # install snakeoil ssh key on the client, and provision .ssh/config file + client.succeed("mkdir -p ~/.ssh") + client.succeed( + "cat ${snakeOilPrivateKey} > ~/.ssh/id_snakeoil" + ) + client.succeed("chmod 600 ~/.ssh/id_snakeoil") + client.succeed("cp ${ssh-config} ~/.ssh/config") + + client.wait_for_unit("network.target") + server.wait_for_unit("sshd.service") + + # we should not be able to connect as non-existing user + client.fail("ssh ghost@server 'true'") + + # we should be able to connect as mockuser + client.succeed(f"ssh {MOCKUSER}@server 'true'") + # but we shouldn't be able to sudo + client.fail( + f"ssh {MOCKUSER}@server '/run/wrappers/bin/sudo /run/current-system/sw/bin/id' | grep -q 'root'" + ) + + # we should also be able to log in as mockadmin + client.succeed(f"ssh {MOCKADMIN}@server 'true'") + # pam_oslogin_admin.so should now have generated a sudoers file + server.succeed( + f"find /run/google-sudoers.d | grep -q '/run/google-sudoers.d/{MOCKADMIN}'" + ) + + # and we should be able to sudo + client.succeed( + f"ssh {MOCKADMIN}@server '/run/wrappers/bin/sudo /run/current-system/sw/bin/id' | grep -q 'root'" + ) + ''; + }) + diff --git a/nixos/tests/google-oslogin/server.nix b/nixos/tests/google-oslogin/server.nix new file mode 100644 index 00000000000..faf5e847d7e --- /dev/null +++ b/nixos/tests/google-oslogin/server.nix @@ -0,0 +1,27 @@ +{ pkgs, ... }: +let + inherit (import ./../ssh-keys.nix pkgs) + snakeOilPrivateKey snakeOilPublicKey; +in { + networking.firewall.allowedTCPPorts = [ 80 ]; + + systemd.services.mock-google-metadata = { + description = "Mock Google metadata service"; + serviceConfig.Type = "simple"; + serviceConfig.ExecStart = "${pkgs.python3}/bin/python ${./server.py}"; + environment = { + SNAKEOIL_PUBLIC_KEY = snakeOilPublicKey; + }; + wantedBy = [ "multi-user.target" ]; + after = [ "network.target" ]; + }; + + services.openssh.enable = true; + services.openssh.kbdInteractiveAuthentication = false; + services.openssh.passwordAuthentication = false; + + security.googleOsLogin.enable = true; + + # Mock google service + networking.interfaces.lo.ipv4.addresses = [ { address = "169.254.169.254"; prefixLength = 32; } ]; +} diff --git a/nixos/tests/google-oslogin/server.py b/nixos/tests/google-oslogin/server.py new file mode 100755 index 00000000000..5ea9bbd2c96 --- /dev/null +++ b/nixos/tests/google-oslogin/server.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +import json +import sys +import time +import os +import hashlib +import base64 + +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse, parse_qs +from typing import Dict + +SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY'] +MOCKUSER="mockuser_nixos_org" +MOCKADMIN="mockadmin_nixos_org" + + +def w(msg: bytes): + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def gen_fingerprint(pubkey: str): + decoded_key = base64.b64decode(pubkey.encode("ascii").split()[1]) + return hashlib.sha256(decoded_key).hexdigest() + + +def gen_email(username: str): + """username seems to be a 21 characters long number string, so mimic that in a reproducible way""" + return str(int(hashlib.sha256(username.encode()).hexdigest(), 16))[0:21] + + +def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict: + snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey) + # seems to be a 21 characters long numberstring, so mimic that in a reproducible way + email = gen_email(username) + return { + "loginProfiles": [ + { + "name": email, + "posixAccounts": [ + { + "primary": True, + "username": username, + "uid": uid, + "gid": gid, + "homeDirectory": home_directory, + "operatingSystemType": "LINUX" + } + ], + "sshPublicKeys": { + snakeoil_pubkey_fingerprint: { + "key": snakeoil_pubkey, + "expirationTimeUsec": str((time.time() + 600) * 1000000), # 10 minutes in the future + "fingerprint": snakeoil_pubkey_fingerprint + } + } + } + ] + } + + +class ReqHandler(BaseHTTPRequestHandler): + + def _send_json_ok(self, data: dict): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + out = json.dumps(data).encode() + w(out) + self.wfile.write(out) + + def _send_json_success(self, success=True): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + out = json.dumps({"success": success}).encode() + w(out) + self.wfile.write(out) + + def _send_404(self): + self.send_response(404) + self.end_headers() + + def do_GET(self): + p = str(self.path) + pu = urlparse(p) + params = parse_qs(pu.query) + + # users endpoint + if pu.path == "/computeMetadata/v1/oslogin/users": + # mockuser and mockadmin are allowed to login, both use the same snakeoil public key + if params.get('username') == [MOCKUSER] or params.get('uid') == ["1009719690"]: + username = MOCKUSER + uid = "1009719690" + elif params.get('username') == [MOCKADMIN] or params.get('uid') == ["1009719691"]: + username = MOCKADMIN + uid = "1009719691" + else: + self._send_404() + return + + self._send_json_ok(gen_mockuser(username=username, uid=uid, gid=uid, home_directory=f"/home/{username}", snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) + return + + # authorize endpoint + elif pu.path == "/computeMetadata/v1/oslogin/authorize": + # is user allowed to login? + if params.get("policy") == ["login"]: + # mockuser and mockadmin are allowed to login + if params.get('email') == [gen_email(MOCKUSER)] or params.get('email') == [gen_email(MOCKADMIN)]: + self._send_json_success() + return + self._send_json_success(False) + return + # is user allowed to become root? + elif params.get("policy") == ["adminLogin"]: + # only mockadmin is allowed to become admin + self._send_json_success((params['email'] == [gen_email(MOCKADMIN)])) + return + # send 404 for other policies + else: + self._send_404() + return + else: + sys.stderr.write(f"Unhandled path: {p}\n") + sys.stderr.flush() + self.send_response(404) + self.end_headers() + self.wfile.write(b'') + + +if __name__ == '__main__': + s = HTTPServer(('0.0.0.0', 80), ReqHandler) + s.serve_forever() |