Skip to content

Commit 97bb059

Browse files
AIP-65 Add DagVersion to TaskInstanceResponse Serializer (apache#46320)
* AIP-65 Add DagVersion to TaskInstanceResponse Serializer * Update following code review * Fix test
1 parent 7a718dd commit 97bb059

File tree

16 files changed

+354
-36
lines changed

16 files changed

+354
-36
lines changed

airflow/api_fastapi/common/parameters.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
TaskOutletAssetReference,
4949
)
5050
from airflow.models.dag import DagModel, DagTag
51+
from airflow.models.dag_version import DagVersion
5152
from airflow.models.dagrun import DagRun
5253
from airflow.models.pool import Pool
5354
from airflow.models.taskinstance import TaskInstance
@@ -635,6 +636,17 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
635636
QueryTITaskDisplayNamePatternSearch = Annotated[
636637
_SearchParam, Depends(search_param_factory(TaskInstance.task_display_name, "task_display_name_pattern"))
637638
]
639+
QueryTIDagVersionFilter = Annotated[
640+
FilterParam[list[int]],
641+
Depends(
642+
filter_param_factory(
643+
DagVersion.version_number,
644+
list[int],
645+
FilterOptionEnum.ANY_EQUAL,
646+
default_factory=list,
647+
)
648+
),
649+
]
638650

639651
# Assets
640652
QueryAssetNamePatternSearch = Annotated[
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from datetime import datetime
20+
from uuid import UUID
21+
22+
from airflow.api_fastapi.core_api.base import BaseModel
23+
24+
25+
class DagVersionResponse(BaseModel):
26+
"""Dag Version serializer for responses."""
27+
28+
id: UUID
29+
version_number: int
30+
dag_id: str
31+
bundle_name: str
32+
bundle_version: str | None
33+
created_at: datetime

airflow/api_fastapi/core_api/datamodels/task_instances.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
)
3434

3535
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
36+
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
3637
from airflow.api_fastapi.core_api.datamodels.job import JobResponse
3738
from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
3839
from airflow.utils.state import TaskInstanceState
@@ -76,6 +77,7 @@ class TaskInstanceResponse(BaseModel):
7677
)
7778
trigger: TriggerResponse | None
7879
queued_by_job: JobResponse | None = Field(alias="triggerer_job")
80+
dag_version: DagVersionResponse | None
7981

8082

8183
class TaskInstanceCollectionResponse(BaseModel):

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4842,6 +4842,14 @@ paths:
48424842
items:
48434843
type: string
48444844
title: Executor
4845+
- name: version_number
4846+
in: query
4847+
required: false
4848+
schema:
4849+
type: array
4850+
items:
4851+
type: integer
4852+
title: Version Number
48454853
- name: limit
48464854
in: query
48474855
required: false
@@ -5464,6 +5472,14 @@ paths:
54645472
items:
54655473
type: string
54665474
title: Executor
5475+
- name: version_number
5476+
in: query
5477+
required: false
5478+
schema:
5479+
type: array
5480+
items:
5481+
type: integer
5482+
title: Version Number
54675483
- name: limit
54685484
in: query
54695485
required: false
@@ -8649,6 +8665,40 @@ components:
86498665
- dag_id
86508666
title: DagTagResponse
86518667
description: DAG Tag serializer for responses.
8668+
DagVersionResponse:
8669+
properties:
8670+
id:
8671+
type: string
8672+
format: uuid
8673+
title: Id
8674+
version_number:
8675+
type: integer
8676+
title: Version Number
8677+
dag_id:
8678+
type: string
8679+
title: Dag Id
8680+
bundle_name:
8681+
type: string
8682+
title: Bundle Name
8683+
bundle_version:
8684+
anyOf:
8685+
- type: string
8686+
- type: 'null'
8687+
title: Bundle Version
8688+
created_at:
8689+
type: string
8690+
format: date-time
8691+
title: Created At
8692+
type: object
8693+
required:
8694+
- id
8695+
- version_number
8696+
- dag_id
8697+
- bundle_name
8698+
- bundle_version
8699+
- created_at
8700+
title: DagVersionResponse
8701+
description: Dag Version serializer for responses.
86528702
DagWarningType:
86538703
type: string
86548704
enum:
@@ -9900,6 +9950,10 @@ components:
99009950
anyOf:
99019951
- $ref: '#/components/schemas/JobResponse'
99029952
- type: 'null'
9953+
dag_version:
9954+
anyOf:
9955+
- $ref: '#/components/schemas/DagVersionResponse'
9956+
- type: 'null'
99039957
type: object
99049958
required:
99059959
- id
@@ -9931,6 +9985,7 @@ components:
99319985
- rendered_map_index
99329986
- trigger
99339987
- triggerer_job
9988+
- dag_version
99349989
title: TaskInstanceResponse
99359990
description: TaskInstance serializer for responses.
99369991
TaskInstanceState:

airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
OffsetFilter,
3636
QueryLimit,
3737
QueryOffset,
38+
QueryTIDagVersionFilter,
3839
QueryTIExecutorFilter,
3940
QueryTIPoolFilter,
4041
QueryTIQueueFilter,
@@ -87,6 +88,7 @@ def get_task_instance(
8788
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
8889
.join(TI.dag_run)
8990
.options(joinedload(TI.rendered_task_instance_fields))
91+
.options(joinedload(TI.dag_version))
9092
)
9193
task_instance = session.scalar(query)
9294

@@ -121,6 +123,7 @@ def get_mapped_task_instances(
121123
pool: QueryTIPoolFilter,
122124
queue: QueryTIQueueFilter,
123125
executor: QueryTIExecutorFilter,
126+
version_number: QueryTIDagVersionFilter,
124127
limit: QueryLimit,
125128
offset: QueryOffset,
126129
order_by: Annotated[
@@ -156,6 +159,7 @@ def get_mapped_task_instances(
156159
select(TI)
157160
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.map_index >= 0)
158161
.join(TI.dag_run)
162+
.options(joinedload(TI.dag_version))
159163
)
160164
# 0 can mean a mapped TI that expanded to an empty list, so it is not an automatic 404
161165
unfiltered_total_count = get_query_count(query, session=session)
@@ -185,6 +189,7 @@ def get_mapped_task_instances(
185189
pool,
186190
queue,
187191
executor,
192+
version_number,
188193
],
189194
order_by=order_by,
190195
offset=offset,
@@ -330,6 +335,7 @@ def get_mapped_task_instance(
330335
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.map_index == map_index)
331336
.join(TI.dag_run)
332337
.options(joinedload(TI.rendered_task_instance_fields))
338+
.options(joinedload(TI.dag_version))
333339
)
334340
task_instance = session.scalar(query)
335341

@@ -361,6 +367,7 @@ def get_task_instances(
361367
pool: QueryTIPoolFilter,
362368
queue: QueryTIQueueFilter,
363369
executor: QueryTIExecutorFilter,
370+
version_number: QueryTIDagVersionFilter,
364371
limit: QueryLimit,
365372
offset: QueryOffset,
366373
order_by: Annotated[
@@ -397,7 +404,7 @@ def get_task_instances(
397404
This endpoint allows specifying `~` as the dag_id, dag_run_id to retrieve Task Instances for all DAGs
398405
and DAG runs.
399406
"""
400-
query = select(TI).join(TI.dag_run)
407+
query = select(TI).join(TI.dag_run).outerjoin(TI.dag_version).options(joinedload(TI.dag_version))
401408

402409
if dag_id != "~":
403410
dag = request.app.state.dag_bag.get_dag(dag_id)
@@ -428,6 +435,7 @@ def get_task_instances(
428435
executor,
429436
task_id,
430437
task_display_name_pattern,
438+
version_number,
431439
],
432440
order_by=order_by,
433441
offset=offset,

airflow/api_fastapi/core_api/routes/ui/structure.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def structure_data(
8989
start_edges: list[dict] = []
9090
end_edges: list[dict] = []
9191

92-
for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items():
92+
for dependency_dag_id, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
9393
for dependency in dependencies:
9494
# Dependencies not related to `dag_id` are ignored
9595
if dependency_dag_id != dag_id and dependency.target != dag_id:

airflow/ui/openapi-gen/queries/common.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = (
982982
taskId,
983983
updatedAtGte,
984984
updatedAtLte,
985+
versionNumber,
985986
}: {
986987
dagId: string;
987988
dagRunId: string;
@@ -1003,6 +1004,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = (
10031004
taskId: string;
10041005
updatedAtGte?: string;
10051006
updatedAtLte?: string;
1007+
versionNumber?: number[];
10061008
},
10071009
queryKey?: Array<unknown>,
10081010
) => [
@@ -1029,6 +1031,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = (
10291031
taskId,
10301032
updatedAtGte,
10311033
updatedAtLte,
1034+
versionNumber,
10321035
},
10331036
]),
10341037
];
@@ -1191,6 +1194,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = (
11911194
taskId,
11921195
updatedAtGte,
11931196
updatedAtLte,
1197+
versionNumber,
11941198
}: {
11951199
dagId: string;
11961200
dagRunId: string;
@@ -1213,6 +1217,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = (
12131217
taskId?: string;
12141218
updatedAtGte?: string;
12151219
updatedAtLte?: string;
1220+
versionNumber?: number[];
12161221
},
12171222
queryKey?: Array<unknown>,
12181223
) => [
@@ -1240,6 +1245,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = (
12401245
taskId,
12411246
updatedAtGte,
12421247
updatedAtLte,
1248+
versionNumber,
12431249
},
12441250
]),
12451251
];

airflow/ui/openapi-gen/queries/prefetch.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstance = (
13281328
* @param data.pool
13291329
* @param data.queue
13301330
* @param data.executor
1331+
* @param data.versionNumber
13311332
* @param data.limit
13321333
* @param data.offset
13331334
* @param data.orderBy
@@ -1357,6 +1358,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (
13571358
taskId,
13581359
updatedAtGte,
13591360
updatedAtLte,
1361+
versionNumber,
13601362
}: {
13611363
dagId: string;
13621364
dagRunId: string;
@@ -1378,6 +1380,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (
13781380
taskId: string;
13791381
updatedAtGte?: string;
13801382
updatedAtLte?: string;
1383+
versionNumber?: number[];
13811384
},
13821385
) =>
13831386
queryClient.prefetchQuery({
@@ -1402,6 +1405,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (
14021405
taskId,
14031406
updatedAtGte,
14041407
updatedAtLte,
1408+
versionNumber,
14051409
}),
14061410
queryFn: () =>
14071411
TaskInstanceService.getMappedTaskInstances({
@@ -1425,6 +1429,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (
14251429
taskId,
14261430
updatedAtGte,
14271431
updatedAtLte,
1432+
versionNumber,
14281433
}),
14291434
});
14301435
/**
@@ -1611,6 +1616,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (
16111616
* @param data.pool
16121617
* @param data.queue
16131618
* @param data.executor
1619+
* @param data.versionNumber
16141620
* @param data.limit
16151621
* @param data.offset
16161622
* @param data.orderBy
@@ -1641,6 +1647,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
16411647
taskId,
16421648
updatedAtGte,
16431649
updatedAtLte,
1650+
versionNumber,
16441651
}: {
16451652
dagId: string;
16461653
dagRunId: string;
@@ -1663,6 +1670,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
16631670
taskId?: string;
16641671
updatedAtGte?: string;
16651672
updatedAtLte?: string;
1673+
versionNumber?: number[];
16661674
},
16671675
) =>
16681676
queryClient.prefetchQuery({
@@ -1688,6 +1696,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
16881696
taskId,
16891697
updatedAtGte,
16901698
updatedAtLte,
1699+
versionNumber,
16911700
}),
16921701
queryFn: () =>
16931702
TaskInstanceService.getTaskInstances({
@@ -1712,6 +1721,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
17121721
taskId,
17131722
updatedAtGte,
17141723
updatedAtLte,
1724+
versionNumber,
17151725
}),
17161726
});
17171727
/**

0 commit comments

Comments
 (0)