diff options
Diffstat (limited to 'nixos/lib/test-driver/test-driver.py')
-rwxr-xr-x | nixos/lib/test-driver/test-driver.py | 95 |
1 files changed, 54 insertions, 41 deletions
diff --git a/nixos/lib/test-driver/test-driver.py b/nixos/lib/test-driver/test-driver.py index 643446f313e..90c9e9be45c 100755 --- a/nixos/lib/test-driver/test-driver.py +++ b/nixos/lib/test-driver/test-driver.py @@ -171,7 +171,7 @@ class Logger: yield self.drain_log_queue() toc = time.time() - self.log("({:.2f} seconds)".format(toc - tic)) + self.log("(finished: {}, in {:.2f} seconds)".format(message, toc - tic)) self.xml.endElement("nest") @@ -490,23 +490,24 @@ class Machine: return rootlog.nested(msg, my_attrs) def wait_for_monitor_prompt(self) -> str: - assert self.monitor is not None - answer = "" - while True: - undecoded_answer = self.monitor.recv(1024) - if not undecoded_answer: - break - answer += undecoded_answer.decode() - if answer.endswith("(qemu) "): - break - return answer + with self.nested("waiting for monitor prompt"): + assert self.monitor is not None + answer = "" + while True: + undecoded_answer = self.monitor.recv(1024) + if not undecoded_answer: + break + answer += undecoded_answer.decode() + if answer.endswith("(qemu) "): + break + return answer def send_monitor_command(self, command: str) -> str: - message = ("{}\n".format(command)).encode() - self.log("sending monitor command: {}".format(command)) - assert self.monitor is not None - self.monitor.send(message) - return self.wait_for_monitor_prompt() + with self.nested("sending monitor command: {}".format(command)): + message = ("{}\n".format(command)).encode() + assert self.monitor is not None + self.monitor.send(message) + return self.wait_for_monitor_prompt() def wait_for_unit(self, unit: str, user: Optional[str] = None) -> None: """Wait for a systemd unit to get into "active" state. @@ -533,7 +534,12 @@ class Machine: return state == "active" - retry(check_active) + with self.nested( + "waiting for unit {}{}".format( + unit, f" with user {user}" if user is not None else "" + ) + ): + retry(check_active) def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user) @@ -597,9 +603,14 @@ class Machine: break return "".join(output_buffer) - def execute(self, command: str, check_return: bool = True) -> Tuple[int, str]: + def execute( + self, command: str, check_return: bool = True, timeout: Optional[int] = 900 + ) -> Tuple[int, str]: self.connect() + if timeout is not None: + command = "timeout {} sh -c {}".format(timeout, shlex.quote(command)) + out_command = f"( set -euo pipefail; {command} ) | (base64 --wrap 0; echo)\n" assert self.shell self.shell.send(out_command.encode()) @@ -629,12 +640,12 @@ class Machine: pass_fds=[self.shell.fileno()], ) - def succeed(self, *commands: str) -> str: + def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: """Execute each command and check that it succeeds.""" output = "" for command in commands: with self.nested("must succeed: {}".format(command)): - (status, out) = self.execute(command) + (status, out) = self.execute(command, timeout=timeout) if status != 0: self.log("output: {}".format(out)) raise Exception( @@ -643,12 +654,12 @@ class Machine: output += out return output - def fail(self, *commands: str) -> str: + def fail(self, *commands: str, timeout: Optional[int] = None) -> str: """Execute each command and check that it fails.""" output = "" for command in commands: with self.nested("must fail: {}".format(command)): - (status, out) = self.execute(command) + (status, out) = self.execute(command, timeout=timeout) if status == 0: raise Exception( "command `{}` unexpectedly succeeded".format(command) @@ -664,14 +675,14 @@ class Machine: def check_success(_: Any) -> bool: nonlocal output - status, output = self.execute(command) + status, output = self.execute(command, timeout=timeout) return status == 0 with self.nested("waiting for success: {}".format(command)): retry(check_success, timeout) return output - def wait_until_fails(self, command: str) -> str: + def wait_until_fails(self, command: str, timeout: int = 900) -> str: """Wait until a command returns failure. Throws an exception on timeout. """ @@ -679,7 +690,7 @@ class Machine: def check_failure(_: Any) -> bool: nonlocal output - status, output = self.execute(command) + status, output = self.execute(command, timeout=timeout) return status != 0 with self.nested("waiting for failure: {}".format(command)): @@ -752,7 +763,8 @@ class Machine: status, _ = self.execute("nc -z localhost {}".format(port)) return status != 0 - retry(port_is_closed) + with self.nested("waiting for TCP port {} to be closed"): + retry(port_is_closed) def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: return self.systemctl("start {}".format(jobname), user) @@ -886,24 +898,25 @@ class Machine: retry(screen_matches) def wait_for_console_text(self, regex: str) -> None: - self.log("waiting for {} to appear on console".format(regex)) - # Buffer the console output, this is needed - # to match multiline regexes. - console = io.StringIO() - while True: - try: - console.write(self.last_lines.get()) - except queue.Empty: - self.sleep(1) - continue - console.seek(0) - matches = re.search(regex, console.read()) - if matches is not None: - return + with self.nested("waiting for {} to appear on console".format(regex)): + # Buffer the console output, this is needed + # to match multiline regexes. + console = io.StringIO() + while True: + try: + console.write(self.last_lines.get()) + except queue.Empty: + self.sleep(1) + continue + console.seek(0) + matches = re.search(regex, console.read()) + if matches is not None: + return def send_key(self, key: str) -> None: key = CHAR_TO_KEY.get(key, key) self.send_monitor_command("sendkey {}".format(key)) + time.sleep(0.01) def start(self) -> None: if self.booted: @@ -1014,7 +1027,7 @@ class Machine: ) return any(pattern.search(name) for name in names) - with self.nested("Waiting for a window to appear"): + with self.nested("waiting for a window to appear"): retry(window_is_visible) def sleep(self, secs: int) -> None: |