summary refs log tree commit diff
path: root/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py
diff options
context:
space:
mode:
Diffstat (limited to 'nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py')
-rw-r--r--[-rwxr-xr-x]nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py221
1 files changed, 112 insertions, 109 deletions
diff --git a/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py b/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py
index a040518a5a5..e2e7ffe59dc 100755..100644
--- a/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py
+++ b/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py
@@ -1,27 +1,41 @@
 #! @python3@/bin/python3 -B
 import argparse
-import shutil
-import os
-import sys
-import errno
-import subprocess
-import glob
-import tempfile
-import errno
-import warnings
 import ctypes
-libc = ctypes.CDLL("libc.so.6")
-import re
 import datetime
+import errno
 import glob
+import os
 import os.path
-from typing import NamedTuple, List, Optional
-from packaging import version
+import re
+import shutil
+import subprocess
+import sys
+import warnings
+import json
+from typing import NamedTuple, Dict, List
+from dataclasses import dataclass
+
+
+@dataclass
+class BootSpec:
+    init: str
+    initrd: str
+    initrdSecrets: str
+    kernel: str
+    kernelParams: List[str]
+    label: str
+    system: str
+    toplevel: str
+    specialisations: Dict[str, "BootSpec"]
+
+
+
+libc = ctypes.CDLL("libc.so.6")
 
 class SystemIdentifier(NamedTuple):
-    profile: Optional[str]
+    profile: str | None
     generation: int
-    specialisation: Optional[str]
+    specialisation: str | None
 
 
 def copy_if_not_exists(source: str, dest: str) -> None:
@@ -29,13 +43,13 @@ def copy_if_not_exists(source: str, dest: str) -> None:
         shutil.copyfile(source, dest)
 
 
-def generation_dir(profile: Optional[str], generation: int) -> str:
+def generation_dir(profile: str | None, generation: int) -> str:
     if profile:
         return "/nix/var/nix/profiles/system-profiles/%s-%d-link" % (profile, generation)
     else:
         return "/nix/var/nix/profiles/system-%d-link" % (generation)
 
-def system_dir(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str:
+def system_dir(profile: str | None, generation: int, specialisation: str | None) -> str:
     d = generation_dir(profile, generation)
     if specialisation:
         return os.path.join(d, "specialisation", specialisation)
@@ -49,7 +63,7 @@ initrd {initrd}
 options {kernel_params}
 """
 
-def generation_conf_filename(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str:
+def generation_conf_filename(profile: str | None, generation: int, specialisation: str | None) -> str:
     pieces = [
         "nixos",
         profile or None,
@@ -60,23 +74,44 @@ def generation_conf_filename(profile: Optional[str], generation: int, specialisa
     return "-".join(p for p in pieces if p) + ".conf"
 
 
-def write_loader_conf(profile: Optional[str], generation: int, specialisation: Optional[str]) -> None:
+def write_loader_conf(profile: str | None, generation: int, specialisation: str | None) -> None:
     with open("@efiSysMountPoint@/loader/loader.conf.tmp", 'w') as f:
         if "@timeout@" != "":
             f.write("timeout @timeout@\n")
         f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation))
         if not @editor@:
-            f.write("editor 0\n");
-        f.write("console-mode @consoleMode@\n");
+            f.write("editor 0\n")
+        f.write("console-mode @consoleMode@\n")
+        f.flush()
+        os.fsync(f.fileno())
     os.rename("@efiSysMountPoint@/loader/loader.conf.tmp", "@efiSysMountPoint@/loader/loader.conf")
 
 
-def profile_path(profile: Optional[str], generation: int, specialisation: Optional[str], name: str) -> str:
-    return os.path.realpath("%s/%s" % (system_dir(profile, generation, specialisation), name))
+def get_bootspec(profile: str | None, generation: int) -> BootSpec:
+    system_directory = system_dir(profile, generation, None)
+    boot_json_path = os.path.realpath("%s/%s" % (system_directory, "boot.json"))
+    if os.path.isfile(boot_json_path):
+        boot_json_f = open(boot_json_path, 'r')
+        bootspec_json = json.load(boot_json_f)
+    else:
+        boot_json_str = subprocess.check_output([
+        "@bootspecTools@/bin/synthesize",
+        "--version",
+        "1",
+        system_directory,
+        "/dev/stdout"],
+        universal_newlines=True)
+        bootspec_json = json.loads(boot_json_str)
+    return bootspec_from_json(bootspec_json)
 
+def bootspec_from_json(bootspec_json: Dict) -> BootSpec:
+    specialisations = bootspec_json['org.nixos.specialisation.v1']
+    specialisations = {k: bootspec_from_json(v) for k, v in specialisations.items()}
+    return BootSpec(**bootspec_json['org.nixos.bootspec.v1'], specialisations=specialisations)
 
-def copy_from_profile(profile: Optional[str], generation: int, specialisation: Optional[str], name: str, dry_run: bool = False) -> str:
-    store_file_path = profile_path(profile, generation, specialisation, name)
+
+def copy_from_file(file: str, dry_run: bool = False) -> str:
+    store_file_path = os.path.realpath(file)
     suffix = os.path.basename(store_file_path)
     store_dir = os.path.basename(os.path.dirname(store_file_path))
     efi_file_path = "/efi/nixos/%s-%s.efi" % (store_dir, suffix)
@@ -84,40 +119,19 @@ def copy_from_profile(profile: Optional[str], generation: int, specialisation: O
         copy_if_not_exists(store_file_path, "@efiSysMountPoint@%s" % (efi_file_path))
     return efi_file_path
 
-
-def describe_generation(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str:
-    try:
-        with open(profile_path(profile, generation, specialisation, "nixos-version")) as f:
-            nixos_version = f.read()
-    except IOError:
-        nixos_version = "Unknown"
-
-    kernel_dir = os.path.dirname(profile_path(profile, generation, specialisation, "kernel"))
-    module_dir = glob.glob("%s/lib/modules/*" % kernel_dir)[0]
-    kernel_version = os.path.basename(module_dir)
-
-    build_time = int(os.path.getctime(system_dir(profile, generation, specialisation)))
-    build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F')
-
-    description = "@distroName@ {}, Linux Kernel {}, Built on {}".format(
-        nixos_version, kernel_version, build_date
-    )
-
-    return description
-
-
-def write_entry(profile: Optional[str], generation: int, specialisation: Optional[str],
-                machine_id: str, current: bool) -> None:
-    kernel = copy_from_profile(profile, generation, specialisation, "kernel")
-    initrd = copy_from_profile(profile, generation, specialisation, "initrd")
+def write_entry(profile: str | None, generation: int, specialisation: str | None,
+                machine_id: str, bootspec: BootSpec, current: bool) -> None:
+    if specialisation:
+        bootspec = bootspec.specialisations[specialisation]
+    kernel = copy_from_file(bootspec.kernel)
+    initrd = copy_from_file(bootspec.initrd)
 
     title = "@distroName@{profile}{specialisation}".format(
         profile=" [" + profile + "]" if profile else "",
         specialisation=" (%s)" % specialisation if specialisation else "")
 
     try:
-        append_initrd_secrets = profile_path(profile, generation, specialisation, "append-initrd-secrets")
-        subprocess.check_call([append_initrd_secrets, "@efiSysMountPoint@%s" % (initrd)])
+        subprocess.check_call([bootspec.initrdSecrets, "@efiSysMountPoint@%s" % (initrd)])
     except FileNotFoundError:
         pass
     except subprocess.CalledProcessError:
@@ -132,31 +146,27 @@ def write_entry(profile: Optional[str], generation: int, specialisation: Optiona
     entry_file = "@efiSysMountPoint@/loader/entries/%s" % (
         generation_conf_filename(profile, generation, specialisation))
     tmp_path = "%s.tmp" % (entry_file)
-    kernel_params = "init=%s " % profile_path(profile, generation, specialisation, "init")
+    kernel_params = "init=%s " % bootspec.init
+
+    kernel_params = kernel_params + " ".join(bootspec.kernelParams)
+    build_time = int(os.path.getctime(system_dir(profile, generation, specialisation)))
+    build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F')
 
-    with open(profile_path(profile, generation, specialisation, "kernel-params")) as params_file:
-        kernel_params = kernel_params + params_file.read()
     with open(tmp_path, 'w') as f:
         f.write(BOOT_ENTRY.format(title=title,
                     generation=generation,
                     kernel=kernel,
                     initrd=initrd,
                     kernel_params=kernel_params,
-                    description=describe_generation(profile, generation, specialisation)))
+                    description=f"{bootspec.label}, built on {build_date}"))
         if machine_id is not None:
             f.write("machine-id %s\n" % machine_id)
+        f.flush()
+        os.fsync(f.fileno())
     os.rename(tmp_path, entry_file)
 
 
-def mkdir_p(path: str) -> None:
-    try:
-        os.makedirs(path)
-    except OSError as e:
-        if e.errno != errno.EEXIST or not os.path.isdir(path):
-            raise
-
-
-def get_generations(profile: Optional[str] = None) -> List[SystemIdentifier]:
+def get_generations(profile: str | None = None) -> list[SystemIdentifier]:
     gen_list = subprocess.check_output([
         "@nix@/bin/nix-env",
         "--list-generations",
@@ -179,21 +189,14 @@ def get_generations(profile: Optional[str] = None) -> List[SystemIdentifier]:
     return configurations[-configurationLimit:]
 
 
-def get_specialisations(profile: Optional[str], generation: int, _: Optional[str]) -> List[SystemIdentifier]:
-    specialisations_dir = os.path.join(
-            system_dir(profile, generation, None), "specialisation")
-    if not os.path.exists(specialisations_dir):
-        return []
-    return [SystemIdentifier(profile, generation, spec) for spec in os.listdir(specialisations_dir)]
-
-
-def remove_old_entries(gens: List[SystemIdentifier]) -> None:
-    rex_profile = re.compile("^@efiSysMountPoint@/loader/entries/nixos-(.*)-generation-.*\.conf$")
-    rex_generation = re.compile("^@efiSysMountPoint@/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
+def remove_old_entries(gens: list[SystemIdentifier]) -> None:
+    rex_profile = re.compile(r"^@efiSysMountPoint@/loader/entries/nixos-(.*)-generation-.*\.conf$")
+    rex_generation = re.compile(r"^@efiSysMountPoint@/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
     known_paths = []
     for gen in gens:
-        known_paths.append(copy_from_profile(*gen, "kernel", True))
-        known_paths.append(copy_from_profile(*gen, "initrd", True))
+        bootspec = get_bootspec(gen.profile, gen.generation)
+        known_paths.append(copy_from_file(bootspec.kernel, True))
+        known_paths.append(copy_from_file(bootspec.initrd, True))
     for path in glob.iglob("@efiSysMountPoint@/loader/entries/nixos*-generation-[1-9]*.conf"):
         if rex_profile.match(path):
             prof = rex_profile.sub(r"\1", path)
@@ -210,7 +213,7 @@ def remove_old_entries(gens: List[SystemIdentifier]) -> None:
             os.unlink(path)
 
 
-def get_profiles() -> List[str]:
+def get_profiles() -> list[str]:
     if os.path.isdir("/nix/var/nix/profiles/system-profiles/"):
         return [x
             for x in os.listdir("/nix/var/nix/profiles/system-profiles/")
@@ -218,11 +221,7 @@ def get_profiles() -> List[str]:
     else:
         return []
 
-def main() -> None:
-    parser = argparse.ArgumentParser(description='Update @distroName@-related systemd-boot files')
-    parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default @distroName@ config to boot')
-    args = parser.parse_args()
-
+def install_bootloader(args: argparse.Namespace) -> None:
     try:
         with open("/etc/machine-id") as machine_file:
             machine_id = machine_file.readlines()[0]
@@ -273,21 +272,15 @@ def main() -> None:
         if available_match is None:
             raise Exception("could not determine systemd-boot version")
 
-        installed_version = version.parse(installed_match.group(1))
-        available_version = version.parse(available_match.group(1))
+        installed_version = installed_match.group(1)
+        available_version = available_match.group(1)
 
-        # systemd 252 has a regression that leaves some machines unbootable, so we skip that update.
-        # The fix is in 252.2
-        # See https://github.com/systemd/systemd/issues/25363 and https://github.com/NixOS/nixpkgs/pull/201558#issuecomment-1348603263
         if installed_version < available_version:
-            if version.parse('252') <= available_version < version.parse('252.2'):
-                print("skipping systemd-boot update to %s because of known regression" % available_version)
-            else:
-                print("updating systemd-boot from %s to %s" % (installed_version, available_version))
-                subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["update"])
+            print("updating systemd-boot from %s to %s" % (installed_version, available_version))
+            subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["update"])
 
-    mkdir_p("@efiSysMountPoint@/efi/nixos")
-    mkdir_p("@efiSysMountPoint@/loader/entries")
+    os.makedirs("@efiSysMountPoint@/efi/nixos", exist_ok=True)
+    os.makedirs("@efiSysMountPoint@/loader/entries", exist_ok=True)
 
     gens = get_generations()
     for profile in get_profiles():
@@ -295,10 +288,11 @@ def main() -> None:
     remove_old_entries(gens)
     for gen in gens:
         try:
-            is_default = os.path.dirname(profile_path(*gen, "init")) == args.default_config
-            write_entry(*gen, machine_id, current=is_default)
-            for specialisation in get_specialisations(*gen):
-                write_entry(*specialisation, machine_id, current=is_default)
+            bootspec = get_bootspec(gen.profile, gen.generation)
+            is_default = os.path.dirname(bootspec.init) == args.default_config
+            write_entry(*gen, machine_id, bootspec, current=is_default)
+            for specialisation in bootspec.specialisations.keys():
+                write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, current=is_default)
             if is_default:
                 write_loader_conf(*gen)
         except OSError as e:
@@ -324,17 +318,26 @@ def main() -> None:
             os.rmdir(actual_root)
         os.rmdir(root)
 
-    mkdir_p("@efiSysMountPoint@/efi/nixos/.extra-files")
+    os.makedirs("@efiSysMountPoint@/efi/nixos/.extra-files", exist_ok=True)
 
     subprocess.check_call("@copyExtraFiles@")
 
-    # Since fat32 provides little recovery facilities after a crash,
-    # it can leave the system in an unbootable state, when a crash/outage
-    # happens shortly after an update. To decrease the likelihood of this
-    # event sync the efi filesystem after each update.
-    rc = libc.syncfs(os.open("@efiSysMountPoint@", os.O_RDONLY))
-    if rc != 0:
-        print("could not sync @efiSysMountPoint@: {}".format(os.strerror(rc)), file=sys.stderr)
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description='Update @distroName@-related systemd-boot files')
+    parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default @distroName@ config to boot')
+    args = parser.parse_args()
+
+    try:
+        install_bootloader(args)
+    finally:
+        # Since fat32 provides little recovery facilities after a crash,
+        # it can leave the system in an unbootable state, when a crash/outage
+        # happens shortly after an update. To decrease the likelihood of this
+        # event sync the efi filesystem after each update.
+        rc = libc.syncfs(os.open("@efiSysMountPoint@", os.O_RDONLY))
+        if rc != 0:
+            print("could not sync @efiSysMountPoint@: {}".format(os.strerror(rc)), file=sys.stderr)
 
 
 if __name__ == '__main__':