Skip to content

Commit 60a4b6a

Browse files
committed
style: remove core partition mapper from typing
1 parent 947cd1e commit 60a4b6a

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8748,7 +8748,12 @@ def test_partitioned_dag_run_with_customized_mapper(
87488748
dag_id="asset-event-consumer",
87498749
schedule=PartitionedAssetTimetable(
87508750
assets=Asset(name="asset-1"),
8751-
default_partition_mapper=Key1Mapper(),
8751+
# Most users should use the partition mapper provided by the task-SDK.
8752+
# Advanced users can import from core and register their own partition mapper
8753+
# via an Airflow plugin.
8754+
# We intentionally exclude core mappers from the public typing
8755+
# so standard users don't accidentally rely on internal implementations.
8756+
default_partition_mapper=Key1Mapper(), # type: ignore[arg-type]
87528757
),
87538758
session=session,
87548759
):
@@ -8896,9 +8901,14 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper(
88968901
dag_id="asset-event-consumer",
88978902
schedule=PartitionedAssetTimetable(
88988903
assets=(Asset(name="asset-1") & Asset(name="asset-2")),
8904+
# Most users should use the partition mapper provided by the task-SDK.
8905+
# Advanced users can import from core and register their own partition mapper
8906+
# via an Airflow plugin.
8907+
# We intentionally exclude core mappers from the public typing
8908+
# so standard users don't accidentally rely on internal implementations.
88998909
partition_mapper_mapping={
8900-
Asset(name="asset-1"): Key1Mapper(),
8901-
Asset(name="asset-2"): Key1Mapper(),
8910+
Asset(name="asset-1"): Key1Mapper(), # type: ignore[dict-item]
8911+
Asset(name="asset-2"): Key1Mapper(), # type: ignore[dict-item]
89028912
},
89038913
),
89048914
session=session,

task-sdk/src/airflow/sdk/definitions/timetables/assets.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
if TYPE_CHECKING:
2929
from collections.abc import Collection
3030

31-
from airflow.partition_mapper.base import PartitionMapper as CorePartitionMapper
3231
from airflow.sdk import Asset
3332
from airflow.sdk.definitions.partition_mapper.base import PartitionMapper
3433

@@ -51,10 +50,8 @@ class PartitionedAssetTimetable(AssetTriggeredTimetable):
5150
"""Asset-driven timetable that listens for partitioned assets."""
5251

5352
asset_condition: BaseAsset = attrs.field(alias="assets")
54-
partition_mapper_mapping: dict[BaseAsset, PartitionMapper | CorePartitionMapper] = attrs.field(
55-
factory=dict
56-
)
57-
default_partition_mapper: PartitionMapper | CorePartitionMapper = IdentityMapper()
53+
partition_mapper_mapping: dict[BaseAsset, PartitionMapper] = attrs.field(factory=dict)
54+
default_partition_mapper: PartitionMapper = IdentityMapper()
5855

5956

6057
def _coerce_assets(o: Collection[Asset] | BaseAsset) -> BaseAsset:

0 commit comments

Comments
 (0)