Skip to content

Commit 7edec78

Browse files
authored
Remove align param from iter dagrun infos (#61420)
This param does not appear to be needed. It's only set to False when called from within get_run_ids, within set_state, which is only called when marking tasks as failed etc, and only under certain conditions, when clearing "past" or "future" runs. I think it only would make a difference if the object run was manually triggered thus did not align with the timetable. We can handle that case by just including the object run in the returned run ids.
1 parent 5f609bd commit 7edec78

File tree

3 files changed

+5
-27
lines changed

3 files changed

+5
-27
lines changed

airflow-core/src/airflow/api/common/mark_tasks.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,12 @@ def get_run_ids(dag: SerializedDAG, run_id: str, future: bool, past: bool, sessi
185185
elif not dag.timetable.periodic:
186186
run_ids = [run_id]
187187
else:
188-
dates = [
188+
dates = {current_logical_date}
189+
dates.update(
189190
info.logical_date
190-
for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
191+
for info in dag.iter_dagrun_infos_between(start_date, end_date)
191192
if info.logical_date # todo: AIP-76 this will not find anything where logical date is null
192-
]
193+
)
193194
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
194195
return run_ids
195196

airflow-core/src/airflow/serialization/definitions/dag.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -434,26 +434,13 @@ def iter_dagrun_infos_between(
434434
self,
435435
earliest: datetime.datetime | None,
436436
latest: datetime.datetime,
437-
*,
438-
align: bool = True,
439437
) -> Iterable[DagRunInfo]:
440438
"""
441439
Yield DagRunInfo using this DAG's timetable between given interval.
442440
443441
DagRunInfo instances yielded if their ``logical_date`` is not earlier
444442
than ``earliest``, nor later than ``latest``. The instances are ordered
445443
by their ``logical_date`` from earliest to latest.
446-
447-
If ``align`` is ``False``, the first run will happen immediately on
448-
``earliest``, even if it does not fall on the logical timetable schedule.
449-
The default is ``True``.
450-
451-
Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
452-
``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
453-
``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
454-
if ``align=True``.
455-
456-
# see issue https://github.com/apache/airflow/issues/60455
457444
"""
458445
if isinstance(self.timetable, CronPartitionTimetable):
459446
# todo: AIP-76 need to update this so that it handles partitions
@@ -481,21 +468,12 @@ def iter_dagrun_infos_between(
481468
info = None
482469

483470
if info is None:
484-
# No runs to be scheduled between the user-supplied timeframe. But
485-
# if align=False, "invent" a data interval for the timeframe itself.
486-
if not align:
487-
yield DagRunInfo.interval(earliest, latest)
488471
return
489472

490473
if TYPE_CHECKING:
491474
# todo: AIP-76 after updating this function for partitions, this may not be true
492475
assert info.data_interval is not None
493476

494-
# If align=False and earliest does not fall on the timetable's logical
495-
# schedule, "invent" a data interval for it.
496-
if not align and info.logical_date != earliest:
497-
yield DagRunInfo.interval(earliest, info.data_interval.start)
498-
499477
# Generate naturally according to schedule.
500478
while info is not None:
501479
yield info

airflow-core/tests/unit/models/test_dag.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2747,7 +2747,6 @@ def test_iter_dagrun_infos_between(start_date, expected_infos):
27472747
iterator = create_scheduler_dag(dag).iter_dagrun_infos_between(
27482748
earliest=pendulum.instance(start_date),
27492749
latest=pendulum.instance(DEFAULT_DATE),
2750-
align=True,
27512750
)
27522751
assert expected_infos == list(iterator)
27532752

@@ -2784,7 +2783,7 @@ def _get_registered_timetable(s):
27842783
):
27852784
scheduler_dag = create_scheduler_dag(dag)
27862785

2787-
iterator = scheduler_dag.iter_dagrun_infos_between(earliest=start, latest=end, align=True)
2786+
iterator = scheduler_dag.iter_dagrun_infos_between(earliest=start, latest=end)
27882787
with caplog.at_level(logging.ERROR):
27892788
infos = list(iterator)
27902789

0 commit comments

Comments
 (0)