|
5 | 5 |
|
6 | 6 | import uuid |
7 | 7 | import threading |
8 | | -import contextvars |
| 8 | +""" |
| 9 | +Fire-and-forget telemetry sender with a single background worker. |
| 10 | +- No context/thread-local propagation to avoid re-entrancy into tool resolution. |
| 11 | +- Small network timeouts to prevent stalls. |
| 12 | +""" |
9 | 13 | import json |
10 | 14 | import time |
11 | 15 | import os |
@@ -98,8 +102,11 @@ def __init__(self): |
98 | 102 | self.uuid_file = self.data_dir / "customer_uuid.txt" |
99 | 103 | self.milestones_file = self.data_dir / "milestones.json" |
100 | 104 |
|
101 | | - # Request timeout |
102 | | - self.timeout = 10.0 |
| 105 | + # Request timeout (small, fail fast). Override with UNITY_MCP_TELEMETRY_TIMEOUT |
| 106 | + try: |
| 107 | + self.timeout = float(os.environ.get("UNITY_MCP_TELEMETRY_TIMEOUT", "1.5")) |
| 108 | + except Exception: |
| 109 | + self.timeout = 1.5 |
103 | 110 |
|
104 | 111 | # Session tracking |
105 | 112 | self.session_id = str(uuid.uuid4()) |
@@ -160,8 +167,8 @@ def __init__(self): |
160 | 167 | self._customer_uuid: Optional[str] = None |
161 | 168 | self._milestones: Dict[str, Dict[str, Any]] = {} |
162 | 169 | self._lock: threading.Lock = threading.Lock() |
163 | | - # Bounded queue with single background worker to avoid spawning a thread per event |
164 | | - self._queue: "queue.Queue[tuple[contextvars.Context, TelemetryRecord]]" = queue.Queue(maxsize=1000) |
| 170 | + # Bounded queue with single background worker (records only; no context propagation) |
| 171 | + self._queue: "queue.Queue[TelemetryRecord]" = queue.Queue(maxsize=1000) |
165 | 172 | self._worker: threading.Thread = threading.Thread(target=self._worker_loop, daemon=True) |
166 | 173 | self._worker.start() |
167 | 174 | self._load_persistent_data() |
@@ -196,13 +203,12 @@ def _load_persistent_data(self): |
196 | 203 | self._milestones = {} |
197 | 204 |
|
198 | 205 | def _save_milestones(self): |
199 | | - """Save milestones to disk""" |
| 206 | + """Save milestones to disk. Caller must hold self._lock.""" |
200 | 207 | try: |
201 | | - with self._lock: |
202 | | - self.config.milestones_file.write_text( |
203 | | - json.dumps(self._milestones, indent=2), |
204 | | - encoding="utf-8", |
205 | | - ) |
| 208 | + self.config.milestones_file.write_text( |
| 209 | + json.dumps(self._milestones, indent=2), |
| 210 | + encoding="utf-8", |
| 211 | + ) |
206 | 212 | except OSError as e: |
207 | 213 | logger.warning(f"Failed to save milestones: {e}", exc_info=True) |
208 | 214 |
|
@@ -249,18 +255,18 @@ def record(self, |
249 | 255 | milestone=milestone |
250 | 256 | ) |
251 | 257 | # Enqueue for background worker (non-blocking). Drop on backpressure. |
252 | | - current_context = contextvars.copy_context() |
253 | 258 | try: |
254 | | - self._queue.put_nowait((current_context, record)) |
| 259 | + self._queue.put_nowait(record) |
255 | 260 | except queue.Full: |
256 | 261 | logger.debug("Telemetry queue full; dropping %s", record.record_type) |
257 | 262 |
|
258 | 263 | def _worker_loop(self): |
259 | 264 | """Background worker that serializes telemetry sends.""" |
260 | 265 | while True: |
261 | | - ctx, rec = self._queue.get() |
| 266 | + rec = self._queue.get() |
262 | 267 | try: |
263 | | - ctx.run(self._send_telemetry, rec) |
| 268 | + # Run sender directly; do not reuse caller context/thread-locals |
| 269 | + self._send_telemetry(rec) |
264 | 270 | except Exception: |
265 | 271 | logger.debug("Telemetry worker send failed", exc_info=True) |
266 | 272 | finally: |
|
0 commit comments