Skip to content

Commit 639a734

Browse files
author
maxime.c
committed
a bit more cleanup
1 parent bb96293 commit 639a734

File tree

3 files changed

+8
-10
lines changed

3 files changed

+8
-10
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ def _group_streams(
703703
stream_slicer=declarative_stream.retriever.stream_slicer,
704704
slice_limit=self._limits.max_slices
705705
if self._limits
706-
else None, # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
706+
else None, # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
707707
)
708708
else:
709709
if (
@@ -772,7 +772,7 @@ def _group_streams(
772772
declarative_stream.retriever.stream_slicer,
773773
slice_limit=self._limits.max_slices
774774
if self._limits
775-
else None, # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
775+
else None, # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. But for consistency and depending how we build create_default_stream, this may be needed later
776776
)
777777

778778
final_state_cursor = FinalStateCursor(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ def _init_mappings(self) -> None:
715715
CustomValidationStrategyModel: self.create_custom_component,
716716
CustomConfigTransformationModel: self.create_custom_component,
717717
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
718-
DeclarativeStreamModel: self.create_declarative_stream,
718+
DeclarativeStreamModel: self.create_default_stream,
719719
DefaultErrorHandlerModel: self.create_default_error_handler,
720720
DefaultPaginatorModel: self.create_default_paginator,
721721
DpathExtractorModel: self.create_dpath_extractor,
@@ -1960,7 +1960,7 @@ def create_datetime_based_cursor(
19601960
parameters=model.parameters or {},
19611961
)
19621962

1963-
def create_declarative_stream(
1963+
def create_default_stream(
19641964
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19651965
) -> Union[DeclarativeStream, AbstractStream]:
19661966
primary_key = model.primary_key.__root__ if model.primary_key else None
@@ -1970,7 +1970,7 @@ def create_declarative_stream(
19701970
)
19711971
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
19721972
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
1973-
cursor_model = model.incremental_sync
1973+
cursor_model: DatetimeBasedCursorModel = model.incremental_sync
19741974

19751975
end_time_option = (
19761976
self._create_component_from_model(
@@ -1990,7 +1990,7 @@ def create_declarative_stream(
19901990
datetime_request_options_provider = DatetimeBasedRequestOptionsProvider(
19911991
start_time_option=start_time_option,
19921992
end_time_option=end_time_option,
1993-
partition_field_start=cursor_model.partition_field_end,
1993+
partition_field_start=cursor_model.partition_field_start,
19941994
partition_field_end=cursor_model.partition_field_end,
19951995
config=config,
19961996
parameters=model.parameters or {},
@@ -2117,7 +2117,6 @@ def create_declarative_stream(
21172117
if hasattr(concurrent_cursor, "cursor_field")
21182118
else "", # FIXME we should have the cursor field has part of the interface of cursor,
21192119
logger=logging.getLogger(f"airbyte.{stream_name}"),
2120-
# FIXME this is a breaking change compared to the old implementation which used the source name instead
21212120
cursor=concurrent_cursor,
21222121
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
21232122
)
@@ -3484,7 +3483,7 @@ def create_state_delegating_stream(
34843483
False if has_parent_state is None else has_parent_state, model
34853484
)
34863485

3487-
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # Will be created DeclarativeStream as stream_model is stream description
3486+
return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel
34883487

34893488
def _get_state_delegating_stream_model(
34903489
self, has_parent_state: bool, model: StateDelegatingStreamModel
@@ -3811,7 +3810,7 @@ def _create_message_repository_substream_wrapper(
38113810
# getting the parent state
38123811
child_state = self._connector_state_manager.get_stream_state(
38133812
kwargs["stream_name"], None
3814-
) # FIXME adding `stream_name` as a parameter means it will be a breaking change. I assume this is mostly called internally so I don't think we need to bother that much about this but still raising the flag
3813+
)
38153814

38163815
# This flag will be used exclusively for StateDelegatingStream when a parent stream is created
38173816
has_parent_state = bool(

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ def stream_slices(self) -> Iterable[StreamSlice]:
261261
extra_fields=extracted_extra_fields,
262262
)
263263

264-
parent_stream.cursor.ensure_at_least_one_state_emitted()
265264
yield from []
266265

267266
def _extract_child_response(

0 commit comments

Comments
 (0)