Skip to content

Commit 463e306

Browse files
committed
fixup! feat: redesign PartitionedAssetTimetable
1 parent be85856 commit 463e306

File tree

8 files changed

+13
-57
lines changed

8 files changed

+13
-57
lines changed

airflow-core/src/airflow/models/asset.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,6 @@ def to_serialized(self) -> SerializedAsset:
383383
group=self.group,
384384
extra=self.extra,
385385
watchers=[],
386-
# AIP-76: we allow user to specify per asset partition_mapper, but this exists in timetable
387-
# instead of asset model itself. Thus, from asset model to SerializedAsset will always set
388-
# partition_mapper to None
389386
)
390387

391388
def add_trigger(self, trigger: Trigger, watcher_name: str):

airflow-core/src/airflow/serialization/decoders.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def _decode_asset(var: dict[str, Any]):
105105
)
106106
for watcher in watchers
107107
],
108-
partition_mapper=var.get("partition_mapper", None),
109108
)
110109

111110

@@ -128,10 +127,8 @@ def decode_asset_like(var: dict[str, Any]) -> SerializedAssetBase:
128127
case DAT.ASSET_ANY:
129128
return SerializedAssetAny([decode_asset_like(x) for x in var["objects"]])
130129
case DAT.ASSET_ALIAS:
131-
# TODO: (AIP-76) partition_mapper
132130
return SerializedAssetAlias(name=var["name"], group=var["group"])
133131
case DAT.ASSET_REF:
134-
# TODO: (AIP-76) partition_mapper
135132
if "name" in var:
136133
return SerializedAssetNameRef(**var)
137134
return SerializedAssetUriRef(**var)

airflow-core/src/airflow/serialization/definitions/assets.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ class SerializedAsset(SerializedAssetBase):
120120
group: str
121121
extra: dict[str, Any]
122122
watchers: MutableSequence[SerializedAssetWatcher]
123-
partition_mapper: dict | None = None
124123

125124
def as_expression(self) -> Any:
126125
"""
@@ -164,7 +163,6 @@ def asprofile(self) -> AssetProfile:
164163
return AssetProfile(name=self.name or None, uri=self.uri or None, type="Asset")
165164

166165

167-
# TODO: (AIP-76) add partition_mapper: dict | None = None
168166
class SerializedAssetRef(SerializedAssetBase, AttrsInstance):
169167
"""Serialized representation of an asset reference."""
170168

@@ -193,7 +191,6 @@ def iter_dag_dependencies(self, *, source: str = "", target: str = "") -> Iterat
193191
)
194192

195193

196-
# TODO: (AIP-76) add partition_mapper: dict | None = None
197194
@attrs.define(hash=True)
198195
class SerializedAssetNameRef(SerializedAssetRef):
199196
"""Serialized representation of an asset reference by name."""
@@ -203,7 +200,6 @@ class SerializedAssetNameRef(SerializedAssetRef):
203200
_dependency_type = "asset-name-ref"
204201

205202

206-
# TODO: (AIP-76) add partition_mapper: dict | None = None
207203
@attrs.define(hash=True)
208204
class SerializedAssetUriRef(SerializedAssetRef):
209205
"""Serialized representation of an asset reference by URI."""
@@ -213,7 +209,6 @@ class SerializedAssetUriRef(SerializedAssetRef):
213209
_dependency_type = "asset-uri-ref"
214210

215211

216-
# TODO: (AIP-76) add partition_mapper: dict | None = None
217212
@attrs.define
218213
class SerializedAssetAlias(SerializedAssetBase):
219214
"""Serialized representation of an asset alias."""

airflow-core/src/airflow/serialization/encoders.py

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -168,42 +168,17 @@ def encode_asset_like(a: BaseAsset | SerializedAssetBase) -> dict[str, Any]:
168168
d: dict[str, Any]
169169
match a:
170170
case Asset() | SerializedAsset():
171-
d = {
172-
"__type": DAT.ASSET,
173-
"name": a.name,
174-
"uri": a.uri,
175-
"group": a.group,
176-
"extra": a.extra,
177-
}
171+
d = {"__type": DAT.ASSET, "name": a.name, "uri": a.uri, "group": a.group, "extra": a.extra}
178172
if a.watchers:
179173
d["watchers"] = [{"name": w.name, "trigger": encode_trigger(w.trigger)} for w in a.watchers]
180-
181-
if a.partition_mapper:
182-
if isinstance(a.partition_mapper, dict):
183-
d["partition_mapper"] = a.partition_mapper
184-
else:
185-
d["partition_mapper"] = encode_partition_mapper(a.partition_mapper)
186174
return d
187175
case AssetAlias() | SerializedAssetAlias():
188-
return {
189-
"__type": DAT.ASSET_ALIAS,
190-
"name": a.name,
191-
"group": a.group,
192-
# TODO: (AIP_76) add partition_mapper
193-
# if a.partition_mapper:
194-
# if isinstance(a.partition_mapper, dict):
195-
# d["partition_mapper"] = a.partition_mapper
196-
# else:
197-
# d["partition_mapper"] = encode_partition_mapper(a.partition_mapper)
198-
}
176+
return {"__type": DAT.ASSET_ALIAS, "name": a.name, "group": a.group}
199177
case AssetAll() | SerializedAssetAll():
200-
# TODO: (AIP_76) add partition_mapper
201178
return {"__type": DAT.ASSET_ALL, "objects": [encode_asset_like(x) for x in a.objects]}
202179
case AssetAny() | SerializedAssetAny():
203-
# TODO: (AIP_76) add partition_mapper
204180
return {"__type": DAT.ASSET_ANY, "objects": [encode_asset_like(x) for x in a.objects]}
205181
case AssetRef() | SerializedAssetRef():
206-
# TODO: (AIP_76) add partition_mapper
207182
return {"__type": DAT.ASSET_REF, **attrs.asdict(a)}
208183
raise ValueError(f"serialization not implemented for {type(a).__name__!r}")
209184

airflow-core/src/airflow/timetables/simple.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable:
289289
else None,
290290
)
291291
return timetable
292+
293+
def get_partition_mapper(self, *, name: str, uri: str) -> PartitionMapper:
294+
# TODO: implement
295+
...

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8702,11 +8702,9 @@ def test_partitioned_dag_run_with_customized_mapper(
87028702
with dag_maker(
87038703
dag_id="asset-event-consumer",
87048704
schedule=PartitionedAssetTimetable(
8705+
assets=Asset(name="asset-1"),
87058706
# TODO: (AIP-76) fix typing
8706-
assets=Asset(
8707-
name="asset-1",
8708-
partition_mapper=Key1Mapper(), # type: ignore[call-overload]
8709-
),
8707+
default_partition_mapper=Key1Mapper(), # type: ignore[call-overload]
87108708
),
87118709
session=session,
87128710
):
@@ -8858,6 +8856,10 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper(
88588856
Asset(name="asset-1", partition_mapper=Key1Mapper()) # type: ignore[call-overload]
88598857
& Asset(name="asset-2", partition_mapper=Key1Mapper()) # type: ignore[call-overload]
88608858
),
8859+
partition_mapper_mapping={
8860+
Asset(name="asset-1"): Key1Mapper(),
8861+
Asset(name="asset-2"): Key1Mapper(),
8862+
},
88618863
),
88628864
session=session,
88638865
):

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3052,10 +3052,7 @@ def test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
30523052
with dag_maker(
30533053
dag_id="asset_event_listener",
30543054
schedule=PartitionedAssetTimetable(
3055-
assets=Asset(
3056-
name="hello",
3057-
partition_mapper=IdentityMapper(),
3058-
)
3055+
assets=Asset(name="hello"), default_partition_mapper=IdentityMapper()
30593056
),
30603057
session=session,
30613058
):

task-sdk/src/airflow/sdk/definitions/asset/__init__.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
from pydantic.types import JsonValue
3838
from typing_extensions import Self
3939

40-
from airflow.sdk import PartitionMapper
4140
from airflow.sdk.api.datamodels._generated import AssetProfile
4241
from airflow.sdk.io.path import ObjectStoragePath
4342
from airflow.triggers.base import BaseEventTrigger
@@ -230,8 +229,6 @@ class BaseAsset:
230229
:meta private:
231230
"""
232231

233-
partition_mapper: PartitionMapper | None = None
234-
235232
def __or__(self, other: BaseAsset) -> BaseAsset:
236233
if not isinstance(other, BaseAsset):
237234
return NotImplemented
@@ -281,7 +278,6 @@ class Asset(os.PathLike, BaseAsset):
281278
watchers: list[AssetWatcher] = attrs.field(
282279
factory=list,
283280
)
284-
partition_mapper: PartitionMapper | None = None
285281

286282
asset_type: ClassVar[str] = "asset"
287283
__version__: ClassVar[int] = 1
@@ -295,7 +291,6 @@ def __init__(
295291
group: str = ...,
296292
extra: dict[str, JsonValue] | None = None,
297293
watchers: list[AssetWatcher] = ...,
298-
partition_mapper: PartitionMapper | None = None,
299294
) -> None:
300295
"""Canonical; both name and uri are provided."""
301296

@@ -307,7 +302,6 @@ def __init__(
307302
group: str = ...,
308303
extra: dict[str, JsonValue] | None = None,
309304
watchers: list[AssetWatcher] = ...,
310-
partition_mapper: PartitionMapper | None = None,
311305
) -> None:
312306
"""It's possible to only provide the name, either by keyword or as the only positional argument."""
313307

@@ -319,7 +313,6 @@ def __init__(
319313
group: str = ...,
320314
extra: dict[str, JsonValue] | None = None,
321315
watchers: list[AssetWatcher] = ...,
322-
partition_mapper: PartitionMapper | None = None,
323316
) -> None:
324317
"""It's possible to only provide the URI as a keyword argument."""
325318

@@ -331,7 +324,6 @@ def __init__(
331324
group: str | None = None,
332325
extra: dict[str, JsonValue] | None = None,
333326
watchers: list[AssetWatcher] | None = None,
334-
partition_mapper: PartitionMapper | None = None,
335327
) -> None:
336328
if name is None and uri is None:
337329
raise TypeError("Asset() requires either 'name' or 'uri'")
@@ -353,12 +345,9 @@ def __init__(
353345
kwargs["extra"] = extra
354346
if watchers is not None:
355347
kwargs["watchers"] = watchers
356-
if partition_mapper is not None:
357-
kwargs["partition_mapper"] = partition_mapper
358348

359349
self.__attrs_init__(name=name, uri=uri, **kwargs)
360350

361-
# TODO: AIP-76: support something like Asset.ref(name=..., partition_mapper=...)?
362351
@overload
363352
@staticmethod
364353
def ref(*, name: str) -> AssetNameRef: ...

0 commit comments

Comments
 (0)