Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.partition_mapper.base import PartitionMapper
from airflow.partition_mappers.base import PartitionMapper


class IdentityMapper(PartitionMapper):
Expand Down
144 changes: 144 additions & 0 deletions airflow-core/src/airflow/partition_mappers/temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any

from airflow.partition_mappers.base import PartitionMapper


class _BaseTemporalMapper(PartitionMapper, ABC):
"""Base class for Temporal Partition Mappers."""

default_output_format: str

def __init__(
self,
input_format: str = "%Y-%m-%dT%H:%M:%S",
output_format: str | None = None,
):
self.input_format = input_format
self.output_format = output_format or self.default_output_format

def to_downstream(self, key: str) -> str:
dt = datetime.strptime(key, self.input_format)
normalized = self.normalize(dt)
return self.format(normalized)

@abstractmethod
def normalize(self, dt: datetime) -> datetime:
"""Return canonical start datetime for the partition."""

def format(self, dt: datetime) -> str:
"""Format the normalized datetime."""
return dt.strftime(self.output_format)

def serialize(self) -> dict[str, Any]:
return {
"input_format": self.input_format,
"output_format": self.output_format,
}

@classmethod
def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
return cls(
input_format=data["input_format"],
output_format=data["output_format"],
)


class HourlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to hour."""

default_output_format = "%Y-%m-%dT%H"

def normalize(self, dt: datetime) -> datetime:
return dt.replace(minute=0, second=0, microsecond=0)


class DailyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to day."""

default_output_format = "%Y-%m-%d"

def normalize(self, dt: datetime) -> datetime:
return dt.replace(hour=0, minute=0, second=0, microsecond=0)


class WeeklyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to week."""

default_output_format = "%Y-%m-%d (W%V)"

def normalize(self, dt: datetime) -> datetime:
start = dt - timedelta(days=dt.weekday())
return start.replace(hour=0, minute=0, second=0, microsecond=0)


class MonthlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to month."""

default_output_format = "%Y-%m"

def normalize(self, dt: datetime) -> datetime:
return dt.replace(
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)


class QuarterlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to quarter."""

default_output_format = "%Y-Q{quarter}"

def normalize(self, dt: datetime) -> datetime:
quarter = (dt.month - 1) // 3
month = quarter * 3 + 1
return dt.replace(
month=month,
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)

def format(self, dt: datetime) -> str:
quarter = (dt.month - 1) // 3 + 1
return dt.strftime(self.output_format).format(quarter=quarter)


class YearlyMapper(_BaseTemporalMapper):
"""Map a time-based partition key to year."""

default_output_format = "%Y"

def normalize(self, dt: datetime) -> datetime:
return dt.replace(
month=1,
day=1,
hour=0,
minute=0,
second=0,
microsecond=0,
)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

if TYPE_CHECKING:
from airflow.listeners.listener import ListenerManager
from airflow.partition_mapper.base import PartitionMapper
from airflow.partition_mappers.base import PartitionMapper
from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.timetables.base import Timetable

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/serialization/decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
)

if TYPE_CHECKING:
from airflow.partition_mapper.base import PartitionMapper
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import Timetable as CoreTimetable

R = TypeVar("R")
Expand Down
36 changes: 34 additions & 2 deletions airflow-core/src/airflow/serialization/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import pendulum

from airflow._shared.module_loading import qualname
from airflow.partition_mapper.base import PartitionMapper as CorePartitionMapper
from airflow.partition_mappers.base import PartitionMapper as CorePartitionMapper
from airflow.sdk import (
Asset,
AssetAlias,
Expand All @@ -35,15 +35,21 @@
AssetOrTimeSchedule,
CronDataIntervalTimetable,
CronTriggerTimetable,
DailyMapper,
DeltaDataIntervalTimetable,
DeltaTriggerTimetable,
EventsTimetable,
IdentityMapper,
MonthlyMapper,
MultipleCronTriggerTimetable,
PartitionMapper,
QuarterlyMapper,
WeeklyMapper,
YearlyMapper,
)
from airflow.sdk.bases.timetable import BaseTimetable
from airflow.sdk.definitions.asset import AssetRef
from airflow.sdk.definitions.partition_mappers.temporal import HourlyMapper
from airflow.sdk.definitions.timetables.assets import (
AssetTriggeredTimetable,
PartitionedAssetTimetable,
Expand Down Expand Up @@ -355,7 +361,13 @@ def _(self, timetable: PartitionedAssetTimetable) -> dict[str, Any]:
}

BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
IdentityMapper: "airflow.partition_mapper.identity.IdentityMapper",
IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
HourlyMapper: "airflow.partition_mappers.temporal.HourlyMapper",
DailyMapper: "airflow.partition_mappers.temporal.DailyMapper",
WeeklyMapper: "airflow.partition_mappers.temporal.WeeklyMapper",
MonthlyMapper: "airflow.partition_mappers.temporal.MonthlyMapper",
QuarterlyMapper: "airflow.partition_mappers.temporal.QuarterlyMapper",
YearlyMapper: "airflow.partition_mappers.temporal.YearlyMapper",
}

@functools.singledispatchmethod
Expand All @@ -370,6 +382,26 @@ def serialize_partition_mapper(
def _(self, partition_mapper: IdentityMapper) -> dict[str, Any]:
return {}

@serialize_partition_mapper.register(HourlyMapper)
@serialize_partition_mapper.register(DailyMapper)
@serialize_partition_mapper.register(WeeklyMapper)
@serialize_partition_mapper.register(MonthlyMapper)
@serialize_partition_mapper.register(QuarterlyMapper)
@serialize_partition_mapper.register(YearlyMapper)
def _(
self,
partition_mapper: HourlyMapper
| DailyMapper
| WeeklyMapper
| MonthlyMapper
| QuarterlyMapper
| YearlyMapper,
) -> dict[str, Any]:
return {
"input_format": partition_mapper.input_format,
"output_format": partition_mapper.output_format,
}


_serializer = _Serializer()

Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from airflow.settings import json

if TYPE_CHECKING:
from airflow.partition_mapper.base import PartitionMapper
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import Timetable as CoreTimetable


Expand Down Expand Up @@ -162,4 +162,4 @@ def __str__(self) -> str:

def is_core_partition_mapper_import_path(importable_string: str) -> bool:
"""Whether an importable string points to a core partition mapper class."""
return importable_string.startswith("airflow.partition_mapper.")
return importable_string.startswith("airflow.partition_mappers.")
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def ensure_serialized_asset(o): # type: ignore[misc,no-redef]

from pendulum import DateTime

from airflow.partition_mapper.base import PartitionMapper
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import TimeRestriction
from airflow.utils.types import DagRunType

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
from airflow.models.team import Team
from airflow.models.trigger import Trigger
from airflow.observability.trace import Trace
from airflow.partition_mapper.base import PartitionMapper as CorePartitionMapper
from airflow.partition_mappers.base import PartitionMapper as CorePartitionMapper
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
Expand Down
32 changes: 32 additions & 0 deletions airflow-core/tests/unit/partition_mappers/test_identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.partition_mappers.identity import IdentityMapper


class TestIdentityMapper:
def test_to_downstream(self):
pm = IdentityMapper()
assert pm.to_downstream("key") == "key"

def test_serialize(self):
pm = IdentityMapper()
assert pm.serialize() == {}

def test_deserialize(self):
assert isinstance(IdentityMapper.deserialize({}), IdentityMapper)
83 changes: 83 additions & 0 deletions airflow-core/tests/unit/partition_mappers/test_temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from airflow.partition_mappers.temporal import (
DailyMapper,
HourlyMapper,
MonthlyMapper,
QuarterlyMapper,
WeeklyMapper,
YearlyMapper,
_BaseTemporalMapper,
)


class TestTemporalMappers:
@pytest.mark.parametrize(
("mapper_cls", "expected_downstream_key"),
[
(HourlyMapper, "2026-02-10T14"),
(DailyMapper, "2026-02-10"),
(WeeklyMapper, "2026-02-09 (W07)"),
(MonthlyMapper, "2026-02"),
(QuarterlyMapper, "2026-Q1"),
(YearlyMapper, "2026"),
],
)
def test_to_downstream(
self,
mapper_cls: type[_BaseTemporalMapper],
expected_downstream_key: str,
):
pm = mapper_cls()
assert pm.to_downstream("2026-02-10T14:30:45") == expected_downstream_key

@pytest.mark.parametrize(
("mapper_cls", "expected_outut_format"),
[
(HourlyMapper, "%Y-%m-%dT%H"),
(DailyMapper, "%Y-%m-%d"),
(WeeklyMapper, "%Y-%m-%d (W%V)"),
(MonthlyMapper, "%Y-%m"),
(QuarterlyMapper, "%Y-Q{quarter}"),
(YearlyMapper, "%Y"),
],
)
def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str):
pm = mapper_cls()
assert pm.serialize() == {
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": expected_outut_format,
}

@pytest.mark.parametrize(
"mapper_cls",
[HourlyMapper, DailyMapper, WeeklyMapper, MonthlyMapper, QuarterlyMapper, YearlyMapper],
)
def test_deserialize(self, mapper_cls):
pm = mapper_cls.deserialize(
{
"input_format": "%Y-%m-%dT%H:%M:%S",
"output_format": "customized-format",
}
)
assert isinstance(pm, mapper_cls)
assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
assert pm.output_format == "customized-format"
Loading
Loading