diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5d2415525..e1d07fdc2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2112,6 +2112,15 @@ def _build_incremental_cursor( stream_slicer: Optional[PartitionRouter], config: Config, ) -> Optional[StreamSlicer]: + state_transformations = ( + [ + self._create_component_from_model(state_migration, config, declarative_stream=model) + for state_migration in model.state_migrations + ] + if model.state_migrations + else [] + ) + if model.incremental_sync and stream_slicer: if model.retriever.type == "AsyncRetriever": stream_name = model.name or "" @@ -2119,16 +2128,6 @@ def _build_incremental_cursor( stream_state = self._connector_state_manager.get_stream_state( stream_name, stream_namespace ) - state_transformations = ( - [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) - for state_migration in model.state_migrations - ] - if model.state_migrations - else [] - ) return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, @@ -2172,7 +2171,7 @@ def _build_incremental_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, ) return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync return None @@ -2187,19 +2186,15 @@ def _build_concurrent_cursor( stream_name=model.name or "", namespace=None ) - if model.incremental_sync and stream_slicer: - # FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for - # ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor - if model.state_migrations: - state_transformations = [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) - for state_migration in model.state_migrations - ] - else: - state_transformations = [] + if model.state_migrations: + state_transformations = [ + self._create_component_from_model(state_migration, config, declarative_stream=model) + for state_migration in model.state_migrations + ] + else: + state_transformations = [] + if model.incremental_sync and stream_slicer: return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, model_type=DatetimeBasedCursorModel, @@ -2220,7 +2215,7 @@ def _build_concurrent_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, ) elif type(model.incremental_sync) == DatetimeBasedCursorModel: return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing @@ -2229,7 +2224,7 @@ def _build_concurrent_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, attempt_to_create_cursor_if_not_provided=True, ) else: