Skip to content

Commit 689e792

Browse files
author
maxime.c
committed
Remove resumable full refresh (RFR) stuff
1 parent 5fe2e02 commit 689e792

File tree

4 files changed

+28
-218
lines changed

4 files changed

+28
-218
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ def __init__(
8484
# incremental streams running in full refresh.
8585
component_factory = component_factory or ModelToComponentFactory(
8686
emit_connector_builder_messages=emit_connector_builder_messages,
87-
disable_resumable_full_refresh=True,
8887
connector_state_manager=self._connector_state_manager,
8988
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
9089
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datetime
88
import importlib
99
import inspect
10+
import logging
1011
import re
1112
from functools import partial
1213
from typing import (
@@ -544,6 +545,8 @@
544545
StreamSlicer,
545546
StreamSlicerTestReadDecorator,
546547
)
548+
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import \
549+
StreamSlicerPartitionGenerator, DeclarativePartitionFactory
547550
from airbyte_cdk.sources.declarative.transformations import (
548551
AddFields,
549552
RecordTransformation,
@@ -604,7 +607,9 @@
604607
WeekClampingStrategy,
605608
Weekday,
606609
)
607-
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
610+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, Cursor, FinalStateCursor
611+
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
612+
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
608613
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
609614
CustomFormatConcurrentStreamStateConverter,
610615
DateTimeStreamStateConverter,
@@ -634,7 +639,6 @@ def __init__(
634639
emit_connector_builder_messages: bool = False,
635640
disable_retries: bool = False,
636641
disable_cache: bool = False,
637-
disable_resumable_full_refresh: bool = False,
638642
message_repository: Optional[MessageRepository] = None,
639643
connector_state_manager: Optional[ConnectorStateManager] = None,
640644
max_concurrent_async_job_count: Optional[int] = None,
@@ -645,7 +649,6 @@ def __init__(
645649
self._emit_connector_builder_messages = emit_connector_builder_messages
646650
self._disable_retries = disable_retries
647651
self._disable_cache = disable_cache
648-
self._disable_resumable_full_refresh = disable_resumable_full_refresh
649652
self._message_repository = message_repository or InMemoryMessageRepository(
650653
self._evaluate_log_level(emit_connector_builder_messages)
651654
)
@@ -2035,15 +2038,6 @@ def create_declarative_stream(
20352038
file_uploader=file_uploader,
20362039
incremental_sync=model.incremental_sync,
20372040
)
2038-
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
2039-
2040-
if model.state_migrations:
2041-
state_transformations = [
2042-
self._create_component_from_model(state_migration, config, declarative_stream=model)
2043-
for state_migration in model.state_migrations
2044-
]
2045-
else:
2046-
state_transformations = []
20472041

20482042
schema_loader: Union[
20492043
CompositeSchemaLoader,
@@ -2071,6 +2065,15 @@ def create_declarative_stream(
20712065
options["name"] = model.name
20722066
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20732067

2068+
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
2069+
2070+
if model.state_migrations:
2071+
state_transformations = [
2072+
self._create_component_from_model(state_migration, config, declarative_stream=model)
2073+
for state_migration in model.state_migrations
2074+
]
2075+
else:
2076+
state_transformations = []
20742077
return DeclarativeStream(
20752078
name=model.name or "",
20762079
primary_key=primary_key,
@@ -2185,28 +2188,6 @@ def _build_incremental_cursor(
21852188
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
21862189
return None
21872190

2188-
def _build_resumable_cursor(
2189-
self,
2190-
model: Union[
2191-
AsyncRetrieverModel,
2192-
CustomRetrieverModel,
2193-
SimpleRetrieverModel,
2194-
],
2195-
stream_slicer: Optional[PartitionRouter],
2196-
) -> Optional[StreamSlicer]:
2197-
if hasattr(model, "paginator") and model.paginator and not stream_slicer:
2198-
# For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor`
2199-
return ResumableFullRefreshCursor(parameters={})
2200-
elif stream_slicer:
2201-
# For the Full-Refresh sub-streams, we use the nested `ChildPartitionResumableFullRefreshCursor`
2202-
return PerPartitionCursor(
2203-
cursor_factory=CursorFactory(
2204-
create_function=partial(ChildPartitionResumableFullRefreshCursor, {})
2205-
),
2206-
partition_router=stream_slicer,
2207-
)
2208-
return None
2209-
22102191
def _merge_stream_slicers(
22112192
self, model: DeclarativeStreamModel, config: Config
22122193
) -> Optional[StreamSlicer]:
@@ -2243,11 +2224,7 @@ def _merge_stream_slicers(
22432224
if model.incremental_sync:
22442225
return self._build_incremental_cursor(model, stream_slicer, config)
22452226

2246-
return (
2247-
stream_slicer
2248-
if self._disable_resumable_full_refresh
2249-
else self._build_resumable_cursor(retriever_model, stream_slicer)
2250-
)
2227+
return stream_slicer
22512228

22522229
def create_default_error_handler(
22532230
self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any
@@ -2529,9 +2506,6 @@ def create_schema_type_identifier(
25292506
def create_dynamic_schema_loader(
25302507
self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
25312508
) -> DynamicSchemaLoader:
2532-
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
2533-
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
2534-
25352509
schema_transformations = []
25362510
if model.schema_transformations:
25372511
for transformation_model in model.schema_transformations:
@@ -2544,7 +2518,7 @@ def create_dynamic_schema_loader(
25442518
config=config,
25452519
name=name,
25462520
primary_key=None,
2547-
stream_slicer=combined_slicers,
2521+
stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
25482522
transformations=[],
25492523
use_cache=True,
25502524
log_formatter=(
@@ -3808,15 +3782,12 @@ def create_components_mapping_definition(
38083782
def create_http_components_resolver(
38093783
self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None
38103784
) -> Any:
3811-
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
3812-
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
3813-
38143785
retriever = self._create_component_from_model(
38153786
model=model.retriever,
38163787
config=config,
38173788
name=f"{stream_name if stream_name else '__http_components_resolver'}",
38183789
primary_key=None,
3819-
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
3790+
stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
38203791
transformations=[],
38213792
)
38223793

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 1 addition & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,152 +1055,6 @@ def test_stream_with_incremental_and_async_retriever_with_partition_router(use_l
10551055
assert stream_slices == expected_stream_slices
10561056

10571057

1058-
def test_resumable_full_refresh_stream():
1059-
content = """
1060-
decoder:
1061-
type: JsonDecoder
1062-
extractor:
1063-
type: DpathExtractor
1064-
selector:
1065-
type: RecordSelector
1066-
record_filter:
1067-
type: RecordFilter
1068-
condition: "{{ record['id'] > stream_state['id'] }}"
1069-
metadata_paginator:
1070-
type: DefaultPaginator
1071-
page_size_option:
1072-
type: RequestOption
1073-
inject_into: body_json
1074-
field_path: ["variables", "page_size"]
1075-
page_token_option:
1076-
type: RequestPath
1077-
pagination_strategy:
1078-
type: "CursorPagination"
1079-
cursor_value: "{{ response._metadata.next }}"
1080-
page_size: 10
1081-
requester:
1082-
type: HttpRequester
1083-
url_base: "https://api.sendgrid.com/v3/"
1084-
http_method: "GET"
1085-
authenticator:
1086-
type: BearerAuthenticator
1087-
api_token: "{{ config['apikey'] }}"
1088-
request_parameters:
1089-
unit: "day"
1090-
retriever:
1091-
paginator:
1092-
type: NoPagination
1093-
decoder:
1094-
$ref: "#/decoder"
1095-
partial_stream:
1096-
type: DeclarativeStream
1097-
schema_loader:
1098-
type: JsonFileSchemaLoader
1099-
file_path: "./source_sendgrid/schemas/{{ parameters.name }}.json"
1100-
list_stream:
1101-
$ref: "#/partial_stream"
1102-
$parameters:
1103-
name: "lists"
1104-
extractor:
1105-
$ref: "#/extractor"
1106-
field_path: ["{{ parameters['name'] }}"]
1107-
name: "lists"
1108-
primary_key: "id"
1109-
retriever:
1110-
$ref: "#/retriever"
1111-
requester:
1112-
$ref: "#/requester"
1113-
path: "{{ next_page_token['next_page_url'] }}"
1114-
paginator:
1115-
$ref: "#/metadata_paginator"
1116-
record_selector:
1117-
$ref: "#/selector"
1118-
transformations:
1119-
- type: AddFields
1120-
fields:
1121-
- path: ["extra"]
1122-
value: "{{ response.to_add }}"
1123-
check:
1124-
type: CheckStream
1125-
stream_names: ["list_stream"]
1126-
spec:
1127-
type: Spec
1128-
documentation_url: https://airbyte.com/#yaml-from-manifest
1129-
connection_specification:
1130-
title: Test Spec
1131-
type: object
1132-
required:
1133-
- api_key
1134-
additionalProperties: false
1135-
properties:
1136-
api_key:
1137-
type: string
1138-
airbyte_secret: true
1139-
title: API Key
1140-
description: Test API Key
1141-
order: 0
1142-
advanced_auth:
1143-
auth_flow_type: "oauth2.0"
1144-
"""
1145-
parsed_manifest = YamlDeclarativeSource._parse(content)
1146-
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
1147-
resolved_manifest["type"] = "DeclarativeSource"
1148-
manifest = transformer.propagate_types_and_parameters("", resolved_manifest, {})
1149-
1150-
stream_manifest = manifest["list_stream"]
1151-
assert stream_manifest["type"] == "DeclarativeStream"
1152-
stream = factory.create_component(
1153-
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
1154-
)
1155-
1156-
assert isinstance(stream, DeclarativeStream)
1157-
assert stream.primary_key == "id"
1158-
assert stream.name == "lists"
1159-
assert stream._stream_cursor_field.string == ""
1160-
1161-
assert isinstance(stream.retriever, SimpleRetriever)
1162-
assert stream.retriever.primary_key == stream.primary_key
1163-
assert stream.retriever.name == stream.name
1164-
1165-
assert isinstance(stream.retriever.record_selector, RecordSelector)
1166-
1167-
assert isinstance(stream.retriever.stream_slicer, ResumableFullRefreshCursor)
1168-
assert isinstance(stream.retriever.cursor, ResumableFullRefreshCursor)
1169-
1170-
assert isinstance(stream.retriever.paginator, DefaultPaginator)
1171-
assert isinstance(stream.retriever.paginator.decoder, PaginationDecoderDecorator)
1172-
for string in stream.retriever.paginator.page_size_option.field_path:
1173-
assert isinstance(string, InterpolatedString)
1174-
assert len(stream.retriever.paginator.page_size_option.field_path) == 2
1175-
assert stream.retriever.paginator.page_size_option.inject_into == RequestOptionType.body_json
1176-
assert isinstance(stream.retriever.paginator.page_token_option, RequestPath)
1177-
assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/"
1178-
assert stream.retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/"
1179-
1180-
assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy)
1181-
assert isinstance(
1182-
stream.retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator
1183-
)
1184-
assert (
1185-
stream.retriever.paginator.pagination_strategy._cursor_value.string
1186-
== "{{ response._metadata.next }}"
1187-
)
1188-
assert (
1189-
stream.retriever.paginator.pagination_strategy._cursor_value.default
1190-
== "{{ response._metadata.next }}"
1191-
)
1192-
assert stream.retriever.paginator.pagination_strategy.page_size == 10
1193-
1194-
checker = factory.create_component(
1195-
model_type=CheckStreamModel, component_definition=manifest["check"], config=input_config
1196-
)
1197-
1198-
assert isinstance(checker, CheckStream)
1199-
streams_to_check = checker.stream_names
1200-
assert len(streams_to_check) == 1
1201-
assert list(streams_to_check)[0] == "list_stream"
1202-
1203-
12041058
def test_incremental_data_feed():
12051059
content = """
12061060
selector:
@@ -2592,7 +2446,7 @@ def test_default_schema_loader(self):
25922446
"values": "{{config['repos']}}",
25932447
"cursor_field": "a_key",
25942448
},
2595-
PerPartitionCursor,
2449+
ListPartitionRouter,
25962450
id="test_create_simple_retriever_with_partition_router",
25972451
),
25982452
pytest.param(

unit_tests/sources/declarative/test_manifest_declarative_source.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,8 +1818,8 @@ def _create_page(response_body):
18181818
[
18191819
call({}, {}, None),
18201820
call(
1821-
{"next_page_token": "next"},
1822-
{"next_page_token": "next"},
1821+
{},
1822+
{},
18231823
{"next_page_token": "next"},
18241824
),
18251825
],
@@ -1907,16 +1907,9 @@ def _create_page(response_body):
19071907
),
19081908
[{"ABC": 0, "partition": 0}, {"AED": 1, "partition": 0}, {"ABC": 2, "partition": 1}],
19091909
[
1910-
call({"states": []}, {"partition": "0"}, None),
1910+
call({}, {"partition": "0"}, None),
19111911
call(
1912-
{
1913-
"states": [
1914-
{
1915-
"partition": {"partition": "0"},
1916-
"cursor": {"__ab_full_refresh_sync_complete": True},
1917-
}
1918-
]
1919-
},
1912+
{},
19201913
{"partition": "1"},
19211914
None,
19221915
),
@@ -2022,17 +2015,10 @@ def _create_page(response_body):
20222015
{"ABC": 2, "partition": 1},
20232016
],
20242017
[
2025-
call({"states": []}, {"partition": "0"}, None),
2026-
call({"states": []}, {"partition": "0"}, {"next_page_token": "next"}),
2018+
call({}, {"partition": "0"}, None),
2019+
call({}, {"partition": "0"}, {"next_page_token": "next"}),
20272020
call(
2028-
{
2029-
"states": [
2030-
{
2031-
"partition": {"partition": "0"},
2032-
"cursor": {"__ab_full_refresh_sync_complete": True},
2033-
}
2034-
]
2035-
},
2021+
{},
20362022
{"partition": "1"},
20372023
None,
20382024
),
@@ -2201,12 +2187,12 @@ def test_only_parent_streams_use_cache():
22012187

22022188
# Parent stream created for substream
22032189
assert (
2204-
streams[1].retriever.stream_slicer._partition_router.parent_stream_configs[0].stream.name
2190+
streams[1].retriever.stream_slicer.parent_stream_configs[0].stream.name
22052191
== "applications"
22062192
)
22072193
assert (
22082194
streams[1]
2209-
.retriever.stream_slicer._partition_router.parent_stream_configs[0]
2195+
.retriever.stream_slicer.parent_stream_configs[0]
22102196
.stream.retriever.requester.use_cache
22112197
)
22122198

0 commit comments

Comments
 (0)