Skip to content

Commit 0f99c9e

Browse files
author
maxime.c
committed
emit updated parent before last record, not after
1 parent ace8739 commit 0f99c9e

File tree

2 files changed

+4
-0
lines changed

2 files changed

+4
-0
lines changed

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ def stream_slices(self) -> Iterable[StreamSlice]:
249249

250250
if is_last_record_in_slice:
251251
parent_stream.cursor.close_partition(partition)
252+
if is_last_slice:
253+
parent_stream.cursor.ensure_at_least_one_state_emitted()
252254

253255
yield StreamSlice(
254256
partition={

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,8 @@ def run_incremental_parent_state_test(
11191119

11201120
# Assert that the number of intermediate states is as expected
11211121
assert len(intermediate_states) - 1 == num_intermediate_states
1122+
# Assert that ensure_at_least_one_state_emitted is called before yielding the last record from the last slice
1123+
assert intermediate_states[-1][0].stream.stream_state.__dict__["parent_state"] == intermediate_states[-2][0].stream.stream_state.__dict__["parent_state"]
11221124

11231125
# For each intermediate state, perform another read starting from that state
11241126
for state, records_before_state in intermediate_states[:-1]:

0 commit comments

Comments
 (0)