|
| 1 | +"""Test plugin examples can run as dev plugins.""" |
| 2 | + |
| 3 | +import signal |
| 4 | +import subprocess |
| 5 | +import sys |
| 6 | +import time |
| 7 | + |
| 8 | + |
| 9 | +from pathlib import Path |
| 10 | +from queue import Empty, Queue |
| 11 | +from threading import Thread |
| 12 | +from typing import Iterable, TextIO |
| 13 | + |
| 14 | +import pytest |
| 15 | + |
| 16 | +_THIS_DIR = Path(__file__).parent.resolve() |
| 17 | +_PLUGIN_EXAMPLES_DIR = (_THIS_DIR / "../examples/plugins").resolve() |
| 18 | + |
| 19 | + |
| 20 | +def _get_plugin_paths() -> list[Path]: |
| 21 | + return [p for p in _PLUGIN_EXAMPLES_DIR.iterdir() if p.is_dir()] |
| 22 | + |
| 23 | + |
| 24 | +def _monitor_stream(stream: TextIO, queue: Queue[str], *, debug: bool = False) -> None: |
| 25 | + for line in stream: |
| 26 | + if debug: |
| 27 | + print(line) |
| 28 | + queue.put(line) |
| 29 | + |
| 30 | + |
| 31 | +def _drain_queue(queue: Queue[str]) -> Iterable[str]: |
| 32 | + while True: |
| 33 | + try: |
| 34 | + yield queue.get(block=False) |
| 35 | + except Empty: |
| 36 | + break |
| 37 | + |
| 38 | + |
| 39 | +def _exec_plugin(plugin_path: Path) -> subprocess.Popen[str]: |
| 40 | + # Run plugin in dev mode with IO pipes line buffered |
| 41 | + # (as the test process is monitoring for specific output) |
| 42 | + cmd = [ |
| 43 | + sys.executable, |
| 44 | + "-u", |
| 45 | + "-m", |
| 46 | + "lmstudio.plugin", |
| 47 | + "--dev", |
| 48 | + str(plugin_path), |
| 49 | + ] |
| 50 | + return subprocess.Popen( |
| 51 | + cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1 |
| 52 | + ) |
| 53 | + |
| 54 | + |
| 55 | +_PLUGIN_START_TIMEOUT = 5 |
| 56 | +_PLUGIN_STOP_TIMEOUT = 5 |
| 57 | + |
| 58 | + |
| 59 | +def _get_interrupt_signal() -> signal.Signals: |
| 60 | + if sys.platform == "win32": |
| 61 | + return signal.CTRL_C_EVENT |
| 62 | + return signal.SIGINT |
| 63 | + |
| 64 | + |
| 65 | +_INTERRUPT_SIGNAL = _get_interrupt_signal() |
| 66 | + |
| 67 | +def _exec_and_interrupt(plugin_path: Path) -> tuple[list[str], list[str], list[str]]: |
| 68 | + process = _exec_plugin(plugin_path) |
| 69 | + # Ensure pipes don't fill up and block subprocess execution |
| 70 | + stdout_q: Queue[str] = Queue() |
| 71 | + stdout_thread = Thread( |
| 72 | + target=_monitor_stream, args=[process.stdout, stdout_q], kwargs={"debug": True} |
| 73 | + ) |
| 74 | + stdout_thread.start() |
| 75 | + stderr_q: Queue[str] = Queue() |
| 76 | + stderr_thread = Thread(target=_monitor_stream, args=[process.stderr, stderr_q]) |
| 77 | + stderr_thread.start() |
| 78 | + startup_lines: list[str] = [] |
| 79 | + # Wait for plugin to start |
| 80 | + start_deadline = time.monotonic() + _PLUGIN_START_TIMEOUT |
| 81 | + try: |
| 82 | + print(f"Monitoring {stdout_q!r} for plugin started message") |
| 83 | + while True: |
| 84 | + remaining_time = start_deadline - time.monotonic() |
| 85 | + print(f"Waiting {remaining_time} seconds for plugin to start") |
| 86 | + try: |
| 87 | + line = stdout_q.get(timeout=remaining_time) |
| 88 | + except Empty: |
| 89 | + assert False, "Plugin subprocess failed to start" |
| 90 | + print(line) |
| 91 | + startup_lines.append(line) |
| 92 | + if "Ctrl-C to terminate" in line: |
| 93 | + break |
| 94 | + finally: |
| 95 | + # Instruct the process to terminate |
| 96 | + print("Sending termination request to plugin subprocess") |
| 97 | + process.send_signal(_INTERRUPT_SIGNAL) |
| 98 | + # Give threads a chance to halt their file reads |
| 99 | + # (process terminating will close the pipes) |
| 100 | + stop_deadline = time.monotonic() + _PLUGIN_STOP_TIMEOUT |
| 101 | + stdout_thread.join(timeout=(stop_deadline - time.monotonic())) |
| 102 | + stderr_thread.join(timeout=(stop_deadline - time.monotonic())) |
| 103 | + process.wait(timeout=(stop_deadline - time.monotonic())) |
| 104 | + with process: |
| 105 | + # Close pipes |
| 106 | + pass |
| 107 | + # Collect remainder of subprocess output |
| 108 | + shutdown_lines = [*_drain_queue(stdout_q)] |
| 109 | + stderr_lines = [*_drain_queue(stderr_q)] |
| 110 | + return startup_lines, shutdown_lines, stderr_lines |
| 111 | + |
| 112 | + |
| 113 | +def _plugin_case_id(plugin_path: Path) -> str: |
| 114 | + return plugin_path.name |
| 115 | + |
| 116 | + |
| 117 | +@pytest.mark.lmstudio |
| 118 | +@pytest.mark.parametrize("plugin_path", _get_plugin_paths(), ids=_plugin_case_id) |
| 119 | +def test_plugin_execution(plugin_path: Path) -> None: |
| 120 | + startup_lines, shutdown_lines, stderr_lines = _exec_and_interrupt(plugin_path) |
| 121 | + for log_line in stderr_lines: |
| 122 | + assert log_line.startswith("INFO:") |
| 123 | + assert startup_lines[-1].endswith("Ctrl-C to terminate...\n") |
| 124 | + # Outside an actual terminal, pipe may be closed before the termination is reported |
| 125 | + # TODO: Consider migrating to using pexpect, so this check can be more robust |
| 126 | + assert ( |
| 127 | + not shutdown_lines |
| 128 | + or shutdown_lines[-1] == "Plugin execution terminated by user\n" |
| 129 | + ) |
0 commit comments