diff options
Diffstat (limited to 'nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py')
-rw-r--r--[-rwxr-xr-x] | nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py | 221 |
1 files changed, 112 insertions, 109 deletions
diff --git a/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py b/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py index a040518a5a5..e2e7ffe59dc 100755..100644 --- a/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py +++ b/nixos/modules/system/boot/loader/systemd-boot/systemd-boot-builder.py @@ -1,27 +1,41 @@ #! @python3@/bin/python3 -B import argparse -import shutil -import os -import sys -import errno -import subprocess -import glob -import tempfile -import errno -import warnings import ctypes -libc = ctypes.CDLL("libc.so.6") -import re import datetime +import errno import glob +import os import os.path -from typing import NamedTuple, List, Optional -from packaging import version +import re +import shutil +import subprocess +import sys +import warnings +import json +from typing import NamedTuple, Dict, List +from dataclasses import dataclass + + +@dataclass +class BootSpec: + init: str + initrd: str + initrdSecrets: str + kernel: str + kernelParams: List[str] + label: str + system: str + toplevel: str + specialisations: Dict[str, "BootSpec"] + + + +libc = ctypes.CDLL("libc.so.6") class SystemIdentifier(NamedTuple): - profile: Optional[str] + profile: str | None generation: int - specialisation: Optional[str] + specialisation: str | None def copy_if_not_exists(source: str, dest: str) -> None: @@ -29,13 +43,13 @@ def copy_if_not_exists(source: str, dest: str) -> None: shutil.copyfile(source, dest) -def generation_dir(profile: Optional[str], generation: int) -> str: +def generation_dir(profile: str | None, generation: int) -> str: if profile: return "/nix/var/nix/profiles/system-profiles/%s-%d-link" % (profile, generation) else: return "/nix/var/nix/profiles/system-%d-link" % (generation) -def system_dir(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str: +def system_dir(profile: str | None, generation: int, specialisation: str | None) -> str: d = generation_dir(profile, generation) if specialisation: return os.path.join(d, "specialisation", specialisation) @@ -49,7 +63,7 @@ initrd {initrd} options {kernel_params} """ -def generation_conf_filename(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str: +def generation_conf_filename(profile: str | None, generation: int, specialisation: str | None) -> str: pieces = [ "nixos", profile or None, @@ -60,23 +74,44 @@ def generation_conf_filename(profile: Optional[str], generation: int, specialisa return "-".join(p for p in pieces if p) + ".conf" -def write_loader_conf(profile: Optional[str], generation: int, specialisation: Optional[str]) -> None: +def write_loader_conf(profile: str | None, generation: int, specialisation: str | None) -> None: with open("@efiSysMountPoint@/loader/loader.conf.tmp", 'w') as f: if "@timeout@" != "": f.write("timeout @timeout@\n") f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation)) if not @editor@: - f.write("editor 0\n"); - f.write("console-mode @consoleMode@\n"); + f.write("editor 0\n") + f.write("console-mode @consoleMode@\n") + f.flush() + os.fsync(f.fileno()) os.rename("@efiSysMountPoint@/loader/loader.conf.tmp", "@efiSysMountPoint@/loader/loader.conf") -def profile_path(profile: Optional[str], generation: int, specialisation: Optional[str], name: str) -> str: - return os.path.realpath("%s/%s" % (system_dir(profile, generation, specialisation), name)) +def get_bootspec(profile: str | None, generation: int) -> BootSpec: + system_directory = system_dir(profile, generation, None) + boot_json_path = os.path.realpath("%s/%s" % (system_directory, "boot.json")) + if os.path.isfile(boot_json_path): + boot_json_f = open(boot_json_path, 'r') + bootspec_json = json.load(boot_json_f) + else: + boot_json_str = subprocess.check_output([ + "@bootspecTools@/bin/synthesize", + "--version", + "1", + system_directory, + "/dev/stdout"], + universal_newlines=True) + bootspec_json = json.loads(boot_json_str) + return bootspec_from_json(bootspec_json) +def bootspec_from_json(bootspec_json: Dict) -> BootSpec: + specialisations = bootspec_json['org.nixos.specialisation.v1'] + specialisations = {k: bootspec_from_json(v) for k, v in specialisations.items()} + return BootSpec(**bootspec_json['org.nixos.bootspec.v1'], specialisations=specialisations) -def copy_from_profile(profile: Optional[str], generation: int, specialisation: Optional[str], name: str, dry_run: bool = False) -> str: - store_file_path = profile_path(profile, generation, specialisation, name) + +def copy_from_file(file: str, dry_run: bool = False) -> str: + store_file_path = os.path.realpath(file) suffix = os.path.basename(store_file_path) store_dir = os.path.basename(os.path.dirname(store_file_path)) efi_file_path = "/efi/nixos/%s-%s.efi" % (store_dir, suffix) @@ -84,40 +119,19 @@ def copy_from_profile(profile: Optional[str], generation: int, specialisation: O copy_if_not_exists(store_file_path, "@efiSysMountPoint@%s" % (efi_file_path)) return efi_file_path - -def describe_generation(profile: Optional[str], generation: int, specialisation: Optional[str]) -> str: - try: - with open(profile_path(profile, generation, specialisation, "nixos-version")) as f: - nixos_version = f.read() - except IOError: - nixos_version = "Unknown" - - kernel_dir = os.path.dirname(profile_path(profile, generation, specialisation, "kernel")) - module_dir = glob.glob("%s/lib/modules/*" % kernel_dir)[0] - kernel_version = os.path.basename(module_dir) - - build_time = int(os.path.getctime(system_dir(profile, generation, specialisation))) - build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F') - - description = "@distroName@ {}, Linux Kernel {}, Built on {}".format( - nixos_version, kernel_version, build_date - ) - - return description - - -def write_entry(profile: Optional[str], generation: int, specialisation: Optional[str], - machine_id: str, current: bool) -> None: - kernel = copy_from_profile(profile, generation, specialisation, "kernel") - initrd = copy_from_profile(profile, generation, specialisation, "initrd") +def write_entry(profile: str | None, generation: int, specialisation: str | None, + machine_id: str, bootspec: BootSpec, current: bool) -> None: + if specialisation: + bootspec = bootspec.specialisations[specialisation] + kernel = copy_from_file(bootspec.kernel) + initrd = copy_from_file(bootspec.initrd) title = "@distroName@{profile}{specialisation}".format( profile=" [" + profile + "]" if profile else "", specialisation=" (%s)" % specialisation if specialisation else "") try: - append_initrd_secrets = profile_path(profile, generation, specialisation, "append-initrd-secrets") - subprocess.check_call([append_initrd_secrets, "@efiSysMountPoint@%s" % (initrd)]) + subprocess.check_call([bootspec.initrdSecrets, "@efiSysMountPoint@%s" % (initrd)]) except FileNotFoundError: pass except subprocess.CalledProcessError: @@ -132,31 +146,27 @@ def write_entry(profile: Optional[str], generation: int, specialisation: Optiona entry_file = "@efiSysMountPoint@/loader/entries/%s" % ( generation_conf_filename(profile, generation, specialisation)) tmp_path = "%s.tmp" % (entry_file) - kernel_params = "init=%s " % profile_path(profile, generation, specialisation, "init") + kernel_params = "init=%s " % bootspec.init + + kernel_params = kernel_params + " ".join(bootspec.kernelParams) + build_time = int(os.path.getctime(system_dir(profile, generation, specialisation))) + build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F') - with open(profile_path(profile, generation, specialisation, "kernel-params")) as params_file: - kernel_params = kernel_params + params_file.read() with open(tmp_path, 'w') as f: f.write(BOOT_ENTRY.format(title=title, generation=generation, kernel=kernel, initrd=initrd, kernel_params=kernel_params, - description=describe_generation(profile, generation, specialisation))) + description=f"{bootspec.label}, built on {build_date}")) if machine_id is not None: f.write("machine-id %s\n" % machine_id) + f.flush() + os.fsync(f.fileno()) os.rename(tmp_path, entry_file) -def mkdir_p(path: str) -> None: - try: - os.makedirs(path) - except OSError as e: - if e.errno != errno.EEXIST or not os.path.isdir(path): - raise - - -def get_generations(profile: Optional[str] = None) -> List[SystemIdentifier]: +def get_generations(profile: str | None = None) -> list[SystemIdentifier]: gen_list = subprocess.check_output([ "@nix@/bin/nix-env", "--list-generations", @@ -179,21 +189,14 @@ def get_generations(profile: Optional[str] = None) -> List[SystemIdentifier]: return configurations[-configurationLimit:] -def get_specialisations(profile: Optional[str], generation: int, _: Optional[str]) -> List[SystemIdentifier]: - specialisations_dir = os.path.join( - system_dir(profile, generation, None), "specialisation") - if not os.path.exists(specialisations_dir): - return [] - return [SystemIdentifier(profile, generation, spec) for spec in os.listdir(specialisations_dir)] - - -def remove_old_entries(gens: List[SystemIdentifier]) -> None: - rex_profile = re.compile("^@efiSysMountPoint@/loader/entries/nixos-(.*)-generation-.*\.conf$") - rex_generation = re.compile("^@efiSysMountPoint@/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$") +def remove_old_entries(gens: list[SystemIdentifier]) -> None: + rex_profile = re.compile(r"^@efiSysMountPoint@/loader/entries/nixos-(.*)-generation-.*\.conf$") + rex_generation = re.compile(r"^@efiSysMountPoint@/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$") known_paths = [] for gen in gens: - known_paths.append(copy_from_profile(*gen, "kernel", True)) - known_paths.append(copy_from_profile(*gen, "initrd", True)) + bootspec = get_bootspec(gen.profile, gen.generation) + known_paths.append(copy_from_file(bootspec.kernel, True)) + known_paths.append(copy_from_file(bootspec.initrd, True)) for path in glob.iglob("@efiSysMountPoint@/loader/entries/nixos*-generation-[1-9]*.conf"): if rex_profile.match(path): prof = rex_profile.sub(r"\1", path) @@ -210,7 +213,7 @@ def remove_old_entries(gens: List[SystemIdentifier]) -> None: os.unlink(path) -def get_profiles() -> List[str]: +def get_profiles() -> list[str]: if os.path.isdir("/nix/var/nix/profiles/system-profiles/"): return [x for x in os.listdir("/nix/var/nix/profiles/system-profiles/") @@ -218,11 +221,7 @@ def get_profiles() -> List[str]: else: return [] -def main() -> None: - parser = argparse.ArgumentParser(description='Update @distroName@-related systemd-boot files') - parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default @distroName@ config to boot') - args = parser.parse_args() - +def install_bootloader(args: argparse.Namespace) -> None: try: with open("/etc/machine-id") as machine_file: machine_id = machine_file.readlines()[0] @@ -273,21 +272,15 @@ def main() -> None: if available_match is None: raise Exception("could not determine systemd-boot version") - installed_version = version.parse(installed_match.group(1)) - available_version = version.parse(available_match.group(1)) + installed_version = installed_match.group(1) + available_version = available_match.group(1) - # systemd 252 has a regression that leaves some machines unbootable, so we skip that update. - # The fix is in 252.2 - # See https://github.com/systemd/systemd/issues/25363 and https://github.com/NixOS/nixpkgs/pull/201558#issuecomment-1348603263 if installed_version < available_version: - if version.parse('252') <= available_version < version.parse('252.2'): - print("skipping systemd-boot update to %s because of known regression" % available_version) - else: - print("updating systemd-boot from %s to %s" % (installed_version, available_version)) - subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["update"]) + print("updating systemd-boot from %s to %s" % (installed_version, available_version)) + subprocess.check_call(["@systemd@/bin/bootctl", "--esp-path=@efiSysMountPoint@"] + bootctl_flags + ["update"]) - mkdir_p("@efiSysMountPoint@/efi/nixos") - mkdir_p("@efiSysMountPoint@/loader/entries") + os.makedirs("@efiSysMountPoint@/efi/nixos", exist_ok=True) + os.makedirs("@efiSysMountPoint@/loader/entries", exist_ok=True) gens = get_generations() for profile in get_profiles(): @@ -295,10 +288,11 @@ def main() -> None: remove_old_entries(gens) for gen in gens: try: - is_default = os.path.dirname(profile_path(*gen, "init")) == args.default_config - write_entry(*gen, machine_id, current=is_default) - for specialisation in get_specialisations(*gen): - write_entry(*specialisation, machine_id, current=is_default) + bootspec = get_bootspec(gen.profile, gen.generation) + is_default = os.path.dirname(bootspec.init) == args.default_config + write_entry(*gen, machine_id, bootspec, current=is_default) + for specialisation in bootspec.specialisations.keys(): + write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, current=is_default) if is_default: write_loader_conf(*gen) except OSError as e: @@ -324,17 +318,26 @@ def main() -> None: os.rmdir(actual_root) os.rmdir(root) - mkdir_p("@efiSysMountPoint@/efi/nixos/.extra-files") + os.makedirs("@efiSysMountPoint@/efi/nixos/.extra-files", exist_ok=True) subprocess.check_call("@copyExtraFiles@") - # Since fat32 provides little recovery facilities after a crash, - # it can leave the system in an unbootable state, when a crash/outage - # happens shortly after an update. To decrease the likelihood of this - # event sync the efi filesystem after each update. - rc = libc.syncfs(os.open("@efiSysMountPoint@", os.O_RDONLY)) - if rc != 0: - print("could not sync @efiSysMountPoint@: {}".format(os.strerror(rc)), file=sys.stderr) + +def main() -> None: + parser = argparse.ArgumentParser(description='Update @distroName@-related systemd-boot files') + parser.add_argument('default_config', metavar='DEFAULT-CONFIG', help='The default @distroName@ config to boot') + args = parser.parse_args() + + try: + install_bootloader(args) + finally: + # Since fat32 provides little recovery facilities after a crash, + # it can leave the system in an unbootable state, when a crash/outage + # happens shortly after an update. To decrease the likelihood of this + # event sync the efi filesystem after each update. + rc = libc.syncfs(os.open("@efiSysMountPoint@", os.O_RDONLY)) + if rc != 0: + print("could not sync @efiSysMountPoint@: {}".format(os.strerror(rc)), file=sys.stderr) if __name__ == '__main__': |