Skip to content

Commit c89bc24

Browse files
committed
Add component to constructor
1 parent 204726a commit c89bc24

File tree

1 file changed

+19
-53
lines changed

1 file changed

+19
-53
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 19 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@
204204
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
205205
DeclarativeStream as DeclarativeStreamModel,
206206
)
207+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
208+
StateDelegatingStream as StateDelegatingStreamModel,
209+
)
207210
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
208211
DefaultErrorHandler as DefaultErrorHandlerModel,
209212
)
@@ -620,6 +623,7 @@ def _init_mappings(self) -> None:
620623
LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator,
621624
SelectiveAuthenticatorModel: self.create_selective_authenticator,
622625
SimpleRetrieverModel: self.create_simple_retriever,
626+
StateDelegatingStreamModel: self.create_state_delegating_stream,
623627
StateDelegatingRetrieverModel: self.create_state_delegating_retriever,
624628
SpecModel: self.create_spec,
625629
SubstreamPartitionRouterModel: self.create_substream_partition_router,
@@ -1784,6 +1788,7 @@ def _build_stream_slicer_from_partition_router(
17841788
StateDelegatingRetrieverModel,
17851789
],
17861790
config: Config,
1791+
stream_name: Optional[str] = None,
17871792
) -> Optional[PartitionRouter]:
17881793
if (
17891794
hasattr(model, "partition_router")
@@ -1794,13 +1799,13 @@ def _build_stream_slicer_from_partition_router(
17941799
if isinstance(stream_slicer_model, list):
17951800
return CartesianProductStreamSlicer(
17961801
[
1797-
self._create_component_from_model(model=slicer, config=config)
1802+
self._create_component_from_model(model=slicer, config=config, stream_name=stream_name or "")
17981803
for slicer in stream_slicer_model
17991804
],
18001805
parameters={},
18011806
)
18021807
else:
1803-
return self._create_component_from_model(model=stream_slicer_model, config=config) # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
1808+
return self._create_component_from_model(model=stream_slicer_model, config=config, stream_name=stream_name or "") # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
18041809
return None
18051810

18061811
def _build_incremental_cursor(
@@ -2511,7 +2516,7 @@ def create_page_increment(
25112516
def create_parent_stream_config(
25122517
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
25132518
) -> ParentStreamConfig:
2514-
declarative_stream = self._create_component_from_model(model.stream, config=config)
2519+
declarative_stream = self._create_component_from_model(model.stream, config=config, **kwargs)
25152520
request_option = (
25162521
self._create_component_from_model(model.request_option, config=config)
25172522
if model.request_option
@@ -2665,7 +2670,6 @@ def create_simple_retriever(
26652670
request_options_provider: Optional[RequestOptionsProvider] = None,
26662671
stop_condition_on_cursor: bool = False,
26672672
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
2668-
cursor: Optional[DeclarativeCursor] = None,
26692673
transformations: List[RecordTransformation],
26702674
) -> SimpleRetriever:
26712675
decoder = (
@@ -2753,54 +2757,15 @@ def create_simple_retriever(
27532757
parameters=model.parameters or {},
27542758
)
27552759

2756-
def create_state_delegating_retriever(
2757-
self,
2758-
model: StateDelegatingRetrieverModel,
2759-
config: Config,
2760-
*,
2761-
name: str,
2762-
primary_key: Optional[Union[str, List[str], List[List[str]]]],
2763-
stream_slicer: Optional[StreamSlicer],
2764-
request_options_provider: Optional[RequestOptionsProvider] = None,
2765-
stop_condition_on_cursor: bool = False,
2766-
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
2767-
transformations: List[RecordTransformation],
2768-
) -> Optional[StateDelegatingRetriever]:
2769-
if not isinstance(stream_slicer, DatetimeBasedCursor) and not isinstance(
2770-
stream_slicer, PerPartitionCursor
2771-
):
2772-
raise ValueError("StateDelegatingRetriever requires a DatetimeBasedCursor")
2773-
2774-
full_refresh_retriever = self._create_component_from_model(
2775-
model=model.full_refresh_retriever,
2776-
config=config,
2777-
name=name,
2778-
primary_key=primary_key,
2779-
stream_slicer=stream_slicer,
2780-
request_options_provider=request_options_provider,
2781-
stop_condition_on_cursor=stop_condition_on_cursor,
2782-
client_side_incremental_sync=client_side_incremental_sync,
2783-
transformations=transformations,
2784-
)
2760+
def create_state_delegating_stream(self, model: StateDelegatingStreamModel, config: Config, child_state: Optional[MutableMapping[str, Any]] = None, **kwargs: Any
2761+
) -> DeclarativeStream:
2762+
if model.full_refresh_stream.name != model.incremental_stream.name:
2763+
raise ValueError(f"full_refresh_stream name and incremental_stream must have equal names. Instead has {model.full_refresh_stream.name}, {model.incremental_stream.name}.")
27852764

2786-
incremental_retriever = self._create_component_from_model(
2787-
model=model.incremental_retriever,
2788-
config=config,
2789-
name=name,
2790-
primary_key=primary_key,
2791-
stream_slicer=stream_slicer,
2792-
request_options_provider=request_options_provider,
2793-
stop_condition_on_cursor=stop_condition_on_cursor,
2794-
client_side_incremental_sync=client_side_incremental_sync,
2795-
transformations=transformations,
2796-
)
2765+
stream_name = model.full_refresh_stream.name
2766+
stream_model = model.incremental_stream if self._connector_state_manager.get_stream_state(stream_name, None) or child_state else model.full_refresh_stream
27972767

2798-
return StateDelegatingRetriever(
2799-
full_data_retriever=full_refresh_retriever,
2800-
incremental_data_retriever=incremental_retriever,
2801-
cursor=stream_slicer,
2802-
started_with_state=bool(self._connector_state_manager.get_stream_state(name, None)),
2803-
)
2768+
return self._create_component_from_model(stream_model, config=config, **kwargs)
28042769

28052770
def _create_async_job_status_mapping(
28062771
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
@@ -3028,7 +2993,7 @@ def create_substream_partition_router(
30282993
parent_stream_configs.extend(
30292994
[
30302995
self._create_message_repository_substream_wrapper(
3031-
model=parent_stream_config, config=config
2996+
model=parent_stream_config, config=config, **kwargs
30322997
)
30332998
for parent_stream_config in model.parent_stream_configs
30342999
]
@@ -3041,7 +3006,7 @@ def create_substream_partition_router(
30413006
)
30423007

30433008
def _create_message_repository_substream_wrapper(
3044-
self, model: ParentStreamConfigModel, config: Config
3009+
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
30453010
) -> Any:
30463011
substream_factory = ModelToComponentFactory(
30473012
limit_pages_fetched_per_slice=self._limit_pages_fetched_per_slice,
@@ -3055,7 +3020,8 @@ def _create_message_repository_substream_wrapper(
30553020
self._evaluate_log_level(self._emit_connector_builder_messages),
30563021
),
30573022
)
3058-
return substream_factory._create_component_from_model(model=model, config=config)
3023+
child_state = self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None) if model.incremental_dependency or False else None
3024+
return substream_factory._create_component_from_model(model=model, config=config, child_state=child_state, **kwargs)
30593025

30603026
@staticmethod
30613027
def create_wait_time_from_header(

0 commit comments

Comments
 (0)