Skip to content

Commit 3154598

Browse files
added unit tests
1 parent a140e99 commit 3154598

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import json
2+
from unittest.mock import MagicMock
3+
4+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
5+
ConcurrentDeclarativeSource,
6+
)
7+
from airbyte_cdk.models import (
8+
ConfiguredAirbyteCatalog,
9+
ConfiguredAirbyteStream,
10+
DestinationSyncMode,
11+
Type,
12+
SyncMode,
13+
)
14+
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
15+
16+
MANIFEST = {
17+
"version": "6.7.0",
18+
"type": "DeclarativeSource",
19+
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
20+
"dynamic_streams": [
21+
{
22+
"type": "DynamicDeclarativeStream",
23+
"stream_template": {
24+
"type": "DeclarativeStream",
25+
"schema_loader": {
26+
"type": "InlineSchemaLoader",
27+
"schema": {
28+
"$schema": "http://json-schema.org/schema#",
29+
"type": "object",
30+
"properties": {
31+
"field1": {"type": "string"},
32+
"field2": {"type": "string"},
33+
},
34+
},
35+
},
36+
"retriever": {
37+
"type": "SimpleRetriever",
38+
"requester": {
39+
"type": "HttpRequester",
40+
"url_base": "https://api.test.com",
41+
"path": "/search/{{parameters['object_name']}}",
42+
"http_method": "GET",
43+
"authenticator": {
44+
"type": "ApiKeyAuthenticator",
45+
"header": "apikey",
46+
"api_token": "{{ config['api_key'] }}",
47+
},
48+
},
49+
"record_selector": {
50+
"type": "RecordSelector",
51+
"extractor": {"type": "DpathExtractor", "field_path": []},
52+
},
53+
"paginator": {"type": "NoPagination"},
54+
},
55+
},
56+
"components_resolver": {
57+
"type": "ParametrizedComponentsResolver",
58+
"stream_parameters": {
59+
"type": "StreamParametersDefinition",
60+
"lisf_of_parameters_for_stream": [
61+
{
62+
"parameters": {"object_name": "Customers"},
63+
"name": "Customers",
64+
"transformations": [
65+
{
66+
"type": "AddFields",
67+
"fields": [
68+
{
69+
"type": "AddedFieldDefinition",
70+
"path": ["field2"],
71+
"value": "Related to customers field",
72+
}
73+
],
74+
}
75+
],
76+
},
77+
{
78+
"parameters": {"object_name": "Refunds"},
79+
"name": "Refunds",
80+
"transformations": [
81+
{
82+
"type": "AddFields",
83+
"fields": [
84+
{
85+
"type": "AddedFieldDefinition",
86+
"path": ["field2"],
87+
"value": "Related to refunds field",
88+
}
89+
],
90+
}
91+
],
92+
},
93+
{
94+
"parameters": {"object_name": "Orders"},
95+
"name": "Orders",
96+
"transformations": [
97+
{
98+
"type": "AddFields",
99+
"fields": [
100+
{
101+
"type": "AddedFieldDefinition",
102+
"path": ["field2"],
103+
"value": "Related to orders field",
104+
}
105+
],
106+
}
107+
],
108+
},
109+
],
110+
},
111+
"components_mapping": [
112+
{
113+
"type": "ComponentMappingDefinition",
114+
"create_or_update": True,
115+
"field_path": ["$parameters"],
116+
"value": "{{components_values['parameters']}}",
117+
},
118+
{
119+
"type": "ComponentMappingDefinition",
120+
"create_or_update": True,
121+
"field_path": ["name"],
122+
"value": "{{components_values['name']}}",
123+
},
124+
{
125+
"type": "ComponentMappingDefinition",
126+
"create_or_update": True,
127+
"field_path": ["transformations"],
128+
"value": "{{components_values['transformations']}}",
129+
},
130+
],
131+
},
132+
}
133+
],
134+
}
135+
136+
137+
def test_dynamic_streams_with_parametrized_components_resolver():
138+
config = {"api_key": "test"}
139+
source = ConcurrentDeclarativeSource(
140+
source_config=MANIFEST, config=config, catalog=None, state=None
141+
)
142+
discover_result = source.discover(config=config, logger=MagicMock())
143+
assert len(discover_result.streams) == 3
144+
145+
configured_streams = []
146+
for stream in discover_result.streams:
147+
configured_stream = ConfiguredAirbyteStream(
148+
stream=stream,
149+
sync_mode=SyncMode.full_refresh,
150+
destination_sync_mode=DestinationSyncMode.overwrite,
151+
)
152+
configured_streams.append(configured_stream)
153+
configured_catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
154+
155+
with HttpMocker() as http_mocker:
156+
http_mocker.get(
157+
HttpRequest(url="https://api.test.com/search/Customers"),
158+
HttpResponse(body=json.dumps({"field1": "Customers info"})),
159+
)
160+
http_mocker.get(
161+
HttpRequest(url="https://api.test.com/search/Orders"),
162+
HttpResponse(body=json.dumps({"field1": "Orders info"})),
163+
)
164+
http_mocker.get(
165+
HttpRequest(url="https://api.test.com/search/Refunds"),
166+
HttpResponse(body=json.dumps({"field1": "Refunds info"})),
167+
)
168+
169+
records = [
170+
record
171+
for record in source.read(MagicMock(), config, configured_catalog)
172+
if record.type == Type.RECORD
173+
]
174+
175+
assert len(records) == 3
176+
assert [record.record.data for record in records] == [
177+
{"field1": "Customers info", "field2": "Related to customers field"},
178+
{"field1": "Refunds info", "field2": "Related to refunds field"},
179+
{"field1": "Orders info", "field2": "Related to orders field"},
180+
]

0 commit comments

Comments
 (0)