Skip to content

Commit 8181c83

Browse files
author
maxime.c
committed
fix condition where we might override FinalStateCursor with null
1 parent 2cba5ff commit 8181c83

File tree

1 file changed

+10
-12
lines changed

1 file changed

+10
-12
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,19 +1929,8 @@ def create_datetime_based_cursor(
19291929
def create_declarative_stream(
19301930
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19311931
) -> Union[DeclarativeStream, AbstractStream]:
1932-
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
1933-
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
1934-
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
1935-
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
1936-
combined_slicers = self._merge_stream_slicers(model=model, config=config)
1937-
19381932
primary_key = model.primary_key.__root__ if model.primary_key else None
19391933

1940-
partition_router = self._build_stream_slicer_from_partition_router(
1941-
model.retriever, config, stream_name=model.name
1942-
)
1943-
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
1944-
19451934
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19461935
cursor_model = model.incremental_sync
19471936

@@ -2008,6 +1997,15 @@ def create_declarative_stream(
20081997
model=model.file_uploader, config=config
20091998
)
20101999

2000+
# When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field
2001+
# components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the
2002+
# Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in
2003+
# the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one.
2004+
combined_slicers = self._merge_stream_slicers(model=model, config=config)
2005+
partition_router = self._build_stream_slicer_from_partition_router(
2006+
model.retriever, config, stream_name=model.name
2007+
)
2008+
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
20112009
retriever = self._create_component_from_model(
20122010
model=model.retriever,
20132011
config=config,
@@ -2085,7 +2083,7 @@ def create_declarative_stream(
20852083
cursor = combined_slicers
20862084
elif isinstance(combined_slicers, PartitionRouter):
20872085
stream_slicer = combined_slicers
2088-
else:
2086+
elif concurrent_cursor:
20892087
cursor = concurrent_cursor
20902088

20912089
partition_generator = StreamSlicerPartitionGenerator(

0 commit comments

Comments
 (0)