Skip to content

Commit 8a3e81d

Browse files
authored
[autorevert] refactoring: extract Signal, decouple pattern detection (#7042)
Sketching up the new simplified scalable architecture for autorevert. Introduces a modular Signal abstraction that cleanly separates signal extraction (from jobs/tests/classifications) from pattern detection. ### Motivation Decouple “how we extract signal” from “how we detect patterns” so we can: - Incorporate richer sources (tests, multiple failures, classification) without rewriting pattern logic. - Handle missing/partial signal more gracefully. - Iterate on detection rules independently. - Self-contained code much easier to cover in tests. What’s Implemented - New core types in pytorch_auto_revert/signal.py & most of the pattern detection logic (the **new one** that we discussed) - Tests in tests/test_signal.py (covering pending-only and empty commits). What’s Not Yet Wired / Behavior Changes - Existing production detection flow remains unchanged (migration is not started). Next Steps (Migration Plan) - Finish pattern detection and issue `RestartCommits` when needed - Build Signal extractors: - From workflow_job for job-level signal (including classification). - From test_run_s3/test_run_summary for per-test failures (optional, staged). - Normalize names (stable keys) - Switch to using Signals & Extractors
1 parent 0192d56 commit 8a3e81d

File tree

2 files changed

+585
-0
lines changed

2 files changed

+585
-0
lines changed
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
from enum import Enum
4+
from typing import Callable, List, Optional, Set, Tuple, Union
5+
6+
7+
class SignalStatus(Enum):
    """Resolution state of a single signal event."""

    # Event has not resolved yet.
    PENDING = "pending"
    # Event resolved green.
    SUCCESS = "success"
    # Event resolved red.
    FAILURE = "failure"
13+
14+
15+
# data classes:
16+
17+
18+
@dataclass
class AutorevertPattern:
    """An autorevert pattern detected in a signal.

    All commit fields follow the signal's newest → oldest ordering:
    ``newer_failing_commits`` are the failing commits after the suspect,
    ``suspected_commit`` is the oldest commit that first started to fail,
    and ``older_successful_commit`` is its most recent green predecessor.
    """

    # Source workflow the signal was derived from.
    workflow_name: str
    # Commits newer than the suspect that also fail (newest → older).
    newer_failing_commits: List[str]
    # Oldest commit where the failures began.
    suspected_commit: str
    # Most recent successful commit before failures started
    # (direct parent of the suspected commit for this signal).
    older_successful_commit: str
34+
35+
36+
@dataclass
class RestartCommits:
    """An intent to restart a specific set of commits on the signal."""

    # Head SHAs of the commits whose workflows should be re-run.
    commit_shas: Set[str]
43+
44+
45+
class SignalEvent:
    """A single observation contributing to a Signal on a given commit.

    One job/test/classification-derived event, carrying a status plus
    start/end timestamps so pattern detection can reason about ordering.
    """

    def __init__(
        self,
        name: str,
        status: SignalStatus,
        started_at: datetime,
        ended_at: Optional[datetime] = None,
    ):
        # Human-readable identifier of the originating job/test.
        self.name = name
        self.status = status
        self.started_at = started_at
        # None while the event is still running.
        self.ended_at = ended_at

    @property
    def is_pending(self) -> bool:
        """True when the event has not resolved yet."""
        return self.status is SignalStatus.PENDING

    @property
    def is_success(self) -> bool:
        """True when the event resolved green."""
        return self.status is SignalStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        """True when the event resolved red."""
        return self.status is SignalStatus.FAILURE
75+
76+
77+
class SignalCommit:
    """All events for a single commit, ordered oldest → newest by start time."""

    def __init__(self, head_sha: str, events: List[SignalEvent]):
        self.head_sha = head_sha
        # Normalize to chronological order so downstream logic can rely on it.
        self.events = sorted(events, key=lambda ev: ev.started_at) if events else []
        # Tally of events per status, backing the has_* accessors below.
        self.statuses = {}
        for ev in self.events:
            self.statuses[ev.status] = self.statuses.get(ev.status, 0) + 1

    @property
    def has_pending(self) -> bool:
        """True when at least one event is still pending."""
        return self.statuses.get(SignalStatus.PENDING, 0) > 0

    @property
    def has_success(self) -> bool:
        """True when at least one event succeeded."""
        return self.statuses.get(SignalStatus.SUCCESS, 0) > 0

    @property
    def has_failure(self) -> bool:
        """True when at least one event failed."""
        return self.statuses.get(SignalStatus.FAILURE, 0) > 0

    def events_by_status(self, status: SignalStatus) -> List[SignalEvent]:
        """Get all events with the specified status."""
        return [ev for ev in self.events if ev.status == status]

    def __iter__(self):
        # Iterating a commit iterates its (chronologically ordered) events.
        return iter(self.events)
108+
109+
110+
def _bounds(
    commits: List["SignalCommit"], keep_predicate: Callable[[SignalEvent], bool]
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """
    Return (earliest, latest) ``started_at`` over all events in ``commits``
    that satisfy ``keep_predicate``; (None, None) when nothing matches.
    """
    matching = [
        event.started_at
        for commit in commits
        for event in commit.events
        if keep_predicate(event)
    ]
    if not matching:
        return None, None
    return min(matching), max(matching)
127+
128+
129+
@dataclass
class PartitionedCommits:
    """
    Result of partitioning a signal's commits around an autorevert pattern.

    Each bucket preserves the signal's newest → oldest ordering.

    NOTE(fix): the original combined ``@dataclass`` with a hand-written
    ``__init__`` and no field annotations; that made dataclasses generate a
    field-less ``__eq__`` (every instance compared equal) and set
    ``__hash__ = None``. Declaring the fields lets dataclasses generate the
    same ``(failed, unknown, successful)`` constructor plus a meaningful
    ``__eq__``/``__repr__``.
    """

    # Commits at the head of history carrying the failures under suspicion.
    failed: List["SignalCommit"]
    # Pending / missing-signal commits between the failed and successful runs.
    unknown: List["SignalCommit"]
    # Older commits that were still green before the breakage.
    successful: List["SignalCommit"]

    def failure_events_count(self) -> int:
        """Total FAILURE events across the failed partition."""
        return sum(c.statuses.get(SignalStatus.FAILURE, 0) for c in self.failed)

    def success_events_count(self) -> int:
        """Total SUCCESS events across the successful partition."""
        return sum(c.statuses.get(SignalStatus.SUCCESS, 0) for c in self.successful)

    def confirm_not_an_infra_issue(self) -> "InfraCheckResult":
        """
        Infra check based on this partition that classifies whether observed
        failures are likely infra or code-caused.

        Invariants established (priority: CONFIRMED > PENDING):
        - CONFIRMED: at least one failure (newer side) timestamp lies strictly
          between two actual success timestamps (older side).
        - PENDING: success-like and failure-like time ranges overlap, but no
          confirmed sandwich yet; pending events could complete the sandwich.
        - RESTART_SUCCESS: no success-like event occurs after any failure-like
          event (ranges do not overlap in that direction).
        - RESTART_FAILURE: no failure-like event occurs after any success-like
          event (ranges do not overlap in that direction).

        Notes:
        - success-like = SUCCESS or PENDING; failure-like = FAILURE or PENDING.
        - Only "failed" and "successful" partitions are considered; "unknown" is ignored.
        - Flakiness is assumed to be ruled out upstream.
        """
        # success-like includes pending; actual-success excludes pending
        min_succ_like, max_succ_like = _bounds(
            self.successful, lambda e: e.is_success or e.is_pending
        )
        min_succ, max_succ = _bounds(self.successful, lambda e: e.is_success)
        # failure-like includes pending
        min_fail_like, max_fail_like = _bounds(
            self.failed, lambda e: e.is_failure or e.is_pending
        )

        # Strict ordering without overlap → restart.
        # NOTE(fix): guard the datetime comparison against a None
        # min_fail_like; the original evaluated
        # `max_succ_like <= min_fail_like` unconditionally, which raises
        # TypeError when the failed partition has no failure-like events.
        if min_succ_like is None or (
            min_fail_like is not None and max_succ_like <= min_fail_like
        ):
            return InfraCheckResult.RESTART_SUCCESS
        if min_fail_like is None or max_fail_like <= min_succ_like:
            return InfraCheckResult.RESTART_FAILURE

        # Confirmed: any actual failure falls strictly between two actual successes
        if min_succ is not None and max_succ is not None and min_succ < max_succ:
            for c in self.failed:
                for e in c.events:
                    if e.is_failure and (min_succ < e.started_at < max_succ):
                        return InfraCheckResult.CONFIRMED

        # Overlap exists, but not confirmed yet → pending
        return InfraCheckResult.PENDING
197+
198+
199+
class InfraCheckResult(Enum):
    """Outcome of the infra check run over partitioned commits."""

    # Failure bracketed by two successes — not an infra issue.
    CONFIRMED = "confirmed"
    # Pending events could still form the sandwich.
    PENDING = "pending"
    # No success after any failure.
    RESTART_SUCCESS = "restart_success"
    # No failure after any success.
    RESTART_FAILURE = "restart_failure"
206+
207+
208+
class Signal:
    """A refined, column-like view of raw CI data for pattern detection.

    - key: stable identifier for the signal (e.g., normalized job/test name)
    - workflow_name: source workflow this signal is derived from
    - commits: newest → older list of SignalCommit objects for this signal
    """

    def __init__(self, key: str, workflow_name: str, commits: List[SignalCommit]):
        self.key = key
        self.workflow_name = workflow_name
        # commits are ordered from newest to oldest
        self.commits = commits

    def detect_fixed(self) -> bool:
        """
        Find the first commit with any non‑pending event; if it contains a
        success, consider the signal recovered.

        Returns:
            True when the newest commit that has resolved (non-pending)
            events contains at least one success; False when it only has
            failures, or when no commit has resolved events at all.
        """
        # find the first non-pending existing commit in the signal
        # (commits are newest → oldest, so this is the most recent resolution)
        for commit in self.commits:
            # not all events are pending
            if commit.has_success or commit.has_failure:
                # If we found a commit that has resolved jobs, check if it has failed jobs
                return commit.has_success  # recovered
        return False

    def detect_flaky(self) -> bool:
        """
        Checks whether signal is flaky, i.e. has both successful and failed events for any commit.

        Notes:
        * false result can mean that there is not enough data to determine flakiness.
        * the definition of "flaky" here is somewhat broad, as it also includes intermittent infra issues.
          For the sake of simplicity we're leaning on the conservative side and discarding potentially
          intermittent outside issues as "flakiness".
        * while technically "flakiness" is not a property of the signal (it can be introduced or removed by changes),
          for simplicity we assume that flakiness stays constant within the limited time window we're considering,
          and we lean on the conservative side (discarding signal if we know it was flaky).
          that means that we will have some false negatives, but they will be very infrequent
          (need both conditions — recovery from flakiness + autorevert pattern within the same window)
        """
        return any(commit.has_success and commit.has_failure for commit in self.commits)

    def has_successes(self) -> bool:
        """
        Checks if there is at least one successful event in the signal.
        """
        return any(commit.has_success for commit in self.commits)

    def partition_by_autorevert_pattern(self) -> Optional[PartitionedCommits]:
        """
        Partition the most recent commit history into three lists:
        - Failed commits before the first potential breakage
        - Pending / missing signal commits in the middle (if any)
        - Successful commits after the breakage (up to the next breakage, if any)

        Preserves the original order of commits (newest to oldest).

        The useful invariant this establishes:
        - pending commits in the "failed" list are expected to resolve to failure
        - pending commits in the "successful" list are expected to resolve to success
        - pending commits in the "unknown" list could resolve either way
        - commits with the missing signal (that we need to trigger) would fall into the "unknown" list

        Returns:
            PartitionedCommits, or None when there are fewer than two commits
            or when either the failed or successful side ends up empty.
        """
        if len(self.commits) < 2:
            return None

        failed = []
        successful = []

        picking_failed = True  # simple state machine

        # first broadly partition into failed and successful
        for c in self.commits:
            if c.has_success:
                picking_failed = False
            elif c.has_failure and not picking_failed:
                # encountered a failure after the streak of successes
                # this indicates another older pattern which we don't care about
                break

            if picking_failed:
                failed.append(c)
            else:
                successful.append(c)

        # further partition failed into failed and unknown (pending/missing):
        # trailing "failed" commits without an actual failure are really
        # pending/missing-signal commits sitting between the two groups
        unknown = []
        while failed and not failed[-1].has_failure:
            unknown.append(failed.pop())

        # restore newest → oldest order (entries were popped from the tail)
        unknown.reverse()

        if not failed or not successful:
            return None

        return PartitionedCommits(failed=failed, unknown=unknown, successful=successful)

    def process_valid_autorevert_pattern(
        self,
    ) -> Optional[Union[AutorevertPattern, RestartCommits]]:
        """
        Detect valid autorevert pattern in the Signal.

        Validates all invariants before checking for the pattern.

        Returns:
            AutorevertPattern if a valid pattern is detected, None if no pattern is detected,
            or RestartCommits if the pattern is not confirmed but a restart is needed.
        """
        # discard flaky, already-recovered, or never-successful signals up front
        if self.detect_flaky() or self.detect_fixed() or not self.has_successes():
            return None

        partition = self.partition_by_autorevert_pattern()
        if partition is None:
            return None

        restart_commits = set()

        # close gaps in the signal (greedily for now): commits with no events
        # at all need their workflow triggered before we can conclude anything
        for c in partition.unknown:
            if not c.events:
                restart_commits.add(c.head_sha)

        infra_check_result = partition.confirm_not_an_infra_issue()
        # note re: event_count < 2:
        # this is a confidence heuristic to detect flakiness, can adjust as needed
        if (
            infra_check_result == InfraCheckResult.RESTART_FAILURE
            or partition.failure_events_count() < 2
        ):
            # restarting oldest failed
            restart_commits.add(partition.failed[-1].head_sha)
        elif (
            infra_check_result == InfraCheckResult.RESTART_SUCCESS
            or partition.success_events_count() < 2
        ):
            # restarting newest successful
            restart_commits.add(partition.successful[0].head_sha)

        if restart_commits:
            return RestartCommits(commit_shas=restart_commits)

        if infra_check_result != InfraCheckResult.CONFIRMED:
            return None

        if partition.unknown:
            # there are still pending/missing commits in the unknown partition
            return None

        # all invariants validated, confirmed not infra, pattern exists
        # failed is newest -> older; the last element is the suspected commit
        suspected = partition.failed[-1]
        newer_failures = [c.head_sha for c in partition.failed[:-1]]
        return AutorevertPattern(
            workflow_name=self.workflow_name,
            newer_failing_commits=newer_failures,
            suspected_commit=suspected.head_sha,
            older_successful_commit=partition.successful[0].head_sha,
        )

0 commit comments

Comments
 (0)