Skip to content

Commit 4cd1e0c

Browse files
committed
Add unittest
1 parent 4378007 commit 4cd1e0c

File tree

1 file changed

+132
-0
lines changed

1 file changed

+132
-0
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import json
6+
from unittest.mock import MagicMock
7+
8+
from airbyte_cdk.models import (ConfiguredAirbyteCatalog,
9+
ConfiguredAirbyteStream, DestinationSyncMode,
10+
Type)
11+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import \
12+
ConcurrentDeclarativeSource
13+
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
14+
15+
16+
def to_configured_stream(
17+
stream,
18+
sync_mode=None,
19+
destination_sync_mode=DestinationSyncMode.append,
20+
cursor_field=None,
21+
primary_key=None,
22+
) -> ConfiguredAirbyteStream:
23+
return ConfiguredAirbyteStream(
24+
stream=stream,
25+
sync_mode=sync_mode,
26+
destination_sync_mode=destination_sync_mode,
27+
cursor_field=cursor_field,
28+
primary_key=primary_key,
29+
)
30+
31+
32+
def to_configured_catalog(
33+
configured_streams,
34+
) -> ConfiguredAirbyteCatalog:
35+
return ConfiguredAirbyteCatalog(streams=configured_streams)
36+
37+
38+
_CONFIG = {
39+
"start_date": "2024-07-01T00:00:00.000Z",
40+
}
41+
42+
43+
_MANIFEST = {
44+
"version": "6.7.0",
45+
"type": "DeclarativeSource",
46+
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
47+
"streams": [
48+
{
49+
"type": "DeclarativeStream",
50+
"name": "test_stream",
51+
"primary_key": [],
52+
"schema_loader": {
53+
"type": "InlineSchemaLoader",
54+
"schema": {
55+
"$schema": "http://json-schema.org/schema#",
56+
"properties": {
57+
"ABC": {"type": "number"},
58+
"AED": {"type": "number"},
59+
},
60+
"type": "object",
61+
},
62+
},
63+
"retriever": {
64+
"type": "SimpleRetriever",
65+
"requester": {
66+
"type": "HttpRequester",
67+
"url_base": "https://api.test.com",
68+
"path": "/items",
69+
"http_method": "GET",
70+
"authenticator": {
71+
"type": "ApiKeyAuthenticator",
72+
"header": "apikey",
73+
"api_token": "{{ config['api_key'] }}",
74+
},
75+
},
76+
"record_selector": {
77+
"type": "RecordSelector",
78+
"extractor": {
79+
"type": "KeyValueExtractor",
80+
"keys_extractor": {
81+
"type": "DpathExtractor",
82+
"field_path": ["dimensions", "names"],
83+
},
84+
"values_extractor": {
85+
"type": "DpathExtractor",
86+
"field_path": ["dimensions", "values"],
87+
},
88+
},
89+
},
90+
"paginator": {"type": "NoPagination"},
91+
},
92+
}
93+
],
94+
}
95+
96+
97+
def test_key_value_extractor():
98+
source = ConcurrentDeclarativeSource(
99+
source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
100+
)
101+
102+
actual_catalog = source.discover(logger=source.logger, config=_CONFIG)
103+
104+
configured_streams = [
105+
to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
106+
for stream in actual_catalog.streams
107+
]
108+
configured_catalog = to_configured_catalog(configured_streams)
109+
110+
with HttpMocker() as http_mocker:
111+
http_mocker.get(
112+
HttpRequest(url="https://api.test.com/items"),
113+
HttpResponse(
114+
body=json.dumps(
115+
{
116+
"dimensions": {
117+
"names": ["customer_segment", "traffic_source"],
118+
"values": ["enterprise", "organic_search"],
119+
}
120+
}
121+
)
122+
),
123+
)
124+
125+
records = [
126+
message.record
127+
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
128+
if message.type == Type.RECORD
129+
]
130+
131+
assert len(records) == 1
132+
assert records[0].data == {"customer_segment": "enterprise", "traffic_source": "organic_search"}

0 commit comments

Comments
 (0)