Skip to content

Commit 3b97c94

Browse files
authored
Fix dag processor crash by ignoring callbacks from other bundles (apache#57192)
When the dag Processor is made to run specific bundle, it previously attempted to process callback requests for bundles it does not own, leading to a StopIteration in render_log_filename due to missing bundle references. Now, fetch_callbacks only enqueues callbacks whose bundle_name matches one of the manager’s active bundles, leaving others in the DB for the correct processor to handle. closes: apache#57081
1 parent 0636439 commit 3b97c94

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

airflow-core/src/airflow/dag_processing/manager.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ def _fetch_callbacks(
451451

452452
callback_queue: list[CallbackRequest] = []
453453
with prohibit_commit(session) as guard:
454+
bundle_names = [bundle.name for bundle in self._dag_bundles]
454455
query: Select[tuple[DbCallbackRequest]] = select(DbCallbackRequest)
455456
query = query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
456457
self.max_callbacks_per_loop
@@ -461,8 +462,11 @@ def _fetch_callbacks(
461462
)
462463
callbacks = session.scalars(query)
463464
for callback in callbacks:
465+
req = callback.get_callback_request()
466+
if req.bundle_name not in bundle_names:
467+
continue
464468
try:
465-
callback_queue.append(callback.get_callback_request())
469+
callback_queue.append(req)
466470
session.delete(callback)
467471
except Exception as e:
468472
self.log.warning("Error adding callback for execution: %s, %s", callback, e)

airflow-core/tests/unit/dag_processing/test_manager.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,7 @@ def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
772772

773773
with configure_testing_dag_bundle(dag_filepath):
774774
manager = DagFileProcessorManager(max_runs=1)
775+
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
775776

776777
with create_session() as session:
777778
callbacks = manager._fetch_callbacks(session=session)
@@ -815,6 +816,51 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te
815816
manager.run()
816817
assert session.query(DbCallbackRequest).count() == 1
817818

819+
@conf_vars({("core", "load_examples"): "False"})
820+
def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle):
821+
"""Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted."""
822+
823+
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
824+
825+
# Create two callbacks: one for the active 'testing' bundle and one for a different bundle
826+
matching = DagCallbackRequest(
827+
dag_id="test_start_date_scheduling",
828+
bundle_name="testing",
829+
bundle_version=None,
830+
filepath="test_on_failure_callback_dag.py",
831+
is_failure_callback=True,
832+
run_id="match",
833+
)
834+
non_matching = DagCallbackRequest(
835+
dag_id="test_start_date_scheduling",
836+
bundle_name="other-bundle",
837+
bundle_version=None,
838+
filepath="test_on_failure_callback_dag.py",
839+
is_failure_callback=True,
840+
run_id="no-match",
841+
)
842+
843+
with create_session() as session:
844+
session.add(DbCallbackRequest(callback=matching, priority_weight=100))
845+
session.add(DbCallbackRequest(callback=non_matching, priority_weight=200))
846+
847+
with configure_testing_dag_bundle(dag_filepath):
848+
manager = DagFileProcessorManager(max_runs=1)
849+
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
850+
851+
with create_session() as session:
852+
callbacks = manager._fetch_callbacks(session=session)
853+
854+
# Only the matching callback should be returned
855+
assert [c.run_id for c in callbacks] == ["match"]
856+
857+
# The non-matching callback should remain in the DB
858+
remaining = session.query(DbCallbackRequest).all()
859+
assert len(remaining) == 1
860+
# Decode remaining request and verify it's for the other bundle
861+
remaining_req = remaining[0].get_callback_request()
862+
assert remaining_req.bundle_name == "other-bundle"
863+
818864
@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
819865
def test_callback_queue(self, mock_get_logger, configure_testing_dag_bundle):
820866
mock_logger = MagicMock()

0 commit comments

Comments
 (0)