Skip to content

Commit d93746b

Browse files
authored
Merge branch 'main' into add-workers-celery-k8s-container-lifecycle-hooks
2 parents 3c09b1c + 6d6f8bf commit d93746b

File tree

23 files changed

+648
-75
lines changed

23 files changed

+648
-75
lines changed

.github/CODEOWNERS

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,6 @@ Dockerfile.ci @potiuk @ashb @gopidesupavan @amoghrajesh @jscheffl @bugraoz93 @ka
149149

150150
# Shared Libraries
151151
/shared/ @ashb @amoghrajesh @potiuk
152+
153+
# RMs on release documents
154+
/dev/README_RELEASE_*.md @potiuk @jscheffl @vincbeck @shahar1 @jedcunningham @bugraoz93

airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from airflow._shared.timezones import timezone
2626
from airflow.api_fastapi.core_api.base import BaseModel
27+
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
2728
from airflow.utils.state import DagRunState
2829
from airflow.utils.types import DagRunType
2930

@@ -79,6 +80,7 @@ class GridRunsResponse(BaseModel):
7980
run_after: datetime
8081
state: DagRunState | None
8182
run_type: DagRunType
83+
dag_versions: list[DagVersionResponse] = []
8284
has_missed_deadline: bool
8385

8486
@computed_field

airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class LightGridTaskInstanceSummary(BaseModel):
3232
child_states: dict[TaskInstanceState | None, int] | None
3333
min_start_date: datetime | None
3434
max_end_date: datetime | None
35+
dag_version_number: int | None = None
3536

3637

3738
class GridTISummaries(BaseModel):

airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,12 @@ components:
23052305
- type: 'null'
23062306
run_type:
23072307
$ref: '#/components/schemas/DagRunType'
2308+
dag_versions:
2309+
items:
2310+
$ref: '#/components/schemas/DagVersionResponse'
2311+
type: array
2312+
title: Dag Versions
2313+
default: []
23082314
has_missed_deadline:
23092315
type: boolean
23102316
title: Has Missed Deadline
@@ -2575,6 +2581,11 @@ components:
25752581
format: date-time
25762582
- type: 'null'
25772583
title: Max End Date
2584+
dag_version_number:
2585+
anyOf:
2586+
- type: integer
2587+
- type: 'null'
2588+
title: Dag Version Number
25782589
type: object
25792590
required:
25802591
- task_id

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import structlog
2424
from fastapi import Depends, HTTPException, status
2525
from sqlalchemy import exists, select
26-
from sqlalchemy.orm import joinedload
26+
from sqlalchemy.orm import joinedload, load_only, selectinload
2727

2828
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
2929
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
@@ -58,11 +58,13 @@
5858
get_task_group_children_getter,
5959
task_group_to_dict_grid,
6060
)
61+
from airflow.models.dag import DagModel
6162
from airflow.models.dag_version import DagVersion
6263
from airflow.models.dagrun import DagRun
6364
from airflow.models.deadline import Deadline
6465
from airflow.models.serialized_dag import SerializedDagModel
6566
from airflow.models.taskinstance import TaskInstance
67+
from airflow.models.taskinstancehistory import TaskInstanceHistory
6668

6769
log = structlog.get_logger(logger_name=__name__)
6870
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -282,17 +284,33 @@ def get_grid_runs(
282284
.correlate(DagRun)
283285
.label("has_missed_deadline")
284286
)
285-
base_query = select(
286-
DagRun.dag_id,
287-
DagRun.run_id,
288-
DagRun.queued_at,
289-
DagRun.start_date,
290-
DagRun.end_date,
291-
DagRun.run_after,
292-
DagRun.state,
293-
DagRun.run_type,
294-
has_missed_deadline,
295-
).where(DagRun.dag_id == dag_id)
287+
base_query = (
288+
select(DagRun, has_missed_deadline)
289+
.where(DagRun.dag_id == dag_id)
290+
.options(
291+
load_only(
292+
DagRun.dag_id,
293+
DagRun.run_id,
294+
DagRun.queued_at,
295+
DagRun.start_date,
296+
DagRun.end_date,
297+
DagRun.run_after,
298+
DagRun.state,
299+
DagRun.run_type,
300+
DagRun.bundle_version,
301+
),
302+
joinedload(DagRun.dag_model).load_only(DagModel._dag_display_property_value),
303+
joinedload(DagRun.created_dag_version).joinedload(DagVersion.bundle),
304+
selectinload(DagRun.task_instances)
305+
.load_only(TaskInstance.dag_version_id)
306+
.joinedload(TaskInstance.dag_version)
307+
.joinedload(DagVersion.bundle),
308+
selectinload(DagRun.task_instances_histories)
309+
.load_only(TaskInstanceHistory.dag_version_id)
310+
.joinedload(TaskInstanceHistory.dag_version)
311+
.joinedload(DagVersion.bundle),
312+
)
313+
)
296314

297315
# This comparison is to fall back to DAG timetable when no order_by is provided
298316
if order_by.value == [order_by.get_primary_key_string()]:
@@ -309,8 +327,14 @@ def get_grid_runs(
309327
offset=offset,
310328
filters=[run_after, run_type, state, triggering_user],
311329
limit=limit,
330+
return_total_entries=False,
312331
)
313-
return [GridRunsResponse(**row._mapping) for row in session.execute(dag_runs_select_filter)]
332+
results = session.execute(dag_runs_select_filter).unique().all()
333+
grid_runs = []
334+
for run, has_missed in results:
335+
run.has_missed_deadline = has_missed
336+
grid_runs.append(GridRunsResponse.model_validate(run, from_attributes=True))
337+
return grid_runs
314338

315339

316340
@grid_router.get(
@@ -363,7 +387,9 @@ def get_grid_ti_summaries(
363387
TaskInstance.dag_version_id,
364388
TaskInstance.start_date,
365389
TaskInstance.end_date,
390+
DagVersion.version_number,
366391
)
392+
.outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id)
367393
.where(TaskInstance.dag_id == dag_id)
368394
.where(
369395
TaskInstance.run_id == run_id,
@@ -386,6 +412,7 @@ def get_grid_ti_summaries(
386412
"state": ti.state,
387413
"start_date": ti.start_date,
388414
"end_date": ti.end_date,
415+
"dag_version_number": ti.version_number,
389416
}
390417
)
391418
serdag = _get_serdag(

airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,18 @@ def _get_aggs_for_node(detail):
7272
max_end_date = max(x["end_date"] for x in detail if x["end_date"])
7373
except ValueError:
7474
max_end_date = None
75+
76+
dag_version_numbers = [
77+
x.get("dag_version_number") for x in detail if x.get("dag_version_number") is not None
78+
]
79+
dag_version_number = max(dag_version_numbers) if dag_version_numbers else None
80+
7581
return {
7682
"state": agg_state(states),
7783
"min_start_date": min_start_date,
7884
"max_end_date": max_end_date,
7985
"child_states": dict(Counter(states)),
86+
"dag_version_number": dag_version_number,
8087
}
8188

8289

airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8193,6 +8193,14 @@ export const $GridRunsResponse = {
81938193
run_type: {
81948194
'$ref': '#/components/schemas/DagRunType'
81958195
},
8196+
dag_versions: {
8197+
items: {
8198+
'$ref': '#/components/schemas/DagVersionResponse'
8199+
},
8200+
type: 'array',
8201+
title: 'Dag Versions',
8202+
default: []
8203+
},
81968204
has_missed_deadline: {
81978205
type: 'boolean',
81988206
title: 'Has Missed Deadline'
@@ -8308,6 +8316,17 @@ export const $LightGridTaskInstanceSummary = {
83088316
}
83098317
],
83108318
title: 'Max End Date'
8319+
},
8320+
dag_version_number: {
8321+
anyOf: [
8322+
{
8323+
type: 'integer'
8324+
},
8325+
{
8326+
type: 'null'
8327+
}
8328+
],
8329+
title: 'Dag Version Number'
83118330
}
83128331
},
83138332
type: 'object',

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2009,6 +2009,7 @@ export type GridRunsResponse = {
20092009
run_after: string;
20102010
state: DagRunState | null;
20112011
run_type: DagRunType;
2012+
dag_versions?: Array<DagVersionResponse>;
20122013
has_missed_deadline: boolean;
20132014
readonly duration: number;
20142015
};
@@ -2043,6 +2044,7 @@ export type LightGridTaskInstanceSummary = {
20432044
} | null;
20442045
min_start_date: string | null;
20452046
max_end_date: string | null;
2047+
dag_version_number?: number | null;
20462048
};
20472049

20482050
/**

airflow-core/src/airflow/ui/public/i18n/locales/en/dag.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,15 @@
121121
"graphDirection": {
122122
"label": "Graph Direction"
123123
},
124+
"showVersionIndicator": {
125+
"label": "Show Version Indicator",
126+
"options": {
127+
"hideAll": "Hide All",
128+
"showAll": "Show All",
129+
"showBundleVersion": "Show Bundle Version",
130+
"showDagVersion": "Show Dag Version"
131+
}
132+
},
124133
"taskStreamFilter": {
125134
"activeFilter": "Active filter",
126135
"clearFilter": "Clear Filter",

airflow-core/src/airflow/ui/src/constants/localStorage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export const CALENDAR_VIEW_MODE_KEY = "calendar-view-mode";
2626
export const LOG_WRAP_KEY = "log_wrap";
2727
export const LOG_SHOW_TIMESTAMP_KEY = "log_show_timestamp";
2828
export const LOG_SHOW_SOURCE_KEY = "log_show_source";
29+
export const VERSION_INDICATOR_DISPLAY_MODE_KEY = "version_indicator_display_mode";
2930

3031
// Dag-scoped keys
3132
export const dagViewKey = (dagId: string) => `dag_view-${dagId}`;

0 commit comments

Comments
 (0)