Skip to content

Commit 85d041b

Browse files
committed
Force global state for GroupingPartitionRouter
1 parent e7bb9c0 commit 85d041b

File tree

4 files changed

+77
-18
lines changed

4 files changed

+77
-18
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2894,7 +2894,7 @@ definitions:
28942894
title: Lazy Read Pointer
28952895
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
28962896
type: array
2897-
default: [ ]
2897+
default: []
28982898
items:
28992899
- type: string
29002900
interpolation_context:
@@ -3199,7 +3199,7 @@ definitions:
31993199
properties:
32003200
type:
32013201
type: string
3202-
enum: [ StateDelegatingStream ]
3202+
enum: [StateDelegatingStream]
32033203
name:
32043204
title: Name
32053205
description: The stream name.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def __init__(
7979
connector_state_manager: ConnectorStateManager,
8080
connector_state_converter: AbstractStreamStateConverter,
8181
cursor_field: CursorField,
82+
use_global_cursor: bool = False,
8283
) -> None:
8384
self._global_cursor: Optional[StreamState] = {}
8485
self._stream_name = stream_name
@@ -106,7 +107,7 @@ def __init__(
106107
self._lookback_window: int = 0
107108
self._parent_state: Optional[StreamState] = None
108109
self._number_of_partitions: int = 0
109-
self._use_global_cursor: bool = False
110+
self._use_global_cursor: bool = use_global_cursor
110111
self._partition_serializer = PerPartitionKeySerializer()
111112
# Track the last time a state message was emitted
112113
self._last_emission_time: float = 0.0

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
13601360
)
13611361
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
13621362

1363+
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1364+
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
1365+
13631366
# Return the concurrent cursor and state converter
13641367
return ConcurrentPerPartitionCursor(
13651368
cursor_factory=cursor_factory,
@@ -1371,6 +1374,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
13711374
connector_state_manager=state_manager,
13721375
connector_state_converter=connector_state_converter,
13731376
cursor_field=cursor_field,
1377+
use_global_cursor=use_global_cursor,
13741378
)
13751379

13761380
@staticmethod

unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -312,21 +312,6 @@ def test_set_initial_state_delegation(mock_config, mock_underlying_router):
312312
mock_underlying_router.set_initial_state.assert_called_once_with(mock_state)
313313

314314

315-
def test_get_stream_state_delegation(mock_config, mock_underlying_router):
316-
"""Test that get_stream_state delegates to the underlying router."""
317-
router = GroupingPartitionRouter(
318-
group_size=2,
319-
underlying_partition_router=mock_underlying_router,
320-
config=mock_config,
321-
)
322-
mock_state = {"state_key": "state_value"}
323-
mock_underlying_router.get_stream_state = MagicMock(return_value=mock_state)
324-
325-
state = router.get_stream_state()
326-
assert state == mock_state
327-
mock_underlying_router.get_stream_state.assert_called_once()
328-
329-
330315
def test_stream_slices_extra_fields_varied(mock_config):
331316
"""Test grouping with varied extra fields across partitions."""
332317
parent_stream = MockStream(
@@ -458,3 +443,72 @@ def test_get_request_params_default(mock_config, mock_underlying_router):
458443
)
459444
)
460445
assert params == {}
446+
447+
448+
def test_stream_slices_resume_from_state(mock_config, mock_underlying_router):
449+
"""Test that stream_slices resumes correctly from a previous state."""
450+
451+
# Simulate underlying router state handling
452+
class MockPartitionRouter:
453+
def __init__(self):
454+
self.slices = [
455+
StreamSlice(
456+
partition={"board_ids": i},
457+
cursor_slice={},
458+
extra_fields={"name": f"Board {i}", "owner": f"User{i}"},
459+
)
460+
for i in range(5)
461+
]
462+
self.state = {"last_board_id": 0} # Initial state
463+
464+
def set_initial_state(self, state):
465+
self.state = state
466+
467+
def get_stream_state(self):
468+
return self.state
469+
470+
def stream_slices(self):
471+
last_board_id = self.state.get("last_board_id", -1)
472+
for slice in self.slices:
473+
board_id = slice.partition["board_ids"]
474+
if board_id <= last_board_id:
475+
continue
476+
self.state = {"last_board_id": board_id}
477+
yield slice
478+
479+
underlying_router = MockPartitionRouter()
480+
router = GroupingPartitionRouter(
481+
group_size=2,
482+
underlying_partition_router=underlying_router,
483+
config=mock_config,
484+
deduplicate=True,
485+
)
486+
487+
# First sync: process first two slices
488+
router.set_initial_state({"last_board_id": 0})
489+
slices_iter = router.stream_slices()
490+
first_batch = next(slices_iter)
491+
assert first_batch == StreamSlice(
492+
partition={"board_ids": [1, 2]},
493+
cursor_slice={},
494+
extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", "User2"]},
495+
)
496+
state_after_first = router.get_stream_state()
497+
assert state_after_first == {"last_board_id": 2}, "State should reflect last processed board_id"
498+
499+
# Simulate a new sync resuming from the previous state
500+
new_router = GroupingPartitionRouter(
501+
group_size=2,
502+
underlying_partition_router=MockPartitionRouter(),
503+
config=mock_config,
504+
deduplicate=True,
505+
)
506+
new_router.set_initial_state(state_after_first)
507+
resumed_slices = list(new_router.stream_slices())
508+
assert resumed_slices == [
509+
StreamSlice(
510+
partition={"board_ids": [3, 4]},
511+
cursor_slice={},
512+
extra_fields={"name": ["Board 3", "Board 4"], "owner": ["User3", "User4"]},
513+
)
514+
], "Should resume from board_id 3"

0 commit comments

Comments
 (0)