Skip to content

Commit a2063dd

Browse files
committed
move stream_state calculation before calling create_concurrent_cursor_from_perpartition_cursor
1 parent 64c1778 commit a2063dd

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,10 +1485,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
14851485
)
14861486
)
14871487

1488-
if not stream_state:
1489-
stream_state = self._connector_state_manager.get_stream_state(
1490-
stream_name, stream_namespace
1491-
)
14921488

14931489
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
14941490
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
@@ -1999,14 +1995,19 @@ def _build_incremental_cursor(
19991995
) -> Optional[StreamSlicer]:
20001996
if model.incremental_sync and stream_slicer:
20011997
if model.retriever.type == "AsyncRetriever":
1998+
stream_name = model.name or ""
1999+
stream_namespace = None
2000+
stream_state = self._connector_state_manager.get_stream_state(
2001+
stream_name, stream_namespace
2002+
)
20022003
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
20032004
state_manager=self._connector_state_manager,
20042005
model_type=DatetimeBasedCursorModel,
20052006
component_definition=model.incremental_sync.__dict__,
2006-
stream_name=model.name or "",
2007-
stream_namespace=None,
2007+
stream_name=stream_name,
2008+
stream_namespace=stream_namespace,
20082009
config=config or {},
2009-
stream_state={},
2010+
stream_state=stream_state,
20102011
partition_router=stream_slicer,
20112012
)
20122013

0 commit comments

Comments
 (0)