Skip to content

Commit c0b35ab

Browse files
committed
Merge branch 'main' into revert-705-revert_concurrent_changes
2 parents 772c77a + cd48741 commit c0b35ab

File tree

2 files changed

+133
-123
lines changed

2 files changed

+133
-123
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,13 @@
9494
ClientSideIncrementalRecordFilterDecorator,
9595
)
9696
from airbyte_cdk.sources.declarative.incremental import (
97-
ChildPartitionResumableFullRefreshCursor,
9897
ConcurrentCursorFactory,
9998
ConcurrentPerPartitionCursor,
10099
CursorFactory,
101100
DatetimeBasedCursor,
102101
DeclarativeCursor,
103102
GlobalSubstreamCursor,
104-
PerPartitionCursor,
105103
PerPartitionWithGlobalCursor,
106-
ResumableFullRefreshCursor,
107104
)
108105
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
109106
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
@@ -446,10 +443,6 @@
446443
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
447444
ZipfileDecoder as ZipfileDecoderModel,
448445
)
449-
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
450-
COMPONENTS_MODULE_NAME,
451-
SDM_COMPONENTS_MODULE_NAME,
452-
)
453446
from airbyte_cdk.sources.declarative.partition_routers import (
454447
CartesianProductStreamSlicer,
455448
GroupingPartitionRouter,
@@ -508,7 +501,7 @@
508501
RequestOptionsProvider,
509502
)
510503
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
511-
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
504+
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
512505
from airbyte_cdk.sources.declarative.resolvers import (
513506
ComponentMappingDefinition,
514507
ConfigComponentsResolver,
@@ -617,6 +610,9 @@
617610
)
618611
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
619612
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
613+
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import (
614+
StreamSlicer as ConcurrentStreamSlicer,
615+
)
620616
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
621617
CustomFormatConcurrentStreamStateConverter,
622618
DateTimeStreamStateConverter,
@@ -1937,29 +1933,7 @@ def create_datetime_based_cursor(
19371933
def create_declarative_stream(
19381934
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19391935
) -> Union[DeclarativeStream, AbstractStream]:
1940-
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
1941-
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
1942-
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
1943-
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
1944-
combined_slicers = self._merge_stream_slicers(model=model, config=config)
1945-
19461936
primary_key = model.primary_key.__root__ if model.primary_key else None
1947-
stop_condition_on_cursor = (
1948-
model.incremental_sync
1949-
and hasattr(model.incremental_sync, "is_data_feed")
1950-
and model.incremental_sync.is_data_feed
1951-
)
1952-
client_side_filtering_enabled = (
1953-
model.incremental_sync
1954-
and hasattr(model.incremental_sync, "is_client_side_incremental")
1955-
and model.incremental_sync.is_client_side_incremental
1956-
)
1957-
concurrent_cursor = None
1958-
if stop_condition_on_cursor or client_side_filtering_enabled:
1959-
stream_slicer = self._build_stream_slicer_from_partition_router(
1960-
model.retriever, config, stream_name=model.name
1961-
)
1962-
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
19631937

19641938
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19651939
cursor_model = model.incremental_sync
@@ -2027,16 +2001,27 @@ def create_declarative_stream(
20272001
model=model.file_uploader, config=config
20282002
)
20292003

2004+
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
2005+
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
2006+
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
2007+
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
2008+
combined_slicers = self._merge_stream_slicers(model=model, config=config)
2009+
partition_router = self._build_stream_slicer_from_partition_router(
2010+
model.retriever, config, stream_name=model.name
2011+
)
2012+
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
20302013
retriever = self._create_component_from_model(
20312014
model=model.retriever,
20322015
config=config,
20332016
name=model.name,
20342017
primary_key=primary_key,
20352018
stream_slicer=combined_slicers,
20362019
request_options_provider=request_options_provider,
2037-
stop_condition_cursor=concurrent_cursor,
2020+
stop_condition_cursor=concurrent_cursor
2021+
if self._is_stop_condition_on_cursor(model)
2022+
else None,
20382023
client_side_incremental_sync={"cursor": concurrent_cursor}
2039-
if client_side_filtering_enabled
2024+
if self._is_client_side_filtering_enabled(model)
20402025
else None,
20412026
transformations=transformations,
20422027
file_uploader=file_uploader,
@@ -2070,43 +2055,61 @@ def create_declarative_stream(
20702055
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20712056

20722057
if (
2073-
isinstance(combined_slicers, PartitionRouter)
2058+
(
2059+
isinstance(combined_slicers, PartitionRouter)
2060+
or isinstance(concurrent_cursor, ConcurrentCursor)
2061+
)
20742062
and not self._emit_connector_builder_messages
20752063
and not is_parent
20762064
):
20772065
# We are starting to migrate streams to directly instantiate the DefaultStream instead of instantiating the
20782066
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
20792067
# * Streams with neither partition router nor cursor, and streams with only a partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition, as the first kind will have a SinglePartitionRouter
2080-
# * Streams without partition router but with cursor
2068+
# * Streams without partition router but with cursor. This is the `isinstance(concurrent_cursor, ConcurrentCursor)` condition
20812069
# * Streams with both partition router and cursor
20822070
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
20832071
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway
2072+
20842073
stream_name = model.name or ""
2074+
stream_slicer: ConcurrentStreamSlicer = (
2075+
concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={})
2076+
)
2077+
cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository)
2078+
if isinstance(retriever, AsyncRetriever):
2079+
# The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
2080+
# `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
2081+
special clause and returns a concurrent cursor. This stream slicer is passed to AsyncRetriever when
2082+
# built because the async retriever has a specific partition router which relies on this stream slicer.
2083+
# We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
2084+
# AsyncJobPartitionRouter.
2085+
stream_slicer = retriever.stream_slicer
2086+
if isinstance(combined_slicers, Cursor):
2087+
cursor = combined_slicers
2088+
elif isinstance(combined_slicers, PartitionRouter):
2089+
stream_slicer = combined_slicers
2090+
elif concurrent_cursor:
2091+
cursor = concurrent_cursor
2092+
20852093
partition_generator = StreamSlicerPartitionGenerator(
20862094
DeclarativePartitionFactory(
20872095
stream_name,
20882096
schema_loader,
20892097
retriever,
20902098
self._message_repository,
20912099
),
2092-
stream_slicer=cast(
2093-
StreamSlicer,
2094-
StreamSlicerTestReadDecorator(
2095-
wrapped_slicer=combined_slicers,
2096-
maximum_number_of_slices=self._limit_slices_fetched or 5,
2097-
),
2098-
),
2100+
stream_slicer=stream_slicer,
20992101
)
21002102
return DefaultStream(
21012103
partition_generator=partition_generator,
21022104
name=stream_name,
21032105
json_schema=schema_loader.get_json_schema,
21042106
primary_key=get_primary_key_from_stream(primary_key),
2105-
cursor_field=None,
2106-
# FIXME we should have the cursor field has part of the interface of cursor
2107+
cursor_field=cursor.cursor_field.cursor_field_key
2108+
if hasattr(cursor, "cursor_field")
2109+
else "", # FIXME we should have the cursor field has part of the interface of cursor,
21072110
logger=logging.getLogger(f"airbyte.{stream_name}"),
2108-
# FIXME this is a breaking change compared to the old implementation,
2109-
cursor=FinalStateCursor(stream_name, None, self._message_repository),
2111+
# FIXME this is a breaking change compared to the old implementation which used the source name instead
2112+
cursor=cursor,
21102113
supports_file_transfer=hasattr(model, "file_uploader")
21112114
and bool(model.file_uploader),
21122115
)
@@ -2130,6 +2133,20 @@ def create_declarative_stream(
21302133
parameters=model.parameters or {},
21312134
)
21322135

2136+
# Return True only when `model.incremental_sync` is truthy, has an `is_data_feed` attribute, and that attribute is truthy.
# The `hasattr` guard lets incremental_sync models that do not define `is_data_feed` yield False instead of raising.
def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool:
2137+
return bool(
2138+
model.incremental_sync
2139+
and hasattr(model.incremental_sync, "is_data_feed")
2140+
and model.incremental_sync.is_data_feed
2141+
)
2142+
2143+
# Return True only when `model.incremental_sync` is truthy, has an `is_client_side_incremental` attribute, and that attribute is truthy.
# The `hasattr` guard lets incremental_sync models that do not define `is_client_side_incremental` yield False instead of raising.
def _is_client_side_filtering_enabled(self, model: DeclarativeStreamModel) -> bool:
2144+
return bool(
2145+
model.incremental_sync
2146+
and hasattr(model.incremental_sync, "is_client_side_incremental")
2147+
and model.incremental_sync.is_client_side_incremental
2148+
)
2149+
21332150
def _build_stream_slicer_from_partition_router(
21342151
self,
21352152
model: Union[

0 commit comments

Comments
 (0)