|
| 1 | +import sys |
| 2 | +import pathlib |
| 3 | +import importlib.util |
| 4 | +import types |
| 5 | +import threading |
| 6 | +import time |
| 7 | +import queue as q |
| 8 | + |
| 9 | + |
| 10 | +ROOT = pathlib.Path(__file__).resolve().parents[1] |
| 11 | +SRC = ROOT / "UnityMcpBridge" / "UnityMcpServer~" / "src" |
| 12 | +sys.path.insert(0, str(SRC)) |
| 13 | + |
| 14 | +# Stub mcp.server.fastmcp to satisfy imports without the full dependency |
| 15 | +mcp_pkg = types.ModuleType("mcp") |
| 16 | +server_pkg = types.ModuleType("mcp.server") |
| 17 | +fastmcp_pkg = types.ModuleType("mcp.server.fastmcp") |
| 18 | + |
| 19 | +class _Dummy: |
| 20 | + pass |
| 21 | + |
| 22 | +fastmcp_pkg.FastMCP = _Dummy |
| 23 | +fastmcp_pkg.Context = _Dummy |
| 24 | +server_pkg.fastmcp = fastmcp_pkg |
| 25 | +mcp_pkg.server = server_pkg |
| 26 | +sys.modules.setdefault("mcp", mcp_pkg) |
| 27 | +sys.modules.setdefault("mcp.server", server_pkg) |
| 28 | +sys.modules.setdefault("mcp.server.fastmcp", fastmcp_pkg) |
| 29 | + |
| 30 | + |
| 31 | +def _load_module(path: pathlib.Path, name: str): |
| 32 | + spec = importlib.util.spec_from_file_location(name, path) |
| 33 | + mod = importlib.util.module_from_spec(spec) |
| 34 | + spec.loader.exec_module(mod) |
| 35 | + return mod |
| 36 | + |
| 37 | + |
| 38 | +telemetry = _load_module(SRC / "telemetry.py", "telemetry_mod") |
| 39 | + |
| 40 | + |
| 41 | +def test_telemetry_queue_backpressure_and_single_worker(monkeypatch, caplog): |
| 42 | + caplog.set_level("DEBUG") |
| 43 | + |
| 44 | + collector = telemetry.TelemetryCollector() |
| 45 | + # Force-enable telemetry regardless of env settings from conftest |
| 46 | + collector.config.enabled = True |
| 47 | + |
| 48 | + # Replace queue with tiny one to trigger backpressure quickly |
| 49 | + small_q = q.Queue(maxsize=2) |
| 50 | + collector._queue = small_q |
| 51 | + |
| 52 | + # Make sends slow to build backlog and exercise worker |
| 53 | + def slow_send(self, rec): |
| 54 | + time.sleep(0.05) |
| 55 | + |
| 56 | + collector._send_telemetry = types.MethodType(slow_send, collector) |
| 57 | + |
| 58 | + # Fire many events quickly; record() should not block even when queue fills |
| 59 | + start = time.perf_counter() |
| 60 | + for i in range(50): |
| 61 | + collector.record(telemetry.RecordType.TOOL_EXECUTION, {"i": i}) |
| 62 | + elapsed_ms = (time.perf_counter() - start) * 1000.0 |
| 63 | + |
| 64 | + # Should be fast despite backpressure (non-blocking enqueue or drop) |
| 65 | + assert elapsed_ms < 80.0 |
| 66 | + |
| 67 | + # Allow worker to process some |
| 68 | + time.sleep(0.3) |
| 69 | + |
| 70 | + # Verify drops were logged (queue full backpressure) |
| 71 | + dropped_logs = [m for m in caplog.messages if "Telemetry queue full; dropping" in m] |
| 72 | + assert len(dropped_logs) >= 1 |
| 73 | + |
| 74 | + # Ensure only one worker thread exists and is alive |
| 75 | + assert collector._worker.is_alive() |
| 76 | + worker_threads = [t for t in threading.enumerate() if t is collector._worker] |
| 77 | + assert len(worker_threads) == 1 |
| 78 | + |
| 79 | + |
0 commit comments