Skip to content

Commit 4ef852e

Browse files
committed
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
2 parents 19d1b22 + fe2f9a5 commit 4ef852e

File tree

11 files changed

+90
-31
lines changed

11 files changed

+90
-31
lines changed
Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
5-
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
6-
7-
__all__ = ["MinMaxDatetime"]

airbyte_cdk/sources/declarative/datetime/datetime_parser.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ def parse(self, date: Union[str, int], format: str) -> datetime.datetime:
2929
return datetime.datetime.fromtimestamp(int(date), tz=datetime.timezone.utc)
3030
elif format == "%s_as_float":
3131
return datetime.datetime.fromtimestamp(float(date), tz=datetime.timezone.utc)
32+
elif format == "%epoch_microseconds":
33+
return self._UNIX_EPOCH + datetime.timedelta(microseconds=int(date))
3234
elif format == "%ms":
3335
return self._UNIX_EPOCH + datetime.timedelta(milliseconds=int(date))
3436
elif "%_ms" in format:
@@ -46,6 +48,8 @@ def format(self, dt: datetime.datetime, format: str) -> str:
4648
return str(int(dt.timestamp()))
4749
if format == "%s_as_float":
4850
return str(float(dt.timestamp()))
51+
if format == "%epoch_microseconds":
52+
return str(int(dt.timestamp() * 1_000_000))
4953
if format == "%ms":
5054
# timstamp() returns a float representing the number of seconds since the unix epoch
5155
return str(int(dt.timestamp() * 1000))

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ def decode(
151151
self, response: requests.Response
152152
) -> Generator[MutableMapping[str, Any], None, None]:
153153
if self.is_stream_response():
154+
# urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content)
155+
# We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution.
156+
response.raw.auto_close = False
154157
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
158+
response.raw.close()
155159
else:
156160
yield from self.parser.parse(data=io.BytesIO(response.content))

airbyte_cdk/sources/declarative/interpolation/macros.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from dateutil import parser
1313
from isodate import parse_duration
1414

15+
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
16+
1517
"""
1618
This file contains macros that can be evaluated by a `JinjaInterpolation` object
1719
"""
@@ -171,11 +173,7 @@ def format_datetime(
171173
dt_datetime = (
172174
datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt)
173175
)
174-
if format == "%s":
175-
return str(int(dt_datetime.timestamp()))
176-
elif format == "%ms":
177-
return str(int(dt_datetime.timestamp() * 1_000_000))
178-
return dt_datetime.strftime(format)
176+
return DatetimeParser().format(dt=dt_datetime, format=format)
179177

180178

181179
_macros_list = [

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
)
5858
from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream
5959
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
60-
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
60+
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
6161
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
6262
from airbyte_cdk.sources.declarative.decoders import (
6363
Decoder,

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinit
5050

5151
def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
5252
extra_args["path_to_yaml"] = self._path_to_yaml
53-
self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)
5453

5554
@staticmethod
5655
def _parse(connection_definition_str: str) -> ConnectionDefinition:

unit_tests/sources/declarative/datetime/test_datetime_parser.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,102 +10,118 @@
1010

1111

1212
@pytest.mark.parametrize(
13-
"test_name, input_date, date_format, expected_output_date",
13+
"input_date, date_format, expected_output_date",
1414
[
1515
(
16-
"test_parse_date_iso",
1716
"2021-01-01T00:00:00.000000+0000",
1817
"%Y-%m-%dT%H:%M:%S.%f%z",
1918
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
2019
),
2120
(
22-
"test_parse_date_iso_with_timezone_not_utc",
2321
"2021-01-01T00:00:00.000000+0400",
2422
"%Y-%m-%dT%H:%M:%S.%f%z",
2523
datetime.datetime(
2624
2021, 1, 1, 0, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=14400))
2725
),
2826
),
2927
(
30-
"test_parse_timestamp",
3128
"1609459200",
3229
"%s",
3330
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
3431
),
3532
(
36-
"test_parse_timestamp_as_float",
3733
"1675092508.873709",
3834
"%s_as_float",
3935
datetime.datetime(2023, 1, 30, 15, 28, 28, 873709, tzinfo=datetime.timezone.utc),
4036
),
4137
(
42-
"test_parse_ms_timestamp",
38+
"1675092508873709",
39+
"%epoch_microseconds",
40+
datetime.datetime(2023, 1, 30, 15, 28, 28, 873709, tzinfo=datetime.timezone.utc),
41+
),
42+
(
4343
"1609459200001",
4444
"%ms",
4545
datetime.datetime(2021, 1, 1, 0, 0, 0, 1000, tzinfo=datetime.timezone.utc),
4646
),
4747
(
48-
"test_parse_date_ms",
4948
"20210101",
5049
"%Y%m%d",
5150
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
5251
),
5352
(
54-
"test_parse_format_datetime_with__ms",
5553
"2021-11-22T08:41:55.640Z",
5654
"%Y-%m-%dT%H:%M:%S.%_msZ",
5755
datetime.datetime(2021, 11, 22, 8, 41, 55, 640000, tzinfo=datetime.timezone.utc),
5856
),
5957
],
58+
ids=[
59+
"test_parse_date_iso",
60+
"test_parse_date_iso_with_timezone_not_utc",
61+
"test_parse_timestamp",
62+
"test_parse_timestamp_as_float",
63+
"test_parse_timestamp_microseconds",
64+
"test_parse_ms_timestamp",
65+
"test_parse_date_ms",
66+
"test_parse_format_datetime_with__ms",
67+
],
6068
)
61-
def test_parse_date(test_name, input_date, date_format, expected_output_date):
69+
def test_parse_date(input_date: str, date_format: str, expected_output_date: datetime.datetime):
6270
parser = DatetimeParser()
6371
output_date = parser.parse(input_date, date_format)
6472
assert output_date == expected_output_date
6573

6674

6775
@pytest.mark.parametrize(
68-
"test_name, input_dt, datetimeformat, expected_output",
76+
"input_dt, datetimeformat, expected_output",
6977
[
7078
(
71-
"test_format_timestamp",
7279
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
7380
"%s",
7481
"1609459200",
7582
),
7683
(
77-
"test_format_timestamp_ms",
7884
datetime.datetime(2021, 1, 1, 0, 0, 0, 1000, tzinfo=datetime.timezone.utc),
7985
"%ms",
8086
"1609459200001",
8187
),
8288
(
83-
"test_format_timestamp_as_float",
8489
datetime.datetime(2023, 1, 30, 15, 28, 28, 873709, tzinfo=datetime.timezone.utc),
8590
"%s_as_float",
8691
"1675092508.873709",
8792
),
8893
(
89-
"test_format_string",
94+
datetime.datetime(2023, 1, 30, 15, 28, 28, 873709, tzinfo=datetime.timezone.utc),
95+
"%epoch_microseconds",
96+
"1675092508873709",
97+
),
98+
(
9099
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
91100
"%Y-%m-%d",
92101
"2021-01-01",
93102
),
94103
(
95-
"test_format_to_number",
96104
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
97105
"%Y%m%d",
98106
"20210101",
99107
),
100108
(
101-
"test_parse_format_datetime_with__ms",
102109
datetime.datetime(2021, 11, 22, 8, 41, 55, 640000, tzinfo=datetime.timezone.utc),
103110
"%Y-%m-%dT%H:%M:%S.%_msZ",
104111
"2021-11-22T08:41:55.640Z",
105112
),
106113
],
114+
ids=[
115+
"test_format_timestamp",
116+
"test_format_timestamp_ms",
117+
"test_format_timestamp_as_float",
118+
"test_format_timestamp_microseconds",
119+
"test_format_string",
120+
"test_format_to_number",
121+
"test_parse_format_datetime_with__ms",
122+
],
107123
)
108-
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
124+
def test_format_datetime(input_dt: datetime.datetime, datetimeformat: str, expected_output: str):
109125
parser = DatetimeParser()
110126
output_date = parser.format(input_dt, datetimeformat)
111127
assert output_date == expected_output

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import csv
55
import gzip
66
import json
7+
import socket
8+
from http.server import BaseHTTPRequestHandler, HTTPServer
79
from io import BytesIO, StringIO
10+
from threading import Thread
811
from unittest.mock import patch
912

1013
import pytest
@@ -20,6 +23,12 @@
2023
from airbyte_cdk.utils import AirbyteTracedException
2124

2225

26+
def find_available_port() -> int:
27+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
28+
s.bind(("localhost", 0))
29+
return s.getsockname()[1] # type: ignore # this should return a int
30+
31+
2332
def compress_with_gzip(data: str, encoding: str = "utf-8"):
2433
"""
2534
Compress the data using Gzip.
@@ -202,6 +211,39 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
202211
assert parsed_records == expected_data
203212

204213

214+
class TestServer(BaseHTTPRequestHandler):
215+
def do_GET(self) -> None:
216+
self.send_response(200)
217+
self.end_headers()
218+
self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))
219+
220+
221+
def test_composite_raw_decoder_csv_parser_without_mocked_response():
222+
"""
223+
This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests.
224+
225+
We first identified this issue while working with the sample defined https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv.
226+
This should be reproducible by having the test server return the `self.wfile.write` statement as a comment below but it does not. However, it wasn't reproducible.
227+
228+
Currently we use `self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))` to reproduce which we know is not a valid csv as it does not end with a newline character. However, this is the only we were able to reproduce locally.
229+
"""
230+
# self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8"))
231+
232+
# start server
233+
port = find_available_port()
234+
httpd = HTTPServer(("localhost", port), TestServer)
235+
thread = Thread(target=httpd.serve_forever, args=())
236+
thread.start()
237+
try:
238+
response = requests.get(f"http://localhost:{port}", stream=True)
239+
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))
240+
241+
assert len(result) == 1
242+
finally:
243+
httpd.shutdown() # release port and kill the thread
244+
thread.join(timeout=5) # ensure thread is cleaned up
245+
246+
205247
def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
206248
requests_mock.register_uri(
207249
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()

unit_tests/sources/declarative/extractors/test_record_filter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import pytest
77

8-
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
8+
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
99
from airbyte_cdk.sources.declarative.extractors.record_filter import (
1010
ClientSideIncrementalRecordFilterDecorator,
1111
RecordFilter,

unit_tests/sources/declarative/interpolation/test_macros.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def test_macros_export(test_name, fn_name, found_in_macros):
7979
),
8080
(
8181
"2022-01-01T01:01:01Z",
82-
"%ms",
82+
"%epoch_microseconds",
8383
"%Y-%m-%dT%H:%M:%SZ",
8484
"1640998861000000",
8585
),

0 commit comments

Comments
 (0)