@@ -1264,22 +1264,12 @@ def create_concurrent_cursor_from_datetime_based_cursor(
12641264 component_definition : ComponentDefinition ,
12651265 stream_name : str ,
12661266 stream_namespace : Optional [str ],
1267+ stream_state : MutableMapping [str , Any ],
12671268 config : Config ,
12681269 message_repository : Optional [MessageRepository ] = None ,
12691270 runtime_lookback_window : Optional [datetime .timedelta ] = None ,
1270- stream_state_migrations : Optional [List [Any ]] = None ,
12711271 ** kwargs : Any ,
12721272 ) -> ConcurrentCursor :
1273- # Per-partition incremental streams can dynamically create child cursors which will pass their current
1274- # state via the stream_state keyword argument. Incremental syncs without parent streams use the
1275- # incoming state and connector_state_manager that is initialized when the component factory is created
1276- stream_state = (
1277- self ._connector_state_manager .get_stream_state (stream_name , stream_namespace )
1278- if "stream_state" not in kwargs
1279- else kwargs ["stream_state" ]
1280- )
1281- stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
1282-
12831273 component_type = component_definition .get ("type" )
12841274 if component_definition .get ("type" ) != model_type .__name__ :
12851275 raise ValueError (
@@ -1498,21 +1488,11 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
14981488 component_definition : ComponentDefinition ,
14991489 stream_name : str ,
15001490 stream_namespace : Optional [str ],
1491+ stream_state : MutableMapping [str , Any ],
15011492 config : Config ,
15021493 message_repository : Optional [MessageRepository ] = None ,
1503- stream_state_migrations : Optional [List [Any ]] = None ,
15041494 ** kwargs : Any ,
15051495 ) -> ConcurrentCursor :
1506- # Per-partition incremental streams can dynamically create child cursors which will pass their current
1507- # state via the stream_state keyword argument. Incremental syncs without parent streams use the
1508- # incoming state and connector_state_manager that is initialized when the component factory is created
1509- stream_state = (
1510- self ._connector_state_manager .get_stream_state (stream_name , stream_namespace )
1511- if "stream_state" not in kwargs
1512- else kwargs ["stream_state" ]
1513- )
1514- stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
1515-
15161496 component_type = component_definition .get ("type" )
15171497 if component_definition .get ("type" ) != model_type .__name__ :
15181498 raise ValueError (
@@ -1587,7 +1567,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
15871567 config : Config ,
15881568 stream_state : MutableMapping [str , Any ],
15891569 partition_router : PartitionRouter ,
1590- stream_state_migrations : Optional [List [Any ]] = None ,
15911570 attempt_to_create_cursor_if_not_provided : bool = False ,
15921571 ** kwargs : Any ,
15931572 ) -> ConcurrentPerPartitionCursor :
@@ -1647,11 +1626,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
16471626 stream_namespace = stream_namespace ,
16481627 config = config ,
16491628 message_repository = NoopMessageRepository (),
1650- # stream_state_migrations=stream_state_migrations, # FIXME is it expected to run migration on per partition state too?
16511629 )
16521630 )
16531631
1654- stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
16551632 # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
16561633 use_global_cursor = isinstance (
16571634 partition_router , GroupingPartitionRouter
@@ -1974,6 +1951,7 @@ def create_default_stream(
19741951 self , model : DeclarativeStreamModel , config : Config , is_parent : bool = False , ** kwargs : Any
19751952 ) -> AbstractStream :
19761953 primary_key = model .primary_key .__root__ if model .primary_key else None
1954+ self ._migrate_state (config , model )
19771955
19781956 partition_router = self ._build_stream_slicer_from_partition_router (
19791957 model .retriever ,
@@ -2135,6 +2113,20 @@ def create_default_stream(
21352113 supports_file_transfer = hasattr (model , "file_uploader" ) and bool (model .file_uploader ),
21362114 )
21372115
def _migrate_state(self, config: "Config", model: "DeclarativeStreamModel") -> None:
    """Apply the stream's declared state migrations and store the result.

    Reads the stream's current state from the connector state manager,
    instantiates every entry of ``model.state_migrations`` (if any), runs them
    through ``apply_stream_state_migrations`` and writes the resulting state
    back to the state manager under the same stream name.

    Args:
        config: Connector configuration forwarded to the migration components.
        model: Declarative stream model whose ``name`` and ``state_migrations``
            drive the migration.
    """
    # A stream model may carry no name; normalise to "" so the state is read
    # and written under the same key `_build_concurrent_cursor` uses.
    stream_name = model.name or ""

    stream_state = self._connector_state_manager.get_stream_state(
        stream_name=stream_name, namespace=None
    )

    # `state_migrations` may be None or empty; both mean "nothing to build".
    state_transformations = [
        self._create_component_from_model(state_migration, config, declarative_stream=model)
        for state_migration in (model.state_migrations or [])
    ]

    migrated_state = self.apply_stream_state_migrations(state_transformations, stream_state)

    # Persist the migrated state in the state manager (side effect on shared state).
    self._connector_state_manager.update_state_for_stream(
        stream_name=stream_name, namespace=None, value=migrated_state
    )
2129+
21382130 def _is_stop_condition_on_cursor (self , model : DeclarativeStreamModel ) -> bool :
21392131 return bool (
21402132 model .incremental_sync
@@ -2206,17 +2198,7 @@ def _build_concurrent_cursor(
22062198 config : Config ,
22072199 ) -> Cursor :
22082200 stream_name = model .name or ""
2209- stream_state = self ._connector_state_manager .get_stream_state (
2210- stream_name = stream_name , namespace = None
2211- )
2212-
2213- if model .state_migrations :
2214- state_transformations = [
2215- self ._create_component_from_model (state_migration , config , declarative_stream = model )
2216- for state_migration in model .state_migrations
2217- ]
2218- else :
2219- state_transformations = []
2201+ stream_state = self ._connector_state_manager .get_stream_state (model .name , None )
22202202
22212203 if (
22222204 model .incremental_sync
@@ -2228,10 +2210,9 @@ def _build_concurrent_cursor(
22282210 model_type = DatetimeBasedCursorModel ,
22292211 component_definition = model .incremental_sync .__dict__ ,
22302212 stream_name = stream_name ,
2213+ stream_state = stream_state ,
22312214 stream_namespace = None ,
22322215 config = config or {},
2233- stream_state = stream_state ,
2234- stream_state_migrations = state_transformations ,
22352216 partition_router = stream_slicer ,
22362217 attempt_to_create_cursor_if_not_provided = True , # FIXME can we remove that now?
22372218 )
@@ -2242,17 +2223,17 @@ def _build_concurrent_cursor(
22422223 component_definition = model .incremental_sync .__dict__ ,
22432224 stream_name = stream_name ,
22442225 stream_namespace = None ,
2226+ stream_state = stream_state ,
22452227 config = config or {},
2246- stream_state_migrations = state_transformations ,
22472228 )
22482229 elif type (model .incremental_sync ) == DatetimeBasedCursorModel :
22492230 return self .create_concurrent_cursor_from_datetime_based_cursor ( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
22502231 model_type = type (model .incremental_sync ),
22512232 component_definition = model .incremental_sync .__dict__ ,
22522233 stream_name = stream_name ,
22532234 stream_namespace = None ,
2235+ stream_state = stream_state ,
22542236 config = config or {},
2255- stream_state_migrations = state_transformations ,
22562237 attempt_to_create_cursor_if_not_provided = True ,
22572238 )
22582239 else :
0 commit comments