diff options
Diffstat (limited to 'nixos/lib/test-driver/test_driver/machine.py')
-rw-r--r-- | nixos/lib/test-driver/test_driver/machine.py | 988 |
1 files changed, 988 insertions, 0 deletions
diff --git a/nixos/lib/test-driver/test_driver/machine.py b/nixos/lib/test-driver/test_driver/machine.py new file mode 100644 index 00000000000..569a0f3c61e --- /dev/null +++ b/nixos/lib/test-driver/test_driver/machine.py @@ -0,0 +1,988 @@ +from contextlib import _GeneratorContextManager +from pathlib import Path +from queue import Queue +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple +import base64 +import io +import os +import queue +import re +import shlex +import shutil +import socket +import subprocess +import sys +import tempfile +import threading +import time + +from test_driver.logger import rootlog + +CHAR_TO_KEY = { + "A": "shift-a", + "N": "shift-n", + "-": "0x0C", + "_": "shift-0x0C", + "B": "shift-b", + "O": "shift-o", + "=": "0x0D", + "+": "shift-0x0D", + "C": "shift-c", + "P": "shift-p", + "[": "0x1A", + "{": "shift-0x1A", + "D": "shift-d", + "Q": "shift-q", + "]": "0x1B", + "}": "shift-0x1B", + "E": "shift-e", + "R": "shift-r", + ";": "0x27", + ":": "shift-0x27", + "F": "shift-f", + "S": "shift-s", + "'": "0x28", + '"': "shift-0x28", + "G": "shift-g", + "T": "shift-t", + "`": "0x29", + "~": "shift-0x29", + "H": "shift-h", + "U": "shift-u", + "\\": "0x2B", + "|": "shift-0x2B", + "I": "shift-i", + "V": "shift-v", + ",": "0x33", + "<": "shift-0x33", + "J": "shift-j", + "W": "shift-w", + ".": "0x34", + ">": "shift-0x34", + "K": "shift-k", + "X": "shift-x", + "/": "0x35", + "?": "shift-0x35", + "L": "shift-l", + "Y": "shift-y", + " ": "spc", + "M": "shift-m", + "Z": "shift-z", + "\n": "ret", + "!": "shift-0x02", + "@": "shift-0x03", + "#": "shift-0x04", + "$": "shift-0x05", + "%": "shift-0x06", + "^": "shift-0x07", + "&": "shift-0x08", + "*": "shift-0x09", + "(": "shift-0x0A", + ")": "shift-0x0B", +} + + +def make_command(args: list) -> str: + return " ".join(map(shlex.quote, (map(str, args)))) + + +def _perform_ocr_on_screenshot( + screenshot_path: str, model_ids: Iterable[int] +) -> List[str]: + if shutil.which("tesseract") is None: + raise Exception("OCR requested but enableOCR is false") + + magick_args = ( + "-filter Catrom -density 72 -resample 300 " + + "-contrast -normalize -despeckle -type grayscale " + + "-sharpen 1 -posterize 3 -negate -gamma 100 " + + "-blur 1x65535" + ) + + tess_args = f"-c debug_file=/dev/null --psm 11" + + cmd = f"convert {magick_args} {screenshot_path} tiff:{screenshot_path}.tiff" + ret = subprocess.run(cmd, shell=True, capture_output=True) + if ret.returncode != 0: + raise Exception(f"TIFF conversion failed with exit code {ret.returncode}") + + model_results = [] + for model_id in model_ids: + cmd = f"tesseract {screenshot_path}.tiff - {tess_args} --oem {model_id}" + ret = subprocess.run(cmd, shell=True, capture_output=True) + if ret.returncode != 0: + raise Exception(f"OCR failed with exit code {ret.returncode}") + model_results.append(ret.stdout.decode("utf-8")) + + return model_results + + +def retry(fn: Callable, timeout: int = 900) -> None: + """Call the given function repeatedly, with 1 second intervals, + until it returns True or a timeout is reached. + """ + + for _ in range(timeout): + if fn(False): + return + time.sleep(1) + + if not fn(True): + raise Exception(f"action timed out after {timeout} seconds") + + +class StartCommand: + """The Base Start Command knows how to append the necesary + runtime qemu options as determined by a particular test driver + run. Any such start command is expected to happily receive and + append additional qemu args. + """ + + _cmd: str + + def cmd( + self, + monitor_socket_path: Path, + shell_socket_path: Path, + allow_reboot: bool = False, # TODO: unused, legacy? + ) -> str: + display_opts = "" + display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) + if not display_available: + display_opts += " -nographic" + + # qemu options + qemu_opts = "" + qemu_opts += ( + "" + if allow_reboot + else " -no-reboot" + " -device virtio-serial" + " -device virtconsole,chardev=shell" + " -device virtio-rng-pci" + " -serial stdio" + ) + # TODO: qemu script already catpures this env variable, legacy? + qemu_opts += " " + os.environ.get("QEMU_OPTS", "") + + return ( + f"{self._cmd}" + f" -monitor unix:{monitor_socket_path}" + f" -chardev socket,id=shell,path={shell_socket_path}" + f"{qemu_opts}" + f"{display_opts}" + ) + + @staticmethod + def build_environment( + state_dir: Path, + shared_dir: Path, + ) -> dict: + # We make a copy to not update the current environment + env = dict(os.environ) + env.update( + { + "TMPDIR": str(state_dir), + "SHARED_DIR": str(shared_dir), + "USE_TMPDIR": "1", + } + ) + return env + + def run( + self, + state_dir: Path, + shared_dir: Path, + monitor_socket_path: Path, + shell_socket_path: Path, + ) -> subprocess.Popen: + return subprocess.Popen( + self.cmd(monitor_socket_path, shell_socket_path), + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + cwd=state_dir, + env=self.build_environment(state_dir, shared_dir), + ) + + +class NixStartScript(StartCommand): + """A start script from nixos/modules/virtualiation/qemu-vm.nix + that also satisfies the requirement of the BaseStartCommand. + These Nix commands have the particular charactersitic that the + machine name can be extracted out of them via a regex match. + (Admittedly a _very_ implicit contract, evtl. TODO fix) + """ + + def __init__(self, script: str): + self._cmd = script + + @property + def machine_name(self) -> str: + match = re.search("run-(.+)-vm$", self._cmd) + name = "machine" + if match: + name = match.group(1) + return name + + +class LegacyStartCommand(StartCommand): + """Used in some places to create an ad-hoc machine instead of + using nix test instrumentation + module system for that purpose. + Legacy. + """ + + def __init__( + self, + netBackendArgs: Optional[str] = None, + netFrontendArgs: Optional[str] = None, + hda: Optional[Tuple[Path, str]] = None, + cdrom: Optional[str] = None, + usb: Optional[str] = None, + bios: Optional[str] = None, + qemuBinary: Optional[str] = None, + qemuFlags: Optional[str] = None, + ): + if qemuBinary is not None: + self._cmd = qemuBinary + else: + self._cmd = "qemu-kvm" + + self._cmd += " -m 384" + + # networking + net_backend = "-netdev user,id=net0" + net_frontend = "-device virtio-net-pci,netdev=net0" + if netBackendArgs is not None: + net_backend += "," + netBackendArgs + if netFrontendArgs is not None: + net_frontend += "," + netFrontendArgs + self._cmd += f" {net_backend} {net_frontend}" + + # hda + hda_cmd = "" + if hda is not None: + hda_path = hda[0].resolve() + hda_interface = hda[1] + if hda_interface == "scsi": + hda_cmd += ( + f" -drive id=hda,file={hda_path},werror=report,if=none" + " -device scsi-hd,drive=hda" + ) + else: + hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" + self._cmd += hda_cmd + + # cdrom + if cdrom is not None: + self._cmd += f" -cdrom {cdrom}" + + # usb + usb_cmd = "" + if usb is not None: + # https://github.com/qemu/qemu/blob/master/docs/usb2.txt + usb_cmd += ( + " -device usb-ehci" + f" -drive id=usbdisk,file={usb},if=none,readonly" + " -device usb-storage,drive=usbdisk " + ) + self._cmd += usb_cmd + + # bios + if bios is not None: + self._cmd += f" -bios {bios}" + + # qemu flags + if qemuFlags is not None: + self._cmd += f" {qemuFlags}" + + +class Machine: + """A handle to the machine with this name, that also knows how to manage + the machine lifecycle with the help of a start script / command.""" + + name: str + out_dir: Path + tmp_dir: Path + shared_dir: Path + state_dir: Path + monitor_path: Path + shell_path: Path + + start_command: StartCommand + keep_vm_state: bool + allow_reboot: bool + + process: Optional[subprocess.Popen] + pid: Optional[int] + monitor: Optional[socket.socket] + shell: Optional[socket.socket] + serial_thread: Optional[threading.Thread] + + booted: bool + connected: bool + # Store last serial console lines for use + # of wait_for_console_text + last_lines: Queue = Queue() + callbacks: List[Callable] + + def __repr__(self) -> str: + return f"<Machine '{self.name}'>" + + def __init__( + self, + out_dir: Path, + tmp_dir: Path, + start_command: StartCommand, + name: str = "machine", + keep_vm_state: bool = False, + allow_reboot: bool = False, + callbacks: Optional[List[Callable]] = None, + ) -> None: + self.out_dir = out_dir + self.tmp_dir = tmp_dir + self.keep_vm_state = keep_vm_state + self.allow_reboot = allow_reboot + self.name = name + self.start_command = start_command + self.callbacks = callbacks if callbacks is not None else [] + + # set up directories + self.shared_dir = self.tmp_dir / "shared-xchg" + self.shared_dir.mkdir(mode=0o700, exist_ok=True) + + self.state_dir = self.tmp_dir / f"vm-state-{self.name}" + self.monitor_path = self.state_dir / "monitor" + self.shell_path = self.state_dir / "shell" + if (not self.keep_vm_state) and self.state_dir.exists(): + self.cleanup_statedir() + self.state_dir.mkdir(mode=0o700, exist_ok=True) + + self.process = None + self.pid = None + self.monitor = None + self.shell = None + self.serial_thread = None + + self.booted = False + self.connected = False + + @staticmethod + def create_startcommand(args: Dict[str, str]) -> StartCommand: + rootlog.warning( + "Using legacy create_startcommand()," + "please use proper nix test vm instrumentation, instead" + "to generate the appropriate nixos test vm qemu startup script" + ) + hda = None + if args.get("hda"): + hda_arg: str = args.get("hda", "") + hda_arg_path: Path = Path(hda_arg) + hda = (hda_arg_path, args.get("hdaInterface", "")) + return LegacyStartCommand( + netBackendArgs=args.get("netBackendArgs"), + netFrontendArgs=args.get("netFrontendArgs"), + hda=hda, + cdrom=args.get("cdrom"), + usb=args.get("usb"), + bios=args.get("bios"), + qemuBinary=args.get("qemuBinary"), + qemuFlags=args.get("qemuFlags"), + ) + + def is_up(self) -> bool: + return self.booted and self.connected + + def log(self, msg: str) -> None: + rootlog.log(msg, {"machine": self.name}) + + def log_serial(self, msg: str) -> None: + rootlog.log_serial(msg, self.name) + + def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager: + my_attrs = {"machine": self.name} + my_attrs.update(attrs) + return rootlog.nested(msg, my_attrs) + + def wait_for_monitor_prompt(self) -> str: + with self.nested("waiting for monitor prompt"): + assert self.monitor is not None + answer = "" + while True: + undecoded_answer = self.monitor.recv(1024) + if not undecoded_answer: + break + answer += undecoded_answer.decode() + if answer.endswith("(qemu) "): + break + return answer + + def send_monitor_command(self, command: str) -> str: + self.run_callbacks() + with self.nested("sending monitor command: {}".format(command)): + message = ("{}\n".format(command)).encode() + assert self.monitor is not None + self.monitor.send(message) + return self.wait_for_monitor_prompt() + + def wait_for_unit(self, unit: str, user: Optional[str] = None) -> None: + """Wait for a systemd unit to get into "active" state. + Throws exceptions on "failed" and "inactive" states as well as + after timing out. + """ + + def check_active(_: Any) -> bool: + info = self.get_unit_info(unit, user) + state = info["ActiveState"] + if state == "failed": + raise Exception('unit "{}" reached state "{}"'.format(unit, state)) + + if state == "inactive": + status, jobs = self.systemctl("list-jobs --full 2>&1", user) + if "No jobs" in jobs: + info = self.get_unit_info(unit, user) + if info["ActiveState"] == state: + raise Exception( + ( + 'unit "{}" is inactive and there ' "are no pending jobs" + ).format(unit) + ) + + return state == "active" + + with self.nested( + "waiting for unit {}{}".format( + unit, f" with user {user}" if user is not None else "" + ) + ): + retry(check_active) + + def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: + status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user) + if status != 0: + raise Exception( + 'retrieving systemctl info for unit "{}" {} failed with exit code {}'.format( + unit, "" if user is None else 'under user "{}"'.format(user), status + ) + ) + + line_pattern = re.compile(r"^([^=]+)=(.*)$") + + def tuple_from_line(line: str) -> Tuple[str, str]: + match = line_pattern.match(line) + assert match is not None + return match[1], match[2] + + return dict( + tuple_from_line(line) + for line in lines.split("\n") + if line_pattern.match(line) + ) + + def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]: + if user is not None: + q = q.replace("'", "\\'") + return self.execute( + ( + "su -l {} --shell /bin/sh -c " + "$'XDG_RUNTIME_DIR=/run/user/`id -u` " + "systemctl --user {}'" + ).format(user, q) + ) + return self.execute("systemctl {}".format(q)) + + def require_unit_state(self, unit: str, require_state: str = "active") -> None: + with self.nested( + "checking if unit ‘{}’ has reached state '{}'".format(unit, require_state) + ): + info = self.get_unit_info(unit) + state = info["ActiveState"] + if state != require_state: + raise Exception( + "Expected unit ‘{}’ to to be in state ".format(unit) + + "'{}' but it is in state ‘{}’".format(require_state, state) + ) + + def _next_newline_closed_block_from_shell(self) -> str: + assert self.shell + output_buffer = [] + while True: + # This receives up to 4096 bytes from the socket + chunk = self.shell.recv(4096) + if not chunk: + # Probably a broken pipe, return the output we have + break + + decoded = chunk.decode() + output_buffer += [decoded] + if decoded[-1] == "\n": + break + return "".join(output_buffer) + + def execute( + self, command: str, check_return: bool = True, timeout: Optional[int] = 900 + ) -> Tuple[int, str]: + self.run_callbacks() + self.connect() + + if timeout is not None: + command = "timeout {} sh -c {}".format(timeout, shlex.quote(command)) + + out_command = f"( set -euo pipefail; {command} ) | (base64 --wrap 0; echo)\n" + assert self.shell + self.shell.send(out_command.encode()) + + # Get the output + output = base64.b64decode(self._next_newline_closed_block_from_shell()) + + if not check_return: + return (-1, output.decode()) + + # Get the return code + self.shell.send("echo ${PIPESTATUS[0]}\n".encode()) + rc = int(self._next_newline_closed_block_from_shell().strip()) + + return (rc, output.decode()) + + def shell_interact(self) -> None: + """Allows you to interact with the guest shell + + Should only be used during test development, not in the production test.""" + self.connect() + self.log("Terminal is ready (there is no initial prompt):") + + assert self.shell + subprocess.run( + ["socat", "READLINE,prompt=$ ", f"FD:{self.shell.fileno()}"], + pass_fds=[self.shell.fileno()], + ) + + def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: + """Execute each command and check that it succeeds.""" + output = "" + for command in commands: + with self.nested("must succeed: {}".format(command)): + (status, out) = self.execute(command, timeout=timeout) + if status != 0: + self.log("output: {}".format(out)) + raise Exception( + "command `{}` failed (exit code {})".format(command, status) + ) + output += out + return output + + def fail(self, *commands: str, timeout: Optional[int] = None) -> str: + """Execute each command and check that it fails.""" + output = "" + for command in commands: + with self.nested("must fail: {}".format(command)): + (status, out) = self.execute(command, timeout=timeout) + if status == 0: + raise Exception( + "command `{}` unexpectedly succeeded".format(command) + ) + output += out + return output + + def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: + """Wait until a command returns success and return its output. + Throws an exception on timeout. + """ + output = "" + + def check_success(_: Any) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status == 0 + + with self.nested("waiting for success: {}".format(command)): + retry(check_success, timeout) + return output + + def wait_until_fails(self, command: str, timeout: int = 900) -> str: + """Wait until a command returns failure. + Throws an exception on timeout. + """ + output = "" + + def check_failure(_: Any) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status != 0 + + with self.nested("waiting for failure: {}".format(command)): + retry(check_failure) + return output + + def wait_for_shutdown(self) -> None: + if not self.booted: + return + + with self.nested("waiting for the VM to power off"): + sys.stdout.flush() + assert self.process + self.process.wait() + + self.pid = None + self.booted = False + self.connected = False + + def get_tty_text(self, tty: str) -> str: + status, output = self.execute( + "fold -w$(stty -F /dev/tty{0} size | " + "awk '{{print $2}}') /dev/vcs{0}".format(tty) + ) + return output + + def wait_until_tty_matches(self, tty: str, regexp: str) -> None: + """Wait until the visible output on the chosen TTY matches regular + expression. Throws an exception on timeout. + """ + matcher = re.compile(regexp) + + def tty_matches(last: bool) -> bool: + text = self.get_tty_text(tty) + if last: + self.log( + f"Last chance to match /{regexp}/ on TTY{tty}, " + f"which currently contains: {text}" + ) + return len(matcher.findall(text)) > 0 + + with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)): + retry(tty_matches) + + def send_chars(self, chars: List[str]) -> None: + with self.nested("sending keys ‘{}‘".format(chars)): + for char in chars: + self.send_key(char) + + def wait_for_file(self, filename: str) -> None: + """Waits until the file exists in machine's file system.""" + + def check_file(_: Any) -> bool: + status, _ = self.execute("test -e {}".format(filename)) + return status == 0 + + with self.nested("waiting for file ‘{}‘".format(filename)): + retry(check_file) + + def wait_for_open_port(self, port: int) -> None: + def port_is_open(_: Any) -> bool: + status, _ = self.execute("nc -z localhost {}".format(port)) + return status == 0 + + with self.nested("waiting for TCP port {}".format(port)): + retry(port_is_open) + + def wait_for_closed_port(self, port: int) -> None: + def port_is_closed(_: Any) -> bool: + status, _ = self.execute("nc -z localhost {}".format(port)) + return status != 0 + + with self.nested("waiting for TCP port {} to be closed"): + retry(port_is_closed) + + def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: + return self.systemctl("start {}".format(jobname), user) + + def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: + return self.systemctl("stop {}".format(jobname), user) + + def wait_for_job(self, jobname: str) -> None: + self.wait_for_unit(jobname) + + def connect(self) -> None: + if self.connected: + return + + with self.nested("waiting for the VM to finish booting"): + self.start() + + assert self.shell + + tic = time.time() + self.shell.recv(1024) + # TODO: Timeout + toc = time.time() + + self.log("connected to guest root shell") + self.log("(connecting took {:.2f} seconds)".format(toc - tic)) + self.connected = True + + def screenshot(self, filename: str) -> None: + word_pattern = re.compile(r"^\w+$") + if word_pattern.match(filename): + filename = os.path.join(self.out_dir, "{}.png".format(filename)) + tmp = "{}.ppm".format(filename) + + with self.nested( + "making screenshot {}".format(filename), + {"image": os.path.basename(filename)}, + ): + self.send_monitor_command("screendump {}".format(tmp)) + ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True) + os.unlink(tmp) + if ret.returncode != 0: + raise Exception("Cannot convert screenshot") + + def copy_from_host_via_shell(self, source: str, target: str) -> None: + """Copy a file from the host into the guest by piping it over the + shell into the destination file. Works without host-guest shared folder. + Prefer copy_from_host for whenever possible. + """ + with open(source, "rb") as fh: + content_b64 = base64.b64encode(fh.read()).decode() + self.succeed( + f"mkdir -p $(dirname {target})", + f"echo -n {content_b64} | base64 -d > {target}", + ) + + def copy_from_host(self, source: str, target: str) -> None: + """Copy a file from the host into the guest via the `shared_dir` shared + among all the VMs (using a temporary directory). + """ + host_src = Path(source) + vm_target = Path(target) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + host_intermediate = shared_temp / host_src.name + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / host_src.name + + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + if host_src.is_dir(): + shutil.copytree(host_src, host_intermediate) + else: + shutil.copy(host_src, host_intermediate) + self.succeed(make_command(["mkdir", "-p", vm_target.parent])) + self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) + + def copy_from_vm(self, source: str, target_dir: str = "") -> None: + """Copy a file from the VM (specified by an in-VM source path) to a path + relative to `$out`. The file is copied via the `shared_dir` shared among + all the VMs (using a temporary directory). + """ + # Compute the source, target, and intermediate shared file names + vm_src = Path(source) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / vm_src.name + intermediate = shared_temp / vm_src.name + # Copy the file to the shared directory inside VM + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) + abs_target = self.out_dir / target_dir / vm_src.name + abs_target.parent.mkdir(exist_ok=True, parents=True) + # Copy the file from the shared directory outside VM + if intermediate.is_dir(): + shutil.copytree(intermediate, abs_target) + else: + shutil.copy(intermediate, abs_target) + + def dump_tty_contents(self, tty: str) -> None: + """Debugging: Dump the contents of the TTY<n>""" + self.execute("fold -w 80 /dev/vcs{} | systemd-cat".format(tty)) + + def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]: + with tempfile.TemporaryDirectory() as tmpdir: + screenshot_path = os.path.join(tmpdir, "ppm") + self.send_monitor_command(f"screendump {screenshot_path}") + return _perform_ocr_on_screenshot(screenshot_path, model_ids) + + def get_screen_text_variants(self) -> List[str]: + return self._get_screen_text_variants([0, 1, 2]) + + def get_screen_text(self) -> str: + return self._get_screen_text_variants([2])[0] + + def wait_for_text(self, regex: str) -> None: + def screen_matches(last: bool) -> bool: + variants = self.get_screen_text_variants() + for text in variants: + if re.search(regex, text) is not None: + return True + + if last: + self.log("Last OCR attempt failed. Text was: {}".format(variants)) + + return False + + with self.nested("waiting for {} to appear on screen".format(regex)): + retry(screen_matches) + + def wait_for_console_text(self, regex: str) -> None: + with self.nested("waiting for {} to appear on console".format(regex)): + # Buffer the console output, this is needed + # to match multiline regexes. + console = io.StringIO() + while True: + try: + console.write(self.last_lines.get()) + except queue.Empty: + self.sleep(1) + continue + console.seek(0) + matches = re.search(regex, console.read()) + if matches is not None: + return + + def send_key(self, key: str) -> None: + key = CHAR_TO_KEY.get(key, key) + self.send_monitor_command("sendkey {}".format(key)) + time.sleep(0.01) + + def start(self) -> None: + if self.booted: + return + + self.log("starting vm") + + def clear(path: Path) -> Path: + if path.exists(): + path.unlink() + return path + + def create_socket(path: Path) -> socket.socket: + s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + s.bind(str(path)) + s.listen(1) + return s + + monitor_socket = create_socket(clear(self.monitor_path)) + shell_socket = create_socket(clear(self.shell_path)) + self.process = self.start_command.run( + self.state_dir, + self.shared_dir, + self.monitor_path, + self.shell_path, + ) + self.monitor, _ = monitor_socket.accept() + self.shell, _ = shell_socket.accept() + + # Store last serial console lines for use + # of wait_for_console_text + self.last_lines: Queue = Queue() + + def process_serial_output() -> None: + assert self.process + assert self.process.stdout + for _line in self.process.stdout: + # Ignore undecodable bytes that may occur in boot menus + line = _line.decode(errors="ignore").replace("\r", "").rstrip() + self.last_lines.put(line) + self.log_serial(line) + + self.serial_thread = threading.Thread(target=process_serial_output) + self.serial_thread.start() + + self.wait_for_monitor_prompt() + + self.pid = self.process.pid + self.booted = True + + self.log("QEMU running (pid {})".format(self.pid)) + + def cleanup_statedir(self) -> None: + shutil.rmtree(self.state_dir) + rootlog.log(f"deleting VM state directory {self.state_dir}") + rootlog.log("if you want to keep the VM state, pass --keep-vm-state") + + def shutdown(self) -> None: + if not self.booted: + return + + assert self.shell + self.shell.send("poweroff\n".encode()) + self.wait_for_shutdown() + + def crash(self) -> None: + if not self.booted: + return + + self.log("forced crash") + self.send_monitor_command("quit") + self.wait_for_shutdown() + + def wait_for_x(self) -> None: + """Wait until it is possible to connect to the X server. Note that + testing the existence of /tmp/.X11-unix/X0 is insufficient. + """ + + def check_x(_: Any) -> bool: + cmd = ( + "journalctl -b SYSLOG_IDENTIFIER=systemd | " + + 'grep "Reached target Current graphical"' + ) + status, _ = self.execute(cmd) + if status != 0: + return False + status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") + return status == 0 + + with self.nested("waiting for the X11 server"): + retry(check_x) + + def get_window_names(self) -> List[str]: + return self.succeed( + r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" + ).splitlines() + + def wait_for_window(self, regexp: str) -> None: + pattern = re.compile(regexp) + + def window_is_visible(last_try: bool) -> bool: + names = self.get_window_names() + if last_try: + self.log( + "Last chance to match {} on the window list,".format(regexp) + + " which currently contains: " + + ", ".join(names) + ) + return any(pattern.search(name) for name in names) + + with self.nested("waiting for a window to appear"): + retry(window_is_visible) + + def sleep(self, secs: int) -> None: + # We want to sleep in *guest* time, not *host* time. + self.succeed(f"sleep {secs}") + + def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: + """Forward a TCP port on the host to a TCP port on the guest. + Useful during interactive testing. + """ + self.send_monitor_command( + "hostfwd_add tcp::{}-:{}".format(host_port, guest_port) + ) + + def block(self) -> None: + """Make the machine unreachable by shutting down eth1 (the multicast + interface used to talk to the other VMs). We keep eth0 up so that + the test driver can continue to talk to the machine. + """ + self.send_monitor_command("set_link virtio-net-pci.1 off") + + def unblock(self) -> None: + """Make the machine reachable.""" + self.send_monitor_command("set_link virtio-net-pci.1 on") + + def release(self) -> None: + if self.pid is None: + return + rootlog.info(f"kill machine (pid {self.pid})") + assert self.process + assert self.shell + assert self.monitor + assert self.serial_thread + + self.process.terminate() + self.shell.close() + self.monitor.close() + self.serial_thread.join() + + def run_callbacks(self) -> None: + for callback in self.callbacks: + callback() |