Skip to content

Commit 64c1778

Browse files
committed
move stream_state calulcation before apply_stream_state_migrations
1 parent 80f4976 commit 64c1778

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,17 +1484,18 @@ def create_concurrent_cursor_from_perpartition_cursor(
14841484
stream_state_migrations=stream_state_migrations,
14851485
)
14861486
)
1487-
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
1488-
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1489-
use_global_cursor = isinstance(
1490-
partition_router, GroupingPartitionRouter
1491-
) or component_definition.get("global_substream_cursor", False)
14921487

14931488
if not stream_state:
14941489
stream_state = self._connector_state_manager.get_stream_state(
14951490
stream_name, stream_namespace
14961491
)
14971492

1493+
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
1494+
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1495+
use_global_cursor = isinstance(
1496+
partition_router, GroupingPartitionRouter
1497+
) or component_definition.get("global_substream_cursor", False)
1498+
14981499
# Return the concurrent cursor and state converter
14991500
return ConcurrentPerPartitionCursor(
15001501
cursor_factory=cursor_factory,

0 commit comments

Comments
 (0)