Skip to content

Commit 1f1e6cc

Browse files
committed
remove checking for multiple chunks and handling records agnostic of how many chunks
1 parent 22607ef commit 1f1e6cc

File tree

1 file changed

+13
-29
lines changed

1 file changed

+13
-29
lines changed

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -450,17 +450,18 @@ def read_records(
450450
:return: The records read from the API source
451451
"""
452452

453-
if self.additional_query_properties:
454-
property_chunks = list(
453+
property_chunks = (
454+
list(
455455
self.additional_query_properties.get_request_property_chunks(
456456
stream_slice=stream_slice
457457
)
458458
)
459-
has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice)
460-
else:
461-
property_chunks = [[""]]
462-
has_multiple_chunks = False
459+
if self.additional_query_properties
460+
else []
461+
)
462+
records_without_merge_key = []
463463
merged_records: MutableMapping[str, Any] = defaultdict(dict)
464+
464465
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check
465466
most_recent_record_from_slice = None
466467

@@ -491,13 +492,7 @@ def read_records(
491492
most_recent_record_from_slice, current_record, _slice
492493
)
493494

494-
# Record merging should only be done if there are multiple property chunks. Otherwise,
495-
# yielding immediately is more efficient so records can be emitted immediately
496-
if (
497-
has_multiple_chunks
498-
and self.additional_query_properties.property_chunking
499-
and current_record
500-
):
495+
if current_record and self.additional_query_properties.property_chunking:
501496
merge_key = (
502497
self.additional_query_properties.property_chunking.get_merge_key(
503498
current_record
@@ -506,17 +501,20 @@ def read_records(
506501
if merge_key:
507502
merged_records[merge_key].update(current_record)
508503
else:
509-
yield stream_data
504+
# We should still emit records even if the record did not have a merge key
505+
records_without_merge_key.append(current_record)
510506
else:
511507
yield stream_data
512508
if self.cursor:
513509
self.cursor.close_slice(_slice, most_recent_record_from_slice)
514510

515-
if has_multiple_chunks:
511+
if len(merged_records) > 0:
516512
yield from [
517513
Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
518514
for merged_record in merged_records.values()
519515
]
516+
if len(records_without_merge_key) > 0:
517+
yield from records_without_merge_key
520518
else:
521519
_slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check
522520

@@ -596,20 +594,6 @@ def _extract_record(
596594
)
597595
return None
598596

599-
def _has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
600-
if not self.additional_query_properties:
601-
return False
602-
603-
property_chunks = iter(
604-
self.additional_query_properties.get_request_property_chunks(stream_slice=stream_slice)
605-
)
606-
try:
607-
next(property_chunks)
608-
next(property_chunks)
609-
return True
610-
except StopIteration:
611-
return False
612-
613597
# stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
614598
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
615599
"""

0 commit comments

Comments
 (0)