Skip to content

Commit 772c77a

Browse files
committed
fix test to use PartitionReader where cursor close is now invoked
1 parent 78e0469 commit 772c77a

File tree

1 file changed

+22
-17
lines changed

1 file changed

+22
-17
lines changed

unit_tests/sources/streams/test_stream_read.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66
from copy import deepcopy
7+
from queue import Queue
78
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
89
from unittest.mock import Mock
910

@@ -589,7 +590,10 @@ def test_concurrent_incremental_read_two_slices():
589590
*records_partition_2,
590591
]
591592

592-
expected_state = _create_state_message(
593+
expected_state_1 = _create_state_message(
594+
"__mock_stream", {"1": {"created_at": slice_timestamp_1}}
595+
)
596+
expected_state_2 = _create_state_message(
593597
"__mock_stream",
594598
{"1": {"created_at": slice_timestamp_1}, "2": {"created_at": slice_timestamp_2}},
595599
)
@@ -617,26 +621,27 @@ def test_concurrent_incremental_read_two_slices():
617621
for record in expected_records:
618622
assert record in actual_records
619623

620-
# We need run on_record to update cursor with record cursor value
621-
for record in actual_records:
622-
list(
623-
handler.on_record(
624-
Record(
625-
data=record,
626-
stream_name="__mock_stream",
627-
)
628-
)
629-
)
624+
# We need to process partitions generated by a PartitionReader in order to trigger
625+
# the ConcurrentCursor.close_partition() flow and validate state is updated with
626+
# the observed record values
627+
partition_reader = PartitionReader(queue=Mock(spec=Queue))
628+
assert isinstance(stream, StreamFacade)
629+
abstract_stream = stream._abstract_stream
630+
for partition in abstract_stream.generate_partitions():
631+
partition_reader.process_partition(partition=partition, cursor=cursor)
630632

631633
assert len(actual_records) == len(expected_records)
632634

633-
# We don't have a real source that reads from the message_repository for state, so we read from the queue directly to verify
634-
# the cursor observed records correctly and updated partition states
635-
mock_partition = Mock()
636-
cursor.close_partition(mock_partition)
637635
actual_state = [state for state in message_repository.consume_queue()]
638-
assert len(actual_state) == 1
639-
assert actual_state[0] == expected_state
636+
assert len(actual_state) == 2
637+
assert (
638+
actual_state[0].state.stream.stream_state.__dict__
639+
== expected_state_1.state.stream.stream_state.__dict__
640+
)
641+
assert (
642+
actual_state[1].state.stream.stream_state.__dict__
643+
== expected_state_2.state.stream.stream_state.__dict__
644+
)
640645

641646

642647
def setup_stream_dependencies(configured_json_schema):

0 commit comments

Comments
 (0)