Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
ad36c6e
Add PoC for state delegating retriever
lazebnyi Feb 6, 2025
1f01589
Auto-fix lint and format issues
Feb 6, 2025
a0e5d92
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 6, 2025
3181ac2
Update annotations
lazebnyi Feb 6, 2025
5593d24
Merge master
lazebnyi Feb 6, 2025
f85a68e
Auto-fix lint and format issues
Feb 6, 2025
ff57a28
Update annotations for __getattr__
lazebnyi Feb 6, 2025
e46a88a
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 6, 2025
1e71e63
Fix mypy
lazebnyi Feb 12, 2025
9706535
Add incremental_sync validation
lazebnyi Feb 12, 2025
63e9951
Move async retriever validation to quit faster
lazebnyi Feb 12, 2025
387cf09
Refactor stream slicer merge method
lazebnyi Feb 12, 2025
b78cc6e
Fix errors messages
lazebnyi Feb 12, 2025
53b2980
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 12, 2025
14138ed
Auto-fix lint and format issues
Feb 12, 2025
bb3b176
Refactor _merge_stream_slicers
lazebnyi Feb 12, 2025
66001f1
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1a4b044
Auto-fix lint and format issues
Feb 12, 2025
8cbb9b2
Update retriever validation
lazebnyi Feb 12, 2025
407766d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 12, 2025
1c38282
Auto-fix lint and format issues
Feb 12, 2025
88d5adb
Rollback _merge_stream_slicers
lazebnyi Feb 12, 2025
d3a83a4
Merge master to branch
lazebnyi Feb 12, 2025
666c4fa
Auto-fix lint and format issues
Feb 12, 2025
8c1907a
Add ignore_first_request_options_provider and fix retriever in StateD…
lazebnyi Feb 14, 2025
8417712
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Feb 14, 2025
a05c391
Auto-fix lint and format issues
Feb 14, 2025
f0159de
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Feb 21, 2025
d7b0d25
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 4, 2025
0cd7471
Fix mypy
lazebnyi Mar 4, 2025
8e7b2a3
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 4, 2025
9af489d
Update StateDelegatingRetriever
lazebnyi Mar 6, 2025
06cccc5
Auto-fix lint and format issues
Mar 6, 2025
b218f3a
Update unit test for StateDelegatingRetriever
lazebnyi Mar 6, 2025
b35e1e9
Merge master to branch
lazebnyi Mar 6, 2025
d29bd30
Auto-fix lint and format issues
Mar 6, 2025
bf5c241
Fix mypy
lazebnyi Mar 6, 2025
c70913d
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
6fb23f6
Auto-fix lint and format issues
Mar 6, 2025
43a56ed
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 6, 2025
3481894
Rollback poetry.lock
lazebnyi Mar 6, 2025
35a83cd
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 6, 2025
19d1b22
Fix unit test
lazebnyi Mar 6, 2025
4ef852e
Merge branch 'main' into lazebnyi/add-state-delegating-retriever
lazebnyi Mar 7, 2025
11382f9
Add full_refresh_ignore_min_max_datetime flag
lazebnyi Mar 7, 2025
4862ec1
Auto-fix lint and format issues
Mar 7, 2025
3f92617
Move to a two-retriever instances approach
lazebnyi Mar 7, 2025
9eccc14
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
571ffa9
Auto-fix lint and format issues
Mar 7, 2025
63b156e
Fix mypy
lazebnyi Mar 7, 2025
5bf46a7
Merge branch 'lazebnyi/add-state-delegating-retriever' of github.com:…
lazebnyi Mar 7, 2025
b0d5689
Update concurrent source
lazebnyi Mar 12, 2025
204726a
Add StateDelegatingStream to schema
lazebnyi Mar 12, 2025
c89bc24
Add component to constructor
lazebnyi Mar 12, 2025
cc759dc
Add model
lazebnyi Mar 12, 2025
35f359d
Update parents resolving
lazebnyi Mar 12, 2025
6632d29
Update stream test
lazebnyi Mar 12, 2025
d2f352a
Remove state delegation retriever implementation
lazebnyi Mar 12, 2025
2846522
Remove state delegation retriever import
lazebnyi Mar 12, 2025
02030b5
Auto-fix lint and format issues
Mar 12, 2025
69bc211
Fix mypy
lazebnyi Mar 12, 2025
c83dce6
Fix mypy
lazebnyi Mar 12, 2025
52ba2ec
Update comment to pass mypy check
lazebnyi Mar 12, 2025
8465c56
Auto-fix lint and format issues
Mar 12, 2025
9e6134d
Remove copy import
lazebnyi Mar 12, 2025
8f2554e
Split unit test to two
lazebnyi Mar 12, 2025
d8222f3
Update child_stat to has_parent_state
lazebnyi Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,7 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
- "$ref": "#/definitions/StateDelegatingRetriever"
incremental_sync:
title: Incremental Sync
description: Component used to fetch data incrementally based on a time field in the data.
Expand Down Expand Up @@ -3103,6 +3104,34 @@ definitions:
$parameters:
type: object
additionalProperties: true
StateDelegatingRetriever:
  description: Retriever that delegates to one of two retrievers depending on whether the stream has state, allowing a different data source for the first (full) sync versus subsequent incremental syncs.
  type: object
  required:
    - type
    - incremental_data_retriever
    - full_data_retriever
  properties:
    type:
      type: string
      enum: [ StateDelegatingRetriever ]
    incremental_data_retriever:
      title: Incremental Data Retriever
      description: Component used to coordinate how records are extracted across stream slices and request pages when the stream has state, i.e. on incremental syncs.
      anyOf:
        - "$ref": "#/definitions/AsyncRetriever"
        - "$ref": "#/definitions/CustomRetriever"
        - "$ref": "#/definitions/SimpleRetriever"
    full_data_retriever:
      title: Full Data Retriever
      description: Component used to coordinate how records are extracted across stream slices and request pages when the stream has no state, i.e. on the first (full) sync.
      anyOf:
        - "$ref": "#/definitions/AsyncRetriever"
        - "$ref": "#/definitions/CustomRetriever"
        - "$ref": "#/definitions/SimpleRetriever"
    $parameters:
      type: object
      additionalProperties: true
SimpleRetriever:
description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1943,10 +1943,12 @@ class Config:
extra = Extra.allow

type: Literal["DeclarativeStream"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever, StateDelegatingRetriever] = (
Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
Expand Down Expand Up @@ -2202,6 +2204,21 @@ class ParentStreamConfig(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class StateDelegatingRetriever(BaseModel):
    """Auto-generated pydantic model for the `StateDelegatingRetriever` schema component.

    Holds two alternative retriever definitions; the factory picks between them
    based on whether the stream has state — presumably `incremental_data_retriever`
    when state exists and `full_data_retriever` otherwise (TODO confirm against the
    runtime StateDelegatingRetriever implementation).
    """

    type: Literal["StateDelegatingRetriever"]
    # Retriever used when the stream already has state (incremental sync).
    incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
        ...,
        description="Component used to coordinate how records are extracted across stream slices and request pages.",
        title="Retriever",
    )
    # Retriever used when the stream has no state yet (first / full sync).
    full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
        ...,
        description="Component used to coordinate how records are extracted across stream slices and request pages.",
        title="Retriever",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SimpleRetriever(BaseModel):
type: Literal["SimpleRetriever"]
record_selector: RecordSelector = Field(
Expand Down Expand Up @@ -2387,5 +2404,6 @@ class DynamicDeclarativeStream(BaseModel):
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
StateDelegatingRetriever.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
202 changes: 132 additions & 70 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@
SimpleRetriever as SimpleRetrieverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StateDelegatingRetriever as StateDelegatingRetrieverModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
StreamConfig as StreamConfigModel,
)
Expand Down Expand Up @@ -434,6 +437,7 @@
AsyncRetriever,
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
StateDelegatingRetriever,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
Expand Down Expand Up @@ -608,6 +612,7 @@ def _init_mappings(self) -> None:
LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator,
SelectiveAuthenticatorModel: self.create_selective_authenticator,
SimpleRetrieverModel: self.create_simple_retriever,
StateDelegatingRetrieverModel: self.create_state_delegating_retriever,
SpecModel: self.create_spec,
SubstreamPartitionRouterModel: self.create_substream_partition_router,
WaitTimeFromHeaderModel: self.create_wait_time_from_header,
Expand Down Expand Up @@ -1675,7 +1680,12 @@ def create_declarative_stream(

def _build_stream_slicer_from_partition_router(
self,
model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
model: Union[
AsyncRetrieverModel,
CustomRetrieverModel,
SimpleRetrieverModel,
StateDelegatingRetrieverModel,
],
config: Config,
) -> Optional[PartitionRouter]:
if (
Expand All @@ -1698,81 +1708,49 @@ def _build_stream_slicer_from_partition_router(
# Will be created PartitionRouter as stream_slicer_model is model.partition_router
return None

def _build_resumable_cursor_from_paginator(
def _build_incremental_cursor(
self,
model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
stream_slicer: Optional[StreamSlicer],
) -> Optional[StreamSlicer]:
if hasattr(model, "paginator") and model.paginator and not stream_slicer:
# For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor`
return ResumableFullRefreshCursor(parameters={})
return None

def _merge_stream_slicers(
self, model: DeclarativeStreamModel, config: Config
model: DeclarativeStreamModel,
stream_slicer: Optional[PartitionRouter],
config: Config,
) -> Optional[StreamSlicer]:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)

if model.incremental_sync and stream_slicer:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)
if stream_slicer:
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state={},
partition_router=stream_slicer,
)
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
state_manager=self._connector_state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
stream_name=model.name or "",
stream_namespace=None,
config=config or {},
stream_state={},
partition_router=stream_slicer,
)

incremental_sync_model = model.incremental_sync
if (
cursor_component = self._create_component_from_model(
model=incremental_sync_model, config=config
)
is_global_cursor = (
hasattr(incremental_sync_model, "global_substream_cursor")
and incremental_sync_model.global_substream_cursor
):
cursor_component = self._create_component_from_model(
model=incremental_sync_model, config=config
)
)

if is_global_cursor:
return GlobalSubstreamCursor(
stream_cursor=cursor_component, partition_router=stream_slicer
)
else:
cursor_component = self._create_component_from_model(
model=incremental_sync_model, config=config
)
return PerPartitionWithGlobalCursor(
cursor_factory=CursorFactory(
lambda: self._create_component_from_model(
model=incremental_sync_model, config=config
),
return PerPartitionWithGlobalCursor(
cursor_factory=CursorFactory(
lambda: self._create_component_from_model(
model=incremental_sync_model, config=config
),
partition_router=stream_slicer,
stream_cursor=cursor_component,
)
),
partition_router=stream_slicer,
stream_cursor=cursor_component,
)
elif model.incremental_sync:
if model.retriever.type == "AsyncRetriever":
if model.incremental_sync.type != "DatetimeBasedCursor":
# We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the support or unordered slices (for example, when we trigger reports for January and February, the report in February can be completed first). Once we have support for custom concurrent cursor or have a new implementation available in the CDK, we can enable more cursors here.
raise ValueError(
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet"
)
if model.retriever.partition_router:
# Note that this development is also done in parallel to the per partition development which once merged we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
raise ValueError("Per partition state is not supported yet for AsyncRetriever")
return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
model_type=DatetimeBasedCursorModel,
component_definition=model.incremental_sync.__dict__,
Expand All @@ -1781,13 +1759,22 @@ def _merge_stream_slicers(
config=config or {},
stream_state_migrations=model.state_migrations,
)
return (
self._create_component_from_model(model=model.incremental_sync, config=config)
if model.incremental_sync
else None
)
elif self._disable_resumable_full_refresh:
return stream_slicer
return self._create_component_from_model(model=model.incremental_sync, config=config)
return None

def _build_resumable_cursor(
self,
model: Union[
AsyncRetrieverModel,
CustomRetrieverModel,
SimpleRetrieverModel,
StateDelegatingRetrieverModel,
],
stream_slicer: Optional[PartitionRouter],
) -> Optional[StreamSlicer]:
if hasattr(model, "paginator") and model.paginator and not stream_slicer:
# For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor`
return ResumableFullRefreshCursor(parameters={})
elif stream_slicer:
# For the Full-Refresh sub-streams, we use the nested `ChildPartitionResumableFullRefreshCursor`
return PerPartitionCursor(
Expand All @@ -1796,7 +1783,40 @@ def _merge_stream_slicers(
),
partition_router=stream_slicer,
)
return self._build_resumable_cursor_from_paginator(model.retriever, stream_slicer)
return None

def _merge_stream_slicers(
    self, model: DeclarativeStreamModel, config: Config
) -> Optional[StreamSlicer]:
    """Resolve the stream slicer / cursor for a declarative stream.

    Validates retriever/cursor combinations up front, then returns, in order of
    precedence: the bare partition router (when resumable full refresh is
    disabled), an incremental cursor (when `incremental_sync` is configured),
    or a resumable-full-refresh cursor. May return None when none applies.

    Raises:
        ValueError: on unsupported retriever/cursor combinations.
    """
    if model.retriever.type == "StateDelegatingRetriever" and not model.incremental_sync:
        raise ValueError("StateDelegatingRetriever requires 'incremental_sync' to be enabled.")

    # Guard on `model.incremental_sync` before touching `.type`: an AsyncRetriever
    # stream without incremental_sync would otherwise raise AttributeError on None.
    if (
        model.retriever.type == "AsyncRetriever"
        and model.incremental_sync
        and model.incremental_sync.type != "DatetimeBasedCursor"
    ):
        # We are currently in a transition to the Concurrent CDK and AsyncRetriever can only work with the
        # support or unordered slices (for example, when we trigger reports for January and February, the report
        # in February can be completed first). Once we have support for custom concurrent cursor or have a new
        # implementation available in the CDK, we can enable more cursors here.
        raise ValueError(
            "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
        )

    if model.retriever.type == "AsyncRetriever" and model.retriever.partition_router:
        # Note that this development is also done in parallel to the per partition development which once merged
        # we could support here by calling `create_concurrent_cursor_from_perpartition_cursor`
        raise ValueError("Per partition state is not supported yet for AsyncRetriever.")

    stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)

    if self._disable_resumable_full_refresh:
        # Resumable full refresh is off: expose only the partition router (or nothing).
        return stream_slicer

    if model.incremental_sync:
        return self._build_incremental_cursor(model, stream_slicer, config)

    return self._build_resumable_cursor(model.retriever, stream_slicer)

def create_default_error_handler(
self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any
Expand Down Expand Up @@ -2057,9 +2077,7 @@ def create_dynamic_schema_loader(
self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
) -> DynamicSchemaLoader:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor_from_paginator(
model.retriever, stream_slicer
)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

schema_transformations = []
if model.schema_transformations:
Expand Down Expand Up @@ -2578,6 +2596,52 @@ def create_simple_retriever(
parameters=model.parameters or {},
)

def create_state_delegating_retriever(
    self,
    model: StateDelegatingRetrieverModel,
    config: Config,
    *,
    name: str,
    primary_key: Optional[Union[str, List[str], List[List[str]]]],
    stream_slicer: Optional[StreamSlicer],
    request_options_provider: Optional[RequestOptionsProvider] = None,
    stop_condition_on_cursor: bool = False,
    client_side_incremental_sync: Optional[Dict[str, Any]] = None,
    transformations: List[RecordTransformation],
) -> StateDelegatingRetriever:
    """Build a StateDelegatingRetriever wrapping a full-data and an incremental-data retriever.

    Both child retrievers are constructed from the same factory kwargs; the
    delegating retriever picks between them at runtime based on the cursor.

    Raises:
        ValueError: if `stream_slicer` is not a DeclarativeCursor — the
            delegating retriever needs the cursor itself to decide which
            child retriever to use.
    """
    if not isinstance(stream_slicer, DeclarativeCursor):
        raise ValueError("StateDelegatingRetriever requires a DeclarativeCursor")

    def _create_child_retriever(
        retriever_model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel],
    ) -> Any:
        # Both children share every construction argument except their model,
        # so build them through one helper instead of duplicating the kwargs.
        return self._create_component_from_model(
            model=retriever_model,
            config=config,
            name=name,
            primary_key=primary_key,
            stream_slicer=stream_slicer,
            request_options_provider=request_options_provider,
            stop_condition_on_cursor=stop_condition_on_cursor,
            client_side_incremental_sync=client_side_incremental_sync,
            transformations=transformations,
        )

    return StateDelegatingRetriever(
        full_data_retriever=_create_child_retriever(model.full_data_retriever),
        incremental_data_retriever=_create_child_retriever(model.incremental_data_retriever),
        cursor=stream_slicer,
    )

def _create_async_job_status_mapping(
self, model: AsyncJobStatusMapModel, config: Config, **kwargs: Any
) -> Mapping[str, AsyncJobStatus]:
Expand Down Expand Up @@ -2868,9 +2932,7 @@ def create_http_components_resolver(
self, model: HttpComponentsResolverModel, config: Config
) -> Any:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor_from_paginator(
model.retriever, stream_slicer
)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

retriever = self._create_component_from_model(
model=model.retriever,
Expand Down
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/retrievers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.state_delegating_retriever import (
StateDelegatingRetriever,
)

__all__ = ["Retriever", "SimpleRetriever", "SimpleRetrieverTestReadDecorator", "AsyncRetriever"]
__all__ = [
"Retriever",
"SimpleRetriever",
"SimpleRetrieverTestReadDecorator",
"AsyncRetriever",
"StateDelegatingRetriever",
]
Loading
Loading