Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
1d5b468
remove
Jul 31, 2025
76ac6f7
Auto-fix lint and format issues
Jul 31, 2025
2d1e2f4
remove unused file
Jul 31, 2025
b4a5fec
have declarative availability check support AbstractStream
Jul 31, 2025
fc6c6b6
Auto-fix lint and format issues
Jul 31, 2025
5fe2e02
mypy
Jul 31, 2025
1e8e968
Auto-fix lint and format issues
Jul 31, 2025
689e792
Remove RFR stuff
Aug 1, 2025
5399436
have bland stream be instantiated as DefaultStream
Aug 4, 2025
dff2559
fix test
Aug 4, 2025
7dc2164
fix test, format, lint and a bit of mypy
Aug 4, 2025
0bfbdfe
mypy
Aug 4, 2025
0b454bb
format
Aug 4, 2025
13c17f4
remove unused line
Aug 4, 2025
0f36dc5
Merge branch 'main' into maxi297/remove-availability-strategy-except-…
maxi297 Aug 4, 2025
6f95ebb
Merge branch 'maxi297/remove-availability-strategy-except-for-filebas…
maxi297 Aug 4, 2025
c94892a
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 4, 2025
fb75765
fix test
Aug 4, 2025
c078395
lint
Aug 4, 2025
decc557
format
Aug 4, 2025
b8daf64
code review
Aug 4, 2025
e8edc4b
Merge branch 'main' into maxi297/availability_strategy_to_support_abs…
maxi297 Aug 5, 2025
2bc4b30
code review
Aug 5, 2025
98e2227
Merge branch 'maxi297/availability_strategy_to_support_abstract_strea…
Aug 5, 2025
d9d09f0
incremental without partition router as DefaultStream
Aug 5, 2025
1af2264
refactor regarding async stuff
Aug 5, 2025
8c771bb
partially fix mypy
Aug 5, 2025
fb40a6b
fix mypy
Aug 5, 2025
9175283
format
Aug 5, 2025
1d84a49
mypy
Aug 5, 2025
2cba5ff
fix
Aug 5, 2025
8181c83
fix condition where we might override FinalStateCursor with null
Aug 5, 2025
1079629
supports_file_transfer
Aug 6, 2025
f196ea7
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 6, 2025
7f643e4
format
Aug 6, 2025
8566607
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 6, 2025
96f15c3
Merge branch 'main' into maxi297/bland_stream_instantiated_as_default…
Aug 11, 2025
6cb012b
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
11e3a35
format
Aug 11, 2025
86909df
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
maxi297 Aug 11, 2025
cee6157
[WIP still tests failing] Remove DeclarativeStream instantiation
Aug 11, 2025
ebb4b28
more fixes for DefaultStream in Connector Builder
Aug 11, 2025
6fef39b
mypy and format
Aug 11, 2025
e31fed9
format broke mypy
Aug 11, 2025
e996805
Merge branch 'maxi297/bland_stream_instantiated_as_defaultstream' int…
Aug 11, 2025
c68ae59
Merge branch 'maxi297/incremental_without_partition_router_as_default…
Aug 11, 2025
23c9712
fix connector builder tests and format
Aug 11, 2025
f5d1b71
Merging main into branch
Aug 21, 2025
7b704e0
Merge branch 'main' into maxi297/remove_declarative_stream
Aug 21, 2025
6b35969
fix failed merge
Aug 25, 2025
fbfcfd2
adding comments based on code review
Aug 26, 2025
8c6bd42
format
Aug 26, 2025
35db9d7
add test for per_partition_request_option_provider
Aug 26, 2025
f74aa7e
add comment to explain parameter not being propagated sometimes for c…
Aug 26, 2025
76492e9
remove legacy tests that can't be maintained
Aug 26, 2025
2135f18
re-add test
Aug 26, 2025
03ebdc8
Merge branch 'main' into maxi297/remove_declarative_stream
brianjlai Aug 26, 2025
485502e
add slice_limit parameter to StreamSlicerPartitionGenerator so connec…
brianjlai Aug 26, 2025
1bd76e8
fix 4 of 6 test_per_partition_cursor_integration.py tests
brianjlai Aug 27, 2025
9158b67
fix 2 more tests
brianjlai Aug 27, 2025
bb424b8
whatever
brianjlai Aug 27, 2025
6c8771c
clean tests
brianjlai Aug 27, 2025
f9eb050
fix remaining tests in test_per_partition_cursor_integration
Aug 27, 2025
cf8b084
coderabbitai code review
Aug 27, 2025
5286442
add tests, format and lint
Aug 27, 2025
4f46c3e
Merge branch 'main' into maxi297/remove_declarative_stream
Aug 27, 2025
eb635b1
fix linting but break test_per_partition_cursor.py for now
Aug 27, 2025
2b849f7
fix test_per_partition_cursor.py
Aug 27, 2025
ace8739
remove unused code and improve tests
Aug 28, 2025
0f99c9e
emit updated parent before last record, not after
Aug 28, 2025
efd1040
mypy
Aug 28, 2025
a04aead
add typing
Aug 28, 2025
f1cf1de
Merge branch 'main' into maxi297/remove_declarative_stream
Aug 28, 2025
c5b4837
mypy
Aug 28, 2025
bb96293
remove comments that documented the new behavior
Aug 29, 2025
639a734
a bit more cleanup
Aug 29, 2025
1b4b756
more clean up
Aug 29, 2025
c004637
remove unecessary test
Aug 29, 2025
25ca5b8
allow for specific parameters to be passed to custom components
Sep 2, 2025
e5ecf41
fix internal _get_url
Sep 2, 2025
f2f1363
fix case where request option provider is stream slicer
Sep 2, 2025
38cd657
add migration guide
Sep 3, 2025
5bafeca
code review comment
Sep 3, 2025
8bef9dd
format
Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def _init_mappings(self) -> None:
OAuthAuthenticatorModel: self.create_oauth_authenticator,
OffsetIncrementModel: self.create_offset_increment,
PageIncrementModel: self.create_page_increment,
ParentStreamConfigModel: self.create_parent_stream_config,
ParentStreamConfigModel: self._create_message_repository_substream_wrapper,
PredicateValidatorModel: self.create_predicate_validator,
PropertiesFromEndpointModel: self.create_properties_from_endpoint,
PropertyChunkingModel: self.create_property_chunking,
Expand Down Expand Up @@ -1748,7 +1748,7 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->

if self._is_component(model_value):
model_args[model_field] = self._create_nested_component(
model, model_field, model_value, config
model, model_field, model_value, config, **kwargs,
)
elif isinstance(model_value, list):
vals = []
Expand All @@ -1760,7 +1760,7 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
if derived_type:
v["type"] = derived_type
if self._is_component(v):
vals.append(self._create_nested_component(model, model_field, v, config))
vals.append(self._create_nested_component(model, model_field, v, config, **kwargs,))
else:
vals.append(v)
model_args[model_field] = vals
Expand Down Expand Up @@ -1850,7 +1850,7 @@ def _extract_missing_parameters(error: TypeError) -> List[str]:
return []

def _create_nested_component(
self, model: Any, model_field: str, model_value: Any, config: Config
self, model: Any, model_field: str, model_value: Any, config: Config, **kwargs: Any
) -> Any:
type_name = model_value.get("type", None)
if not type_name:
Expand All @@ -1875,8 +1875,11 @@ def _create_nested_component(
for kwarg in constructor_kwargs
if kwarg in model_parameters
}
matching_kwargs = {
kwarg: kwargs[kwarg] for kwarg in constructor_kwargs if kwarg in kwargs
}
return self._create_component_from_model(
model=parsed_model, config=config, **matching_parameters
model=parsed_model, config=config, **(matching_parameters | matching_kwargs)
)
except TypeError as error:
missing_parameters = self._extract_missing_parameters(error)
Expand Down Expand Up @@ -2871,7 +2874,7 @@ def create_page_increment(
)

def create_parent_stream_config(
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
self, model: ParentStreamConfigModel, config: Config, stream_name: str, **kwargs: Any
) -> ParentStreamConfig:
declarative_stream = self._create_component_from_model(
model.stream,
Expand Down Expand Up @@ -3695,11 +3698,11 @@ def create_substream_partition_router(
)

def _create_message_repository_substream_wrapper(
self, model: ParentStreamConfigModel, config: Config, **kwargs: Any
self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any
) -> Any:
# getting the parent state
child_state = self._connector_state_manager.get_stream_state(
kwargs["stream_name"], None
stream_name, None
)

# This flag will be used exclusively for StateDelegatingStream when a parent stream is created
Expand Down Expand Up @@ -3731,8 +3734,8 @@ def _create_message_repository_substream_wrapper(
),
)

return substream_factory._create_component_from_model(
model=model, config=config, has_parent_state=has_parent_state, **kwargs
return substream_factory.create_parent_stream_config(
model=model, config=config, stream_name=stream_name, **kwargs
)

def _instantiate_parent_stream_state_manager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
from airbyte_cdk.sources.streams.call_rate import MovingWindowCallRatePolicy
from airbyte_cdk.sources.streams.concurrent.clamping import (
ClampingEndProvider,
Expand Down Expand Up @@ -944,6 +945,58 @@ def test_stream_with_incremental_and_retriever_with_partition_router():
assert list_stream_slicer._cursor_field.string == "a_key"


def test_stream_with_custom_retriever_and_transformations():
content = """
a_stream:
type: DeclarativeStream
primary_key: id
schema_loader:
type: InlineSchemaLoader
schema:
$schema: "http://json-schema.org/draft-07/schema"
type: object
properties:
id:
type: string
retriever:
type: CustomRetriever
class_name: unit_tests.sources.declarative.parsers.testing_components.TestingCustomRetriever
name: "{{ parameters['name'] }}"
decoder:
type: JsonDecoder
requester:
type: HttpRequester
name: "{{ parameters['name'] }}"
url_base: "https://api.sendgrid.com/v3/"
http_method: "GET"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["records"]
transformations:
- type: AddFields
fields:
- path: ["extra"]
value: "{{ response.to_add }}"
$parameters:
name: a_stream
"""

parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
stream_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["a_stream"], {}
)

stream = factory.create_component(
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
)

assert isinstance(stream, DefaultStream)
assert get_retriever(stream).record_selector.transformations


@pytest.mark.parametrize(
"use_legacy_state",
[
Expand Down Expand Up @@ -2053,11 +2106,12 @@ def test_custom_components_do_not_contain_extra_fields():
}

custom_substream_partition_router = factory.create_component(
CustomPartitionRouterModel, custom_substream_partition_router_manifest, input_config
CustomPartitionRouterModel, custom_substream_partition_router_manifest, input_config, stream_name="child_stream_name",
)
assert isinstance(custom_substream_partition_router, TestingCustomSubstreamPartitionRouter)

assert len(custom_substream_partition_router.parent_stream_configs) == 1
assert isinstance(custom_substream_partition_router.parent_stream_configs[0].stream.cursor._message_repository, StateFilteringMessageRepository)
assert custom_substream_partition_router.parent_stream_configs[0].parent_key.eval({}) == "id"
assert (
custom_substream_partition_router.parent_stream_configs[0].partition_field.eval({})
Expand Down Expand Up @@ -2120,7 +2174,7 @@ def test_parse_custom_component_fields_if_subcomponent():
}

custom_substream_partition_router = factory.create_component(
CustomPartitionRouterModel, custom_substream_partition_router_manifest, input_config
CustomPartitionRouterModel, custom_substream_partition_router_manifest, input_config, stream_name="child_stream_name"
)
assert isinstance(custom_substream_partition_router, TestingCustomSubstreamPartitionRouter)
assert custom_substream_partition_router.custom_field == "here"
Expand Down
6 changes: 6 additions & 0 deletions unit_tests/sources/declarative/parsers/testing_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DefaultPaginator,
PaginationStrategy,
)
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever


@dataclass
Expand Down Expand Up @@ -43,3 +44,8 @@ class TestingCustomSubstreamPartitionRouter(SubstreamPartitionRouter):

custom_field: str
custom_pagination_strategy: PaginationStrategy


@dataclass
class TestingCustomRetriever(SimpleRetriever):
pass
Loading