Skip to content

Commit e996805

Browse files
author
maxime.c
committed
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' into maxi297/incremental_without_partition_router_as_defaultstream
2 parents 86909df + e31fed9 commit e996805

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ def run_test_read(
120120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
121121

122122
schema_inferrer = SchemaInferrer(
123-
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
123+
self._pk_to_nested_and_composite_field(
124+
stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
125+
)
126+
if stream
127+
else None,
124128
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
125129
if stream
126130
else None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,13 +2055,9 @@ def create_declarative_stream(
20552055
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20562056

20572057
if (
2058-
(
2059-
isinstance(combined_slicers, PartitionRouter)
2060-
or isinstance(concurrent_cursor, ConcurrentCursor)
2061-
)
2062-
and not is_parent
2063-
and not self._emit_connector_builder_messages
2064-
):
2058+
isinstance(combined_slicers, PartitionRouter)
2059+
or isinstance(concurrent_cursor, ConcurrentCursor)
2060+
) and not is_parent:
20652061
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
20662062
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
20672063
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter
@@ -2097,7 +2093,13 @@ def create_declarative_stream(
20972093
retriever,
20982094
self._message_repository,
20992095
),
2100-
stream_slicer,
2096+
stream_slicer=cast(
2097+
StreamSlicer,
2098+
StreamSlicerTestReadDecorator(
2099+
wrapped_slicer=stream_slicer,
2100+
maximum_number_of_slices=self._limit_slices_fetched or 5,
2101+
),
2102+
),
21012103
)
21022104

21032105
return DefaultStream(

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import json
88
import logging
99
import os
10-
from typing import List, Literal
10+
from typing import List, Literal, Union
1111
from unittest import mock
1212
from unittest.mock import MagicMock, patch
1313

@@ -17,7 +17,6 @@
1717

1818
from airbyte_cdk import connector_builder
1919
from airbyte_cdk.connector_builder.connector_builder_handler import (
20-
TestLimits,
2120
create_source,
2221
get_limits,
2322
resolve_manifest,
@@ -60,6 +59,7 @@
6059
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
6160
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
6261
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
62+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
6363
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
6464
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
6565
from unit_tests.connector_builder.utils import create_configured_catalog
@@ -440,6 +440,14 @@
440440
}
441441

442442

443+
def get_retriever(stream: Union[DeclarativeStream, DefaultStream]):
444+
return (
445+
stream.retriever
446+
if isinstance(stream, DeclarativeStream)
447+
else stream._stream_partition_generator._partition_factory._retriever
448+
)
449+
450+
443451
@pytest.fixture
444452
def valid_resolve_manifest_config_file(tmp_path):
445453
config_file = tmp_path / "config.json"
@@ -1130,8 +1138,9 @@ def test_read_source(mock_http_stream):
11301138

11311139
streams = source.streams(config)
11321140
for s in streams:
1133-
assert isinstance(s.retriever, SimpleRetriever)
1134-
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
1141+
retriever = get_retriever(s)
1142+
assert isinstance(retriever, SimpleRetriever)
1143+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
11351144

11361145

11371146
@patch.object(
@@ -1177,8 +1186,9 @@ def test_read_source_single_page_single_slice(mock_http_stream):
11771186

11781187
streams = source.streams(config)
11791188
for s in streams:
1180-
assert isinstance(s.retriever, SimpleRetriever)
1181-
assert isinstance(s.retriever.stream_slicer, StreamSlicerTestReadDecorator)
1189+
retriever = get_retriever(s)
1190+
assert isinstance(retriever, SimpleRetriever)
1191+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
11821192

11831193

11841194
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)