summary refs log tree commit diff
path: root/nixos/lib
diff options
context:
space:
mode:
authorrnhmjoj <rnhmjoj@inventati.org>2020-06-13 12:04:05 +0200
committerrnhmjoj <rnhmjoj@inventati.org>2020-06-24 10:22:53 +0200
commitb520055df6c9913ef4ddbc931a1635516433a7a9 (patch)
tree86c886b7f99e53cdb66f098ad1c5a5e0623920e7 /nixos/lib
parent37ec7c488a3579a868014f53b87f669bf65ed83f (diff)
downloadnixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar.gz
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar.bz2
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar.lz
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar.xz
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.tar.zst
nixpkgs-b520055df6c9913ef4ddbc931a1635516433a7a9.zip
nixos/lib/test-driver: add wait_for_console_text
This method is similar to wait_for_text but is based on matching
serial console lines instead of the VGA output.
Diffstat (limited to 'nixos/lib')
-rw-r--r--nixos/lib/test-driver/test-driver.py23
1 files changed, 23 insertions, 0 deletions
diff --git a/nixos/lib/test-driver/test-driver.py b/nixos/lib/test-driver/test-driver.py
index e7b05968b07..0c1946387ae 100644
--- a/nixos/lib/test-driver/test-driver.py
+++ b/nixos/lib/test-driver/test-driver.py
@@ -3,6 +3,8 @@ from contextlib import contextmanager, _GeneratorContextManager
 from queue import Queue, Empty
 from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List
 from xml.sax.saxutils import XMLGenerator
+import queue
+import io
 import _thread
 import atexit
 import base64
@@ -671,6 +673,22 @@ class Machine:
         with self.nested("waiting for {} to appear on screen".format(regex)):
             retry(screen_matches)
 
+    def wait_for_console_text(self, regex: str) -> None:
+        self.log("waiting for {} to appear on console".format(regex))
+        # Buffer the console output, this is needed
+        # to match multiline regexes.
+        console = io.StringIO()
+        while True:
+            try:
+                console.write(self.last_lines.get())
+            except queue.Empty:
+                self.sleep(1)
+                continue
+            console.seek(0)
+            matches = re.search(regex, console.read())
+            if matches is not None:
+                return
+
     def send_key(self, key: str) -> None:
         key = CHAR_TO_KEY.get(key, key)
         self.send_monitor_command("sendkey {}".format(key))
@@ -734,11 +752,16 @@ class Machine:
         self.monitor, _ = self.monitor_socket.accept()
         self.shell, _ = self.shell_socket.accept()
 
+        # Store last serial console lines for use
+        # of wait_for_console_text
+        self.last_lines: Queue = Queue()
+
         def process_serial_output() -> None:
             assert self.process.stdout is not None
             for _line in self.process.stdout:
                 # Ignore undecodable bytes that may occur in boot menus
                 line = _line.decode(errors="ignore").replace("\r", "").rstrip()
+                self.last_lines.put(line)
                 eprint("{} # {}".format(self.name, line))
                 self.logger.enqueue({"msg": line, "machine": self.name})