summary refs log tree commit diff
path: root/nixos/lib/test-driver/test_driver/logger.py
diff options
context:
space:
mode:
Diffstat (limited to 'nixos/lib/test-driver/test_driver/logger.py')
-rw-r--r--nixos/lib/test-driver/test_driver/logger.py101
1 files changed, 101 insertions, 0 deletions
diff --git a/nixos/lib/test-driver/test_driver/logger.py b/nixos/lib/test-driver/test_driver/logger.py
new file mode 100644
index 00000000000..5b3091a5129
--- /dev/null
+++ b/nixos/lib/test-driver/test_driver/logger.py
@@ -0,0 +1,101 @@
+from colorama import Style
+from contextlib import contextmanager
+from typing import Any, Dict, Iterator
+from queue import Queue, Empty
+from xml.sax.saxutils import XMLGenerator
+import codecs
+import os
+import sys
+import time
+import unicodedata
+
+
+class Logger:
+    def __init__(self) -> None:
+        self.logfile = os.environ.get("LOGFILE", "/dev/null")
+        self.logfile_handle = codecs.open(self.logfile, "wb")
+        self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
+        self.queue: "Queue[Dict[str, str]]" = Queue()
+
+        self.xml.startDocument()
+        self.xml.startElement("logfile", attrs={})
+
+        self._print_serial_logs = True
+
+    @staticmethod
+    def _eprint(*args: object, **kwargs: Any) -> None:
+        print(*args, file=sys.stderr, **kwargs)
+
+    def close(self) -> None:
+        self.xml.endElement("logfile")
+        self.xml.endDocument()
+        self.logfile_handle.close()
+
+    def sanitise(self, message: str) -> str:
+        return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
+
+    def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
+        if "machine" in attributes:
+            return "{}: {}".format(attributes["machine"], message)
+        return message
+
+    def log_line(self, message: str, attributes: Dict[str, str]) -> None:
+        self.xml.startElement("line", attributes)
+        self.xml.characters(message)
+        self.xml.endElement("line")
+
+    def info(self, *args, **kwargs) -> None:  # type: ignore
+        self.log(*args, **kwargs)
+
+    def warning(self, *args, **kwargs) -> None:  # type: ignore
+        self.log(*args, **kwargs)
+
+    def error(self, *args, **kwargs) -> None:  # type: ignore
+        self.log(*args, **kwargs)
+        sys.exit(1)
+
+    def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
+        self._eprint(self.maybe_prefix(message, attributes))
+        self.drain_log_queue()
+        self.log_line(message, attributes)
+
+    def log_serial(self, message: str, machine: str) -> None:
+        self.enqueue({"msg": message, "machine": machine, "type": "serial"})
+        if self._print_serial_logs:
+            self._eprint(
+                Style.DIM + "{} # {}".format(machine, message) + Style.RESET_ALL
+            )
+
+    def enqueue(self, item: Dict[str, str]) -> None:
+        self.queue.put(item)
+
+    def drain_log_queue(self) -> None:
+        try:
+            while True:
+                item = self.queue.get_nowait()
+                msg = self.sanitise(item["msg"])
+                del item["msg"]
+                self.log_line(msg, item)
+        except Empty:
+            pass
+
+    @contextmanager
+    def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
+        self._eprint(self.maybe_prefix(message, attributes))
+
+        self.xml.startElement("nest", attrs={})
+        self.xml.startElement("head", attributes)
+        self.xml.characters(message)
+        self.xml.endElement("head")
+
+        tic = time.time()
+        self.drain_log_queue()
+        yield
+        self.drain_log_queue()
+        toc = time.time()
+        self.log("(finished: {}, in {:.2f} seconds)".format(message, toc - tic))
+
+        self.xml.endElement("nest")
+
+
+rootlog = Logger()