Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dev = [
# https://github.com/pytest-dev/pytest-asyncio/issues/706#issuecomment-2082788963
"pytest-asyncio>=0.24.0",
"freezegun>=1.5.5",
"pytest-xdist>=3.0",
]

[build-system]
Expand Down
120 changes: 119 additions & 1 deletion pytest_mergify/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import datetime
import os
import platform
Expand Down Expand Up @@ -33,6 +34,70 @@ def pytest_configure(self, config: _pytest.config.Config) -> None:
kwargs["api_url"] = api_url
self.mergify_ci = MergifyCIInsights(**kwargs)

# xdist controller state.
self._xdist_flaky_context: typing.Optional[typing.Dict[str, typing.Any]] = None
self._xdist_flaky_mode: typing.Optional[str] = None
self._xdist_aggregated_metrics: typing.Dict[str, typing.Any] = {
"test_metrics": {},
"over_length_tests": [],
"debug_logs": [],
}
self._xdist_available_budget_duration_ms: float = 0.0

# On xdist controller, reuse the already-loaded detector's context
# for distribution to workers. No extra API call needed since
# MergifyCIInsights.__post_init__ already calls _load_flaky_detector().
if _is_xdist_controller(config) and self.mergify_ci.flaky_detector:
self._xdist_flaky_context = dataclasses.asdict(
self.mergify_ci.flaky_detector._context
)
self._xdist_flaky_mode = self.mergify_ci.flaky_detector.mode

# xdist worker: load flaky detector from controller-provided context.
if _is_xdist_worker(config):
workerinput = getattr(config, "workerinput", {})
context = workerinput.get("flaky_detection_context")
mode = workerinput.get("flaky_detection_mode")
if context is not None and mode is not None:
self.mergify_ci.load_flaky_detector_from_context(context, mode)

def pytest_configure_node(self, node: typing.Any) -> None:
    """xdist hook: push the flaky detection context down to a worker node.

    Skipped under ``--dist each`` (every worker runs the full suite there,
    so sharing one rerun budget would double-count it) and when no context
    was loaded on the controller.
    """
    if getattr(node.config.option, "dist", None) == "each":
        # 'each' mode duplicates the whole test suite on every worker;
        # a shared budget would be spent several times over.
        return

    context = self._xdist_flaky_context
    if context is None:
        return

    node.workerinput["flaky_detection_context"] = context
    node.workerinput["flaky_detection_mode"] = self._xdist_flaky_mode

def pytest_testnodedown(self, node: typing.Any, error: typing.Any) -> None:
    """xdist hook: fold a finished worker's metrics into controller state.

    Workers execute disjoint subsets of the tests, so merging their
    per-test metric dicts cannot clobber one another's entries.
    """
    worker_metrics = (getattr(node, "workeroutput", None) or {}).get(
        "flaky_detection_metrics"
    )
    if worker_metrics is None:
        return

    aggregated = self._xdist_aggregated_metrics
    aggregated["test_metrics"].update(worker_metrics.get("test_metrics", {}))
    for list_key in ("over_length_tests", "debug_logs"):
        aggregated[list_key].extend(worker_metrics.get(list_key, []))

    # NOTE(review): last reporter wins here — assumes every worker reports
    # the same overall budget; confirm against the worker export path.
    try:
        self._xdist_available_budget_duration_ms = worker_metrics[
            "available_budget_duration_ms"
        ]
    except KeyError:
        pass

def pytest_terminal_summary(
self, terminalreporter: _pytest.terminal.TerminalReporter
) -> None:
Expand All @@ -56,7 +121,37 @@ def pytest_terminal_summary(
)
return

if self.mergify_ci.flaky_detector:
if _is_xdist_controller(terminalreporter.config):
if self._xdist_flaky_context:
# Always show report (even if no test_metrics — shows "No new tests detected").
from pytest_mergify import flaky_detection

mode: typing.Literal["new", "unhealthy"] = (
self._xdist_flaky_mode # type: ignore[assignment]
if self._xdist_flaky_mode in ("new", "unhealthy")
else "new"
)
terminalreporter.write_line(
flaky_detection.make_report_from_aggregated(
context_dict=self._xdist_flaky_context,
mode=mode,
available_budget_duration_ms=self._xdist_available_budget_duration_ms,
aggregated_metrics=self._xdist_aggregated_metrics,
)
)
elif self.mergify_ci.flaky_detector_error_message:
terminalreporter.write_line(
f"""⚠️ Flaky detection couldn't be enabled because of an error.

Common issues:
• Your 'MERGIFY_TOKEN' might not be set or could be invalid
• There might be a network connectivity issue with the Mergify API

📚 Documentation: https://docs.mergify.com/ci-insights/test-frameworks/pytest/
🔍 Details: {self.mergify_ci.flaky_detector_error_message}""",
yellow=True,
)
elif self.mergify_ci.flaky_detector:
terminalreporter.write_line(self.mergify_ci.flaky_detector.make_report())
elif self.mergify_ci.flaky_detector_error_message:
terminalreporter.write_line(
Expand Down Expand Up @@ -147,6 +242,17 @@ def pytest_sessionfinish(
self,
session: _pytest.main.Session,
) -> typing.Generator[None, None, None]:
# xdist worker: export metrics via workeroutput (independent of tracer).
if _is_xdist_worker(session.config) and self.mergify_ci.flaky_detector:
workeroutput = getattr(session.config, "workeroutput", None)
if workeroutput is not None:
metrics = self.mergify_ci.flaky_detector.to_serializable_metrics()
metrics["available_budget_duration_ms"] = (
self.mergify_ci.flaky_detector._available_budget_duration.total_seconds()
* 1000
)
workeroutput["flaky_detection_metrics"] = metrics

if not self.tracer:
yield
return
Expand Down Expand Up @@ -462,3 +568,15 @@ def _should_skip_item(item: _pytest.nodes.Item) -> bool:

# nosemgrep: python.lang.security.audit.eval-detected.eval-detected
return bool(eval(condition_code, globals_))


def _is_xdist_controller(config: _pytest.config.Config) -> bool:
    """Return True when this process is the xdist controller.

    The controller has the 'dsession' plugin registered but, unlike a
    worker, carries no ``workerinput`` attribute on its config.
    """
    if hasattr(config, "workerinput"):
        return False
    return config.pluginmanager.has_plugin("dsession")


def _is_xdist_worker(config: _pytest.config.Config) -> bool:
    """Return True when this process is an xdist worker.

    pytest-xdist sets the ``workerinput`` attribute on every worker's
    config object; the controller's config never has it.
    """
    return hasattr(config, "workerinput")
16 changes: 16 additions & 0 deletions pytest_mergify/ci_insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ def _load_flaky_detector(self) -> None:
f"Could not load flaky detector: {str(exception)}"
)

def load_flaky_detector_from_context(
    self,
    context_dict: typing.Dict[str, typing.Any],
    mode: typing.Literal["new", "unhealthy"],
) -> None:
    """Build the flaky detector from an already-fetched context.

    Used on xdist workers: the controller fetches the detection context
    once and ships it over, so workers never call the API themselves.
    Construction failures are recorded in ``flaky_detector_error_message``
    rather than raised, mirroring ``_load_flaky_detector``.
    """
    try:
        detector = flaky_detection.FlakyDetector.from_context(
            context_dict=context_dict,
            mode=mode,
        )
    except Exception as exception:
        self.flaky_detector_error_message = (
            f"Could not load flaky detector: {str(exception)}"
        )
    else:
        self.flaky_detector = detector

def mark_test_as_quarantined_if_needed(self, item: _pytest.nodes.Item) -> bool:
"""
Returns `True` if the test was marked as quarantined, otherwise returns `False`.
Expand Down
165 changes: 157 additions & 8 deletions pytest_mergify/flaky_detection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import datetime
import json
import os
import typing

Expand Down Expand Up @@ -159,9 +160,33 @@ class FlakyDetector:
init=False, default_factory=list
)

_is_xdist: bool = dataclasses.field(init=False, default=False)

def __post_init__(self) -> None:
    """Resolve the flaky detection context right after dataclass init.

    Runs on the normal constructor path only; ``from_context`` bypasses
    this hook via ``cls.__new__`` so xdist workers skip the fetch.
    """
    self._context = self._fetch_context()

@classmethod
def from_context(
    cls,
    context_dict: typing.Dict[str, typing.Any],
    mode: typing.Literal["new", "unhealthy"],
) -> "FlakyDetector":
    """Build a detector from a context dict shipped by the xdist controller.

    Deliberately bypasses ``__init__``/``__post_init__`` so no API call is
    made on the worker; credential fields are left blank because they are
    never needed on this path.
    """
    initial_state: typing.Dict[str, typing.Any] = {
        "token": "",
        "url": "",
        "full_repository_name": "",
        "mode": mode,
        "_context": _FlakyDetectionContext(**context_dict),
        "_test_metrics": {},
        "_over_length_tests": set(),
        "_available_budget_duration": datetime.timedelta(),
        "_tests_to_process": [],
        "_suspended_item_finalizers": {},
        "_debug_logs": [],
        "_is_xdist": True,
    }
    detector = cls.__new__(cls)
    for attribute, value in initial_state.items():
        setattr(detector, attribute, value)
    return detector

def _fetch_context(self) -> _FlakyDetectionContext:
owner, repository_name = utils.split_full_repo_name(
self.full_repository_name,
Expand Down Expand Up @@ -370,21 +395,36 @@ def set_test_deadline(
) -> None:
metrics = self._test_metrics[test]

remaining_budget = self._get_remaining_budget_duration()
remaining_tests = self._count_remaining_tests()
if self._is_xdist:
# Static allocation: equal share of total budget per test.
per_test_budget = self._available_budget_duration / max(
len(self._tests_to_process), 1
)
metrics.deadline = (
datetime.datetime.now(datetime.timezone.utc) + per_test_budget
)
else:
remaining_budget = self._get_remaining_budget_duration()
remaining_tests = self._count_remaining_tests()

# Distribute remaining budget equally across remaining tests.
metrics.deadline = datetime.datetime.now(datetime.timezone.utc) + (
remaining_budget / remaining_tests
)

# Distribute remaining budget equally across remaining tests.
metrics.deadline = datetime.datetime.now(datetime.timezone.utc) + (
remaining_budget / remaining_tests
)
self._debug_logs.append(
utils.StructuredLog.make(
message="Deadline set",
test=test,
available_budget=str(self._available_budget_duration),
remaining_budget=str(remaining_budget),
remaining_budget=str(self._get_remaining_budget_duration())
if not self._is_xdist
else "N/A (xdist)",
is_xdist=self._is_xdist,
all_tests=len(self._tests_to_process),
remaining_tests=remaining_tests,
remaining_tests=self._count_remaining_tests()
if not self._is_xdist
else "N/A (xdist)",
)
)

Expand Down Expand Up @@ -438,6 +478,34 @@ def restore_item_finalizers(self, item: _pytest.nodes.Item) -> None:
item.session._setupstate.stack.update(self._suspended_item_finalizers)
self._suspended_item_finalizers.clear()

def to_serializable_metrics(self) -> typing.Dict[str, typing.Any]:
    """Flatten the collected metrics into plain, JSON-friendly types.

    The result travels through pytest-xdist's ``workeroutput`` channel,
    so every value must be serializable: timedeltas become float
    milliseconds and structured logs become flat dicts.
    """

    def _as_ms(duration: datetime.timedelta) -> float:
        # Milliseconds keep the transport format integer-friendly.
        return duration.total_seconds() * 1000

    serialized_tests: typing.Dict[str, typing.Any] = {}
    for test_name, test_metrics in self._test_metrics.items():
        serialized_tests[test_name] = {
            "rerun_count": test_metrics.rerun_count,
            "total_duration_ms": _as_ms(test_metrics.total_duration),
            "initial_setup_duration_ms": _as_ms(test_metrics.initial_setup_duration),
            "initial_call_duration_ms": _as_ms(test_metrics.initial_call_duration),
            "initial_teardown_duration_ms": _as_ms(
                test_metrics.initial_teardown_duration
            ),
            "prevented_timeout": test_metrics.prevented_timeout,
        }

    serialized_logs = [
        {
            "timestamp": entry.timestamp.isoformat(),
            "message": entry.message,
            **entry.attributes,
        }
        for entry in self._debug_logs
    ]

    return {
        "test_metrics": serialized_tests,
        "over_length_tests": list(self._over_length_tests),
        "debug_logs": serialized_logs,
    }

def _count_remaining_tests(self) -> int:
already_processed_tests = {
test for test, metrics in self._test_metrics.items() if metrics.deadline
Expand All @@ -456,3 +524,84 @@ def _get_remaining_budget_duration(self) -> datetime.timedelta:
self._available_budget_duration - self._get_used_budget_duration(),
datetime.timedelta(),
)


def make_report_from_aggregated(
    context_dict: typing.Dict[str, typing.Any],
    mode: typing.Literal["new", "unhealthy"],
    available_budget_duration_ms: float,
    aggregated_metrics: typing.Dict[str, typing.Any],
) -> str:
    """Generate report on the controller from aggregated worker metrics.

    :param context_dict: serialized ``_FlakyDetectionContext`` as shipped
        to the workers.
    :param mode: which category of tests the detector targeted.
    :param available_budget_duration_ms: total rerun budget reported by the
        workers, in milliseconds (may be 0 if no worker reported one).
    :param aggregated_metrics: merged worker output with ``test_metrics``,
        ``over_length_tests`` and ``debug_logs`` keys.
    :return: a human-readable multi-line report string.
    """
    context = _FlakyDetectionContext(**context_dict)
    test_metrics = aggregated_metrics["test_metrics"]
    over_length_tests = aggregated_metrics["over_length_tests"]
    debug_logs = aggregated_metrics["debug_logs"]

    result = "🐛 Flaky detection"

    if over_length_tests:
        result += (
            f"{os.linesep}- Skipped {len(over_length_tests)} "
            f"test{'s' if len(over_length_tests) > 1 else ''}:"
        )
        for test in over_length_tests:
            result += (
                f"{os.linesep} • '{test}' has not been tested multiple times because the name of the test "
                f"exceeds our limit of {context.max_test_name_length} characters"
            )

    if not test_metrics:
        result += f"{os.linesep}- No {mode} tests detected, but we are watching 👀"
        return result

    available_budget_seconds = available_budget_duration_ms / 1000
    used_budget_ms = sum(m["total_duration_ms"] for m in test_metrics.values())
    used_budget_seconds = used_budget_ms / 1000

    def _percent_of_budget(seconds: float) -> float:
        # FIX: guard against a zero budget (the controller's default when
        # no worker reported one) so the report can't raise
        # ZeroDivisionError.
        if available_budget_seconds <= 0:
            return 0.0
        return seconds / available_budget_seconds * 100

    result += (
        f"{os.linesep}- Used {_percent_of_budget(used_budget_seconds):.2f} % of the budget "
        f"({used_budget_seconds:.2f} s/{available_budget_seconds:.2f} s)"
    )

    result += (
        f"{os.linesep}- Active for {len(test_metrics)} {mode} "
        f"test{'s' if len(test_metrics) > 1 else ''}:"
    )
    for test, m in test_metrics.items():
        if m["rerun_count"] < context.min_test_execution_count:
            result += (
                f"{os.linesep} • '{test}' is too slow to be tested at least "
                f"{context.min_test_execution_count} times within the budget"
            )
            continue

        rerun_duration_seconds = m["total_duration_ms"] / 1000
        result += (
            f"{os.linesep} • '{test}' has been tested {m['rerun_count']} "
            f"time{'s' if m['rerun_count'] > 1 else ''} using approx. "
            f"{_percent_of_budget(rerun_duration_seconds):.2f} % of the budget "
            f"({rerun_duration_seconds:.2f} s/{available_budget_seconds:.2f} s)"
        )

    tests_prevented_from_timeout = [
        test for test, m in test_metrics.items() if m["prevented_timeout"]
    ]
    if tests_prevented_from_timeout:
        result += (
            f"{os.linesep}⚠️ Reduced reruns for the following "
            # FIX: was `'s' if len(tests_prevented_from_timeout)` — the list
            # is guarded non-empty here, so a single test always printed as
            # "tests". Every sibling pluralization in this module uses `> 1`.
            f"test{'s' if len(tests_prevented_from_timeout) > 1 else ''} to respect 'pytest-timeout':"
        )
        for test in tests_prevented_from_timeout:
            result += f"{os.linesep} • '{test}'"

        # NOTE(review): the paste flattened indentation; this advice is
        # emitted with the timeout warning since it is only actionable when
        # reruns were actually limited by 'pytest-timeout'.
        result += (
            f"{os.linesep}To improve flaky detection and prevent fixture-level timeouts from limiting reruns, enable function-only timeouts. "
            f"Reference: https://github.com/pytest-dev/pytest-timeout?tab=readme-ov-file#avoiding-timeouts-in-fixtures"
        )

    if os.environ.get("PYTEST_MERGIFY_DEBUG") and debug_logs:
        result += f"{os.linesep}🔎 Debug Logs"
        for log in debug_logs:
            result += f"{os.linesep}{json.dumps(log)}"

    return result
Loading
Loading