Skip to content

Commit 9ad1bc0

Browse files
lazebnyimaxi297octavia-squidington-iiimaxi297
authored
fix(connector-builder): fix dynamic stream read with dynamic schema (#534)
Co-authored-by: maxi297 <[email protected]> Co-authored-by: octavia-squidington-iii <[email protected]> Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
1 parent 13a38db commit 9ad1bc0

File tree

3 files changed

+32
-51
lines changed

3 files changed

+32
-51
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from isodate import parse_duration
2828
from pydantic.v1 import BaseModel
29+
from requests import Response
2930

3031
from airbyte_cdk.connector_builder.models import (
3132
LogMessage as ConnectorBuilderLogMessage,
@@ -529,6 +530,7 @@
529530
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
530531
KeysToSnakeCaseTransformation,
531532
)
533+
from airbyte_cdk.sources.http_logger import format_http_message
532534
from airbyte_cdk.sources.message import (
533535
InMemoryMessageRepository,
534536
LogAppenderMessageRepositoryDecorator,
@@ -2390,15 +2392,24 @@ def create_dynamic_schema_loader(
23902392
schema_transformations.append(
23912393
self._create_component_from_model(model=transformation_model, config=config)
23922394
)
2393-
2395+
name = "dynamic_properties"
23942396
retriever = self._create_component_from_model(
23952397
model=model.retriever,
23962398
config=config,
2397-
name="dynamic_properties",
2399+
name=name,
23982400
primary_key=None,
23992401
stream_slicer=combined_slicers,
24002402
transformations=[],
24012403
use_cache=True,
2404+
log_formatter=(
2405+
lambda response: format_http_message(
2406+
response,
2407+
f"Schema loader '{name}' request",
2408+
f"Request performed in order to extract schema.",
2409+
name,
2410+
is_auxiliary=True,
2411+
)
2412+
),
24022413
)
24032414
schema_type_identifier = self._create_component_from_model(
24042415
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -2985,6 +2996,7 @@ def create_simple_retriever(
29852996
]
29862997
] = None,
29872998
use_cache: Optional[bool] = None,
2999+
log_formatter: Optional[Callable[[Response], Any]] = None,
29883000
**kwargs: Any,
29893001
) -> SimpleRetriever:
29903002
def _get_url() -> str:
@@ -3161,6 +3173,7 @@ def _get_url() -> str:
31613173
config=config,
31623174
maximum_number_of_slices=self._limit_slices_fetched or 5,
31633175
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3176+
log_formatter=log_formatter,
31643177
parameters=model.parameters or {},
31653178
)
31663179
return SimpleRetriever(

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import (
1111
Any,
1212
Callable,
13-
Dict,
1413
Iterable,
1514
List,
1615
Mapping,
@@ -93,6 +92,7 @@ class SimpleRetriever(Retriever):
9392
cursor: Optional[DeclarativeCursor] = None
9493
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
9594
additional_query_properties: Optional[QueryProperties] = None
95+
log_formatter: Optional[Callable[[requests.Response], Any]] = None
9696

9797
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9898
self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -353,6 +353,7 @@ def _fetch_next_page(
353353
stream_slice=stream_slice,
354354
next_page_token=next_page_token,
355355
),
356+
log_formatter=self.log_formatter,
356357
)
357358

358359
# This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
@@ -655,6 +656,19 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever):
655656

656657
def __post_init__(self, options: Mapping[str, Any]) -> None:
657658
super().__post_init__(options)
659+
self.log_formatter = (
660+
(
661+
lambda response: format_http_message(
662+
response,
663+
f"Stream '{self.name}' request",
664+
f"Request performed in order to extract records for stream '{self.name}'",
665+
self.name,
666+
)
667+
)
668+
if not self.log_formatter
669+
else self.log_formatter
670+
)
671+
658672
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
659673
raise ValueError(
660674
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
@@ -664,49 +678,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
664678
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
665679
return islice(super().stream_slices(), self.maximum_number_of_slices)
666680

667-
def _fetch_next_page(
668-
self,
669-
stream_state: Mapping[str, Any],
670-
stream_slice: StreamSlice,
671-
next_page_token: Optional[Mapping[str, Any]] = None,
672-
) -> Optional[requests.Response]:
673-
return self.requester.send_request(
674-
path=self._paginator_path(
675-
next_page_token=next_page_token,
676-
stream_state=stream_state,
677-
stream_slice=stream_slice,
678-
),
679-
stream_state=stream_state,
680-
stream_slice=stream_slice,
681-
next_page_token=next_page_token,
682-
request_headers=self._request_headers(
683-
stream_state=stream_state,
684-
stream_slice=stream_slice,
685-
next_page_token=next_page_token,
686-
),
687-
request_params=self._request_params(
688-
stream_state=stream_state,
689-
stream_slice=stream_slice,
690-
next_page_token=next_page_token,
691-
),
692-
request_body_data=self._request_body_data(
693-
stream_state=stream_state,
694-
stream_slice=stream_slice,
695-
next_page_token=next_page_token,
696-
),
697-
request_body_json=self._request_body_json(
698-
stream_state=stream_state,
699-
stream_slice=stream_slice,
700-
next_page_token=next_page_token,
701-
),
702-
log_formatter=lambda response: format_http_message(
703-
response,
704-
f"Stream '{self.name}' request",
705-
f"Request performed in order to extract records for stream '{self.name}'",
706-
self.name,
707-
),
708-
)
709-
710681

711682
@deprecated(
712683
"This class is experimental. Use at your own risk.",

unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -921,11 +921,8 @@ def test_emit_log_request_response_messages(mocker):
921921
stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={})
922922
)
923923

924-
assert requester.send_request.call_args_list[0][1]["log_formatter"] is not None
925-
assert (
926-
requester.send_request.call_args_list[0][1]["log_formatter"](response)
927-
== format_http_message_mock.return_value
928-
)
924+
assert retriever.log_formatter is not None
925+
assert retriever.log_formatter(response) == format_http_message_mock.return_value
929926

930927

931928
def test_retriever_last_page_size_for_page_increment():

0 commit comments

Comments
 (0)