diff --git a/airflow-core/newsfragments/67765.bugfix.rst b/airflow-core/newsfragments/67765.bugfix.rst new file mode 100644 index 0000000000000..1838418be90ee --- /dev/null +++ b/airflow-core/newsfragments/67765.bugfix.rst @@ -0,0 +1 @@ +Add the ``run_type`` tag to the ``dagrun.duration.failed`` metric emitted when a Dag run times out, making it consistent with the same metric emitted on normal Dag run completion. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 224659c4c4d99..abffc055503ed 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2457,7 +2457,9 @@ def _schedule_dag_run( stats.timing( "dagrun.duration.failed", duration, - tags={"dag_id": dag_run.dag_id}, + # Include ``run_type`` (via stats_tags) so the timeout path matches + # DagRun._emit_duration_stats_for_finished_state; see #64765. + tags=dag_run.stats_tags, ) return callback_to_execute diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 05803f5fd5be0..a4c7d38742413 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3835,6 +3835,45 @@ def test_dagrun_timeout_fails_run(self, dag_maker): session.rollback() session.close() + @mock.patch("airflow._shared.observability.metrics.stats._get_backend") + def test_dagrun_timeout_duration_metric_has_run_type(self, mock_get_backend, dag_maker): + """ + The ``dagrun.duration.failed`` metric emitted when a Dag run times out must carry the + ``run_type`` tag, matching the metric emitted on normal Dag run completion via + ``DagRun._emit_duration_stats_for_finished_state``. Regression test for #64765. + """ + mock_stats = mock.MagicMock(spec=StatsLogger) + mock_get_backend.return_value = mock_stats + + session = settings.Session() + with dag_maker( + dag_id="test_dagrun_timeout_duration_metric", + dagrun_timeout=datetime.timedelta(seconds=60), + session=session, + ): + EmptyOperator(task_id="dummy") + + dr = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1)) + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + self.job_runner._schedule_dag_run(dr, session) + session.flush() + + session.refresh(dr) + assert dr.state == State.FAILED + + # The timing call must include both dag_id and run_type (dag_run.stats_tags), not just dag_id. + mock_stats.timing.assert_any_call( + "dagrun.duration.failed", + mock.ANY, + tags={"dag_id": dr.dag_id, "run_type": dr.run_type}, + ) + + session.rollback() + session.close() + def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker): """ Test that dagrun timeout fails run and update the next dagrun