Skip to content

Commit b35e1e9

Browse files
committed
Merge master to branch
2 parents b218f3a + 06cccc5 commit b35e1e9

File tree

3 files changed

+31
-9
lines changed

3 files changed

+31
-9
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,11 @@ def _group_streams(
217217
declarative_stream.name
218218
].get("incremental_sync")
219219

220-
if name_to_stream_mapping[declarative_stream.name].get("retriever", {}).get("full_refresh_no_slice_in_params", False):
220+
if (
221+
name_to_stream_mapping[declarative_stream.name]
222+
.get("retriever", {})
223+
.get("full_refresh_no_slice_in_params", False)
224+
):
221225
incremental_sync_component_definition["step"] = None
222226
incremental_sync_component_definition["cursor_granularity"] = None
223227
incremental_sync_component_definition["start_time_option"] = None

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,12 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
163163
for parent_config in parent_configs:
164164
parent_streams.add(parent_config["stream"]["name"])
165165
if parent_config["stream"]["retriever"]["type"] == "StateDelegatingRetriever":
166-
parent_config["stream"]["retriever"]["full_refresh_retriever"]["requester"]["use_cache"] = True
167-
parent_config["stream"]["retriever"]["incremental_retriever"]["requester"]["use_cache"] = True
166+
parent_config["stream"]["retriever"]["full_refresh_retriever"]["requester"][
167+
"use_cache"
168+
] = True
169+
parent_config["stream"]["retriever"]["incremental_retriever"]["requester"][
170+
"use_cache"
171+
] = True
168172
else:
169173
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
170174

@@ -190,8 +194,12 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
190194
for stream_config in stream_configs:
191195
if stream_config["name"] in parent_streams:
192196
if stream_config["retriever"]["type"] == "StateDelegatingRetriever":
193-
stream_config["retriever"]["full_refresh_retriever"]["requester"]["use_cache"] = True
194-
stream_config["retriever"]["incremental_retriever"]["requester"]["use_cache"] = True
197+
stream_config["retriever"]["full_refresh_retriever"]["requester"][
198+
"use_cache"
199+
] = True
200+
stream_config["retriever"]["incremental_retriever"]["requester"][
201+
"use_cache"
202+
] = True
195203
else:
196204
stream_config["retriever"]["requester"]["use_cache"] = True
197205

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,7 +1808,10 @@ def _build_incremental_cursor(
18081808
stream_slicer: Optional[PartitionRouter],
18091809
config: Config,
18101810
) -> Optional[StreamSlicer]:
1811-
if model.retriever.type == "StateDelegatingRetriever" and model.retriever.full_refresh_no_slice_in_params:
1811+
if (
1812+
model.retriever.type == "StateDelegatingRetriever"
1813+
and model.retriever.full_refresh_no_slice_in_params
1814+
):
18121815
model.incremental_sync.step = None
18131816
model.incremental_sync.cursor_granularity = None
18141817
model.incremental_sync.start_time_option = None
@@ -1918,7 +1921,11 @@ def _merge_stream_slicers(
19181921
raise ValueError("Per partition state is not supported yet for AsyncRetriever.")
19191922

19201923
if retriever_model.type == "StateDelegatingRetriever":
1921-
retriever_model = retriever_model.incremental_retriever if self._connector_state_manager.get_stream_state(model.name, None) else retriever_model.full_refresh_retriever
1924+
retriever_model = (
1925+
retriever_model.incremental_retriever
1926+
if self._connector_state_manager.get_stream_state(model.name, None)
1927+
else retriever_model.full_refresh_retriever
1928+
)
19221929

19231930
stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config)
19241931

@@ -2746,8 +2753,11 @@ def create_state_delegating_retriever(
27462753
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
27472754
transformations: List[RecordTransformation],
27482755
) -> SimpleRetriever:
2749-
2750-
retriever_model = model.incremental_retriever if self._connector_state_manager.get_stream_state(name, None) else model.full_refresh_retriever
2756+
retriever_model = (
2757+
model.incremental_retriever
2758+
if self._connector_state_manager.get_stream_state(name, None)
2759+
else model.full_refresh_retriever
2760+
)
27512761

27522762
return self._create_component_from_model(
27532763
model=retriever_model,

0 commit comments

Comments
 (0)