Commit 5903a75

[autorevert] Inject synthetic PENDING events for pending workflows in signal extraction (#7272)
Summary:
- Add a post-processing step in signal extraction that injects synthetic PENDING events for pending workflows when the workflow hasn't (yet) produced more precise events for that (signal, commit) pair.
- Implemented via a simple pass over JobRows keyed by (sha, workflow) → pending wf_run_ids.
- Insert synthetic events only when a pending wf_run_id exists and **the signal has no existing event for that run.**
- Use started_at = now + 1 minute.

Rationale:
- Multi-stage workflows (build → test) delay job scheduling, leaving signals "empty" for active runs.
- Choosing "now + 1 minute" keeps the synthetic events in the future (a good proxy for the actual job start time).
- This is especially useful for bisection, to avoid re-dispatching workflows that are already scheduled (CH deduplication is not enough when a job takes 30+ minutes to appear).

### Testing
See, e.g., the pending events in the right column here: [2025-10-01T22-31-22.115571-00-00.html](https://github.com/user-attachments/files/22650153/2025-10-01T22-31-22.115571-00-00.html)
1 parent e8dfdb6 commit 5903a75
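
Before the full diff, here is a condensed, self-contained sketch of the injection rule described in the summary. The `Event`/`Commit` stand-in types and the `inject_pending` helper are illustrative only; the real implementation is the `_inject_pending_workflow_events` method added to `signal_extraction.py` below, which operates on `Signal`/`SignalCommit`/`SignalEvent` and keys pending runs by (sha, workflow).

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Set


@dataclass
class Event:  # stand-in for SignalEvent
    wf_run_id: int
    status: str
    started_at: datetime


@dataclass
class Commit:  # stand-in for SignalCommit
    head_sha: str
    events: List[Event] = field(default_factory=list)


def inject_pending(commits: List[Commit], pending_runs: Dict[str, Set[int]]) -> List[Commit]:
    """For every pending wf_run_id that a commit has no event for yet,
    append a synthetic PENDING event dated one minute in the future."""
    now_plus = (datetime.now(timezone.utc) + timedelta(minutes=1)).replace(tzinfo=None)
    out: List[Commit] = []
    for c in commits:
        missing = pending_runs.get(c.head_sha, set()) - {e.wf_run_id for e in c.events}
        synthetic = [Event(run_id, "PENDING", now_plus) for run_id in sorted(missing)]
        out.append(Commit(c.head_sha, c.events + synthetic))
    return out


if __name__ == "__main__":
    # wf_run_id 200 is pending on H2 but has produced no events yet -> synthetic PENDING
    commits = [Commit("H2"), Commit("H1", [Event(190, "FAILURE", datetime(2025, 10, 1))])]
    for c in inject_pending(commits, {"H2": {200}}):
        print(c.head_sha, [(e.wf_run_id, e.status) for e in c.events])
```

In the real code the lookup key also includes the workflow name, and each synthetic event is created with `run_attempt=0` and `job_id=None`.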

File tree (2 files changed: +146, -2 lines):
- aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py
- aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py


aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py

Lines changed: 90 additions & 2 deletions
```diff
@@ -8,7 +8,7 @@
 """

 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from typing import Dict, Iterable, List, Optional, Set, Tuple

 from .job_agg_index import JobAggIndex, JobMeta, SignalStatus as AggStatus
@@ -92,7 +92,11 @@ def extract(self) -> List[Signal]:
         # When that happens, the jobs have identical timestamps by DIFFERENT job ids.
         # But since they are still the same job logically, we want to deduplicate them
         # for the purposes of signal events.
-        return self._dedup_signal_events(test_signals + job_signals)
+        signals = self._dedup_signal_events(test_signals + job_signals)
+
+        # Inject synthetic PENDING events for workflow runs that are known to be
+        # pending but have no events in a given signal (e.g. multi-stage workflows).
+        return self._inject_pending_workflow_events(signals, jobs)

     # -----------------------------
     # Deduplication (GitHub-specific)
@@ -125,6 +129,90 @@ def _dedup_signal_events(self, signals: List[Signal]) -> List[Signal]:
         )
         return deduped

+    # -----------------------------
+    # Pending workflow synthesis
+    # -----------------------------
+    def _inject_pending_workflow_events(
+        self,
+        signals: List[Signal],
+        jobs: List[JobRow],
+    ) -> List[Signal]:
+        """
+        For each signal/commit, if there exists a pending workflow run and the
+        signal has no event for that wf_run_id, insert a synthetic PENDING event
+        with started_at set slightly in the future (now + 1 minute).
+        """
+        if not signals or not jobs:
+            return signals
+
+        # Simple pass over JobRows to collect pending workflow run ids per (sha, workflow)
+        pending_runs: Dict[Tuple[Sha, WorkflowName], Set[int]] = {}
+        for j in jobs:
+            if j.is_pending:
+                pending_runs.setdefault((j.head_sha, j.workflow_name), set()).add(
+                    int(j.wf_run_id)
+                )
+
+        # Avoid deprecated utcnow(); derive UTC then store naive to match existing code
+        now_plus = (datetime.now(timezone.utc) + timedelta(minutes=1)).replace(
+            tzinfo=None
+        )
+
+        out: List[Signal] = []
+        for s in signals:
+            new_commits: List[SignalCommit] = []
+            for c in s.commits:
+                pending_ids = pending_runs.get(
+                    (Sha(c.head_sha), WorkflowName(s.workflow_name))
+                )
+                if not pending_ids:
+                    new_commits.append(c)
+                    continue
+
+                have_ids = {e.wf_run_id for e in c.events}
+                missing_ids = pending_ids - have_ids
+                if not missing_ids:
+                    new_commits.append(c)
+                    continue
+
+                # Build synthetic pending events for the missing wf_run_ids;
+                # set started_at to the future
+                synth_events: List[SignalEvent] = list(c.events)
+                for wf_run_id in missing_ids:
+                    name = self._fmt_event_name(
+                        workflow=s.workflow_name,
+                        kind="synthetic",
+                        identifier=str(s.key),
+                        wf_run_id=WfRunId(wf_run_id),
+                        run_attempt=RunAttempt(0),
+                    )
+                    synth_events.append(
+                        SignalEvent(
+                            name=name,
+                            status=SignalStatus.PENDING,
+                            started_at=now_plus,
+                            ended_at=None,
+                            wf_run_id=int(wf_run_id),
+                            run_attempt=0,
+                            job_id=None,
+                        )
+                    )
+                new_commits.append(
+                    SignalCommit(
+                        head_sha=c.head_sha, timestamp=c.timestamp, events=synth_events
+                    )
+                )
+
+            out.append(
+                Signal(
+                    key=s.key,
+                    workflow_name=s.workflow_name,
+                    commits=new_commits,
+                    job_base_name=s.job_base_name,
+                )
+            )
+        return out
+
     # -----------------------------
     # Phase B — Tests (test_run_s3 only)
     # -----------------------------
```
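
One detail worth calling out in the diff above: the helper avoids `datetime.utcnow()` (deprecated since Python 3.12) by taking an aware UTC timestamp and then stripping the tzinfo, so the stored value stays naive and matches the existing naive-UTC timestamps in the module. A standalone illustration (not from the repo):

```python
from datetime import datetime, timedelta, timezone

# Aware "now" in UTC, pushed one minute into the future...
aware = datetime.now(timezone.utc) + timedelta(minutes=1)

# ...then made naive so it compares cleanly against existing naive-UTC timestamps.
# Equivalent to the deprecated `datetime.utcnow() + timedelta(minutes=1)`.
naive_utc = aware.replace(tzinfo=None)

print(naive_utc.isoformat())
```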

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py

Lines changed: 56 additions & 0 deletions
```diff
@@ -386,6 +386,62 @@ def test_test_track_mapping_failure_then_success(self):
         self.assertEqual(test_sig.commits[0].events[0].status, SignalStatus.FAILURE)
         self.assertEqual(test_sig.commits[1].events[0].status, SignalStatus.SUCCESS)

+    def test_inject_pending_workflow_event_when_missing_in_signal(self):
+        # Multi-stage workflow: newest commit has a pending workflow run (build stage),
+        # tests not yet scheduled -> no events for that wf_run_id in the test signal.
+        # Older commit has a test failure so the test signal exists.
+        jobs = [
+            # Newest commit: pending build job under wf_run_id=200
+            J(
+                sha="H2",
+                wf="trunk",
+                run=200,
+                job=901,
+                attempt=1,
+                name="linux-build",
+                status="in_progress",
+                conclusion="",
+                started_at=ts(self.t0, 20),
+            ),
+            # Older commit: test job that failed with a concrete test verdict
+            J(
+                sha="H1",
+                wf="trunk",
+                run=190,
+                job=902,
+                attempt=1,
+                name="linux-test",
+                status="completed",
+                conclusion="failure",
+                started_at=ts(self.t0, 10),
+                rule="pytest failure",
+            ),
+        ]
+        tests = [
+            T(
+                job=902,
+                run=190,
+                attempt=1,
+                file="m.py",
+                name="test_synthetic_pending",
+                failing=1,
+            )
+        ]
+
+        signals = self._extract(jobs, tests)
+        test_sig = self._find_test_signal(
+            signals, "trunk", "m.py::test_synthetic_pending"
+        )
+        self.assertIsNotNone(test_sig)
+        # Expect two commits in newest->older order
+        self.assertEqual([c.head_sha for c in test_sig.commits], ["H2", "H1"])
+
+        # For the newest commit (H2): we should have a synthetic pending event for wf_run_id=200
+        c_new = test_sig.commits[0]
+        self.assertEqual(len(c_new.events), 1)
+        self.assertEqual(c_new.events[0].status, SignalStatus.PENDING)
+        self.assertEqual(c_new.events[0].wf_run_id, 200)
+

 if __name__ == "__main__":
     unittest.main()
```
