Skip to content

Commit 335981e

Browse files
committed
Remove SimpleRetrieverTestReadDecorator and use StreamSlicerTestReadDecorator for scenarios where we want to limit the number of requests that are made to the backend source.
1 parent d0e7c7d commit 335981e

File tree

8 files changed

+160
-135
lines changed

8 files changed

+160
-135
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,6 @@
510510
AsyncRetriever,
511511
LazySimpleRetriever,
512512
SimpleRetriever,
513-
SimpleRetrieverTestReadDecorator,
514513
)
515514
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
516515
ConnectorBuilderFileUploader,
@@ -530,7 +529,10 @@
530529
)
531530
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
532531
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
533-
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
532+
from airbyte_cdk.sources.declarative.stream_slicers import (
533+
StreamSlicer,
534+
StreamSlicerTestReadDecorator,
535+
)
534536
from airbyte_cdk.sources.declarative.transformations import (
535537
AddFields,
536538
RecordTransformation,
@@ -3241,6 +3243,11 @@ def _get_url() -> str:
32413243
request_options_provider = DefaultRequestOptionsProvider(parameters={})
32423244

32433245
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
3246+
if self._should_limit_slices_fetched():
3247+
stream_slicer = StreamSlicerTestReadDecorator(
3248+
wrapped_slicer=stream_slicer,
3249+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3250+
)
32443251

32453252
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
32463253
paginator = (
@@ -3299,10 +3306,7 @@ def _get_url() -> str:
32993306
parameters=model.parameters or {},
33003307
)
33013308

3302-
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
3303-
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0
3304-
retriever_log_formatter = log_formatter if test_read_enabled else None
3305-
return SimpleRetrieverTestReadDecorator(
3309+
return SimpleRetriever(
33063310
name=name,
33073311
paginator=paginator,
33083312
primary_key=primary_key,
@@ -3312,14 +3316,37 @@ def _get_url() -> str:
33123316
request_option_provider=request_options_provider,
33133317
cursor=cursor,
33143318
config=config,
3315-
maximum_number_of_slices=maximum_number_of_slices,
3316-
emit_connector_builder_messages=self._emit_connector_builder_messages,
33173319
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33183320
additional_query_properties=query_properties,
3319-
log_formatter=retriever_log_formatter,
3321+
log_formatter=self._get_log_formatter(log_formatter, name),
33203322
parameters=model.parameters or {},
33213323
)
33223324

3325+
def _get_log_formatter(
3326+
self, log_formatter: Callable[[Response], Any] | None, name: str
3327+
) -> Callable[[Response], Any] | None:
3328+
if self._should_limit_slices_fetched():
3329+
return (
3330+
(
3331+
lambda response: format_http_message(
3332+
response,
3333+
f"Stream '{name}' request",
3334+
f"Request performed in order to extract records for stream '{name}'",
3335+
name,
3336+
)
3337+
)
3338+
if not log_formatter
3339+
else log_formatter
3340+
)
3341+
return None
3342+
3343+
def _should_limit_slices_fetched(self) -> bool:
3344+
"""
3345+
Returns True if the number of slices fetched should be limited, False otherwise.
3346+
This is used to limit the number of slices fetched during tests.
3347+
"""
3348+
return bool(self._limit_slices_fetched or self._emit_connector_builder_messages)
3349+
33233350
@staticmethod
33243351
def _query_properties_in_request_parameters(
33253352
requester: Union[HttpRequesterModel, CustomRequesterModel],
@@ -3410,7 +3437,7 @@ def create_async_retriever(
34103437
transformations: List[RecordTransformation],
34113438
**kwargs: Any,
34123439
) -> AsyncRetriever:
3413-
def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetriever:
3440+
def _get_download_retriever() -> SimpleRetriever:
34143441
record_selector = RecordSelector(
34153442
extractor=download_extractor,
34163443
name=name,
@@ -3430,19 +3457,15 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
34303457
if model.download_paginator
34313458
else NoPagination(parameters={})
34323459
)
3433-
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
3434-
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0
34353460

3436-
return SimpleRetrieverTestReadDecorator(
3461+
return SimpleRetriever(
34373462
requester=download_requester,
34383463
record_selector=record_selector,
34393464
primary_key=None,
34403465
name=job_download_components_name,
34413466
paginator=paginator,
34423467
config=config,
34433468
parameters={},
3444-
maximum_number_of_slices=maximum_number_of_slices,
3445-
emit_connector_builder_messages=self._emit_connector_builder_messages,
34463469
)
34473470

34483471
def _get_job_timeout() -> datetime.timedelta:
@@ -3479,7 +3502,14 @@ def _get_job_timeout() -> datetime.timedelta:
34793502
transformations=transformations,
34803503
client_side_incremental_sync=client_side_incremental_sync,
34813504
)
3505+
34823506
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
3507+
if self._should_limit_slices_fetched():
3508+
stream_slicer = StreamSlicerTestReadDecorator(
3509+
wrapped_slicer=stream_slicer,
3510+
maximum_number_of_slices=self._limit_slices_fetched or 5,
3511+
)
3512+
34833513
creation_requester = self._create_component_from_model(
34843514
model=model.creation_requester,
34853515
decoder=decoder,

airbyte_cdk/sources/declarative/retrievers/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,11 @@
77
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
88
LazySimpleRetriever,
99
SimpleRetriever,
10-
SimpleRetrieverTestReadDecorator,
1110
)
1211

1312
__all__ = [
1413
"Retriever",
1514
"SimpleRetriever",
16-
"SimpleRetrieverTestReadDecorator",
1715
"AsyncRetriever",
1816
"LazySimpleRetriever",
1917
]

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -645,54 +645,6 @@ def _deep_merge(
645645
target[key] = value
646646

647647

648-
@dataclass
649-
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
650-
"""
651-
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
652-
slices that are queried throughout a read command.
653-
654-
maximum_number_of_slices must be provided when test read is enabled.
655-
"""
656-
657-
maximum_number_of_slices: int = 0
658-
emit_connector_builder_messages: bool = False
659-
660-
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
661-
super().__post_init__(parameters)
662-
if self.test_read_enabled():
663-
self.log_formatter = (
664-
(
665-
lambda response: format_http_message(
666-
response,
667-
f"Stream '{self.name}' request",
668-
f"Request performed in order to extract records for stream '{self.name}'",
669-
self.name,
670-
)
671-
)
672-
if not self.log_formatter
673-
else self.log_formatter
674-
)
675-
676-
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
677-
raise ValueError(
678-
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
679-
)
680-
681-
def test_read_enabled(self) -> bool:
682-
"""
683-
Indicates whether the retriever is in test read mode.
684-
This is used to limit the number of slices processed during a test read.
685-
"""
686-
return bool(self.maximum_number_of_slices or self.emit_connector_builder_messages)
687-
688-
# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
689-
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
690-
if not self.test_read_enabled():
691-
return super().stream_slices()
692-
else:
693-
return islice(super().stream_slices(), self.maximum_number_of_slices)
694-
695-
696648
@deprecated(
697649
"This class is experimental. Use at your own risk.",
698650
category=ExperimentalClassWarning,

airbyte_cdk/sources/declarative/stream_slicers/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@
33
#
44

55
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
6+
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer_test_read_decorator import (
7+
StreamSlicerTestReadDecorator,
8+
)
69

7-
__all__ = ["StreamSlicer"]
10+
__all__ = ["StreamSlicer", "StreamSlicerTestReadDecorator"]

airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from itertools import islice
7+
from typing import Any, Iterable, Mapping, Optional, Union
8+
9+
from airbyte_cdk.sources.types import StreamSlice, StreamState
10+
11+
from .stream_slicer import StreamSlicer
12+
13+
14+
@dataclass
15+
class StreamSlicerTestReadDecorator(StreamSlicer):
16+
"""
17+
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
18+
slices that are queried throughout a read command.
19+
"""
20+
21+
wrapped_slicer: StreamSlicer
22+
maximum_number_of_slices: int = 5
23+
24+
def stream_slices(self) -> Iterable[StreamSlice]:
25+
return islice(self.wrapped_slicer.stream_slices(), self.maximum_number_of_slices)
26+
27+
def get_request_params(
28+
self,
29+
*,
30+
stream_state: Optional[StreamState] = None,
31+
stream_slice: Optional[StreamSlice] = None,
32+
next_page_token: Optional[Mapping[str, Any]] = None,
33+
) -> Mapping[str, Any]:
34+
return self.wrapped_slicer.get_request_params(
35+
stream_state=stream_state,
36+
stream_slice=stream_slice,
37+
next_page_token=next_page_token,
38+
)
39+
40+
def get_request_headers(
41+
self,
42+
*,
43+
stream_state: Optional[StreamState] = None,
44+
stream_slice: Optional[StreamSlice] = None,
45+
next_page_token: Optional[Mapping[str, Any]] = None,
46+
) -> Mapping[str, Any]:
47+
return self.wrapped_slicer.get_request_headers(
48+
stream_state=stream_state,
49+
stream_slice=stream_slice,
50+
next_page_token=next_page_token,
51+
)
52+
53+
def get_request_body_data(
54+
self,
55+
*,
56+
stream_state: Optional[StreamState] = None,
57+
stream_slice: Optional[StreamSlice] = None,
58+
next_page_token: Optional[Mapping[str, Any]] = None,
59+
) -> Union[Mapping[str, Any], str]:
60+
return self.wrapped_slicer.get_request_body_data(
61+
stream_state=stream_state,
62+
stream_slice=stream_slice,
63+
next_page_token=next_page_token,
64+
)
65+
66+
def get_request_body_json(
67+
self,
68+
*,
69+
stream_state: Optional[StreamState] = None,
70+
stream_slice: Optional[StreamSlice] = None,
71+
next_page_token: Optional[Mapping[str, Any]] = None,
72+
) -> Mapping[str, Any]:
73+
return self.wrapped_slicer.get_request_body_json(
74+
stream_state=stream_state,
75+
stream_slice=stream_slice,
76+
next_page_token=next_page_token,
77+
)
78+
79+
def __getattr__(self, name: str) -> Any:
80+
# Delegate everything else to the wrapped object
81+
return getattr(self.wrapped_slicer, name)

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
from airbyte_cdk.models import Type as MessageType
5959
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
6060
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
61-
from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator
6261
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
6362
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
6463
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets, update_secrets
@@ -1112,7 +1111,7 @@ def test_read_source(mock_http_stream):
11121111

11131112
streams = source.streams(config)
11141113
for s in streams:
1115-
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
1114+
assert isinstance(s.retriever, SimpleRetriever)
11161115

11171116

11181117
@patch.object(
@@ -1158,7 +1157,7 @@ def test_read_source_single_page_single_slice(mock_http_stream):
11581157

11591158
streams = source.streams(config)
11601159
for s in streams:
1161-
assert isinstance(s.retriever, SimpleRetrieverTestReadDecorator)
1160+
assert isinstance(s.retriever, SimpleRetriever)
11621161

11631162

11641163
@pytest.mark.parametrize(

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,12 @@
151151
)
152152
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
153153
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
154-
from airbyte_cdk.sources.declarative.retrievers import (
155-
AsyncRetriever,
156-
SimpleRetriever,
157-
SimpleRetrieverTestReadDecorator,
158-
)
154+
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
159155
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
160156
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
161157
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
162158
from airbyte_cdk.sources.declarative.spec import Spec
159+
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
163160
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
164161
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
165162
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
@@ -2715,6 +2712,13 @@ def test_simple_retriever_emit_log_messages():
27152712
"path": "/v1/api",
27162713
},
27172714
}
2715+
request = requests.PreparedRequest()
2716+
request.headers = {"header": "value"}
2717+
request.url = "http://byrde.enterprises.com/casinos"
2718+
2719+
response = requests.Response()
2720+
response.request = request
2721+
response.status_code = 200
27182722

27192723
connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
27202724
retriever = connector_builder_factory.create_component(
@@ -2727,8 +2731,12 @@ def test_simple_retriever_emit_log_messages():
27272731
transformations=[],
27282732
)
27292733

2730-
assert isinstance(retriever, SimpleRetrieverTestReadDecorator)
2734+
assert isinstance(retriever, SimpleRetriever)
27312735
assert connector_builder_factory._message_repository._log_level == Level.DEBUG
2736+
assert retriever.log_formatter is not None
2737+
assert retriever.log_formatter(response) == connector_builder_factory._get_log_formatter(
2738+
None, retriever.name
2739+
)(response)
27322740

27332741

27342742
def test_create_page_increment():
@@ -3078,7 +3086,8 @@ def test_use_request_options_provider_for_datetime_based_cursor():
30783086
assert retriever.name == "Test"
30793087

30803088
assert isinstance(retriever.cursor, DatetimeBasedCursor)
3081-
assert isinstance(retriever.stream_slicer, DatetimeBasedCursor)
3089+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
3090+
assert isinstance(retriever.stream_slicer.wrapped_slicer, DatetimeBasedCursor)
30823091

30833092
assert isinstance(retriever.request_option_provider, DatetimeBasedRequestOptionsProvider)
30843093
assert (
@@ -3166,7 +3175,8 @@ def test_do_not_separate_request_options_provider_for_non_datetime_based_cursor(
31663175
assert retriever.name == "Test"
31673176

31683177
assert isinstance(retriever.cursor, PerPartitionCursor)
3169-
assert isinstance(retriever.stream_slicer, PerPartitionCursor)
3178+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
3179+
assert isinstance(retriever.stream_slicer.wrapped_slicer, PerPartitionCursor)
31703180

31713181
assert isinstance(retriever.request_option_provider, PerPartitionCursor)
31723182
assert isinstance(retriever.request_option_provider._cursor_factory, CursorFactory)
@@ -3207,7 +3217,8 @@ def test_use_default_request_options_provider():
32073217
assert retriever.primary_key == "id"
32083218
assert retriever.name == "Test"
32093219

3210-
assert isinstance(retriever.stream_slicer, SinglePartitionRouter)
3220+
assert isinstance(retriever.stream_slicer, StreamSlicerTestReadDecorator)
3221+
assert isinstance(retriever.stream_slicer.wrapped_slicer, SinglePartitionRouter)
32113222
assert isinstance(retriever.request_option_provider, DefaultRequestOptionsProvider)
32123223

32133224

0 commit comments

Comments
 (0)