Skip to content

Commit 2fa9230

Browse files
authored
Rewrite asset event registration (apache#47677)
There are many issues in the original implementation when you send something that's not a simple asset, I ended up almost rewritten the whole thing... Now, the task runner collects everything the user writes into outlet_events, and send all of them with the task's outlets as declared by the user verbatim to the API server. The server does all the resolution and filtering instead. The task runner now emits slightly different data; extra is no longer eagerly resolved; the server side fills in the default automatically. Fixtures using asset refs now work correctly. The API server side tests are vastly improved to cover more kinds of data, including where events against other aliases are not incorrectly picked up when processing events originated from an alias. Exceptions are now raised if an event is raised against inactive assets. Also removed redundant arg from register_asset_change. Arguments 'aliases' and 'source_alias_names' basically serve the same purpose, but we use each for a different section of the code. They can become one.
1 parent deccf93 commit 2fa9230

File tree

14 files changed

+310
-285
lines changed

14 files changed

+310
-285
lines changed

airflow/api_fastapi/execution_api/datamodels/taskinstance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TISuccessStatePayload(StrictBaseModel):
9797
"""When the task completed executing"""
9898

9999
task_outlets: Annotated[list[AssetProfile], Field(default_factory=list)]
100-
outlet_events: Annotated[list[Any], Field(default_factory=list)]
100+
outlet_events: Annotated[list[dict[str, Any]], Field(default_factory=list)]
101101

102102

103103
class TITargetStatePayload(StrictBaseModel):

airflow/assets/manager.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ def register_asset_change(
109109
task_instance: TaskInstance | None = None,
110110
asset: Asset | AssetModel | AssetUniqueKey,
111111
extra=None,
112-
aliases: Collection[AssetAlias] = (),
113-
source_alias_names: Iterable[str] | None = None,
112+
source_alias_names: Collection[str] = (),
114113
session: Session,
115114
**kwargs,
116115
) -> AssetEvent | None:
@@ -135,7 +134,7 @@ def register_asset_change(
135134
return None
136135

137136
cls._add_asset_alias_association(
138-
alias_names={alias.name for alias in aliases}, asset_model=asset_model, session=session
137+
alias_names=source_alias_names, asset_model=asset_model, session=session
139138
)
140139

141140
event_kwargs = {

airflow/exceptions.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
from http import HTTPStatus
2828
from typing import TYPE_CHECKING, Any, NamedTuple
2929

30+
from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
3031
from airflow.utils.trigger_rule import TriggerRule
3132

3233
if TYPE_CHECKING:
3334
from collections.abc import Sized
3435

3536
from airflow.models import DagRun
36-
from airflow.sdk.definitions.asset import AssetUniqueKey
3737

3838

3939
class AirflowException(Exception):
@@ -113,33 +113,40 @@ class AirflowFailException(AirflowException):
113113
"""Raise when the task should be failed without retrying."""
114114

115115

116-
class AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
117-
"""Raise when the task is executed with inactive assets."""
116+
class _AirflowExecuteWithInactiveAssetExecption(AirflowFailException):
117+
main_message: str
118118

119-
def __init__(self, inactive_asset_unikeys: Collection[AssetUniqueKey]) -> None:
120-
self.inactive_asset_unique_keys = inactive_asset_unikeys
119+
def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef | AssetUriRef]) -> None:
120+
self.inactive_asset_keys = inactive_asset_keys
121+
122+
@staticmethod
123+
def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
124+
if isinstance(key, AssetUniqueKey):
125+
return f"Asset(name={key.name!r}, uri={key.uri!r})"
126+
elif isinstance(key, AssetNameRef):
127+
return f"Asset.ref(name={key.name!r})"
128+
elif isinstance(key, AssetUriRef):
129+
return f"Asset.ref(uri={key.uri!r})"
130+
return repr(key) # Should not happen, but let's fails more gracefully in an exception.
131+
132+
def __str__(self) -> str:
133+
return f"{self.main_message}: {self.inactive_assets_message}"
121134

122135
@property
123-
def inactive_assets_error_msg(self):
124-
return ", ".join(
125-
f'Asset(name="{key.name}", uri="{key.uri}")' for key in self.inactive_asset_unique_keys
126-
)
136+
def inactive_assets_message(self) -> str:
137+
return ", ".join(self._render_asset_key(key) for key in self.inactive_asset_keys)
127138

128139

129-
class AirflowInactiveAssetInInletOrOutletException(AirflowExecuteWithInactiveAssetExecption):
140+
class AirflowInactiveAssetInInletOrOutletException(_AirflowExecuteWithInactiveAssetExecption):
130141
"""Raise when the task is executed with inactive assets in its inlet or outlet."""
131142

132-
def __str__(self) -> str:
133-
return f"Task has the following inactive assets in its inlets or outlets: {self.inactive_assets_error_msg}"
143+
main_message = "Task has the following inactive assets in its inlets or outlets"
134144

135145

136-
class AirflowInactiveAssetAddedToAssetAliasException(AirflowExecuteWithInactiveAssetExecption):
146+
class AirflowInactiveAssetAddedToAssetAliasException(_AirflowExecuteWithInactiveAssetExecption):
137147
"""Raise when inactive assets are added to an asset alias."""
138148

139-
def __str__(self) -> str:
140-
return (
141-
f"The following assets accessed by an AssetAlias are inactive: {self.inactive_assets_error_msg}"
142-
)
149+
main_message = "The following assets accessed by an AssetAlias are inactive"
143150

144151

145152
class AirflowOptionalProviderFeatureException(AirflowException):

airflow/models/taskinstance.py

Lines changed: 119 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@
104104
from airflow.models.taskreschedule import TaskReschedule
105105
from airflow.models.xcom import LazyXComSelectSequence, XCom
106106
from airflow.plugins_manager import integrate_macros_plugins
107-
from airflow.sdk.api.datamodels._generated import AssetProfile
108107
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
109108
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
110109
from airflow.sdk.definitions.param import process_params
@@ -159,6 +158,7 @@
159158
from airflow.models.baseoperator import BaseOperator
160159
from airflow.models.dag import DAG as SchedulerDAG, DagModel
161160
from airflow.models.dagrun import DagRun
161+
from airflow.sdk.api.datamodels._generated import AssetProfile
162162
from airflow.sdk.definitions._internal.abstractoperator import Operator
163163
from airflow.sdk.definitions.dag import DAG
164164
from airflow.sdk.types import RuntimeTaskInstanceProtocol
@@ -356,28 +356,17 @@ def _run_raw_task(
356356
if not test_mode:
357357
_add_log(event=ti.state, task_instance=ti, session=session)
358358
if ti.state == TaskInstanceState.SUCCESS:
359-
added_alias_to_task_outlet = False
360-
task_outlets = []
361-
outlet_events = []
362-
events = context["outlet_events"]
363-
for obj in ti.task.outlets or []:
364-
# Lineage can have other types of objects besides assets
365-
if isinstance(obj, Asset):
366-
task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, type=Asset.__name__))
367-
outlet_events.append(attrs.asdict(events[obj])) # type: ignore
368-
elif isinstance(obj, AssetNameRef):
369-
task_outlets.append(AssetProfile(name=obj.name, type=AssetNameRef.__name__))
370-
outlet_events.append(attrs.asdict(events)) # type: ignore
371-
elif isinstance(obj, AssetUriRef):
372-
task_outlets.append(AssetProfile(uri=obj.uri, type=AssetUriRef.__name__))
373-
outlet_events.append(attrs.asdict(events)) # type: ignore
374-
elif isinstance(obj, AssetAlias):
375-
if not added_alias_to_task_outlet:
376-
task_outlets.append(AssetProfile(name=obj.name, type=AssetAlias.__name__))
377-
added_alias_to_task_outlet = True
378-
for asset_alias_event in events[obj].asset_alias_events:
379-
outlet_events.append(attrs.asdict(asset_alias_event))
380-
TaskInstance.register_asset_changes_in_db(ti, task_outlets, outlet_events, session=session)
359+
from airflow.sdk.execution_time.task_runner import (
360+
_build_asset_profiles,
361+
_serialize_outlet_events,
362+
)
363+
364+
TaskInstance.register_asset_changes_in_db(
365+
ti,
366+
list(_build_asset_profiles(ti.task.outlets)),
367+
list(_serialize_outlet_events(context["outlet_events"])),
368+
session=session,
369+
)
381370

382371
TaskInstance.save_to_db(ti=ti, session=session)
383372
if ti.state == TaskInstanceState.SUCCESS:
@@ -2744,104 +2733,126 @@ def _run_raw_task(
27442733
def register_asset_changes_in_db(
27452734
ti: TaskInstance,
27462735
task_outlets: list[AssetProfile],
2747-
outlet_events: list[Any],
2736+
outlet_events: list[dict[str, Any]],
27482737
session: Session = NEW_SESSION,
27492738
) -> None:
2750-
# One task only triggers one asset event for each asset with the same extra.
2751-
# This tuple[asset uri, extra] to sets alias names mapping is used to find whether
2752-
# there're assets with same uri but different extra that we need to emit more than one asset events.
2753-
asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
2754-
asset_name_refs: set[str] = set()
2755-
asset_uri_refs: set[str] = set()
2756-
2757-
for obj in task_outlets:
2758-
ti.log.debug("outlet obj %s", obj)
2759-
# Lineage can have other types of objects besides assets
2760-
if obj.type == Asset.__name__:
2761-
asset_manager.register_asset_change(
2762-
task_instance=ti,
2763-
asset=Asset(name=obj.name, uri=obj.uri), # type: ignore
2764-
extra=outlet_events[0]["extra"],
2765-
session=session,
2766-
)
2767-
elif obj.type == AssetNameRef.__name__:
2768-
asset_name_refs.add(obj.name) # type: ignore
2769-
elif obj.type == AssetUriRef.__name__:
2770-
asset_uri_refs.add(obj.uri) # type: ignore
2771-
elif obj.type == AssetAlias.__name__:
2772-
outlet_events = list(
2773-
map(
2774-
lambda event: {**event, "dest_asset_key": AssetUniqueKey(**event["dest_asset_key"])},
2775-
outlet_events,
2776-
)
2777-
)
2778-
for asset_alias_event in outlet_events:
2779-
asset_alias_name = asset_alias_event["source_alias_name"]
2780-
asset_unique_key = asset_alias_event["dest_asset_key"]
2781-
frozen_extra = frozenset(asset_alias_event["extra"].items())
2782-
asset_alias_names[(asset_unique_key, frozen_extra)].add(asset_alias_name)
2783-
2784-
asset_unique_keys = {key for key, _ in asset_alias_names}
2785-
existing_aliased_assets: set[AssetUniqueKey] = {
2786-
AssetUniqueKey.from_asset(asset_obj)
2787-
for asset_obj in session.scalars(
2739+
asset_keys = {
2740+
AssetUniqueKey(o.name, o.uri)
2741+
for o in task_outlets
2742+
if o.type == Asset.__name__ and o.name and o.uri
2743+
}
2744+
asset_name_refs = {
2745+
Asset.ref(name=o.name) for o in task_outlets if o.type == AssetNameRef.__name__ and o.name
2746+
}
2747+
asset_uri_refs = {
2748+
Asset.ref(uri=o.uri) for o in task_outlets if o.type == AssetUriRef.__name__ and o.uri
2749+
}
2750+
2751+
asset_models: dict[AssetUniqueKey, AssetModel] = {
2752+
AssetUniqueKey.from_asset(am): am
2753+
for am in session.scalars(
27882754
select(AssetModel).where(
2789-
tuple_(AssetModel.name, AssetModel.uri).in_(
2790-
attrs.astuple(key) for key in asset_unique_keys
2791-
)
2755+
AssetModel.active.has(),
2756+
or_(
2757+
tuple_(AssetModel.name, AssetModel.uri).in_(attrs.astuple(k) for k in asset_keys),
2758+
AssetModel.name.in_(r.name for r in asset_name_refs),
2759+
AssetModel.uri.in_(r.uri for r in asset_uri_refs),
2760+
),
27922761
)
27932762
)
27942763
}
2795-
inactive_asset_unique_keys = TaskInstance._get_inactive_asset_unique_keys(
2796-
asset_unique_keys={key for key in asset_unique_keys if key in existing_aliased_assets},
2797-
session=session,
2798-
)
2799-
if inactive_asset_unique_keys:
2800-
raise AirflowInactiveAssetAddedToAssetAliasException(inactive_asset_unique_keys)
2801-
2802-
if missing_assets := [
2803-
asset_unique_key.to_asset()
2804-
for asset_unique_key, _ in asset_alias_names
2805-
if asset_unique_key not in existing_aliased_assets
2806-
]:
2807-
asset_manager.create_assets(missing_assets, session=session)
2808-
ti.log.warning("Created new assets for alias reference: %s", missing_assets)
2809-
session.flush() # Needed because we need the id for fk.
2810-
2811-
for (unique_key, extra_items), alias_names in asset_alias_names.items():
2812-
ti.log.info(
2813-
'Creating event for %r through aliases "%s"',
2814-
unique_key,
2815-
", ".join(alias_names),
2816-
)
2817-
asset_manager.register_asset_change(
2818-
task_instance=ti,
2819-
asset=unique_key,
2820-
aliases=[AssetAlias(name=name) for name in alias_names],
2821-
extra=dict(extra_items),
2822-
session=session,
2823-
source_alias_names=alias_names,
2824-
)
28252764

2826-
# Handle events derived from references.
2827-
asset_stmt = select(AssetModel).where(AssetModel.name.in_(asset_name_refs), AssetModel.active.has())
2828-
for asset_model in session.scalars(asset_stmt):
2829-
ti.log.info("Creating event through asset name reference %r", asset_model.name)
2765+
asset_event_extras: dict[AssetUniqueKey, dict] = {
2766+
AssetUniqueKey(**event["dest_asset_key"]): event["extra"]
2767+
for event in outlet_events
2768+
if "source_alias_name" not in event
2769+
}
2770+
2771+
bad_asset_keys: set[AssetUniqueKey | AssetNameRef | AssetUriRef] = set()
2772+
2773+
for key in asset_keys:
2774+
try:
2775+
am = asset_models[key]
2776+
except KeyError:
2777+
bad_asset_keys.add(key)
2778+
continue
2779+
ti.log.debug("register event for asset %s", am)
28302780
asset_manager.register_asset_change(
28312781
task_instance=ti,
2832-
asset=asset_model,
2833-
extra=outlet_events[asset_model].extra,
2782+
asset=am,
2783+
extra=asset_event_extras.get(key),
28342784
session=session,
28352785
)
2836-
asset_stmt = select(AssetModel).where(AssetModel.uri.in_(asset_uri_refs), AssetModel.active.has())
2837-
for asset_model in session.scalars(asset_stmt):
2838-
ti.log.info("Creating event for through asset URI reference %r", asset_model.uri)
2839-
asset_manager.register_asset_change(
2840-
task_instance=ti,
2841-
asset=asset_model,
2842-
extra=outlet_events[asset_model].extra,
2786+
2787+
if asset_name_refs:
2788+
asset_models_by_name = {key.name: am for key, am in asset_models.items()}
2789+
asset_event_extras_by_name = {key.name: extra for key, extra in asset_event_extras.items()}
2790+
for nref in asset_name_refs:
2791+
try:
2792+
am = asset_models_by_name[nref.name]
2793+
except KeyError:
2794+
bad_asset_keys.add(nref)
2795+
continue
2796+
ti.log.debug("register event for asset name ref %s", am)
2797+
asset_manager.register_asset_change(
2798+
task_instance=ti,
2799+
asset=am,
2800+
extra=asset_event_extras_by_name.get(nref.name),
2801+
session=session,
2802+
)
2803+
if asset_uri_refs:
2804+
asset_models_by_uri = {key.uri: am for key, am in asset_models.items()}
2805+
asset_event_extras_by_uri = {key.uri: extra for key, extra in asset_event_extras.items()}
2806+
for uref in asset_uri_refs:
2807+
try:
2808+
am = asset_models_by_uri[uref.uri]
2809+
except KeyError:
2810+
bad_asset_keys.add(uref)
2811+
continue
2812+
ti.log.debug("register event for asset uri ref %s", am)
2813+
asset_manager.register_asset_change(
2814+
task_instance=ti,
2815+
asset=am,
2816+
extra=asset_event_extras_by_uri.get(uref.uri),
2817+
session=session,
2818+
)
2819+
2820+
def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey, frozenset], set[str]]:
2821+
d = defaultdict(set)
2822+
for event in outlet_events:
2823+
try:
2824+
alias_name = event["source_alias_name"]
2825+
except KeyError:
2826+
continue
2827+
if alias_name not in outlet_alias_names:
2828+
continue
2829+
asset_key = AssetUniqueKey(**event["dest_asset_key"])
2830+
extra_key = frozenset(event["extra"].items())
2831+
d[asset_key, extra_key].add(alias_name)
2832+
return d
2833+
2834+
outlet_alias_names = {o.name for o in task_outlets if o.type == AssetAlias.__name__ and o.name}
2835+
if outlet_alias_names and (event_extras_from_aliases := _asset_event_extras_from_aliases()):
2836+
bad_alias_asset_keys = TaskInstance._get_inactive_asset_unique_keys(
2837+
{key for key, _ in event_extras_from_aliases},
28432838
session=session,
28442839
)
2840+
for (asset_key, extra_key), event_aliase_names in event_extras_from_aliases.items():
2841+
if asset_key in bad_alias_asset_keys:
2842+
continue
2843+
ti.log.debug("register event for asset %s with aliases %s", asset_key, event_aliase_names)
2844+
asset_manager.register_asset_change(
2845+
task_instance=ti,
2846+
asset=asset_key,
2847+
source_alias_names=event_aliase_names,
2848+
extra=dict(extra_key),
2849+
session=session,
2850+
)
2851+
if bad_alias_asset_keys:
2852+
raise AirflowInactiveAssetAddedToAssetAliasException(bad_alias_asset_keys)
2853+
2854+
if bad_asset_keys:
2855+
raise AirflowInactiveAssetInInletOrOutletException(bad_asset_keys)
28452856

28462857
def _execute_task_with_callbacks(self, context: Context, test_mode: bool = False, *, session: Session):
28472858
"""Prepare Task for Execution."""

0 commit comments

Comments
 (0)