Skip to content

Commit 0f8858c

Browse files
author
maxime.c
committed
fix state migration
1 parent cb193ab commit 0f8858c

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,23 +2112,24 @@ def _build_incremental_cursor(
21122112
stream_slicer: Optional[PartitionRouter],
21132113
config: Config,
21142114
) -> Optional[StreamSlicer]:
2115+
state_transformations = (
2116+
[
2117+
self._create_component_from_model(
2118+
state_migration, config, declarative_stream=model
2119+
)
2120+
for state_migration in model.state_migrations
2121+
]
2122+
if model.state_migrations
2123+
else []
2124+
)
2125+
21152126
if model.incremental_sync and stream_slicer:
21162127
if model.retriever.type == "AsyncRetriever":
21172128
stream_name = model.name or ""
21182129
stream_namespace = None
21192130
stream_state = self._connector_state_manager.get_stream_state(
21202131
stream_name, stream_namespace
21212132
)
2122-
state_transformations = (
2123-
[
2124-
self._create_component_from_model(
2125-
state_migration, config, declarative_stream=model
2126-
)
2127-
for state_migration in model.state_migrations
2128-
]
2129-
if model.state_migrations
2130-
else []
2131-
)
21322133

21332134
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
21342135
state_manager=self._connector_state_manager,
@@ -2172,7 +2173,7 @@ def _build_incremental_cursor(
21722173
stream_name=model.name or "",
21732174
stream_namespace=None,
21742175
config=config or {},
2175-
stream_state_migrations=model.state_migrations,
2176+
stream_state_migrations=state_transformations,
21762177
)
21772178
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
21782179
return None
@@ -2187,19 +2188,17 @@ def _build_concurrent_cursor(
21872188
stream_name=model.name or "", namespace=None
21882189
)
21892190

2190-
if model.incremental_sync and stream_slicer:
2191-
# FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
2192-
# ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
2193-
if model.state_migrations:
2194-
state_transformations = [
2195-
self._create_component_from_model(
2196-
state_migration, config, declarative_stream=model
2197-
)
2198-
for state_migration in model.state_migrations
2199-
]
2200-
else:
2201-
state_transformations = []
2191+
if model.state_migrations:
2192+
state_transformations = [
2193+
self._create_component_from_model(
2194+
state_migration, config, declarative_stream=model
2195+
)
2196+
for state_migration in model.state_migrations
2197+
]
2198+
else:
2199+
state_transformations = []
22022200

2201+
if model.incremental_sync and stream_slicer:
22032202
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
22042203
state_manager=self._connector_state_manager,
22052204
model_type=DatetimeBasedCursorModel,
@@ -2220,7 +2219,7 @@ def _build_concurrent_cursor(
22202219
stream_name=model.name or "",
22212220
stream_namespace=None,
22222221
config=config or {},
2223-
stream_state_migrations=model.state_migrations,
2222+
stream_state_migrations=state_transformations,
22242223
)
22252224
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
22262225
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
@@ -2229,7 +2228,7 @@ def _build_concurrent_cursor(
22292228
stream_name=model.name or "",
22302229
stream_namespace=None,
22312230
config=config or {},
2232-
stream_state_migrations=model.state_migrations,
2231+
stream_state_migrations=state_transformations,
22332232
attempt_to_create_cursor_if_not_provided=True,
22342233
)
22352234
else:

0 commit comments

Comments
 (0)