Skip to content

Commit 7a8cb92

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(low-code cdk): add lazy read to simple retriver (#418)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent b69bdbd commit 7a8cb92

File tree

8 files changed

+597
-4
lines changed

8 files changed

+597
-4
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2890,6 +2890,15 @@ definitions:
28902890
type:
28912891
type: string
28922892
enum: [ParentStreamConfig]
2893+
lazy_read_pointer:
2894+
title: Lazy Read Pointer
2895+
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
2896+
type: array
2897+
default: [ ]
2898+
items:
2899+
- type: string
2900+
interpolation_context:
2901+
- config
28932902
parent_key:
28942903
title: Parent Key
28952904
description: The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,6 +2224,11 @@ class DynamicSchemaLoader(BaseModel):
22242224

22252225
class ParentStreamConfig(BaseModel):
22262226
type: Literal["ParentStreamConfig"]
2227+
lazy_read_pointer: Optional[List[str]] = Field(
2228+
[],
2229+
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
2230+
title="Lazy Read Pointer",
2231+
)
22272232
parent_key: str = Field(
22282233
...,
22292234
description="The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@
438438
)
439439
from airbyte_cdk.sources.declarative.retrievers import (
440440
AsyncRetriever,
441+
LazySimpleRetriever,
441442
SimpleRetriever,
442443
SimpleRetrieverTestReadDecorator,
443444
)
@@ -1750,6 +1751,7 @@ def create_declarative_stream(
17501751
transformations.append(
17511752
self._create_component_from_model(model=transformation_model, config=config)
17521753
)
1754+
17531755
retriever = self._create_component_from_model(
17541756
model=model.retriever,
17551757
config=config,
@@ -1760,6 +1762,7 @@ def create_declarative_stream(
17601762
stop_condition_on_cursor=stop_condition_on_cursor,
17611763
client_side_incremental_sync=client_side_incremental_sync,
17621764
transformations=transformations,
1765+
incremental_sync=model.incremental_sync,
17631766
)
17641767
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
17651768

@@ -1905,7 +1908,9 @@ def _merge_stream_slicers(
19051908
) -> Optional[StreamSlicer]:
19061909
retriever_model = model.retriever
19071910

1908-
stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config)
1911+
stream_slicer = self._build_stream_slicer_from_partition_router(
1912+
retriever_model, config, stream_name=model.name
1913+
)
19091914

19101915
if retriever_model.type == "AsyncRetriever":
19111916
is_not_datetime_cursor = (
@@ -2530,6 +2535,16 @@ def create_parent_stream_config(
25302535
if model.request_option
25312536
else None
25322537
)
2538+
2539+
if model.lazy_read_pointer and any("*" in pointer for pointer in model.lazy_read_pointer):
2540+
raise ValueError(
2541+
"The '*' wildcard in 'lazy_read_pointer' is not supported — only direct paths are allowed."
2542+
)
2543+
2544+
model_lazy_read_pointer: List[Union[InterpolatedString, str]] = (
2545+
[x for x in model.lazy_read_pointer] if model.lazy_read_pointer else []
2546+
)
2547+
25332548
return ParentStreamConfig(
25342549
parent_key=model.parent_key,
25352550
request_option=request_option,
@@ -2539,6 +2554,7 @@ def create_parent_stream_config(
25392554
incremental_dependency=model.incremental_dependency or False,
25402555
parameters=model.parameters or {},
25412556
extra_fields=model.extra_fields,
2557+
lazy_read_pointer=model_lazy_read_pointer,
25422558
)
25432559

25442560
@staticmethod
@@ -2681,6 +2697,12 @@ def create_simple_retriever(
26812697
stop_condition_on_cursor: bool = False,
26822698
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
26832699
transformations: List[RecordTransformation],
2700+
incremental_sync: Optional[
2701+
Union[
2702+
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
2703+
]
2704+
] = None,
2705+
**kwargs: Any,
26842706
) -> SimpleRetriever:
26852707
decoder = (
26862708
self._create_component_from_model(model=model.decoder, config=config)
@@ -2738,6 +2760,45 @@ def create_simple_retriever(
27382760
model.ignore_stream_slicer_parameters_on_paginated_requests or False
27392761
)
27402762

2763+
if (
2764+
model.partition_router
2765+
and isinstance(model.partition_router, SubstreamPartitionRouterModel)
2766+
and not bool(self._connector_state_manager.get_stream_state(name, None))
2767+
and any(
2768+
parent_stream_config.lazy_read_pointer
2769+
for parent_stream_config in model.partition_router.parent_stream_configs
2770+
)
2771+
):
2772+
if incremental_sync:
2773+
if incremental_sync.type != "DatetimeBasedCursor":
2774+
raise ValueError(
2775+
f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
2776+
)
2777+
2778+
elif incremental_sync.step or incremental_sync.cursor_granularity:
2779+
raise ValueError(
2780+
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
2781+
)
2782+
2783+
if model.decoder and model.decoder.type != "JsonDecoder":
2784+
raise ValueError(
2785+
f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
2786+
)
2787+
2788+
return LazySimpleRetriever(
2789+
name=name,
2790+
paginator=paginator,
2791+
primary_key=primary_key,
2792+
requester=requester,
2793+
record_selector=record_selector,
2794+
stream_slicer=stream_slicer,
2795+
request_option_provider=request_options_provider,
2796+
cursor=cursor,
2797+
config=config,
2798+
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
2799+
parameters=model.parameters or {},
2800+
)
2801+
27412802
if self._limit_slices_fetched or self._emit_connector_builder_messages:
27422803
return SimpleRetrieverTestReadDecorator(
27432804
name=name,

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
5+
46
import copy
7+
import json
58
import logging
69
from dataclasses import InitVar, dataclass
710
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
811

912
import dpath
13+
import requests
1014

1115
from airbyte_cdk.models import AirbyteMessage
1216
from airbyte_cdk.models import Type as MessageType
@@ -46,6 +50,7 @@ class ParentStreamConfig:
4650
)
4751
request_option: Optional[RequestOption] = None
4852
incremental_dependency: bool = False
53+
lazy_read_pointer: Optional[List[Union[InterpolatedString, str]]] = None
4954

5055
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
5156
self.parent_key = InterpolatedString.create(self.parent_key, parameters=parameters)
@@ -59,6 +64,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
5964
for key_path in self.extra_fields
6065
]
6166

67+
self.lazy_read_pointer = (
68+
[
69+
InterpolatedString.create(path, parameters=parameters)
70+
if isinstance(path, str)
71+
else path
72+
for path in self.lazy_read_pointer
73+
]
74+
if self.lazy_read_pointer
75+
else None
76+
)
77+
6278

6379
@dataclass
6480
class SubstreamPartitionRouter(PartitionRouter):
@@ -196,6 +212,15 @@ def stream_slices(self) -> Iterable[StreamSlice]:
196212
# Add extra fields
197213
extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)
198214

215+
if parent_stream_config.lazy_read_pointer:
216+
extracted_extra_fields = {
217+
"child_response": self._extract_child_response(
218+
parent_record,
219+
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
220+
),
221+
**extracted_extra_fields,
222+
}
223+
199224
yield StreamSlice(
200225
partition={
201226
partition_field: partition_value,
@@ -205,6 +230,21 @@ def stream_slices(self) -> Iterable[StreamSlice]:
205230
extra_fields=extracted_extra_fields,
206231
)
207232

233+
def _extract_child_response(
234+
self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
235+
) -> requests.Response:
236+
"""Extract child records from a parent record based on lazy pointers."""
237+
238+
def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
239+
"""Create a SafeResponse with the given data."""
240+
response = SafeResponse()
241+
response.content = json.dumps(data).encode("utf-8")
242+
response.status_code = 200
243+
return response
244+
245+
path = [path.eval(self.config) for path in pointer]
246+
return _create_response(dpath.get(parent_record, path, default=[])) # type: ignore # argunet will be a MutableMapping, given input data structure
247+
208248
def _extract_extra_fields(
209249
self,
210250
parent_record: Mapping[str, Any] | AirbyteMessage,
@@ -376,3 +416,22 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
376416
@property
377417
def logger(self) -> logging.Logger:
378418
return logging.getLogger("airbyte.SubstreamPartitionRouter")
419+
420+
421+
class SafeResponse(requests.Response):
422+
"""
423+
A subclass of requests.Response that acts as an interface to migrate parsed child records
424+
into a response object. This allows seamless interaction with child records as if they
425+
were original response, ensuring compatibility with methods that expect requests.Response data type.
426+
"""
427+
428+
def __getattr__(self, name: str) -> Any:
429+
return getattr(requests.Response, name, None)
430+
431+
@property
432+
def content(self) -> Optional[bytes]:
433+
return super().content
434+
435+
@content.setter
436+
def content(self, value: Union[str, bytes]) -> None:
437+
self._content = value.encode() if isinstance(value, str) else value

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ def next_page_token(
7171
last_page_token_value: Optional[Any] = None,
7272
) -> Optional[Any]:
7373
decoded_response = next(self.decoder.decode(response))
74-
7574
# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
7675
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
7776
headers: Dict[str, Any] = dict(response.headers)

airbyte_cdk/sources/declarative/retrievers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
66
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
77
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
8+
LazySimpleRetriever,
89
SimpleRetriever,
910
SimpleRetrieverTestReadDecorator,
1011
)
@@ -14,4 +15,5 @@
1415
"SimpleRetriever",
1516
"SimpleRetrieverTestReadDecorator",
1617
"AsyncRetriever",
18+
"LazySimpleRetriever",
1719
]

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,20 @@
66
from dataclasses import InitVar, dataclass, field
77
from functools import partial
88
from itertools import islice
9-
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union
9+
from typing import (
10+
Any,
11+
Callable,
12+
Iterable,
13+
List,
14+
Mapping,
15+
Optional,
16+
Set,
17+
Tuple,
18+
Union,
19+
)
1020

1121
import requests
22+
from typing_extensions import deprecated
1223

1324
from airbyte_cdk.models import AirbyteMessage
1425
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
@@ -28,6 +39,7 @@
2839
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
2940
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
3041
from airbyte_cdk.sources.http_logger import format_http_message
42+
from airbyte_cdk.sources.source import ExperimentalClassWarning
3143
from airbyte_cdk.sources.streams.core import StreamData
3244
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
3345
from airbyte_cdk.utils.mapping_helpers import combine_mappings
@@ -438,8 +450,8 @@ def read_records(
438450
most_recent_record_from_slice = None
439451
record_generator = partial(
440452
self._parse_records,
453+
stream_slice=stream_slice,
441454
stream_state=self.state or {},
442-
stream_slice=_slice,
443455
records_schema=records_schema,
444456
)
445457

@@ -618,3 +630,73 @@ def _fetch_next_page(
618630
self.name,
619631
),
620632
)
633+
634+
635+
@deprecated(
636+
"This class is experimental. Use at your own risk.",
637+
category=ExperimentalClassWarning,
638+
)
639+
@dataclass
640+
class LazySimpleRetriever(SimpleRetriever):
641+
"""
642+
A retriever that supports lazy loading from parent streams.
643+
"""
644+
645+
def _read_pages(
646+
self,
647+
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
648+
stream_state: Mapping[str, Any],
649+
stream_slice: StreamSlice,
650+
) -> Iterable[Record]:
651+
response = stream_slice.extra_fields["child_response"]
652+
if response:
653+
last_page_size, last_record = 0, None
654+
for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func
655+
last_page_size += 1
656+
last_record = record
657+
yield record
658+
659+
next_page_token = self._next_page_token(response, last_page_size, last_record, None)
660+
if next_page_token:
661+
yield from self._paginate(
662+
next_page_token,
663+
records_generator_fn,
664+
stream_state,
665+
stream_slice,
666+
)
667+
668+
yield from []
669+
else:
670+
yield from self._read_pages(records_generator_fn, stream_state, stream_slice)
671+
672+
def _paginate(
673+
self,
674+
next_page_token: Any,
675+
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
676+
stream_state: Mapping[str, Any],
677+
stream_slice: StreamSlice,
678+
) -> Iterable[Record]:
679+
"""Handle pagination by fetching subsequent pages."""
680+
pagination_complete = False
681+
682+
while not pagination_complete:
683+
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
684+
last_page_size, last_record = 0, None
685+
686+
for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func
687+
last_page_size += 1
688+
last_record = record
689+
yield record
690+
691+
if not response:
692+
pagination_complete = True
693+
else:
694+
last_page_token_value = (
695+
next_page_token.get("next_page_token") if next_page_token else None
696+
)
697+
next_page_token = self._next_page_token(
698+
response, last_page_size, last_record, last_page_token_value
699+
)
700+
701+
if not next_page_token:
702+
pagination_complete = True

0 commit comments

Comments
 (0)