From 0f8858cc201120acd7dced32d0752d5ae3159333 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Sat, 2 Aug 2025 19:11:24 -0400 Subject: [PATCH 1/2] fix state migration --- .../parsers/model_to_component_factory.py | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 5d2415525..689089196 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2112,6 +2112,17 @@ def _build_incremental_cursor( stream_slicer: Optional[PartitionRouter], config: Config, ) -> Optional[StreamSlicer]: + state_transformations = ( + [ + self._create_component_from_model( + state_migration, config, declarative_stream=model + ) + for state_migration in model.state_migrations + ] + if model.state_migrations + else [] + ) + if model.incremental_sync and stream_slicer: if model.retriever.type == "AsyncRetriever": stream_name = model.name or "" @@ -2119,16 +2130,6 @@ def _build_incremental_cursor( stream_state = self._connector_state_manager.get_stream_state( stream_name, stream_namespace ) - state_transformations = ( - [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) - for state_migration in model.state_migrations - ] - if model.state_migrations - else [] - ) return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, @@ -2172,7 +2173,7 @@ def _build_incremental_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, ) return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync return None @@ -2187,19 +2188,17 @@ def _build_concurrent_cursor( stream_name=model.name or "", namespace=None ) - if model.incremental_sync and stream_slicer: - # FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for - # ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor - if model.state_migrations: - state_transformations = [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) - for state_migration in model.state_migrations - ] - else: - state_transformations = [] + if model.state_migrations: + state_transformations = [ + self._create_component_from_model( + state_migration, config, declarative_stream=model + ) + for state_migration in model.state_migrations + ] + else: + state_transformations = [] + if model.incremental_sync and stream_slicer: return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing state_manager=self._connector_state_manager, model_type=DatetimeBasedCursorModel, @@ -2220,7 +2219,7 @@ def _build_concurrent_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, ) elif type(model.incremental_sync) == DatetimeBasedCursorModel: return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing @@ -2229,7 +2228,7 @@ def _build_concurrent_cursor( stream_name=model.name or "", stream_namespace=None, config=config or {}, - stream_state_migrations=model.state_migrations, + stream_state_migrations=state_transformations, attempt_to_create_cursor_if_not_provided=True, ) else: From 6c4f8353facba385ee3f911538cea50d9ad44530 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Sat, 2 Aug 2025 23:20:42 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../declarative/parsers/model_to_component_factory.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 689089196..e1d07fdc2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2114,9 +2114,7 @@ def _build_incremental_cursor( ) -> Optional[StreamSlicer]: state_transformations = ( [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) + self._create_component_from_model(state_migration, config, declarative_stream=model) for state_migration in model.state_migrations ] if model.state_migrations @@ -2190,9 +2188,7 @@ def _build_concurrent_cursor( if model.state_migrations: state_transformations = [ - self._create_component_from_model( - state_migration, config, declarative_stream=model - ) + self._create_component_from_model(state_migration, config, declarative_stream=model) for state_migration in model.state_migrations ] else: