Skip to content

Commit 038b58f

Browse files
fix: Add comprehensive integration tests and fix schema inference issues
- Fix stream_name propagation in factory to use stream context name - Add type conversion for Mapping-like wrapper objects to plain dicts - Add 5 comprehensive integration tests using HttpMocker and JSONPlaceholder API - Update test assertions to handle genson's nullable type format - Add proper error handling for retriever failures - Update all unit test mocks to include stream_slices() method Co-Authored-By: AJ Steers <[email protected]>
1 parent c671ccf commit 038b58f

File tree

4 files changed

+505
-11
lines changed

4 files changed

+505
-11
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2534,7 +2534,7 @@ def create_inferred_schema_loader(
25342534
retriever=retriever,
25352535
config=config,
25362536
record_sample_size=model.record_sample_size or 100,
2537-
stream_name=model.stream_name or "",
2537+
stream_name=model.stream_name or name,
25382538
parameters=model.parameters or {},
25392539
)
25402540

airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

5+
from collections.abc import Mapping as ABCMapping
56
from dataclasses import InitVar, dataclass
67
from typing import Any, Mapping, Optional
78

@@ -55,18 +56,27 @@ def get_json_schema(self) -> Mapping[str, Any]:
5556

5657
record_count = 0
5758
try:
58-
for record in self.retriever.read_records({}): # type: ignore[call-overload]
59-
if record_count >= self.record_sample_size:
60-
break
59+
for stream_slice in self.retriever.stream_slices():
60+
for record in self.retriever.read_records(
61+
records_schema={}, stream_slice=stream_slice
62+
):
63+
if record_count >= self.record_sample_size:
64+
break
65+
66+
if isinstance(record, ABCMapping) and not isinstance(record, dict):
67+
record = dict(record)
6168

62-
airbyte_record = AirbyteRecordMessage(
63-
stream=self.stream_name,
64-
data=record, # type: ignore[arg-type]
65-
emitted_at=0, # Not used for schema inference
66-
)
69+
airbyte_record = AirbyteRecordMessage(
70+
stream=self.stream_name,
71+
data=record, # type: ignore[arg-type]
72+
emitted_at=0,
73+
)
6774

68-
schema_inferrer.accumulate(airbyte_record)
69-
record_count += 1
75+
schema_inferrer.accumulate(airbyte_record)
76+
record_count += 1
77+
78+
if record_count >= self.record_sample_size:
79+
break
7080
except Exception:
7181
return {}
7282

unit_tests/sources/declarative/schema/test_inferred_schema_loader.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
def mock_retriever():
8181
"""Create a mock retriever that returns sample records."""
8282
retriever = MagicMock()
83+
retriever.stream_slices.return_value = iter([None])
8384
retriever.read_records.return_value = iter(
8485
[
8586
{"id": 1, "name": "Alice", "age": 30, "active": True},
@@ -126,6 +127,7 @@ def test_inferred_schema_loader_basic(inferred_schema_loader):
126127
def test_inferred_schema_loader_empty_records():
127128
"""Test that InferredSchemaLoader returns empty schema when no records are available."""
128129
retriever = MagicMock()
130+
retriever.stream_slices.return_value = iter([None])
129131
retriever.read_records.return_value = iter([])
130132

131133
config = MagicMock()
@@ -147,6 +149,7 @@ def test_inferred_schema_loader_respects_sample_size():
147149
"""Test that InferredSchemaLoader respects the record_sample_size parameter."""
148150
retriever = MagicMock()
149151
records = [{"id": i, "name": f"User{i}"} for i in range(10)]
152+
retriever.stream_slices.return_value = iter([None])
150153
retriever.read_records.return_value = iter(records)
151154

152155
config = MagicMock()
@@ -169,6 +172,7 @@ def test_inferred_schema_loader_respects_sample_size():
169172
def test_inferred_schema_loader_handles_errors():
170173
"""Test that InferredSchemaLoader handles errors gracefully."""
171174
retriever = MagicMock()
175+
retriever.stream_slices.return_value = iter([None])
172176
retriever.read_records.side_effect = Exception("API Error")
173177

174178
config = MagicMock()
@@ -189,6 +193,7 @@ def test_inferred_schema_loader_handles_errors():
189193
def test_inferred_schema_loader_with_nested_objects():
190194
"""Test that InferredSchemaLoader handles nested objects correctly."""
191195
retriever = MagicMock()
196+
retriever.stream_slices.return_value = iter([None])
192197
retriever.read_records.return_value = iter(
193198
[
194199
{
@@ -224,6 +229,7 @@ def test_inferred_schema_loader_with_nested_objects():
224229
def test_inferred_schema_loader_with_arrays():
225230
"""Test that InferredSchemaLoader handles arrays correctly."""
226231
retriever = MagicMock()
232+
retriever.stream_slices.return_value = iter([None])
227233
retriever.read_records.return_value = iter(
228234
[
229235
{"id": 1, "name": "Alice", "tags": ["admin", "user"]},

0 commit comments

Comments
 (0)