Skip to content

Commit 35f359d

Browse files
committed
Update parents resolving
1 parent cc759dc commit 35f359d

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
32+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
33+
StateDelegatingStream as StateDelegatingStreamModel,
34+
)
3235
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
3336
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
3437
get_registered_components_module,
@@ -143,7 +146,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
143146

144147
source_streams = [
145148
self._constructor.create_component(
146-
DeclarativeStreamModel,
149+
StateDelegatingStreamModel if stream_config.get("type") == StateDelegatingStreamModel.__name__ else DeclarativeStreamModel,
147150
stream_config,
148151
config,
149152
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -162,11 +165,11 @@ def _initialize_cache_for_parent_streams(
162165
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
163166
for parent_config in parent_configs:
164167
parent_streams.add(parent_config["stream"]["name"])
165-
if parent_config["stream"]["retriever"]["type"] == "StateDelegatingRetriever":
166-
parent_config["stream"]["retriever"]["full_refresh_retriever"]["requester"][
168+
if parent_config["stream"]["type"] == "StateDelegatingStream":
169+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
167170
"use_cache"
168171
] = True
169-
parent_config["stream"]["retriever"]["incremental_retriever"]["requester"][
172+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
170173
"use_cache"
171174
] = True
172175
else:
@@ -193,11 +196,11 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
193196

194197
for stream_config in stream_configs:
195198
if stream_config["name"] in parent_streams:
196-
if stream_config["retriever"]["type"] == "StateDelegatingRetriever":
197-
stream_config["retriever"]["full_refresh_retriever"]["requester"][
199+
if stream_config["type"] == "StateDelegatingStream":
200+
stream_config["full_refresh_stream"]["retriever"]["requester"][
198201
"use_cache"
199202
] = True
200-
stream_config["retriever"]["incremental_retriever"]["requester"][
203+
stream_config["incremental_stream"]["retriever"]["requester"][
201204
"use_cache"
202205
] = True
203206
else:

0 commit comments

Comments
 (0)