Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1b2a724
fix(cdk): replace direct datetime parsing/formatting with datetime_he…
devin-ai-integration[bot] Apr 8, 2025
f7f9868
fix: apply ruff formatting
devin-ai-integration[bot] Apr 8, 2025
e4d281f
fix: respect format parameter in ab_datetime_format for AirbyteDateTi…
devin-ai-integration[bot] Apr 8, 2025
c4f2e3a
fix: handle AirbyteDateTime objects in DatetimeBasedCursor
devin-ai-integration[bot] Apr 9, 2025
e6db281
fix: convert AirbyteDateTime to standard datetime before subtraction
devin-ai-integration[bot] Apr 9, 2025
eddec7b
fix: apply ruff formatting to datetime_helpers.py
devin-ai-integration[bot] Apr 9, 2025
46377c7
fix: improve AirbyteDateTime.__add__ to properly check input type
devin-ai-integration[bot] Apr 9, 2025
db3c1b9
fix: improve AirbyteDateTime.__add__ to convert to standard datetime …
devin-ai-integration[bot] Apr 9, 2025
43bce33
fix: update to_datetime to return standard datetime object
devin-ai-integration[bot] Apr 9, 2025
64f9662
fix: apply ruff formatting to datetime_helpers.py
devin-ai-integration[bot] Apr 9, 2025
7a609ad
fix: add special handling for %s format in ab_datetime_parse
devin-ai-integration[bot] Apr 9, 2025
3a9f153
fix: apply ruff formatting to datetime_helpers.py
devin-ai-integration[bot] Apr 9, 2025
af7d32c
manual updates (aj)
aaronsteers Apr 9, 2025
b410369
fix issues
aaronsteers Apr 9, 2025
71112d6
clean up
aaronsteers Apr 9, 2025
8c4b8b7
fix typing issues
aaronsteers Apr 9, 2025
081a36a
more now() updates
aaronsteers Apr 9, 2025
8ae67b2
chore: fix now refs
aaronsteers Apr 9, 2025
df49470
remove extra casting
aaronsteers Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/declarative/async_job/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from datetime import datetime, timedelta, timezone
from typing import Optional

from airbyte_cdk.utils.datetime_helpers import ab_datetime_now


class Timer:
def __init__(self, timeout: timedelta) -> None:
Expand Down Expand Up @@ -36,4 +38,4 @@ def has_timed_out(self) -> bool:

@staticmethod
def _now() -> datetime:
return datetime.now(tz=timezone.utc)
return ab_datetime_now()
3 changes: 2 additions & 1 deletion airbyte_cdk/sources/declarative/auth/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now


class JwtAlgorithm(str):
Expand Down Expand Up @@ -127,7 +128,7 @@ def _get_jwt_payload(self) -> dict[str, Any]:
"""
Builds and returns the payload used when signing the JWT.
"""
now = int(datetime.now().timestamp())
now = ab_datetime_now().timestamp()
exp = now + self._token_duration if isinstance(self._token_duration, int) else now
nbf = now

Expand Down
16 changes: 10 additions & 6 deletions airbyte_cdk/sources/declarative/datetime/min_max_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse


@dataclass
Expand Down Expand Up @@ -39,7 +40,6 @@ class MinMaxDatetime:

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.datetime = InterpolatedString.create(self.datetime, parameters=parameters or {})
self._parser = DatetimeParser()
self.min_datetime = (
InterpolatedString.create(self.min_datetime, parameters=parameters) # type: ignore [assignment] # expression has type "InterpolatedString | None", variable has type "InterpolatedString | str"
if self.min_datetime
Expand All @@ -65,25 +65,29 @@ def get_datetime(
if not datetime_format:
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"

time = self._parser.parse(
time = ab_datetime_parse(
str(
self.datetime.eval( # type: ignore[union-attr] # str has no attribute "eval"
config,
**additional_parameters,
)
),
datetime_format,
) # type: ignore # datetime is always cast to an interpolated string
formats=[datetime_format] if datetime_format else None,
)

if self.min_datetime:
min_time = str(self.min_datetime.eval(config, **additional_parameters)) # type: ignore # min_datetime is always cast to an interpolated string
if min_time:
min_datetime = self._parser.parse(min_time, datetime_format) # type: ignore # min_datetime is always cast to an interpolated string
min_datetime = ab_datetime_parse(
min_time, formats=[datetime_format] if datetime_format else None
)
time = max(time, min_datetime)
if self.max_datetime:
max_time = str(self.max_datetime.eval(config, **additional_parameters)) # type: ignore # max_datetime is always cast to an interpolated string
if max_time:
max_datetime = self._parser.parse(max_time, datetime_format)
max_datetime = ab_datetime_parse(
max_time, formats=[datetime_format] if datetime_format else None
)
time = min(time, max_datetime)
return time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.utils.datetime_helpers import (
ab_datetime_format,
ab_datetime_now,
ab_datetime_parse,
)
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths


Expand Down Expand Up @@ -89,7 +94,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
)

self._timezone = datetime.timezone.utc
self._interpolation = JinjaInterpolation()

self._step = (
Expand All @@ -112,7 +116,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._partition_field_end = InterpolatedString.create(
self.partition_field_end or "end_time", parameters=parameters
)
self._parser = DatetimeParser()

# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
if not self._start_datetime.datetime_format:
Expand Down Expand Up @@ -240,10 +243,10 @@ def select_best_end_datetime(self) -> datetime.datetime:

:return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
"""
now = datetime.datetime.now(tz=self._timezone)
now = ab_datetime_now()
if not self._end_datetime:
return now
return min(self._end_datetime.get_datetime(self.config), now)
return min(self._end_datetime.get_datetime(self.config), now.to_datetime())

def _calculate_cursor_datetime_from_state(
self, stream_state: Mapping[str, Any]
Expand All @@ -253,7 +256,12 @@ def _calculate_cursor_datetime_from_state(
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime) -> str:
return self._parser.format(dt, self.datetime_format)
"""Format the datetime according to the configured format.

# TODO: Standardize cursor serialization with ISO 8601 format and deprecate custom formats
# in STATE messages.
"""
return ab_datetime_format(dt, self.datetime_format)

def _partition_daterange(
self,
Expand Down Expand Up @@ -308,12 +316,15 @@ def _get_date(
return comparator(cursor_date, default_date)

def parse_date(self, date: str) -> datetime.datetime:
for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
try:
return self._parser.parse(date, datetime_format)
except ValueError:
pass
raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
formats = list(set(self.cursor_datetime_formats + [self.datetime_format]))
try:
return ab_datetime_parse(
date,
formats=formats,
disallow_other_formats=False, # TODO: Consider permissive parsing.
)
except ValueError as ex:
raise ValueError(f"No format in {formats} matching '{date}'") from ex

@classmethod
def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
Expand Down
40 changes: 18 additions & 22 deletions airbyte_cdk/sources/declarative/interpolation/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from isodate import parse_duration

from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.utils.datetime_helpers import (
ab_datetime_format,
ab_datetime_now,
ab_datetime_parse,
)

"""
This file contains macros that can be evaluated by a `JinjaInterpolation` object
Expand All @@ -26,7 +31,7 @@ def now_utc() -> datetime.datetime:
Usage:
`"{{ now_utc() }}"`
"""
return datetime.datetime.now(datetime.timezone.utc)
return ab_datetime_now()


def today_utc() -> datetime.date:
Expand All @@ -36,7 +41,7 @@ def today_utc() -> datetime.date:
Usage:
`"{{ today_utc() }}"`
"""
return datetime.datetime.now(datetime.timezone.utc).date()
return ab_datetime_now().date()


def today_with_timezone(timezone: str) -> datetime.date:
Expand All @@ -46,7 +51,7 @@ def today_with_timezone(timezone: str) -> datetime.date:
:param timezone: timezone expressed as IANA keys format. Example: "Pacific/Tarawa"
:return:
"""
return datetime.datetime.now(tz=pytz.timezone(timezone)).date()
return ab_datetime_now(tz=timezone).date()


def timestamp(dt: Union[float, str]) -> Union[int, float]:
Expand All @@ -62,10 +67,7 @@ def timestamp(dt: Union[float, str]) -> Union[int, float]:
:param dt: datetime to convert to timestamp
:return: unix timestamp
"""
if isinstance(dt, (int, float)):
return int(dt)
else:
return str_to_datetime(dt).astimezone(pytz.utc).timestamp()
return ab_datetime_parse(dt).timestamp()


def str_to_datetime(s: str) -> datetime.datetime:
Expand All @@ -82,12 +84,7 @@ def str_to_datetime(s: str) -> datetime.datetime:
:param s: string to parse as datetime
:return: datetime object in UTC timezone
"""

parsed_date = parser.isoparse(s)
if not parsed_date.tzinfo:
# Assume UTC if the input does not contain a timezone
parsed_date = parsed_date.replace(tzinfo=pytz.utc)
return parsed_date.astimezone(pytz.utc)
return ab_datetime_parse(s)


def max(*args: typing.Any) -> typing.Any:
Expand Down Expand Up @@ -140,9 +137,10 @@ def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:
:param num_days: number of days to add to current date time
:return: datetime formatted as RFC3339
"""
return (
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=num_days)
).strftime(format)
return ab_datetime_format(
ab_datetime_now() + datetime.timedelta(days=num_days),
format=format,
)


def duration(datestring: str) -> Union[datetime.timedelta, isodate.Duration]:
Expand All @@ -169,17 +167,15 @@ def format_datetime(
https://github.com/python/cpython/issues/56959
"""
if isinstance(dt, datetime.datetime):
return dt.strftime(format)
return ab_datetime_format(dt, format)

if isinstance(dt, int):
dt_datetime = DatetimeParser().parse(dt, input_format if input_format else "%s")
dt_datetime = ab_datetime_parse(dt, formats=[input_format])
else:
dt_datetime = (
datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt)
)
dt_datetime = ab_datetime_parse(dt, formats=[input_format])
if dt_datetime.tzinfo is None:
dt_datetime = dt_datetime.replace(tzinfo=pytz.utc)
return DatetimeParser().format(dt=dt_datetime, format=format)
return ab_datetime_format(dt_datetime, format=format)


_macros_list = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now

ComponentDefinition = Mapping[str, Any]

Expand Down Expand Up @@ -3532,7 +3533,7 @@ def create_fixed_window_call_rate_policy(
# Set the initial reset timestamp to 10 days from now.
# This value will be updated by the first request.
return FixedWindowCallRatePolicy(
next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10),
next_reset_ts=ab_datetime_now() + datetime.timedelta(days=10),
period=parse_duration(model.period),
call_limit=model.call_limit,
matchers=matchers,
Expand Down
7 changes: 6 additions & 1 deletion airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use_file_transfer,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse


class FileReadMode(Enum):
Expand Down Expand Up @@ -95,7 +96,11 @@ def filter_files_by_globs_and_start_date(
Utility method for filtering files based on globs.
"""
start_date = (
datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
ab_datetime_parse(
self.config.start_date,
formats=[self.DATE_TIME_FORMAT],
disallow_other_formats=False,
)
if self.config and self.config.start_date
else None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
from airbyte_cdk.utils import is_cloud_environment
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

unstructured_partition_pdf = None
Expand Down Expand Up @@ -300,7 +301,7 @@ def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[st
format_config.processing,
FileType.MD,
"auto",
RemoteFile(uri="test", last_modified=datetime.now()),
RemoteFile(uri="test", last_modified=ab_datetime_now()),
)
except Exception:
return False, "".join(traceback.format_exc())
Expand Down
Loading
Loading