Skip to content

Commit 7fdc9a4

Browse files
committed
Fix log_formatter in factory
1 parent ec212db commit 7fdc9a4

File tree

2 files changed

+12
-47
lines changed

2 files changed

+12
-47
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@
528528
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
529529
KeysToSnakeCaseTransformation,
530530
)
531+
from airbyte_cdk.sources.http_logger import format_http_message
531532
from airbyte_cdk.sources.message import (
532533
InMemoryMessageRepository,
533534
LogAppenderMessageRepositoryDecorator,
@@ -2382,6 +2383,8 @@ def create_dynamic_schema_loader(
23822383
primary_key=None,
23832384
stream_slicer=combined_slicers,
23842385
transformations=[],
2386+
enable_logs=False,
2387+
use_cache=True,
23852388
)
23862389
schema_type_identifier = self._create_component_from_model(
23872390
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -2968,6 +2971,7 @@ def create_simple_retriever(
29682971
]
29692972
] = None,
29702973
use_cache: Optional[bool] = None,
2974+
enable_logs: bool = True,
29712975
**kwargs: Any,
29722976
) -> SimpleRetriever:
29732977
def _get_url() -> str:
@@ -3144,6 +3148,12 @@ def _get_url() -> str:
31443148
config=config,
31453149
maximum_number_of_slices=self._limit_slices_fetched or 5,
31463150
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3151+
log_formatter=(lambda response: format_http_message(
3152+
response,
3153+
f"Stream '{name}' request",
3154+
f"Request performed in order to extract records for stream '{name}'",
3155+
name,
3156+
)) if enable_logs else None,
31473157
parameters=model.parameters or {},
31483158
)
31493159
return SimpleRetriever(

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import (
1111
Any,
1212
Callable,
13-
Dict,
1413
Iterable,
1514
List,
1615
Mapping,
@@ -42,7 +41,6 @@
4241
from airbyte_cdk.sources.declarative.requesters.requester import Requester
4342
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
4443
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
45-
from airbyte_cdk.sources.http_logger import format_http_message
4644
from airbyte_cdk.sources.source import ExperimentalClassWarning
4745
from airbyte_cdk.sources.streams.core import StreamData
4846
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
@@ -93,6 +91,7 @@ class SimpleRetriever(Retriever):
9391
cursor: Optional[DeclarativeCursor] = None
9492
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
9593
additional_query_properties: Optional[QueryProperties] = None
94+
log_formatter: Optional[Callable[[requests.Response], Any]] = None
9695

9796
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9897
self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -353,6 +352,7 @@ def _fetch_next_page(
353352
stream_slice=stream_slice,
354353
next_page_token=next_page_token,
355354
),
355+
log_formatter=self.log_formatter,
356356
)
357357

358358
# This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
@@ -664,51 +664,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
664664
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
665665
return islice(super().stream_slices(), self.maximum_number_of_slices)
666666

667-
def _fetch_next_page(
668-
self,
669-
stream_state: Mapping[str, Any],
670-
stream_slice: StreamSlice,
671-
next_page_token: Optional[Mapping[str, Any]] = None,
672-
) -> Optional[requests.Response]:
673-
is_dynamic_schema_call = self.name == "dynamic_properties"
674-
return self.requester.send_request(
675-
path=self._paginator_path(
676-
next_page_token=next_page_token,
677-
stream_state=stream_state,
678-
stream_slice=stream_slice,
679-
),
680-
stream_state=stream_state,
681-
stream_slice=stream_slice,
682-
next_page_token=next_page_token,
683-
request_headers=self._request_headers(
684-
stream_state=stream_state,
685-
stream_slice=stream_slice,
686-
next_page_token=next_page_token,
687-
),
688-
request_params=self._request_params(
689-
stream_state=stream_state,
690-
stream_slice=stream_slice,
691-
next_page_token=next_page_token,
692-
),
693-
request_body_data=self._request_body_data(
694-
stream_state=stream_state,
695-
stream_slice=stream_slice,
696-
next_page_token=next_page_token,
697-
),
698-
request_body_json=self._request_body_json(
699-
stream_state=stream_state,
700-
stream_slice=stream_slice,
701-
next_page_token=next_page_token,
702-
),
703-
log_formatter=lambda response: format_http_message(
704-
response,
705-
f"Stream '{self.name}' request",
706-
f"Request performed in order to extract records for stream '{self.name}'",
707-
self.name,
708-
is_auxiliary=is_dynamic_schema_call, # Mark schema discovery calls as auxiliary for cleaner logs
709-
),
710-
)
711-
712667

713668
@deprecated(
714669
"This class is experimental. Use at your own risk.",

0 commit comments

Comments
 (0)