Skip to content

Commit 8617cc8

Browse files
committed
Merge branch 'tolik0/refactor-concurrent-global-cursor' into tolik0/concurrent-perpartition-add-parent-state-updates
2 parents d3e7fe2 + 7b4964e commit 8617cc8

File tree

1 file changed

+3
-4
lines changed

1 file changed

+3
-4
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,21 +150,20 @@ def close_partition(self, partition: Partition) -> None:
150150

151151
partition_key = self._to_partition_key(stream_slice.partition)
152152
with self._lock:
153+
self._semaphore_per_partition[partition_key].acquire()
153154
if not self._use_global_cursor:
154155
self._cursor_per_partition[partition_key].close_partition(partition=partition)
155156
cursor = self._cursor_per_partition[partition_key]
156157
if (
157-
partition_key in self._finished_partitions
158-
and self._semaphore_per_partition[partition_key]._value == 0
158+
partition_key in self._finished_partitions
159+
and self._semaphore_per_partition[partition_key]._value == 0
159160
):
160161
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
161162

162163
self._check_and_update_parent_state()
163164

164165
self._emit_state_message()
165166

166-
self._semaphore_per_partition[partition_key].acquire()
167-
168167
def _check_and_update_parent_state(self) -> None:
169168
"""
170169
If all slices for the earliest partitions are closed, pop them from the left

0 commit comments

Comments
 (0)