diff --git a/airbyte_cdk/sources/declarative/async_job/timer.py b/airbyte_cdk/sources/declarative/async_job/timer.py index c4e5a9a1d..a8e77c600 100644 --- a/airbyte_cdk/sources/declarative/async_job/timer.py +++ b/airbyte_cdk/sources/declarative/async_job/timer.py @@ -2,6 +2,8 @@ from datetime import datetime, timedelta, timezone from typing import Optional +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now + class Timer: def __init__(self, timeout: timedelta) -> None: @@ -36,4 +38,4 @@ def has_timed_out(self) -> bool: @staticmethod def _now() -> datetime: - return datetime.now(tz=timezone.utc) + return ab_datetime_now() diff --git a/airbyte_cdk/sources/declarative/auth/jwt.py b/airbyte_cdk/sources/declarative/auth/jwt.py index c83d081bb..203c64b03 100644 --- a/airbyte_cdk/sources/declarative/auth/jwt.py +++ b/airbyte_cdk/sources/declarative/auth/jwt.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now class JwtAlgorithm(str): @@ -127,7 +128,7 @@ def _get_jwt_payload(self) -> dict[str, Any]: """ Builds and returns the payload used when signing the JWT. """ - now = int(datetime.now().timestamp()) + now = ab_datetime_now().timestamp() exp = now + self._token_duration if isinstance(self._token_duration, int) else now nbf = now diff --git a/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py b/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py index eb407db44..dc84f0ab1 100644 --- a/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py +++ b/airbyte_cdk/sources/declarative/datetime/min_max_datetime.py @@ -8,6 +8,7 @@ from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse @dataclass @@ -39,7 +40,6 @@ class MinMaxDatetime: def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.datetime = InterpolatedString.create(self.datetime, parameters=parameters or {}) - self._parser = DatetimeParser() self.min_datetime = ( InterpolatedString.create(self.min_datetime, parameters=parameters) # type: ignore [assignment] # expression has type "InterpolatedString | None", variable has type "InterpolatedString | str" if self.min_datetime @@ -65,25 +65,29 @@ def get_datetime( if not datetime_format: datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z" - time = self._parser.parse( + time = ab_datetime_parse( str( self.datetime.eval( # type: ignore[union-attr] # str has no attribute "eval" config, **additional_parameters, ) ), - datetime_format, - ) # type: ignore # datetime is always cast to an interpolated string + formats=[datetime_format] if datetime_format else None, + ) if self.min_datetime: min_time = str(self.min_datetime.eval(config, **additional_parameters)) # type: ignore # min_datetime is always cast to an interpolated string if min_time: - min_datetime = self._parser.parse(min_time, datetime_format) # type: ignore # min_datetime is always cast to an interpolated string + min_datetime = ab_datetime_parse( + min_time, formats=[datetime_format] if datetime_format else None + ) time = max(time, min_datetime) if self.max_datetime: max_time = str(self.max_datetime.eval(config, **additional_parameters)) # type: ignore # max_datetime is always cast to an interpolated string if max_time: - max_datetime = self._parser.parse(max_time, datetime_format) + max_datetime = ab_datetime_parse( + max_time, formats=[datetime_format] if datetime_format else None + ) time = min(time, max_datetime) return time diff --git a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py b/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py index cb39f56ba..152d51824 100644 --- a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py @@ -21,6 +21,11 @@ ) from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.utils.datetime_helpers import ( + ab_datetime_format, + ab_datetime_now, + ab_datetime_parse, +) from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths @@ -89,7 +94,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters) ) - self._timezone = datetime.timezone.utc self._interpolation = JinjaInterpolation() self._step = ( @@ -112,7 +116,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._partition_field_end = InterpolatedString.create( self.partition_field_end or "end_time", parameters=parameters ) - self._parser = DatetimeParser() # If datetime format is not specified then start/end datetime should inherit it from the stream slicer if not self._start_datetime.datetime_format: @@ -240,10 +243,10 @@ def select_best_end_datetime(self) -> datetime.datetime: :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier. """ - now = datetime.datetime.now(tz=self._timezone) + now = ab_datetime_now() if not self._end_datetime: return now - return min(self._end_datetime.get_datetime(self.config), now) + return min(self._end_datetime.get_datetime(self.config), now.to_datetime()) def _calculate_cursor_datetime_from_state( self, stream_state: Mapping[str, Any] @@ -253,7 +256,12 @@ def _calculate_cursor_datetime_from_state( return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) def _format_datetime(self, dt: datetime.datetime) -> str: - return self._parser.format(dt, self.datetime_format) + """Format the datetime according to the configured format. + + # TODO: Standardize cursor serialization with ISO 8601 format and deprecate custom formats + # in STATE messages. + """ + return ab_datetime_format(dt, self.datetime_format) def _partition_daterange( self, @@ -308,12 +316,15 @@ def _get_date( return comparator(cursor_date, default_date) def parse_date(self, date: str) -> datetime.datetime: - for datetime_format in self.cursor_datetime_formats + [self.datetime_format]: - try: - return self._parser.parse(date, datetime_format) - except ValueError: - pass - raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}") + formats = list(set(self.cursor_datetime_formats + [self.datetime_format])) + try: + return ab_datetime_parse( + date, + formats=formats, + disallow_other_formats=False, # TODO: Consider permissive parsing. + ) + except ValueError as ex: + raise ValueError(f"No format in {formats} matching '{date}'") from ex @classmethod def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]: diff --git a/airbyte_cdk/sources/declarative/interpolation/macros.py b/airbyte_cdk/sources/declarative/interpolation/macros.py index f84ece214..fd3f63c59 100644 --- a/airbyte_cdk/sources/declarative/interpolation/macros.py +++ b/airbyte_cdk/sources/declarative/interpolation/macros.py @@ -13,6 +13,11 @@ from isodate import parse_duration from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser +from airbyte_cdk.utils.datetime_helpers import ( + ab_datetime_format, + ab_datetime_now, + ab_datetime_parse, +) """ This file contains macros that can be evaluated by a `JinjaInterpolation` object @@ -26,7 +31,7 @@ def now_utc() -> datetime.datetime: Usage: `"{{ now_utc() }}"` """ - return datetime.datetime.now(datetime.timezone.utc) + return ab_datetime_now() def today_utc() -> datetime.date: @@ -36,7 +41,7 @@ def today_utc() -> datetime.date: Usage: `"{{ today_utc() }}"` """ - return datetime.datetime.now(datetime.timezone.utc).date() + return ab_datetime_now().date() def today_with_timezone(timezone: str) -> datetime.date: @@ -46,7 +51,7 @@ def today_with_timezone(timezone: str) -> datetime.date: :param timezone: timezone expressed as IANA keys format. Example: "Pacific/Tarawa" :return: """ - return datetime.datetime.now(tz=pytz.timezone(timezone)).date() + return ab_datetime_now(tz=timezone).date() def timestamp(dt: Union[float, str]) -> Union[int, float]: @@ -62,10 +67,7 @@ def timestamp(dt: Union[float, str]) -> Union[int, float]: :param dt: datetime to convert to timestamp :return: unix timestamp """ - if isinstance(dt, (int, float)): - return int(dt) - else: - return str_to_datetime(dt).astimezone(pytz.utc).timestamp() + return ab_datetime_parse(dt).timestamp() def str_to_datetime(s: str) -> datetime.datetime: @@ -82,12 +84,7 @@ def str_to_datetime(s: str) -> datetime.datetime: :param s: string to parse as datetime :return: datetime object in UTC timezone """ - - parsed_date = parser.isoparse(s) - if not parsed_date.tzinfo: - # Assume UTC if the input does not contain a timezone - parsed_date = parsed_date.replace(tzinfo=pytz.utc) - return parsed_date.astimezone(pytz.utc) + return ab_datetime_parse(s) def max(*args: typing.Any) -> typing.Any: @@ -140,9 +137,10 @@ def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str: :param num_days: number of days to add to current date time :return: datetime formatted as RFC3339 """ - return ( - datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=num_days) - ).strftime(format) + return ab_datetime_format( + ab_datetime_now() + datetime.timedelta(days=num_days), + format=format, + ) def duration(datestring: str) -> Union[datetime.timedelta, isodate.Duration]: @@ -169,17 +167,15 @@ def format_datetime( https://github.com/python/cpython/issues/56959 """ if isinstance(dt, datetime.datetime): - return dt.strftime(format) + return ab_datetime_format(dt, format) if isinstance(dt, int): - dt_datetime = DatetimeParser().parse(dt, input_format if input_format else "%s") + dt_datetime = ab_datetime_parse(dt, formats=[input_format]) else: - dt_datetime = ( - datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt) - ) + dt_datetime = ab_datetime_parse(dt, formats=[input_format]) if dt_datetime.tzinfo is None: dt_datetime = dt_datetime.replace(tzinfo=pytz.utc) - return DatetimeParser().format(dt=dt_datetime, format=format) + return ab_datetime_format(dt_datetime, format=format) _macros_list = [ diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 25840f06f..8663b222e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -546,6 +546,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now ComponentDefinition = Mapping[str, Any] @@ -3532,7 +3533,7 @@ def create_fixed_window_call_rate_policy( # Set the initial reset timestamp to 10 days from now. # This value will be updated by the first request. return FixedWindowCallRatePolicy( - next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10), + next_reset_ts=ab_datetime_now() + datetime.timedelta(days=10), period=parse_duration(model.period), call_limit=model.call_limit, matchers=matchers, diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index cbf3d119b..0653b05eb 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -19,6 +19,7 @@ use_file_transfer, ) from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse class FileReadMode(Enum): @@ -95,7 +96,11 @@ def filter_files_by_globs_and_start_date( Utility method for filtering files based on globs. """ start_date = ( - datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) + ab_datetime_parse( + self.config.start_date, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ) if self.config and self.config.start_date else None ) diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index f55675e0a..38273ce1e 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -37,6 +37,7 @@ from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import SchemaType from airbyte_cdk.utils import is_cloud_environment +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now from airbyte_cdk.utils.traced_exception import AirbyteTracedException unstructured_partition_pdf = None @@ -300,7 +301,7 @@ def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[st format_config.processing, FileType.MD, "auto", - RemoteFile(uri="test", last_modified=datetime.now()), + RemoteFile(uri="test", last_modified=ab_datetime_now()), ) except Exception: return False, "".join(traceback.format_exc()) diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index a70169197..5fc66d182 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -20,6 +20,11 @@ from airbyte_cdk.sources.streams.concurrent.cursor import CursorField from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.types import Record +from airbyte_cdk.utils.datetime_helpers import ( + ab_datetime_format, + ab_datetime_now, + ab_datetime_parse, +) if TYPE_CHECKING: from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition @@ -110,11 +115,15 @@ def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datet ) cursor_str = min(prev_cursor_str, earliest_file_cursor_value) cursor_dt, cursor_uri = cursor_str.split("_", 1) - return datetime.strptime(cursor_dt, self.DATE_TIME_FORMAT), cursor_uri + return ab_datetime_parse( + cursor_dt, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ), cursor_uri def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str: if file: - return f"{datetime.strftime(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}" + return f"{ab_datetime_format(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}" return self.zero_cursor_value def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]: @@ -125,7 +134,11 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]: ) return RemoteFile( uri=filename, - last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT), + last_modified=ab_datetime_parse( + last_modified, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ), ) else: return None @@ -153,8 +166,9 @@ def add_file(self, file: RemoteFile) -> None: ) else: self._pending_files.pop(file.uri) - self._file_to_datetime_history[file.uri] = file.last_modified.strftime( - self.DATE_TIME_FORMAT + self._file_to_datetime_history[file.uri] = ab_datetime_format( + file.last_modified, + format=self.DATE_TIME_FORMAT, ) if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: # Get the earliest file based on its last modified date and its uri @@ -206,11 +220,16 @@ def _compute_latest_file_in_history(self) -> Optional[RemoteFile]: with self._state_lock: if self._file_to_datetime_history: filename, last_modified = max( - self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0]) + self._file_to_datetime_history.items(), + key=lambda f: (f[1], f[0]), ) return RemoteFile( uri=filename, - last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT), + last_modified=ab_datetime_parse( + last_modified, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ), ) else: return None @@ -239,8 +258,10 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: with self._state_lock: if file.uri in self._file_to_datetime_history: # If the file's uri is in the history, we should sync the file if it has been modified since it was synced - updated_at_from_history = datetime.strptime( - self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT + updated_at_from_history = ab_datetime_parse( + self._file_to_datetime_history[file.uri], + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, ) if file.last_modified < updated_at_from_history: self._message_repository.emit_message( @@ -287,9 +308,13 @@ def _compute_start_time(self) -> datetime: return datetime.min else: earliest = min(self._file_to_datetime_history.values()) - earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT) + earliest_dt = ab_datetime_parse( + earliest, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ) if self._is_history_full(): - time_window = datetime.now() - self._time_window_if_history_is_full + time_window = ab_datetime_now() - self._time_window_if_history_is_full earliest_dt = min(earliest_dt, time_window) return earliest_dt diff --git a/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py b/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py index 08ad8c3ae..fe7ad8c3f 100644 --- a/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py @@ -12,6 +12,11 @@ AbstractFileBasedCursor, ) from airbyte_cdk.sources.file_based.types import StreamState +from airbyte_cdk.utils.datetime_helpers import ( + ab_datetime_format, + ab_datetime_now, + ab_datetime_parse, +) class DefaultFileBasedCursor(AbstractFileBasedCursor): @@ -42,8 +47,8 @@ def set_initial_state(self, value: StreamState) -> None: self._initial_earliest_file_in_history = self._compute_earliest_file_in_history() def add_file(self, file: RemoteFile) -> None: - self._file_to_datetime_history[file.uri] = file.last_modified.strftime( - self.DATE_TIME_FORMAT + self._file_to_datetime_history[file.uri] = ab_datetime_format( + file.last_modified, self.DATE_TIME_FORMAT ) if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: # Get the earliest file based on its last modified date and its uri @@ -82,8 +87,10 @@ def _is_history_full(self) -> bool: def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: if file.uri in self._file_to_datetime_history: # If the file's uri is in the history, we should sync the file if it has been modified since it was synced - updated_at_from_history = datetime.strptime( - self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT + updated_at_from_history = ab_datetime_parse( + self._file_to_datetime_history[file.uri], + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, ) if file.last_modified < updated_at_from_history: logger.warning( @@ -132,7 +139,12 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]: self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0]) ) return RemoteFile( - uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT) + uri=filename, + last_modified=ab_datetime_parse( + last_modified, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ), ) else: return None @@ -142,8 +154,12 @@ def _compute_start_time(self) -> datetime: return datetime.min else: earliest = min(self._file_to_datetime_history.values()) - earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT) + earliest_dt = ab_datetime_parse( + earliest, + formats=[self.DATE_TIME_FORMAT], + disallow_other_formats=False, + ) if self._is_history_full(): - time_window = datetime.now() - self._time_window_if_history_is_full + time_window = ab_datetime_now() - self._time_window_if_history_is_full earliest_dt = min(earliest_dt, time_window) return earliest_dt diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 14f823e45..206d475f4 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -19,6 +19,8 @@ from pyrate_limiter import Rate as PyRateRate from pyrate_limiter.exceptions import BucketFullException +from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now + # prevents mypy from complaining about missing session attributes in LimiterMixin if TYPE_CHECKING: MIXIN_BASE = requests.Session @@ -280,7 +282,7 @@ class UnlimitedCallRatePolicy(BaseCallRatePolicy): ), FixedWindowCallRatePolicy( matchers=[HttpRequestMatcher(url="/some/method")], - next_reset_ts=datetime.now(), + next_reset_ts=ab_datetime_now(), period=timedelta(hours=1) call_limit=1000, ), @@ -332,7 +334,7 @@ def try_acquire(self, request: Any, weight: int) -> None: self._update_current_window() if self._calls_num + weight > self._call_limit: - reset_in = self._next_reset_ts - datetime.datetime.now() + reset_in = self._next_reset_ts - ab_datetime_now() error_message = ( f"reached maximum number of allowed calls {self._call_limit} " f"per {self._offset} interval, next reset in {reset_in}." @@ -385,7 +387,7 @@ def update( self._next_reset_ts = call_reset_ts def _update_current_window(self) -> None: - now = datetime.datetime.now() + now = ab_datetime_now() if now > self._next_reset_ts: logger.debug("started new window, %s calls available now", self._call_limit) self._next_reset_ts = self._next_reset_ts + self._offset @@ -662,7 +664,7 @@ def get_reset_ts_from_response( self, response: requests.Response ) -> Optional[datetime.datetime]: if response.headers.get(self._ratelimit_reset_header): - return datetime.datetime.fromtimestamp( + return AirbyteDateTime.fromtimestamp( int(response.headers[self._ratelimit_reset_header]) ) return None diff --git a/airbyte_cdk/utils/datetime_format_inferrer.py b/airbyte_cdk/utils/datetime_format_inferrer.py index 28eaefa31..0ff4cce82 100644 --- a/airbyte_cdk/utils/datetime_format_inferrer.py +++ b/airbyte_cdk/utils/datetime_format_inferrer.py @@ -6,6 +6,7 @@ from airbyte_cdk.models import AirbyteRecordMessage from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser +from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse, ab_datetime_try_parse class DatetimeFormatInferrer: @@ -53,9 +54,13 @@ def _can_be_datetime(self, value: Any) -> bool: def _matches_format(self, value: Any, format: str) -> bool: """Checks if the value matches the format""" try: - self._parser.parse(value, format) - return True - except ValueError: + dt = ab_datetime_try_parse( + value, + formats=[format], + disallow_other_formats=True, + ) + return dt is not None + except (ValueError, TypeError): return False def _initialize(self, record: AirbyteRecordMessage) -> None: diff --git a/airbyte_cdk/utils/datetime_helpers.py b/airbyte_cdk/utils/datetime_helpers.py index 99cf1ad23..ab9cf4d63 100644 --- a/airbyte_cdk/utils/datetime_helpers.py +++ b/airbyte_cdk/utils/datetime_helpers.py @@ -81,8 +81,9 @@ ``` """ +import decimal from datetime import datetime, timedelta, timezone -from typing import Any, Optional, Union, overload +from typing import Any, Optional, Union, cast, overload from dateutil import parser from typing_extensions import Never @@ -219,8 +220,9 @@ def __add__(self, other: timedelta) -> "AirbyteDateTime": Raises: TypeError: If other is not a timedelta. """ - result = super().__add__(other) - if isinstance(result, datetime): + if isinstance(other, timedelta): + dt = self.to_datetime() + result = dt + other return AirbyteDateTime.from_datetime(result) raise TypeError("Invalid operation") @@ -342,11 +344,11 @@ def from_str(cls, dt_str: str) -> "AirbyteDateTime": return ab_datetime_parse(dt_str) -def ab_datetime_now() -> AirbyteDateTime: +def ab_datetime_now( + tz: str | timezone | None = None, +) -> AirbyteDateTime: """Returns the current time as an AirbyteDateTime in UTC timezone. - Previously named: now() - Returns: AirbyteDateTime: Current UTC time. @@ -358,7 +360,11 @@ def ab_datetime_now() -> AirbyteDateTime: return AirbyteDateTime.from_datetime(datetime.now(timezone.utc)) -def ab_datetime_parse(dt_str: str | int) -> AirbyteDateTime: +def ab_datetime_parse( + dt_str: str | int | float, + formats: list[str | None] | list[str] | None = None, + disallow_other_formats: bool = False, +) -> AirbyteDateTime: """Parses a datetime string or timestamp into an AirbyteDateTime with timezone awareness. This implementation is as flexible as possible to handle various datetime formats. @@ -374,6 +380,10 @@ def ab_datetime_parse(dt_str: str | int) -> AirbyteDateTime: Args: dt_str: A datetime string in ISO8601/RFC3339 format, Unix timestamp (int/str), or other recognizable datetime format. + formats: Optional list of format strings to try before falling back to more + forgiving parsing logic. If provided, these formats will be tried in order. + disallow_other_formats: If True, only the provided formats will be used for parsing, + and more forgiving parsing logic will not be attempted. Returns: AirbyteDateTime: A timezone-aware datetime object. @@ -388,26 +398,107 @@ def ab_datetime_parse(dt_str: str | int) -> AirbyteDateTime: '2023-03-14T15:00:00+00:00' >>> ab_datetime_parse("2023-03-14") # Date-only '2023-03-14T00:00:00+00:00' + >>> ab_datetime_parse("2023-03-14 15:09:26", formats=["%Y-%m-%d %H:%M:%S"]) + '2023-03-14T15:09:26+00:00' """ try: + # Remove None values from formats list, and coalesce to None if empty + formats = [f for f in formats or [] if f] or None + + if isinstance(dt_str, str): + if dt_str.startswith("-"): + raise ValueError("Timestamp cannot be negative: " + dt_str) + + if dt_str[1:].replace(".", "").isdigit(): + # Handle floats and ints as strings + if "." in dt_str: + dt_str = float(dt_str) + else: + dt_str = int(dt_str) + + if isinstance(dt_str, float): + # Handle float values as Unix timestamps (UTC) + if dt_str < 0: + raise ValueError("Timestamp cannot be negative") + if len(str(abs(int(dt_str)))) > 10: + raise ValueError("Timestamp value too large") + + instant = Instant.from_timestamp(dt_str) + return AirbyteDateTime.from_datetime(instant.py_datetime()) + # Handle numeric values as Unix timestamps (UTC) if isinstance(dt_str, int) or ( isinstance(dt_str, str) and (dt_str.isdigit() or (dt_str.startswith("-") and dt_str[1:].isdigit())) + and ( + not formats + or ("%s" in formats) # Custom case for Unix timestamp in declarative sources + ) ): timestamp = int(dt_str) if timestamp < 0: raise ValueError("Timestamp cannot be negative") if len(str(abs(timestamp))) > 10: raise ValueError("Timestamp value too large") + instant = Instant.from_timestamp(timestamp) return AirbyteDateTime.from_datetime(instant.py_datetime()) + if formats: + for format_str in formats: + try: + if format_str == "%s": + if isinstance(dt_str, int) or ( + isinstance(dt_str, str) and dt_str.isdigit() + ): + timestamp = int(dt_str) + if timestamp < 0: + raise ValueError("Timestamp cannot be negative") + return AirbyteDateTime.from_datetime( + datetime.fromtimestamp(timestamp, tz=timezone.utc) + ) + parsed = datetime.strptime(dt_str, format_str) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return AirbyteDateTime.from_datetime(parsed) + except ValueError: + continue + + if disallow_other_formats: + raise ValueError( + f"Could not parse datetime string '{dt_str}' with any of the provided formats: {formats}" + ) + if not isinstance(dt_str, str): raise ValueError( f"Could not parse datetime string: expected string or integer, got {type(dt_str)}" ) + if formats: + for format_str in formats: + try: + if format_str == "%s": + if isinstance(dt_str, int) or ( + isinstance(dt_str, str) and dt_str.isdigit() + ): + timestamp = int(dt_str) + if timestamp < 0: + raise ValueError("Timestamp cannot be negative") + return AirbyteDateTime.from_datetime( + datetime.fromtimestamp(timestamp, tz=timezone.utc) + ) + parsed = datetime.strptime(dt_str, format_str) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return AirbyteDateTime.from_datetime(parsed) + except ValueError: + continue + + if disallow_other_formats: + raise ValueError( + f"Could not parse datetime string '{dt_str}' with any of the provided formats: {formats}" + ) + # Handle date-only format first if ":" not in dt_str and dt_str.count("-") == 2 and "/" not in dt_str: try: @@ -432,6 +523,7 @@ def ab_datetime_parse(dt_str: str | int) -> AirbyteDateTime: return AirbyteDateTime.from_datetime(parsed) except (ValueError, TypeError): raise ValueError(f"Could not parse datetime string: {dt_str}") + except ValueError as e: if "Invalid date format:" in str(e): raise @@ -442,7 +534,9 @@ def ab_datetime_parse(dt_str: str | int) -> AirbyteDateTime: raise ValueError(f"Could not parse datetime string: {dt_str}") -def ab_datetime_try_parse(dt_str: str) -> AirbyteDateTime | None: +def ab_datetime_try_parse( + dt_str: str | int, formats: list[str] | None = None, disallow_other_formats: bool = False +) -> AirbyteDateTime | None: """Try to parse the input as a datetime, failing gracefully instead of raising an exception. This is a thin wrapper around `ab_datetime_parse()` that catches parsing errors and @@ -450,13 +544,30 @@ def ab_datetime_try_parse(dt_str: str) -> AirbyteDateTime | None: The implementation is as flexible as possible to handle various datetime formats. Always returns a timezone-aware datetime (defaults to `UTC` if no timezone specified). + Args: + dt_str: A datetime string in ISO8601/RFC3339 format, Unix timestamp (int/str), + or other recognizable datetime format. + formats: Optional list of format strings to try before falling back to more + forgiving parsing logic. If provided, these formats will be tried in order. + disallow_other_formats: If True, only the provided formats will be used for parsing, + and more forgiving parsing logic will not be attempted. + Example: >>> ab_datetime_try_parse("2023-03-14T15:09:26Z") # Returns AirbyteDateTime >>> ab_datetime_try_parse("2023-03-14 15:09:26Z") # Missing 'T' delimiter still parsable >>> ab_datetime_try_parse("2023-03-14") # Returns midnight UTC time + >>> ab_datetime_try_parse( + >>> "2023-03-14 15:09:26", + >>> formats=["%Y-%m-%d %H:%M:%S"], # Using specific format + >>> disallow_other_formats=True, # Disallow other formats + >>> ) """ try: - return ab_datetime_parse(dt_str) + return ab_datetime_parse( + dt_str, + formats=formats, + disallow_other_formats=disallow_other_formats, + ) except (ValueError, TypeError): return None @@ -487,7 +598,7 @@ def ab_datetime_format( '2023-03-14T15:09:26+00:00' """ if isinstance(dt, AirbyteDateTime): - return str(dt) + return dt.strftime(format) if format else str(dt) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) diff --git a/airbyte_cdk/utils/stream_status_utils.py b/airbyte_cdk/utils/stream_status_utils.py index 49c07f49c..904c49bc8 100644 --- a/airbyte_cdk/utils/stream_status_utils.py +++ b/airbyte_cdk/utils/stream_status_utils.py @@ -17,6 +17,7 @@ TraceType, ) from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now def as_airbyte_message( @@ -28,7 +29,7 @@ def as_airbyte_message( Builds an AirbyteStreamStatusTraceMessage for the provided stream """ - now_millis = datetime.now().timestamp() * 1000.0 + now_millis = ab_datetime_now().to_epoch_millis() trace_message = AirbyteTraceMessage( type=TraceType.STREAM_STATUS, diff --git a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py index 40bb6d40b..634c089dc 100644 --- a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py +++ b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py @@ -126,7 +126,7 @@ def test_missing_checksum_fails_to_run( py_components_config_dict = get_py_components_config_dict() # Truncate the start_date to speed up tests py_components_config_dict["start_date"] = ( - datetime.datetime.now() - datetime.timedelta(days=2) + ab_datetime_now() - datetime.timedelta(days=2) ).strftime("%Y-%m-%d") py_components_config_dict.pop("__injected_components_py_checksums") @@ -158,7 +158,7 @@ def test_invalid_checksum_fails_to_run( py_components_config_dict = get_py_components_config_dict() # Truncate the start_date to speed up tests py_components_config_dict["start_date"] = ( - datetime.datetime.now() - datetime.timedelta(days=2) + ab_datetime_now() - datetime.timedelta(days=2) ).strftime("%Y-%m-%d") py_components_config_dict["__injected_components_py_checksums"][hash_type] = "invalid_checksum" @@ -205,7 +205,7 @@ def test_fail_unless_custom_code_enabled_explicitly( py_components_config_dict = get_py_components_config_dict() # Truncate the start_date to speed up tests py_components_config_dict["start_date"] = ( - datetime.datetime.now() - datetime.timedelta(days=2) + ab_datetime_now() - datetime.timedelta(days=2) ).strftime("%Y-%m-%d") with NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: @@ -242,7 +242,7 @@ def test_sync_with_injected_py_components( ) # Truncate the start_date to speed up tests py_components_config_dict["start_date"] = ( - datetime.datetime.now() - datetime.timedelta(days=2) + ab_datetime_now() - datetime.timedelta(days=2) ).strftime("%Y-%m-%d") assert isinstance(py_components_config_dict, dict) assert "__injected_declarative_manifest" in py_components_config_dict diff --git a/unit_tests/sources/declarative/auth/test_jwt.py b/unit_tests/sources/declarative/auth/test_jwt.py index 49b7ea570..03aa2969c 100644 --- a/unit_tests/sources/declarative/auth/test_jwt.py +++ b/unit_tests/sources/declarative/auth/test_jwt.py @@ -91,7 +91,7 @@ def test_get_jwt_payload(self, iss, sub, aud, additional_jwt_payload, expected): additional_jwt_payload=additional_jwt_payload, ) with freezegun.freeze_time("2022-01-01 00:00:00"): - expected["iat"] = int(datetime.now().timestamp()) + expected["iat"] = int(ab_datetime_now().to_timestamp()) expected["exp"] = expected["iat"] + 1000 expected["nbf"] = expected["iat"] assert authenticator._get_jwt_payload() == expected diff --git a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py index 7335e056b..4a1fd8331 100644 --- a/unit_tests/sources/declarative/interpolation/test_jinja.py +++ b/unit_tests/sources/declarative/interpolation/test_jinja.py @@ -11,6 +11,7 @@ from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.utils import AirbyteTracedException +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now interpolation = JinjaInterpolation() @@ -131,9 +132,9 @@ def test_positive_day_delta(): val = interpolation.eval(delta_template, {}) # We need to assert against an earlier delta since the interpolation function runs datetime.now() a few milliseconds earlier - assert val > ( - datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=24, hours=23) - ).strftime("%Y-%m-%dT%H:%M:%S.%f%z") + assert val > (ab_datetime_now() + datetime.timedelta(days=24, hours=23)).strftime( + "%Y-%m-%dT%H:%M:%S.%f%z" + ) def test_positive_day_delta_with_format(): @@ -148,9 +149,9 @@ def test_negative_day_delta(): delta_template = "{{ day_delta(-25) }}" val = interpolation.eval(delta_template, {}) - assert val <= ( - datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=25) - ).strftime("%Y-%m-%dT%H:%M:%S.%f%z") + assert val <= (ab_datetime_now() - datetime.timedelta(days=25)).strftime( + "%Y-%m-%dT%H:%M:%S.%f%z" + ) @pytest.mark.parametrize( diff --git a/unit_tests/sources/file_based/availability_strategy/test_default_file_based_availability_strategy.py b/unit_tests/sources/file_based/availability_strategy/test_default_file_based_availability_strategy.py index b05bff03f..52f3390aa 100644 --- a/unit_tests/sources/file_based/availability_strategy/test_default_file_based_availability_strategy.py +++ b/unit_tests/sources/file_based/availability_strategy/test_default_file_based_availability_strategy.py @@ -16,9 +16,10 @@ from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now _FILE_WITH_UNKNOWN_EXTENSION = RemoteFile( - uri="a.unknown_extension", last_modified=datetime.now(), file_type="csv" + uri="a.unknown_extension", last_modified=ab_datetime_now(), file_type="csv" ) _ANY_CONFIG = FileBasedStreamConfig( name="config.name",