summary refs log tree commit diff
diff options
context:
space:
mode:
authorJacek Galowicz <jacek@galowicz.de>2022-12-30 11:38:10 +0100
committerGitHub <noreply@github.com>2022-12-30 11:38:10 +0100
commit2d626a8fc5515917d46a73eb4e2b71525a88b524 (patch)
treec8b2b8dfa6d073ef8f041d6aae1349fd4435255a
parent9a56a9a8a1f446cb6f93d59ae584e591401aaea0 (diff)
parentce5644f658ade7837fc37193bf41c8e64db7d18f (diff)
downloadnixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar.gz
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar.bz2
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar.lz
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar.xz
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.tar.zst
nixpkgs-2d626a8fc5515917d46a73eb4e2b71525a88b524.zip
Merge pull request #208278 from bjornfor/nixos-test-driver-use-f-string
nixos/test-driver: use f-strings instead of .format()
-rwxr-xr-xnixos/lib/test-driver/test_driver/__init__.py6
-rw-r--r--nixos/lib/test-driver/test_driver/driver.py8
-rw-r--r--nixos/lib/test-driver/test_driver/logger.py8
-rw-r--r--nixos/lib/test-driver/test_driver/machine.py113
4 files changed, 57 insertions, 78 deletions
diff --git a/nixos/lib/test-driver/test_driver/__init__.py b/nixos/lib/test-driver/test_driver/__init__.py
index 61d91c9ed65..db7e0ed33a8 100755
--- a/nixos/lib/test-driver/test_driver/__init__.py
+++ b/nixos/lib/test-driver/test_driver/__init__.py
@@ -41,11 +41,9 @@ def writeable_dir(arg: str) -> Path:
     """
     path = Path(arg)
     if not path.is_dir():
-        raise argparse.ArgumentTypeError("{0} is not a directory".format(path))
+        raise argparse.ArgumentTypeError(f"{path} is not a directory")
     if not os.access(path, os.W_OK):
-        raise argparse.ArgumentTypeError(
-            "{0} is not a writeable directory".format(path)
-        )
+        raise argparse.ArgumentTypeError(f"{path} is not a writeable directory")
     return path
 
 
diff --git a/nixos/lib/test-driver/test_driver/driver.py b/nixos/lib/test-driver/test_driver/driver.py
index 6542a2e2f69..de6abbb4679 100644
--- a/nixos/lib/test-driver/test_driver/driver.py
+++ b/nixos/lib/test-driver/test_driver/driver.py
@@ -19,15 +19,11 @@ def get_tmp_dir() -> Path:
     tmp_dir.mkdir(mode=0o700, exist_ok=True)
     if not tmp_dir.is_dir():
         raise NotADirectoryError(
-            "The directory defined by TMPDIR, TEMP, TMP or CWD: {0} is not a directory".format(
-                tmp_dir
-            )
+            f"The directory defined by TMPDIR, TEMP, TMP or CWD: {tmp_dir} is not a directory"
         )
     if not os.access(tmp_dir, os.W_OK):
         raise PermissionError(
-            "The directory defined by TMPDIR, TEMP, TMP, or CWD: {0} is not writeable".format(
-                tmp_dir
-            )
+            f"The directory defined by TMPDIR, TEMP, TMP, or CWD: {tmp_dir} is not writeable"
         )
     return tmp_dir
 
diff --git a/nixos/lib/test-driver/test_driver/logger.py b/nixos/lib/test-driver/test_driver/logger.py
index 59ed2954723..e6182ff7c76 100644
--- a/nixos/lib/test-driver/test_driver/logger.py
+++ b/nixos/lib/test-driver/test_driver/logger.py
@@ -36,7 +36,7 @@ class Logger:
 
     def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
         if "machine" in attributes:
-            return "{}: {}".format(attributes["machine"], message)
+            return f"{attributes['machine']}: {message}"
         return message
 
     def log_line(self, message: str, attributes: Dict[str, str]) -> None:
@@ -62,9 +62,7 @@ class Logger:
     def log_serial(self, message: str, machine: str) -> None:
         self.enqueue({"msg": message, "machine": machine, "type": "serial"})
         if self._print_serial_logs:
-            self._eprint(
-                Style.DIM + "{} # {}".format(machine, message) + Style.RESET_ALL
-            )
+            self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
 
     def enqueue(self, item: Dict[str, str]) -> None:
         self.queue.put(item)
@@ -97,7 +95,7 @@ class Logger:
         yield
         self.drain_log_queue()
         toc = time.time()
-        self.log("(finished: {}, in {:.2f} seconds)".format(message, toc - tic))
+        self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
 
         self.xml.endElement("nest")
 
diff --git a/nixos/lib/test-driver/test_driver/machine.py b/nixos/lib/test-driver/test_driver/machine.py
index c59ef3b1726..cbf3aa733c4 100644
--- a/nixos/lib/test-driver/test_driver/machine.py
+++ b/nixos/lib/test-driver/test_driver/machine.py
@@ -420,8 +420,8 @@ class Machine:
 
     def send_monitor_command(self, command: str) -> str:
         self.run_callbacks()
-        with self.nested("sending monitor command: {}".format(command)):
-            message = ("{}\n".format(command)).encode()
+        with self.nested(f"sending monitor command: {command}"):
+            message = f"{command}\n".encode()
             assert self.monitor is not None
             self.monitor.send(message)
             return self.wait_for_monitor_prompt()
@@ -438,7 +438,7 @@ class Machine:
             info = self.get_unit_info(unit, user)
             state = info["ActiveState"]
             if state == "failed":
-                raise Exception('unit "{}" reached state "{}"'.format(unit, state))
+                raise Exception(f'unit "{unit}" reached state "{state}"')
 
             if state == "inactive":
                 status, jobs = self.systemctl("list-jobs --full 2>&1", user)
@@ -446,27 +446,24 @@ class Machine:
                     info = self.get_unit_info(unit, user)
                     if info["ActiveState"] == state:
                         raise Exception(
-                            (
-                                'unit "{}" is inactive and there ' "are no pending jobs"
-                            ).format(unit)
+                            f'unit "{unit}" is inactive and there are no pending jobs'
                         )
 
             return state == "active"
 
         with self.nested(
-            "waiting for unit {}{}".format(
-                unit, f" with user {user}" if user is not None else ""
-            )
+            f"waiting for unit {unit}"
+            + (f" with user {user}" if user is not None else "")
         ):
             retry(check_active, timeout)
 
     def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
-        status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
+        status, lines = self.systemctl(f'--no-pager show "{unit}"', user)
         if status != 0:
             raise Exception(
-                'retrieving systemctl info for unit "{}" {} failed with exit code {}'.format(
-                    unit, "" if user is None else 'under user "{}"'.format(user), status
-                )
+                f'retrieving systemctl info for unit "{unit}"'
+                + ("" if user is None else f' under user "{user}"')
+                + f" failed with exit code {status}"
             )
 
         line_pattern = re.compile(r"^([^=]+)=(.*)$")
@@ -486,24 +483,22 @@ class Machine:
         if user is not None:
             q = q.replace("'", "\\'")
             return self.execute(
-                (
-                    "su -l {} --shell /bin/sh -c "
-                    "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
-                    "systemctl --user {}'"
-                ).format(user, q)
+                f"su -l {user} --shell /bin/sh -c "
+                "$'XDG_RUNTIME_DIR=/run/user/`id -u` "
+                f"systemctl --user {q}'"
             )
-        return self.execute("systemctl {}".format(q))
+        return self.execute(f"systemctl {q}")
 
     def require_unit_state(self, unit: str, require_state: str = "active") -> None:
         with self.nested(
-            "checking if unit ‘{}’ has reached state '{}'".format(unit, require_state)
+            f"checking if unit ‘{unit}’ has reached state '{require_state}'"
         ):
             info = self.get_unit_info(unit)
             state = info["ActiveState"]
             if state != require_state:
                 raise Exception(
-                    "Expected unit ‘{}’ to to be in state ".format(unit)
-                    + "'{}' but it is in state ‘{}’".format(require_state, state)
+                    f"Expected unit ‘{unit}’ to to be in state "
+                    f"'{require_state}' but it is in state ‘{state}’"
                 )
 
     def _next_newline_closed_block_from_shell(self) -> str:
@@ -593,13 +588,11 @@ class Machine:
         """Execute each command and check that it succeeds."""
         output = ""
         for command in commands:
-            with self.nested("must succeed: {}".format(command)):
+            with self.nested(f"must succeed: {command}"):
                 (status, out) = self.execute(command, timeout=timeout)
                 if status != 0:
-                    self.log("output: {}".format(out))
-                    raise Exception(
-                        "command `{}` failed (exit code {})".format(command, status)
-                    )
+                    self.log(f"output: {out}")
+                    raise Exception(f"command `{command}` failed (exit code {status})")
                 output += out
         return output
 
@@ -607,12 +600,10 @@ class Machine:
         """Execute each command and check that it fails."""
         output = ""
         for command in commands:
-            with self.nested("must fail: {}".format(command)):
+            with self.nested(f"must fail: {command}"):
                 (status, out) = self.execute(command, timeout=timeout)
                 if status == 0:
-                    raise Exception(
-                        "command `{}` unexpectedly succeeded".format(command)
-                    )
+                    raise Exception(f"command `{command}` unexpectedly succeeded")
                 output += out
         return output
 
@@ -627,7 +618,7 @@ class Machine:
             status, output = self.execute(command, timeout=timeout)
             return status == 0
 
-        with self.nested("waiting for success: {}".format(command)):
+        with self.nested(f"waiting for success: {command}"):
             retry(check_success, timeout)
             return output
 
@@ -642,7 +633,7 @@ class Machine:
             status, output = self.execute(command, timeout=timeout)
             return status != 0
 
-        with self.nested("waiting for failure: {}".format(command)):
+        with self.nested(f"waiting for failure: {command}"):
             retry(check_failure)
             return output
 
@@ -661,8 +652,8 @@ class Machine:
 
     def get_tty_text(self, tty: str) -> str:
         status, output = self.execute(
-            "fold -w$(stty -F /dev/tty{0} size | "
-            "awk '{{print $2}}') /dev/vcs{0}".format(tty)
+            f"fold -w$(stty -F /dev/tty{tty} size | "
+            f"awk '{{print $2}}') /dev/vcs{tty}"
         )
         return output
 
@@ -681,11 +672,11 @@ class Machine:
                 )
             return len(matcher.findall(text)) > 0
 
-        with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
+        with self.nested(f"waiting for {regexp} to appear on tty {tty}"):
             retry(tty_matches)
 
     def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
-        with self.nested("sending keys ‘{}‘".format(chars)):
+        with self.nested(f"sending keys ‘{chars}‘"):
             for char in chars:
                 self.send_key(char, delay)
 
@@ -693,35 +684,33 @@ class Machine:
         """Waits until the file exists in machine's file system."""
 
         def check_file(_: Any) -> bool:
-            status, _ = self.execute("test -e {}".format(filename))
+            status, _ = self.execute(f"test -e {filename}")
             return status == 0
 
-        with self.nested("waiting for file ‘{}‘".format(filename)):
+        with self.nested(f"waiting for file ‘{filename}‘"):
             retry(check_file)
 
     def wait_for_open_port(self, port: int, addr: str = "localhost") -> None:
         def port_is_open(_: Any) -> bool:
-            status, _ = self.execute("nc -z {} {}".format(addr, port))
+            status, _ = self.execute(f"nc -z {addr} {port}")
             return status == 0
 
-        with self.nested("waiting for TCP port {} on {}".format(port, addr)):
+        with self.nested(f"waiting for TCP port {port} on {addr}"):
             retry(port_is_open)
 
     def wait_for_closed_port(self, port: int, addr: str = "localhost") -> None:
         def port_is_closed(_: Any) -> bool:
-            status, _ = self.execute("nc -z {} {}".format(addr, port))
+            status, _ = self.execute(f"nc -z {addr} {port}")
             return status != 0
 
-        with self.nested(
-            "waiting for TCP port {} on {} to be closed".format(port, addr)
-        ):
+        with self.nested(f"waiting for TCP port {port} on {addr} to be closed"):
             retry(port_is_closed)
 
     def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
-        return self.systemctl("start {}".format(jobname), user)
+        return self.systemctl(f"start {jobname}", user)
 
     def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
-        return self.systemctl("stop {}".format(jobname), user)
+        return self.systemctl(f"stop {jobname}", user)
 
     def wait_for_job(self, jobname: str) -> None:
         self.wait_for_unit(jobname)
@@ -741,21 +730,21 @@ class Machine:
             toc = time.time()
 
             self.log("connected to guest root shell")
-            self.log("(connecting took {:.2f} seconds)".format(toc - tic))
+            self.log(f"(connecting took {toc - tic:.2f} seconds)")
             self.connected = True
 
     def screenshot(self, filename: str) -> None:
         word_pattern = re.compile(r"^\w+$")
         if word_pattern.match(filename):
-            filename = os.path.join(self.out_dir, "{}.png".format(filename))
-        tmp = "{}.ppm".format(filename)
+            filename = os.path.join(self.out_dir, f"{filename}.png")
+        tmp = f"{filename}.ppm"
 
         with self.nested(
-            "making screenshot {}".format(filename),
+            f"making screenshot {filename}",
             {"image": os.path.basename(filename)},
         ):
-            self.send_monitor_command("screendump {}".format(tmp))
-            ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
+            self.send_monitor_command(f"screendump {tmp}")
+            ret = subprocess.run(f"pnmtopng {tmp} > {filename}", shell=True)
             os.unlink(tmp)
             if ret.returncode != 0:
                 raise Exception("Cannot convert screenshot")
@@ -817,7 +806,7 @@ class Machine:
 
     def dump_tty_contents(self, tty: str) -> None:
         """Debugging: Dump the contents of the TTY<n>"""
-        self.execute("fold -w 80 /dev/vcs{} | systemd-cat".format(tty))
+        self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat")
 
     def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]:
         with tempfile.TemporaryDirectory() as tmpdir:
@@ -839,15 +828,15 @@ class Machine:
                     return True
 
             if last:
-                self.log("Last OCR attempt failed. Text was: {}".format(variants))
+                self.log(f"Last OCR attempt failed. Text was: {variants}")
 
             return False
 
-        with self.nested("waiting for {} to appear on screen".format(regex)):
+        with self.nested(f"waiting for {regex} to appear on screen"):
             retry(screen_matches)
 
     def wait_for_console_text(self, regex: str) -> None:
-        with self.nested("waiting for {} to appear on console".format(regex)):
+        with self.nested(f"waiting for {regex} to appear on console"):
             # Buffer the console output, this is needed
             # to match multiline regexes.
             console = io.StringIO()
@@ -864,7 +853,7 @@ class Machine:
 
     def send_key(self, key: str, delay: Optional[float] = 0.01) -> None:
         key = CHAR_TO_KEY.get(key, key)
-        self.send_monitor_command("sendkey {}".format(key))
+        self.send_monitor_command(f"sendkey {key}")
         if delay is not None:
             time.sleep(delay)
 
@@ -923,7 +912,7 @@ class Machine:
         self.pid = self.process.pid
         self.booted = True
 
-        self.log("QEMU running (pid {})".format(self.pid))
+        self.log(f"QEMU running (pid {self.pid})")
 
     def cleanup_statedir(self) -> None:
         shutil.rmtree(self.state_dir)
@@ -977,7 +966,7 @@ class Machine:
             names = self.get_window_names()
             if last_try:
                 self.log(
-                    "Last chance to match {} on the window list,".format(regexp)
+                    f"Last chance to match {regexp} on the window list,"
                     + " which currently contains: "
                     + ", ".join(names)
                 )
@@ -994,9 +983,7 @@ class Machine:
         """Forward a TCP port on the host to a TCP port on the guest.
         Useful during interactive testing.
         """
-        self.send_monitor_command(
-            "hostfwd_add tcp::{}-:{}".format(host_port, guest_port)
-        )
+        self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}")
 
     def block(self) -> None:
         """Make the machine unreachable by shutting down eth1 (the multicast