Skip to content

Commit fc4728c

Browse files
committed
Test fix for garbage collection issue
1 parent dd49c3d commit fc4728c

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -295,16 +295,20 @@ def _generate_slices_from_partition(
295295
):
296296
self._partition_parent_state_map[partition_key] = (deepcopy(parent_state), seq)
297297

298-
for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
299-
cursor.stream_slices(),
300-
lambda: None,
301-
):
302-
self._semaphore_per_partition[partition_key].release()
303-
if is_last_slice:
304-
self._partitions_done_generating_stream_slices.add(partition_key)
305-
yield StreamSlice(
306-
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
307-
)
298+
try:
299+
for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state(
300+
cursor.stream_slices(),
301+
lambda: None,
302+
):
303+
self._semaphore_per_partition[partition_key].release()
304+
if is_last_slice:
305+
self._partitions_done_generating_stream_slices.add(partition_key)
306+
yield StreamSlice(
307+
partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
308+
)
309+
finally:
310+
del cursor
311+
del partition
308312

309313
def _ensure_partition_limit(self) -> None:
310314
"""
@@ -492,11 +496,10 @@ def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
492496
def _create_cursor(
493497
self, cursor_state: Any, runtime_lookback_window: int = 0
494498
) -> ConcurrentCursor:
495-
cursor = self._cursor_factory.create(
499+
return self._cursor_factory.create(
496500
stream_state=deepcopy(cursor_state),
497501
runtime_lookback_window=timedelta(seconds=runtime_lookback_window),
498502
)
499-
return cursor
500503

501504
def should_be_synced(self, record: Record) -> bool:
502505
return self._get_cursor(record).should_be_synced(record)
@@ -511,8 +514,7 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
511514
raise ValueError(
512515
"Invalid state as stream slices that are emitted should refer to an existing cursor"
513516
)
514-
cursor = self._cursor_per_partition[partition_key]
515-
return cursor
517+
return self._cursor_per_partition[partition_key]
516518

517519
def limit_reached(self) -> bool:
518520
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

0 commit comments

Comments
 (0)