Skip to content

Commit 3723a3f

Browse files
committed
Add implementation
1 parent 956ab46 commit 3723a3f

File tree

2 files changed

+114
-0
lines changed

2 files changed

+114
-0
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@
435435
)
436436
from airbyte_cdk.sources.declarative.retrievers import (
437437
AsyncRetriever,
438+
LazySimpleRetriever,
438439
SimpleRetriever,
439440
SimpleRetrieverTestReadDecorator,
440441
)
@@ -2647,6 +2648,8 @@ def create_simple_retriever(
26472648
stop_condition_on_cursor: bool = False,
26482649
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
26492650
transformations: List[RecordTransformation],
2651+
incremental_sync: Optional[Union[IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel]] = None,
2652+
**kwargs: Any,
26502653
) -> SimpleRetriever:
26512654
decoder = (
26522655
self._create_component_from_model(model=model.decoder, config=config)

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from itertools import islice
99
from typing import Any, Callable, Iterable, List, Mapping, Optional, Set, Tuple, Union
1010

11+
import dpath
1112
import requests
1213

1314
from airbyte_cdk.models import AirbyteMessage
@@ -18,6 +19,7 @@
1819
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
1920
SinglePartitionRouter,
2021
)
22+
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
2123
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
2224
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
2325
from airbyte_cdk.sources.declarative.requesters.request_options import (
@@ -32,6 +34,7 @@
3234
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
3335
from airbyte_cdk.utils.mapping_helpers import combine_mappings
3436

37+
3538
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"
3639

3740

@@ -618,3 +621,111 @@ def _fetch_next_page(
618621
self.name,
619622
),
620623
)
624+
625+
626+
class SafeResponse(requests.Response):
    """
    A ``requests.Response`` whose body can be assigned through ``content``.

    Attribute names that regular lookup cannot resolve are routed through
    ``__getattr__`` and resolve to the class-level attribute of
    ``requests.Response`` when one exists, otherwise to ``None`` — no
    ``AttributeError`` is raised for unknown names.
    """

    def __getattr__(self, name: str) -> Any:
        # Python only calls __getattr__ after normal attribute lookup failed.
        fallback = getattr(requests.Response, name, None)
        return fallback

    @property
    def content(self) -> Optional[bytes]:
        # Delegate reads to the stock implementation on requests.Response.
        return super().content

    @content.setter
    def content(self, value: Union[str, bytes]) -> None:
        # Text is encoded with the default codec; any other value is stored untouched.
        if isinstance(value, str):
            value = value.encode()
        self._content = value
637+
638+
639+
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.

    The first "page" of child records is not requested over HTTP: it is taken
    from each parent record (at ``lazy_read_pointer``) and wrapped in a synthetic
    ``SafeResponse``. Additional pages are fetched with ``_fetch_next_page`` only
    while ``_next_page_token`` keeps returning a truthy token.
    """

    partition_router: SubstreamPartitionRouter = field(init=True, repr=False, default=None)
    # Path segments locating the embedded child records inside a parent record.
    # When unset or empty, the parent record itself is used as the payload.
    lazy_read_pointer: Optional[List[InterpolatedString]] = None

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """
        Yield child records for every record of the last configured parent stream.

        :param records_generator_fn: callable turning a response into records
        :param stream_state: state forwarded to follow-up page requests
        :param stream_slice: incoming slice; its ``cursor_slice`` is reused when paginating
        :return: an iterable of child records
        """
        # Only the last entry of parent_stream_configs is consulted.
        parent_stream_config = self.partition_router.parent_stream_configs[-1]
        parent_stream = parent_stream_config.stream

        for raw_parent_record in parent_stream.read_only_records():
            # The second element returned by process_parent_record is not needed here.
            parent_record, _ = self.partition_router.process_parent_record(
                raw_parent_record, parent_stream.name
            )
            if parent_record is None:
                continue

            child_records = self._extract_child_records(parent_record)
            response = self._create_response(child_records)

            yield from self._yield_records_with_pagination(
                response,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

    def _extract_child_records(self, parent_record: Mapping[str, Any]) -> Any:
        """
        Extract child records from a parent record based on lazy pointers.

        :param parent_record: the parent record to read from
        :return: ``parent_record`` itself when no pointer is configured; all matches
            when the evaluated path contains a ``"*"`` segment; otherwise the value
            at the path, or ``[]`` when the path is absent
        """
        if not self.lazy_read_pointer:
            return parent_record

        path = [segment.eval(self.config) for segment in self.lazy_read_pointer]
        if "*" in path:
            return dpath.values(parent_record, path)
        return dpath.get(parent_record, path, default=[])

    def _create_response(self, data: Any) -> SafeResponse:
        """
        Create a SafeResponse with the given data.

        :param data: JSON-serializable payload (passed to ``json.dumps``)
        :return: a response with status code 200 whose body is the UTF-8 encoded JSON
        """
        response = SafeResponse()
        response.content = json.dumps(data).encode("utf-8")
        response.status_code = 200
        return response

    def _yield_records_with_pagination(
        self,
        response: requests.Response,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """
        Yield records, handling pagination if needed.

        Records parsed from ``response`` are yielded first; if ``_next_page_token``
        then returns a truthy token, the remaining pages are read via ``_paginate``.
        """
        last_page_size, last_record = 0, None

        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        # No previous token value exists for the synthetic first page.
        next_page_token = self._next_page_token(response, last_page_size, last_record, None)
        if next_page_token:
            yield from self._paginate(
                next_page_token,
                records_generator_fn,
                stream_state,
                stream_slice,
                parent_record,
                parent_stream_config,
            )

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        parent_record: Record,
        parent_stream_config: Any,
    ) -> Iterable[Record]:
        """
        Handle pagination by fetching subsequent pages.

        A new slice is built for the follow-up requests: its partition maps the
        configured partition field to the parent key's value in ``parent_record``
        (with an empty ``parent_slice``), and it keeps the incoming ``cursor_slice``.
        """
        partition_field = parent_stream_config.partition_field.eval(self.config)
        partition_value = dpath.get(
            parent_record, parent_stream_config.parent_key.eval(self.config)
        )
        stream_slice = StreamSlice(
            partition={partition_field: partition_value, "parent_slice": {}},
            cursor_slice=stream_slice.cursor_slice,
        )

        while next_page_token:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):
                last_page_size += 1
                last_record = record
                yield record

            # The loop condition guarantees next_page_token is truthy here.
            last_page_token_value = next_page_token.get("next_page_token")
            next_page_token = self._next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
731+

0 commit comments

Comments
 (0)