Skip to content

Commit 0629f06

Browse files
authored
Merge branch 'main' into tolik0/concurrent-perpartition-increase-throttle-to-600
2 parents 158259c + 9ad1bc0 commit 0629f06

File tree

7 files changed

+187
-165
lines changed

7 files changed

+187
-165
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from isodate import parse_duration
2828
from pydantic.v1 import BaseModel
29+
from requests import Response
2930

3031
from airbyte_cdk.connector_builder.models import (
3132
LogMessage as ConnectorBuilderLogMessage,
@@ -529,6 +530,7 @@
529530
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
530531
KeysToSnakeCaseTransformation,
531532
)
533+
from airbyte_cdk.sources.http_logger import format_http_message
532534
from airbyte_cdk.sources.message import (
533535
InMemoryMessageRepository,
534536
LogAppenderMessageRepositoryDecorator,
@@ -2390,15 +2392,24 @@ def create_dynamic_schema_loader(
23902392
schema_transformations.append(
23912393
self._create_component_from_model(model=transformation_model, config=config)
23922394
)
2393-
2395+
name = "dynamic_properties"
23942396
retriever = self._create_component_from_model(
23952397
model=model.retriever,
23962398
config=config,
2397-
name="dynamic_properties",
2399+
name=name,
23982400
primary_key=None,
23992401
stream_slicer=combined_slicers,
24002402
transformations=[],
24012403
use_cache=True,
2404+
log_formatter=(
2405+
lambda response: format_http_message(
2406+
response,
2407+
f"Schema loader '{name}' request",
2408+
f"Request performed in order to extract schema.",
2409+
name,
2410+
is_auxiliary=True,
2411+
)
2412+
),
24022413
)
24032414
schema_type_identifier = self._create_component_from_model(
24042415
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -2985,6 +2996,7 @@ def create_simple_retriever(
29852996
]
29862997
] = None,
29872998
use_cache: Optional[bool] = None,
2999+
log_formatter: Optional[Callable[[Response], Any]] = None,
29883000
**kwargs: Any,
29893001
) -> SimpleRetriever:
29903002
def _get_url() -> str:
@@ -3161,6 +3173,7 @@ def _get_url() -> str:
31613173
config=config,
31623174
maximum_number_of_slices=self._limit_slices_fetched or 5,
31633175
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3176+
log_formatter=log_formatter,
31643177
parameters=model.parameters or {},
31653178
)
31663179
return SimpleRetriever(

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,14 @@ def _get_polling_response_interpolation_context(self, job: AsyncJob) -> Dict[str
320320
return polling_response_context
321321

322322
def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
323-
stream_slice = StreamSlice(
324-
partition={},
325-
cursor_slice={},
326-
extra_fields={
323+
return StreamSlice(
324+
partition=job.job_parameters().partition,
325+
cursor_slice=job.job_parameters().cursor_slice,
326+
extra_fields=dict(job.job_parameters().extra_fields)
327+
| {
327328
"creation_response": self._get_creation_response_interpolation_context(job),
328329
},
329330
)
330-
return stream_slice
331331

332332
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
333333
if not self.download_target_requester:

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1313
from airbyte_cdk.sources.types import Config, StreamSlice
14+
from airbyte_cdk.utils.mapping_helpers import get_interpolation_context
1415

1516

1617
@dataclass
@@ -52,8 +53,8 @@ def eval_request_inputs(
5253
:param next_page_token: The pagination token
5354
:return: The request inputs to set on an outgoing HTTP request
5455
"""
55-
kwargs = {
56-
"stream_slice": stream_slice,
57-
"next_page_token": next_page_token,
58-
}
56+
kwargs = get_interpolation_context(
57+
stream_slice=stream_slice,
58+
next_page_token=next_page_token,
59+
)
5960
return self._interpolator.eval(self.config, **kwargs) # type: ignore # self._interpolator is always initialized with a value and will not be None

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
99
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1010
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
11+
from airbyte_cdk.utils.mapping_helpers import get_interpolation_context
1112

1213

1314
@dataclass
@@ -51,10 +52,10 @@ def eval_request_inputs(
5152
:param valid_value_types: A tuple of types that the interpolator should allow
5253
:return: The request inputs to set on an outgoing HTTP request
5354
"""
54-
kwargs = {
55-
"stream_slice": stream_slice,
56-
"next_page_token": next_page_token,
57-
}
55+
kwargs = get_interpolation_context(
56+
stream_slice=stream_slice,
57+
next_page_token=next_page_token,
58+
)
5859
interpolated_value = self._interpolator.eval( # type: ignore # self._interpolator is always initialized with a value and will not be None
5960
self.config,
6061
valid_key_types=valid_key_types,

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import (
1111
Any,
1212
Callable,
13-
Dict,
1413
Iterable,
1514
List,
1615
Mapping,
@@ -93,6 +92,7 @@ class SimpleRetriever(Retriever):
9392
cursor: Optional[DeclarativeCursor] = None
9493
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
9594
additional_query_properties: Optional[QueryProperties] = None
95+
log_formatter: Optional[Callable[[requests.Response], Any]] = None
9696

9797
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9898
self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -353,6 +353,7 @@ def _fetch_next_page(
353353
stream_slice=stream_slice,
354354
next_page_token=next_page_token,
355355
),
356+
log_formatter=self.log_formatter,
356357
)
357358

358359
# This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
@@ -655,6 +656,19 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever):
655656

656657
def __post_init__(self, options: Mapping[str, Any]) -> None:
657658
super().__post_init__(options)
659+
self.log_formatter = (
660+
(
661+
lambda response: format_http_message(
662+
response,
663+
f"Stream '{self.name}' request",
664+
f"Request performed in order to extract records for stream '{self.name}'",
665+
self.name,
666+
)
667+
)
668+
if not self.log_formatter
669+
else self.log_formatter
670+
)
671+
658672
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
659673
raise ValueError(
660674
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
@@ -664,49 +678,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
664678
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
665679
return islice(super().stream_slices(), self.maximum_number_of_slices)
666680

667-
def _fetch_next_page(
668-
self,
669-
stream_state: Mapping[str, Any],
670-
stream_slice: StreamSlice,
671-
next_page_token: Optional[Mapping[str, Any]] = None,
672-
) -> Optional[requests.Response]:
673-
return self.requester.send_request(
674-
path=self._paginator_path(
675-
next_page_token=next_page_token,
676-
stream_state=stream_state,
677-
stream_slice=stream_slice,
678-
),
679-
stream_state=stream_state,
680-
stream_slice=stream_slice,
681-
next_page_token=next_page_token,
682-
request_headers=self._request_headers(
683-
stream_state=stream_state,
684-
stream_slice=stream_slice,
685-
next_page_token=next_page_token,
686-
),
687-
request_params=self._request_params(
688-
stream_state=stream_state,
689-
stream_slice=stream_slice,
690-
next_page_token=next_page_token,
691-
),
692-
request_body_data=self._request_body_data(
693-
stream_state=stream_state,
694-
stream_slice=stream_slice,
695-
next_page_token=next_page_token,
696-
),
697-
request_body_json=self._request_body_json(
698-
stream_state=stream_state,
699-
stream_slice=stream_slice,
700-
next_page_token=next_page_token,
701-
),
702-
log_formatter=lambda response: format_http_message(
703-
response,
704-
f"Stream '{self.name}' request",
705-
f"Request performed in order to extract records for stream '{self.name}'",
706-
self.name,
707-
),
708-
)
709-
710681

711682
@deprecated(
712683
"This class is experimental. Use at your own risk.",

0 commit comments

Comments
 (0)