Merged (changes from 2 commits)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (186 changes: 85 additions, 101 deletions)
@@ -367,14 +367,65 @@ def _read_pages(
             {"next_page_token": initial_token} if initial_token is not None else None
         )
         while not pagination_complete:
-            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+            property_chunks: List[List[str]] = (
+                list(
+                    self.additional_query_properties.get_request_property_chunks(
+                        stream_slice=stream_slice
+                    )
+                )
+                if self.additional_query_properties
+                else [
+                    []
+                ]  # A single empty property chunk represents the case where property chunking is not configured
+            )

+            merged_records: MutableMapping[str, Any] = defaultdict(dict)
             last_page_size = 0
             last_record: Optional[Record] = None
-            for record in records_generator_fn(response):
-                last_page_size += 1
-                last_record = record
-                yield record
+            response: Optional[requests.Response] = None
+            for properties in property_chunks:
+                if len(properties) > 0:
+                    stream_slice = StreamSlice(
+                        partition=stream_slice.partition or {},
+                        cursor_slice=stream_slice.cursor_slice or {},
+                        extra_fields={"query_properties": properties},
+                    )
+
+                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
+                for current_record in records_generator_fn(response):
+                    if (
+                        current_record
+                        and self.additional_query_properties
+                        and self.additional_query_properties.property_chunking
+                    ):
+                        merge_key = (
+                            self.additional_query_properties.property_chunking.get_merge_key(
+                                current_record
+                            )
+                        )
+                        if merge_key:
+                            merged_records[merge_key].update(current_record)
+                        else:
+                            # We should still emit records even if the record did not have a merge key
+                            last_page_size += 1
+                            last_record = current_record
+                            yield current_record
+                    else:
+                        last_page_size += 1
+                        last_record = current_record
+                        yield current_record
+
+            if (
+                self.additional_query_properties
+                and self.additional_query_properties.property_chunking
+            ):
+                for merged_record in merged_records.values():
+                    record = Record(
+                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
+                    )
+                    last_page_size += 1
+                    last_record = record
+                    yield record

             if not response:
                 pagination_complete = True
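The merge behavior introduced in this hunk is easier to see in isolation: each property chunk requests the same entities with a different subset of fields, and records sharing a merge key are combined before being emitted. Below is a minimal standalone sketch of that group-by-merge-key technique; the data and the `id` merge key are hypothetical stand-ins, not CDK API.

```python
from collections import defaultdict
from typing import Any, Dict, List

# Hypothetical per-chunk responses: each request returns the same entities,
# but only the properties requested in that chunk, plus the merge key "id".
chunk_responses: List[List[Dict[str, Any]]] = [
    [{"id": "1", "name": "Ada"}, {"id": "2", "name": "Grace"}],
    [{"id": "1", "email": "ada@example.com"}, {"id": "2", "email": "grace@example.com"}],
]

merged_records: Dict[str, Dict[str, Any]] = defaultdict(dict)
for records in chunk_responses:
    for record in records:
        merge_key = record.get("id")  # stand-in for property_chunking.get_merge_key()
        if merge_key:
            merged_records[merge_key].update(record)  # later chunks add their fields
        else:
            print("emitted immediately (no merge key):", record)

# A complete record only exists after every chunk has been read, which is why
# the new _read_pages yields merged records after the chunk loop finishes.
for merged in merged_records.values():
    print(merged)
# {'id': '1', 'name': 'Ada', 'email': 'ada@example.com'}
# {'id': '2', 'name': 'Grace', 'email': 'grace@example.com'}
```

Records without a merge key are emitted as they arrive, since there is nothing to join them on; that mirrors the `else` branches in the hunk above.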
@@ -449,110 +500,43 @@ def read_records(
         :param stream_slice: The stream slice to read data for
         :return: The records read from the API source
         """
-
-        property_chunks = (
-            list(
-                self.additional_query_properties.get_request_property_chunks(
-                    stream_slice=stream_slice
-                )
-            )
-            if self.additional_query_properties
-            else []
-        )
-        records_without_merge_key = []
-        merged_records: MutableMapping[str, Any] = defaultdict(dict)
-
         _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

         most_recent_record_from_slice = None
         record_generator = partial(
             self._parse_records,
             stream_slice=stream_slice,
             stream_state=self.state or {},
             records_schema=records_schema,
         )

-        if self.additional_query_properties:
-            for properties in property_chunks:
-                _slice = StreamSlice(
-                    partition=_slice.partition or {},
-                    cursor_slice=_slice.cursor_slice or {},
-                    extra_fields={"query_properties": properties},
-                )  # None-check
-
-                record_generator = partial(
-                    self._parse_records,
-                    stream_slice=_slice,
-                    stream_state=self.state or {},
-                    records_schema=records_schema,
-                )
+        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
+            stream_state = self.state

-            for stream_data in self._read_pages(record_generator, self.state, _slice):
-                current_record = self._extract_record(stream_data, _slice)
-                if self.cursor and current_record:
-                    self.cursor.observe(_slice, current_record)
+            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
+            # fetch more records. The platform deletes stream state for full refresh streams before starting a
+            # new job, so we don't need to worry about this value existing for the initial attempt
+            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
+                return

-                # Latest record read, not necessarily within slice boundaries.
-                # TODO Remove once all custom components implement `observe` method.
-                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                most_recent_record_from_slice = self._get_most_recent_record(
-                    most_recent_record_from_slice, current_record, _slice
-                )
+            yield from self._read_single_page(record_generator, stream_state, _slice)
+        else:
+            for stream_data in self._read_pages(record_generator, self.state, _slice):
+                current_record = self._extract_record(stream_data, _slice)
+                if self.cursor and current_record:
+                    self.cursor.observe(_slice, current_record)

+                # Latest record read, not necessarily within slice boundaries.
+                # TODO Remove once all custom components implement `observe` method.
+                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
+                most_recent_record_from_slice = self._get_most_recent_record(
+                    most_recent_record_from_slice, current_record, _slice
+                )
+                yield stream_data

-                if current_record and self.additional_query_properties.property_chunking:
-                    merge_key = (
-                        self.additional_query_properties.property_chunking.get_merge_key(
-                            current_record
-                        )
-                    )
-                    if merge_key:
-                        merged_records[merge_key].update(current_record)
-                    else:
-                        # We should still emit records even if the record did not have a merge key
-                        records_without_merge_key.append(current_record)
-                else:
-                    yield stream_data
             if self.cursor:
                 self.cursor.close_slice(_slice, most_recent_record_from_slice)
-
-            if len(merged_records) > 0:
-                yield from [
-                    Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
-                    for merged_record in merged_records.values()
-                ]
-            if len(records_without_merge_key) > 0:
-                yield from records_without_merge_key
-        else:
-            _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
-
-            most_recent_record_from_slice = None
-            record_generator = partial(
-                self._parse_records,
-                stream_slice=stream_slice,
-                stream_state=self.state or {},
-                records_schema=records_schema,
-            )
-
-            if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
-                stream_state = self.state
-
-                # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
-                # fetch more records. The platform deletes stream state for full refresh streams before starting a
-                # new job, so we don't need to worry about this value existing for the initial attempt
-                if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
-                    return
-
-                yield from self._read_single_page(record_generator, stream_state, _slice)
-            else:
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
-
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
-                    yield stream_data
-
-                if self.cursor:
-                    self.cursor.close_slice(_slice, most_recent_record_from_slice)
-            return
+        return

     def _get_most_recent_record(
         self,
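With chunk handling moved into `_read_pages`, `read_records` collapses to a single path: resumable-full-refresh streams short-circuit when a prior attempt already completed, and everything else pages through `_read_pages`, observing each (already merged) record on the cursor. The sketch below shows that rough shape under stated assumptions; `is_resumable_full_refresh`, `read_single_page`, `read_pages`, and `full_refresh_complete_key` are hypothetical stand-ins for the real members, not CDK API.

```python
from typing import Any, Iterable, Mapping, Optional

def read_records_sketch(
    retriever: Any,
    stream_slice: Optional[Mapping[str, Any]],
    full_refresh_complete_key: str,  # stands in for FULL_REFRESH_SYNC_COMPLETE_KEY
) -> Iterable[Mapping[str, Any]]:
    """Approximate shape of the simplified read_records; all helpers are hypothetical."""
    _slice = stream_slice or {}  # stand-in for the StreamSlice None-check

    if retriever.is_resumable_full_refresh():  # stand-in for the ResumableFullRefreshCursor check
        # The platform clears full-refresh stream state before a new job, so this
        # key is only present when a prior attempt already completed the sync.
        if retriever.state.get(full_refresh_complete_key):
            return
        yield from retriever.read_single_page(_slice)
    else:
        most_recent = None
        for record in retriever.read_pages(_slice):  # records arrive fully merged
            retriever.cursor.observe(_slice, record)
            most_recent = record
            yield record
        retriever.cursor.close_slice(_slice, most_recent)
```

One consequence of this shape: cursor observation and slice closing now happen in exactly one place, on merged records, instead of being duplicated across the query-properties and default branches.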