Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2112,23 +2112,22 @@ def _build_incremental_cursor(
stream_slicer: Optional[PartitionRouter],
config: Config,
) -> Optional[StreamSlicer]:
state_transformations = (
[
self._create_component_from_model(state_migration, config, declarative_stream=model)
for state_migration in model.state_migrations
]
if model.state_migrations
else []
)

if model.incremental_sync and stream_slicer:
if model.retriever.type == "AsyncRetriever":
stream_name = model.name or ""
stream_namespace = None
stream_state = self._connector_state_manager.get_stream_state(
stream_name, stream_namespace
)
state_transformations = (
[
self._create_component_from_model(
state_migration, config, declarative_stream=model
)
for state_migration in model.state_migrations
]
if model.state_migrations
else []
)

return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
Expand Down Expand Up @@ -2172,7 +2171,7 @@ def _build_incremental_cursor(
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state_migrations=model.state_migrations,
stream_state_migrations=state_transformations,
)
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
return None
Expand All @@ -2187,19 +2186,15 @@ def _build_concurrent_cursor(
stream_name=model.name or "", namespace=None
)

if model.incremental_sync and stream_slicer:
# FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
# ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
if model.state_migrations:
state_transformations = [
self._create_component_from_model(
state_migration, config, declarative_stream=model
)
for state_migration in model.state_migrations
]
else:
state_transformations = []
if model.state_migrations:
state_transformations = [
self._create_component_from_model(state_migration, config, declarative_stream=model)
for state_migration in model.state_migrations
]
else:
state_transformations = []

if model.incremental_sync and stream_slicer:
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand All @@ -2220,7 +2215,7 @@ def _build_concurrent_cursor(
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state_migrations=model.state_migrations,
stream_state_migrations=state_transformations,
)
elif type(model.incremental_sync) == DatetimeBasedCursorModel:
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
Expand All @@ -2229,7 +2224,7 @@ def _build_concurrent_cursor(
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state_migrations=model.state_migrations,
stream_state_migrations=state_transformations,
attempt_to_create_cursor_if_not_provided=True,
)
else:
Expand Down
Loading