Skip to content

Commit 87ba445

Browse files
authored
Use run_after for dag run delay metric (#59585)
I noticed what look like some obsolete todos re the schedule delay metrics. It seems we can use run_after which would be the more logical choice compared with the other questionable complicated logic.
1 parent 05a66e7 commit 87ba445

File tree

3 files changed

+3
-18
lines changed

3 files changed

+3
-18
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,14 +2037,7 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
20372037
and dag_run.triggered_by != DagRunTriggeredByType.ASSET
20382038
and dag_run.clear_number < 1
20392039
):
2040-
# TODO: Logically, this should be DagRunInfo.run_after, but the
2041-
# information is not stored on a DagRun, only before the actual
2042-
# execution on DagModel.next_dagrun_create_after. We should add
2043-
# a field on DagRun for this instead of relying on the run
2044-
# always happening immediately after the data interval.
2045-
# We only publish these metrics for scheduled dag runs and only
2046-
# when ``run_type`` is *MANUAL* and ``clear_number`` is 0.
2047-
expected_start_date = get_run_data_interval(dag.timetable, dag_run).end
2040+
expected_start_date = dag_run.run_after
20482041
schedule_delay = dag_run.start_date - expected_start_date
20492042
# Publish metrics twice with backward compatible name, and then with tags
20502043
Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)

airflow-core/src/airflow/models/dagrun.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,8 +1635,6 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
16351635
Note that the stat will only be emitted for scheduler-triggered DAG runs
16361636
(i.e. when ``run_type`` is *SCHEDULED* and ``clear_number`` is equal to 0).
16371637
"""
1638-
from airflow.models.dag import get_run_data_interval
1639-
16401638
if self.state == TaskInstanceState.RUNNING:
16411639
return
16421640
if self.run_type != DagRunType.SCHEDULED:
@@ -1658,13 +1656,7 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis: lis
16581656
except ValueError: # No start dates at all.
16591657
pass
16601658
else:
1661-
# TODO: Logically, this should be DagRunInfo.run_after, but the
1662-
# information is not stored on a DagRun, only before the actual
1663-
# execution on DagModel.next_dagrun_create_after. We should add
1664-
# a field on DagRun for this instead of relying on the run
1665-
# always happening immediately after the data interval.
1666-
data_interval_end = get_run_data_interval(dag.timetable, self).end
1667-
true_delay = first_start_date - data_interval_end
1659+
true_delay = first_start_date - self.run_after
16681660
if true_delay.total_seconds() > 0:
16691661
Stats.timing(
16701662
f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay, tags=self.stats_tags

airflow-core/tests/unit/models/test_dagrun.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1142,7 +1142,7 @@ def test_emit_scheduling_delay(self, session, schedule, expected, testing_dag_bu
11421142
metric_name = f"dagrun.{dag.dag_id}.first_task_scheduling_delay"
11431143

11441144
if expected:
1145-
true_delay = ti.start_date - dag_run.data_interval_end
1145+
true_delay = ti.start_date - dag_run.run_after
11461146
sched_delay_stat_call = call(metric_name, true_delay, tags=expected_stat_tags)
11471147
sched_delay_stat_call_with_tags = call(
11481148
"dagrun.first_task_scheduling_delay", true_delay, tags=expected_stat_tags

0 commit comments

Comments
 (0)