Commit 2a21e1d

remyduthu and claude committed
feat(flaky-detection): Add pytest-xdist support to flaky detection system
Add full flaky detection support under pytest-xdist using controller-orchestrated pre-computed deadlines and xdist's built-in IPC.

Architecture:
- Controller fetches flaky detection context from API once, distributes it to workers via workerinput
- Each worker constructs a FlakyDetector from the serialized context, computes the same global budget, runs tests with reruns
- Workers send metrics back via workeroutput
- Controller aggregates metrics and generates the terminal report

Changes:
- Add FlakyDetector.from_context() classmethod for worker-side construction
- Add to_serializable_metrics() for xdist IPC (plain builtins only)
- Add make_report_from_aggregated() for controller-side reporting
- Add static deadline mode (_is_xdist flag) preserving dynamic for non-xdist
- Add MergifyCIInsights.load_flaky_detector_from_context()
- Add controller hooks (pytest_configure_node, pytest_testnodedown)
- Add worker hooks (workerinput loading, workeroutput export)
- Disable flaky detection under 'each' scheduling mode
- Add pytest-xdist dev dependency and integration tests

Fixes: MRGFY-6296

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Change-Id: I2811d2999ee3b51fb4649e2f60c33c200635435b
1 parent ecdb273 commit 2a21e1d

File tree

9 files changed (+2084 / -9 lines)


docs/superpowers/plans/2026-03-19-xdist-flaky-detection.md

Lines changed: 1213 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
# Add pytest-xdist Support to Flaky Detection

**Linear:** MRGFY-6296
**Status:** Approved
**Date:** 2026-03-19

## Problem

The flaky detection system does not support `pytest-xdist`:

1. `flaky_detector._test_metrics` lives in in-process memory, but xdist spawns separate worker processes.
2. `pytest_collection_finish` does not run on the controller under xdist.

## Decision Summary

- **Approach:** Controller-orchestrated with pre-computed per-test deadlines.
- **IPC:** xdist built-in `workerinput`/`workeroutput`.
- **Budget model:** Global budget, static per-test allocation under xdist. Dynamic deadlines preserved for non-xdist.
- **Scheduling:** Target `load` (default) mode. Other modes should not crash. Under `each` mode (every test runs on every worker), flaky detection is disabled to avoid duplicated budgets.
## Architecture

```
Controller                           Workers (gw0, gw1, ...)
────────────────────────────────     ────────────────────────────────
fetch flaky context from API
  ├─── workerinput ──────────►       receive context as plain dict
  │                                  build FlakyDetector (no API call)
  │                                  collect tests (same list)
  │                                  compute budget (same result)
  │                                  run tests + reruns
  │    ◄── workeroutput ─────────────┤
aggregate metrics
print terminal summary
```

All workers collect the same full test list (xdist verifies this). Budget computation is deterministic, so each worker independently arrives at the same global budget and per-test allocation. No mid-run coordination.
## Controller Responsibilities

### 1. Fetch context and distribute (`pytest_configure_node`)

- Fetch `_FlakyDetectionContext` from API **once** (cache it).
- Serialize as plain dict into `node.workerinput["flaky_detection_context"]`.
- Also set `node.workerinput["flaky_detection_mode"]`.

### 2. Collect worker metrics (`pytest_testnodedown`)

- Read `node.workeroutput["flaky_detection_metrics"]`.
- Merge into controller-side aggregated metrics dict.
- Workers run distinct tests under `load` scheduling, so no overlap.

### 3. Terminal summary (`pytest_terminal_summary`)

- Build report from aggregated metrics using same format as today.
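The merge step can be sketched with plain dict operations (a sketch only: `merge_worker_metrics` is a hypothetical free function; the real code merges into plugin state inside `pytest_testnodedown`):

```python
def merge_worker_metrics(aggregated: dict, worker_metrics: dict) -> None:
    # Under `load` scheduling each test runs on exactly one worker, so a
    # plain dict.update() cannot clobber another worker's entries.
    aggregated["test_metrics"].update(worker_metrics.get("test_metrics", {}))
    aggregated["over_length_tests"].extend(worker_metrics.get("over_length_tests", []))
    aggregated["debug_logs"].extend(worker_metrics.get("debug_logs", []))

agg = {"test_metrics": {}, "over_length_tests": [], "debug_logs": []}
merge_worker_metrics(agg, {"test_metrics": {"t::a": {"rerun_count": 2}}})
merge_worker_metrics(agg, {"test_metrics": {"t::b": {"rerun_count": 0}}})
```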
## Worker Responsibilities

### 1. Initialization

- Read `config.workerinput["flaky_detection_context"]` if present.
- Construct `FlakyDetector` via new `from_context()` classmethod (skips API call).

### 2. Session preparation (`pytest_collection_finish`)

- Call `prepare_for_session(session)` as today.

### 3. Test execution (`pytest_runtest_protocol`)

- Identical to current logic: initial run, set deadline, rerun loop.
- `set_test_deadline` uses static allocation: `total_budget / global_num_tests_to_process` where the denominator is the **global** count of tests to process (computed from the full collection, not from the worker's assigned subset). Workers don't know upfront which tests they'll run (xdist dispatches dynamically), but the per-test budget is the same regardless.
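The static split is simple enough to sketch (the helper name is hypothetical; the real branch lives inside `set_test_deadline`):

```python
from datetime import timedelta

def compute_static_deadline(
    total_budget: timedelta,
    global_num_tests_to_process: int,
) -> timedelta:
    # Every worker divides the same global budget by the same global test
    # count, so the per-test allocation is identical on gw0, gw1, ...
    # with no mid-run coordination.
    return total_budget / global_num_tests_to_process

# e.g. a 120 s budget over 40 tests gives each test 3 s, on every worker.
per_test = compute_static_deadline(timedelta(seconds=120), 40)
```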
### 4. Metrics export (`pytest_sessionfinish`)

- Serialize `_test_metrics`, `_over_length_tests`, `_debug_logs` into `config.workeroutput["flaky_detection_metrics"]`.
## Data Flow

### workerinput (controller -> worker)

```python
node.workerinput["flaky_detection_context"] = {
    "budget_ratio_for_new_tests": float,
    "budget_ratio_for_unhealthy_tests": float,
    "existing_test_names": list[str],
    "existing_tests_mean_duration_ms": int,
    "unhealthy_test_names": list[str],
    "max_test_execution_count": int,
    "max_test_name_length": int,
    "min_budget_duration_ms": int,
    "min_test_execution_count": int,
}
node.workerinput["flaky_detection_mode"] = "new" | "unhealthy"
```

### workeroutput (worker -> controller)

```python
config.workeroutput["flaky_detection_metrics"] = {
    "test_metrics": {
        "tests/test_foo.py::test_bar": {
            "rerun_count": int,
            "total_duration_ms": float,
            "initial_setup_duration_ms": float,
            "initial_call_duration_ms": float,
            "initial_teardown_duration_ms": float,
            "prevented_timeout": bool,
        },
    },
    "over_length_tests": list[str],
    "debug_logs": list[dict],
}
```

The three initial duration sub-fields are needed because `make_report` uses `initial_duration` (their sum) and `is_test_too_slow` compares it against remaining time. Serializing them separately preserves full fidelity.
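The worker-side flattening can be sketched as a plain `dataclasses.asdict` pass (a sketch only: `_TestMetrics` is a simplified stand-in for the real per-test record, and the empty lists stand in for state taken from the detector):

```python
import dataclasses
import typing

@dataclasses.dataclass
class _TestMetrics:
    # Simplified stand-in for the real per-test record.
    rerun_count: int
    total_duration_ms: float
    initial_setup_duration_ms: float
    initial_call_duration_ms: float
    initial_teardown_duration_ms: float
    prevented_timeout: bool

def to_serializable_metrics(
    test_metrics: typing.Dict[str, _TestMetrics],
) -> typing.Dict[str, typing.Any]:
    # Only plain builtins may cross the IPC boundary: workeroutput travels
    # over xdist's execnet channel, which serializes basic types only.
    return {
        "test_metrics": {
            name: dataclasses.asdict(m) for name, m in test_metrics.items()
        },
        "over_length_tests": [],  # taken from detector state in the real code
        "debug_logs": [],
    }

payload = to_serializable_metrics(
    {"tests/test_foo.py::test_bar": _TestMetrics(1, 42.0, 1.0, 40.0, 1.0, False)}
)
```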
## FlakyDetector Changes

### New classmethod

`FlakyDetector.from_context(context_dict, mode)` is a `@classmethod` that constructs a `FlakyDetector` from a serialized context dict, skipping `_fetch_context()`. It sets `token`, `url`, and `full_repository_name` to empty strings (the dataclass fields remain required, but these values are unused on workers). The `_context` field is populated directly from the dict.

On the controller side, `FlakyDetector` is **not** instantiated. The controller only holds the raw context dict (for `workerinput`) and aggregated metrics (from `workeroutput`). The report is generated via `make_report_from_aggregated`, which is a standalone function that operates on plain dicts.
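A minimal sketch of that construction path (the context here is trimmed to two fields for brevity; the real one carries all the keys listed in the Data Flow section):

```python
import dataclasses
import typing

@dataclasses.dataclass
class _FlakyDetectionContext:
    # Simplified stand-in for the real context dataclass.
    existing_test_names: typing.List[str]
    min_budget_duration_ms: int

@dataclasses.dataclass
class FlakyDetector:
    token: str
    url: str
    full_repository_name: str
    mode: str
    _context: typing.Optional[_FlakyDetectionContext] = None

    @classmethod
    def from_context(
        cls, context_dict: typing.Dict[str, typing.Any], mode: str
    ) -> "FlakyDetector":
        # Worker-side construction: no API call. Credentials are unused on
        # workers, so the required fields are filled with empty strings.
        detector = cls(token="", url="", full_repository_name="", mode=mode)
        detector._context = _FlakyDetectionContext(**context_dict)
        return detector

d = FlakyDetector.from_context(
    {"existing_test_names": ["test_a"], "min_budget_duration_ms": 1000},
    mode="new",
)
```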
### Deadline computation

- **Non-xdist (unchanged):** Dynamic `remaining_budget / remaining_tests`.
- **xdist:** Static `total_budget / num_tests_to_process`.

Branch via a single `if` in `set_test_deadline`.

### Report from aggregated data

`make_report_from_aggregated(context, mode, metrics, over_length_tests, debug_logs)` runs on the controller from deserialized worker data.
## Error Handling

- **Worker crash:** `workeroutput` may be missing. Controller skips that worker's data and shows partial report.
- **Context fetch fails:** No context sent to workers, workers skip flaky detection. Same as today.
- **No context in workerinput:** Worker skips flaky detection gracefully.
## Testing Strategy

### Unit tests

- `from_context()` construction from plain dict.
- Static deadline computation.
- `make_report_from_aggregated()` output from deserialized metrics.

### Integration tests

- `pytester` with `-n 2`: end-to-end flaky detection under xdist.
- Metrics aggregation across workers (check terminal summary).
- Budget respected across workers.
### Edge cases

- Single worker (`-n 1`).
- Worker crash: partial report, no controller crash.
- No tests to process.
- xdist not installed: no import errors.

### Regression

All existing non-xdist tests must keep passing unchanged.

pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -27,6 +27,7 @@ dev = [
     # https://github.com/pytest-dev/pytest-asyncio/issues/706#issuecomment-2082788963
     "pytest-asyncio>=0.24.0",
     "freezegun>=1.5.5",
+    "pytest-xdist>=3.0",
 ]

 [build-system]
```

pytest_mergify/__init__.py

Lines changed: 119 additions & 1 deletion
```diff
@@ -1,3 +1,4 @@
+import dataclasses
 import datetime
 import os
 import platform
@@ -33,6 +34,70 @@ def pytest_configure(self, config: _pytest.config.Config) -> None:
             kwargs["api_url"] = api_url
         self.mergify_ci = MergifyCIInsights(**kwargs)

+        # xdist controller state.
+        self._xdist_flaky_context: typing.Optional[typing.Dict[str, typing.Any]] = None
+        self._xdist_flaky_mode: typing.Optional[str] = None
+        self._xdist_aggregated_metrics: typing.Dict[str, typing.Any] = {
+            "test_metrics": {},
+            "over_length_tests": [],
+            "debug_logs": [],
+        }
+        self._xdist_available_budget_duration_ms: float = 0.0
+
+        # On xdist controller, reuse the already-loaded detector's context
+        # for distribution to workers. No extra API call needed since
+        # MergifyCIInsights.__post_init__ already calls _load_flaky_detector().
+        if _is_xdist_controller(config) and self.mergify_ci.flaky_detector:
+            self._xdist_flaky_context = dataclasses.asdict(
+                self.mergify_ci.flaky_detector._context
+            )
+            self._xdist_flaky_mode = self.mergify_ci.flaky_detector.mode
+
+        # xdist worker: load flaky detector from controller-provided context.
+        if _is_xdist_worker(config):
+            workerinput = getattr(config, "workerinput", {})
+            context = workerinput.get("flaky_detection_context")
+            mode = workerinput.get("flaky_detection_mode")
+            if context is not None and mode is not None:
+                self.mergify_ci.load_flaky_detector_from_context(context, mode)
+
+    def pytest_configure_node(self, node: typing.Any) -> None:
+        """xdist hook: distribute flaky detection context to workers."""
+        # Disable under 'each' mode to avoid duplicated budgets.
+        dist_mode = getattr(node.config.option, "dist", None)
+        if dist_mode == "each":
+            return
+
+        if self._xdist_flaky_context is not None:
+            node.workerinput["flaky_detection_context"] = self._xdist_flaky_context
+            node.workerinput["flaky_detection_mode"] = self._xdist_flaky_mode
+
+    def pytest_testnodedown(self, node: typing.Any, error: typing.Any) -> None:
+        """xdist hook: collect metrics from completed workers."""
+        workeroutput = getattr(node, "workeroutput", None)
+        if workeroutput is None:
+            return
+
+        worker_metrics = workeroutput.get("flaky_detection_metrics")
+        if worker_metrics is None:
+            return
+
+        # Merge test metrics (workers run distinct tests, no overlap).
+        self._xdist_aggregated_metrics["test_metrics"].update(
+            worker_metrics.get("test_metrics", {})
+        )
+        self._xdist_aggregated_metrics["over_length_tests"].extend(
+            worker_metrics.get("over_length_tests", [])
+        )
+        self._xdist_aggregated_metrics["debug_logs"].extend(
+            worker_metrics.get("debug_logs", [])
+        )
+
+        if "available_budget_duration_ms" in worker_metrics:
+            self._xdist_available_budget_duration_ms = worker_metrics[
+                "available_budget_duration_ms"
+            ]
+
     def pytest_terminal_summary(
         self, terminalreporter: _pytest.terminal.TerminalReporter
     ) -> None:
@@ -56,7 +121,37 @@ def pytest_terminal_summary(
             )
             return

-        if self.mergify_ci.flaky_detector:
+        if _is_xdist_controller(terminalreporter.config):
+            if self._xdist_flaky_context:
+                # Always show report (even if no test_metrics — shows "No new tests detected").
+                from pytest_mergify import flaky_detection
+
+                mode: typing.Literal["new", "unhealthy"] = (
+                    self._xdist_flaky_mode  # type: ignore[assignment]
+                    if self._xdist_flaky_mode in ("new", "unhealthy")
+                    else "new"
+                )
+                terminalreporter.write_line(
+                    flaky_detection.make_report_from_aggregated(
+                        context_dict=self._xdist_flaky_context,
+                        mode=mode,
+                        available_budget_duration_ms=self._xdist_available_budget_duration_ms,
+                        aggregated_metrics=self._xdist_aggregated_metrics,
+                    )
+                )
+            elif self.mergify_ci.flaky_detector_error_message:
+                terminalreporter.write_line(
+                    f"""⚠️ Flaky detection couldn't be enabled because of an error.
+
+Common issues:
+• Your 'MERGIFY_TOKEN' might not be set or could be invalid
+• There might be a network connectivity issue with the Mergify API
+
+📚 Documentation: https://docs.mergify.com/ci-insights/test-frameworks/pytest/
+🔍 Details: {self.mergify_ci.flaky_detector_error_message}""",
+                    yellow=True,
+                )
+        elif self.mergify_ci.flaky_detector:
             terminalreporter.write_line(self.mergify_ci.flaky_detector.make_report())
         elif self.mergify_ci.flaky_detector_error_message:
             terminalreporter.write_line(
@@ -147,6 +242,17 @@ def pytest_sessionfinish(
         self,
         session: _pytest.main.Session,
     ) -> typing.Generator[None, None, None]:
+        # xdist worker: export metrics via workeroutput (independent of tracer).
+        if _is_xdist_worker(session.config) and self.mergify_ci.flaky_detector:
+            workeroutput = getattr(session.config, "workeroutput", None)
+            if workeroutput is not None:
+                metrics = self.mergify_ci.flaky_detector.to_serializable_metrics()
+                metrics["available_budget_duration_ms"] = (
+                    self.mergify_ci.flaky_detector._available_budget_duration.total_seconds()
+                    * 1000
+                )
+                workeroutput["flaky_detection_metrics"] = metrics
+
         if not self.tracer:
             yield
             return
@@ -462,3 +568,15 @@ def _should_skip_item(item: _pytest.nodes.Item) -> bool:

     # nosemgrep: python.lang.security.audit.eval-detected.eval-detected
     return bool(eval(condition_code, globals_))
+
+
+def _is_xdist_controller(config: _pytest.config.Config) -> bool:
+    """Check if running as xdist controller (not a worker)."""
+    return config.pluginmanager.has_plugin("dsession") and not hasattr(
+        config, "workerinput"
+    )
+
+
+def _is_xdist_worker(config: _pytest.config.Config) -> bool:
+    """Check if running as xdist worker."""
+    return hasattr(config, "workerinput")
```

pytest_mergify/ci_insights.py

Lines changed: 16 additions & 0 deletions
```diff
@@ -195,6 +195,22 @@ def _load_flaky_detector(self) -> None:
                 f"Could not load flaky detector: {str(exception)}"
             )

+    def load_flaky_detector_from_context(
+        self,
+        context_dict: typing.Dict[str, typing.Any],
+        mode: typing.Literal["new", "unhealthy"],
+    ) -> None:
+        """Construct FlakyDetector from pre-fetched context (xdist worker path)."""
+        try:
+            self.flaky_detector = flaky_detection.FlakyDetector.from_context(
+                context_dict=context_dict,
+                mode=mode,
+            )
+        except Exception as exception:
+            self.flaky_detector_error_message = (
+                f"Could not load flaky detector: {str(exception)}"
+            )
+
     def mark_test_as_quarantined_if_needed(self, item: _pytest.nodes.Item) -> bool:
         """
         Returns `True` if the test was marked as quarantined, otherwise returns `False`.
```
