Skip to content

Commit ba33479

Browse files
updated SimpleRetriever with ComponentConstructor
1 parent bc4a36f commit ba33479

File tree

1 file changed

+338
-1
lines changed

1 file changed

+338
-1
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 338 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
Set,
1919
Tuple,
2020
Union,
21+
Dict,
22+
cast,
2123
)
2224

2325
import requests
@@ -46,12 +48,49 @@
4648
from airbyte_cdk.sources.streams.core import StreamData
4749
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
4850
from airbyte_cdk.utils.mapping_helpers import combine_mappings
51+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
52+
SimpleRetriever as SimpleRetrieverModel,
53+
)
54+
from airbyte_cdk.sources.declarative.parsers.component_constructor import (
55+
ComponentConstructor,
56+
AdditionalFlags,
57+
)
58+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
59+
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
60+
from pydantic import BaseModel
61+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader
62+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
63+
IncrementingCountCursor as IncrementingCountCursorModel,
64+
)
65+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
66+
DatetimeBasedCursor as DatetimeBasedCursorModel,
67+
)
68+
69+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
70+
CustomIncrementalSync as CustomIncrementalSyncModel,
71+
)
72+
from requests import Response
73+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
74+
HttpRequester as HttpRequesterModel,
75+
)
76+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
77+
CustomRequester as CustomRequesterModel,
78+
)
79+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
80+
QueryProperties as QueryPropertiesModel,
81+
)
82+
from airbyte_cdk.sources.declarative.requesters.query_properties import QueryProperties
83+
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
84+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
85+
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
86+
)
87+
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
4988

5089
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
5190

5291

5392
@dataclass
54-
class SimpleRetriever(Retriever):
93+
class SimpleRetriever(Retriever, ComponentConstructor[SimpleRetrieverModel]):
5594
"""
5695
Retrieves records by synchronously sending requests to fetch records.
5796
@@ -93,6 +132,304 @@ class SimpleRetriever(Retriever):
93132
ignore_stream_slicer_parameters_on_paginated_requests: bool = False
94133
additional_query_properties: Optional[QueryProperties] = None
95134
log_formatter: Optional[Callable[[requests.Response], Any]] = None
135+
_should_use_lazy_simple_retriever: bool = False
136+
137+
@classmethod
138+
def _validate_if_lazy_simple_retriever_should_be_applied(
139+
cls,
140+
name: str,
141+
model: SimpleRetrieverModel,
142+
additional_flags: AdditionalFlags,
143+
incremental_sync: Optional[
144+
Union[
145+
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
146+
]
147+
] = None,
148+
) -> bool:
149+
if (
150+
model.partition_router
151+
and isinstance(model.partition_router, SubstreamPartitionRouterModel)
152+
and not bool(additional_flags.connector_state_manager.get_stream_state(name, None))
153+
and any(
154+
parent_stream_config.lazy_read_pointer
155+
for parent_stream_config in model.partition_router.parent_stream_configs
156+
)
157+
):
158+
if incremental_sync:
159+
if incremental_sync.type != "DatetimeBasedCursor":
160+
raise ValueError(
161+
f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
162+
)
163+
164+
elif incremental_sync.step or incremental_sync.cursor_granularity:
165+
raise ValueError(
166+
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
167+
)
168+
169+
if model.decoder and model.decoder.type != "JsonDecoder":
170+
raise ValueError(
171+
f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
172+
)
173+
cls._should_use_lazy_simple_retriever = True
174+
return True
175+
176+
return False
177+
178+
@classmethod
179+
def resolve_dependencies(
180+
cls,
181+
model: SimpleRetrieverModel,
182+
config: Config,
183+
dependency_constructor: Callable[..., Any],
184+
additional_flags: AdditionalFlags,
185+
*,
186+
name: Optional[str] = None,
187+
primary_key: Optional[Union[str, List[str], List[List[str]]]] = None,
188+
stream_slicer: Optional[StreamSlicer] = None,
189+
request_options_provider: Optional[RequestOptionsProvider] = None,
190+
stop_condition_on_cursor: bool = False,
191+
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
192+
transformations: Optional[List[RecordTransformation]] = None,
193+
file_uploader: Optional[DefaultFileUploader] = None,
194+
incremental_sync: Optional[
195+
Union[
196+
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
197+
]
198+
] = None,
199+
use_cache: Optional[bool] = None,
200+
log_formatter: Optional[Callable[[Response], Any]] = None,
201+
**kwargs: Any,
202+
) -> Mapping[str, Any]:
203+
if not name:
204+
raise ValueError(f"name argument is required to instance a {cls.__name__}")
205+
206+
def _get_url() -> str:
207+
"""
208+
Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever.
209+
This is needed because the URL is not set until the requester is created.
210+
"""
211+
212+
_url: str = (
213+
model.requester.url
214+
if hasattr(model.requester, "url") and model.requester.url is not None
215+
else requester.get_url()
216+
)
217+
_url_base: str = (
218+
model.requester.url_base
219+
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
220+
else requester.get_url_base()
221+
)
222+
223+
return _url or _url_base
224+
225+
def query_properties_in_request_parameters(
226+
requester: Union[HttpRequesterModel, CustomRequesterModel],
227+
) -> bool:
228+
if not hasattr(requester, "request_parameters"):
229+
return False
230+
request_parameters = requester.request_parameters
231+
if request_parameters and isinstance(request_parameters, Mapping):
232+
for request_parameter in request_parameters.values():
233+
if isinstance(request_parameter, QueryPropertiesModel):
234+
return True
235+
return False
236+
237+
def _get_log_formatter(
238+
log_formatter: Callable[[Response], Any] | None,
239+
name: str,
240+
additional_flags: AdditionalFlags,
241+
) -> Callable[[Response], Any] | None:
242+
if additional_flags.should_limit_slices_fetched:
243+
return (
244+
(
245+
lambda response: format_http_message(
246+
response,
247+
f"Stream '{name}' request",
248+
f"Request performed in order to extract records for stream '{name}'",
249+
name,
250+
)
251+
)
252+
if not log_formatter
253+
else log_formatter
254+
)
255+
return None
256+
257+
decoder = (
258+
dependency_constructor(model=model.decoder, config=config)
259+
if model.decoder
260+
else JsonDecoder(parameters={})
261+
)
262+
record_selector = dependency_constructor(
263+
model=model.record_selector,
264+
name=name,
265+
config=config,
266+
decoder=decoder,
267+
transformations=transformations,
268+
client_side_incremental_sync=client_side_incremental_sync,
269+
file_uploader=file_uploader,
270+
)
271+
272+
query_properties: Optional[QueryProperties] = None
273+
query_properties_key: Optional[str] = None
274+
if query_properties_in_request_parameters(model.requester):
275+
# It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
276+
# places instead of default to request_parameters which isn't clearly documented
277+
if (
278+
hasattr(model.requester, "fetch_properties_from_endpoint")
279+
and model.requester.fetch_properties_from_endpoint
280+
):
281+
raise ValueError(
282+
f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters"
283+
)
284+
285+
query_properties_definitions = []
286+
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using query_properties_in_request_parameters()
287+
if isinstance(request_parameter, QueryPropertiesModel):
288+
query_properties_key = key
289+
query_properties_definitions.append(request_parameter)
290+
291+
if len(query_properties_definitions) > 1:
292+
raise ValueError(
293+
f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages"
294+
)
295+
296+
if len(query_properties_definitions) == 1:
297+
query_properties = dependency_constructor(
298+
model=query_properties_definitions[0], config=config
299+
)
300+
elif (
301+
hasattr(model.requester, "fetch_properties_from_endpoint")
302+
and model.requester.fetch_properties_from_endpoint
303+
):
304+
# todo: Deprecate this condition once dependent connectors migrate to query_properties
305+
query_properties_definition = QueryPropertiesModel(
306+
type="QueryProperties",
307+
property_list=model.requester.fetch_properties_from_endpoint,
308+
always_include_properties=None,
309+
property_chunking=None,
310+
) # type: ignore # $parameters has a default value
311+
312+
query_properties = dependency_constructor(
313+
model=query_properties_definition,
314+
config=config,
315+
dependency_constructor=dependency_constructor,
316+
additional_flags=additional_flags,
317+
**kwargs,
318+
)
319+
320+
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
321+
query_properties = dependency_constructor(
322+
model=model.requester.query_properties,
323+
config=config,
324+
dependency_constructor=dependency_constructor,
325+
additional_flags=additional_flags,
326+
**kwargs,
327+
)
328+
329+
requester = dependency_constructor(
330+
model=model.requester,
331+
decoder=decoder,
332+
name=name,
333+
query_properties_key=query_properties_key,
334+
use_cache=use_cache,
335+
config=config,
336+
)
337+
338+
# Define cursor only if per partition or common incremental support is needed
339+
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
340+
341+
if (
342+
not isinstance(stream_slicer, DatetimeBasedCursor)
343+
or type(stream_slicer) is not DatetimeBasedCursor
344+
):
345+
# Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
346+
# Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
347+
# their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's
348+
# request_options_provider
349+
request_options_provider = stream_slicer or DefaultRequestOptionsProvider(parameters={})
350+
elif not request_options_provider:
351+
request_options_provider = DefaultRequestOptionsProvider(parameters={})
352+
353+
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
354+
if additional_flags.should_limit_slices_fetched:
355+
stream_slicer = cast(
356+
StreamSlicer,
357+
StreamSlicerTestReadDecorator(
358+
wrapped_slicer=stream_slicer,
359+
maximum_number_of_slices=additional_flags.limit_slices_fetched or 5,
360+
),
361+
)
362+
363+
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
364+
paginator = (
365+
dependency_constructor(
366+
model=model.paginator,
367+
config=config,
368+
url_base=_get_url(),
369+
extractor_model=model.record_selector.extractor,
370+
decoder=decoder,
371+
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
372+
)
373+
if model.paginator
374+
else NoPagination(parameters={})
375+
)
376+
377+
ignore_stream_slicer_parameters_on_paginated_requests = (
378+
model.ignore_stream_slicer_parameters_on_paginated_requests or False
379+
)
380+
381+
resolved_dependencies = {
382+
"name": name,
383+
"paginator": paginator,
384+
"primary_key": primary_key,
385+
"requester": requester,
386+
"record_selector": record_selector,
387+
"stream_slicer": stream_slicer,
388+
"request_option_provider": request_options_provider,
389+
"cursor": cursor,
390+
"config": config,
391+
"ignore_stream_slicer_parameters_on_paginated_requests": ignore_stream_slicer_parameters_on_paginated_requests,
392+
"parameters": model.parameters or {},
393+
}
394+
395+
if cls._validate_if_lazy_simple_retriever_should_be_applied(
396+
name, model, additional_flags, incremental_sync
397+
):
398+
return resolved_dependencies
399+
400+
resolved_dependencies.update(
401+
{
402+
"additional_query_properties": query_properties,
403+
"log_formatter": _get_log_formatter(log_formatter, name, additional_flags),
404+
}
405+
)
406+
return resolved_dependencies
407+
408+
@classmethod
409+
def build(
410+
cls,
411+
model: SimpleRetrieverModel,
412+
config: Config,
413+
dependency_constructor: Callable[..., Any],
414+
additional_flags: AdditionalFlags,
415+
**kwargs: Any,
416+
) -> "SimpleRetriever":
417+
"""
418+
Builds up the Component and it's component-specific dependencies.
419+
Order of operations:
420+
- build the dependencies first
421+
- build the component with the resolved dependencies
422+
"""
423+
resolved_dependencies: Mapping[str, Any] = cls.resolve_dependencies(
424+
model=model,
425+
config=config,
426+
dependency_constructor=dependency_constructor,
427+
additional_flags=additional_flags,
428+
**kwargs,
429+
)
430+
if cls._should_use_lazy_simple_retriever:
431+
LazySimpleRetriever(**resolved_dependencies)
432+
return SimpleRetriever(**resolved_dependencies)
96433

97434
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
98435
self._paginator = self.paginator or NoPagination(parameters=parameters)

0 commit comments

Comments
 (0)