Skip to content

Commit eb612f9

Browse files
author
maxi297
committed
do not reset semaphore when duplicate partitions
1 parent d665ca0 commit eb612f9

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class ConcurrentPerPartitionCursor(Cursor):
6565
_NO_CURSOR_STATE: Mapping[str, Any] = {}
6666
_GLOBAL_STATE_KEY = "state"
6767
_PERPARTITION_STATE_KEY = "states"
68+
_IS_PARTITION_DUPLICATION_LOGGED = False
6869
_KEY = 0
6970
_VALUE = 1
7071

@@ -279,7 +280,12 @@ def _generate_slices_from_partition(
279280
with self._lock:
280281
self._number_of_partitions += 1
281282
self._cursor_per_partition[partition_key] = cursor
282-
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
283+
284+
if not self._IS_PARTITION_DUPLICATION_LOGGED and partition_key in self._semaphore_per_partition:
285+
logger.warning(f"Partition duplication detected for stream {self._stream_name}")
286+
self._IS_PARTITION_DUPLICATION_LOGGED = True
287+
else:
288+
self._semaphore_per_partition[partition_key] = threading.Semaphore(0)
283289

284290
with self._lock:
285291
if (

0 commit comments

Comments
 (0)