Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ repos:
args: [--config-file=mypy.ini, --show-column-numbers]
files: ^airbyte_cdk/
pass_filenames: true
additional_dependencies: ["types-requests", "types-PyYAML"]
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _group_streams(

# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
self._source_config, config
)

Expand Down
31 changes: 31 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ properties:
type: array
items:
anyOf:
- "$ref": "#/definitions/ConditionalStreams"
- "$ref": "#/definitions/DeclarativeStream"
- "$ref": "#/definitions/StateDelegatingStream"
dynamic_streams:
Expand Down Expand Up @@ -424,6 +425,36 @@ definitions:
$parameters:
type: object
additionalProperties: true
ConditionalStreams:
title: Conditional Streams
description: Streams that are only available while performing a connector operation when the condition is met.
type: object
required:
- type
- streams
- condition
properties:
type:
type: string
enum: [ConditionalStreams]
condition:
title: Condition
description: Condition that will be evaluated to determine if a set of streams should be available.
type: string
interpolation_context:
- config
- parameters
examples:
- "{{ config['is_sandbox'] }}"
streams:
title: Streams
description: Streams that will be used during an operation based on the condition.
type: array
items:
"$ref": "#/definitions/DeclarativeStream"
$parameters:
type: object
additionalProperties: true
ConstantBackoffStrategy:
title: Constant Backoff
description: Backoff strategy with a constant backoff interval.
Expand Down
37 changes: 28 additions & 9 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import orjson
import yaml
Expand All @@ -35,6 +35,10 @@
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConditionalStreams as ConditionalStreamsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
Expand Down Expand Up @@ -300,7 +304,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
stream_configs = (
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
)

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
Expand All @@ -319,7 +325,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
]

return source_streams

@staticmethod
Expand Down Expand Up @@ -373,7 +378,6 @@ def update_with_cache_parent_configs(
)
else:
stream_config["retriever"]["requester"]["use_cache"] = True

return stream_configs

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
Expand Down Expand Up @@ -477,12 +481,27 @@ def _parse_version(
# No exception
return parsed_version

def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
def _stream_configs(
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
) -> List[Dict[str, Any]]:
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
for s in stream_configs:
if "type" not in s:
s["type"] = "DeclarativeStream"
stream_configs = []
for current_stream_config in manifest.get("streams", []):
if (
"type" in current_stream_config
and current_stream_config["type"] == "ConditionalStreams"
):
interpolated_boolean = InterpolatedBoolean(
condition=current_stream_config.get("condition"),
parameters={},
)

if interpolated_boolean.eval(config=config):
stream_configs.extend(current_stream_config.get("streams", []))
else:
if "type" not in current_stream_config:
current_stream_config["type"] = "DeclarativeStream"
stream_configs.append(current_stream_config)
return stream_configs

def _dynamic_stream_configs(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2168,7 +2170,7 @@ class Config:

type: Literal["DeclarativeSource"]
check: Union[CheckStream, CheckDynamicStream]
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
streams: List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
version: str = Field(
...,
Expand Down Expand Up @@ -2201,7 +2203,9 @@ class Config:

type: Literal["DeclarativeSource"]
check: Union[CheckStream, CheckDynamicStream]
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
None
)
dynamic_streams: List[DynamicDeclarativeStream]
version: str = Field(
...,
Expand Down Expand Up @@ -2280,6 +2284,22 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConditionalStreams(BaseModel):
type: Literal["ConditionalStreams"]
condition: str = Field(
...,
description="Condition that will be evaluated to determine if a set of streams should be available.",
examples=["{{ config['is_sandbox'] }}"],
title="Condition",
)
streams: List[DeclarativeStream] = Field(
...,
description="Streams that will be used during an operation based on the condition.",
title="Streams",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FileUploader(BaseModel):
type: Literal["FileUploader"]
requester: Union[HttpRequester, CustomRequester] = Field(
Expand Down Expand Up @@ -2936,6 +2956,7 @@ class DynamicDeclarativeStream(BaseModel):
DeclarativeSource1.update_forward_refs()
DeclarativeSource2.update_forward_refs()
SelectiveAuthenticator.update_forward_refs()
ConditionalStreams.update_forward_refs()
FileUploader.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3150,12 +3150,12 @@ def _get_url() -> str:
This is needed because the URL is not set until the requester is created.
"""

_url = (
_url: str = (
model.requester.url
if hasattr(model.requester, "url") and model.requester.url is not None
else requester.get_url()
)
_url_base = (
_url_base: str = (
model.requester.url_base
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
else requester.get_url_base()
Expand Down
Loading
Loading