Skip to content

Commit a24e3ca

Browse files
authored
add dag_id filters (apache#47730)
1 parent c1af1f5 commit a24e3ca

File tree

1 file changed

+28
-6
lines changed
  • airflow/api_fastapi/core_api/routes/public

1 file changed

+28
-6
lines changed

airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
5555
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
5656
from airflow.api_fastapi.core_api.security import (
57+
ReadableDagsFilterDep,
5758
requires_access_asset,
5859
requires_access_asset_alias,
5960
requires_access_dag,
@@ -81,6 +82,7 @@ def _generate_queued_event_where_clause(
8182
asset_id: int | None = None,
8283
dag_id: str | None = None,
8384
before: datetime | str | None = None,
85+
permitted_dag_ids: set[str] | None = None,
8486
) -> list:
8587
"""Get AssetDagRunQueue where clause."""
8688
where_clause = []
@@ -90,6 +92,8 @@ def _generate_queued_event_where_clause(
9092
where_clause.append(AssetDagRunQueue.asset_id == asset_id)
9193
if before is not None:
9294
where_clause.append(AssetDagRunQueue.created_at < before)
95+
if permitted_dag_ids is not None:
96+
where_clause.append(AssetDagRunQueue.target_dag_id.in_(permitted_dag_ids))
9397
return where_clause
9498

9599

@@ -322,11 +326,14 @@ def materialize_asset(
322326
)
323327
def get_asset_queued_events(
324328
asset_id: int,
329+
readable_dags_filter: ReadableDagsFilterDep,
325330
session: SessionDep,
326331
before: OptionalDateTimeQuery = None,
327332
) -> QueuedEventCollectionResponse:
328333
"""Get queued asset events for an asset."""
329-
where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before)
334+
where_clause = _generate_queued_event_where_clause(
335+
asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
336+
)
330337
query = select(AssetDagRunQueue).where(*where_clause)
331338

332339
dag_asset_queued_events_select, total_entries = paginated_select(statement=query)
@@ -381,11 +388,14 @@ def get_asset(
381388
)
382389
def get_dag_asset_queued_events(
383390
dag_id: str,
391+
readable_dags_filter: ReadableDagsFilterDep,
384392
session: SessionDep,
385393
before: OptionalDateTimeQuery = None,
386394
) -> QueuedEventCollectionResponse:
387395
"""Get queued asset events for a DAG."""
388-
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
396+
where_clause = _generate_queued_event_where_clause(
397+
dag_id=dag_id, before=before, permitted_dag_ids=readable_dags_filter.value
398+
)
389399
query = select(AssetDagRunQueue).where(*where_clause)
390400

391401
dag_asset_queued_events_select, total_entries = paginated_select(statement=query)
@@ -412,11 +422,14 @@ def get_dag_asset_queued_events(
412422
def get_dag_asset_queued_event(
413423
dag_id: str,
414424
asset_id: int,
425+
readable_dags_filter: ReadableDagsFilterDep,
415426
session: SessionDep,
416427
before: OptionalDateTimeQuery = None,
417428
) -> QueuedEventResponse:
418429
"""Get a queued asset event for a DAG."""
419-
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before)
430+
where_clause = _generate_queued_event_where_clause(
431+
dag_id=dag_id, asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
432+
)
420433
query = select(AssetDagRunQueue).where(*where_clause)
421434
adrq = session.scalar(query)
422435
if not adrq:
@@ -440,11 +453,14 @@ def get_dag_asset_queued_event(
440453
)
441454
def delete_asset_queued_events(
442455
asset_id: int,
456+
readable_dags_filter: ReadableDagsFilterDep,
443457
session: SessionDep,
444458
before: OptionalDateTimeQuery = None,
445459
):
446460
"""Delete queued asset events for an asset."""
447-
where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before)
461+
where_clause = _generate_queued_event_where_clause(
462+
asset_id=asset_id, before=before, permitted_dag_ids=readable_dags_filter.value
463+
)
448464
delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
449465
result = session.execute(delete_stmt)
450466
if result.rowcount == 0:
@@ -471,10 +487,13 @@ def delete_asset_queued_events(
471487
)
472488
def delete_dag_asset_queued_events(
473489
dag_id: str,
490+
readable_dags_filter: ReadableDagsFilterDep,
474491
session: SessionDep,
475492
before: OptionalDateTimeQuery = None,
476493
):
477-
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
494+
where_clause = _generate_queued_event_where_clause(
495+
dag_id=dag_id, before=before, permitted_dag_ids=readable_dags_filter.value
496+
)
478497

479498
delete_statement = delete(AssetDagRunQueue).where(*where_clause)
480499
result = session.execute(delete_statement)
@@ -501,11 +520,14 @@ def delete_dag_asset_queued_events(
501520
def delete_dag_asset_queued_event(
502521
dag_id: str,
503522
asset_id: int,
523+
readable_dags_filter: ReadableDagsFilterDep,
504524
session: SessionDep,
505525
before: OptionalDateTimeQuery = None,
506526
):
507527
"""Delete a queued asset event for a DAG."""
508-
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id)
528+
where_clause = _generate_queued_event_where_clause(
529+
dag_id=dag_id, before=before, asset_id=asset_id, permitted_dag_ids=readable_dags_filter.value
530+
)
509531
delete_statement = (
510532
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
511533
)

0 commit comments

Comments
 (0)