Skip to content

Commit a53540d

Browse files
committed
Add property additional_query_properties param to SimpleRetrieverTestReadDecorator
1 parent 155cdc8 commit a53540d

File tree

5 files changed

+293
-1
lines changed

5 files changed

+293
-1
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def __init__(
128128
component_factory
129129
if component_factory
130130
else ModelToComponentFactory(
131-
emit_connector_builder_messages,
131+
emit_connector_builder_messages=emit_connector_builder_messages,
132132
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
133133
)
134134
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3312,6 +3312,7 @@ def _get_url() -> str:
33123312
config=config,
33133313
maximum_number_of_slices=self._limit_slices_fetched or 5,
33143314
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3315+
additional_query_properties=query_properties,
33153316
log_formatter=log_formatter,
33163317
parameters=model.parameters or {},
33173318
)
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import copy
6+
import json
7+
8+
import freezegun
9+
10+
from airbyte_cdk.connector_builder.connector_builder_handler import (
11+
TestLimits,
12+
)
13+
from airbyte_cdk.connector_builder.main import (
14+
handle_connector_builder_request,
15+
)
16+
from airbyte_cdk.models import (
17+
AirbyteStateBlob,
18+
AirbyteStateMessage,
19+
AirbyteStreamState,
20+
ConfiguredAirbyteCatalogSerializer,
21+
Level,
22+
StreamDescriptor,
23+
)
24+
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
25+
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
26+
from airbyte_cdk.test.mock_http.response_builder import find_template
27+
28+
BASE_URL = "https://api.apilayer.com/exchangerates_data/"
29+
FREEZE_DATE = "2025-05-23"
30+
31+
PROPERTY_KEY = "test"
32+
PROPERTY_LIST = [
33+
"one",
34+
"two",
35+
"three",
36+
"four"
37+
]
38+
39+
MANIFEST = {
40+
"version": "6.48.15",
41+
"type": "DeclarativeSource",
42+
"check": {
43+
"type": "CheckStream",
44+
"stream_names": [
45+
"Rates"
46+
]
47+
},
48+
"streams": [
49+
{
50+
"type": "DeclarativeStream",
51+
"name": "Rates",
52+
"retriever": {
53+
"type": "SimpleRetriever",
54+
"requester": {
55+
"type": "HttpRequester",
56+
"path": "/exchangerates_data/{{stream_interval.start_time}}",
57+
"url_base": "https://api.apilayer.com",
58+
"http_method": "GET",
59+
"authenticator": {
60+
"type": "ApiKeyAuthenticator",
61+
"api_token": "{{ config['api_key'] }}",
62+
"inject_into": {
63+
"type": "RequestOption",
64+
"field_name": "apikey",
65+
"inject_into": "header"
66+
}
67+
},
68+
"request_parameters": {
69+
"base": "{{ config['base'] }}",
70+
PROPERTY_KEY: {
71+
"type": "QueryProperties",
72+
"property_list": PROPERTY_LIST,
73+
"property_chunking": {
74+
"type": "PropertyChunking",
75+
"property_limit_type": "property_count",
76+
"property_limit": 2
77+
}
78+
}
79+
}
80+
},
81+
"record_selector": {
82+
"type": "RecordSelector",
83+
"extractor": {
84+
"type": "DpathExtractor",
85+
"field_path": []
86+
}
87+
}
88+
},
89+
"primary_key": [],
90+
"schema_loader": {
91+
"type": "InlineSchemaLoader",
92+
"schema": {
93+
"type": "object",
94+
"$schema": "http://json-schema.org/schema#",
95+
"properties": {
96+
"base": {
97+
"type": "string"
98+
},
99+
"date": {
100+
"type": "string"
101+
},
102+
"rates": {
103+
"type": "object",
104+
"properties": {
105+
"fake_currency": {
106+
"type": "number"
107+
}
108+
}
109+
},
110+
"success": {
111+
"type": "boolean"
112+
},
113+
"timestamp": {
114+
"type": "number"
115+
},
116+
"historical": {
117+
"type": "boolean"
118+
}
119+
}
120+
}
121+
},
122+
"transformations": [],
123+
"incremental_sync": {
124+
"type": "DatetimeBasedCursor",
125+
"step": "P1D",
126+
"cursor_field": "date",
127+
"end_datetime": {
128+
"type": "MinMaxDatetime",
129+
"datetime": "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}",
130+
"datetime_format": "%Y-%m-%dT%H:%M:%SZ"
131+
},
132+
"start_datetime": {
133+
"type": "MinMaxDatetime",
134+
"datetime": "{{ config['start_date'] }}",
135+
"datetime_format": "%Y-%m-%dT%H:%M:%SZ"
136+
},
137+
"datetime_format": "%Y-%m-%d",
138+
"cursor_granularity": "P1D",
139+
"cursor_datetime_formats": [
140+
"%Y-%m-%d"
141+
]
142+
},
143+
"state_migrations": []
144+
}
145+
],
146+
"spec": {
147+
"type": "Spec",
148+
"documentation_url": "https://example.org",
149+
"connection_specification": {
150+
"type": "object",
151+
"$schema": "http://json-schema.org/draft-07/schema#",
152+
"required": [
153+
"start_date",
154+
"api_key",
155+
"base"
156+
],
157+
"properties": {
158+
"base": {
159+
"type": "string",
160+
"order": 2,
161+
"title": "Base"
162+
},
163+
"api_key": {
164+
"type": "string",
165+
"order": 1,
166+
"title": "API Key",
167+
"airbyte_secret": True
168+
},
169+
"start_date": {
170+
"type": "string",
171+
"order": 0,
172+
"title": "Start date",
173+
"format": "date-time",
174+
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
175+
}
176+
},
177+
"additionalProperties": True
178+
}
179+
},
180+
"metadata": {
181+
"testedStreams": {
182+
"Rates": {
183+
"hasRecords": False,
184+
"streamHash": "4dce031d602258dd3bcc478731d6862a5cdeb70f",
185+
"hasResponse": False,
186+
"primaryKeysAreUnique": False,
187+
"primaryKeysArePresent": False,
188+
"responsesAreSuccessful": False
189+
}
190+
},
191+
"autoImportSchema": {
192+
"Rates": True
193+
}
194+
},
195+
"dynamic_streams": []
196+
}
197+
198+
_stream_name = "Rates"
199+
200+
_A_STATE = [
201+
AirbyteStateMessage(
202+
type="STREAM",
203+
stream=AirbyteStreamState(
204+
stream_descriptor=StreamDescriptor(name=_stream_name),
205+
stream_state=AirbyteStateBlob({"key": "value"}),
206+
),
207+
)
208+
]
209+
210+
TEST_READ_CONFIG = {
211+
"__injected_declarative_manifest": MANIFEST,
212+
"__command": "test_read",
213+
"__test_read_config": {"max_pages_per_slice": 2, "max_slices": 5, "max_records": 10},
214+
}
215+
216+
CONFIGURED_CATALOG = {
217+
"streams": [
218+
{
219+
"stream": {
220+
"name": _stream_name,
221+
"json_schema": {
222+
"$schema": "http://json-schema.org/draft-07/schema#",
223+
"type": "object",
224+
"properties": {},
225+
},
226+
"supported_sync_modes": ["full_refresh"],
227+
"source_defined_cursor": False,
228+
},
229+
"sync_mode": "full_refresh",
230+
"destination_sync_mode": "overwrite",
231+
}
232+
]
233+
}
234+
235+
@freezegun.freeze_time(f"{FREEZE_DATE}T00:00:00Z")
236+
def test_read():
237+
conversion_base = "USD"
238+
config = copy.deepcopy(TEST_READ_CONFIG)
239+
config["start_date"] = f"{FREEZE_DATE}T00:00:00Z"
240+
config["base"] = conversion_base
241+
config["api_key"] = "test_api_key"
242+
243+
stream_url = f'{BASE_URL}{FREEZE_DATE}?base={conversion_base}&{PROPERTY_KEY}='
244+
245+
with HttpMocker() as http_mocker:
246+
source = ManifestDeclarativeSource(source_config=MANIFEST, emit_connector_builder_messages=True)
247+
limits = TestLimits()
248+
249+
http_mocker.get(
250+
HttpRequest(url=f"{stream_url}{PROPERTY_LIST[0]}%2C{PROPERTY_LIST[1]}"),
251+
HttpResponse(json.dumps(find_template("declarative/property_chunking/rates_one_two", __file__)), 200),
252+
)
253+
http_mocker.get(
254+
HttpRequest(url=f"{stream_url}{PROPERTY_LIST[2]}%2C{PROPERTY_LIST[3]}"),
255+
HttpResponse(json.dumps(find_template("declarative/property_chunking/rates_three_four", __file__)), 200),
256+
)
257+
output_record = handle_connector_builder_request(
258+
source,
259+
"test_read",
260+
config,
261+
ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
262+
_A_STATE,
263+
limits,
264+
)
265+
# for connector build we get each record in a single page
266+
assert len(output_record.record.data["slices"][0]["pages"]) == 2
267+
for current_log in output_record.record.data["logs"]:
268+
assert not "Something went wrong in the connector" in current_log["message"]
269+
assert not current_log["internal_message"]
270+
assert not current_log["level"] == Level.ERROR
271+
assert not current_log["stacktrace"]
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"success": true,
3+
"timestamp": 1641081599,
4+
"historical": true,
5+
"base": "USD",
6+
"date": "2022-01-01",
7+
"rates": {
8+
"fake_currency": 1.2345
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"success": true,
3+
"timestamp": 1641167999,
4+
"historical": true,
5+
"base": "USD",
6+
"date": "2022-01-02",
7+
"rates": {
8+
"fake_currency": 1.2345
9+
}
10+
}

0 commit comments

Comments
 (0)