Skip to content

Commit d43756c

Browse files
authored
Second cut at verifying team/bundle at dag parse time (apache#57166)
Verify team/bundle/executor at dag parse time Bundle path is already plumbed down to the dag bag for parsing, also pass along the bundle name which is used to lookup the bundle config and find the team name from bundle configuration. This is used to verify that tasks/dags are using executors that are available/configured for their team.
1 parent 9fa4462 commit d43756c

File tree

9 files changed

+319
-102
lines changed

9 files changed

+319
-102
lines changed

airflow-core/src/airflow/cli/commands/dag_command.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
375375

376376
for bundle in all_bundles:
377377
if bundle.name in bundles_to_search:
378-
dagbag = DagBag(bundle.path, bundle_path=bundle.path)
378+
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
379379
dagbag.collect_dags()
380380
dags_list.extend(list(dagbag.dags.values()))
381381
dagbag_import_errors += len(dagbag.import_errors)
@@ -472,7 +472,7 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
472472

473473
for bundle in all_bundles:
474474
if bundle.name in bundles_to_search:
475-
dagbag = DagBag(bundle.path, bundle_path=bundle.path)
475+
dagbag = DagBag(bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
476476
for filename, errors in dagbag.import_errors.items():
477477
data.append({"bundle_name": bundle.name, "filepath": filename, "error": errors})
478478
else:
@@ -524,7 +524,7 @@ def dag_report(args) -> None:
524524
if bundle.name not in bundles_to_reserialize:
525525
continue
526526
bundle.initialize()
527-
dagbag = DagBag(bundle.path, include_examples=False)
527+
dagbag = DagBag(bundle.path, bundle_name=bundle.name, include_examples=False)
528528
all_dagbag_stats.extend(dagbag.dagbag_stats)
529529

530530
AirflowConsole().print_as(
@@ -688,5 +688,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
688688
if bundle.name not in bundles_to_reserialize:
689689
continue
690690
bundle.initialize()
691-
dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False)
691+
dag_bag = DagBag(
692+
bundle.path, bundle_path=bundle.path, bundle_name=bundle.name, include_examples=False
693+
)
692694
sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)

airflow-core/src/airflow/dag_processing/dagbag.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,25 @@ def handle_timeout(signum, frame):
134134
signal.setitimer(signal.ITIMER_REAL, 0)
135135

136136

137-
def _validate_executor_fields(dag: DAG) -> None:
137+
def _executor_exists(executor_name: str, team_name: str | None) -> bool:
138+
"""Check if executor exists, with global fallback for teams."""
139+
try:
140+
# First pass check for team-specific executor or a global executor (i.e. team_name=None)
141+
ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=team_name)
142+
return True
143+
except UnknownExecutorException:
144+
if team_name:
145+
# If we had a team_name but didn't find an executor, check if there is a global executor that
146+
# satisfies the request.
147+
try:
148+
ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=None)
149+
return True
150+
except UnknownExecutorException:
151+
pass
152+
return False
153+
154+
155+
def _validate_executor_fields(dag: DAG, bundle_name: str | None = None) -> None:
138156
"""Validate that executors specified in tasks are available and owned by the same team as the dag bundle."""
139157
import logging
140158

@@ -144,32 +162,30 @@ def _validate_executor_fields(dag: DAG) -> None:
144162
# Check if multi team is available by reading the multi_team configuration (which is boolean)
145163
if conf.getboolean("core", "multi_team"):
146164
# Get team name from bundle configuration if available
147-
if hasattr(dag, "bundle_name") and dag.bundle_name:
165+
if bundle_name:
148166
from airflow.dag_processing.bundles.manager import DagBundlesManager
149167

150168
bundle_manager = DagBundlesManager()
151-
bundle_config = bundle_manager._bundle_config[dag.bundle_name]
169+
bundle_config = bundle_manager._bundle_config[bundle_name]
152170

153171
dag_team_name = bundle_config.team_name
154172
if dag_team_name:
155173
log.debug(
156-
"Found team '%s' for DAG '%s' via bundle '%s'", dag_team_name, dag.dag_id, dag.bundle_name
174+
"Found team '%s' for DAG '%s' via bundle '%s'", dag_team_name, dag.dag_id, bundle_name
157175
)
158176

159177
for task in dag.tasks:
160178
if not task.executor:
161179
continue
162-
try:
163-
# Validate that the executor exists and is available for the DAG's team
164-
ExecutorLoader.lookup_executor_name_by_str(task.executor, team_name=dag_team_name)
165-
except UnknownExecutorException:
180+
181+
if not _executor_exists(task.executor, dag_team_name):
166182
if dag_team_name:
167183
raise UnknownExecutorException(
168184
f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available "
169-
f"for team '{dag_team_name}' (the team associated with DAG '{dag.dag_id}'). "
170-
f"Make sure '{task.executor}' is configured for team '{dag_team_name}' in your "
185+
f"for team '{dag_team_name}' (the team associated with DAG '{dag.dag_id}') or as a global executor. "
186+
f"Make sure '{task.executor}' is configured for team '{dag_team_name}' or globally in your "
171187
"[core] executors configuration, or update the task's executor to use one of the "
172-
f"configured executors for team '{dag_team_name}'."
188+
f"configured executors for team '{dag_team_name}' or available global executors."
173189
)
174190
raise UnknownExecutorException(
175191
f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available. "
@@ -210,9 +226,11 @@ def __init__(
210226
collect_dags: bool = True,
211227
known_pools: set[str] | None = None,
212228
bundle_path: Path | None = None,
229+
bundle_name: str | None = None,
213230
):
214231
super().__init__()
215232
self.bundle_path = bundle_path
233+
self.bundle_name = bundle_name
216234
include_examples = (
217235
include_examples
218236
if isinstance(include_examples, bool)
@@ -528,7 +546,7 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
528546
dag.relative_fileloc = relative_fileloc
529547
try:
530548
dag.validate()
531-
_validate_executor_fields(dag)
549+
_validate_executor_fields(dag, self.bundle_name)
532550
self.bag_dag(dag=dag)
533551
except AirflowClusterPolicySkipDag:
534552
pass

airflow-core/src/airflow/dag_processing/manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,7 @@ def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
921921
id=id,
922922
path=dag_file.absolute_path,
923923
bundle_path=cast("Path", dag_file.bundle_path),
924+
bundle_name=dag_file.bundle_name,
924925
callbacks=callback_to_execute_for_file,
925926
selector=self.selector,
926927
logger=logger,

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ class DagFileParseRequest(BaseModel):
9595
bundle_path: Path
9696
"""Passing bundle path around lets us figure out relative file path."""
9797

98+
bundle_name: str
99+
"""Bundle name for team-specific executor validation."""
100+
98101
callback_requests: list[CallbackRequest] = Field(default_factory=list)
99102
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
100103

@@ -203,6 +206,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP
203206
bag = DagBag(
204207
dag_folder=msg.file,
205208
bundle_path=msg.bundle_path,
209+
bundle_name=msg.bundle_name,
206210
include_examples=False,
207211
load_op_links=False,
208212
)
@@ -493,6 +497,7 @@ def start( # type: ignore[override]
493497
*,
494498
path: str | os.PathLike[str],
495499
bundle_path: Path,
500+
bundle_name: str,
496501
callbacks: list[CallbackRequest],
497502
target: Callable[[], None] = _parse_file_entrypoint,
498503
client: Client,
@@ -504,18 +509,20 @@ def start( # type: ignore[override]
504509

505510
proc: Self = super().start(target=target, client=client, **kwargs)
506511
proc.had_callbacks = bool(callbacks) # Track if this process had callbacks
507-
proc._on_child_started(callbacks, path, bundle_path)
512+
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
508513
return proc
509514

510515
def _on_child_started(
511516
self,
512517
callbacks: list[CallbackRequest],
513518
path: str | os.PathLike[str],
514519
bundle_path: Path,
520+
bundle_name: str,
515521
) -> None:
516522
msg = DagFileParseRequest(
517523
file=os.fspath(path),
518524
bundle_path=bundle_path,
525+
bundle_name=bundle_name,
519526
callback_requests=callbacks,
520527
)
521528
self.send_msg(msg, request_id=0)

airflow-core/src/airflow/utils/cli.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
280280
bundle = manager.get_bundle(bundle_name)
281281
with _airflow_parsing_context_manager(dag_id=dag_id):
282282
dagbag = DagBag(
283-
dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
283+
dag_folder=dagfile_path or bundle.path,
284+
bundle_path=bundle.path,
285+
bundle_name=bundle.name,
286+
include_examples=False,
284287
)
285288
if dag := dagbag.dags.get(dag_id):
286289
return dag
@@ -290,7 +293,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str, dagfile_path: str | N
290293
bundle.initialize()
291294
with _airflow_parsing_context_manager(dag_id=dag_id):
292295
dagbag = DagBag(
293-
dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
296+
dag_folder=dagfile_path or bundle.path,
297+
bundle_path=bundle.path,
298+
bundle_name=bundle.name,
299+
include_examples=False,
294300
)
295301
sync_bag_to_db(dagbag, bundle.name, bundle.version)
296302
if dag := dagbag.dags.get(dag_id):
@@ -327,7 +333,7 @@ def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False, fr
327333
return [get_bagged_dag(bundle_names=bundle_names, dag_id=dag_id)]
328334

329335
def _find_dag(bundle):
330-
dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path)
336+
dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path, bundle_name=bundle.name)
331337
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
332338
return matched_dags
333339

airflow-core/tests/unit/cli/commands/test_dag_command.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,7 @@ def test_dag_test_with_bundle_name(self, mock_dagbag, configure_dag_bundles):
784784
mock_dagbag.assert_called_once_with(
785785
bundle_path=TEST_DAGS_FOLDER,
786786
dag_folder=TEST_DAGS_FOLDER,
787+
bundle_name="testing",
787788
include_examples=False,
788789
)
789790

@@ -805,6 +806,7 @@ def test_dag_test_with_dagfile_path(self, mock_dagbag, configure_dag_bundles):
805806
mock_dagbag.assert_called_once_with(
806807
bundle_path=TEST_DAGS_FOLDER,
807808
dag_folder=str(dag_file),
809+
bundle_name="testing",
808810
include_examples=False,
809811
)
810812

@@ -836,6 +838,7 @@ def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag, configure
836838
mock_dagbag.assert_called_once_with(
837839
bundle_path=TEST_DAGS_FOLDER,
838840
dag_folder=str(dag_file),
841+
bundle_name="testing",
839842
include_examples=False,
840843
)
841844

0 commit comments

Comments
 (0)