summary refs log tree commit diff
path: root/nixos/lib/test-driver/test_driver/polling_condition.py
blob: f38dea71376efcbaa97b06581a840d71602d33bf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import Callable, Optional, Any, List, Dict
from functools import wraps

import time

from .logger import rootlog


class PollingConditionFailed(Exception):
    """Signal that an action was interrupted before it could run.

    Raised by ``coopmulti``-wrapped callables when the machine's
    ``fail_early()`` check returns a truthy value.
    """


def coopmulti(fun: Optional[Callable] = None, *, machine: Any = None) -> Callable:
    """Guard a callable so it aborts when its machine asks to fail early.

    Before every call of the wrapped callable, ``fail_early()`` is invoked on
    the machine. A truthy result raises ``PollingConditionFailed`` and the
    wrapped callable is not run; otherwise the call is forwarded unchanged.

    Can be applied directly (``@coopmulti``) to a callable whose first
    positional argument is the machine, or used as a decorator factory
    (``@coopmulti(machine=m)``) when the machine is fixed up front.

    Args:
        fun: The callable to wrap, or ``None`` to get a decorator back.
        machine: Object providing ``fail_early()``. When ``None``, the first
            positional argument of each call is used as the machine.

    Returns:
        The wrapped callable, or a decorator if ``fun`` is ``None``.

    Raises:
        AssertionError: If both ``fun`` and ``machine`` are ``None`` (only
            while assertions are enabled).
    """
    assert not (
        fun is None and machine is None
    ), "coopmulti needs a callable to wrap, a machine, or both"

    def inner(fun_: Callable) -> Callable:
        @wraps(fun_)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Without a fixed machine the first positional argument (e.g.
            # ``self`` of a method) is the object that is asked.
            this_machine = args[0] if machine is None else machine

            if this_machine.fail_early():
                raise PollingConditionFailed("Action interrupted early...")

            return fun_(*args, **kwargs)

        return wrapper

    # No callable given: hand back the decorator so it can be applied later.
    return inner if fun is None else inner(fun)


class PollingCondition:
    """Rate-limited wrapper around a condition callable.

    ``check()`` evaluates the condition at most once per ``seconds_interval``
    and reports ``True`` in between. Using the instance as a context manager
    suspends evaluation for the duration of the ``with`` block and restarts
    the interval when the block is left.
    """

    # Callable evaluated by check(); a ``None`` result counts as success.
    condition: Callable[[], Optional[bool]]
    # Minimum number of seconds between two evaluations of the condition.
    seconds_interval: float
    # Label shown in the log output, if any.
    description: Optional[str]

    # time.monotonic() timestamp of the last __exit__; -inf until the first.
    last_called: float
    # True while inside a ``with`` block on this instance (check() enters one).
    entered: bool

    def __init__(
        self,
        condition: Callable[[], Optional[bool]],
        seconds_interval: float = 2.0,
        description: Optional[str] = None,
    ):
        """Set up the condition without evaluating it.

        Args:
            condition: Zero-argument callable to poll.
            seconds_interval: Minimum time between two evaluations.
            description: Label for log output; defaults to the docstring of
                ``condition``. Any other value is converted with ``str()``.
        """
        self.condition = condition
        self.seconds_interval = seconds_interval

        if description is None:
            self.description = condition.__doc__
        else:
            self.description = str(description)

        self.last_called = float("-inf")
        self.entered = False

    def check(self) -> bool:
        """Evaluate the condition if it is due and report whether it holds.

        Returns:
            ``True`` without evaluating anything while the instance is entered
            or the interval has not elapsed yet. Otherwise ``True`` if the
            condition returned ``None`` or a truthy value, and ``False`` if it
            returned a falsy value or raised an ``Exception``.
        """
        if self.entered or not self.overdue:
            return True

        with self, rootlog.nested(self.nested_message):
            rootlog.info(self._elapsed_message())
            try:
                result = self.condition()
            except Exception as exc:
                # A raising condition is treated as a failed one, but leave a
                # trace in the log so the cause is not lost.
                rootlog.info(f"Polling condition raised {exc!r}")
                result = False
            # Conditions without an explicit return value count as success.
            succeeded = result is None or bool(result)
            rootlog.info(f"Polling condition {'succeeded' if succeeded else 'failed'}")
            return succeeded

    def _elapsed_message(self) -> str:
        """Describe how long ago the previous evaluation finished."""
        if self.last_called == float("-inf"):
            # Nothing to subtract from yet; avoid printing an infinite delta.
            return "(not called yet)"
        return f"Time since last: {time.monotonic() - self.last_called:.2f}s"

    @property
    def nested_message(self) -> str:
        """Log heading for one evaluation, including the description if set."""
        nested_message = ["Checking polling condition"]
        if self.description is not None:
            nested_message.append(repr(self.description))

        return " ".join(nested_message)

    @property
    def overdue(self) -> bool:
        """Whether more than ``seconds_interval`` passed since ``last_called``."""
        return self.last_called + self.seconds_interval < time.monotonic()

    def __enter__(self) -> None:
        """Mark the instance as entered so that ``check()`` is a no-op."""
        self.entered = True

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave the entered state and restart the interval from now.

        Exceptions raised inside the ``with`` block are not suppressed.
        """
        self.entered = False
        self.last_called = time.monotonic()