Skip to content

Commit ed4ea74

Browse files
committed
Add unit tests
1 parent 2e37296 commit ed4ea74

File tree

3 files changed

+338
-2
lines changed

3 files changed

+338
-2
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,7 @@ def create_declarative_stream(
17461746
transformations.append(
17471747
self._create_component_from_model(model=transformation_model, config=config)
17481748
)
1749+
17491750
retriever = self._create_component_from_model(
17501751
model=model.retriever,
17511752
config=config,
@@ -1756,6 +1757,7 @@ def create_declarative_stream(
17561757
stop_condition_on_cursor=stop_condition_on_cursor,
17571758
client_side_incremental_sync=client_side_incremental_sync,
17581759
transformations=transformations,
1760+
incremental_sync=model.incremental_sync,
17591761
)
17601762
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
17611763

@@ -2747,7 +2749,7 @@ def create_simple_retriever(
27472749
if model.lazy_read_pointer and not bool(
27482750
self._connector_state_manager.get_stream_state(name, None)
27492751
):
2750-
if model.partition_router.type != "SubstreamPartitionRouterModel": # type: ignore[union-attr] # model.partition_router has BaseModel type
2752+
if model.partition_router.type != "SubstreamPartitionRouter": # type: ignore[union-attr] # model.partition_router has BaseModel type
27512753
raise ValueError(
27522754
"LazySimpleRetriever only supports 'SubstreamPartitionRouterModel' as the 'partition_router' type. " # type: ignore[union-attr] # model.partition_router has BaseModel type
27532755
f"Found: '{model.partition_router.type}'."

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ def next_page_token(
7171
last_page_token_value: Optional[Any] = None,
7272
) -> Optional[Any]:
7373
decoded_response = next(self.decoder.decode(response))
74-
7574
# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
7675
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
7776
headers: Dict[str, Any] = dict(response.headers)
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,338 @@
11
#
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
4+
5+
import json
6+
from unittest.mock import MagicMock
7+
8+
import freezegun
9+
10+
from airbyte_cdk.models import (
11+
AirbyteStateBlob,
12+
AirbyteStateMessage,
13+
AirbyteStateType,
14+
AirbyteStreamState,
15+
ConfiguredAirbyteCatalog,
16+
ConfiguredAirbyteStream,
17+
DestinationSyncMode,
18+
StreamDescriptor,
19+
Type,
20+
)
21+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
22+
ConcurrentDeclarativeSource,
23+
)
24+
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
25+
26+
# Connector configuration shared by the tests in this module; only `start_date` is set.
_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"}
27+
# Declarative manifest describing a single stream, "TestStream", whose retriever
# combines lazy reading, cursor pagination, a substream partition router and a
# date-based incremental cursor.
_MANIFEST = {
28+
"version": "6.0.0",
29+
"type": "DeclarativeSource",
30+
"check": {"type": "CheckStream", "stream_names": ["TestStream"]},
31+
"definitions": {
32+
"TestStream": {
33+
"type": "DeclarativeStream",
34+
"name": "TestStream",
35+
"primary_key": [],
36+
"schema_loader": {
37+
"type": "InlineSchemaLoader",
38+
"schema": {
39+
"$schema": "http://json-schema.org/schema#",
40+
"properties": {},
41+
"type": "object",
42+
},
43+
},
44+
"retriever": {
45+
"type": "SimpleRetriever",
46+
# Presumably the path inside each parent record where a first page of child
# records is embedded (the mocked parents carry an "items" object) — confirm
# against the CDK documentation.
"lazy_read_pointer": ["items"],
47+
"requester": {
48+
"type": "HttpRequester",
49+
"url_base": "https://api.test.com",
50+
# The path is templated on the partition value named by "partition_field" below.
"path": "parent/{{ stream_partition.parent_id }}/items",
51+
"http_method": "GET",
52+
"authenticator": {
53+
"type": "ApiKeyAuthenticator",
54+
"header": "apikey",
55+
# NOTE(review): _CONFIG defines no "api_key"; confirm that this template
# resolving without a value is intended for these tests.
"api_token": "{{ config['api_key'] }}",
56+
},
57+
},
58+
"record_selector": {
59+
"type": "RecordSelector",
60+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]},
61+
},
62+
"paginator": {
63+
"type": "DefaultPaginator",
64+
# The page token is sent as the "starting_after" query parameter.
"page_token_option": {
65+
"type": "RequestOption",
66+
"inject_into": "request_parameter",
67+
"field_name": "starting_after"
68+
},
69+
# The token is the "id" of the last entry in the response's "data" list;
# the stop condition is true when "has_more" is falsy or missing.
"pagination_strategy": {
70+
"type": "CursorPagination",
71+
"cursor_value": '{{ response["data"][-1]["id"] }}',
72+
"stop_condition": '{{ not response.get("has_more", False) }}'
73+
}
74+
},
75+
"partition_router": {
76+
"type": "SubstreamPartitionRouter",
77+
"parent_stream_configs": [
78+
{
79+
"type": "ParentStreamConfig",
80+
# Maps the parent field "id" to the partition field "parent_id" used in
# the child request path above.
"parent_key": "id",
81+
"partition_field": "parent_id",
82+
"stream": {
83+
"type": "DeclarativeStream",
84+
"name": "parent",
85+
"retriever": {
86+
"type": "SimpleRetriever",
87+
"requester": {
88+
"type": "HttpRequester",
89+
"url_base": "https://api.test.com",
90+
"path": "/parents",
91+
"http_method": "GET",
92+
"authenticator": {
93+
"type": "ApiKeyAuthenticator",
94+
"header": "apikey",
95+
"api_token": "{{ config['api_key'] }}",
96+
},
97+
},
98+
"record_selector": {
99+
"type": "RecordSelector",
100+
"extractor": {
101+
"type": "DpathExtractor",
102+
"field_path": ["data"],
103+
},
104+
},
105+
},
106+
"schema_loader": {
107+
"type": "InlineSchemaLoader",
108+
"schema": {
109+
"$schema": "http://json-schema.org/schema#",
110+
"properties": {"id": {"type": "integer"}},
111+
"type": "object",
112+
},
113+
},
114+
},
115+
}
116+
],
117+
},
118+
},
119+
# Date-only cursor on "updated_at"; the window bounds are injected as the
# "start" and "end" request parameters.
"incremental_sync": {
120+
"type": "DatetimeBasedCursor",
121+
"start_datetime": {
122+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
123+
},
124+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
125+
"datetime_format": "%Y-%m-%d",
126+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
127+
"cursor_granularity": "P1D",
128+
"step": "P15D",
129+
"cursor_field": "updated_at",
130+
"start_time_option": {
131+
"type": "RequestOption",
132+
"field_name": "start",
133+
"inject_into": "request_parameter",
134+
},
135+
"end_time_option": {
136+
"type": "RequestOption",
137+
"field_name": "end",
138+
"inject_into": "request_parameter",
139+
},
140+
},
141+
},
142+
},
143+
"streams": [{"$ref": "#/definitions/TestStream"}],
144+
# Permissive spec: no required properties and additional properties allowed.
"spec": {
145+
"connection_specification": {
146+
"$schema": "http://json-schema.org/draft-07/schema#",
147+
"type": "object",
148+
"required": [],
149+
"properties": {},
150+
"additionalProperties": True,
151+
},
152+
"documentation_url": "https://example.org",
153+
"type": "Spec",
154+
},
155+
}
156+
157+
158+
def to_configured_stream(
    stream,
    sync_mode=None,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=None,
    primary_key=None,
) -> ConfiguredAirbyteStream:
    """Wrap ``stream`` in a ``ConfiguredAirbyteStream``.

    Every argument is forwarded unchanged under the keyword of the same name.
    ``destination_sync_mode`` defaults to ``DestinationSyncMode.append``; the
    other optional arguments default to ``None``.
    """
    stream_settings = {
        "stream": stream,
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
        "cursor_field": cursor_field,
        "primary_key": primary_key,
    }
    return ConfiguredAirbyteStream(**stream_settings)
172+
173+
174+
def to_configured_catalog(
    configured_streams,
) -> ConfiguredAirbyteCatalog:
    """Build a ``ConfiguredAirbyteCatalog`` whose ``streams`` are ``configured_streams``."""
    catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
    return catalog
178+
179+
180+
def create_configured_catalog(
    source: ConcurrentDeclarativeSource, config: dict
) -> ConfiguredAirbyteCatalog:
    """
    Run discovery on ``source`` and return a configured catalog covering every
    discovered stream.

    Each stream is configured through ``to_configured_stream`` with its
    ``source_defined_primary_key`` as the primary key; all other settings keep
    that helper's defaults.
    """
    discovered = source.discover(logger=source.logger, config=config)

    configured_streams = []
    for discovered_stream in discovered.streams:
        configured_streams.append(
            to_configured_stream(
                discovered_stream,
                primary_key=discovered_stream.source_defined_primary_key,
            )
        )

    return to_configured_catalog(configured_streams)
192+
193+
194+
def get_records(
    source: ConcurrentDeclarativeSource,
    config: dict,
    catalog: ConfiguredAirbyteCatalog,
    state: list = None,
) -> list:
    """
    Run ``source.read`` and collect the payload of every RECORD message.

    The read is performed with a ``MagicMock`` logger and the given ``config``,
    ``catalog`` and ``state``. Messages of any other type are ignored; the
    returned list holds ``message.record.data`` in emission order.
    """
    messages = source.read(logger=MagicMock(), config=config, catalog=catalog, state=state)

    record_payloads = []
    for message in messages:
        if message.type != Type.RECORD:
            continue
        record_payloads.append(message.record.data)
    return record_payloads
209+
210+
211+
@freezegun.freeze_time("2024-07-15")
def test_retriever_with_lazy_reading():
    """Read TestStream without state and check the emitted child records and their order.

    The mocked parent listing embeds a first page of items in each parent.
    Parent 1 reports ``has_more: True`` and two follow-up pages are mocked
    (``starting_after=2``, then ``starting_after=6``); parent 2 reports
    ``has_more: False`` and has no follow-up request mocked.
    """
    parents_payload = {
        "data": [
            {
                "id": 1,
                "name": "parent_1",
                "updated_at": "2024-07-13",
                "items": {"data": [{"id": 1}, {"id": 2}], "has_more": True},
            },
            {
                "id": 2,
                "name": "parent_2",
                "updated_at": "2024-07-13",
                "items": {"data": [{"id": 3}, {"id": 4}], "has_more": False},
            },
        ],
        "has_more": False,
    }

    # (url, JSON payload) pairs, registered in this order.
    mocked_calls = [
        ("https://api.test.com/parents", parents_payload),
        (
            "https://api.test.com/parent/1/items?starting_after=2&start=2024-07-01&end=2024-07-15",
            {"data": [{"id": 5}, {"id": 6}], "has_more": True},
        ),
        (
            "https://api.test.com/parent/1/items?starting_after=6&start=2024-07-01&end=2024-07-15",
            {"data": [{"id": 7}], "has_more": False},
        ),
    ]

    with HttpMocker() as http_mocker:
        for url, payload in mocked_calls:
            http_mocker.get(
                HttpRequest(url=url),
                HttpResponse(body=json.dumps(payload)),
            )

        source = ConcurrentDeclarativeSource(
            source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
        )
        configured_catalog = create_configured_catalog(source, _CONFIG)

        # Full read: no state is supplied.
        full_records = get_records(source, _CONFIG, configured_catalog)

        # Embedded items of parent 1, then its two mocked pages, then parent 2's embedded items.
        expected_ids = (1, 2, 5, 6, 7, 3, 4)
        expected_full = [{"id": record_id} for record_id in expected_ids]
        assert expected_full == full_records
271+
272+
273+
@freezegun.freeze_time("2024-07-15")
def test_incremental_sync_with_state():
    """Read TestStream with an existing ``updated_at`` state and check the returned records.

    The state carries ``updated_at="2024-07-13"``. Only child requests with
    ``start=2024-07-13&end=2024-07-15`` are mocked, one per parent, and the
    records they return are the expected output.
    """
    parents_payload = {
        "data": [
            {
                "id": 1,
                "name": "parent_1",
                "updated_at": "2024-07-13",
                "items": {"data": [{"id": 1}, {"id": 2}], "has_more": False},
            },
            {
                "id": 2,
                "name": "parent_2",
                "updated_at": "2024-07-13",
                "items": {"data": [{"id": 3}, {"id": 4}], "has_more": False},
            },
        ],
        "has_more": False,
    }

    # (url, JSON payload) pairs, registered in this order.
    mocked_calls = [
        ("https://api.test.com/parents", parents_payload),
        (
            "https://api.test.com/parent/1/items?start=2024-07-13&end=2024-07-15",
            {"data": [{"id": 10, "updated_at": "2024-07-13"}], "has_more": False},
        ),
        (
            "https://api.test.com/parent/2/items?start=2024-07-13&end=2024-07-15",
            {"data": [{"id": 11, "updated_at": "2024-07-13"}], "has_more": False},
        ),
    ]

    with HttpMocker() as http_mocker:
        for url, payload in mocked_calls:
            http_mocker.get(
                HttpRequest(url=url),
                HttpResponse(body=json.dumps(payload)),
            )

        stream_state = AirbyteStreamState(
            stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
            stream_state=AirbyteStateBlob(updated_at="2024-07-13"),
        )
        state = [AirbyteStateMessage(type=AirbyteStateType.STREAM, stream=stream_state)]

        source = ConcurrentDeclarativeSource(
            source_config=_MANIFEST, config=_CONFIG, catalog=None, state=state
        )
        configured_catalog = create_configured_catalog(source, _CONFIG)

        # Incremental read: the same state is handed to the source and to read().
        incremental_records = get_records(source, _CONFIG, configured_catalog, state)

        expected_incremental = [
            {"id": record_id, "updated_at": "2024-07-13"} for record_id in (10, 11)
        ]
        assert expected_incremental == incremental_records
338+

0 commit comments

Comments
 (0)