Skip to content

Commit 85948fd

Browse files
authored
[autorevert] non nullable dates & dedup (#7167)
**Signal event deduplication and timestamp handling:** * Added a deduplication step in `SignalExtractor` to remove duplicate signal events within commits, based on identical `(started_at, wf_run_id)` pairs. This addresses issues with "rerun failed" jobs in GitHub workflows that reuse the same underlying job (but report them with different job ids) * For test-track signals, extract `started_at` from the specific job that hosted the test (when available) * Changed all job and signal timestamp fields (`started_at`, `created_at`) to be non-optional, defaulting to `datetime.min`
1 parent c5db0d8 commit 85948fd

File tree

7 files changed

+157
-16
lines changed

7 files changed

+157
-16
lines changed

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class JobMeta:
4949
job signals accounted for by per-test signals.
5050
"""
5151

52-
started_at: Optional[datetime] = None
52+
started_at: datetime = datetime.min
5353
is_pending: bool = False
5454
is_cancelled: bool = False
5555
has_failures: bool = False
@@ -164,9 +164,7 @@ def stats(self, key: KeyT) -> JobMeta:
164164
jrows = self._groups[key]
165165

166166
# Inline aggregations (only used here)
167-
started_at: Optional[datetime]
168-
times = [r.started_at for r in jrows if r.started_at is not None]
169-
started_at = min(times) if times else None
167+
started_at = min(r.started_at for r in jrows)
170168

171169
meta = JobMeta(
172170
started_at=started_at,

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ def __init__(
7171
name: str,
7272
status: SignalStatus,
7373
started_at: datetime,
74+
wf_run_id: int,
7475
ended_at: Optional[datetime] = None,
7576
):
7677
self.name = name
7778
self.status = status
7879
self.started_at = started_at
7980
self.ended_at = ended_at
81+
self.wf_run_id = wf_run_id
8082

8183
@property
8284
def is_pending(self) -> bool:
@@ -96,8 +98,10 @@ class SignalCommit:
9698

9799
def __init__(self, head_sha: str, events: List[SignalEvent]):
98100
self.head_sha = head_sha
99-
# enforce events ordered by time, oldest first
100-
self.events = sorted(events, key=lambda e: e.started_at) if events else []
101+
# enforce events ordered by time, then by wf_run_id (oldest first)
102+
self.events = (
103+
sorted(events, key=lambda e: (e.started_at, e.wf_run_id)) if events else []
104+
)
101105
# counts by status
102106
self.statuses = {}
103107
for e in self.events:

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from dataclasses import dataclass
1111
from datetime import datetime
12-
from typing import Dict, Iterable, List, Set, Tuple
12+
from typing import Dict, Iterable, List, Optional, Set, Tuple
1313

1414
from .job_agg_index import JobAggIndex, JobMeta, SignalStatus as AggStatus
1515
from .signal import Signal, SignalCommit, SignalEvent, SignalStatus
@@ -31,6 +31,7 @@
3131
class TestOutcome:
3232
failing: bool
3333
errored: bool
34+
started_at: datetime
3435

3536

3637
class SignalExtractor:
@@ -77,7 +78,34 @@ def extract(self) -> List[Signal]:
7778

7879
test_signals = self._build_test_signals(jobs, test_rows)
7980
job_signals = self._build_non_test_signals(jobs)
80-
return test_signals + job_signals
81+
# Deduplicate events within commits across all signals as a final step
82+
# GitHub-specific behavior like "rerun failed" can reuse job instances for reruns.
83+
# When that happens, the jobs have identical timestamps by DIFFERENT job ids.
84+
# But since they are still the same job logically, we want to deduplicate them
85+
# for the purposes of signal events.
86+
return self._dedup_signal_events(test_signals + job_signals)
87+
88+
# -----------------------------
89+
# Deduplication (GitHub-specific)
90+
# -----------------------------
91+
def _dedup_signal_events(self, signals: List[Signal]) -> List[Signal]:
    """Drop duplicate events within each commit of every signal.

    GitHub's "rerun failed" feature can reuse the same underlying job for a
    rerun while reporting it under a different job id.  Such reruns surface
    as distinct events sharing the same (started_at, wf_run_id) pair, so
    that pair is treated as the event's identity and only the first
    occurrence per commit is kept.

    Events inside a SignalCommit are already sorted by
    (started_at, wf_run_id), which makes duplicates adjacent — comparing
    against the previously kept key is therefore sufficient.
    """
    result: List[Signal] = []
    for signal in signals:
        commits: List[SignalCommit] = []
        for commit in signal.commits:
            kept: List[SignalEvent] = []
            last_key: Optional[Tuple[datetime, int]] = None
            for event in commit.events:
                current = (event.started_at, event.wf_run_id)
                if current != last_key:
                    kept.append(event)
                    last_key = current
            commits.append(SignalCommit(head_sha=commit.head_sha, events=kept))
        result.append(
            Signal(
                key=signal.key,
                workflow_name=signal.workflow_name,
                commits=commits,
            )
        )
    return result
81109

82110
# -----------------------------
83111
# Phase B — Tests (test_run_s3 only)
@@ -174,9 +202,13 @@ def _build_test_signals(
174202
tr.test_id,
175203
)
176204
prev = tests_by_group_attempt.get(key)
205+
started_at = min(
206+
(prev.started_at if prev else job.started_at), job.started_at
207+
)
177208
outcome = TestOutcome(
178209
failing=(prev.failing if prev else False) or bool(tr.failing),
179210
errored=(prev.errored if prev else False) or bool(tr.errored),
211+
started_at=started_at,
180212
)
181213
tests_by_group_attempt[key] = outcome
182214
if outcome.failing or outcome.errored:
@@ -226,8 +258,8 @@ def _build_test_signals(
226258
wf_run_id=wf_run_id,
227259
run_attempt=run_attempt,
228260
),
229-
"started_at": meta.started_at or datetime.min,
230261
"ended_at": None,
262+
"wf_run_id": int(wf_run_id),
231263
}
232264

233265
if verdict:
@@ -236,12 +268,17 @@ def _build_test_signals(
236268
status=SignalStatus.FAILURE
237269
if (verdict.failing or verdict.errored)
238270
else SignalStatus.SUCCESS,
271+
started_at=verdict.started_at,
239272
**event_common,
240273
)
241274
)
242275
elif meta.is_pending:
243276
events.append(
244-
SignalEvent(status=SignalStatus.PENDING, **event_common)
277+
SignalEvent(
278+
status=SignalStatus.PENDING,
279+
started_at=meta.started_at,
280+
**event_common,
281+
)
245282
)
246283
# else: missing (no event)
247284

@@ -330,8 +367,9 @@ def _build_non_test_signals(self, jobs: List[JobRow]) -> List[Signal]:
330367
run_attempt=run_attempt,
331368
),
332369
status=ev_status,
333-
started_at=meta.started_at or datetime.min,
370+
started_at=meta.started_at,
334371
ended_at=None,
372+
wf_run_id=int(wf_run_id),
335373
)
336374
)
337375

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ def fetch_jobs_for_workflows(
105105
created_at,
106106
rule,
107107
) in res.result_rows:
108+
# Guard against placeholder started_at by using the later of
109+
# started_at and created_at as the effective start.
110+
# Both columns are non-NULL in ClickHouse.
111+
effective_started = max(started_at, created_at)
108112
rows.append(
109113
JobRow(
110114
head_sha=Sha(head_sha),
@@ -115,7 +119,7 @@ def fetch_jobs_for_workflows(
115119
name=JobName(str(name or "")),
116120
status=str(status or ""),
117121
conclusion=str(conclusion or ""),
118-
started_at=started_at,
122+
started_at=effective_started,
119123
created_at=created_at,
120124
rule=str(rule or ""),
121125
)

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from functools import cached_property
7-
from typing import NewType, Optional, Set
7+
from typing import NewType, Set
88

99

1010
# Default classification rules that indicate test failures.
@@ -35,8 +35,8 @@ class JobRow:
3535
name: JobName
3636
status: str
3737
conclusion: str
38-
started_at: Optional[datetime]
39-
created_at: Optional[datetime]
38+
started_at: datetime
39+
created_at: datetime
4040
rule: str
4141

4242
@cached_property

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ def setUp(self) -> None:
2323
self.t0 = datetime(2025, 8, 19, 12, 0, 0)
2424

2525
def _ev(self, name: str, status: SignalStatus, minute: int) -> SignalEvent:
    """Build a SignalEvent occurring `minute` minutes after the base time t0.

    All fixture events share wf_run_id=1 since these tests do not exercise
    run-id-based behavior.
    """
    started = ts(self.t0, minute)
    return SignalEvent(name=name, status=status, started_at=started, wf_run_id=1)
2732

2833
def test_detect_recovered_first_non_pending_success(self):
2934
# Newest commit has success (even with pending present) -> recovered
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import unittest
2+
from datetime import datetime, timedelta
3+
4+
from pytorch_auto_revert.signal import Signal, SignalCommit, SignalEvent, SignalStatus
5+
from pytorch_auto_revert.signal_extraction import SignalExtractor
6+
7+
8+
def ts(base: datetime, minutes: int) -> datetime:
    """Return `base` shifted forward by the given number of minutes."""
    offset = timedelta(minutes=minutes)
    return base + offset
10+
11+
12+
class TestSignalDedup(unittest.TestCase):
    """Tests for SignalExtractor._dedup_signal_events (per-commit dedup)."""

    def setUp(self) -> None:
        # Deterministic base timestamp for all fixture events.
        self.t0 = datetime(2025, 8, 20, 12, 0, 0)

    def _event(
        self, name: str, status: SignalStatus, minute: int, run_id: int
    ) -> SignalEvent:
        # Build an event `minute` minutes past t0 on workflow run `run_id`.
        return SignalEvent(
            name=name,
            status=status,
            started_at=ts(self.t0, minute),
            wf_run_id=run_id,
        )

    def _dedup(self, signals):
        # Run the extractor's dedup pass; constructor args are irrelevant here.
        extractor = SignalExtractor(workflows=["wf"], lookback_hours=24)
        return extractor._dedup_signal_events(signals)

    def test_dedup_removes_adjacent_duplicates(self):
        # Two events sharing (started_at, wf_run_id) in one commit: only the
        # first encountered event (name/status included) survives.
        first = self._event("job-a", SignalStatus.FAILURE, 1, 100)
        second = self._event("job-b", SignalStatus.SUCCESS, 1, 100)
        commit = SignalCommit(head_sha="sha", events=[first, second])
        out = self._dedup([Signal(key="k", workflow_name="wf", commits=[commit])])

        self.assertEqual(len(out), 1)
        events = out[0].commits[0].events
        self.assertEqual(len(events), 1)
        self.assertEqual(events[0].name, "job-a")
        self.assertEqual(events[0].status, SignalStatus.FAILURE)

    def test_dedup_keeps_non_duplicates(self):
        # Distinct (started_at, wf_run_id) pairs must all survive: same time
        # with different run ids, and same run id at a different time.
        commit = SignalCommit(
            head_sha="sha",
            events=[
                self._event("job-a", SignalStatus.FAILURE, 1, 1),
                self._event("job-b", SignalStatus.SUCCESS, 1, 2),
                self._event("job-c", SignalStatus.PENDING, 2, 1),
            ],
        )
        out = self._dedup([Signal(key="k", workflow_name="wf", commits=[commit])])
        self.assertEqual(len(out[0].commits[0].events), 3)

    def test_dedup_applies_per_commit(self):
        # The same duplicate pair appearing in two commits is deduped
        # independently inside each commit — never across commits.
        dup_a = self._event("job-a", SignalStatus.FAILURE, 1, 100)
        dup_b = self._event("job-b", SignalStatus.SUCCESS, 1, 100)
        commits = [
            SignalCommit(head_sha="sha1", events=[dup_a, dup_b]),
            SignalCommit(head_sha="sha2", events=[dup_a, dup_b]),
        ]
        out = self._dedup([Signal(key="k", workflow_name="wf", commits=commits)])
        self.assertEqual([len(c.events) for c in out[0].commits], [1, 1])
89+
90+
91+
# Allow running this test module directly (e.g. `python test_file.py`).
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)