Skip to content

Commit 2580057

Browse files
committed
add max records per-partition
1 parent 781606f commit 2580057

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

airbyte_cdk/sources/concurrent_source/concurrent_source.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def create(
4545
message_repository: MessageRepository,
4646
queue: Optional[Queue[QueueItem]] = None,
4747
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
48+
max_records_per_partition: Optional[int] = None,
4849
) -> "ConcurrentSource":
4950
is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1
5051
too_many_generator = (
@@ -67,6 +68,7 @@ def create(
6768
message_repository=message_repository,
6869
initial_number_partitions_to_generate=initial_number_of_partitions_to_generate,
6970
timeout_seconds=timeout_seconds,
71+
max_records_per_partition=max_records_per_partition,
7072
)
7173

7274
def __init__(
@@ -78,6 +80,7 @@ def __init__(
7880
message_repository: MessageRepository = InMemoryMessageRepository(),
7981
initial_number_partitions_to_generate: int = 1,
8082
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
83+
max_records_per_partition: Optional[int] = None,
8184
) -> None:
8285
"""
8386
:param threadpool: The threadpool to submit tasks to
@@ -93,6 +96,7 @@ def __init__(
9396
self._message_repository = message_repository
9497
self._initial_number_partitions_to_generate = initial_number_partitions_to_generate
9598
self._timeout_seconds = timeout_seconds
99+
self._max_records_per_partition = max_records_per_partition
96100

97101
# We set a maxsize for the main thread to process record items when the queue size grows. This assumes that there are fewer
98102
# threads generating partitions than the max number of workers. If it weren't the case, we could have threads only generating
@@ -115,6 +119,7 @@ def read(
115119
PartitionReader(
116120
self._queue,
117121
PartitionLogger(self._slice_logger, self._logger, self._message_repository),
122+
self._max_records_per_partition,
118123
),
119124
)
120125

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ def __init__(
170170
slice_logger=self._slice_logger,
171171
queue=queue,
172172
message_repository=self.message_repository,
173+
max_records_per_partition=limits.max_records if limits else None,
173174
)
174175

175176
# TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ class PartitionReader:
5050
_IS_SUCCESSFUL = True
5151

5252
def __init__(
53-
self, queue: Queue[QueueItem], partition_logger: Optional[PartitionLogger] = None
53+
self,
54+
queue: Queue[QueueItem],
55+
partition_logger: Optional[PartitionLogger] = None,
56+
max_records_per_partition: Optional[int] = None,
5457
) -> None:
5558
"""
5659
:param queue: The queue to put the records in.
5760
"""
5861
self._queue = queue
5962
self._partition_logger = partition_logger
63+
self._max_records_per_partition = max_records_per_partition
6064

6165
def process_partition(self, partition: Partition, cursor: Cursor) -> None:
6266
"""
@@ -74,9 +78,18 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None:
7478
if self._partition_logger:
7579
self._partition_logger.log(partition)
7680

81+
record_count = 0
7782
for record in partition.read():
7883
self._queue.put(record)
7984
cursor.observe(record)
85+
record_count += 1
86+
if (
87+
self._max_records_per_partition
88+
and record_count >= self._max_records_per_partition
89+
):
90+
# We stop processing a partition once max_records is reached for Connector
91+
# Builder test reads. The record limit only applies to an individual partition
92+
break
8093
cursor.close_partition(partition)
8194
self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL))
8295
except Exception as e:

0 commit comments

Comments
 (0)