Skip to content

Commit 6632d29

Browse files
committed
Update stream test
1 parent 35f359d commit 6632d29

File tree

4 files changed

+74
-78
lines changed

4 files changed

+74
-78
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3160,12 +3160,20 @@ definitions:
31603160
type: object
31613161
required:
31623162
- type
3163+
- name
31633164
- full_refresh_stream
31643165
- incremental_stream
31653166
properties:
31663167
type:
31673168
type: string
31683169
enum: [ StateDelegatingStream ]
3170+
name:
3171+
title: Name
3172+
description: The stream name.
3173+
type: string
3174+
default: ""
3175+
example:
3176+
- "Users"
31693177
full_refresh_stream:
31703178
title: Retriever
31713179
description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,6 +2248,9 @@ class ParentStreamConfig(BaseModel):
22482248

22492249
class StateDelegatingStream(BaseModel):
22502250
type: Literal["StateDelegatingStream"]
2251+
name: str = Field(
2252+
..., description="The stream name.", example=["Users"], title="Name"
2253+
)
22512254
full_refresh_stream: DeclarativeStream = Field(
22522255
...,
22532256
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,6 @@
355355
SimpleRetriever as SimpleRetrieverModel,
356356
)
357357
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
358-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
359-
StateDelegatingRetriever as StateDelegatingRetrieverModel,
360-
)
361358
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
362359
StreamConfig as StreamConfigModel,
363360
)
@@ -444,7 +441,6 @@
444441
AsyncRetriever,
445442
SimpleRetriever,
446443
SimpleRetrieverTestReadDecorator,
447-
StateDelegatingRetriever,
448444
)
449445
from airbyte_cdk.sources.declarative.schema import (
450446
ComplexFieldType,
@@ -624,7 +620,6 @@ def _init_mappings(self) -> None:
624620
SelectiveAuthenticatorModel: self.create_selective_authenticator,
625621
SimpleRetrieverModel: self.create_simple_retriever,
626622
StateDelegatingStreamModel: self.create_state_delegating_stream,
627-
StateDelegatingRetrieverModel: self.create_state_delegating_retriever,
628623
SpecModel: self.create_spec,
629624
SubstreamPartitionRouterModel: self.create_substream_partition_router,
630625
WaitTimeFromHeaderModel: self.create_wait_time_from_header,
@@ -1785,7 +1780,6 @@ def _build_stream_slicer_from_partition_router(
17851780
AsyncRetrieverModel,
17861781
CustomRetrieverModel,
17871782
SimpleRetrieverModel,
1788-
StateDelegatingRetrieverModel,
17891783
],
17901784
config: Config,
17911785
stream_name: Optional[str] = None,
@@ -1814,29 +1808,6 @@ def _build_incremental_cursor(
18141808
stream_slicer: Optional[PartitionRouter],
18151809
config: Config,
18161810
) -> Optional[StreamSlicer]:
1817-
if model.retriever.type == "StateDelegatingRetriever":
1818-
if not model.incremental_sync:
1819-
raise ValueError(
1820-
"StateDelegatingRetriever requires 'incremental_sync' to be enabled."
1821-
)
1822-
elif model.incremental_sync.type != "DatetimeBasedCursor":
1823-
raise ValueError("StateDelegatingRetriever support only DatetimeBasedCursor.")
1824-
elif model.retriever.full_refresh_no_slice_in_params:
1825-
model.incremental_sync.step = None
1826-
model.incremental_sync.cursor_granularity = None
1827-
model.incremental_sync.start_time_option = None
1828-
model.incremental_sync.end_time_option = None
1829-
elif model.retriever.full_refresh_ignore_min_max_datetime:
1830-
start_datetime = model.incremental_sync.start_datetime
1831-
end_datetime = model.incremental_sync.end_datetime
1832-
1833-
if isinstance(start_datetime, MinMaxDatetimeModel):
1834-
start_datetime.max_datetime = ""
1835-
start_datetime.min_datetime = ""
1836-
if isinstance(end_datetime, MinMaxDatetimeModel):
1837-
end_datetime.max_datetime = ""
1838-
end_datetime.min_datetime = ""
1839-
18401811
if model.incremental_sync and stream_slicer:
18411812
if model.retriever.type == "AsyncRetriever":
18421813
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
@@ -1891,7 +1862,6 @@ def _build_resumable_cursor(
18911862
AsyncRetrieverModel,
18921863
CustomRetrieverModel,
18931864
SimpleRetrieverModel,
1894-
StateDelegatingRetrieverModel,
18951865
],
18961866
stream_slicer: Optional[PartitionRouter],
18971867
) -> Optional[StreamSlicer]:
@@ -1937,14 +1907,6 @@ def _merge_stream_slicers(
19371907
# we could support here by calling create_concurrent_cursor_from_perpartition_cursor
19381908
raise ValueError("Per partition state is not supported yet for AsyncRetriever.")
19391909

1940-
if retriever_model.type == "StateDelegatingRetriever":
1941-
stream_name = model.name or ""
1942-
retriever_model = (
1943-
retriever_model.incremental_retriever
1944-
if self._connector_state_manager.get_stream_state(stream_name, None)
1945-
else retriever_model.full_refresh_retriever
1946-
)
1947-
19481910
stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config)
19491911

19501912
if model.incremental_sync:
@@ -2759,11 +2721,10 @@ def create_simple_retriever(
27592721

27602722
def create_state_delegating_stream(self, model: StateDelegatingStreamModel, config: Config, child_state: Optional[MutableMapping[str, Any]] = None, **kwargs: Any
27612723
) -> DeclarativeStream:
2762-
if model.full_refresh_stream.name != model.incremental_stream.name:
2763-
raise ValueError(f"full_refresh_stream name and incremental_stream must have equal names. Instead has {model.full_refresh_stream.name}, {model.incremental_stream.name}.")
2724+
if model.full_refresh_stream.name != model.name or model.name != model.incremental_stream.name:
2725+
raise ValueError(f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}.")
27642726

2765-
stream_name = model.full_refresh_stream.name
2766-
stream_model = model.incremental_stream if self._connector_state_manager.get_stream_state(stream_name, None) or child_state else model.full_refresh_stream
2727+
stream_model = model.incremental_stream if self._connector_state_manager.get_stream_state(model.name, None) or child_state else model.full_refresh_stream
27672728

27682729
return self._create_component_from_model(stream_model, config=config, **kwargs)
27692730

unit_tests/sources/declarative/retrievers/test_state_delegating_retriever.py renamed to unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,21 @@
3030
"check": {"type": "CheckStream", "stream_names": ["TestStream"]},
3131
"definitions": {
3232
"TestStream": {
33-
"type": "DeclarativeStream",
33+
"type": "StateDelegatingStream",
3434
"name": "TestStream",
35-
"primary_key": [],
36-
"schema_loader": {
37-
"type": "InlineSchemaLoader",
38-
"schema": {
39-
"$schema": "http://json-schema.org/schema#",
40-
"properties": {},
41-
"type": "object",
35+
"full_refresh_stream": {
36+
"type": "DeclarativeStream",
37+
"name": "TestStream",
38+
"primary_key": [],
39+
"schema_loader": {
40+
"type": "InlineSchemaLoader",
41+
"schema": {
42+
"$schema": "http://json-schema.org/schema#",
43+
"properties": {},
44+
"type": "object",
45+
},
4246
},
43-
},
44-
"retriever": {
45-
"type": "StateDelegatingRetriever",
46-
"full_refresh_retriever": {
47+
"retriever": {
4748
"type": "SimpleRetriever",
4849
"requester": {
4950
"type": "HttpRequester",
@@ -56,7 +57,30 @@
5657
"extractor": {"type": "DpathExtractor", "field_path": []},
5758
},
5859
},
59-
"incremental_retriever": {
60+
"incremental_sync": {
61+
"type": "DatetimeBasedCursor",
62+
"start_datetime": {
63+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
64+
},
65+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
66+
"datetime_format": "%Y-%m-%d",
67+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
68+
"cursor_field": "updated_at",
69+
},
70+
},
71+
"incremental_stream": {
72+
"type": "DeclarativeStream",
73+
"name": "TestStream",
74+
"primary_key": [],
75+
"schema_loader": {
76+
"type": "InlineSchemaLoader",
77+
"schema": {
78+
"$schema": "http://json-schema.org/schema#",
79+
"properties": {},
80+
"type": "object",
81+
},
82+
},
83+
"retriever": {
6084
"type": "SimpleRetriever",
6185
"requester": {
6286
"type": "HttpRequester",
@@ -69,29 +93,29 @@
6993
"extractor": {"type": "DpathExtractor", "field_path": []},
7094
},
7195
},
72-
},
73-
"incremental_sync": {
74-
"type": "DatetimeBasedCursor",
75-
"start_datetime": {
76-
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
77-
},
78-
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
79-
"datetime_format": "%Y-%m-%d",
80-
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
81-
"cursor_granularity": "P1D",
82-
"step": "P15D",
83-
"cursor_field": "updated_at",
84-
"start_time_option": {
85-
"type": "RequestOption",
86-
"field_name": "start",
87-
"inject_into": "request_parameter",
88-
},
89-
"end_time_option": {
90-
"type": "RequestOption",
91-
"field_name": "end",
92-
"inject_into": "request_parameter",
96+
"incremental_sync": {
97+
"type": "DatetimeBasedCursor",
98+
"start_datetime": {
99+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
100+
},
101+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
102+
"datetime_format": "%Y-%m-%d",
103+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
104+
"cursor_granularity": "P1D",
105+
"step": "P15D",
106+
"cursor_field": "updated_at",
107+
"start_time_option": {
108+
"type": "RequestOption",
109+
"field_name": "start",
110+
"inject_into": "request_parameter",
111+
},
112+
"end_time_option": {
113+
"type": "RequestOption",
114+
"field_name": "end",
115+
"inject_into": "request_parameter",
116+
},
93117
},
94-
},
118+
}
95119
},
96120
},
97121
"streams": [{"$ref": "#/definitions/TestStream"}],
@@ -166,7 +190,7 @@ def get_records(
166190
def test_state_retriever():
167191
with HttpMocker() as http_mocker:
168192
http_mocker.get(
169-
HttpRequest(url="https://api.test.com/items?start=2024-07-01&end=2024-07-15"),
193+
HttpRequest(url="https://api.test.com/items"),
170194
HttpResponse(
171195
body=json.dumps(
172196
[

0 commit comments

Comments
 (0)