Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
1d5b468
remove
Jul 31, 2025
76ac6f7
Auto-fix lint and format issues
Jul 31, 2025
2d1e2f4
remove unused file
Jul 31, 2025
b4a5fec
have declarative availability check support AbstractStream
Jul 31, 2025
fc6c6b6
Auto-fix lint and format issues
Jul 31, 2025
5fe2e02
mypy
Jul 31, 2025
1e8e968
Auto-fix lint and format issues
Jul 31, 2025
689e792
Remove RFR stuff
Aug 1, 2025
5399436
have bland stream be instantiated as DefaultStream
Aug 4, 2025
dff2559
fix test
Aug 4, 2025
7dc2164
fix test, format, lint and a bit of mypy
Aug 4, 2025
0bfbdfe
mypy
Aug 4, 2025
0b454bb
format
Aug 4, 2025
13c17f4
remove unused line
Aug 4, 2025
0f36dc5
Merge branch 'main' into maxi297/remove-availability-strategy-except-…
maxi297 Aug 4, 2025
6f95ebb
Merge branch 'maxi297/remove-availability-strategy-except-for-filebas…
maxi297 Aug 4, 2025
c94892a
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 4, 2025
fb75765
fix test
Aug 4, 2025
c078395
lint
Aug 4, 2025
decc557
format
Aug 4, 2025
b8daf64
code review
Aug 4, 2025
e8edc4b
Merge branch 'main' into maxi297/availability_strategy_to_support_abs…
maxi297 Aug 5, 2025
2bc4b30
code review
Aug 5, 2025
98e2227
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 5, 2025
d9d09f0
incremental without partition router as DefaultStream
Aug 5, 2025
1af2264
refactor regarding async stuff
Aug 5, 2025
8c771bb
partially fix mypy
Aug 5, 2025
fb40a6b
fix mypy
Aug 5, 2025
9175283
format
Aug 5, 2025
1d84a49
mypy
Aug 5, 2025
2cba5ff
fix
Aug 5, 2025
8181c83
fix condition where we might override FinalStateCursor with null
Aug 5, 2025
1079629
supports_file_transfer
Aug 6, 2025
f196ea7
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 6, 2025
7f643e4
format
Aug 6, 2025
8566607
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 6, 2025
96f15c3
Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…
Aug 11, 2025
6cb012b
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
11e3a35
format
Aug 11, 2025
86909df
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
ebb4b28
more fixes for DefaultStream in Connector Builder
Aug 11, 2025
6fef39b
mypy and format
Aug 11, 2025
e31fed9
format broke mypy
Aug 11, 2025
e996805
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 11, 2025
90eeaa6
Merge branch 'main' into maxi297/incremental_without_partition_router…
Aug 20, 2025
1be518b
format
Aug 20, 2025
59c1fd8
lint
Aug 20, 2025
d01497d
Merge branch 'main' into maxi297/incremental_without_partition_router…
Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 62 additions & 39 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,13 @@
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
ChildPartitionResumableFullRefreshCursor,
ConcurrentCursorFactory,
ConcurrentPerPartitionCursor,
CursorFactory,
DatetimeBasedCursor,
DeclarativeCursor,
GlobalSubstreamCursor,
PerPartitionCursor,
PerPartitionWithGlobalCursor,
ResumableFullRefreshCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
Expand Down Expand Up @@ -446,10 +443,6 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
COMPONENTS_MODULE_NAME,
SDM_COMPONENTS_MODULE_NAME,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
Expand Down Expand Up @@ -508,7 +501,7 @@
RequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.resolvers import (
ComponentMappingDefinition,
ConfigComponentsResolver,
Expand Down Expand Up @@ -617,6 +610,9 @@
)
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import (
StreamSlicer as ConcurrentStreamSlicer,
)
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
CustomFormatConcurrentStreamStateConverter,
DateTimeStreamStateConverter,
Expand Down Expand Up @@ -1933,29 +1929,7 @@ def create_datetime_based_cursor(
def create_declarative_stream(
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
) -> Union[DeclarativeStream, AbstractStream]:
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
combined_slicers = self._merge_stream_slicers(model=model, config=config)

primary_key = model.primary_key.__root__ if model.primary_key else None
stop_condition_on_cursor = (
model.incremental_sync
and hasattr(model.incremental_sync, "is_data_feed")
and model.incremental_sync.is_data_feed
)
client_side_filtering_enabled = (
model.incremental_sync
and hasattr(model.incremental_sync, "is_client_side_incremental")
and model.incremental_sync.is_client_side_incremental
)
concurrent_cursor = None
if stop_condition_on_cursor or client_side_filtering_enabled:
stream_slicer = self._build_stream_slicer_from_partition_router(
model.retriever, config, stream_name=model.name
)
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)

if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
cursor_model = model.incremental_sync
Expand Down Expand Up @@ -2023,16 +1997,27 @@ def create_declarative_stream(
model=model.file_uploader, config=config
)

# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
combined_slicers = self._merge_stream_slicers(model=model, config=config)
partition_router = self._build_stream_slicer_from_partition_router(
model.retriever, config, stream_name=model.name
)
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name=model.name,
primary_key=primary_key,
stream_slicer=combined_slicers,
request_options_provider=request_options_provider,
stop_condition_cursor=concurrent_cursor,
stop_condition_cursor=concurrent_cursor
if self._is_stop_condition_on_cursor(model)
else None,
client_side_incremental_sync={"cursor": concurrent_cursor}
if client_side_filtering_enabled
if self._is_client_side_filtering_enabled(model)
else None,
transformations=transformations,
file_uploader=file_uploader,
Expand Down Expand Up @@ -2066,37 +2051,61 @@ def create_declarative_stream(
schema_loader = DefaultSchemaLoader(config=config, parameters=options)

if (
isinstance(combined_slicers, PartitionRouter)
(
isinstance(combined_slicers, PartitionRouter)
or isinstance(concurrent_cursor, ConcurrentCursor)
)
and not self._emit_connector_builder_messages
and not is_parent
):
# We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the
# DeclarativeStream and assembling the DefaultStream from that. The plan is the following:
# * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind will have a SinglePartitionRouter
# * Streams without partition router but with cursor
# * Streams without partition router but with cursor. This is the `isinstance(concurrent_cursor, ConcurrentCursor)` condition
# * Streams with both partition router and cursor
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway

stream_name = model.name or ""
stream_slicer: ConcurrentStreamSlicer = (
concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={})
)
cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository)
if isinstance(retriever, AsyncRetriever):
# The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
# `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
# special clause and returns a concurrent cursor. This stream slicer is passed to AsyncRetriever when
# built because the async retriever has a specific partition router which relies on this stream slicer.
# We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
# AsyncJobPartitionRouter.
stream_slicer = retriever.stream_slicer
if isinstance(combined_slicers, Cursor):
cursor = combined_slicers
elif isinstance(combined_slicers, PartitionRouter):
stream_slicer = combined_slicers
elif concurrent_cursor:
cursor = concurrent_cursor

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
stream_name,
schema_loader,
retriever,
self._message_repository,
),
stream_slicer=combined_slicers,
stream_slicer=stream_slicer,
)
return DefaultStream(
partition_generator=partition_generator,
name=stream_name,
json_schema=schema_loader.get_json_schema,
primary_key=get_primary_key_from_stream(primary_key),
cursor_field=None,
# FIXME we should have the cursor field as part of the interface of cursor
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
else "", # FIXME we should have the cursor field has part of the interface of cursor,
logger=logging.getLogger(f"airbyte.{stream_name}"),
# FIXME this is a breaking change compared to the old implementation,
cursor=FinalStateCursor(stream_name, None, self._message_repository),
# FIXME this is a breaking change compared to the old implementation which used the source name instead
cursor=cursor,
supports_file_transfer=hasattr(model, "file_uploader")
and bool(model.file_uploader),
)
Expand All @@ -2120,6 +2129,20 @@ def create_declarative_stream(
parameters=model.parameters or {},
)

def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool:
    """Return True when the stream's incremental sync is marked as a data feed.

    A data-feed stream relies on the cursor as a stop condition while paginating.
    """
    incremental_sync = model.incremental_sync
    if not incremental_sync:
        return False
    # `is_data_feed` only exists on some incremental_sync models; a missing
    # attribute means there is no stop condition on the cursor.
    return bool(getattr(incremental_sync, "is_data_feed", False))

def _is_client_side_filtering_enabled(self, model: DeclarativeStreamModel) -> bool:
    """Return True when the stream's incremental sync requests client-side record filtering."""
    incremental_sync = model.incremental_sync
    if not incremental_sync:
        return False
    # `is_client_side_incremental` only exists on some incremental_sync models;
    # a missing attribute means client-side filtering is disabled.
    return bool(getattr(incremental_sync, "is_client_side_incremental", False))

def _build_stream_slicer_from_partition_router(
self,
model: Union[
Expand Down
Loading
Loading