Skip to content

Commit d9d09f0

Browse files
author
maxime.c
committed
incremental without partition router as DefaultStream
1 parent 98e2227 commit d9d09f0

File tree

2 files changed

+115
-92
lines changed

2 files changed

+115
-92
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,22 +1940,11 @@ def create_declarative_stream(
19401940
combined_slicers = self._merge_stream_slicers(model=model, config=config)
19411941

19421942
primary_key = model.primary_key.__root__ if model.primary_key else None
1943-
stop_condition_on_cursor = (
1944-
model.incremental_sync
1945-
and hasattr(model.incremental_sync, "is_data_feed")
1946-
and model.incremental_sync.is_data_feed
1947-
)
1948-
client_side_filtering_enabled = (
1949-
model.incremental_sync
1950-
and hasattr(model.incremental_sync, "is_client_side_incremental")
1951-
and model.incremental_sync.is_client_side_incremental
1943+
1944+
stream_slicer = self._build_stream_slicer_from_partition_router(
1945+
model.retriever, config, stream_name=model.name
19521946
)
1953-
concurrent_cursor = None
1954-
if stop_condition_on_cursor or client_side_filtering_enabled:
1955-
stream_slicer = self._build_stream_slicer_from_partition_router(
1956-
model.retriever, config, stream_name=model.name
1957-
)
1958-
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
1947+
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19591948

19601949
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19611950
cursor_model = model.incremental_sync
@@ -2030,9 +2019,9 @@ def create_declarative_stream(
20302019
primary_key=primary_key,
20312020
stream_slicer=combined_slicers,
20322021
request_options_provider=request_options_provider,
2033-
stop_condition_cursor=concurrent_cursor,
2022+
stop_condition_cursor=concurrent_cursor if self._is_stop_condition_on_cursor(model) else None,
20342023
client_side_incremental_sync={"cursor": concurrent_cursor}
2035-
if client_side_filtering_enabled
2024+
if self._is_client_side_filtering_enabled(model)
20362025
else None,
20372026
transformations=transformations,
20382027
file_uploader=file_uploader,
@@ -2066,17 +2055,30 @@ def create_declarative_stream(
20662055
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20672056

20682057
if (
2069-
isinstance(combined_slicers, PartitionRouter)
2058+
(isinstance(combined_slicers, PartitionRouter) or isinstance(concurrent_cursor, ConcurrentCursor))
20702059
and not is_parent
20712060
and not self._emit_connector_builder_messages
20722061
):
20732062
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
20742063
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
20752064
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter
2076-
# * Streams without partition router but with cursor
2065+
# * Streams without partition router but with cursor. This is the `isinstance(concurrent_cursor, ConcurrentCursor)` condition
20772066
# * Streams with both partition router and cursor
20782067
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
20792068
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway
2069+
2070+
stream_slicer = concurrent_cursor
2071+
if isinstance(retriever, AsyncRetriever):
2072+
# The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
2073+
# `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
2074+
# special clause and return a concurrent cursor. This stream slicer is passed to AsyncRetriever when
2075+
# built because the async retriever has a specific partition router which relies on this stream slicer.
2076+
# We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
2077+
# AsyncJobPartitionRouter.
2078+
stream_slicer = retriever.stream_slicer
2079+
elif isinstance(combined_slicers, PartitionRouter):
2080+
stream_slicer = combined_slicers
2081+
20802082
stream_name = model.name or ""
20812083
partition_generator = StreamSlicerPartitionGenerator(
20822084
DeclarativePartitionFactory(
@@ -2085,18 +2087,19 @@ def create_declarative_stream(
20852087
retriever,
20862088
self._message_repository,
20872089
),
2088-
combined_slicers,
2090+
stream_slicer,
20892091
)
2092+
cursor = concurrent_cursor if concurrent_cursor else FinalStateCursor(stream_name, None, self._message_repository)
20902093
return DefaultStream(
20912094
partition_generator=partition_generator,
20922095
name=stream_name,
20932096
json_schema=schema_loader.get_json_schema,
20942097
primary_key=get_primary_key_from_stream(primary_key),
2095-
cursor_field=None,
2098+
cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") else "", # FIXME we should have the cursor field has part of the interface of cursor,
20962099
# FIXME we should have the cursor field has part of the interface of cursor
20972100
logger=logging.getLogger(f"airbyte.{stream_name}"),
20982101
# FIXME this is a breaking change compared to the old implementation,
2099-
cursor=FinalStateCursor(stream_name, None, self._message_repository),
2102+
cursor=cursor,
21002103
)
21012104

21022105
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2118,6 +2121,21 @@ def create_declarative_stream(
21182121
parameters=model.parameters or {},
21192122
)
21202123

2124+
def _is_stop_condition_on_cursor(self, model):
2125+
return (
2126+
model.incremental_sync
2127+
and hasattr(model.incremental_sync, "is_data_feed")
2128+
and model.incremental_sync.is_data_feed
2129+
)
2130+
2131+
def _is_client_side_filtering_enabled(self, model):
2132+
client_side_filtering_enabled = (
2133+
model.incremental_sync
2134+
and hasattr(model.incremental_sync, "is_client_side_incremental")
2135+
and model.incremental_sync.is_client_side_incremental
2136+
)
2137+
return client_side_filtering_enabled
2138+
21212139
def _build_stream_slicer_from_partition_router(
21222140
self,
21232141
model: Union[

0 commit comments

Comments
 (0)