Skip to content

Commit f19d048

Browse files
committed
Add extra_fields support
1 parent ed4ea74 commit f19d048

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ def _fetch_next_page(
358358
# This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
359359
def _read_pages(
360360
self,
361-
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
361+
records_generator_fn: Callable[[Optional[requests.Response], Optional[StreamSlice]], Iterable[Record]],
362362
stream_state: Mapping[str, Any],
363363
stream_slice: StreamSlice,
364364
) -> Iterable[Record]:
@@ -372,7 +372,7 @@ def _read_pages(
372372

373373
last_page_size = 0
374374
last_record: Optional[Record] = None
375-
for record in records_generator_fn(response):
375+
for record in records_generator_fn(response, stream_slice=stream_slice): # type: ignore[call-arg] # only _parse_records expected as a func
376376
last_page_size += 1
377377
last_record = record
378378
yield record
@@ -397,7 +397,7 @@ def _read_pages(
397397

398398
def _read_single_page(
399399
self,
400-
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
400+
records_generator_fn: Callable[[Optional[requests.Response], Optional[StreamSlice]], Iterable[Record]],
401401
stream_state: Mapping[str, Any],
402402
stream_slice: StreamSlice,
403403
) -> Iterable[StreamData]:
@@ -412,7 +412,7 @@ def _read_single_page(
412412

413413
last_page_size = 0
414414
last_record: Optional[Record] = None
415-
for record in records_generator_fn(response):
415+
for record in records_generator_fn(response, stream_slice=stream_slice): # type: ignore[call-arg] # only _parse_records expected as a func
416416
last_page_size += 1
417417
last_record = record
418418
yield record
@@ -456,7 +456,6 @@ def read_records(
456456
record_generator = partial(
457457
self._parse_records,
458458
stream_state=self.state or {},
459-
stream_slice=_slice,
460459
records_schema=records_schema,
461460
)
462461

@@ -665,7 +664,7 @@ class LazySimpleRetriever(SimpleRetriever):
665664

666665
def _read_pages(
667666
self,
668-
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
667+
records_generator_fn: Callable[[Optional[requests.Response], Optional[StreamSlice]], Iterable[Record]],
669668
stream_state: Mapping[str, Any],
670669
stream_slice: StreamSlice,
671670
) -> Iterable[Record]:
@@ -682,6 +681,20 @@ def _read_pages(
682681
child_records = self._extract_child_records(parent_record)
683682
response = self._create_response(child_records)
684683

684+
if parent_stream_config.extra_fields:
685+
extra_fields = [
686+
[field_path_part.eval(self.config) for field_path_part in field_path] # type: ignore [union-attr]
687+
for field_path in parent_stream_config.extra_fields
688+
]
689+
690+
extracted_extra_fields = self.partition_router._extract_extra_fields(parent_record, extra_fields)
691+
692+
stream_slice = StreamSlice(
693+
partition=stream_slice.partition,
694+
cursor_slice=stream_slice.cursor_slice,
695+
extra_fields=extracted_extra_fields
696+
)
697+
685698
yield from self._yield_records_with_pagination(
686699
response,
687700
records_generator_fn,
@@ -717,7 +730,7 @@ def _create_response(self, data: Mapping[str, Any]) -> SafeResponse:
717730
def _yield_records_with_pagination(
718731
self,
719732
response: requests.Response,
720-
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
733+
records_generator_fn: Callable[[Optional[requests.Response], Optional[StreamSlice]], Iterable[Record]],
721734
stream_state: Mapping[str, Any],
722735
stream_slice: StreamSlice,
723736
parent_record: MutableMapping[str, Any],
@@ -726,7 +739,7 @@ def _yield_records_with_pagination(
726739
"""Yield records, handling pagination if needed."""
727740
last_page_size, last_record = 0, None
728741

729-
for record in records_generator_fn(response):
742+
for record in records_generator_fn(response, stream_slice=stream_slice): # type: ignore[call-arg] # only _parse_records expected as a func
730743
last_page_size += 1
731744
last_record = record
732745
yield record
@@ -745,7 +758,7 @@ def _yield_records_with_pagination(
745758
def _paginate(
746759
self,
747760
next_page_token: Any,
748-
records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
761+
records_generator_fn: Callable[[Optional[requests.Response], Optional[StreamSlice]], Iterable[Record]],
749762
stream_state: Mapping[str, Any],
750763
stream_slice: StreamSlice,
751764
parent_record: MutableMapping[str, Any],
@@ -767,7 +780,7 @@ def _paginate(
767780
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
768781
last_page_size, last_record = 0, None
769782

770-
for record in records_generator_fn(response):
783+
for record in records_generator_fn(response, stream_slice=stream_slice): # type: ignore[call-arg] # only _parse_records expected as a func
771784
last_page_size += 1
772785
last_record = record
773786
yield record

0 commit comments

Comments
 (0)