From c9808de858d3ce884b508bc43eefaf26d0b654db Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Mon, 10 Nov 2025 14:21:32 -0800 Subject: [PATCH 1/4] Revert "[autorevert] use 'tests.all_test_runs' instead of 'default.test_run_s3' for test signals (#7439)" This reverts commit bc5f54077d45218ee15ffcdf8fff8002efb5803a. --- .../pytorch-auto-revert/SIGNAL_EXTRACTION.md | 26 +++++++++---------- .../pytorch_auto_revert/signal_extraction.py | 12 ++++----- .../signal_extraction_datasource.py | 17 +++--------- .../signal_extraction_types.py | 2 +- .../tests/test_signal_extraction.py | 6 +---- 5 files changed, 24 insertions(+), 39 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md index 6ef7314397..bc9036df23 100644 --- a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md +++ b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md @@ -43,19 +43,19 @@ Notes - This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`). - Job retries typically show up as separate job rows; names may include `Attempt #2` and have later `started_at`. -## Phase B — Test Details Fetch (batched, from `tests.all_test_runs`) +## Phase B — Test Details Fetch (batched, from `default.test_run_s3`) -Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `tests.all_test_runs` — this table contains one row per testcase and is populated earlier while jobs may still be running. +Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`. For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (failure_count=0, error_count=0). -Why `tests.all_test_runs`? -- We need per‑test identities to build per‑test Signals; `tests.all_test_runs` has them and is populated earlier than the final summary tables. Summary is optional and redundant for this layer. +Why `test_run_s3` only? +- We need per‑test identities to build per‑test Signals; `default.test_run_s3` has them. Summary is optional and redundant for this layer. - Performance remains good by filtering on `job_id IN (...)` (first PK column) and grouping; limit to the time window implicitly via the selected job set from Phase A. Job selection for test track: - Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window. - Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pendings for the same test on other commits. -Optimized batched all_test_runs query (for N job_ids): +Optimized batched test_run_s3 query (for N job_ids): ``` SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name, @@ -63,13 +63,13 @@ SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name, max(error_count > 0) AS errored, max(rerun_count > 0) AS rerun_seen, count() AS rows -FROM tests.all_test_runs +FROM default.test_run_s3 WHERE job_id IN {job_ids:Array(Int64)} GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name ``` Notes -- Use `job_id IN (...)` to leverage the table’s primary key prefix on `job_id`. +- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`. - We keep `workflow_run_attempt` to distinguish attempts within the same workflow run. ## Mapping to Signals @@ -83,7 +83,7 @@ Notes Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting. ### Test‑track semantics -- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id. +- Source of truth for SUCCESS/FAILURE is `default.test_run_s3` per test id. - When a test row exists for an attempt: - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist. - When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING. @@ -106,9 +106,9 @@ Event naming (for debuggability): ### Test‑track mapping - Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards: - - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`: - - If rows exist for this `test_id` → status should reflect the found test verdict. - - If no rows exist and the group is still running (some jobs pending) → status = PENDING. + - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `test_run_s3` rows (if any) for each candidate `test_id`: + - If `test_run_s3` rows exist for this `test_id` → status should reflect the found test verdict. + - If no `test_run_s3` rows exist and the group is still running (some jobs pending) → status = PENDING. - Else (no rows and group completed) → missing/unknown (no event emitted). - Event boundaries (naturally arise from grouping): - Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered). @@ -174,7 +174,7 @@ Notes 3) Implement selectors for test‑track pairs (Python filter on `rule`). 4) Implement batched Phase B queries: - Use `(workflow_id, job_id) IN array(tuple(...))` to leverage PK prefixes. - - call `tests.all_test_runs` to enumerate failing tests + - call `test_run_s3` to enumerate failing tests 5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified. 6) Add unit tests: - Test‑track: a) failure on one commit; b) success on another; c) unknown/gap. @@ -185,7 +185,7 @@ Notes - Keep the window small (16–32h) and deduplicate commits via push timestamps. - Limit the batched pairs size; chunk when necessary. -- Align filters with primary keys: `job_id` for `tests.all_test_runs`. +- Align filters with primary keys: `job_id` for `test_run_s3`. - Avoid scanning all of `workflow_job` by joining to recent pushes and filtering repo/branches. ## Open Questions diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index 17052d4356..d36ec87a10 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -82,9 +82,7 @@ def extract(self) -> List[Signal]: # Select jobs to participate in test-track details fetch test_track_job_ids, failed_job_ids = self._select_test_track_job_ids(jobs) test_rows = self._datasource.fetch_tests_for_job_ids( - test_track_job_ids, - failed_job_ids=failed_job_ids, - lookback_hours=self.lookback_hours, + test_track_job_ids, failed_job_ids=failed_job_ids ) test_signals = self._build_test_signals(jobs, test_rows, commits) @@ -220,7 +218,7 @@ def _inject_pending_workflow_events( return out # ----------------------------- - # Phase B — Tests (tests.all_test_runs only) + # Phase B — Tests (test_run_s3 only) # ----------------------------- def _select_test_track_job_ids( self, jobs: List[JobRow] @@ -261,11 +259,11 @@ def _build_test_signals( ) -> List[Signal]: """Build per-test Signals across commits, scoped to job base. - We index `tests.all_test_runs` rows per (wf_run_id, run_attempt, job_base) and collect + We index `default.test_run_s3` rows per (wf_run_id, run_attempt, job_base) and collect which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base), we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in that base, we emit events per commit/attempt: - - If tests.all_test_runs rows exist → emit at most one FAILURE event if any failed runs exist, + - If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist, and at most one SUCCESS event if any successful runs exist (both may be present). - Else if group pending → PENDING - Else → no event (missing) @@ -297,7 +295,7 @@ def _build_test_signals( value_fn=lambda j: (j.wf_run_id, j.run_attempt), ) - # Index tests.all_test_runs rows per (commit, job_base, wf_run, attempt, test_id) + # Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id) # Store aggregated failure/success counts tests_by_group_attempt: Dict[ Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId], diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py index 6f895c5f23..b1333bd1b2 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py @@ -189,15 +189,12 @@ def fetch_tests_for_job_ids( job_ids: List[JobId], *, failed_job_ids: List[JobId], - lookback_hours: int, ) -> List[TestRow]: - """Batch fetch test verdict rows from tests.all_test_runs for given job ids. + """Batch fetch test verdict rows from default.test_run_s3 for given job ids. If failed_job_ids is provided, first compute the set of failed test identifiers (file+classname+name) from those jobs, and only fetch tests for job_ids that match that set. This reduces the result size significantly. - Additionally, constrain by the table's partition (toDate(time_inserted)) - using NOW() and the lookback window with a 1-day margin to avoid timezone issues. """ log = logging.getLogger(__name__) if not job_ids: @@ -227,31 +224,25 @@ def fetch_tests_for_job_ids( ) # One query with a CTE that enumerates failed test ids from failed_job_ids, # then filters the main selection by those ids for the current chunk. - # Partition pruning: restrict toDate(time_inserted) to the lookback window - # with a 1 day margin using NOW() to avoid timezone handling. + # Note: success_runs explicitly excludes skipped rows via skipped_count = 0. query = """ WITH failed_test_names AS ( SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id - FROM tests.all_test_runs + FROM default.test_run_s3 WHERE job_id IN {failed_job_ids:Array(Int64)} AND (failure_count > 0 OR error_count > 0) - AND toDate(time_inserted) >= - toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1)) ) SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name, countIf(failure_count > 0 OR error_count > 0) AS failure_runs, countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs - FROM tests.all_test_runs + FROM default.test_run_s3 WHERE job_id IN {job_ids:Array(Int64)} AND concat(file, '|', classname, '|', name) IN failed_test_names - AND toDate(time_inserted) >= - toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1)) GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name """ params = { "job_ids": [int(j) for j in chunk], "failed_job_ids": [int(j) for j in failed_job_ids], - "lookback_hours": int(lookback_hours), } for attempt in RetryWithBackoff(): diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py index 5d8340f8f4..169f59ba79 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py @@ -121,7 +121,7 @@ def is_test_failure(self) -> bool: return self.is_failure and (self.rule in DEFAULT_TEST_RULES) -# Represents a test verdict row from the tests.all_test_runs table in ClickHouse +# Represents a test verdict row from the test_run_s3 table in ClickHouse @dataclass(frozen=True) class TestRow: job_id: JobId diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index 4ba9e608bc..e1a76131b9 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -52,11 +52,7 @@ def fetch_jobs_for_workflows( return list(self._jobs) def fetch_tests_for_job_ids( - self, - job_ids: List[JobId], - *, - failed_job_ids: List[JobId], - lookback_hours: int = 24, + self, job_ids: List[JobId], *, failed_job_ids: List[JobId] ) -> List[TestRow]: ids = {int(j) for j in job_ids} return [r for r in self._tests if int(r.job_id) in ids] From 71b904f555d9f89042522b4c53d63e90a06ace4a Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Mon, 10 Nov 2025 14:22:15 -0800 Subject: [PATCH 2/4] Revert "[autorevert] add 'slow' workflow (#7411)" This reverts commit 0ffc3c195f1cea402e596a58f575525f577360fc. --- aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py index 5356731230..c27d71cc98 100755 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py @@ -60,7 +60,7 @@ def __init__(self): self.secret_store_name = os.environ.get("SECRET_STORE_NAME", "") self.workflows = os.environ.get( "WORKFLOWS", - ",".join(["Lint", "trunk", "pull", "inductor", "linux-aarch64", "slow"]), + ",".join(["Lint", "trunk", "pull", "inductor", "linux-aarch64"]), ).split(",") def to_autorevert_v2_params( From e4b2e2a9c11106183b4d4c5960034e6fa9c54823 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Mon, 10 Nov 2025 14:22:39 -0800 Subject: [PATCH 3/4] Revert "[autorevert] requires same revision retry failure for job-track signals (#7396)" This reverts commit db4893e17871ad0dd29f25b23275bde02953af04. --- .../pytorch_auto_revert/signal.py | 33 ----- .../pytorch_auto_revert/signal_extraction.py | 6 +- .../pytorch_auto_revert/tests/test_signal.py | 133 ------------------ 3 files changed, 1 insertion(+), 171 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py index cd67868680..46ddddbf3f 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py @@ -137,10 +137,6 @@ def has_success(self) -> bool: def has_failure(self) -> bool: return SignalStatus.FAILURE in self.statuses - def count_by_status(self, status: SignalStatus) -> int: - """Get the count of events with the specified status.""" - return self.statuses.get(status, 0) - def events_by_status(self, status: SignalStatus) -> List[SignalEvent]: """Get all events with the specified status.""" return [event for event in self.events if event.status == status] @@ -274,13 +270,6 @@ class InfraCheckResult(Enum): RESTART_FAILURE = "restart_failure" # no failure after any success -class SignalSource(Enum): - """Origin of a Signal: test-track or job-track.""" - - TEST = "test" - JOB = "job" - - class Signal: """A refined, column-like view of raw CI data for pattern detection. @@ -296,15 +285,12 @@ def __init__( workflow_name: str, commits: List[SignalCommit], job_base_name: Optional[str] = None, - source: SignalSource = SignalSource.TEST, ): self.key = key self.workflow_name = workflow_name # commits are ordered from newest to oldest self.commits = commits self.job_base_name = job_base_name - # Track the origin of the signal (test-track or job-track). - self.source = source def detect_fixed(self) -> bool: """ @@ -465,16 +451,6 @@ def process_valid_autorevert_pattern( ): restart_commits.add(partition.successful[0].head_sha) - # Job-track specific requirement: when there is no gap (unknown empty), - # require a failed rerun on the first failing commit to increase confidence. - if ( - not partition.unknown - and self.source == SignalSource.JOB - and not partition.failed[-1].has_pending - and len(partition.failed[-1].events) < 2 - ): - restart_commits.add(partition.failed[-1].head_sha) - if restart_commits: return RestartCommits(commit_shas=restart_commits) @@ -496,15 +472,6 @@ def process_valid_autorevert_pattern( f"not enough successes to make call: {partition.success_events_count()}", ) - if ( - self.source == SignalSource.JOB - and partition.failed[-1].count_by_status(SignalStatus.FAILURE) < 2 - ): - return Ineligible( - IneligibleReason.INSUFFICIENT_FAILURES, - "job-track signal requires at least 2 failures on the first failing commit", - ) - if partition.unknown: # there are still pending/missing commits in the unknown partition unknown_shas = ", ".join(c.head_sha for c in partition.unknown) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index d36ec87a10..22099b4fc6 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -12,7 +12,7 @@ from typing import Dict, Iterable, List, Optional, Set, Tuple from .job_agg_index import JobAggIndex, JobMeta, SignalStatus as AggStatus -from .signal import Signal, SignalCommit, SignalEvent, SignalSource, SignalStatus +from .signal import Signal, SignalCommit, SignalEvent, SignalStatus from .signal_extraction_datasource import SignalExtractionDatasource from .signal_extraction_types import ( JobBaseName, @@ -127,7 +127,6 @@ def _dedup_signal_events(self, signals: List[Signal]) -> List[Signal]: workflow_name=s.workflow_name, commits=new_commits, job_base_name=s.job_base_name, - source=s.source, ) ) return deduped @@ -212,7 +211,6 @@ def _inject_pending_workflow_events( workflow_name=s.workflow_name, commits=new_commits, job_base_name=s.job_base_name, - source=s.source, ) ) return out @@ -434,7 +432,6 @@ def _build_test_signals( workflow_name=wf_name, commits=commit_objs, job_base_name=str(job_base_name), - source=SignalSource.TEST, ) ) @@ -541,7 +538,6 @@ def _build_non_test_signals( workflow_name=wf_name, commits=commit_objs, job_base_name=str(base_name), - source=SignalSource.JOB, ) ) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal.py index 644f23e08f..38921486c6 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal.py @@ -10,7 +10,6 @@ Signal, SignalCommit, SignalEvent, - SignalSource, SignalStatus, ) @@ -440,138 +439,6 @@ def test_success_restart_even_when_failed_side_pending_and_insufficient_failures self.assertIn("sha_success_ok", res.commit_shas) self.assertNotIn("sha_fail_pend", res.commit_shas) - def test_job_track_requires_failed_rerun_when_no_gap_missing_rerun(self): - # Job-track: require a failed rerun on the suspected commit when there is no gap. - # Build commits newest -> older - c_fail_newest = SignalCommit( - head_sha="sha_fail_newest", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 7), - wf_run_id=100, - run_attempt=1, - ) - ], - ) - c_fail_new = SignalCommit( - head_sha="sha_fail_new", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 5), - wf_run_id=101, - run_attempt=1, - ) - ], - ) - # Suspected commit: first failure attempt=1, no rerun yet (missing failed rerun) - c_suspected = SignalCommit( - head_sha="sha_suspected", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 4), - wf_run_id=321, - run_attempt=1, - ), - ], - ) - # Base successful commit with two successes - c_base = SignalCommit( - head_sha="sha_base", - timestamp=ts(self.t0, 0), - events=[ - self._ev("job", SignalStatus.SUCCESS, 3), - self._ev("job", SignalStatus.SUCCESS, 6), - ], - ) - - s = Signal( - key="job", - workflow_name="wf", - commits=[c_fail_newest, c_fail_new, c_suspected, c_base], - source=SignalSource.JOB, - ) - res = s.process_valid_autorevert_pattern() - # Should not produce an AutorevertPattern; instead propose restart of suspected commit - self.assertNotIsInstance(res, AutorevertPattern) - self.assertTrue(hasattr(res, "commit_shas")) - self.assertIn("sha_suspected", res.commit_shas) - - def test_job_track_allows_autorevert_when_failed_rerun_present(self): - # Same as above, but suspected has a failed rerun (attempt 2) on the same wf_run_id. - c_fail_newest = SignalCommit( - head_sha="sha_fail_newest", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 7), - wf_run_id=100, - run_attempt=1, - ) - ], - ) - c_fail_new = SignalCommit( - head_sha="sha_fail_new", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 5), - wf_run_id=101, - run_attempt=1, - ) - ], - ) - # Suspected commit: failure attempt=1 then failure attempt=2 on same run id - c_suspected = SignalCommit( - head_sha="sha_suspected", - timestamp=ts(self.t0, 0), - events=[ - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 4), - wf_run_id=321, - run_attempt=1, - ), - SignalEvent( - name="job", - status=SignalStatus.FAILURE, - started_at=ts(self.t0, 6), - wf_run_id=321, - run_attempt=2, - ), - ], - ) - c_base = SignalCommit( - head_sha="sha_base", - timestamp=ts(self.t0, 0), - events=[ - self._ev("job", SignalStatus.SUCCESS, 3), - self._ev("job", SignalStatus.SUCCESS, 6), - ], - ) - - s = Signal( - key="job", - workflow_name="wf", - commits=[c_fail_newest, c_fail_new, c_suspected, c_base], - source=SignalSource.JOB, - ) - res = s.process_valid_autorevert_pattern() - self.assertIsInstance(res, AutorevertPattern) - if __name__ == "__main__": unittest.main() From 2e89f63d014efc18fe9c43e1c382ca3a2fa40c2d Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Mon, 10 Nov 2025 14:23:05 -0800 Subject: [PATCH 4/4] Revert "[autorevert] treat test failures as successes in job-track Signals (#7399)" This reverts commit 64988049dc7892ffd18cd64ef1a65e4debfa5842. --- .../pytorch-auto-revert/SIGNAL_EXTRACTION.md | 16 -------- .../pytorch_auto_revert/signal_extraction.py | 13 +++--- .../tests/test_signal_extraction.py | 40 ------------------- 3 files changed, 7 insertions(+), 62 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md index bc9036df23..65d07bf1f1 100644 --- a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md +++ b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md @@ -82,22 +82,6 @@ Notes - Each commit holds a list of `SignalEvent`s (time‑ordered by `started_at`). Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting. -### Test‑track semantics -- Source of truth for SUCCESS/FAILURE is `default.test_run_s3` per test id. -- When a test row exists for an attempt: - - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist. -- When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING. -- Otherwise (no test rows and not pending) → no event for that attempt. - -### Job‑track semantics (non‑test) -- Build per normalized job base across commits; aggregate shards by `(wf_run_id, run_attempt)`. -- Event mapping per attempt uses aggregated job meta with test‑failure filtering: - - FAILURE only when the attempt had non‑test failures (e.g. infra‑related). - - PENDING when the attempt is still running. - - SUCCESS otherwise, including when failures are exclusively test‑caused (these are handled by test‑track). -- Cancelled attempts are treated as missing (no event). -- Emit a job‑track Signal only when at least one attempt/commit shows a non‑test (infra) failure within the window. - Event naming (for debuggability): - Consistent key=value format: `wf= kind= id= run= attempt=` - Examples: diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index 22099b4fc6..21743edc06 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -492,15 +492,16 @@ def _build_non_test_signals( # Map aggregation verdict to outer SignalStatus if meta.status is None: continue - if meta.status == AggStatus.FAILURE and meta.has_non_test_failures: + if meta.status == AggStatus.FAILURE: # mark presence of non-test failures (relevant for job track) - has_relevant_failures = True + if meta.has_non_test_failures: + has_relevant_failures = True + ev_status = SignalStatus.FAILURE - elif meta.status == AggStatus.PENDING: - ev_status = SignalStatus.PENDING - else: - # Note: when all failures are caused by tests, we do NOT emit job-level failures + elif meta.status == AggStatus.SUCCESS: ev_status = SignalStatus.SUCCESS + else: + ev_status = SignalStatus.PENDING # Extract wf_run_id/run_attempt from the attempt key _, _, _, wf_run_id, run_attempt = akey diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index e1a76131b9..f2ad4e2f05 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -319,46 +319,6 @@ def test_non_test_inclusion_gate(self): self._find_job_signal(signals_b, "trunk", jobs_b[0].base_name) ) - def test_job_track_treats_test_failures_as_success(self): - # When a base has a non-test (infra) failure somewhere (so a job signal is emitted), - # attempts that fail due to tests should NOT appear as FAILURES in the job track. - # They should be treated as SUCCESS at the job-track level, leaving the failure to test-track. - jobs = [ - # Newer commit: infra-caused failure (non-test classification) - J( - sha="Z2", - run=9100, - job=801, - attempt=1, - started_at=ts(self.t0, 20), - conclusion="failure", - rule="infra", # non-test - ), - # Older commit: failure caused by tests (test classification) - J( - sha="Z1", - run=9000, - job=800, - attempt=1, - started_at=ts(self.t0, 10), - conclusion="failure", - rule="pytest failure", # test-caused - ), - ] - - signals = self._extract(jobs, tests=[]) - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) - self.assertIsNotNone(sig) - # Expect commits newest->older - self.assertEqual([c.head_sha for c in sig.commits], ["Z2", "Z1"]) - # Newer infra failure remains FAILURE - self.assertEqual(len(sig.commits[0].events), 1) - self.assertEqual(sig.commits[0].events[0].status, SignalStatus.FAILURE) - # Older test-caused failure is mapped to SUCCESS in job track - self.assertEqual(len(sig.commits[1].events), 1) - self.assertEqual(sig.commits[1].events[0].status, SignalStatus.SUCCESS) - def test_commits_without_jobs_are_included(self): # Verify that commits with no jobs at all are still included in signals # Simulate case where C2 has a failure, C3 has no jobs (e.g., periodic workflow),