Skip to content

Commit 7b4964e

Browse files
committed
Move acquiring the semaphore
1 parent 6498528 commit 7b4964e

File tree

1 file changed

+1
-2
lines changed

1 file changed

+1
-2
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def close_partition(self, partition: Partition) -> None:
146146

147147
partition_key = self._to_partition_key(stream_slice.partition)
148148
with self._lock:
149+
self._semaphore_per_partition[partition_key].acquire()
149150
if not self._use_global_cursor:
150151
self._cursor_per_partition[partition_key].close_partition(partition=partition)
151152
cursor = self._cursor_per_partition[partition_key]
@@ -156,8 +157,6 @@ def close_partition(self, partition: Partition) -> None:
156157
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
157158
self._emit_state_message()
158159

159-
self._semaphore_per_partition[partition_key].acquire()
160-
161160
def ensure_at_least_one_state_emitted(self) -> None:
162161
"""
163162
The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be

0 commit comments

Comments
 (0)