Skip to content

Commit 77399b5

Browse files
vatsrahul1001nailo2c
authored andcommitted
AIP-84 Adding logging actions (apache#47556)
closes: apache#44057
1 parent 14a446e commit 77399b5

File tree

6 files changed

+39
-6
lines changed

6 files changed

+39
-6
lines changed

airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
DAGResponse,
5858
)
5959
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
60+
from airflow.api_fastapi.logging.decorators import action_logging
6061
from airflow.exceptions import AirflowException, DagNotFound
6162
from airflow.models import DAG, DagModel
6263
from airflow.models.dagrun import DagRun
@@ -207,6 +208,7 @@ def get_dag_details(dag_id: str, session: SessionDep, request: Request) -> DAGDe
207208
status.HTTP_404_NOT_FOUND,
208209
]
209210
),
211+
dependencies=[Depends(action_logging())],
210212
)
211213
def patch_dag(
212214
dag_id: str,
@@ -249,6 +251,7 @@ def patch_dag(
249251
status.HTTP_404_NOT_FOUND,
250252
]
251253
),
254+
dependencies=[Depends(action_logging())],
252255
)
253256
def patch_dags(
254257
patch_body: DAGPatchBody,
@@ -310,6 +313,7 @@ def patch_dags(
310313
status.HTTP_422_UNPROCESSABLE_ENTITY,
311314
]
312315
),
316+
dependencies=[Depends(action_logging())],
313317
)
314318
def delete_dag(
315319
dag_id: str,

airflow/api_fastapi/core_api/routes/public/pools.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
4343
from airflow.api_fastapi.core_api.security import requires_access_pool
4444
from airflow.api_fastapi.core_api.services.public.pools import BulkPoolService
45+
from airflow.api_fastapi.logging.decorators import action_logging
4546
from airflow.models.pool import Pool
4647

4748
pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
@@ -56,7 +57,7 @@
5657
status.HTTP_404_NOT_FOUND,
5758
]
5859
),
59-
dependencies=[Depends(requires_access_pool(method="DELETE"))],
60+
dependencies=[Depends(requires_access_pool(method="DELETE")), Depends(action_logging())],
6061
)
6162
def delete_pool(
6263
pool_name: str,
@@ -130,7 +131,7 @@ def get_pools(
130131
status.HTTP_404_NOT_FOUND,
131132
]
132133
),
133-
dependencies=[Depends(requires_access_pool(method="PUT"))],
134+
dependencies=[Depends(requires_access_pool(method="PUT")), Depends(action_logging())],
134135
)
135136
def patch_pool(
136137
pool_name: str,
@@ -182,7 +183,7 @@ def patch_pool(
182183
responses=create_openapi_http_exception_doc(
183184
[status.HTTP_409_CONFLICT]
184185
), # handled by global exception handler
185-
dependencies=[Depends(requires_access_pool(method="POST"))],
186+
dependencies=[Depends(requires_access_pool(method="POST")), Depends(action_logging())],
186187
)
187188
def post_pool(
188189
body: PoolBody,
@@ -196,7 +197,7 @@ def post_pool(
196197

197198
@pools_router.patch(
198199
"",
199-
dependencies=[Depends(requires_access_pool(method="PUT"))],
200+
dependencies=[Depends(requires_access_pool(method="PUT")), Depends(action_logging())],
200201
)
201202
def bulk_pools(
202203
request: BulkBody[PoolBody],

airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
TaskInstancesBatchBody,
6161
)
6262
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
63+
from airflow.api_fastapi.logging.decorators import action_logging
6364
from airflow.exceptions import TaskNotFound
6465
from airflow.models import Base, DagRun
6566
from airflow.models.dag import DAG
@@ -463,6 +464,7 @@ def get_task_instances(
463464
@task_instances_router.post(
464465
task_instances_prefix + "/list",
465466
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
467+
dependencies=[Depends(action_logging())],
466468
)
467469
def get_task_instances_batch(
468470
dag_id: Literal["~"],
@@ -601,6 +603,7 @@ def get_mapped_task_instance_try_details(
601603
@task_instances_router.post(
602604
"/clearTaskInstances",
603605
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
606+
dependencies=[Depends(action_logging())],
604607
)
605608
def post_clear_task_instances(
606609
dag_id: str,
@@ -805,12 +808,14 @@ def patch_task_instance_dry_run(
805808
responses=create_openapi_http_exception_doc(
806809
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
807810
),
811+
dependencies=[Depends(action_logging())],
808812
)
809813
@task_instances_router.patch(
810814
task_instances_prefix + "/{task_id}/{map_index}",
811815
responses=create_openapi_http_exception_doc(
812816
[status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT],
813817
),
818+
dependencies=[Depends(action_logging())],
814819
)
815820
def patch_task_instance(
816821
dag_id: str,

tests/api_fastapi/core_api/routes/public/test_dags.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from airflow.utils.types import DagRunTriggeredByType, DagRunType
3131

3232
from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
33+
from tests_common.test_utils.logs import check_last_log
3334

3435
pytestmark = pytest.mark.db_test
3536

@@ -257,14 +258,15 @@ class TestPatchDag(TestDagEndpoint):
257258
],
258259
)
259260
def test_patch_dag(
260-
self, test_client, query_params, dag_id, body, expected_status_code, expected_is_paused
261+
self, test_client, query_params, dag_id, body, expected_status_code, expected_is_paused, session
261262
):
262263
response = test_client.patch(f"/public/dags/{dag_id}", json=body, params=query_params)
263264

264265
assert response.status_code == expected_status_code
265266
if expected_status_code == 200:
266267
body = response.json()
267268
assert body["is_paused"] == expected_is_paused
269+
check_last_log(session, dag_id=dag_id, event="patch_dag", logical_date=None)
268270

269271

270272
class TestPatchDags(TestDagEndpoint):
@@ -312,7 +314,14 @@ class TestPatchDags(TestDagEndpoint):
312314
],
313315
)
314316
def test_patch_dags(
315-
self, test_client, query_params, body, expected_status_code, expected_ids, expected_paused_ids
317+
self,
318+
test_client,
319+
query_params,
320+
body,
321+
expected_status_code,
322+
expected_ids,
323+
expected_paused_ids,
324+
session,
316325
):
317326
response = test_client.patch("/public/dags", json=body, params=query_params)
318327

@@ -322,6 +331,7 @@ def test_patch_dags(
322331
assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
323332
paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if dag["is_paused"]]
324333
assert paused_dag_ids == expected_paused_ids
334+
check_last_log(session, dag_id=DAG1_ID, event="patch_dag", logical_date=None)
325335

326336

327337
class TestDagDetails(TestDagEndpoint):
@@ -496,6 +506,7 @@ def test_delete_dag(
496506
status_code_details,
497507
has_running_dagruns,
498508
is_create_dag,
509+
session,
499510
):
500511
if is_create_dag:
501512
self._create_dag_for_deletion(
@@ -510,3 +521,5 @@ def test_delete_dag(
510521

511522
details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details")
512523
assert details_response.status_code == status_code_details
524+
if details_response.status_code == 204:
525+
check_last_log(session, dag_id=dag_id, event="delete_dag", logical_date=None)

tests/api_fastapi/core_api/routes/public/test_pools.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from airflow.utils.session import provide_session
2323

2424
from tests_common.test_utils.db import clear_db_pools
25+
from tests_common.test_utils.logs import check_last_log
2526

2627
pytestmark = pytest.mark.db_test
2728

@@ -64,6 +65,7 @@ def test_delete_should_respond_204(self, test_client, session):
6465
assert response.status_code == 204
6566
pools = session.query(Pool).all()
6667
assert len(pools) == 2
68+
check_last_log(session, dag_id=None, event="delete_pool", logical_date=None)
6769

6870
def test_delete_should_respond_401(self, unauthenticated_test_client):
6971
response = unauthenticated_test_client.delete(f"/public/pools/{POOL1_NAME}")
@@ -300,6 +302,8 @@ def test_should_respond_200(
300302
del error["url"]
301303

302304
assert body == expected_response
305+
if response.status_code == 200:
306+
check_last_log(session, dag_id=None, event="patch_pool", logical_date=None)
303307

304308
def test_should_respond_401(self, unauthenticated_test_client):
305309
response = unauthenticated_test_client.patch(f"/public/pools/{POOL1_NAME}", params={}, json={})
@@ -356,6 +360,7 @@ def test_should_respond_200(self, test_client, session, body, expected_status_co
356360

357361
assert response.json() == expected_response
358362
assert session.query(Pool).count() == n_pools + 1
363+
check_last_log(session, dag_id=None, event="post_pool", logical_date=None)
359364

360365
def test_should_respond_401(self, unauthenticated_test_client):
361366
response = unauthenticated_test_client.post("/public/pools", json={})
@@ -751,6 +756,7 @@ def test_bulk_pools(self, test_client, actions, expected_results, session):
751756
response_data = response.json()
752757
for key, value in expected_results.items():
753758
assert response_data[key] == value
759+
check_last_log(session, dag_id=None, event="bulk_pools", logical_date=None)
754760

755761
def test_should_respond_401(self, unauthenticated_test_client):
756762
response = unauthenticated_test_client.patch("/public/pools", json={})

tests/api_fastapi/core_api/routes/public/test_task_instances.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
clear_db_runs,
4747
clear_rendered_ti_fields,
4848
)
49+
from tests_common.test_utils.logs import check_last_log
4950
from tests_common.test_utils.mock_operators import MockOperator
5051

5152
pytestmark = pytest.mark.db_test
@@ -1394,6 +1395,7 @@ def test_should_respond_200(
13941395
assert response.status_code == 200, body
13951396
assert expected_ti_count == body["total_entries"]
13961397
assert expected_ti_count == len(body["task_instances"])
1398+
check_last_log(session, dag_id="~", event="get_task_instances_batch", logical_date=None)
13971399

13981400
def test_should_respond_200_for_order_by(self, test_client, session):
13991401
dag_id = "example_python_operator"
@@ -2061,6 +2063,7 @@ def test_should_respond_200(
20612063
)
20622064
assert response.status_code == 200
20632065
assert response.json()["total_entries"] == expected_ti
2066+
check_last_log(session, dag_id=request_dag, event="post_clear_task_instances", logical_date=None)
20642067

20652068
@pytest.mark.parametrize("flag", ["include_future", "include_past"])
20662069
def test_dag_run_with_future_or_past_flag_returns_400(self, test_client, session, flag):
@@ -2990,6 +2993,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
29902993
state=self.NEW_STATE,
29912994
task_id=self.TASK_ID,
29922995
)
2996+
check_last_log(session, dag_id=self.DAG_ID, event="patch_task_instance", logical_date=None)
29932997

29942998
def test_should_update_task_instance_state(self, test_client, session):
29952999
self.create_task_instances(session)

0 commit comments

Comments
 (0)