summary refs log tree commit diff
path: root/nixos/tests/google-oslogin/server.py
blob: bfc527cb97d3775e596de5b6a4ff99b4d0f17db6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python3
import json
import sys
import time
import os
import hashlib
import base64

from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict

# SSH public key handed out for both mock accounts. It is read from the
# environment at import time, so a missing SNAKEOIL_PUBLIC_KEY variable raises
# KeyError before the server starts. gen_fingerprint() base64-decodes the
# second whitespace-separated field of this value, so it presumably follows the
# usual "<type> <base64-blob> [comment]" layout -- confirm against the caller.
SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY']


def w(msg):
    """Write *msg* followed by a newline to stderr and flush immediately."""
    stream = sys.stderr
    stream.write("{}\n".format(msg))
    stream.flush()


def gen_fingerprint(pubkey):
    """Return the hex SHA-256 digest of the base64-decoded key blob in *pubkey*.

    *pubkey* is split on whitespace and its second field is taken as the
    base64 payload; a value with fewer than two fields raises IndexError.
    """
    fields = pubkey.encode("ascii").split()
    key_blob = base64.b64decode(fields[1])
    digest = hashlib.sha256(key_blob)
    return digest.hexdigest()

def gen_email(username):
    """Derive a reproducible numeric identifier string from *username*.

    The SHA-256 digest of the username is rendered as a decimal integer and
    cut to its first 21 digits. The result is used as the login profile
    ``name`` and as the ``email`` query parameter of authorize requests.
    """
    hex_digest = hashlib.sha256(username.encode()).hexdigest()
    decimal_digits = str(int(hex_digest, 16))
    return decimal_digits[:21]

def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict:
    """Build a mock ``loginProfiles`` response containing one POSIX account.

    Args:
        username: POSIX user name placed in the account entry.
        uid: User id, passed through unchanged.
        gid: Group id, passed through unchanged.
        home_directory: Home directory path of the account.
        snakeoil_pubkey: SSH public key line; it is returned verbatim as the
            key and hashed with ``gen_fingerprint`` for the fingerprint.

    Returns:
        A JSON-serializable dict with a single login profile whose only SSH
        key expires ten minutes after the call.
    """
    snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey)
    # The profile name is a reproducible 21-digit number string derived from the username.
    email = gen_email(username)
    # Truncate to whole microseconds: stringifying the float directly would
    # emit a fractional value such as "1700000600123456.8" rather than the
    # integer-as-string this field is meant to carry.
    expiration_time_usec = str(int((time.time() + 600) * 1000000))  # 10 minutes in the future
    return {
        "loginProfiles": [
            {
                "name": email,
                "posixAccounts": [
                    {
                        "primary": True,
                        "username": username,
                        "uid": uid,
                        "gid": gid,
                        "homeDirectory": home_directory,
                        "operatingSystemType": "LINUX"
                    }
                ],
                "sshPublicKeys": {
                    snakeoil_pubkey_fingerprint: {
                        "key": snakeoil_pubkey,
                        "expirationTimeUsec": expiration_time_usec,
                        "fingerprint": snakeoil_pubkey_fingerprint
                    }
                }
            }
        ]
    }


class ReqHandler(BaseHTTPRequestHandler):
    """Answer a fixed set of OS Login metadata GET requests for the mock users.

    Requests are matched by exact string comparison against the request path
    (query string included); anything else receives an empty 501 response.
    """

    def _send_json_ok(self, data):
        """Send a 200 response whose body is *data* encoded as JSON.

        The encoded body is also echoed to stderr through ``w``.
        """
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        body = json.dumps(data).encode()
        w(body)
        self.wfile.write(body)

    def do_GET(self):
        """Serve the user-lookup and authorize endpoints for the mock accounts."""
        path = str(self.path)

        # mockuser and mockadmin are allowed to login, both use the same snakeoil public key
        if path in ('/computeMetadata/v1/oslogin/users?username=mockuser',
                    '/computeMetadata/v1/oslogin/users?uid=1009719690'):
            profile = gen_mockuser(username='mockuser', uid='1009719690', gid='1009719690',
                                   home_directory='/home/mockuser', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)
            self._send_json_ok(profile)
            return

        if path in ('/computeMetadata/v1/oslogin/users?username=mockadmin',
                    '/computeMetadata/v1/oslogin/users?uid=1009719691'):
            profile = gen_mockuser(username='mockadmin', uid='1009719691', gid='1009719691',
                                   home_directory='/home/mockadmin', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)
            self._send_json_ok(profile)
            return

        # mockuser is allowed to login
        if path == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockuser')}&policy=login":
            self._send_json_ok({'success': True})
            return

        # mockadmin may also become root
        admin_login = f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=login"
        admin_root = f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=adminLogin"
        if path in (admin_login, admin_root):
            self._send_json_ok({'success': True})
            return

        # Nothing matched: report the path on stderr and answer 501 with an empty body.
        sys.stderr.write(f"Unhandled path: {path}\n")
        sys.stderr.flush()
        self.send_response(501)
        self.end_headers()
        self.wfile.write(b'')


if __name__ == '__main__':
    # Listen on every interface on port 80 and serve requests until the process is stopped.
    listen_address = ('0.0.0.0', 80)
    httpd = HTTPServer(listen_address, ReqHandler)
    httpd.serve_forever()