Skip to content

Commit 3f92617

Browse files
committed
Move to a two-retriever instances approach
1 parent 11382f9 commit 3f92617

File tree

3 files changed

+101
-7
lines changed

3 files changed

+101
-7
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@
441441
AsyncRetriever,
442442
SimpleRetriever,
443443
SimpleRetrieverTestReadDecorator,
444+
StateDelegatingRetriever,
444445
)
445446
from airbyte_cdk.sources.declarative.schema import (
446447
ComplexFieldType,
@@ -2759,15 +2760,24 @@ def create_state_delegating_retriever(
27592760
stop_condition_on_cursor: bool = False,
27602761
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
27612762
transformations: List[RecordTransformation],
2762-
) -> Optional[SimpleRetriever]:
2763-
retriever_model = (
2764-
model.incremental_retriever
2765-
if self._connector_state_manager.get_stream_state(name, None)
2766-
else model.full_refresh_retriever
2763+
) -> Optional[StateDelegatingRetriever]:
2764+
if not isinstance(stream_slicer, DatetimeBasedCursor) and not isinstance(stream_slicer, PerPartitionCursor):
2765+
raise ValueError("StateDelegatingRetriever requires a DatetimeBasedCursor")
2766+
2767+
full_refresh_retriever = self._create_component_from_model(
2768+
model=model.full_refresh_retriever,
2769+
config=config,
2770+
name=name,
2771+
primary_key=primary_key,
2772+
stream_slicer=stream_slicer,
2773+
request_options_provider=request_options_provider,
2774+
stop_condition_on_cursor=stop_condition_on_cursor,
2775+
client_side_incremental_sync=client_side_incremental_sync,
2776+
transformations=transformations,
27672777
)
27682778

2769-
return self._create_component_from_model( # type: ignore[no-any-return] # Will be created SimpleRetriever
2770-
model=retriever_model,
2779+
incremental_retriever = self._create_component_from_model(
2780+
model=model.incremental_retriever,
27712781
config=config,
27722782
name=name,
27732783
primary_key=primary_key,
@@ -2778,6 +2788,13 @@ def create_state_delegating_retriever(
27782788
transformations=transformations,
27792789
)
27802790

2791+
return StateDelegatingRetriever(
2792+
full_data_retriever=full_refresh_retriever,
2793+
incremental_data_retriever=incremental_retriever,
2794+
cursor=stream_slicer,
2795+
started_with_state=bool(self._connector_state_manager.get_stream_state(name, None))
2796+
)
2797+
27812798
def _create_async_job_status_mapping(
27822799
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
27832800
) -> Mapping[str, AsyncJobStatus]:

airbyte_cdk/sources/declarative/retrievers/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
SimpleRetriever,
99
SimpleRetrieverTestReadDecorator,
1010
)
11+
from airbyte_cdk.sources.declarative.retrievers.state_delegating_retriever import (
12+
StateDelegatingRetriever,
13+
)
1114

1215
__all__ = [
1316
"Retriever",
1417
"SimpleRetriever",
1518
"SimpleRetrieverTestReadDecorator",
1619
"AsyncRetriever",
20+
"StateDelegatingRetriever",
1721
]
airbyte_cdk/sources/declarative/retrievers/state_delegating_retriever.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass, field
6+
from typing import (
7+
Any,
8+
Mapping,
9+
)
10+
11+
from typing_extensions import deprecated
12+
13+
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
14+
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
15+
from airbyte_cdk.sources.source import ExperimentalClassWarning
16+
17+
18+
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class StateDelegatingRetriever:
    """Proxy that forwards attribute access to one of two wrapped retrievers.

    The active retriever is ``incremental_data_retriever`` when
    ``started_with_state`` is true and ``full_data_retriever`` otherwise.
    Attributes that are not defined on this class are read from, and written
    to, the active retriever.

    Attributes:
        full_data_retriever: Retriever used while ``started_with_state`` is
            false.
        incremental_data_retriever: Retriever used while
            ``started_with_state`` is true.
        cursor: Cursor backing the ``state`` property.
        started_with_state: Selects the active retriever; also updated by the
            ``state`` setter.
    """

    full_data_retriever: Retriever
    incremental_data_retriever: Retriever
    cursor: DeclarativeCursor
    started_with_state: bool = field(init=True, repr=False, default=False)

    def __getattr__(self, name: str) -> Any:
        """Resolve an attribute that normal lookup could not find.

        Raises:
            AttributeError: If ``name`` is one of this class's own attributes
                and is not set, if ``name`` is a dunder name, or if the active
                retriever does not provide it.
        """
        # Own attributes are never delegated: the ``retriever`` property reads
        # them, so forwarding a missing one would recurse without end.
        if name in {
            "full_data_retriever",
            "incremental_data_retriever",
            "cursor",
            "retriever",
            "state",
            "started_with_state",
        }:
            return object.__getattribute__(self, name)
        # Dunder names are protocol hooks (e.g. ``__deepcopy__`` and
        # ``__setstate__`` probed by copy/pickle). Forwarding them would make
        # the wrapped retriever's hook run in place of this wrapper's own
        # default behaviour, so report them as missing instead.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        # Delegate everything else to the active retriever.
        return getattr(self.retriever, name)

    def __setattr__(self, name: str, value: Any) -> None:
        """Store own attributes on this object; forward the rest to the active retriever."""
        if name in {
            "full_data_retriever",
            "incremental_data_retriever",
            "cursor",
            "state",
            "started_with_state",
        }:
            # ``state`` is a property, so this goes through its setter.
            super().__setattr__(name, value)
        else:
            # Delegate setting attributes to the underlying retriever.
            setattr(self.retriever, name, value)

    @property
    def retriever(self) -> Retriever:
        """Return the retriever currently selected by ``started_with_state``."""
        return (
            self.incremental_data_retriever
            if self.started_with_state
            else self.full_data_retriever
        )

    @property
    def state(self) -> Mapping[str, Any]:
        """Return the cursor's stream state, or ``{}`` when the cursor is falsy."""
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        """State setter, accept state serialized by state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)
            # A non-empty state switches the active retriever to the
            # incremental one; an empty state switches back to full refresh.
            # NOTE(review): confirm this assignment belongs inside the
            # ``if self.cursor`` guard rather than after it.
            self.started_with_state = bool(value)

0 commit comments

Comments
 (0)