diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 00000000..537b9599 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,36 @@ +# Workflow to run pytest +name: pytest + +on: [push, pull_request] + +permissions: + contents: read + +env: + UV_SYSTEM_PYTHON: 1 + PIP_DISABLE_PIP_VERSION_CHECK: 1 + +jobs: + pytest: + name: pytest + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + persist-credentials: false + - uses: astral-sh/setup-uv@22695119d769bdb6f7032ad67b9bca0ef8c4a174 # v5.4.0 + with: + enable-cache: true + cache-dependency-glob: '' + cache-suffix: '3.13' + - name: Setup Python + uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0 + with: + python-version: '3.13' + - name: Install OS dependencies + run: | + sudo apt-get update -yy + sudo apt-get install -yy --no-install-recommends libcairo2-dev libgirepository-2.0-dev gir1.2-gtk-4.0 + - run: uv pip install -r pyproject.toml + - run: uv pip install --group tests + - run: pytest diff --git a/MANIFEST.in b/MANIFEST.in index b0a6c493..b0ff7140 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -3,4 +3,6 @@ include README.md graft safeeyes +prune safeeyes/tests + global-exclude *.py[cod] diff --git a/README.md b/README.md index b66fd36d..fdf49e6f 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,12 @@ To ensure that the coding and formatting guidelines are followed, install [ruff] To ensure that any types are correct, install [mypy](https://github.com/python/mypy) and run `mypy safeeyes`. -The last three checks are also run in CI, so a PR must pass all the tests for it to be mmerged. +To ensure that the tests still pass, install [pytest](https://docs.pytest.org/en/stable/) and run `pytest`. + +The last four checks are also run in CI, so a PR must pass all the tests for it to be mmerged. + +It is also possible to use dependency groups to install the needed dependencies. When using a new enough version of pip, run `pip install --group types` to install all dependencies to run the type check. +The available dependency groups can be found in the `pyproject.toml` file. ## How to Release? diff --git a/pyproject.toml b/pyproject.toml index 7765900b..f829466d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,12 +46,14 @@ build-backend = "setuptools.build_meta" [tool.setuptools.packages.find] include=["safeeyes*"] +exclude=["safeeyes.tests*"] [dependency-groups] dev = [ {include-group = "lint"}, {include-group = "scripts"}, - {include-group = "types"} + {include-group = "tests"}, + {include-group = "types"}, ] lint = [ "ruff==0.11.2" @@ -64,7 +66,11 @@ types = [ "PyGObject-stubs==2.13.0", "types-croniter==5.0.1.20250322", "types-psutil==7.0.0.20250218", - "types-python-xlib==0.33.0.20240407" + "types-python-xlib==0.33.0.20240407", + {include-group = "tests"}, +] +tests = [ + "pytest==8.3.5", ] [tool.mypy] @@ -78,3 +84,8 @@ enable_error_code = [ "ignore-without-code", "possibly-undefined" ] + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", +] diff --git a/safeeyes/plugins/smartpause/dependency_checker.py b/safeeyes/plugins/smartpause/dependency_checker.py index c3df55d4..91e13574 100644 --- a/safeeyes/plugins/smartpause/dependency_checker.py +++ b/safeeyes/plugins/smartpause/dependency_checker.py @@ -22,9 +22,7 @@ def validate(plugin_config, plugin_settings): command = None - if utility.DESKTOP_ENVIRONMENT == "gnome" and utility.IS_WAYLAND: - command = "dbus-send" - elif utility.DESKTOP_ENVIRONMENT == "sway": + if utility.DESKTOP_ENVIRONMENT == "sway": command = "swayidle" elif utility.IS_WAYLAND: if not utility.module_exist("pywayland"): diff --git a/safeeyes/plugins/smartpause/ext_idle_notify.py b/safeeyes/plugins/smartpause/ext_idle_notify.py index 1c4f02ff..a975d5a0 100644 --- a/safeeyes/plugins/smartpause/ext_idle_notify.py +++ b/safeeyes/plugins/smartpause/ext_idle_notify.py @@ -18,29 +18,100 @@ # This file is heavily inspired by https://github.com/juienpro/easyland/blob/efc26a0b22d7bdbb0f8436183428f7036da4662a/src/easyland/idle.py +from dataclasses import dataclass +import logging import threading -import datetime import os import select +import typing from pywayland.client import Display from pywayland.protocol.wayland.wl_seat import WlSeat -from pywayland.protocol.ext_idle_notify_v1 import ExtIdleNotifierV1 +from pywayland.protocol.ext_idle_notify_v1 import ( + ExtIdleNotifierV1, + ExtIdleNotificationV1, +) +from .interface import IdleMonitorInterface +from safeeyes import utility -class ExtIdleNotify: - _idle_notifier = None - _seat = None - _notification = None - _notifier_set = False - _running = True - _thread = None - _r_channel = None - _w_channel = None - _idle_since = None +@dataclass +class IdleConfig: + on_idle: typing.Callable[[], None] + on_resumed: typing.Callable[[], None] + idle_time: float - def __init__(self): + +class IdleMonitorExtIdleNotify(IdleMonitorInterface): + _ext_idle_notify_internal: typing.Optional["ExtIdleNotifyInternal"] = None + _thread: typing.Optional[threading.Thread] = None + + _r_channel_started: int + _w_channel_started: int + + _r_channel_stop: int + _w_channel_stop: int + + _r_channel_listen: int + _w_channel_listen: int + + _idle_config: typing.Optional[IdleConfig] = None + + def init(self) -> None: + # we spawn one wayland client once + # when the monitor is not running, it should be quite idle + self._r_channel_started, self._w_channel_started = os.pipe() + self._r_channel_stop, self._w_channel_stop = os.pipe() + self._r_channel_listen, self._w_channel_listen = os.pipe() + os.set_blocking(self._r_channel_listen, False) + + self._thread = threading.Thread( + target=self._run, name="ExtIdleNotify", daemon=False + ) + self._thread.start() + + result = os.read(self._r_channel_started, 1) + + if result == b"0": + self._thread.join() + self._thread = None + raise Exception("ext-idle-notify-v1 not supported") + + def start_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + self._idle_config = IdleConfig( + on_idle=on_idle, + on_resumed=on_resumed, + idle_time=idle_time, + ) + + # 1 means start listening, or that the configuration changed + os.write(self._w_channel_listen, b"1") + + def configuration_changed( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + self._idle_config = IdleConfig( + on_idle=on_idle, + on_resumed=on_resumed, + idle_time=idle_time, + ) + + # 1 means start listening, or that the configuration changed + os.write(self._w_channel_listen, b"1") + + def is_monitor_running(self) -> bool: + return self._idle_config is not None + + def _run(self) -> None: # Note that this creates a new connection to the wayland compositor. # This is not an issue per se, but does mean that the compositor sees this as # a new, separate client, that just happens to run in the same process as @@ -54,80 +125,196 @@ def __init__(self): # https://lists.freedesktop.org/archives/wayland-devel/2019-March/040344.html # The best thing would be, of course, for gtk to gain native support for # ext-idle-notify-v1. - self._display = Display() - self._display.connect() - self._r_channel, self._w_channel = os.pipe() + with Display() as display: + self._ext_idle_notify_internal = ExtIdleNotifyInternal( + display, + self._r_channel_stop, + self._w_channel_started, + self._r_channel_listen, + self._on_idle, + self._on_resumed, + self._get_idle_time, + ) + self._ext_idle_notify_internal.run() + self._ext_idle_notify_internal = None + + def _on_idle(self) -> None: + if self._idle_config is not None: + self._idle_config.on_idle() + + def _on_resumed(self) -> None: + if self._idle_config is not None: + self._idle_config.on_resumed() - def stop(self): - self._running = False + def _get_idle_time(self) -> typing.Optional[float]: + if self._idle_config is not None: + return self._idle_config.idle_time + else: + return None + + def stop_monitor(self) -> None: + # 0 means to stop listening + # It's not an issue to write to the channel if we're not listening anymore + # already + os.write(self._w_channel_listen, b"0") + + self._idle_config = None + + def stop(self) -> None: # write anything, just to wake up the channel - os.write(self._w_channel, b"!") - self._notification.destroy() - self._notification = None - self._seat = None - self._thread.join() - os.close(self._r_channel) - os.close(self._w_channel) + if self._thread is not None: + os.write(self._w_channel_stop, b"!") + self._thread.join() + self._thread = None + os.close(self._r_channel_stop) + os.close(self._w_channel_stop) - def run(self): - self._thread = threading.Thread( - target=self._run, name="ExtIdleNotify", daemon=False - ) - self._thread.start() + os.close(self._r_channel_started) + os.close(self._w_channel_started) - def _run(self): + os.close(self._r_channel_listen) + os.close(self._w_channel_listen) + + +class ExtIdleNotifyInternal: + """This runs in the thread, and is only alive while the display exists. + + Split out into a separate object to simplify lifetime handling. + """ + + _idle_notifier: typing.Optional[ExtIdleNotifierV1] = None + _notification: typing.Optional[ExtIdleNotificationV1] = None + _display: Display + _r_channel_stop: int + _w_channel_started: int + _r_channel_listen: int + _seat: typing.Optional[WlSeat] = None + + _on_idle: typing.Callable[[], None] + _on_resumed: typing.Callable[[], None] + _get_idle_time: typing.Callable[[], typing.Optional[float]] + + def __init__( + self, + display: Display, + r_channel_stop: int, + w_channel_started: int, + r_channel_listen: int, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + get_idle_time: typing.Callable[[], typing.Optional[float]], + ) -> None: + self._display = display + self._r_channel_stop = r_channel_stop + self._w_channel_started = w_channel_started + self._r_channel_listen = r_channel_listen + self._on_idle = on_idle + self._on_resumed = on_resumed + self._get_idle_time = get_idle_time + + def run(self) -> None: + """Run the wayland client. + + This will block until it's stopped by the channel. + When this stops, self should no longer be used. + """ reg = self._display.get_registry() reg.dispatcher["global"] = self._global_handler + self._display.roundtrip() + + while self._seat is None: + self._display.dispatch(block=True) + + if self._idle_notifier is None: + self._seat = None + + self._display.roundtrip() + + # communicate to the outer thread that the compositor does not + # implement the ext-idle-notify-v1 protocol + os.write(self._w_channel_started, b"0") + + return + + os.write(self._w_channel_started, b"1") + display_fd = self._display.get_fd() - while self._running: + while True: self._display.flush() # this blocks until either there are new events in self._display # (retrieved using dispatch()) - # or until there are events in self._r_channel - which means that stop() - # was called + # or until there are events in self._r_channel_stop - which means that + # stop() was called # unfortunately, this seems like the best way to make sure that dispatch # doesn't block potentially forever (up to multiple seconds in my usage) - read, _w, _x = select.select((display_fd, self._r_channel), (), ()) + read, _w, _x = select.select( + (display_fd, self._r_channel_stop, self._r_channel_listen), (), () + ) - if self._r_channel in read: + if self._r_channel_listen in read: + # r_channel_listen is nonblocking + # if there is nothing to read here, result should just be b"" + result = os.read(self._r_channel_listen, 1) + if result == b"1": + self._listen() + elif result == b"0": + if self._notification is not None: + self._notification.destroy() # type: ignore[attr-defined] + self._notification = None + + if self._r_channel_stop in read: # the channel was written to, which means stop() was called - # at this point, self._running should be false as well break if display_fd in read: self._display.dispatch(block=True) - self._display.disconnect() + self._display.roundtrip() + + if self._notification is not None: + self._notification.destroy() # type: ignore[attr-defined] + self._notification = None + + self._display.roundtrip() + + self._seat = None + self._idle_notifier = None + + def _listen(self): + """Create a new idle notification listener. - def _global_handler(self, reg, id_num, iface_name, version): + If one already exists, throw it away and recreate it with the new + idle time. + """ + # note that the typing doesn't work correctly here - it always says that + # get_idle_notification is not defined + # so just don't check this method + if self._notification is not None: + self._notification.destroy() + self._notification = None + + timeout_sec = self._get_idle_time() + if timeout_sec is None: + logging.debug( + "this should not happen. _listen() was called but idle time was not set" + ) + self._notification = self._idle_notifier.get_idle_notification( + int(timeout_sec * 1000), self._seat + ) + self._notification.dispatcher["idled"] = self._idle_notifier_handler + self._notification.dispatcher["resumed"] = self._idle_notifier_resume_handler + + def _global_handler(self, reg, id_num, iface_name, version) -> None: if iface_name == "wl_seat": self._seat = reg.bind(id_num, WlSeat, version) if iface_name == "ext_idle_notifier_v1": self._idle_notifier = reg.bind(id_num, ExtIdleNotifierV1, version) - if self._idle_notifier and self._seat and not self._notifier_set: - self._notifier_set = True - timeout_sec = 1 - self._notification = self._idle_notifier.get_idle_notification( - timeout_sec * 1000, self._seat - ) - self._notification.dispatcher["idled"] = self._idle_notifier_handler - self._notification.dispatcher["resumed"] = ( - self._idle_notifier_resume_handler - ) - - def _idle_notifier_handler(self, notification): - self._idle_since = datetime.datetime.now() - - def _idle_notifier_resume_handler(self, notification): - self._idle_since = None - - def get_idle_time_seconds(self): - if self._idle_since is None: - return 0 + def _idle_notifier_handler(self, notification) -> None: + utility.execute_main_thread(self._on_idle) - result = datetime.datetime.now() - self._idle_since - return result.total_seconds() + def _idle_notifier_resume_handler(self, notification) -> None: + utility.execute_main_thread(self._on_resumed) diff --git a/safeeyes/plugins/smartpause/gnome_dbus.py b/safeeyes/plugins/smartpause/gnome_dbus.py new file mode 100644 index 00000000..229a0da4 --- /dev/null +++ b/safeeyes/plugins/smartpause/gnome_dbus.py @@ -0,0 +1,126 @@ +# Safe Eyes is a utility to remind you to take break frequently +# to protect your eyes from eye strain. + +# Copyright (C) 2025 Mel Dafert + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import typing + +import gi + +gi.require_version("Gio", "2.0") +from gi.repository import Gio, GLib + + +from .interface import IdleMonitorInterface + + +class IdleMonitorGnomeDBus(IdleMonitorInterface): + """IdleMonitorInterface implementation for GNOME.""" + + dbus_proxy: typing.Optional[Gio.DBusProxy] = None + idle_watch_id: typing.Optional[int] = None + active_watch_id: typing.Optional[int] = None + + was_idle: bool = False + + _on_idle: typing.Optional[typing.Callable[[], None]] = None + _on_resumed: typing.Optional[typing.Callable[[], None]] = None + + def init(self) -> None: + if self.dbus_proxy is None: + self.dbus_proxy = Gio.DBusProxy.new_for_bus_sync( + bus_type=Gio.BusType.SESSION, + flags=Gio.DBusProxyFlags.NONE, + info=None, + name="org.gnome.Mutter.IdleMonitor", + object_path="/org/gnome/Mutter/IdleMonitor/Core", + interface_name="org.gnome.Mutter.IdleMonitor", + cancellable=None, + ) + + self.dbus_proxy.connect("g-signal", self._handle_proxy_signal) + + def start_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + """Start watching for idling. + + This is run on the main thread, and should not block. + """ + if self.is_monitor_running(): + self.stop() + + self._on_idle = on_idle + self._on_resumed = on_resumed + # NOTE: this is currently somewhat buggy, actually + # This does not start counting the idle time when the watch is added + # if the user was idle for more than `idle_time` s, this will fire immediately + # This is not a big issue, but does mean that it might pause safeeyes right + # after a break finishes + self.idle_watch_id = self.dbus_proxy.AddIdleWatch( # type: ignore[union-attr] + "(t)", idle_time * 1000 + ) + + def _handle_proxy_signal( + self, + dbus_proxy: Gio.DBusProxy, + sender_name: typing.Optional[str], + signal_name: str, + parameters: GLib.Variant, + ) -> None: + if signal_name == "WatchFired": + watch_id: int + (watch_id,) = parameters # type: ignore[misc] + + if watch_id == self.idle_watch_id: + if self.active_watch_id is not None: + dbus_proxy.RemoveWatch("(u)", self.active_watch_id) # type: ignore[attr-defined] + self.active_watch_id = dbus_proxy.AddUserActiveWatch("()") # type: ignore[attr-defined] + if not self.was_idle: + self.was_idle = True + if self._on_idle: + self._on_idle() + + if self.active_watch_id is not None and watch_id == self.active_watch_id: + self.active_watch_id = None + if self.was_idle: + self.was_idle = False + if self._on_resumed: + self._on_resumed() + + def is_monitor_running(self) -> bool: + return self.idle_watch_id is not None + + def stop_monitor(self) -> None: + """Stop watching for idling. + + This is run on the main thread. It may block a short time for cleanup. + """ + if self.is_monitor_running() and self.dbus_proxy is not None: + self.dbus_proxy.RemoveWatch("(u)", self.idle_watch_id) # type: ignore[attr-defined] + self.idle_watch_id = None + + if self.active_watch_id is not None: + self.dbus_proxy.RemoveWatch("(u)", self.active_watch_id) # type: ignore[attr-defined] + self.active_watch_id = None + + self.was_idle = False + + def stop(self) -> None: + self.dbus_proxy = None diff --git a/safeeyes/plugins/smartpause/interface.py b/safeeyes/plugins/smartpause/interface.py new file mode 100644 index 00000000..ccfe5a4e --- /dev/null +++ b/safeeyes/plugins/smartpause/interface.py @@ -0,0 +1,98 @@ +# Safe Eyes is a utility to remind you to take break frequently +# to protect your eyes from eye strain. + +# Copyright (C) 2025 Mel Dafert + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from abc import ABC, abstractmethod +from typing import Callable + + +class IdleMonitorInterface(ABC): + """Platform-specific interface to notify when the user is idle. + + The on_idle and on_resumed hooks must be used to notify safeeyes when the user + has gone idle. The on_idle hook must be fired first, afterwards they must be called + in alternation. + They must be fired from the main thread. + """ + + @abstractmethod + def init(self) -> None: + """Initialize the monitor. + + This is called once initially. + This is run on the main thread, and should only block for a short time for + startup. + """ + pass + + @abstractmethod + def start_monitor( + self, + on_idle: Callable[[], None], + on_resumed: Callable[[], None], + idle_time: float, + ) -> None: + """Start watching for idling. + + This will be called multiple times, whenever the monitor should start watching + or after configuration changes. + This is run on the main thread, and should only block for a short time for + startup. + """ + pass + + @abstractmethod + def is_monitor_running(self) -> bool: + """Check if the monitor is running. + + This is run on the main thread, and should not block. + """ + pass + + @abstractmethod + def stop_monitor(self) -> None: + """Stop watching for idling. + + This will be called multiple times, whenever the monitor should stop watching + or when configuration changes. + This is run on the main thread. It may block a short time for cleanup. + """ + pass + + def configuration_changed( + self, + on_idle: Callable[[], None], + on_resumed: Callable[[], None], + idle_time: float, + ) -> None: + """Restart the idle watcher. + + This method will be called when configuration changes. It may be overridden + by implementations for optimization. + This is run on the main thread. It may block a short time for cleanup/startup. + """ + self.stop_monitor() + self.start_monitor(on_idle, on_resumed, idle_time) + + @abstractmethod + def stop(self) -> None: + """Deinitialize the monitor. + + This is called once before the monitor is destroyed. + This is run on the main thread. It may block a short time for cleanup. + """ + pass diff --git a/safeeyes/plugins/smartpause/plugin.py b/safeeyes/plugins/smartpause/plugin.py index fe79664c..55c308fd 100644 --- a/safeeyes/plugins/smartpause/plugin.py +++ b/safeeyes/plugins/smartpause/plugin.py @@ -18,176 +18,102 @@ import datetime import logging -import subprocess -import threading -import re +import typing -from safeeyes import utility from safeeyes.model import State +from .interface import IdleMonitorInterface +from .gnome_dbus import IdleMonitorGnomeDBus +from .swayidle import IdleMonitorSwayidle +from .x11 import IdleMonitorX11 + """ Safe Eyes smart pause plugin """ context = None -idle_condition = threading.Condition() -lock = threading.Lock() -active = False -idle_time = 0 +idle_time: float = 0 enable_safeeyes = None disable_safeeyes = None +postpone: typing.Optional[typing.Callable[[int], None]] = None smart_pause_activated = False -idle_start_time = None -next_break_time = None -next_break_duration = 0 -short_break_interval = 0 -waiting_time = 2 -is_wayland_and_gnome = False +idle_start_time: typing.Optional[datetime.datetime] = None +next_break_time: typing.Optional[datetime.datetime] = None +short_break_interval: int = 0 +postpone_if_active: bool = False +is_wayland_and_gnome = False use_swayidle = False use_ext_idle_notify = False -swayidle_process = None -swayidle_lock = threading.Lock() -swayidle_idle = 0 -swayidle_active = 0 - -ext_idle_notify_lock = threading.Lock() -ext_idle_notification_obj = None - - -# swayidle -def __swayidle_running(): - return swayidle_process is not None and swayidle_process.poll() is None - - -def __start_swayidle_monitor(): - global swayidle_process - global swayidle_start - global swayidle_idle - global swayidle_active - logging.debug("Starting swayidle subprocess") - swayidle_process = subprocess.Popen( - ["swayidle", "timeout", "1", "date +S%s", "resume", "date +R%s"], - stdout=subprocess.PIPE, - bufsize=1, - universal_newlines=True, - encoding="utf-8", - ) - for line in swayidle_process.stdout: - with swayidle_lock: - typ = line[0] - timestamp = int(line[1:]) - if typ == "S": - swayidle_idle = timestamp - elif typ == "R": - swayidle_active = timestamp - - -def __stop_swayidle_monitor(): - if __swayidle_running(): - logging.debug("Stopping swayidle subprocess") - swayidle_process.terminate() +idle_monitor: typing.Optional[IdleMonitorInterface] = None +idle_monitor_unsupported: bool = False -def __swayidle_idle_time(): - with swayidle_lock: - if not __swayidle_running(): - utility.start_thread(__start_swayidle_monitor) - # Idle more recently than active, meaning idle time isn't stale. - if swayidle_idle > swayidle_active: - idle_time = int(datetime.datetime.now().timestamp()) - swayidle_idle - return idle_time - return 0 +idle_monitor_is_pre_break: bool = False +pre_break_idle_start_time: typing.Optional[datetime.datetime] = None +# this is hardcoded currently +pre_break_postpone_idle_time: int = 2 -# ext idle -def __start_ext_idle_monitor(): - global ext_idle_notification_obj - - from .ext_idle_notify import ExtIdleNotify - - ext_idle_notification_obj = ExtIdleNotify() - ext_idle_notification_obj.run() +def _on_idle() -> None: + global smart_pause_activated + global idle_start_time -def __stop_ext_idle_monitor(): - global ext_idle_notification_obj + if context["state"] == State.WAITING: # type: ignore[index] + smart_pause_activated = True + idle_start_time = datetime.datetime.now() - datetime.timedelta( + seconds=idle_time + ) + logging.info("Pause Safe Eyes due to system idle") + disable_safeeyes(None, True) # type: ignore[misc] - with ext_idle_notify_lock: - if ext_idle_notification_obj is not None: - ext_idle_notification_obj.stop() - ext_idle_notification_obj = None +def _on_resumed() -> None: + global smart_pause_activated + global idle_start_time -def __ext_idle_idle_time(): - global ext_idle_notification_obj - with ext_idle_notify_lock: - if ext_idle_notification_obj is None: - __start_ext_idle_monitor() + if ( + context["state"] == State.RESTING # type: ignore[index] + and idle_start_time is not None + ): + logging.info("Resume Safe Eyes due to user activity") + smart_pause_activated = False + idle_period = datetime.datetime.now() - idle_start_time + idle_seconds = idle_period.total_seconds() + context["idle_period"] = idle_seconds # type: ignore[index] + if idle_seconds < short_break_interval: + # Credit back the idle time + if next_break_time is not None: + # This method runs in a thread since the start. + # It may run before next_break is initialized in the + # update_next_break method + next_break = next_break_time + idle_period + enable_safeeyes(next_break.timestamp()) # type: ignore[misc] + else: + enable_safeeyes() # type: ignore[misc] else: - return ext_idle_notification_obj.get_idle_time_seconds() - return 0 - - -# gnome -def __gnome_wayland_idle_time(): - """Determine system idle time in seconds, specifically for gnome with - wayland. - - If there's a failure, return 0. - https://unix.stackexchange.com/a/492328/222290 - """ - try: - output = subprocess.check_output( - [ - "dbus-send", - "--print-reply", - "--dest=org.gnome.Mutter.IdleMonitor", - "/org/gnome/Mutter/IdleMonitor/Core", - "org.gnome.Mutter.IdleMonitor.GetIdletime", - ] - ) - return int(re.search(rb"\d+$", output).group(0)) / 1000 - except BaseException as e: - logging.warning("Failed to get system idle time for gnome/wayland.") - logging.warning(str(e)) - return 0 + # User is idle for more than the time between two breaks + enable_safeeyes() # type: ignore[misc] -def __system_idle_time(): - """Get system idle time in minutes. - - Return the idle time if xprintidle is available, otherwise return 0. - """ - try: - if is_wayland_and_gnome: - return __gnome_wayland_idle_time() - elif use_swayidle: - return __swayidle_idle_time() - elif use_ext_idle_notify: - return __ext_idle_idle_time() - # Convert to seconds - return int(subprocess.check_output(["xprintidle"]).decode("utf-8")) / 1000 - except BaseException: - return 0 +def _on_idle_pre_break() -> None: + global pre_break_idle_start_time + logging.debug("idled before break") + pre_break_idle_start_time = datetime.datetime.now() - datetime.timedelta( + seconds=pre_break_postpone_idle_time + ) -def __is_active(): - """Thread safe function to see if this plugin is active or not.""" - is_active = False - with lock: - is_active = active - return is_active +def _on_resumed_pre_break() -> None: + global pre_break_idle_start_time -def __set_active(is_active): - """Thread safe function to change the state of the plugin.""" - global active - with lock: - active = is_active + logging.debug("resumed before break") + pre_break_idle_start_time = None -def init(ctx, safeeyes_config, plugin_config): +def init(ctx, safeeyes_config, plugin_config) -> None: """Initialize the plugin.""" global context global enable_safeeyes @@ -195,8 +121,6 @@ def init(ctx, safeeyes_config, plugin_config): global postpone global idle_time global short_break_interval - global long_break_duration - global waiting_time global postpone_if_active global is_wayland_and_gnome global use_swayidle @@ -211,111 +135,163 @@ def init(ctx, safeeyes_config, plugin_config): short_break_interval = ( safeeyes_config.get("short_break_interval") * 60 ) # Convert to seconds - long_break_duration = safeeyes_config.get("long_break_duration") - waiting_time = min(2, idle_time) # If idle time is 1 sec, wait only 1 sec is_wayland_and_gnome = context["desktop"] == "gnome" and context["is_wayland"] use_swayidle = context["desktop"] == "sway" use_ext_idle_notify = ( context["is_wayland"] and not use_swayidle and not is_wayland_and_gnome ) + if idle_monitor is not None and idle_monitor.is_monitor_running(): + idle_monitor.configuration_changed(_on_idle, _on_resumed, idle_time) -def __start_idle_monitor(): - """Continuously check the system idle time and pause/resume Safe Eyes based - on it. - """ - global smart_pause_activated - global idle_start_time - while __is_active(): - # Wait for waiting_time seconds - idle_condition.acquire() - idle_condition.wait(waiting_time) - idle_condition.release() - - if __is_active(): - # Get the system idle time - system_idle_time = __system_idle_time() - if system_idle_time >= idle_time and context["state"] == State.WAITING: - smart_pause_activated = True - idle_start_time = datetime.datetime.now() - datetime.timedelta( - seconds=system_idle_time - ) - logging.info("Pause Safe Eyes due to system idle") - disable_safeeyes(None, True) - elif ( - system_idle_time < idle_time - and context["state"] == State.RESTING - and idle_start_time is not None - ): - logging.info("Resume Safe Eyes due to user activity") - smart_pause_activated = False - idle_period = datetime.datetime.now() - idle_start_time - idle_seconds = idle_period.total_seconds() - context["idle_period"] = idle_seconds - if idle_seconds < short_break_interval: - # Credit back the idle time - if next_break_time is not None: - # This method runs in a thread since the start. - # It may run before next_break is initialized in the - # update_next_break method - next_break = next_break_time + idle_period - enable_safeeyes(next_break.timestamp()) - else: - enable_safeeyes() - else: - # User is idle for more than the time between two breaks - enable_safeeyes() - - -def on_start(): - """Start a thread to continuously call xprintidle.""" - global active - if not __is_active(): - # If SmartPause is already started, do not start it again - logging.debug("Start Smart Pause plugin") - __set_active(True) - utility.start_thread(__start_idle_monitor) - - -def on_stop(): - """Stop the thread from continuously calling xprintidle.""" - global active +def on_start() -> None: + """Start the platform idle monitor.""" + global idle_time + global idle_monitor + global idle_monitor_unsupported + + if idle_monitor_unsupported: + # Don't try and start again if we failed in the past + return + + if idle_monitor is None: + if is_wayland_and_gnome: + idle_monitor = IdleMonitorGnomeDBus() + elif use_swayidle: + idle_monitor = IdleMonitorSwayidle() + elif use_ext_idle_notify: + from .ext_idle_notify import IdleMonitorExtIdleNotify + + idle_monitor = IdleMonitorExtIdleNotify() + else: + idle_monitor = IdleMonitorX11() + + try: + idle_monitor.init() + except BaseException as e: + logging.warning("Unable to get idle time, idle monitor not supported.") + logging.warning(str(e)) + idle_monitor.stop() + idle_monitor = None + idle_monitor_unsupported = True + + if idle_monitor is not None: + if not idle_monitor.is_monitor_running(): + logging.debug("Start Smart Pause plugin") + try: + idle_monitor.start_monitor(_on_idle, _on_resumed, idle_time) + except BaseException as e: + logging.warning("Unable to get idle time, idle monitor not supported.") + logging.warning(str(e)) + idle_monitor.stop_monitor() + idle_monitor.stop() + idle_monitor = None + idle_monitor_unsupported = True + + +def on_stop() -> None: + """Stop the platform idle monitor.""" + global idle_monitor global smart_pause_activated + if smart_pause_activated: # Safe Eyes is stopped due to system idle smart_pause_activated = False return logging.debug("Stop Smart Pause plugin") - if use_swayidle: - __stop_swayidle_monitor() - __set_active(False) - idle_condition.acquire() - idle_condition.notify_all() - idle_condition.release() - if use_ext_idle_notify: - __stop_ext_idle_monitor() + if idle_monitor is not None: + if idle_monitor.is_monitor_running(): + idle_monitor.stop_monitor() -def update_next_break(break_obj, dateTime): +def update_next_break(break_obj, dateTime) -> None: """Update the next break time.""" global next_break_time - global next_break_duration next_break_time = dateTime - next_break_duration = break_obj.duration -def on_start_break(break_obj): +def on_pre_break(break_obj) -> None: + """Executes at the start of the prepare time for a break.""" + global postpone_if_active + global idle_monitor_is_pre_break + global idle_monitor + + if idle_monitor is not None: + if postpone_if_active: + logging.debug("Enabling pre-break idle monitor") + idle_monitor.configuration_changed( + _on_idle_pre_break, + _on_resumed_pre_break, + pre_break_postpone_idle_time, + ) + idle_monitor_is_pre_break = True + else: + # Stop during the pre break + logging.debug("Stop idle monitor during break") + idle_monitor.stop_monitor() + + +def on_start_break(break_obj) -> None: """Lifecycle method executes just before the break.""" + global postpone_if_active + global idle_monitor_is_pre_break + global pre_break_idle_start_time + if postpone_if_active: - # Postpone this break if the user is active - system_idle_time = __system_idle_time() - if system_idle_time < 2: - postpone(2) # Postpone for 2 seconds + if idle_monitor_is_pre_break: + # Postpone this break if the user is active + system_idle_time = 0.0 + if pre_break_idle_start_time is not None: + idle_period = datetime.datetime.now() - pre_break_idle_start_time + system_idle_time = idle_period.total_seconds() + + if system_idle_time < pre_break_postpone_idle_time: + logging.debug("User is not idle, postponing") + postpone(pre_break_postpone_idle_time) # type: ignore[misc] + return + + logging.debug(f"User was idle for {system_idle_time}, time for the break") + + if idle_monitor is not None: + # Stop during the break + # The normal monitor should no longer be running here - try stopping anyways + if idle_monitor.is_monitor_running(): + logging.debug("Start break, disable the pre-break idle monitor") + idle_monitor.stop_monitor() + # We stopped, the pre_break monitor is no longer running + idle_monitor_is_pre_break = False + pre_break_idle_start_time = None + + +def on_stop_break() -> None: + """Lifecycle method executes after the break.""" + global idle_monitor_is_pre_break + global postpone_if_active + + if idle_monitor is not None: + if not idle_monitor.is_monitor_running(): + logging.debug("Break is done, reenable idle monitor") + idle_monitor.start_monitor(_on_idle, _on_resumed, idle_time) -def disable(): +def disable() -> None: """SmartPause plugin was active earlier but now user has disabled it.""" + global idle_monitor + # Remove the idle_period - context.pop("idle_period", None) + context.pop("idle_period", None) # type: ignore[union-attr] + + if idle_monitor is not None: + idle_monitor.stop() + idle_monitor = None + + +def on_exit() -> None: + """SafeEyes is exiting.""" + global idle_monitor + + if idle_monitor is not None: + idle_monitor.stop() + idle_monitor = None diff --git a/safeeyes/plugins/smartpause/swayidle.py b/safeeyes/plugins/smartpause/swayidle.py new file mode 100644 index 00000000..49434c97 --- /dev/null +++ b/safeeyes/plugins/smartpause/swayidle.py @@ -0,0 +1,117 @@ +# Safe Eyes is a utility to remind you to take break frequently +# to protect your eyes from eye strain. + +# Copyright (C) 2025 Mel Dafert + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +import math +import subprocess +import threading +import typing + +from safeeyes import utility + +from .interface import IdleMonitorInterface + + +class IdleMonitorSwayidle(IdleMonitorInterface): + """IdleMonitorInterface implementation for swayidle.""" + + swayidle_process: typing.Optional[subprocess.Popen] = None + swayidle_lock = threading.Lock() + swayidle_idle = 0 + swayidle_active = 0 + + def init(self) -> None: + pass + + def start_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + """Start watching for idling. + + This is run on the main thread, and should not block. + """ + if not self.is_monitor_running(): + utility.start_thread( + self._start_swayidle_monitor, + on_idle=on_idle, + on_resumed=on_resumed, + idle_time=idle_time, + ) + + def is_monitor_running(self) -> bool: + return ( + self.swayidle_process is not None and self.swayidle_process.poll() is None + ) + + def _start_swayidle_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + was_idle = False + + logging.debug("Starting swayidle subprocess") + + timeout = str(math.ceil(idle_time)) + + self.swayidle_process = subprocess.Popen( + [ + "swayidle", + "timeout", + timeout, + "date +S%s", + "resume", + "date +R%s", + ], + stdout=subprocess.PIPE, + bufsize=1, + universal_newlines=True, + encoding="utf-8", + ) + for line in self.swayidle_process.stdout: # type: ignore[union-attr] + with self.swayidle_lock: + typ = line[0] + timestamp = int(line[1:]) + if typ == "S": + self.swayidle_idle = timestamp + if not was_idle: + was_idle = True + utility.execute_main_thread(on_idle) + elif typ == "R": + self.swayidle_active = timestamp + if was_idle: + was_idle = False + utility.execute_main_thread(on_resumed) + + def stop_monitor(self) -> None: + """Stop watching for idling. + + This is run on the main thread. It may block a short time for cleanup. + """ + if self.is_monitor_running() and self.swayidle_process is not None: + logging.debug("Stopping swayidle subprocess") + self.swayidle_process.terminate() + self.swayidle_process.wait() + self.swayidle_process = None + + def stop(self) -> None: + pass diff --git a/safeeyes/plugins/smartpause/x11.py b/safeeyes/plugins/smartpause/x11.py new file mode 100644 index 00000000..2504dd4b --- /dev/null +++ b/safeeyes/plugins/smartpause/x11.py @@ -0,0 +1,114 @@ +# Safe Eyes is a utility to remind you to take break frequently +# to protect your eyes from eye strain. + +# Copyright (C) 2017 Gobinath + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import subprocess +import threading +import typing + +from safeeyes import utility + +from .interface import IdleMonitorInterface + + +class IdleMonitorX11(IdleMonitorInterface): + """IdleMonitorInterface implementation for X11. + + Note that this is quite inefficient. It polls every 2 seconds whether the user is + idle or not, keeping the CPU active a lot. + """ + + active: bool = False + lock = threading.Lock() + idle_condition = threading.Condition() + + def _is_active(self) -> bool: + """Thread safe function to see if this plugin is active or not.""" + is_active = False + with self.lock: + is_active = self.active + return is_active + + def _set_active(self, is_active: bool) -> None: + """Thread safe function to change the state of the plugin.""" + with self.lock: + self.active = is_active + + def init(self) -> None: + pass + + def start_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + """Start a thread to continuously call xprintidle.""" + if not self._is_active(): + # If SmartPause is already started, do not start it again + self._set_active(True) + utility.start_thread(self._start_idle_monitor) + utility.start_thread( + self._start_idle_monitor, + on_idle=on_idle, + on_resumed=on_resumed, + idle_time=idle_time, + ) + + def is_monitor_running(self) -> bool: + return self._is_active() + + def _start_idle_monitor( + self, + on_idle: typing.Callable[[], None], + on_resumed: typing.Callable[[], None], + idle_time: float, + ) -> None: + """Continuously check the system idle time and pause/resume Safe Eyes based + on it. + """ + waiting_time = min(idle_time, 2) + was_idle = False + + while self._is_active(): + # Wait for waiting_time seconds + self.idle_condition.acquire() + self.idle_condition.wait(waiting_time) + self.idle_condition.release() + + if self._is_active(): + # Get the system idle time + system_idle_time = ( + # Convert to seconds + int(subprocess.check_output(["xprintidle"]).decode("utf-8")) / 1000 + ) + if system_idle_time >= idle_time and not was_idle: + was_idle = True + utility.execute_main_thread(on_idle) + elif system_idle_time < idle_time and was_idle: + was_idle = False + utility.execute_main_thread(on_resumed) + + def stop_monitor(self) -> None: + """Stop the thread from continuously calling xprintidle.""" + self._set_active(False) + self.idle_condition.acquire() + self.idle_condition.notify_all() + self.idle_condition.release() + + def stop(self) -> None: + pass diff --git a/safeeyes/tests/__init__.py b/safeeyes/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/safeeyes/tests/test_model.py b/safeeyes/tests/test_model.py new file mode 100644 index 00000000..7ff82818 --- /dev/null +++ b/safeeyes/tests/test_model.py @@ -0,0 +1,350 @@ +# Safe Eyes is a utility to remind you to take break frequently +# to protect your eyes from eye strain. + +# Copyright (C) 2025 Mel Dafert + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest +import random +import typing +from safeeyes import model + + +class TestBreak: + def test_break_short(self) -> None: + b = model.Break( + break_type=model.BreakType.SHORT_BREAK, + name="test break", + time=15, + duration=15, + image=None, + plugins=None, + ) + + assert b.is_short_break() + assert not b.is_long_break() + + def test_break_long(self) -> None: + b = model.Break( + break_type=model.BreakType.LONG_BREAK, + name="long break", + time=75, + duration=60, + image=None, + plugins=None, + ) + + assert not b.is_short_break() + assert b.is_long_break() + + +class TestBreakQueue: + def test_create_empty(self) -> None: + config = { + "short_breaks": [], + "long_breaks": [], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": False, + } + + context: dict[str, typing.Any] = {} + + bq = model.BreakQueue(config, context) + + assert bq.is_empty() + assert bq.is_empty(model.BreakType.LONG_BREAK) + assert bq.is_empty(model.BreakType.SHORT_BREAK) + assert bq.next() is None + assert bq.get_break() is None + + def get_bq_only_short( + self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None + ) -> model.BreakQueue: + if random_seed is not None: + random.seed(random_seed) + + monkeypatch.setattr( + model, "_", lambda message: "translated!: " + message, raising=False + ) + + config = { + "short_breaks": [ + {"name": "break 1"}, + {"name": "break 2"}, + {"name": "break 3"}, + ], + "long_breaks": [], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + } + + context: dict[str, typing.Any] = { + "session": {}, + } + + return model.BreakQueue(config, context) + + def get_bq_only_long( + self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None + ) -> model.BreakQueue: + if random_seed is not None: + random.seed(random_seed) + + monkeypatch.setattr( + model, "_", lambda message: "translated!: " + message, raising=False + ) + + config = { + "short_breaks": [], + "long_breaks": [ + {"name": "long break 1"}, + {"name": "long break 2"}, + {"name": "long break 3"}, + ], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + } + + context: dict[str, typing.Any] = { + "session": {}, + } + + return model.BreakQueue(config, context) + + def get_bq_full( + self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None + ) -> model.BreakQueue: + if random_seed is not None: + random.seed(random_seed) + + monkeypatch.setattr( + model, "_", lambda message: "translated!: " + message, raising=False + ) + + config = { + "short_breaks": [ + {"name": "break 1"}, + {"name": "break 2"}, + {"name": "break 3"}, + {"name": "break 4"}, + ], + "long_breaks": [ + {"name": "long break 1"}, + {"name": "long break 2"}, + {"name": "long break 3"}, + ], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + } + + context: dict[str, typing.Any] = { + "session": {}, + } + + return model.BreakQueue(config, context) + + def test_create_only_short(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_only_short(monkeypatch) + + assert not bq.is_empty() + assert not bq.is_empty(model.BreakType.SHORT_BREAK) + assert bq.is_empty(model.BreakType.LONG_BREAK) + + def test_only_short_repeat_get_break_no_change( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + bq = self.get_bq_only_short(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: break 1" + + next = bq.get_break() + assert next.name == "translated!: break 1" + + assert not bq.is_long_break() + + def test_only_short_next_break(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_only_short(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: break 1" + + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + + def test_only_short_next_break_random( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + random_seed = 5 + bq = self.get_bq_only_short(monkeypatch, random_seed) + + next = bq.get_break() + assert next.name == "translated!: break 3" + + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 1" + + def test_create_only_long(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_only_long(monkeypatch) + + assert not bq.is_empty() + assert not bq.is_empty(model.BreakType.LONG_BREAK) + assert bq.is_empty(model.BreakType.SHORT_BREAK) + + def test_only_long_repeat_get_break_no_change( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + bq = self.get_bq_only_long(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: long break 1" + + next = bq.get_break() + assert next.name == "translated!: long break 1" + + assert bq.is_long_break() + + def test_only_long_next_break(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_only_long(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: long break 1" + + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: long break 1" + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: long break 1" + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: long break 3" + + def test_only_long_next_break_random(self, monkeypatch: pytest.MonkeyPatch) -> None: + random_seed = 5 + bq = self.get_bq_only_long(monkeypatch, random_seed) + + next = bq.get_break() + assert next.name == "translated!: long break 3" + + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: long break 1" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: long break 1" + + def test_create_full(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_full(monkeypatch) + + assert not bq.is_empty() + assert not bq.is_empty(model.BreakType.LONG_BREAK) + assert not bq.is_empty(model.BreakType.SHORT_BREAK) + + def test_full_repeat_get_break_no_change( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + bq = self.get_bq_full(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: break 1" + + next = bq.get_break() + assert next.name == "translated!: break 1" + + assert not bq.is_long_break() + + def test_full_next_break(self, monkeypatch: pytest.MonkeyPatch) -> None: + bq = self.get_bq_full(monkeypatch) + + next = bq.get_break() + assert next.name == "translated!: break 1" + assert not bq.is_long_break() + + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 1" + assert bq.is_long_break() + assert bq.next().name == "translated!: break 1" + assert not bq.is_long_break() + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 1" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: long break 1" + + def test_full_next_break_random(self, monkeypatch: pytest.MonkeyPatch) -> None: + random_seed = 5 + bq = self.get_bq_full(monkeypatch, random_seed) + + next = bq.get_break() + assert next.name == "translated!: break 1" + + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: long break 3" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: long break 2" + assert bq.next().name == "translated!: break 2" + assert bq.next().name == "translated!: break 4" + assert bq.next().name == "translated!: break 1" + assert bq.next().name == "translated!: break 3" + assert bq.next().name == "translated!: long break 1" diff --git a/safeeyes/utility.py b/safeeyes/utility.py index 1ec4b601..5444ae9e 100644 --- a/safeeyes/utility.py +++ b/safeeyes/utility.py @@ -98,7 +98,10 @@ def get_resource_path(resource_name): def start_thread(target_function, **args): """Execute the function in a separate thread.""" thread = threading.Thread( - target=target_function, name="WorkThread", daemon=False, kwargs=args + target=target_function, + name=f"WorkThread {target_function.__qualname__}", + daemon=False, + kwargs=args, ) thread.start()