Skip to content

Commit 37af9e6

Browse files
committed
Move log_formatter as argument
1 parent ec212db commit 37af9e6

File tree

2 files changed

+18
-47
lines changed

2 files changed

+18
-47
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@
528528
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
529529
KeysToSnakeCaseTransformation,
530530
)
531+
from airbyte_cdk.sources.http_logger import format_http_message
531532
from airbyte_cdk.sources.message import (
532533
InMemoryMessageRepository,
533534
LogAppenderMessageRepositoryDecorator,
@@ -2382,6 +2383,7 @@ def create_dynamic_schema_loader(
23822383
primary_key=None,
23832384
stream_slicer=combined_slicers,
23842385
transformations=[],
2386+
enable_logs=False,
23852387
)
23862388
schema_type_identifier = self._create_component_from_model(
23872389
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -2968,6 +2970,7 @@ def create_simple_retriever(
29682970
]
29692971
] = None,
29702972
use_cache: Optional[bool] = None,
2973+
enable_logs: bool = True,
29712974
**kwargs: Any,
29722975
) -> SimpleRetriever:
29732976
def _get_url() -> str:
@@ -3132,6 +3135,13 @@ def _get_url() -> str:
31323135
)
31333136

31343137
if self._limit_slices_fetched or self._emit_connector_builder_messages:
3138+
log_formatter = lambda response: format_http_message(
3139+
response,
3140+
f"Stream '{self.name}' request",
3141+
f"Request performed in order to extract records for stream '{self.name}'",
3142+
self.name,
3143+
) if enable_logs else None
3144+
31353145
return SimpleRetrieverTestReadDecorator(
31363146
name=name,
31373147
paginator=paginator,
@@ -3144,6 +3154,12 @@ def _get_url() -> str:
31443154
config=config,
31453155
maximum_number_of_slices=self._limit_slices_fetched or 5,
31463156
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3157+
log_formatter=lambda response: format_http_message(
3158+
response,
3159+
f"Stream '{name}' request",
3160+
f"Request performed in order to extract records for stream '{name}'",
3161+
name,
3162+
) if enable_logs else None,
31473163
parameters=model.parameters or {},
31483164
)
31493165
return SimpleRetriever(

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import (
1111
Any,
1212
Callable,
13-
Dict,
1413
Iterable,
1514
List,
1615
Mapping,
@@ -42,7 +41,6 @@
4241
from airbyte_cdk.sources.declarative.requesters.requester import Requester
4342
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
4443
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
45-
from airbyte_cdk.sources.http_logger import format_http_message
4644
from airbyte_cdk.sources.source import ExperimentalClassWarning
4745
from airbyte_cdk.sources.streams.core import StreamData
4846
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
@@ -93,6 +91,7 @@ class SimpleRetriever(Retriever):
9391
cursor: Optional[DeclarativeCursor] = None
9492
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
9593
additional_query_properties: Optional[QueryProperties] = None
94+
log_formatter: Optional[Callable[[requests.Response], Any]] = None
9695

9796
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9897
self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -353,6 +352,7 @@ def _fetch_next_page(
353352
stream_slice=stream_slice,
354353
next_page_token=next_page_token,
355354
),
355+
log_formatter=self.log_formatter,
356356
)
357357

358358
# This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
@@ -664,51 +664,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
664664
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
665665
return islice(super().stream_slices(), self.maximum_number_of_slices)
666666

667-
def _fetch_next_page(
668-
self,
669-
stream_state: Mapping[str, Any],
670-
stream_slice: StreamSlice,
671-
next_page_token: Optional[Mapping[str, Any]] = None,
672-
) -> Optional[requests.Response]:
673-
is_dynamic_schema_call = self.name == "dynamic_properties"
674-
return self.requester.send_request(
675-
path=self._paginator_path(
676-
next_page_token=next_page_token,
677-
stream_state=stream_state,
678-
stream_slice=stream_slice,
679-
),
680-
stream_state=stream_state,
681-
stream_slice=stream_slice,
682-
next_page_token=next_page_token,
683-
request_headers=self._request_headers(
684-
stream_state=stream_state,
685-
stream_slice=stream_slice,
686-
next_page_token=next_page_token,
687-
),
688-
request_params=self._request_params(
689-
stream_state=stream_state,
690-
stream_slice=stream_slice,
691-
next_page_token=next_page_token,
692-
),
693-
request_body_data=self._request_body_data(
694-
stream_state=stream_state,
695-
stream_slice=stream_slice,
696-
next_page_token=next_page_token,
697-
),
698-
request_body_json=self._request_body_json(
699-
stream_state=stream_state,
700-
stream_slice=stream_slice,
701-
next_page_token=next_page_token,
702-
),
703-
log_formatter=lambda response: format_http_message(
704-
response,
705-
f"Stream '{self.name}' request",
706-
f"Request performed in order to extract records for stream '{self.name}'",
707-
self.name,
708-
is_auxiliary=is_dynamic_schema_call, # Mark schema discovery calls as auxiliary for cleaner logs
709-
),
710-
)
711-
712667

713668
@deprecated(
714669
"This class is experimental. Use at your own risk.",

0 commit comments

Comments
 (0)