@@ -367,14 +367,66 @@ def _read_pages(
367367 {"next_page_token" : initial_token } if initial_token is not None else None
368368 )
369369 while not pagination_complete :
370- response = self ._fetch_next_page (stream_state , stream_slice , next_page_token )
370+ property_chunks = (
371+ list (
372+ self .additional_query_properties .get_request_property_chunks (
373+ stream_slice = stream_slice
374+ )
375+ )
376+ if self .additional_query_properties
377+ else [
378+ None
379+ ] # A single None property chunk represents the case where property chunking is not configured
380+ )
371381
382+ records_without_merge_key = []
383+ merged_records : MutableMapping [str , Any ] = defaultdict (dict )
372384 last_page_size = 0
373385 last_record : Optional [Record ] = None
374- for record in records_generator_fn (response ):
375- last_page_size += 1
376- last_record = record
377- yield record
386+ response : Optional [requests .Response ] = None
387+ for properties in property_chunks :
388+ if properties :
389+ stream_slice = StreamSlice (
390+ partition = stream_slice .partition or {},
391+ cursor_slice = stream_slice .cursor_slice or {},
392+ extra_fields = {"query_properties" : properties },
393+ )
394+
395+ response = self ._fetch_next_page (stream_state , stream_slice , next_page_token )
396+ for current_record in records_generator_fn (response ):
397+ if (
398+ current_record
399+ and self .additional_query_properties
400+ and self .additional_query_properties .property_chunking
401+ ):
402+ merge_key = (
403+ self .additional_query_properties .property_chunking .get_merge_key (
404+ current_record
405+ )
406+ )
407+ if merge_key :
408+ merged_records [merge_key ].update (current_record )
409+ else :
410+ # We should still emit records even if the record did not have a merge key
411+ last_page_size += 1
412+ last_record = current_record
413+ yield current_record
414+ else :
415+ last_page_size += 1
416+ last_record = current_record
417+ yield current_record
418+
419+ if (
420+ self .additional_query_properties
421+ and self .additional_query_properties .property_chunking
422+ ):
423+ for merged_record in merged_records .values ():
424+ record = Record (
425+ data = merged_record , stream_name = self .name , associated_slice = stream_slice
426+ )
427+ last_page_size += 1
428+ last_record = record
429+ yield record
378430
379431 if not response :
380432 pagination_complete = True
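For context on the `_read_pages` change above: each property chunk fetches partial records that are stitched back into whole records by a merge key, accumulated in a `defaultdict(dict)`. A minimal self-contained sketch of that pattern (the sample pages and the `"id"` merge key are hypothetical; in the real code the key comes from `property_chunking.get_merge_key`):

```python
from collections import defaultdict
from typing import Any, Dict, List, MutableMapping

# Hypothetical pages of partial records, one list per property chunk; each
# chunk returns a different subset of fields for the same underlying entities.
pages_by_chunk: List[List[Dict[str, Any]]] = [
    [{"id": "1", "name": "Ada"}, {"id": "2", "name": "Grace"}],
    [{"id": "1", "email": "ada@example.com"}, {"id": "2", "email": "grace@example.com"}],
]

merged_records: MutableMapping[str, Dict[str, Any]] = defaultdict(dict)
for page in pages_by_chunk:
    for partial_record in page:
        # "id" stands in for whatever property_chunking.get_merge_key() returns.
        merged_records[partial_record["id"]].update(partial_record)

assert merged_records["1"] == {"id": "1", "name": "Ada", "email": "ada@example.com"}
```

Records with no merge key cannot be stitched, which is why the diff yields them immediately instead of adding them to `merged_records`.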
@@ -449,110 +501,43 @@ def read_records(
         :param stream_slice: The stream slice to read data for
         :return: The records read from the API source
         """
-
-        property_chunks = (
-            list(
-                self.additional_query_properties.get_request_property_chunks(
-                    stream_slice=stream_slice
-                )
-            )
-            if self.additional_query_properties
-            else []
-        )
-        records_without_merge_key = []
-        merged_records: MutableMapping[str, Any] = defaultdict(dict)
-
         _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+
         most_recent_record_from_slice = None
+        record_generator = partial(
+            self._parse_records,
+            stream_slice=stream_slice,
+            stream_state=self.state or {},
+            records_schema=records_schema,
+        )
 
-        if self.additional_query_properties:
-            for properties in property_chunks:
-                _slice = StreamSlice(
-                    partition=_slice.partition or {},
-                    cursor_slice=_slice.cursor_slice or {},
-                    extra_fields={"query_properties": properties},
-                )  # None-check
-
-                record_generator = partial(
-                    self._parse_records,
-                    stream_slice=_slice,
-                    stream_state=self.state or {},
-                    records_schema=records_schema,
-                )
+        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
+            stream_state = self.state
 
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
+            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
+            # fetch more records. The platform deletes stream state for full refresh streams before starting a
+            # new job, so we don't need to worry about this value existing for the initial attempt
+            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
+                return
 
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
+            yield from self._read_single_page(record_generator, stream_state, _slice)
+        else:
+            for stream_data in self._read_pages(record_generator, self.state, _slice):
+                current_record = self._extract_record(stream_data, _slice)
+                if self.cursor and current_record:
+                    self.cursor.observe(_slice, current_record)
+
+                # Latest record read, not necessarily within slice boundaries.
+                # TODO Remove once all custom components implement `observe` method.
+                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
+                most_recent_record_from_slice = self._get_most_recent_record(
+                    most_recent_record_from_slice, current_record, _slice
+                )
+                yield stream_data
 
-                    if current_record and self.additional_query_properties.property_chunking:
-                        merge_key = (
-                            self.additional_query_properties.property_chunking.get_merge_key(
-                                current_record
-                            )
-                        )
-                        if merge_key:
-                            merged_records[merge_key].update(current_record)
-                        else:
-                            # We should still emit records even if the record did not have a merge key
-                            records_without_merge_key.append(current_record)
-                    else:
-                        yield stream_data
             if self.cursor:
                 self.cursor.close_slice(_slice, most_recent_record_from_slice)
-
-            if len(merged_records) > 0:
-                yield from [
-                    Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice)
-                    for merged_record in merged_records.values()
-                ]
-            if len(records_without_merge_key) > 0:
-                yield from records_without_merge_key
-        else:
-            _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
-
-            most_recent_record_from_slice = None
-            record_generator = partial(
-                self._parse_records,
-                stream_slice=stream_slice,
-                stream_state=self.state or {},
-                records_schema=records_schema,
-            )
-
-            if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
-                stream_state = self.state
-
-                # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
-                # fetch more records. The platform deletes stream state for full refresh streams before starting a
-                # new job, so we don't need to worry about this value existing for the initial attempt
-                if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
-                    return
-
-                yield from self._read_single_page(record_generator, stream_state, _slice)
-            else:
-                for stream_data in self._read_pages(record_generator, self.state, _slice):
-                    current_record = self._extract_record(stream_data, _slice)
-                    if self.cursor and current_record:
-                        self.cursor.observe(_slice, current_record)
-
-                    # Latest record read, not necessarily within slice boundaries.
-                    # TODO Remove once all custom components implement `observe` method.
-                    # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-                    most_recent_record_from_slice = self._get_most_recent_record(
-                        most_recent_record_from_slice, current_record, _slice
-                    )
-                    yield stream_data
-
-            if self.cursor:
-                self.cursor.close_slice(_slice, most_recent_record_from_slice)
-            return
+        return
 
     def _get_most_recent_record(
         self,
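As a footnote to the resumable-full-refresh guard retained above, the early exit reduces to a single check of stream state. A minimal sketch, assuming an illustrative literal for the CDK constant (the real `FULL_REFRESH_SYNC_COMPLETE_KEY` is defined in the CDK, and its exact value here is an assumption):

```python
from typing import Any, Mapping

# Illustrative stand-in for the CDK constant referenced in the diff; the
# literal value is assumed for this sketch.
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"


def prior_attempt_completed(stream_state: Mapping[str, Any]) -> bool:
    # A prior successful attempt leaves this marker in the stream state, so the
    # current attempt can return before fetching any records. The platform
    # deletes full refresh stream state before starting a new job, so a fresh
    # job never carries the marker.
    return bool(stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY))


assert prior_attempt_completed({FULL_REFRESH_SYNC_COMPLETE_KEY: True})
assert not prior_attempt_completed({})  # initial attempt: no marker present
```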