Skip to content

Commit 138fc60

Browse files
committed
Feat: Add Hidden-Check Streams
1 parent acc1003 commit 138fc60

File tree

4 files changed

+129
-32
lines changed

4 files changed

+129
-32
lines changed

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, Dict, List, Mapping, Optional, Tuple
8+
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
99

1010
from airbyte_cdk import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import DeclarativeStream
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

@@ -25,13 +26,17 @@ class DynamicStreamCheckConfig:
2526
@dataclass
2627
class CheckStream(ConnectionChecker):
2728
"""
28-
Checks the connections by checking availability of one or many streams selected by the developer
29+
Checks the connection by checking the availability of one or more streams specified by the developer.
2930
3031
Attributes:
31-
stream_name (List[str]): names of streams to check
32+
stream_names (List[Union[str, DeclarativeStream]]):
33+
Names of streams to check. Each item can be:
34+
- a string (referencing a stream in the manifest's streams block)
35+
- a dict (an inline DeclarativeStream definition from YAML)
36+
- a DeclarativeStream Pydantic model (from parsed manifest)
3237
"""
3338

34-
stream_names: List[str]
39+
stream_names: List[Union[str, DeclarativeStream]]
3540
parameters: InitVar[Mapping[str, Any]]
3641
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
3742

@@ -49,37 +54,75 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T
4954
def check_connection(
5055
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
5156
) -> Tuple[bool, Any]:
52-
"""Checks the connection to the source and its streams."""
57+
"""
58+
Checks the connection to the source and its streams.
59+
60+
Handles both:
61+
- Referenced streams (by name)
62+
- Inline check-only streams (as dicts or DeclarativeStream models)
63+
"""
5364
try:
5465
streams = source.streams(config=config)
55-
if not streams:
56-
return False, f"No streams to connect to from source {source}"
57-
except Exception as error:
58-
return self._log_error(logger, "discovering streams", error)
59-
60-
stream_name_to_stream = {s.name: s for s in streams}
61-
for stream_name in self.stream_names:
62-
if stream_name not in stream_name_to_stream:
63-
raise ValueError(
64-
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
66+
stream_name_to_stream = {s.name: s for s in streams}
67+
68+
# Add inline check-only streams to the map
69+
for stream_def in self.stream_names:
70+
# Handle dicts (from YAML) and DeclarativeStream objects (from Pydantic)
71+
if isinstance(stream_def, dict):
72+
if hasattr(source, "_instantiate_stream_from_dict"):
73+
stream_obj = source._instantiate_stream_from_dict(stream_def, config)
74+
stream_name_to_stream[stream_obj.name] = stream_obj
75+
else:
76+
raise NotImplementedError(
77+
f"Source {type(source)} does not support inline stream definitions for check-only streams."
78+
)
79+
elif isinstance(stream_def, DeclarativeStream):
80+
# Convert the Pydantic model to dict before passing to the factory
81+
if hasattr(source, "_instantiate_stream_from_dict"):
82+
stream_obj = source._instantiate_stream_from_dict(stream_def.dict(), config)
83+
stream_name_to_stream[stream_obj.name] = stream_obj
84+
else:
85+
raise NotImplementedError(
86+
f"Source {type(source)} does not support inline stream definitions for check-only streams."
87+
)
88+
# Optionally: warn if stream_def is an unexpected type
89+
elif not isinstance(stream_def, str):
90+
logger.warning(f"Unexpected stream definition type: {type(stream_def)}")
91+
92+
# Now check availability
93+
for stream_def in self.stream_names:
94+
if isinstance(stream_def, dict):
95+
stream_name = stream_def.get("name")
96+
elif hasattr(stream_def, "name"): # DeclarativeStream object
97+
stream_name = stream_def.name
98+
else:
99+
stream_name = stream_def # string
100+
101+
if stream_name not in stream_name_to_stream:
102+
raise ValueError(
103+
f"{stream_name} is not part of the catalog or check-only streams. Expected one of {list(stream_name_to_stream.keys())}."
104+
)
105+
106+
stream_availability, message = self._check_stream_availability(
107+
stream_name_to_stream, stream_name, logger
65108
)
109+
if not stream_availability:
110+
return stream_availability, message
66111

67-
stream_availability, message = self._check_stream_availability(
68-
stream_name_to_stream, stream_name, logger
112+
should_check_dynamic_streams = (
113+
hasattr(source, "resolved_manifest")
114+
and hasattr(source, "dynamic_streams")
115+
and self.dynamic_streams_check_configs
69116
)
70-
if not stream_availability:
71-
return stream_availability, message
72117

73-
should_check_dynamic_streams = (
74-
hasattr(source, "resolved_manifest")
75-
and hasattr(source, "dynamic_streams")
76-
and self.dynamic_streams_check_configs
77-
)
78-
79-
if should_check_dynamic_streams:
80-
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
118+
if should_check_dynamic_streams:
119+
return self._check_dynamic_streams_availability(
120+
source, stream_name_to_stream, logger
121+
)
81122

82-
return True, None
123+
return True, None
124+
except Exception as error:
125+
return self._log_error(logger, "discovering streams", error)
83126

84127
def _check_stream_availability(
85128
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,27 @@ definitions:
322322
enum: [CheckStream]
323323
stream_names:
324324
title: Stream Names
325-
description: Names of the streams to try reading from when running a check operation.
325+
description: Names of the streams to try reading from when running a check operation. Each item can be a string (referencing a stream in the streams block) or an inline DeclarativeStream object (for check-only streams).
326326
type: array
327327
items:
328-
type: string
328+
anyOf:
329+
- type: string
330+
- $ref: "#/definitions/DeclarativeStream"
329331
examples:
330332
- ["users"]
331333
- ["users", "contacts"]
334+
- name: "check_only_stream"
335+
type: DeclarativeStream
336+
retriever:
337+
type: SimpleRetriever
338+
requester:
339+
type: HttpRequester
340+
url_base: "https://api.example.com"
341+
record_selector:
342+
type: RecordSelector
343+
extractor:
344+
type: DpathExtractor
345+
field_path: []
332346
dynamic_streams_check_configs:
333347
type: array
334348
items:

airbyte_cdk/sources/declarative/declarative_source.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
)
1212
from airbyte_cdk.sources.abstract_source import AbstractSource
1313
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
14+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
15+
DeclarativeStream as DeclarativeStreamModel,
16+
)
17+
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
18+
ModelToComponentFactory,
19+
)
1420

1521

1622
class DeclarativeSource(AbstractSource):
@@ -43,3 +49,14 @@ def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
4349
Returns a list of deprecation warnings for the source.
4450
"""
4551
return []
52+
53+
def _instantiate_stream_from_dict(self, stream_def: dict, config: Mapping[str, Any]):
54+
"""
55+
Instantiates a stream from a stream definition dict (used for check-only streams).
56+
"""
57+
factory = ModelToComponentFactory()
58+
return factory.create_component(
59+
model_type=DeclarativeStreamModel,
60+
component_definition=stream_def,
61+
config=config,
62+
)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -1696,10 +1698,30 @@ class AuthFlow(BaseModel):
16961698

16971699
class CheckStream(BaseModel):
16981700
type: Literal["CheckStream"]
1699-
stream_names: Optional[List[str]] = Field(
1701+
stream_names: List[Union[str, "DeclarativeStream"]] = Field(
17001702
None,
17011703
description="Names of the streams to try reading from when running a check operation.",
1702-
examples=[["users"], ["users", "contacts"]],
1704+
examples=[
1705+
["users"],
1706+
["users", "contacts"],
1707+
[
1708+
{
1709+
"name": "check_only_stream",
1710+
"type": "DeclarativeStream",
1711+
"retriever": {
1712+
"type": "SimpleRetriever",
1713+
"requester": {
1714+
"type": "HttpRequester",
1715+
"url_base": "https://api.example.com",
1716+
},
1717+
"record_selector": {
1718+
"type": "RecordSelector",
1719+
"extractor": {"type": "DpathExtractor", "field_path": []},
1720+
},
1721+
},
1722+
}
1723+
],
1724+
],
17031725
title="Stream Names",
17041726
)
17051727
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
@@ -2945,3 +2967,4 @@ class DynamicDeclarativeStream(BaseModel):
29452967
PropertiesFromEndpoint.update_forward_refs()
29462968
SimpleRetriever.update_forward_refs()
29472969
AsyncRetriever.update_forward_refs()
2970+
CheckStream.update_forward_refs()

0 commit comments

Comments
 (0)