Skip to content

Commit 4d2bb96

Browse files
committed
add additional check to skip calling read_records() if max records is already reached
1 parent 77d2b59 commit 4d2bb96

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,12 @@ def __init__(
6969
self._hash = SliceHasher.hash(self._stream_name, self._stream_slice)
7070

7171
def read(self) -> Iterable[Record]:
72+
if self._max_records_limit:
73+
global total_record_counter
74+
if total_record_counter >= self._max_records_limit:
75+
return
7276
for stream_data in self._retriever.read_records(self._json_schema, self._stream_slice):
7377
if self._max_records_limit:
74-
global total_record_counter
7578
if total_record_counter >= self._max_records_limit:
7679
break
7780

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ def test_max_records_reached_on_previous_partition(self) -> None:
142142
second_partition_records = list(partition.read())
143143
assert len(second_partition_records) == 0
144144

145+
# The DeclarativePartition exits out of the read before attempting to read_records() if
146+
# the max_records_limit has already been reached. So we only expect to see read_records()
147+
# called for the first partition read and not the second
148+
retriever.read_records.assert_called_once()
149+
145150
@staticmethod
146151
def _mock_retriever(read_return_value: List[StreamData]) -> Mock:
147152
retriever = Mock(spec=Retriever)

0 commit comments

Comments
 (0)