Skip to content

Commit 8926c4a

Browse files
authored
Merge branch 'main' into feat-49796-azure_virtual_machines_operator
2 parents 00ea70f + cd633b7 commit 8926c4a

File tree

99 files changed

+5067
-2596
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

99 files changed

+5067
-2596
lines changed

PROVIDERS.rst

Lines changed: 60 additions & 457 deletions
Large diffs are not rendered by default.

airflow-core/src/airflow/api_fastapi/common/exceptions.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,26 @@ def exception_handler(self, request: Request, exc: IntegrityError):
7474
for tb in traceback.format_tb(exc.__traceback__):
7575
stacktrace += tb
7676

77-
log_message = f"Error with id {exception_id}\n{stacktrace}"
77+
log_message = f"Error with id {exception_id}, statement: {exc.statement}\n{stacktrace}"
7878
log.error(log_message)
7979
if conf.get("api", "expose_stacktrace") == "True":
8080
message = log_message
81+
statement = str(exc.statement)
82+
orig_error = str(exc.orig)
8183
else:
8284
message = (
8385
"Serious error when handling your request. Check logs for more details - "
8486
f"you will find it in api server when you look for ID {exception_id}"
8587
)
88+
statement = "hidden"
89+
orig_error = "hidden"
8690

8791
raise HTTPException(
8892
status_code=status.HTTP_409_CONFLICT,
8993
detail={
9094
"reason": "Unique constraint violation",
91-
"statement": str(exc.statement),
92-
"orig_error": str(exc.orig),
95+
"statement": statement,
96+
"orig_error": orig_error,
9397
"message": message,
9498
},
9599
)

airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from cadwyn import VersionedAPIRouter
3131
from fastapi import Body, HTTPException, Query, Security, status
3232
from pydantic import JsonValue
33-
from sqlalchemy import func, or_, tuple_, update
33+
from sqlalchemy import and_, func, or_, tuple_, update
3434
from sqlalchemy.engine import CursorResult
3535
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
3636
from sqlalchemy.orm import joinedload
@@ -64,6 +64,7 @@
6464
from airflow.models.asset import AssetActive
6565
from airflow.models.dag import DagModel
6666
from airflow.models.dagrun import DagRun as DR
67+
from airflow.models.log import Log
6768
from airflow.models.taskinstance import TaskInstance as TI, _stop_remaining_tasks
6869
from airflow.models.taskreschedule import TaskReschedule
6970
from airflow.models.trigger import Trigger
@@ -136,10 +137,14 @@ def ti_run(
136137
# This selects the raw JSON value, bypassing the deserialization -- we want that to happen on the
137138
# client
138139
column("next_kwargs", JSON),
140+
DR.logical_date,
141+
DagModel.owners,
139142
)
140143
.select_from(TI)
144+
.join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id))
145+
.join(DagModel, TI.dag_id == DagModel.dag_id)
141146
.where(TI.id == task_instance_id)
142-
.with_for_update()
147+
.with_for_update(of=TI)
143148
)
144149
try:
145150
ti = session.execute(old).one()
@@ -195,6 +200,19 @@ def ti_run(
195200
)
196201
else:
197202
log.info("Task started", previous_state=previous_state, hostname=ti_run_payload.hostname)
203+
session.add(
204+
Log(
205+
event=TaskInstanceState.RUNNING.value,
206+
task_id=ti.task_id,
207+
dag_id=ti.dag_id,
208+
run_id=ti.run_id,
209+
map_index=ti.map_index,
210+
try_number=ti.try_number,
211+
logical_date=ti.logical_date,
212+
owner=ti.owners,
213+
extra=json.dumps({"host_name": ti_run_payload.hostname}) if ti_run_payload.hostname else None,
214+
)
215+
)
198216
# Ensure there is no end date set.
199217
query = query.values(
200218
end_date=None,
@@ -297,16 +315,36 @@ def ti_update_state(
297315
log.debug("Updating task instance state", new_state=ti_patch_payload.state)
298316

299317
old = (
300-
select(TI.state, TI.try_number, TI.max_tries, TI.dag_id)
318+
select(
319+
TI.state,
320+
TI.try_number,
321+
TI.max_tries,
322+
TI.dag_id,
323+
TI.task_id,
324+
TI.run_id,
325+
TI.map_index,
326+
TI.hostname,
327+
DR.logical_date,
328+
DagModel.owners,
329+
)
330+
.select_from(TI)
331+
.join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id))
332+
.join(DagModel, TI.dag_id == DagModel.dag_id)
301333
.where(TI.id == task_instance_id)
302-
.with_for_update()
334+
.with_for_update(of=TI)
303335
)
304336
try:
305337
(
306338
previous_state,
307339
try_number,
308340
max_tries,
309341
dag_id,
342+
task_id,
343+
run_id,
344+
map_index,
345+
hostname,
346+
logical_date,
347+
owners,
310348
) = session.execute(old).one()
311349
log.debug(
312350
"Retrieved current task instance state",
@@ -373,6 +411,19 @@ def ti_update_state(
373411
new_state=updated_state,
374412
rows_affected=getattr(result, "rowcount", 0),
375413
)
414+
session.add(
415+
Log(
416+
event=updated_state.value,
417+
task_id=task_id,
418+
dag_id=dag_id,
419+
run_id=run_id,
420+
map_index=map_index,
421+
try_number=try_number,
422+
logical_date=logical_date,
423+
owner=owners,
424+
extra=json.dumps({"host_name": hostname}) if hostname else None,
425+
)
426+
)
376427
except SQLAlchemyError as e:
377428
log.error("Error updating Task Instance state", error=str(e))
378429
raise HTTPException(

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,9 +1414,21 @@ traces:
14141414
description: |
14151415
If True, then traces from Airflow internal methods are exported. Defaults to False.
14161416
version_added: 3.1.0
1417+
version_deprecated: 3.2.0
1418+
deprecation_reason: |
1419+
This parameter is no longer used.
14171420
type: string
14181421
example: ~
14191422
default: "False"
1423+
task_runner_flush_timeout_milliseconds:
1424+
description: |
1425+
Timeout in milliseconds to wait for the OpenTelemetry span exporter to flush pending spans
1426+
when a task runner process exits. If the exporter does not finish within this time, any
1427+
buffered spans may be dropped.
1428+
version_added: 3.2.0
1429+
type: integer
1430+
example: ~
1431+
default: "30000"
14201432
secrets:
14211433
description: ~
14221434
options:

airflow-core/src/airflow/executors/base_executor.py

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,11 @@
3232
from airflow.configuration import conf
3333
from airflow.executors import workloads
3434
from airflow.executors.executor_loader import ExecutorLoader
35-
from airflow.executors.workloads.task import TaskInstanceDTO
3635
from airflow.models import Log
3736
from airflow.models.callback import CallbackKey
3837
from airflow.observability.metrics import stats_utils
39-
from airflow.observability.trace import Trace
4038
from airflow.utils.log.logging_mixin import LoggingMixin
4139
from airflow.utils.state import TaskInstanceState
42-
from airflow.utils.thread_safe_dict import ThreadSafeDict
4340

4441
PARALLELISM: int = conf.getint("core", "PARALLELISM")
4542

@@ -143,8 +140,6 @@ class BaseExecutor(LoggingMixin):
143140
:param parallelism: how many jobs should run at one time.
144141
"""
145142

146-
active_spans = ThreadSafeDict()
147-
148143
supports_ad_hoc_ti_run: bool = False
149144
supports_callbacks: bool = False
150145
supports_multi_team: bool = False
@@ -217,10 +212,6 @@ def __repr__(self):
217212
_repr += ")"
218213
return _repr
219214

220-
@classmethod
221-
def set_active_spans(cls, active_spans: ThreadSafeDict):
222-
cls.active_spans = active_spans
223-
224215
def start(self): # pragma: no cover
225216
"""Executors may need to get things started."""
226217

@@ -340,17 +331,6 @@ def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks):
340331
queued_tasks_metric_name = self._get_metric_name("executor.queued_tasks")
341332
running_tasks_metric_name = self._get_metric_name("executor.running_tasks")
342333

343-
span = Trace.get_current_span()
344-
if span.is_recording():
345-
span.add_event(
346-
name="executor",
347-
attributes={
348-
open_slots_metric_name: open_slots,
349-
queued_tasks_metric_name: num_queued_tasks,
350-
running_tasks_metric_name: num_running_tasks,
351-
},
352-
)
353-
354334
self.log.debug("%s running task instances for executor %s", num_running_tasks, name)
355335
self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
356336
if open_slots == 0:
@@ -415,30 +395,6 @@ def trigger_tasks(self, open_slots: int) -> None:
415395
if key in self.attempts:
416396
del self.attempts[key]
417397

418-
if isinstance(workload, workloads.ExecuteTask) and hasattr(workload, "ti"):
419-
ti = workload.ti
420-
421-
# If it's None, then the span for the current id hasn't been started.
422-
if self.active_spans is not None and self.active_spans.get("ti:" + str(ti.id)) is None:
423-
if isinstance(ti, TaskInstanceDTO):
424-
parent_context = Trace.extract(ti.parent_context_carrier)
425-
else:
426-
parent_context = Trace.extract(ti.dag_run.context_carrier)
427-
# Start a new span using the context from the parent.
428-
# Attributes will be set once the task has finished so that all
429-
# values will be available (end_time, duration, etc.).
430-
431-
span = Trace.start_child_span(
432-
span_name=f"{ti.task_id}",
433-
parent_context=parent_context,
434-
component="task",
435-
start_as_current=False,
436-
)
437-
self.active_spans.set("ti:" + str(ti.id), span)
438-
# Inject the current context into the carrier.
439-
carrier = Trace.inject()
440-
ti.context_carrier = carrier
441-
442398
workload_list.append(workload)
443399

444400
if workload_list:

airflow-core/src/airflow/executors/workloads/callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def make(
9090
name=dag_run.dag_model.bundle_name,
9191
version=dag_run.bundle_version,
9292
)
93-
fname = f"executor_callbacks/{callback.id}" # TODO: better log file template
93+
fname = f"executor_callbacks/{dag_run.dag_id}/{dag_run.run_id}/{callback.id}"
9494

9595
return cls(
9696
callback=CallbackDTO.model_validate(callback, from_attributes=True),

airflow-core/src/airflow/executors/workloads/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def make(
8686
from airflow.utils.helpers import log_filename_template_renderer
8787

8888
ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
89-
ser_ti.parent_context_carrier = ti.dag_run.context_carrier
89+
ser_ti.context_carrier = ti.dag_run.context_carrier
9090
if not bundle_info:
9191
bundle_info = BundleInfo(
9292
name=ti.dag_model.bundle_name,

0 commit comments

Comments (0)