Skip to content

Commit 5d20ab9

Browse files
committed
add ConditionalStreams component to low-code framework
1 parent e44362a commit 5d20ab9

File tree

7 files changed

+404
-30
lines changed

7 files changed

+404
-30
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,4 @@ repos:
7474
args: [--config-file=mypy.ini, --show-column-numbers]
7575
files: ^airbyte_cdk/
7676
pass_filenames: true
77+
additional_dependencies: ["types-requests", "types-PyYAML"]

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ properties:
2525
type: array
2626
items:
2727
anyOf:
28+
- "$ref": "#/definitions/ConditionalStreams"
2829
- "$ref": "#/definitions/DeclarativeStream"
2930
- "$ref": "#/definitions/StateDelegatingStream"
3031
dynamic_streams:
@@ -424,6 +425,36 @@ definitions:
424425
$parameters:
425426
type: object
426427
additionalProperties: true
428+
ConditionalStreams:
429+
title: Conditional Streams
430+
description: Streams that are only available while performing a connector operation when the condition is met.
431+
type: object
432+
required:
433+
- type
434+
- streams
435+
- condition
436+
properties:
437+
type:
438+
type: string
439+
enum: [ConditionalStreams]
440+
condition:
441+
title: Condition
442+
description: Condition that will be evaluated to determine if a set of streams should be available.
443+
type: string
444+
interpolation_context:
445+
- config
446+
- parameters
447+
examples:
448+
- "{{ config['is_sandbox'] }}"
449+
streams:
450+
title: Streams
451+
description: Streams that will be used during an operation based on the condition.
452+
type: array
453+
items:
454+
"$ref": "#/definitions/DeclarativeStream"
455+
$parameters:
456+
type: object
457+
additionalProperties: true
427458
ConstantBackoffStrategy:
428459
title: Constant Backoff
429460
description: Backoff strategy with a constant backoff interval.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
1212

1313
import orjson
1414
import yaml
@@ -35,6 +35,9 @@
3535
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
3636
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
3737
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
38+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
39+
ConditionalStreams as ConditionalStreamsModel,
40+
)
3841
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3942
DeclarativeStream as DeclarativeStreamModel,
4043
)
@@ -306,20 +309,37 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
306309
if api_budget_model:
307310
self._constructor.set_api_budget(api_budget_model, config)
308311

309-
source_streams = [
310-
self._constructor.create_component(
311-
(
312-
StateDelegatingStreamModel
313-
if stream_config.get("type") == StateDelegatingStreamModel.__name__
314-
else DeclarativeStreamModel
315-
),
316-
stream_config,
317-
config,
318-
emit_connector_builder_messages=self._emit_connector_builder_messages,
319-
)
320-
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
321-
]
322-
312+
source_streams = []
313+
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)):
314+
match stream_config.get("type"):
315+
case StateDelegatingStreamModel.__name__:
316+
source_streams.append(
317+
self._constructor.create_component(
318+
StateDelegatingStreamModel,
319+
stream_config,
320+
config,
321+
emit_connector_builder_messages=self._emit_connector_builder_messages,
322+
)
323+
)
324+
case ConditionalStreamsModel.__name__:
325+
# ConditionalStreamsModel returns a list of DeclarativeStreams so it must be extended
326+
source_streams.extend(
327+
self._constructor.create_component(
328+
ConditionalStreamsModel,
329+
stream_config,
330+
config,
331+
emit_connector_builder_messages=self._emit_connector_builder_messages,
332+
)
333+
)
334+
case DeclarativeStreamModel.__name__:
335+
source_streams.append(
336+
self._constructor.create_component(
337+
DeclarativeStreamModel,
338+
stream_config,
339+
config,
340+
emit_connector_builder_messages=self._emit_connector_builder_messages,
341+
)
342+
)
323343
return source_streams
324344

325345
@staticmethod
@@ -363,17 +383,26 @@ def update_with_cache_parent_configs(
363383
update_with_cache_parent_configs(router["parent_stream_configs"])
364384

365385
for stream_config in stream_configs:
366-
if stream_config["name"] in parent_streams:
367-
if stream_config["type"] == "StateDelegatingStream":
368-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
369-
True
370-
)
371-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
372-
True
373-
)
386+
current_stream_configs = []
387+
if stream_config.get("type") == ConditionalStreamsModel.__name__:
388+
current_stream_configs = [
389+
conditional_stream_config
390+
for conditional_stream_config in stream_config.get("streams") or []
391+
if conditional_stream_config["name"] in parent_streams
392+
]
393+
elif stream_config["name"] in parent_streams:
394+
current_stream_configs = [stream_config]
395+
396+
for current_stream_config in current_stream_configs:
397+
if current_stream_config["type"] == "StateDelegatingStream":
398+
current_stream_config["full_refresh_stream"]["retriever"]["requester"][
399+
"use_cache"
400+
] = True
401+
current_stream_config["incremental_stream"]["retriever"]["requester"][
402+
"use_cache"
403+
] = True
374404
else:
375-
stream_config["retriever"]["requester"]["use_cache"] = True
376-
405+
current_stream_config["retriever"]["requester"]["use_cache"] = True
377406
return stream_configs
378407

379408
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2168,7 +2170,7 @@ class Config:
21682170

21692171
type: Literal["DeclarativeSource"]
21702172
check: Union[CheckStream, CheckDynamicStream]
2171-
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
2173+
streams: List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
21722174
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
21732175
version: str = Field(
21742176
...,
@@ -2201,7 +2203,9 @@ class Config:
22012203

22022204
type: Literal["DeclarativeSource"]
22032205
check: Union[CheckStream, CheckDynamicStream]
2204-
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
2206+
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
2207+
None
2208+
)
22052209
dynamic_streams: List[DynamicDeclarativeStream]
22062210
version: str = Field(
22072211
...,
@@ -2280,6 +2284,22 @@ class Config:
22802284
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22812285

22822286

2287+
class ConditionalStreams(BaseModel):
2288+
type: Literal["ConditionalStreams"]
2289+
condition: str = Field(
2290+
...,
2291+
description="Condition that will be evaluated to determine if a set of streams should be available.",
2292+
examples=["{{ config['is_sandbox'] }}"],
2293+
title="Condition",
2294+
)
2295+
streams: List[DeclarativeStream] = Field(
2296+
...,
2297+
description="Streams that will be used during an operation based on the condition.",
2298+
title="Streams",
2299+
)
2300+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2301+
2302+
22832303
class FileUploader(BaseModel):
22842304
type: Literal["FileUploader"]
22852305
requester: Union[HttpRequester, CustomRequester] = Field(
@@ -2936,6 +2956,7 @@ class DynamicDeclarativeStream(BaseModel):
29362956
DeclarativeSource1.update_forward_refs()
29372957
DeclarativeSource2.update_forward_refs()
29382958
SelectiveAuthenticator.update_forward_refs()
2959+
ConditionalStreams.update_forward_refs()
29392960
FileUploader.update_forward_refs()
29402961
DeclarativeStream.update_forward_refs()
29412962
SessionTokenAuthenticator.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
PerPartitionWithGlobalCursor,
106106
ResumableFullRefreshCursor,
107107
)
108-
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
108+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString
109109
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
110110
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import (
111111
LegacyToPerPartitionStateMigration,
@@ -156,6 +156,9 @@
156156
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
157157
ConcurrencyLevel as ConcurrencyLevelModel,
158158
)
159+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
160+
ConditionalStreams as ConditionalStreamsModel,
161+
)
159162
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
160163
ConfigAddFields as ConfigAddFieldsModel,
161164
)
@@ -656,6 +659,7 @@ def _init_mappings(self) -> None:
656659
CheckDynamicStreamModel: self.create_check_dynamic_stream,
657660
CompositeErrorHandlerModel: self.create_composite_error_handler,
658661
ConcurrencyLevelModel: self.create_concurrency_level,
662+
ConditionalStreamsModel: self.create_conditional_streams,
659663
ConfigMigrationModel: self.create_config_migration,
660664
ConfigAddFieldsModel: self.create_config_add_fields,
661665
ConfigRemapFieldModel: self.create_config_remap_field,
@@ -1619,6 +1623,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
16191623
use_global_cursor=use_global_cursor,
16201624
)
16211625

1626+
def create_conditional_streams(
1627+
self, model: ConditionalStreamsModel, config: Config, **kwargs: Any
1628+
) -> List[DeclarativeStream]:
1629+
condition = InterpolatedBoolean(
1630+
condition=model.condition, parameters=model.parameters or {}
1631+
)
1632+
should_include_streams = condition.eval(config=config)
1633+
return (
1634+
[
1635+
self._create_component_from_model(stream, config=config, **kwargs)
1636+
for stream in model.streams
1637+
]
1638+
if should_include_streams
1639+
else []
1640+
)
1641+
16221642
@staticmethod
16231643
def create_constant_backoff_strategy(
16241644
model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any
@@ -3150,12 +3170,12 @@ def _get_url() -> str:
31503170
This is needed because the URL is not set until the requester is created.
31513171
"""
31523172

3153-
_url = (
3173+
_url: str = (
31543174
model.requester.url
31553175
if hasattr(model.requester, "url") and model.requester.url is not None
31563176
else requester.get_url()
31573177
)
3158-
_url_base = (
3178+
_url_base: str = (
31593179
model.requester.url_base
31603180
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
31613181
else requester.get_url_base()

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
CompositeErrorHandler as CompositeErrorHandlerModel,
6060
)
6161
from airbyte_cdk.sources.declarative.models import ConcurrencyLevel as ConcurrencyLevelModel
62+
from airbyte_cdk.sources.declarative.models import ConditionalStreams as ConditionalStreamsModel
6263
from airbyte_cdk.sources.declarative.models import CustomErrorHandler as CustomErrorHandlerModel
6364
from airbyte_cdk.sources.declarative.models import (
6465
CustomPartitionRouter as CustomPartitionRouterModel,
@@ -4801,3 +4802,109 @@ def test_create_stream_with_multiple_schema_loaders():
48014802
assert len(schema_loader.schema_loaders) == 2
48024803
assert isinstance(schema_loader.schema_loaders[0], InlineSchemaLoader)
48034804
assert isinstance(schema_loader.schema_loaders[1], InlineSchemaLoader)
4805+
4806+
4807+
def test_conditional_streams():
4808+
content = """
4809+
lists_stream:
4810+
type: DeclarativeStream
4811+
name: lists
4812+
primary_key: id
4813+
retriever:
4814+
paginator:
4815+
type: "DefaultPaginator"
4816+
page_size_option:
4817+
type: RequestOption
4818+
inject_into: request_parameter
4819+
field_name: page_size
4820+
page_token_option:
4821+
type: RequestPath
4822+
pagination_strategy:
4823+
type: "CursorPagination"
4824+
cursor_value: "{{ response._metadata.next }}"
4825+
page_size: 10
4826+
requester:
4827+
url_base: "https://testing.com"
4828+
path: "/api/v1/lists"
4829+
authenticator:
4830+
type: "BearerAuthenticator"
4831+
api_token: "{{ config.apikey }}"
4832+
request_parameters:
4833+
page_size: 10
4834+
record_selector:
4835+
extractor:
4836+
field_path: ["result"]
4837+
schema_loader:
4838+
type: JsonFileSchemaLoader
4839+
file_path: "./source_test/schemas/lists.yaml"
4840+
conditions_stream:
4841+
type: DeclarativeStream
4842+
name: sometimes
4843+
primary_key: id
4844+
retriever:
4845+
paginator:
4846+
type: "DefaultPaginator"
4847+
page_size_option:
4848+
type: RequestOption
4849+
inject_into: request_parameter
4850+
field_name: page_size
4851+
page_token_option:
4852+
type: RequestPath
4853+
pagination_strategy:
4854+
type: "CursorPagination"
4855+
cursor_value: "{{ response._metadata.next }}"
4856+
page_size: 10
4857+
requester:
4858+
type: HttpRequester
4859+
url_base: "https://testing.com"
4860+
path: "/api/v1/sometimes"
4861+
authenticator:
4862+
type: "BearerAuthenticator"
4863+
api_token: "{{ config.apikey }}"
4864+
request_parameters:
4865+
page_size: 10
4866+
record_selector:
4867+
type: RecordSelector
4868+
extractor:
4869+
type: DpathExtractor
4870+
field_path: ["result"]
4871+
schema_loader:
4872+
type: JsonFileSchemaLoader
4873+
file_path: "./source_test/schemas/sometimes.yaml"
4874+
streams:
4875+
- "#/lists_stream"
4876+
- type: ConditionalStreams
4877+
condition: "{{ config['is_sandbox'] }}"
4878+
streams:
4879+
- "#/conditions_stream"
4880+
"""
4881+
parsed_manifest = YamlDeclarativeSource._parse(content)
4882+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4883+
resolved_manifest["type"] = "DeclarativeSource"
4884+
stream_manifest = transformer.propagate_types_and_parameters(
4885+
"", resolved_manifest["conditions_stream"], {}
4886+
)
4887+
4888+
conditional_streams_manifest = resolved_manifest["streams"][1]
4889+
conditional_streams_manifest["streams"] = [stream_manifest]
4890+
4891+
sandbox_config = {
4892+
**input_config,
4893+
"is_sandbox": True,
4894+
}
4895+
4896+
streams = factory.create_component(
4897+
model_type=ConditionalStreamsModel,
4898+
component_definition=conditional_streams_manifest,
4899+
config=sandbox_config,
4900+
)
4901+
4902+
assert len(streams) == 1
4903+
conditional_stream = streams[0]
4904+
assert isinstance(conditional_stream, DeclarativeStream)
4905+
assert conditional_stream.name == "sometimes"
4906+
assert conditional_stream.primary_key == "id"
4907+
4908+
assert isinstance(conditional_stream.retriever, SimpleRetriever)
4909+
assert conditional_stream.retriever.name == "sometimes"
4910+
assert conditional_stream.retriever.primary_key == "id"

0 commit comments

Comments
 (0)