Skip to content

Commit 4b92f46

Browse files
committed
subsequent read test
1 parent 917df68 commit 4b92f46

File tree

1 file changed

+49
-0
lines changed

1 file changed

+49
-0
lines changed

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,55 @@ def test_max_records_reached_on_previous_partition(self) -> None:
145145
# called for the first partition read and not the second
146146
retriever.read_records.assert_called_once()
147147

148+
def test_record_counter_isolation_between_different_factories(self) -> None:
149+
"""Test that record counters are isolated between different DeclarativePartitionFactory instances."""
150+
151+
# Create mock records that exceed the limit
152+
records = [
153+
Record(data={"id": 1, "name": "Record1"}, stream_name="stream_name"),
154+
Record(data={"id": 2, "name": "Record2"}, stream_name="stream_name"),
155+
Record(
156+
data={"id": 3, "name": "Record3"}, stream_name="stream_name"
157+
), # Should be blocked by limit
158+
]
159+
160+
# Create first factory with record limit of 2
161+
retriever1 = self._mock_retriever(records)
162+
message_repository1 = Mock(spec=MessageRepository)
163+
factory1 = DeclarativePartitionFactory(
164+
_STREAM_NAME,
165+
_SCHEMA_LOADER,
166+
retriever1,
167+
message_repository1,
168+
max_records_limit=2,
169+
)
170+
171+
# First factory should read up to limit (2 records)
172+
partition1 = factory1.create(_A_STREAM_SLICE)
173+
first_factory_records = list(partition1.read())
174+
assert len(first_factory_records) == 2
175+
176+
# Create second factory with same limit - should be independent
177+
retriever2 = self._mock_retriever(records)
178+
message_repository2 = Mock(spec=MessageRepository)
179+
factory2 = DeclarativePartitionFactory(
180+
_STREAM_NAME,
181+
_SCHEMA_LOADER,
182+
retriever2,
183+
message_repository2,
184+
max_records_limit=2,
185+
)
186+
187+
# Second factory should also be able to read up to limit (2 records)
188+
# This would fail before the fix because record counter was global
189+
partition2 = factory2.create(_A_STREAM_SLICE)
190+
second_factory_records = list(partition2.read())
191+
assert len(second_factory_records) == 2
192+
193+
# Verify both retrievers were called (confirming isolation)
194+
retriever1.read_records.assert_called_once()
195+
retriever2.read_records.assert_called_once()
196+
148197
@staticmethod
149198
def _mock_retriever(read_return_value: List[StreamData]) -> Mock:
150199
retriever = Mock(spec=Retriever)

0 commit comments

Comments
 (0)