Skip to content

Commit af7d32c

Browse files
committed
manual updates (aj)
1 parent 3a9f153 commit af7d32c

File tree

10 files changed

+102
-128
lines changed

10 files changed

+102
-128
lines changed

airbyte_cdk/sources/declarative/datetime/datetime_parser.py

Lines changed: 23 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import datetime
66
from typing import Union
77

8-
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format, ab_datetime_parse
9-
108

119
class DatetimeParser:
1210
"""
@@ -21,82 +19,47 @@ class DatetimeParser:
2119
_UNIX_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
2220

2321
def parse(self, date: Union[str, int], format: str) -> datetime.datetime:
24-
"""
25-
Parse a datetime string according to a specified format.
26-
27-
Args:
28-
date: String to parse
29-
format: Format string as described in https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
30-
with the following extensions:
31-
- %s: Unix timestamp
32-
- %s_as_float: Unix timestamp as float
33-
- %epoch_microseconds: Microseconds since epoch
34-
- %ms: Milliseconds since epoch
35-
- %_ms: Custom millisecond format
36-
37-
Returns:
38-
The parsed datetime
39-
"""
40-
# Special format handling
22+
# "%s" is a valid (but unreliable) directive for formatting, but not for parsing
23+
# It is defined as
24+
# The number of seconds since the Epoch, 1970-01-01 00:00:00+0000 (UTC). https://man7.org/linux/man-pages/man3/strptime.3.html
25+
#
26+
# The recommended way to parse a date from its timestamp representation is to use datetime.fromtimestamp
27+
# See https://stackoverflow.com/a/4974930
4128
if format == "%s":
4229
return datetime.datetime.fromtimestamp(int(date), tz=datetime.timezone.utc)
4330
elif format == "%s_as_float":
4431
return datetime.datetime.fromtimestamp(float(date), tz=datetime.timezone.utc)
4532
elif format == "%epoch_microseconds":
46-
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
47-
return epoch + datetime.timedelta(microseconds=int(date))
33+
return self._UNIX_EPOCH + datetime.timedelta(microseconds=int(date))
4834
elif format == "%ms":
49-
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
50-
return epoch + datetime.timedelta(milliseconds=int(date))
35+
return self._UNIX_EPOCH + datetime.timedelta(milliseconds=int(date))
5136
elif "%_ms" in format:
52-
# Convert custom millisecond format to standard format
5337
format = format.replace("%_ms", "%f")
54-
55-
# For standard formats, use ab_datetime_parse with the specific format
56-
try:
57-
result = ab_datetime_parse(date, formats=[format], disallow_other_formats=True)
58-
return result.to_datetime() # Convert AirbyteDateTime to standard datetime
59-
except ValueError:
60-
# Fallback to original implementation for backward compatibility
61-
parsed_datetime = datetime.datetime.strptime(str(date), format)
62-
if self._is_naive(parsed_datetime):
63-
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
38+
parsed_datetime = datetime.datetime.strptime(str(date), format)
39+
if self._is_naive(parsed_datetime):
6440
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
65-
return parsed_datetime.replace(tzinfo=datetime.timezone.utc)
41+
return parsed_datetime
6642

6743
def format(self, dt: datetime.datetime, format: str) -> str:
68-
"""
69-
Format a datetime object according to a specified format.
70-
71-
Args:
72-
dt: The datetime object to format
73-
format: Format string as described in https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
74-
with the following extensions:
75-
- %s: Unix timestamp
76-
- %s_as_float: Unix timestamp as float
77-
- %epoch_microseconds: Microseconds since epoch
78-
- %ms: Milliseconds since epoch
79-
- %_ms: Custom millisecond format
80-
81-
Returns:
82-
The formatted string
83-
"""
84-
# Handle special formats
44+
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
45+
# It's safer to use the timestamp() method than the %s directive
46+
# See https://stackoverflow.com/a/4974930
8547
if format == "%s":
8648
return str(int(dt.timestamp()))
87-
elif format == "%s_as_float":
49+
if format == "%s_as_float":
8850
return str(float(dt.timestamp()))
89-
elif format == "%epoch_microseconds":
51+
if format == "%epoch_microseconds":
9052
return str(int(dt.timestamp() * 1_000_000))
91-
elif format == "%ms":
53+
if format == "%ms":
54+
# timstamp() returns a float representing the number of seconds since the unix epoch
9255
return str(int(dt.timestamp() * 1000))
93-
elif "%_ms" in format:
56+
if "%_ms" in format:
9457
_format = format.replace("%_ms", "%f")
9558
milliseconds = int(dt.microsecond / 1000)
96-
return dt.strftime(_format).replace(dt.strftime("%f"), "%03d" % milliseconds)
97-
98-
# For standard formats, use ab_datetime_format
99-
return ab_datetime_format(dt, format)
59+
formatted_dt = dt.strftime(_format).replace(dt.strftime("%f"), "%03d" % milliseconds)
60+
return formatted_dt
61+
else:
62+
return dt.strftime(format)
10063

10164
def _is_naive(self, dt: datetime.datetime) -> bool:
10265
return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None

airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ def _calculate_cursor_datetime_from_state(
253253
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
254254

255255
def _format_datetime(self, dt: datetime.datetime) -> str:
256+
"""Format the datetime according to the configured format.
257+
258+
# TODO: Standardize cursor serialization with ISO 8601 format and deprecate custom formats
259+
# in STATE messages.
260+
"""
256261
return ab_datetime_format(dt, self.datetime_format)
257262

258263
def _partition_daterange(
@@ -267,11 +272,7 @@ def _partition_daterange(
267272

268273
while self._is_within_date_range(start, end):
269274
next_start = self._evaluate_next_start_date_safely(start, step)
270-
if hasattr(next_start, "to_datetime"):
271-
next_start_dt = next_start.to_datetime()
272-
else:
273-
next_start_dt = next_start
274-
end_date = self._get_date(next_start_dt - self._cursor_granularity, end, min)
275+
end_date = self._get_date(next_start - self._cursor_granularity, end, min)
275276
dates.append(
276277
StreamSlice(
277278
partition={},
@@ -298,8 +299,6 @@ def _evaluate_next_start_date_safely(
298299
would have broken anyway.
299300
"""
300301
try:
301-
if hasattr(start, "to_datetime"):
302-
start = start.to_datetime()
303302
return start + step
304303
except OverflowError:
305304
return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)
@@ -314,15 +313,17 @@ def _get_date(
314313
return comparator(cursor_date, default_date)
315314

316315
def parse_date(self, date: str) -> datetime.datetime:
316+
formats = list(set(self.cursor_datetime_formats + [self.datetime_format]))
317317
try:
318-
# Try each format in the list, falling back to the default format
319318
return ab_datetime_parse(
320-
date, formats=self.cursor_datetime_formats + [self.datetime_format]
321-
).to_datetime()
322-
except ValueError:
323-
raise ValueError(
324-
f"No format in {self.cursor_datetime_formats + [self.datetime_format]} matching {date}"
319+
date,
320+
formats=formats,
321+
disallow_other_formats=False, # TODO: Consider permissive parsing.
325322
)
323+
except ValueError as ex:
324+
raise ValueError(
325+
f"No format in {formats} matching '{date}'"
326+
) from ex
326327

327328
@classmethod
328329
def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:

airbyte_cdk/sources/declarative/interpolation/macros.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def now_utc() -> datetime.datetime:
3131
Usage:
3232
`"{{ now_utc() }}"`
3333
"""
34-
return ab_datetime_now().to_datetime()
34+
return ab_datetime_now()
3535

3636

3737
def today_utc() -> datetime.date:
@@ -67,10 +67,7 @@ def timestamp(dt: Union[float, str]) -> Union[int, float]:
6767
:param dt: datetime to convert to timestamp
6868
:return: unix timestamp
6969
"""
70-
if isinstance(dt, (int, float)):
71-
return int(dt)
72-
else:
73-
return str_to_datetime(dt).astimezone(pytz.utc).timestamp()
70+
return ab_datetime_parse(dt).timestamp()
7471

7572

7673
def str_to_datetime(s: str) -> datetime.datetime:
@@ -87,10 +84,7 @@ def str_to_datetime(s: str) -> datetime.datetime:
8784
:param s: string to parse as datetime
8885
:return: datetime object in UTC timezone
8986
"""
90-
dt = ab_datetime_parse(s).to_datetime()
91-
if dt.tzinfo is None:
92-
dt = dt.replace(tzinfo=pytz.utc)
93-
return dt.astimezone(pytz.utc)
87+
return ab_datetime_parse(s)
9488

9589

9690
def max(*args: typing.Any) -> typing.Any:
@@ -144,7 +138,8 @@ def day_delta(num_days: int, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:
144138
:return: datetime formatted as RFC3339
145139
"""
146140
return ab_datetime_format(
147-
ab_datetime_now().to_datetime() + datetime.timedelta(days=num_days), format
141+
ab_datetime_now() + datetime.timedelta(days=num_days),
142+
format=format,
148143
)
149144

150145

@@ -175,18 +170,12 @@ def format_datetime(
175170
return ab_datetime_format(dt, format)
176171

177172
if isinstance(dt, int):
178-
dt_datetime = ab_datetime_parse(
179-
dt, formats=[input_format] if input_format else None
180-
).to_datetime()
173+
dt_datetime = ab_datetime_parse(dt, formats=[input_format])
181174
else:
182-
dt_datetime = (
183-
ab_datetime_parse(dt, formats=[input_format] if input_format else None).to_datetime()
184-
if input_format
185-
else str_to_datetime(dt)
186-
)
175+
dt_datetime = ab_datetime_parse(dt, formats=[input_format])
187176
if dt_datetime.tzinfo is None:
188177
dt_datetime = dt_datetime.replace(tzinfo=pytz.utc)
189-
return ab_datetime_format(dt_datetime, format)
178+
return ab_datetime_format(dt_datetime, format=format)
190179

191180

192181
_macros_list = [

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ def filter_files_by_globs_and_start_date(
9696
Utility method for filtering files based on globs.
9797
"""
9898
start_date = (
99-
ab_datetime_parse(self.config.start_date, formats=[self.DATE_TIME_FORMAT]).to_datetime()
99+
ab_datetime_parse(
100+
self.config.start_date,
101+
formats=[self.DATE_TIME_FORMAT],
102+
disallow_other_formats=False,
103+
)
100104
if self.config and self.config.start_date
101105
else None
102106
)

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datet
112112
cursor_str = min(prev_cursor_str, earliest_file_cursor_value)
113113
cursor_dt, cursor_uri = cursor_str.split("_", 1)
114114
return ab_datetime_parse(
115-
cursor_dt, formats=[self.DATE_TIME_FORMAT]
116-
).to_datetime(), cursor_uri
115+
cursor_dt,
116+
formats=[self.DATE_TIME_FORMAT],
117+
disallow_other_formats=False,
118+
), cursor_uri
117119

118120
def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str:
119121
if file:
@@ -129,8 +131,10 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
129131
return RemoteFile(
130132
uri=filename,
131133
last_modified=ab_datetime_parse(
132-
last_modified, formats=[self.DATE_TIME_FORMAT]
133-
).to_datetime(),
134+
last_modified,
135+
formats=[self.DATE_TIME_FORMAT],
136+
disallow_other_formats=False,
137+
),
134138
)
135139
else:
136140
return None
@@ -159,7 +163,8 @@ def add_file(self, file: RemoteFile) -> None:
159163
else:
160164
self._pending_files.pop(file.uri)
161165
self._file_to_datetime_history[file.uri] = ab_datetime_format(
162-
file.last_modified, self.DATE_TIME_FORMAT
166+
file.last_modified,
167+
format=self.DATE_TIME_FORMAT,
163168
)
164169
if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
165170
# Get the earliest file based on its last modified date and its uri
@@ -212,12 +217,14 @@ def _compute_latest_file_in_history(self) -> Optional[RemoteFile]:
212217
if self._file_to_datetime_history:
213218
filename, last_modified = max(
214219
self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
215-
)
220+
)``
216221
return RemoteFile(
217222
uri=filename,
218223
last_modified=ab_datetime_parse(
219-
last_modified, formats=[self.DATE_TIME_FORMAT]
220-
).to_datetime(),
224+
last_modified,
225+
formats=[self.DATE_TIME_FORMAT],
226+
disallow_other_formats=False,
227+
),
221228
)
222229
else:
223230
return None
@@ -247,8 +254,10 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
247254
if file.uri in self._file_to_datetime_history:
248255
# If the file's uri is in the history, we should sync the file if it has been modified since it was synced
249256
updated_at_from_history = ab_datetime_parse(
250-
self._file_to_datetime_history[file.uri], formats=[self.DATE_TIME_FORMAT]
251-
).to_datetime()
257+
self._file_to_datetime_history[file.uri],
258+
formats=[self.DATE_TIME_FORMAT],
259+
disallow_other_formats=False,
260+
)
252261
if file.last_modified < updated_at_from_history:
253262
self._message_repository.emit_message(
254263
AirbyteMessage(
@@ -294,7 +303,11 @@ def _compute_start_time(self) -> datetime:
294303
return datetime.min
295304
else:
296305
earliest = min(self._file_to_datetime_history.values())
297-
earliest_dt = ab_datetime_parse(earliest, formats=[self.DATE_TIME_FORMAT]).to_datetime()
306+
earliest_dt = ab_datetime_parse(
307+
earliest,
308+
formats=[self.DATE_TIME_FORMAT],
309+
disallow_other_formats=False,
310+
)
298311
if self._is_history_full():
299312
time_window = datetime.now() - self._time_window_if_history_is_full
300313
earliest_dt = min(earliest_dt, time_window)

airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
8484
if file.uri in self._file_to_datetime_history:
8585
# If the file's uri is in the history, we should sync the file if it has been modified since it was synced
8686
updated_at_from_history = ab_datetime_parse(
87-
self._file_to_datetime_history[file.uri], formats=[self.DATE_TIME_FORMAT]
88-
).to_datetime()
87+
self._file_to_datetime_history[file.uri],
88+
formats=[self.DATE_TIME_FORMAT],
89+
disallow_other_formats=False,
90+
)
8991
if file.last_modified < updated_at_from_history:
9092
logger.warning(
9193
f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file."
@@ -135,8 +137,10 @@ def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
135137
return RemoteFile(
136138
uri=filename,
137139
last_modified=ab_datetime_parse(
138-
last_modified, formats=[self.DATE_TIME_FORMAT]
139-
).to_datetime(),
140+
last_modified,
141+
formats=[self.DATE_TIME_FORMAT],
142+
disallow_other_formats=False,
143+
),
140144
)
141145
else:
142146
return None
@@ -146,7 +150,11 @@ def _compute_start_time(self) -> datetime:
146150
return datetime.min
147151
else:
148152
earliest = min(self._file_to_datetime_history.values())
149-
earliest_dt = ab_datetime_parse(earliest, formats=[self.DATE_TIME_FORMAT]).to_datetime()
153+
earliest_dt = ab_datetime_parse(
154+
earliest,
155+
formats=[self.DATE_TIME_FORMAT],
156+
disallow_other_formats=False,
157+
)
150158
if self._is_history_full():
151159
time_window = datetime.now() - self._time_window_if_history_is_full
152160
earliest_dt = min(earliest_dt, time_window)

airbyte_cdk/sources/streams/call_rate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ def get_reset_ts_from_response(
666666
if response.headers.get(self._ratelimit_reset_header):
667667
return AirbyteDateTime.fromtimestamp(
668668
int(response.headers[self._ratelimit_reset_header])
669-
).to_datetime()
669+
)
670670
return None
671671

672672
def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:

airbyte_cdk/utils/datetime_format_inferrer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ def _can_be_datetime(self, value: Any) -> bool:
5454
def _matches_format(self, value: Any, format: str) -> bool:
5555
"""Checks if the value matches the format"""
5656
try:
57-
dt = ab_datetime_try_parse(value, formats=[format], disallow_other_formats=True)
57+
dt = ab_datetime_try_parse(
58+
value,
59+
formats=[format],
60+
disallow_other_formats=True,
61+
)
5862
return dt is not None
5963
except (ValueError, TypeError):
6064
return False

0 commit comments

Comments
 (0)