Skip to content

Commit d04f22e

Browse files
authored
Clarify a bit the code re updating next_dag run and next run after (#59724)
Currently it's a bit complicated and confusing. It somewhat remains so after this update, but is hopefully a little improved. For example, we have an argument "last automated dagrun" that takes not a dag run but a data interval. And, when we call it in places in the scheduler, we may call it with a run that is not actually "the last automated dagrun" -- it could be any dag run the scheduler touches, which of course could be a very old dag run. I don't solve that problem here. But I do try to make things a little clearer and a little simpler at the scheduler call sites. In particular, there were two things that bothered me. 1. In the function `_should_update_dag_next_dagruns`, we did not only *check* whether we should update. In some branches we actually *did* the update and then said "no, don't update." I pull this update higher up where it's more visible. 2. It's not obvious that the `calculate_dagrun_date_fields` function actually mutates the dag. It just says "calculate," right? So, to improve this slightly, and to reduce noise a bit at the call sites, I wrap this (along with the update mentioned above) in a function called `_update_next_dagrun_fields`. At least it's clear we are (at least probably) updating the dag fields. This has the side benefit of bringing all invocations of `calculate_dagrun_date_fields` into one function in the scheduler (which is invoked in a few places). So hopefully a modest clarity improvement, and hopefully no behavior change. Also -- I discovered something very non-obvious about this logic. If you passed a dag run to it after the dag run was created, then it would never update the fields. But this is probably the most important time to do the update, so the scheduler doesn't try to create the same dagrun again. So to guard against this confusing scenario, I just pull that logic out of this function and move it to the specific call sites where it is required -- only in the scheduling (not dagrun creation) context.
1 parent 4d5e108 commit d04f22e

File tree

2 files changed

+116
-75
lines changed

2 files changed

+116
-75
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 90 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
from airflow.executors.executor_utils import ExecutorName
112112
from airflow.models.taskinstance import TaskInstanceKey
113113
from airflow.serialization.definitions.dag import SerializedDAG
114+
from airflow.timetables.base import DataInterval
114115
from airflow.utils.sqlalchemy import CommitProhibitorGuard
115116

116117
TI = TaskInstance
@@ -1846,14 +1847,14 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
18461847
# https://github.com/apache/airflow/issues/59120
18471848
continue
18481849

1849-
if self._should_update_dag_next_dagruns(
1850-
serdag,
1851-
dag_model,
1852-
last_dag_run=None,
1853-
active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
1850+
self._update_next_dagrun_fields(
1851+
serdag=serdag,
1852+
dag_model=dag_model,
18541853
session=session,
1855-
):
1856-
dag_model.calculate_dagrun_date_fields(serdag, data_interval)
1854+
active_non_backfill_runs=active_runs_of_dags[serdag.dag_id],
1855+
data_interval=data_interval,
1856+
)
1857+
18571858
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
18581859
# memory for larger dags? or expunge_all()
18591860

@@ -1933,45 +1934,6 @@ def _create_dag_runs_asset_triggered(
19331934
dag_run.consumed_asset_events.extend(asset_events)
19341935
session.execute(delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id))
19351936

1936-
def _should_update_dag_next_dagruns(
1937-
self,
1938-
dag: SerializedDAG,
1939-
dag_model: DagModel,
1940-
*,
1941-
last_dag_run: DagRun | None = None,
1942-
active_non_backfill_runs: int | None = None,
1943-
session: Session,
1944-
) -> bool:
1945-
"""Check if the dag's next_dagruns_create_after should be updated."""
1946-
# If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run.
1947-
# In such case, schedule next only if last_dag_run is finished and was an automated run.
1948-
if last_dag_run and not (
1949-
last_dag_run.state in State.finished_dr_states and last_dag_run.run_type == DagRunType.SCHEDULED
1950-
):
1951-
return False
1952-
# If the DAG never schedules skip save runtime
1953-
if not dag.timetable.can_be_scheduled:
1954-
return False
1955-
1956-
if active_non_backfill_runs is None:
1957-
runs_dict = DagRun.active_runs_of_dags(
1958-
dag_ids=[dag.dag_id],
1959-
exclude_backfill=True,
1960-
session=session,
1961-
)
1962-
active_non_backfill_runs = runs_dict.get(dag.dag_id, 0)
1963-
1964-
if active_non_backfill_runs >= dag.max_active_runs:
1965-
self.log.info(
1966-
"DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
1967-
dag_model.dag_id,
1968-
active_non_backfill_runs,
1969-
dag.max_active_runs,
1970-
)
1971-
dag_model.next_dagrun_create_after = None
1972-
return False
1973-
return True
1974-
19751937
def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
19761938
"""
19771939
Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
@@ -2147,16 +2109,13 @@ def _schedule_dag_run(
21472109

21482110
dag = dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session)
21492111
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
2150-
if dag_model is None:
2112+
if not dag_model:
21512113
self.log.error("Couldn't find DAG model %s in database!", dag_run.dag_id)
21522114
return callback
21532115

21542116
if not dag:
21552117
self.log.error("Couldn't find DAG %s in DAG bag!", dag_run.dag_id)
21562118
return callback
2157-
if not dag_model:
2158-
self.log.error("Couldn't find DAG model %s in database!", dag_run.dag_id)
2159-
return callback
21602119

21612120
if (
21622121
dag_run.start_date
@@ -2176,10 +2135,17 @@ def _schedule_dag_run(
21762135
session.flush()
21772136
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
21782137

2179-
if self._should_update_dag_next_dagruns(
2180-
dag, dag_model, last_dag_run=dag_run, session=session
2181-
):
2182-
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
2138+
# TODO: questionable that this logic does what it is trying to do
2139+
# I think its intent is, in part, to do this when it's the latest scheduled run
2140+
# but it does not know that it is the latest. I think it could probably check that
2141+
# logical date is equal to or greater than DagModel.next_dagrun, or something
2142+
if dag_run.state in State.finished_dr_states and dag_run.run_type == DagRunType.SCHEDULED:
2143+
self._update_next_dagrun_fields(
2144+
serdag=dag,
2145+
dag_model=dag_model,
2146+
session=session,
2147+
data_interval=get_run_data_interval(dag.timetable, dag_run),
2148+
)
21832149

21842150
dag_run_reloaded = session.scalar(
21852151
select(DagRun)
@@ -2252,8 +2218,18 @@ def _schedule_dag_run(
22522218
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
22532219
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
22542220

2255-
if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
2256-
dag_model.calculate_dagrun_date_fields(dag, get_run_data_interval(dag.timetable, dag_run))
2221+
# TODO: questionable that this logic does what it is trying to do
2222+
# I think its intent is, in part, to do this when it's the latest scheduled run
2223+
# but it does not know that it is the latest. I think it could probably check that
2224+
# logical date is equal to or greater than DagModel.next_dagrun, or something
2225+
if dag_run.state in State.finished_dr_states and dag_run.run_type == DagRunType.SCHEDULED:
2226+
self._update_next_dagrun_fields(
2227+
serdag=dag,
2228+
dag_model=dag_model,
2229+
session=session,
2230+
data_interval=get_run_data_interval(dag.timetable, dag_run),
2231+
)
2232+
22572233
# This will do one query per dag run. We "could" build up a complex
22582234
# query to update all the TIs across all the logical dates and dag
22592235
# IDs in a single query, but it turns out that can be _very very slow_
@@ -2270,6 +2246,47 @@ def _schedule_dag_run(
22702246

22712247
return callback_to_run
22722248

2249+
def _update_next_dagrun_fields(
2250+
self,
2251+
*,
2252+
serdag: SerializedDAG,
2253+
dag_model: DagModel,
2254+
session: Session,
2255+
active_non_backfill_runs: int | None = None,
2256+
data_interval: DataInterval | None,
2257+
):
2258+
"""
2259+
Conditionally update fields next_dagrun and next_dagrun_create_after on dag table.
2260+
2261+
If dag exceeds max active runs, set to None.
2262+
2263+
If dag's timetable not schedulable, don't update.
2264+
2265+
Otherwise, update via ``DagModel.calculate_dagrun_date_fields``.
2266+
"""
2267+
exceeds_max, active_runs = self._exceeds_max_active_runs(
2268+
dag_model=dag_model,
2269+
active_non_backfill_runs=active_non_backfill_runs,
2270+
session=session,
2271+
)
2272+
if exceeds_max:
2273+
self.log.info(
2274+
"Dag exceeds max_active_runs; not creating any more runs",
2275+
dag_id=dag_model.dag_id,
2276+
active_runs=active_runs,
2277+
max_active_runs=dag_model.max_active_runs,
2278+
)
2279+
# null out next_dagrun_create_after so scheduler will not examine this dag
2280+
# this is periodically reconsidered in the scheduler and dag processor.
2281+
dag_model.next_dagrun_create_after = None
2282+
return
2283+
2284+
# If the DAG never schedules skip save runtime
2285+
if not serdag.timetable.can_be_scheduled:
2286+
return
2287+
2288+
dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_dag_run=data_interval)
2289+
22732290
def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
22742291
"""
22752292
Only run DagRun.verify integrity if Serialized DAG has changed since it is slow.
@@ -3001,6 +3018,23 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
30013018

30023019
return executor
30033020

3021+
def _exceeds_max_active_runs(
3022+
self,
3023+
*,
3024+
dag_model: DagModel,
3025+
active_non_backfill_runs: int | None = None,
3026+
session: Session,
3027+
):
3028+
if active_non_backfill_runs is None:
3029+
runs_dict = DagRun.active_runs_of_dags(
3030+
dag_ids=[dag_model.dag_id],
3031+
exclude_backfill=True,
3032+
session=session,
3033+
)
3034+
active_non_backfill_runs = runs_dict.get(dag_model.dag_id, 0)
3035+
exceeds = active_non_backfill_runs >= dag_model.max_active_runs
3036+
return exceeds, active_non_backfill_runs
3037+
30043038

30053039
# Backcompat for older versions of task sdk import SchedulerDagBag from here
30063040
SchedulerDagBag = DBDagBag

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
PartitionedAssetKeyLog,
6464
)
6565
from airflow.models.backfill import Backfill, _create_backfill
66-
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
66+
from airflow.models.dag import DagModel, get_last_dagrun, get_run_data_interval, infer_automated_data_interval
6767
from airflow.models.dag_version import DagVersion
6868
from airflow.models.dagbundle import DagBundleModel
6969
from airflow.models.dagrun import DagRun
@@ -4421,8 +4421,9 @@ def test_should_update_dag_next_dagruns(self, provide_run_count: bool, kwargs: d
44214421
EmptyOperator(task_id="dummy")
44224422

44234423
index = 0
4424+
dr: DagRun
44244425
for index in range(other_runs):
4425-
dag_maker.create_dagrun(
4426+
dr = dag_maker.create_dagrun(
44264427
run_id=f"run_{index}",
44274428
logical_date=(DEFAULT_DATE + timedelta(days=index)),
44284429
start_date=timezone.utcnow(),
@@ -4444,13 +4445,15 @@ def test_should_update_dag_next_dagruns(self, provide_run_count: bool, kwargs: d
44444445
scheduler_job = Job(executor=self.null_exec)
44454446
self.job_runner = SchedulerJobRunner(job=scheduler_job)
44464447

4447-
actual = self.job_runner._should_update_dag_next_dagruns(
4448-
dag=dag,
4449-
dag_model=dag_maker.dag_model,
4450-
active_non_backfill_runs=other_runs if provide_run_count else None, # exclude backfill here
4451-
session=session,
4452-
)
4453-
assert actual == should_update
4448+
with patch("airflow.models.dag.DagModel.calculate_dagrun_date_fields") as mock_calc:
4449+
self.job_runner._update_next_dagrun_fields(
4450+
serdag=dag,
4451+
dag_model=dag_maker.dag_model,
4452+
active_non_backfill_runs=other_runs if provide_run_count else None, # exclude backfill here
4453+
session=session,
4454+
data_interval=get_run_data_interval(dag.timetable, dr),
4455+
)
4456+
assert mock_calc.called == should_update
44544457

44554458
@pytest.mark.parametrize(
44564459
("run_type", "expected"),
@@ -4472,11 +4475,9 @@ def test_should_update_dag_next_dagruns_after_run_type(self, run_type, expected,
44724475
with dag_maker(
44734476
schedule="*/1 * * * *",
44744477
max_active_runs=3,
4475-
) as dag:
4478+
):
44764479
EmptyOperator(task_id="dummy")
44774480

4478-
dag_model = dag_maker.dag_model
4479-
44804481
run = dag_maker.create_dagrun(
44814482
run_id="run",
44824483
run_type=run_type,
@@ -4490,13 +4491,19 @@ def test_should_update_dag_next_dagruns_after_run_type(self, run_type, expected,
44904491
scheduler_job = Job(executor=self.null_exec)
44914492
self.job_runner = SchedulerJobRunner(job=scheduler_job)
44924493

4493-
actual = self.job_runner._should_update_dag_next_dagruns(
4494-
dag=dag,
4495-
dag_model=dag_model,
4496-
last_dag_run=run,
4497-
session=session,
4498-
)
4499-
assert actual == expected
4494+
# ensure terminal state; otherwise the run type is moot
4495+
run.state = DagRunState.FAILED
4496+
for ti in run.get_task_instances(session=session):
4497+
ti.state = "failed"
4498+
session.flush()
4499+
4500+
with patch("airflow.models.dag.DagModel.calculate_dagrun_date_fields") as mock_calc:
4501+
self.job_runner._schedule_dag_run(
4502+
dag_run=run,
4503+
session=session,
4504+
)
4505+
4506+
assert mock_calc.called == expected
45004507

45014508
def test_create_dag_runs(self, dag_maker):
45024509
"""

0 commit comments

Comments
 (0)