Skip to content

Commit 2b849f7

Browse files
author
maxime.c
committed
fix test_per_partition_cursor.py
1 parent eb635b1 commit 2b849f7

File tree

2 files changed

+4
-43
lines changed

2 files changed

+4
-43
lines changed

airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,10 @@ def set_initial_state(self, stream_state: StreamState) -> None:
146146
if "state" in stream_state:
147147
self._state_to_migrate_from = stream_state["state"]
148148

149-
# Set parent state for partition routers based on parent streams
150-
self._partition_router.set_initial_state(stream_state)
149+
# We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
150+
# Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
151+
# We are still keeping this line as a comment to be explicit about the past behavior.
152+
# self._partition_router.set_initial_state(stream_state)
151153

152154
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
153155
self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(

unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -508,47 +508,6 @@ def test_get_request_body_json(
508508
cursor.get_request_body_json(stream_slice=stream_slice)
509509

510510

511-
def test_parent_state_is_set_for_per_partition_cursor(
512-
mocked_cursor_factory, mocked_partition_router
513-
):
514-
# Define the parent state to be used in the test
515-
parent_state = {"parent_cursor": "parent_state_value"}
516-
517-
# Mock the partition router to return a stream slice
518-
partition = StreamSlice(
519-
partition={"partition_field_1": "a value", "partition_field_2": "another value"},
520-
cursor_slice={},
521-
)
522-
mocked_partition_router.stream_slices.return_value = [partition]
523-
524-
# Mock the cursor factory to create cursors with specific states
525-
mocked_cursor_factory.create.side_effect = [
526-
MockedCursorBuilder()
527-
.with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}])
528-
.with_stream_state(CURSOR_STATE)
529-
.build(),
530-
]
531-
532-
# Mock the get_parent_state method to return the parent state
533-
mocked_partition_router.get_stream_state.return_value = parent_state
534-
535-
# Initialize the PerPartitionCursor with the mocked cursor factory and partition router
536-
cursor = PerPartitionCursor(mocked_cursor_factory, mocked_partition_router)
537-
538-
# Set the initial state, including the parent state
539-
initial_state = {
540-
"states": [{"partition": partition.partition, "cursor": CURSOR_STATE}],
541-
"parent_state": parent_state,
542-
}
543-
cursor.set_initial_state(initial_state)
544-
545-
# Verify that the parent state has been set correctly
546-
assert cursor.get_stream_state()["parent_state"] == parent_state
547-
548-
# Verify that set_parent_state was called on the partition router with the initial state
549-
mocked_partition_router.set_initial_state.assert_called_once_with(initial_state)
550-
551-
552511
def test_get_stream_state_includes_parent_state(mocked_cursor_factory, mocked_partition_router):
553512
# Define the parent state to be used in the test
554513
parent_state = {"parent_cursor": "parent_state_value"}

0 commit comments

Comments
 (0)