diff --git a/airbyte_cdk/sources/declarative/interpolation/macros.py b/airbyte_cdk/sources/declarative/interpolation/macros.py index 9b8aca336..6ed0971bc 100644 --- a/airbyte_cdk/sources/declarative/interpolation/macros.py +++ b/airbyte_cdk/sources/declarative/interpolation/macros.py @@ -71,6 +71,24 @@ def timestamp(dt: Union[float, str]) -> Union[int, float]: return str_to_datetime(dt).astimezone(pytz.utc).timestamp() +def timestamp_to_datetime(ts: Union[int, float, str]) -> datetime.datetime: + """ + Converts a Unix timestamp to a datetime object with UTC timezone. + + Usage: + "{{ timestamp_to_datetime(1658505815) }}" + + :param ts: Unix timestamp (in seconds) to convert to datetime + :return: datetime object in UTC timezone + """ + try: + ts_value = float(ts) + except (TypeError, ValueError) as exc: + raise ValueError(f"Invalid timestamp value: {ts}") from exc + + return datetime.datetime.fromtimestamp(ts_value, tz=datetime.timezone.utc) + + def str_to_datetime(s: str) -> datetime.datetime: """ Converts a string to a datetime object with UTC timezone @@ -222,6 +240,7 @@ def generate_uuid() -> str: now_utc, today_utc, timestamp, + timestamp_to_datetime, max, min, day_delta, diff --git a/unit_tests/sources/declarative/interpolation/test_macros.py b/unit_tests/sources/declarative/interpolation/test_macros.py index 1102cbc35..0dd6f368e 100644 --- a/unit_tests/sources/declarative/interpolation/test_macros.py +++ b/unit_tests/sources/declarative/interpolation/test_macros.py @@ -15,6 +15,7 @@ [ ("test_now_utc", "now_utc", True), ("test_today_utc", "today_utc", True), + ("test_timestamp_to_datetime", "timestamp_to_datetime", True), ("test_max", "max", True), ("test_min", "min", True), ("test_day_delta", "day_delta", True), @@ -176,6 +177,38 @@ def test_timestamp(test_name, input_value, expected_output): assert actual_output == expected_output +@pytest.mark.parametrize( + "test_name, input_value, expected_output", + [ + ( + "test_seconds_int", + 1646006400, + datetime.datetime(2022, 2, 28, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + "test_seconds_float", + 1646006400.5, + datetime.datetime(2022, 2, 28, 0, 0, 0, 500000, tzinfo=datetime.timezone.utc), + ), + ( + "test_seconds_string", + "1646006400", + datetime.datetime(2022, 2, 28, 0, 0, tzinfo=datetime.timezone.utc), + ), + ], +) +def test_timestamp_to_datetime(test_name, input_value, expected_output): + timestamp_to_datetime_fn = macros["timestamp_to_datetime"] + actual_output = timestamp_to_datetime_fn(input_value) + assert actual_output == expected_output + + +def test_timestamp_to_datetime_invalid_value(): + timestamp_to_datetime_fn = macros["timestamp_to_datetime"] + with pytest.raises(ValueError): + timestamp_to_datetime_fn("invalid-timestamp") + + def test_utc_datetime_to_local_timestamp_conversion(): """ This test ensures correct timezone handling independent of the timezone of the system on which the sync is running.