Skip to content

Commit bf5c241

Browse files
committed
Fix mypy
1 parent b35e1e9 commit bf5c241

File tree

2 files changed

+17
-16
lines changed

2 files changed

+17
-16
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,12 +215,12 @@ def _group_streams(
215215
):
216216
incremental_sync_component_definition = name_to_stream_mapping[
217217
declarative_stream.name
218-
].get("incremental_sync")
218+
].get("incremental_sync", {})
219219

220220
if (
221221
name_to_stream_mapping[declarative_stream.name]
222222
.get("retriever", {})
223-
.get("full_refresh_no_slice_in_params", False)
223+
.get("full_refresh_no_slice_in_params", False) and incremental_sync_component_definition
224224
):
225225
incremental_sync_component_definition["step"] = None
226226
incremental_sync_component_definition["cursor_granularity"] = None

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,14 +1808,17 @@ def _build_incremental_cursor(
18081808
stream_slicer: Optional[PartitionRouter],
18091809
config: Config,
18101810
) -> Optional[StreamSlicer]:
1811-
if (
1812-
model.retriever.type == "StateDelegatingRetriever"
1813-
and model.retriever.full_refresh_no_slice_in_params
1814-
):
1815-
model.incremental_sync.step = None
1816-
model.incremental_sync.cursor_granularity = None
1817-
model.incremental_sync.start_time_option = None
1818-
model.incremental_sync.end_time_option = None
1811+
1812+
if model.retriever.type == "StateDelegatingRetriever":
1813+
if not model.incremental_sync:
1814+
raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
1815+
elif model.incremental_sync.type != "DatetimeBasedCursor":
1816+
raise ValueError("StateDelegatingRetriever support only DatetimeBasedCursor.")
1817+
elif model.retriever.full_refresh_no_slice_in_params:
1818+
model.incremental_sync.step = None
1819+
model.incremental_sync.cursor_granularity = None
1820+
model.incremental_sync.start_time_option = None
1821+
model.incremental_sync.end_time_option = None
18191822

18201823
if model.incremental_sync and stream_slicer:
18211824
if model.retriever.type == "AsyncRetriever":
@@ -1893,9 +1896,6 @@ def _merge_stream_slicers(
18931896
) -> Optional[StreamSlicer]:
18941897
retriever_model = model.retriever
18951898

1896-
if retriever_model.type == "StateDelegatingRetriever" and not model.incremental_sync:
1897-
raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")
1898-
18991899
if retriever_model.type == "AsyncRetriever":
19001900
is_not_datetime_cursor = (
19011901
model.incremental_sync.type != "DatetimeBasedCursor"
@@ -1921,9 +1921,10 @@ def _merge_stream_slicers(
19211921
raise ValueError("Per partition state is not supported yet for AsyncRetriever.")
19221922

19231923
if retriever_model.type == "StateDelegatingRetriever":
1924+
stream_name = model.name or ""
19241925
retriever_model = (
19251926
retriever_model.incremental_retriever
1926-
if self._connector_state_manager.get_stream_state(model.name, None)
1927+
if self._connector_state_manager.get_stream_state(stream_name, None)
19271928
else retriever_model.full_refresh_retriever
19281929
)
19291930

@@ -2752,14 +2753,14 @@ def create_state_delegating_retriever(
27522753
stop_condition_on_cursor: bool = False,
27532754
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
27542755
transformations: List[RecordTransformation],
2755-
) -> SimpleRetriever:
2756+
) -> Optional[SimpleRetriever]:
27562757
retriever_model = (
27572758
model.incremental_retriever
27582759
if self._connector_state_manager.get_stream_state(name, None)
27592760
else model.full_refresh_retriever
27602761
)
27612762

2762-
return self._create_component_from_model(
2763+
return self._create_component_from_model( # type: ignore[no-any-return] # Will be created SimpleRetriever
27632764
model=retriever_model,
27642765
config=config,
27652766
name=name,

0 commit comments

Comments
 (0)