Skip to content
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def _trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
note: str | None = None,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
Expand Down Expand Up @@ -118,6 +119,7 @@ def _trigger_dag(
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
note=note,
state=DagRunState.QUEUED,
partition_key=partition_key,
session=session,
Expand All @@ -137,6 +139,7 @@ def trigger_dag(
conf: dict | str | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
note: str | None = None,
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | None:
Expand Down Expand Up @@ -169,6 +172,7 @@ def trigger_dag(
replace_microseconds=replace_microseconds,
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
note=note,
partition_key=partition_key,
session=session,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class TriggerDAGRunPayload(StrictBaseModel):
conf: dict = Field(default_factory=dict)
reset_dag_run: bool = False
partition_key: str | None = None
note: str | None = None


class DagRunStateResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Tag,
TypeAdapter,
WithJsonSchema,
model_validator,
)

from airflow.api_fastapi.common.types import UtcDateTime
Expand Down Expand Up @@ -304,6 +305,40 @@ class DagRun(StrictBaseModel):
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
note: str | None = None

@model_validator(mode="before")
@classmethod
def extract_dag_run_note(cls, data: Any) -> Any:
    """Extract the `note` (`str | None` from `association_proxy("dag_run_note", "content")`) relationship from `DagRun` to prevent `DetachedInstanceError` when constructing `DagRunContext` or `TIRunContext` models.

    Runs in "before" mode, so ``data`` may be a plain dict (already
    serialized) or a SQLAlchemy ORM ``DagRun`` instance.  For ORM input,
    the instance may be detached from its session, and touching an
    unloaded attribute would raise ``DetachedInstanceError`` — so we only
    read what is already loaded and hand Pydantic a plain dict.
    """
    # Local imports keep SQLAlchemy out of the module import path for
    # consumers that only use the Pydantic models.
    from sqlalchemy import inspect as sa_inspect
    from sqlalchemy.exc import NoInspectionAvailable
    from sqlalchemy.orm.state import InstanceState

    # Dict input (e.g. deserialized JSON) needs no special handling.
    if isinstance(data, dict):
        return data

    # Check if this is a SQLAlchemy model by looking for the inspection interface
    try:
        insp: InstanceState = sa_inspect(data)
    except NoInspectionAvailable:
        # Not a SQLAlchemy object, return as-is for Pydantic to handle
        return data

    # Check if dag_run_note is already loaded (avoid lazy load on detached instance)
    # NOTE(review): `note` is an association proxy, and proxies are not
    # usually materialized in `insp.dict` (the instance `__dict__`); this
    # branch may therefore always fall through to None unless something
    # upstream populates the key — TODO confirm against a loaded instance.
    if "note" in insp.dict:
        note_value: str | None = insp.dict["note"]
    else:
        note_value = None

    # Convert to dict to avoid further lazy loading issues
    # NOTE(review): getattr on the remaining model fields can itself
    # trigger lazy loads on a detached instance; presumably callers pass
    # instances with these columns already loaded — verify at call sites.
    values = {
        field_name: getattr(data, field_name, None)
        for field_name in cls.model_fields
        if field_name != "note"
    }
    values["note"] = note_value
    return values


class TIRunContext(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def trigger_dag_run(
triggered_by=DagRunTriggeredByType.OPERATOR,
replace_microseconds=False,
partition_key=payload.partition_key,
note=payload.note,
session=session,
)
except DagRunAlreadyExists:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
MovePreviousRunEndpoint,
)
from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
AddNoteField,
ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField,
)

bundle = VersionBundle(
HeadVersion(),
Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField),
Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, AddNoteField),
Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
Version("2025-11-07", AddPartitionKeyField),
Version("2025-11-05", AddTriggeringUserNameField),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema

from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIDeferredStatePayload, TIRunContext
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
DagRun,
TIDeferredStatePayload,
TIRunContext,
)


class ModifyDeferredTaskKwargsToJsonValue(VersionChange):
Expand Down Expand Up @@ -50,3 +54,17 @@ class RemoveUpstreamMapIndexesField(VersionChange):
def add_upstream_map_indexes_field(response: ResponseInfo) -> None: # type: ignore[misc]
"""Add upstream_map_indexes field with None for older API versions."""
response.body["upstream_map_indexes"] = None


class AddNoteField(VersionChange):
    """Add note parameter to DagRun Model."""

    description = __doc__

    instructions_to_migrate_to_previous_version = (schema(DagRun).field("note").didnt_exist,)

    @convert_response_to_previous_version_for(TIRunContext)  # type: ignore[arg-type]
    def remove_note_field(response: ResponseInfo) -> None:  # type: ignore[misc]
        """Strip the ``note`` key out of the nested ``dag_run`` payload so older API clients never see it."""
        dag_run_payload = response.body.get("dag_run")
        if isinstance(dag_run_payload, dict):
            dag_run_payload.pop("note", None)
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def __init__(
backfill_id: NonNegativeInt | None = None,
bundle_version: str | None = None,
partition_key: str | None = None,
note: str | None = None,
):
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
Expand All @@ -348,6 +349,7 @@ def __init__(
self.run_after = run_after
self.start_date = start_date
self.conf = conf or {}
self.note = note
if state is not None:
self.state = state
if not is_arg_set(queued_at):
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ def create_dagrun(
creating_job_id: int | None = None,
backfill_id: NonNegativeInt | None = None,
partition_key: str | None = None,
note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
"""
Expand Down Expand Up @@ -583,6 +584,7 @@ def create_dagrun(
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
partition_key=partition_key,
note=note,
session=session,
)

Expand Down Expand Up @@ -1111,6 +1113,7 @@ def _create_orm_dagrun(
triggered_by: DagRunTriggeredByType,
triggering_user_name: str | None = None,
partition_key: str | None = None,
note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
bundle_version = None
Expand Down Expand Up @@ -1138,6 +1141,7 @@ def _create_orm_dagrun(
backfill_id=backfill_id,
bundle_version=bundle_version,
partition_key=partition_key,
note=note,
)
# Load defaults into the following two fields to ensure result can be serialized detached
max_log_template_id = session.scalar(select(func.max(LogTemplate.__table__.c.id)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def test_get_state(self, client, session, dag_maker):
"start_date": "2023-01-02T00:00:00Z",
"state": "success",
"triggering_user_name": None,
"note": None,
}

def test_dag_run_not_found(self, client):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def test_ti_run_state_to_running(
"triggering_user_name": None,
"consumed_asset_events": [],
"partition_key": None,
"note": None,
},
"task_reschedule_count": 0,
"max_tries": max_tries,
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/tests/unit/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,22 @@ def test_create_dagrun_job_id_is_set(self, testing_dag_bundle):
)
assert dr.creating_job_id == job_id

def test_create_dagrun_note_is_set(self, testing_dag_bundle):
    """A note passed to create_dagrun must land on the resulting DagRun."""
    expected_note = "This is a test note"
    dag_id = "test_create_dagrun_note_is_set"

    scheduler_dag = sync_dag_to_db(DAG(dag_id=dag_id, schedule=None))
    dag_run = scheduler_dag.create_dagrun(
        run_id=dag_id,
        run_type=DagRunType.MANUAL,
        triggered_by=DagRunTriggeredByType.TEST,
        logical_date=DEFAULT_DATE,
        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
        run_after=DEFAULT_DATE,
        state=State.NONE,
        note=expected_note,
    )

    assert dag_run.note == expected_note

@pytest.mark.parametrize("partition_key", [None, "my-key", 123])
def test_create_dagrun_partition_key(self, partition_key, dag_maker):
with dag_maker("test_create_dagrun_partition_key"):
Expand Down
1 change: 1 addition & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ def on_success_callable(context):
)

def test_dagrun_update_state_with_handle_callback_failure(self, testing_dag_bundle, dag_maker, session):

def on_failure_callable(context):
assert context["dag_run"].dag_id == "test_dagrun_update_state_with_handle_callback_failure"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import datetime
import inspect
import json
import time
from collections.abc import Sequence
Expand Down Expand Up @@ -179,6 +180,7 @@ def __init__(
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
fail_when_dag_is_paused: bool = False,
note: str | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
openlineage_inject_parent_info: bool = True,
**kwargs,
Expand All @@ -201,6 +203,7 @@ def __init__(
self.skip_when_already_exists = skip_when_already_exists
self.fail_when_dag_is_paused = fail_when_dag_is_paused
self.openlineage_inject_parent_info = openlineage_inject_parent_info
self.note = note
self.deferrable = deferrable
self.logical_date = logical_date
if logical_date is NOTSET:
Expand Down Expand Up @@ -274,7 +277,7 @@ def execute(self, context: Context):
def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
from airflow.providers.common.compat.sdk import DagRunTriggerException

raise DagRunTriggerException(
kwargs_accepted = dict(
trigger_dag_id=self.trigger_dag_id,
dag_run_id=run_id,
conf=self.conf,
Expand All @@ -288,8 +291,16 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
deferrable=self.deferrable,
)

if self.note and "note" in inspect.signature(DagRunTriggerException.__init__).parameters:
kwargs_accepted["note"] = self.note

raise DagRunTriggerException(**kwargs_accepted)

def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
try:
if self.note:
self.log.warning("Parameter 'note' is not supported in Airflow 2.x and will be ignored.")

dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
run_id=run_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def test_trigger_dagrun(self):
"""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "bar"},
task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, conf={"foo": "bar"}, note="Test note"
)

# Ensure correct exception is raised
Expand All @@ -131,6 +129,8 @@ def test_trigger_dagrun(self):
assert exc_info.value.wait_for_completion is False
assert exc_info.value.allowed_states == [DagRunState.SUCCESS]
assert exc_info.value.failed_states == [DagRunState.FAILED]
if getattr(exc_info, "note", None) is not None:
assert exc_info.value.note == "Test note"

expected_run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL, run_after=timezone.utcnow()
Expand Down Expand Up @@ -556,13 +556,19 @@ def test_trigger_dagrun(self, dag_maker, mock_supervisor_comms):
"""Test TriggerDagRunOperator."""
with time_machine.travel("2025-02-18T08:04:46Z", tick=False):
with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True):
task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID)
task = TriggerDagRunOperator(
task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, note="Test note"
)
mock_warning = mock.patch.object(task.log, "warning").start()
dag_maker.sync_dagbag_to_db()
parse_and_sync_to_db(self.f_name)
dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

dagrun = dag_maker.session.scalar(select(DagRun).where(DagRun.dag_id == TRIGGERED_DAG_ID))
assert mock_warning.mock_calls == [
mock.call("Parameter 'note' is not supported in Airflow 2.x and will be ignored.")
]
assert dagrun.run_type == DagRunType.MANUAL
assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date)

Expand Down
5 changes: 4 additions & 1 deletion task-sdk/src/airflow/sdk/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,12 @@ def trigger(
conf: dict | None = None,
logical_date: datetime | None = None,
reset_dag_run: bool = False,
note: str | None = None,
) -> OKResponse | ErrorResponse:
"""Trigger a Dag run via the API server."""
body = TriggerDAGRunPayload(logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run)
body = TriggerDAGRunPayload(
logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run, note=note
)

try:
self.client.post(
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ class TriggerDAGRunPayload(BaseModel):
conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None
reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
note: Annotated[str | None, Field(title="Note")] = None


class UpdateHITLDetailPayload(BaseModel):
Expand Down Expand Up @@ -627,6 +628,7 @@ class DagRun(BaseModel):
triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None
consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")]
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
note: Annotated[str | None, Field(title="Note")] = None


class TIRunContext(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/src/airflow/sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def __init__(
failed_states: list[str],
poke_interval: int,
deferrable: bool,
note: str | None = None,
):
super().__init__()
self.trigger_dag_id = trigger_dag_id
Expand All @@ -261,6 +262,7 @@ def __init__(
self.failed_states = failed_states
self.poke_interval = poke_interval
self.deferrable = deferrable
self.note = note


class DownstreamTasksSkipped(AirflowException):
Expand Down
6 changes: 1 addition & 5 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,11 +1371,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
dump_opts = {"exclude_unset": True}
elif isinstance(msg, TriggerDagRun):
resp = self.client.dag_runs.trigger(
msg.dag_id,
msg.run_id,
msg.conf,
msg.logical_date,
msg.reset_dag_run,
msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.reset_dag_run, msg.note
)
elif isinstance(msg, GetDagRun):
dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id)
Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,7 @@ def _handle_trigger_dag_run(
logical_date=drte.logical_date,
conf=drte.conf,
reset_dag_run=drte.reset_dag_run,
note=drte.note,
),
)

Expand Down
Loading
Loading