Skip to content

Commit 1b2a724

Browse files
fix(cdk): replace direct datetime parsing/formatting with datetime_helpers
Co-Authored-By: Aaron <AJ> Steers <[email protected]>
1 parent 5366b13 commit 1b2a724

File tree

11 files changed

+126
-75
lines changed

11 files changed

+126
-75
lines changed

airbyte_cdk/sources/declarative/datetime/datetime_parser.py

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import datetime
66
from typing import Union
77

8+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
9+
810

911
class DatetimeParser:
1012
"""
@@ -19,47 +21,82 @@ class DatetimeParser:
1921
_UNIX_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
2022

2123
def parse(self, date: Union[str, int], format: str) -> datetime.datetime:
22-
# "%s" is a valid (but unreliable) directive for formatting, but not for parsing
23-
# It is defined as
24-
# The number of seconds since the Epoch, 1970-01-01 00:00:00+0000 (UTC). https://man7.org/linux/man-pages/man3/strptime.3.html
25-
#
26-
# The recommended way to parse a date from its timestamp representation is to use datetime.fromtimestamp
27-
# See https://stackoverflow.com/a/4974930
24+
"""
25+
Parse a datetime string according to a specified format.
26+
27+
Args:
28+
date: String to parse
29+
format: Format string as described in https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
30+
with the following extensions:
31+
- %s: Unix timestamp
32+
- %s_as_float: Unix timestamp as float
33+
- %epoch_microseconds: Microseconds since epoch
34+
- %ms: Milliseconds since epoch
35+
- %_ms: Custom millisecond format
36+
37+
Returns:
38+
The parsed datetime
39+
"""
40+
# Special format handling
2841
if format == "%s":
2942
return datetime.datetime.fromtimestamp(int(date), tz=datetime.timezone.utc)
3043
elif format == "%s_as_float":
3144
return datetime.datetime.fromtimestamp(float(date), tz=datetime.timezone.utc)
3245
elif format == "%epoch_microseconds":
33-
return self._UNIX_EPOCH + datetime.timedelta(microseconds=int(date))
46+
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
47+
return epoch + datetime.timedelta(microseconds=int(date))
3448
elif format == "%ms":
35-
return self._UNIX_EPOCH + datetime.timedelta(milliseconds=int(date))
49+
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
50+
return epoch + datetime.timedelta(milliseconds=int(date))
3651
elif "%_ms" in format:
52+
# Convert custom millisecond format to standard format
3753
format = format.replace("%_ms", "%f")
38-
parsed_datetime = datetime.datetime.strptime(str(date), format)
39-
if self._is_naive(parsed_datetime):
54+
55+
# For standard formats, use ab_datetime_parse with the specific format
56+
try:
57+
result = ab_datetime_parse(date, formats=[format], disallow_other_formats=True)
58+
return result.to_datetime() # Convert AirbyteDateTime to standard datetime
59+
except ValueError:
60+
# Fallback to original implementation for backward compatibility
61+
parsed_datetime = datetime.datetime.strptime(str(date), format)
62+
if self._is_naive(parsed_datetime):
63+
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
4064
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
41-
return parsed_datetime
65+
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
4266

4367
def format(self, dt: datetime.datetime, format: str) -> str:
44-
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
45-
# It's safer to use the timestamp() method than the %s directive
46-
# See https://stackoverflow.com/a/4974930
68+
"""
69+
Format a datetime object according to a specified format.
70+
71+
Args:
72+
dt: The datetime object to format
73+
format: Format string as described in https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
74+
with the following extensions:
75+
- %s: Unix timestamp
76+
- %s_as_float: Unix timestamp as float
77+
- %epoch_microseconds: Microseconds since epoch
78+
- %ms: Milliseconds since epoch
79+
- %_ms: Custom millisecond format
80+
81+
Returns:
82+
The formatted string
83+
"""
84+
# Handle special formats
4785
if format == "%s":
4886
return str(int(dt.timestamp()))
49-
if format == "%s_as_float":
87+
elif format == "%s_as_float":
5088
return str(float(dt.timestamp()))
51-
if format == "%epoch_microseconds":
89+
elif format == "%epoch_microseconds":
5290
return str(int(dt.timestamp() * 1_000_000))
53-
if format == "%ms":
54-
# timstamp() returns a float representing the number of seconds since the unix epoch
91+
elif format == "%ms":
5592
return str(int(dt.timestamp() * 1000))
56-
if "%_ms" in format:
93+
elif "%_ms" in format:
5794
_format = format.replace("%_ms", "%f")
5895
milliseconds = int(dt.microsecond / 1000)
59-
formatted_dt = dt.strftime(_format).replace(dt.strftime("%f"), "%03d" % milliseconds)
60-
return formatted_dt
61-
else:
62-
return dt.strftime(format)
96+
return dt.strftime(_format).replace(dt.strftime("%f"), "%03d" % milliseconds)
97+
98+
# For standard formats, use ab_datetime_format
99+
return ab_datetime_format(dt, format)
63100

64101
def _is_naive(self, dt: datetime.datetime) -> bool:
65102
return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None

airbyte_cdk/sources/declarative/datetime/min_max_datetime.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
1010
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
1112

1213

1314
@dataclass
@@ -39,7 +40,6 @@ class MinMaxDatetime:
3940

4041
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4142
self.datetime = InterpolatedString.create(self.datetime, parameters=parameters or {})
42-
self._parser = DatetimeParser()
4343
self.min_datetime = (
4444
InterpolatedString.create(self.min_datetime, parameters=parameters) # type: ignore [assignment] # expression has type "InterpolatedString | None", variable has type "InterpolatedString | str"
4545
if self.min_datetime
@@ -65,25 +65,25 @@ def get_datetime(
6565
if not datetime_format:
6666
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"
6767

68-
time = self._parser.parse(
68+
time = ab_datetime_parse(
6969
str(
7070
self.datetime.eval( # type: ignore[union-attr] # str has no attribute "eval"
7171
config,
7272
**additional_parameters,
7373
)
7474
),
75-
datetime_format,
76-
) # type: ignore # datetime is always cast to an interpolated string
75+
formats=[datetime_format] if datetime_format else None
76+
).to_datetime() # type: ignore # datetime is always cast to an interpolated string
7777

7878
if self.min_datetime:
7979
min_time = str(self.min_datetime.eval(config, **additional_parameters)) # type: ignore # min_datetime is always cast to an interpolated string
8080
if min_time:
81-
min_datetime = self._parser.parse(min_time, datetime_format) # type: ignore # min_datetime is always cast to an interpolated string
81+
min_datetime = ab_datetime_parse(min_time, formats=[datetime_format] if datetime_format else None).to_datetime() # type: ignore # min_datetime is always cast to an interpolated string
8282
time = max(time, min_datetime)
8383
if self.max_datetime:
8484
max_time = str(self.max_datetime.eval(config, **additional_parameters)) # type: ignore # max_datetime is always cast to an interpolated string
8585
if max_time:
86-
max_datetime = self._parser.parse(max_time, datetime_format)
86+
max_datetime = ab_datetime_parse(max_time, formats=[datetime_format] if datetime_format else None).to_datetime()
8787
time = min(time, max_datetime)
8888
return time
8989

airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from airbyte_cdk.sources.message import MessageRepository
2323
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
24+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
2425
from airbyte_cdk.utils.mapping_helpers import _validate_component_request_option_paths
2526

2627

@@ -112,7 +113,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
112113
self._partition_field_end = InterpolatedString.create(
113114
self.partition_field_end or "end_time", parameters=parameters
114115
)
115-
self._parser = DatetimeParser()
116116

117117
# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
118118
if not self._start_datetime.datetime_format:
@@ -253,7 +253,7 @@ def _calculate_cursor_datetime_from_state(
253253
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
254254

255255
def _format_datetime(self, dt: datetime.datetime) -> str:
256-
return self._parser.format(dt, self.datetime_format)
256+
return ab_datetime_format(dt, self.datetime_format)
257257

258258
def _partition_daterange(
259259
self,
@@ -308,12 +308,11 @@ def _get_date(
308308
return comparator(cursor_date, default_date)
309309

310310
def parse_date(self, date: str) -> datetime.datetime:
311-
for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
312-
try:
313-
return self._parser.parse(date, datetime_format)
314-
except ValueError:
315-
pass
316-
raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
311+
try:
312+
# Try each format in the list, falling back to the default format
313+
return ab_datetime_parse(date, formats=self.cursor_datetime_formats + [self.datetime_format]).to_datetime()
314+
except ValueError:
315+
raise ValueError(f"No format in {self.cursor_datetime_formats + [self.datetime_format]} matching {date}")
317316

318317
@classmethod
319318
def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:

airbyte_cdk/sources/declarative/interpolation/macros.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
from isodate import parse_duration
1414

1515
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
16+
from airbyte_cdk.utils.datetime_helpers import (
17+
ab_datetime_format,
18+
ab_datetime_now,
19+
ab_datetime_parse,
20+
)
1621

1722
"""
1823
This file contains macros that can be evaluated by a `JinjaInterpolation` object
@@ -26,7 +31,7 @@ def now_utc() -> datetime.datetime:
2631
Usage:
2732
`"{{ now_utc() }}"`
2833
"""
29-
return datetime.datetime.now(datetime.timezone.utc)
34+
return ab_datetime_now().to_datetime()
3035

3136

3237
def today_utc() -> datetime.date:
@@ -82,12 +87,10 @@ def str_to_datetime(s: str) -> datetime.datetime:
8287
:param s: string to parse as datetime
8388
:return: datetime object in UTC timezone
8489
"""
85-
86-
parsed_date = parser.isoparse(s)
87-
if not parsed_date.tzinfo:
88-
# Assume UTC if the input does not contain a timezone
89-
parsed_date = parsed_date.replace(tzinfo=pytz.utc)
90-
return parsed_date.astimezone(pytz.utc)
90+
dt = ab_datetime_parse(s).to_datetime()
91+
if dt.tzinfo is None:
92+
dt = dt.replace(tzinfo=pytz.utc)
93+
return dt.astimezone(pytz.utc)
9194

9295

9396
def max(*args: typing.Any) -> typing.Any:
@@ -140,9 +143,10 @@ def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:
140143
:param num_days: number of days to add to current date time
141144
:return: datetime formatted as RFC3339
142145
"""
143-
return (
144-
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=num_days)
145-
).strftime(format)
146+
return ab_datetime_format(
147+
ab_datetime_now().to_datetime() + datetime.timedelta(days=num_days),
148+
format
149+
)
146150

147151

148152
def duration(datestring: str) -> Union[datetime.timedelta, isodate.Duration]:
@@ -169,17 +173,17 @@ def format_datetime(
169173
https://github.com/python/cpython/issues/56959
170174
"""
171175
if isinstance(dt, datetime.datetime):
172-
return dt.strftime(format)
176+
return ab_datetime_format(dt, format)
173177

174178
if isinstance(dt, int):
175-
dt_datetime = DatetimeParser().parse(dt, input_format if input_format else "%s")
179+
dt_datetime = ab_datetime_parse(dt, formats=[input_format] if input_format else None).to_datetime()
176180
else:
177181
dt_datetime = (
178-
datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt)
182+
ab_datetime_parse(dt, formats=[input_format] if input_format else None).to_datetime() if input_format else str_to_datetime(dt)
179183
)
180184
if dt_datetime.tzinfo is None:
181185
dt_datetime = dt_datetime.replace(tzinfo=pytz.utc)
182-
return DatetimeParser().format(dt=dt_datetime, format=format)
186+
return ab_datetime_format(dt_datetime, format)
183187

184188

185189
_macros_list = [

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use_file_transfer,
2020
)
2121
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
22+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse
2223

2324

2425
class FileReadMode(Enum):
@@ -95,7 +96,7 @@ def filter_files_by_globs_and_start_date(
9596
Utility method for filtering files based on globs.
9697
"""
9798
start_date = (
98-
datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
99+
ab_datetime_parse(self.config.start_date, formats=[self.DATE_TIME_FORMAT]).to_datetime()
99100
if self.config and self.config.start_date
100101
else None
101102
)

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
2121
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
2222
from airbyte_cdk.sources.types import Record
23+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
2324

2425
if TYPE_CHECKING:
2526
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition
@@ -110,11 +111,11 @@ def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datet
110111
)
111112
cursor_str = min(prev_cursor_str, earliest_file_cursor_value)
112113
cursor_dt, cursor_uri = cursor_str.split("_", 1)
113-
return datetime.strptime(cursor_dt, self.DATE_TIME_FORMAT), cursor_uri
114+
return ab_datetime_parse(cursor_dt, formats=[self.DATE_TIME_FORMAT]).to_datetime(), cursor_uri
114115

115116
def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str:
116117
if file:
117-
return f"{datetime.strftime(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}"
118+
return f"{ab_datetime_format(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}"
118119
return self.zero_cursor_value
119120

120121
def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
@@ -125,7 +126,7 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
125126
)
126127
return RemoteFile(
127128
uri=filename,
128-
last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
129+
last_modified=ab_datetime_parse(last_modified, formats=[self.DATE_TIME_FORMAT]).to_datetime(),
129130
)
130131
else:
131132
return None
@@ -153,8 +154,8 @@ def add_file(self, file: RemoteFile) -> None:
153154
)
154155
else:
155156
self._pending_files.pop(file.uri)
156-
self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
157-
self.DATE_TIME_FORMAT
157+
self._file_to_datetime_history[file.uri] = ab_datetime_format(
158+
file.last_modified, self.DATE_TIME_FORMAT
158159
)
159160
if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
160161
# Get the earliest file based on its last modified date and its uri
@@ -210,7 +211,7 @@ def _compute_latest_file_in_history(self) -> Optional[RemoteFile]:
210211
)
211212
return RemoteFile(
212213
uri=filename,
213-
last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
214+
last_modified=ab_datetime_parse(last_modified, formats=[self.DATE_TIME_FORMAT]).to_datetime(),
214215
)
215216
else:
216217
return None
@@ -239,9 +240,9 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
239240
with self._state_lock:
240241
if file.uri in self._file_to_datetime_history:
241242
# If the file's uri is in the history, we should sync the file if it has been modified since it was synced
242-
updated_at_from_history = datetime.strptime(
243-
self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
244-
)
243+
updated_at_from_history = ab_datetime_parse(
244+
self._file_to_datetime_history[file.uri], formats=[self.DATE_TIME_FORMAT]
245+
).to_datetime()
245246
if file.last_modified < updated_at_from_history:
246247
self._message_repository.emit_message(
247248
AirbyteMessage(
@@ -287,7 +288,7 @@ def _compute_start_time(self) -> datetime:
287288
return datetime.min
288289
else:
289290
earliest = min(self._file_to_datetime_history.values())
290-
earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
291+
earliest_dt = ab_datetime_parse(earliest, formats=[self.DATE_TIME_FORMAT]).to_datetime()
291292
if self._is_history_full():
292293
time_window = datetime.now() - self._time_window_if_history_is_full
293294
earliest_dt = min(earliest_dt, time_window)

0 commit comments

Comments
 (0)