@@ -2167,78 +2167,6 @@ def _build_stream_slicer_from_partition_router(
21672167 )
21682168 return SinglePartitionRouter (parameters = {})
21692169
def _build_incremental_cursor(
    self,
    model: DeclarativeStreamModel,
    stream_slicer: Optional[PartitionRouter],
    config: Config,
) -> Optional[StreamSlicer]:
    """Build the cursor that drives incremental reads for a declarative stream.

    State migration components are always instantiated first. After that:

    * no ``incremental_sync`` on the model -> ``None``;
    * ``incremental_sync`` without a real partition router (no slicer, or a
      ``SinglePartitionRouter``) -> a concurrent datetime cursor for
      ``AsyncRetriever``, otherwise the component built from
      ``model.incremental_sync``;
    * ``incremental_sync`` with a real partition router -> a concurrent
      per-partition cursor for ``AsyncRetriever``, otherwise a
      ``GlobalSubstreamCursor`` (when the sync model has a truthy
      ``global_substream_cursor``) or a ``PerPartitionWithGlobalCursor``.

    Args:
        model: Declarative stream model being built.
        stream_slicer: Partition router already built for the stream, if any.
        config: Connector configuration handed to the component factories.

    Returns:
        The cursor to use as stream slicer, or ``None`` when the stream has
        no incremental sync configured.
    """
    # Migrations are created unconditionally, before any branching.
    migrations = []
    for migration_model in model.state_migrations or []:
        migrations.append(
            self._create_component_from_model(migration_model, config, declarative_stream=model)
        )

    sync_model = model.incremental_sync
    if not sync_model:
        return None

    # A SinglePartitionRouter does not count as real partitioning.
    has_partitions = bool(stream_slicer) and not isinstance(stream_slicer, SinglePartitionRouter)
    uses_async_retriever = model.retriever.type == "AsyncRetriever"

    if not has_partitions:
        if uses_async_retriever:
            # Known issue: the returned ConcurrentCursor does not technically implement the
            # (low-code) StreamSlicer. Both expose stream_slices(), which is the primary
            # method needed for checkpointing.
            return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore
                model_type=DatetimeBasedCursorModel,
                component_definition=sync_model.__dict__,
                stream_name=model.name or "",
                stream_namespace=None,
                config=config or {},
                stream_state_migrations=migrations,
            )
        # The created component is a Cursor because the model passed is model.incremental_sync.
        return self._create_component_from_model(model=sync_model, config=config)  # type: ignore[no-any-return]

    if uses_async_retriever:
        name = model.name or ""
        namespace = None
        current_state = self._connector_state_manager.get_stream_state(name, namespace)

        # Known issue: the returned ConcurrentCursor does not technically implement the
        # (low-code) StreamSlicer. Both expose stream_slices(), which is the primary
        # method needed for checkpointing.
        return self.create_concurrent_cursor_from_perpartition_cursor(  # type: ignore
            state_manager=self._connector_state_manager,
            model_type=DatetimeBasedCursorModel,
            component_definition=sync_model.__dict__,
            stream_name=name,
            stream_namespace=namespace,
            config=config or {},
            stream_state=current_state,
            stream_state_migrations=migrations,
            partition_router=stream_slicer,
        )

    def _new_cursor():
        # Each call builds a fresh cursor component from the same sync model.
        return self._create_component_from_model(model=sync_model, config=config)

    shared_cursor = self._create_component_from_model(model=sync_model, config=config)

    if getattr(sync_model, "global_substream_cursor", False):
        return GlobalSubstreamCursor(stream_cursor=shared_cursor, partition_router=stream_slicer)

    return PerPartitionWithGlobalCursor(
        cursor_factory=CursorFactory(_new_cursor),
        partition_router=stream_slicer,
        stream_cursor=shared_cursor,
    )
2241-
22422170 def _build_concurrent_cursor (
22432171 self ,
22442172 model : DeclarativeStreamModel ,
@@ -2301,44 +2229,6 @@ def _build_concurrent_cursor(
23012229 )
23022230 return FinalStateCursor (stream_name , None , self ._message_repository )
23032231
def _merge_stream_slicers(
    self, model: DeclarativeStreamModel, config: Config
) -> Optional[StreamSlicer]:
    """Combine the stream's partition router and incremental cursor into one slicer.

    The partition-router slicer is built first. For an ``AsyncRetriever``
    with incremental sync configured, two unsupported setups are rejected.
    If the model defines ``incremental_sync`` the result of
    ``_build_incremental_cursor`` is returned; otherwise the partition-router
    slicer is returned as-is.

    Args:
        model: Declarative stream model whose retriever and incremental sync
            settings are inspected.
        config: Connector configuration passed to the builders.

    Returns:
        The incremental cursor when incremental sync is configured, else the
        slicer built from the partition router.

    Raises:
        ValueError: For an ``AsyncRetriever`` whose cursor type is not
            ``DatetimeBasedCursor``, or whose retriever declares a partition
            router while no slicer could be built from it.
    """
    retriever = model.retriever
    slicer = self._build_stream_slicer_from_partition_router(
        retriever, config, stream_name=model.name
    )
    incremental = model.incremental_sync

    # Both validations only apply when incremental sync is configured.
    if retriever.type == "AsyncRetriever" and incremental:
        unsupported_cursor = incremental.type != "DatetimeBasedCursor"
        declares_partition_router = bool(retriever.partition_router)

        if unsupported_cursor:
            # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
            # support of unordered slices (for example, when we trigger reports for January and February, the
            # report in February can be completed first). Once we have support for custom concurrent cursor or
            # have a new implementation available in the CDK, we can enable more cursors here.
            raise ValueError(
                "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
            )

        if declares_partition_router and not slicer:
            # Note that this development is also done in parallel to the per partition development which,
            # once merged, we could support here by calling create_concurrent_cursor_from_perpartition_cursor.
            raise ValueError("Per partition state is not supported yet for AsyncRetriever.")

    if incremental:
        return self._build_incremental_cursor(model, slicer, config)
    return slicer
2341-
23422232 def create_default_error_handler (
23432233 self , model : DefaultErrorHandlerModel , config : Config , ** kwargs : Any
23442234 ) -> DefaultErrorHandler :
0 commit comments