Skip to content

Commit d0e7c7d

Browse files
committed
make SimpleRetrieverTestReadDecorator when no read enabled mode required
1 parent 925dc88 commit d0e7c7d

File tree

3 files changed

+46
-51
lines changed

3 files changed

+46
-51
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3299,24 +3299,10 @@ def _get_url() -> str:
32993299
parameters=model.parameters or {},
33003300
)
33013301

3302-
if self._limit_slices_fetched or self._emit_connector_builder_messages:
3303-
return SimpleRetrieverTestReadDecorator(
3304-
name=name,
3305-
paginator=paginator,
3306-
primary_key=primary_key,
3307-
requester=requester,
3308-
record_selector=record_selector,
3309-
stream_slicer=stream_slicer,
3310-
request_option_provider=request_options_provider,
3311-
cursor=cursor,
3312-
config=config,
3313-
maximum_number_of_slices=self._limit_slices_fetched or 5,
3314-
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3315-
additional_query_properties=query_properties,
3316-
log_formatter=log_formatter,
3317-
parameters=model.parameters or {},
3318-
)
3319-
return SimpleRetriever(
3302+
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
3303+
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0
3304+
retriever_log_formatter = log_formatter if test_read_enabled else None
3305+
return SimpleRetrieverTestReadDecorator(
33203306
name=name,
33213307
paginator=paginator,
33223308
primary_key=primary_key,
@@ -3326,8 +3312,11 @@ def _get_url() -> str:
33263312
request_option_provider=request_options_provider,
33273313
cursor=cursor,
33283314
config=config,
3315+
maximum_number_of_slices=maximum_number_of_slices,
3316+
emit_connector_builder_messages=self._emit_connector_builder_messages,
33293317
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33303318
additional_query_properties=query_properties,
3319+
log_formatter=retriever_log_formatter,
33313320
parameters=model.parameters or {},
33323321
)
33333322

@@ -3441,28 +3430,19 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
34413430
if model.download_paginator
34423431
else NoPagination(parameters={})
34433432
)
3444-
maximum_number_of_slices = self._limit_slices_fetched or 5
3433+
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
3434+
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0
34453435

3446-
if self._limit_slices_fetched or self._emit_connector_builder_messages:
3447-
return SimpleRetrieverTestReadDecorator(
3448-
requester=download_requester,
3449-
record_selector=record_selector,
3450-
primary_key=None,
3451-
name=job_download_components_name,
3452-
paginator=paginator,
3453-
config=config,
3454-
parameters={},
3455-
maximum_number_of_slices=maximum_number_of_slices,
3456-
)
3457-
3458-
return SimpleRetriever(
3436+
return SimpleRetrieverTestReadDecorator(
34593437
requester=download_requester,
34603438
record_selector=record_selector,
34613439
primary_key=None,
34623440
name=job_download_components_name,
34633441
paginator=paginator,
34643442
config=config,
34653443
parameters={},
3444+
maximum_number_of_slices=maximum_number_of_slices,
3445+
emit_connector_builder_messages=self._emit_connector_builder_messages,
34663446
)
34673447

34683448
def _get_job_timeout() -> datetime.timedelta:

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -650,33 +650,47 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever):
650650
"""
651651
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
652652
slices that are queried throughout a read command.
653+
654+
maximum_number_of_slices must be provided when test read is enabled.
653655
"""
654656

655-
maximum_number_of_slices: int = 5
656-
657-
def __post_init__(self, options: Mapping[str, Any]) -> None:
658-
super().__post_init__(options)
659-
self.log_formatter = (
660-
(
661-
lambda response: format_http_message(
662-
response,
663-
f"Stream '{self.name}' request",
664-
f"Request performed in order to extract records for stream '{self.name}'",
665-
self.name,
657+
maximum_number_of_slices: int = 0
658+
emit_connector_builder_messages: bool = False
659+
660+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
661+
super().__post_init__(parameters)
662+
if self.test_read_enabled():
663+
self.log_formatter = (
664+
(
665+
lambda response: format_http_message(
666+
response,
667+
f"Stream '{self.name}' request",
668+
f"Request performed in order to extract records for stream '{self.name}'",
669+
self.name,
670+
)
666671
)
672+
if not self.log_formatter
673+
else self.log_formatter
667674
)
668-
if not self.log_formatter
669-
else self.log_formatter
670-
)
671675

672-
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
673-
raise ValueError(
674-
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
675-
)
676+
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
677+
raise ValueError(
678+
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
679+
)
680+
681+
def test_read_enabled(self) -> bool:
682+
"""
683+
Indicates whether the retriever is in test read mode.
684+
This is used to limit the number of slices processed during a test read.
685+
"""
686+
return bool(self.maximum_number_of_slices or self.emit_connector_builder_messages)
676687

677688
# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
678689
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
679-
return islice(super().stream_slices(), self.maximum_number_of_slices)
690+
if not self.test_read_enabled():
691+
return super().stream_slices()
692+
else:
693+
return islice(super().stream_slices(), self.maximum_number_of_slices)
680694

681695

682696
@deprecated(

unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,7 @@ def test_emit_log_request_response_messages(mocker):
915915
stream_slicer=SinglePartitionRouter(parameters={}),
916916
parameters={},
917917
config={},
918+
maximum_number_of_slices=1,
918919
)
919920

920921
retriever._fetch_next_page(

0 commit comments

Comments
 (0)