Skip to content

Commit 7ff1b7d

Browse files
committed
Update check streams to use dynamic streams configs
1 parent d7516ec commit 7ff1b7d

File tree

6 files changed

+457
-56
lines changed

6 files changed

+457
-56
lines changed

airbyte_cdk/sources/declarative/checks/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from pydantic.v1 import BaseModel
88

99
from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10-
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
10+
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream, DynamicStreamCheckConfig
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1212
from airbyte_cdk.sources.declarative.models import (
1313
CheckDynamicStream as CheckDynamicStreamModel,
@@ -21,4 +21,4 @@
2121
"CheckDynamicStream": CheckDynamicStreamModel,
2222
}
2323

24-
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
24+
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,21 @@
88
from typing import Any, List, Mapping, Tuple
99

1010
from airbyte_cdk import AbstractSource
11+
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
1112
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1213
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1314

1415

16+
@dataclass(frozen=True)
17+
class DynamicStreamCheckConfig:
18+
"""Defines the configuration for dynamic stream during connection checking. This class specifies
19+
what dynamic streams in the stream template should be updated with value, supporting dynamic interpolation
20+
and type enforcement."""
21+
22+
dynamic_stream_name: str
23+
stream_count: int = 0
24+
25+
1526
@dataclass
1627
class CheckStream(ConnectionChecker):
1728
"""
@@ -22,6 +33,7 @@ class CheckStream(ConnectionChecker):
2233
"""
2334

2435
stream_names: List[str]
36+
dynamic_streams_check_configs: List[DynamicStreamCheckConfig]
2537
parameters: InitVar[Mapping[str, Any]]
2638

2739
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
@@ -30,7 +42,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3042
def check_connection(
3143
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
3244
) -> Tuple[bool, Any]:
33-
streams = source.streams(config=config)
45+
try:
46+
streams = source.streams(config=config)
47+
except Exception as error:
48+
error_message = (
49+
f"Encountered an error trying to connect to streams. Error: {error}"
50+
)
51+
logger.error(error_message, exc_info=True)
52+
return False, error_message
53+
3454
stream_name_to_stream = {s.name: s for s in streams}
3555
if len(streams) == 0:
3656
return False, f"No streams to connect to from source {source}"
@@ -53,4 +73,41 @@ def check_connection(
5373
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
5474
)
5575
return False, f"Unable to connect to stream {stream_name} - {error}"
76+
77+
if isinstance(source, ManifestDeclarativeSource) and self.dynamic_streams_check_configs:
78+
dynamic_stream_name_to_dynamic_stream = {dynamic_stream["name"]: dynamic_stream for dynamic_stream in
79+
source.resolved_manifest.get("dynamic_streams", [])}
80+
81+
dynamic_stream_name_to_generated_streams = {}
82+
for stream in source.dynamic_streams:
83+
dynamic_stream_name_to_generated_streams[
84+
stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault(
85+
stream["dynamic_stream_name"], []).append(stream)
86+
87+
for dynamic_streams_check_config in self.dynamic_streams_check_configs:
88+
dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name)
89+
90+
is_config_depend = dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver"
91+
92+
if not is_config_depend and not bool(dynamic_streams_check_config.stream_count):
93+
continue
94+
95+
generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name)
96+
availability_strategy = HttpAvailabilityStrategy()
97+
98+
for stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]:
99+
try:
100+
stream_is_available, reason = availability_strategy.check_availability(
101+
stream, logger
102+
)
103+
if not stream_is_available:
104+
logger.warning(f"Stream {stream.name} is not available: {reason}")
105+
return False, reason
106+
except Exception as error:
107+
error_message = (
108+
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
109+
)
110+
logger.error(error_message, exc_info=True)
111+
return False, error_message
112+
56113
return True, None

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,11 @@ definitions:
316316
type: object
317317
required:
318318
- type
319-
- stream_names
319+
anyOf:
320+
- required:
321+
- stream_names
322+
- required:
323+
- dynamic_streams_check_configs
320324
properties:
321325
type:
322326
type: string
@@ -330,6 +334,28 @@ definitions:
330334
examples:
331335
- ["users"]
332336
- ["users", "contacts"]
337+
dynamic_streams_check_configs:
338+
type: array
339+
items:
340+
"$ref": "#/definitions/DynamicStreamCheckConfig"
341+
DynamicStreamCheckConfig:
342+
type: object
343+
required:
344+
- type
345+
- dynamic_stream_name
346+
properties:
347+
type:
348+
type: string
349+
enum: [ DynamicStreamCheckConfig ]
350+
dynamic_stream_name:
351+
title: Dynamic Stream Name
352+
description: The dynamic stream name.
353+
type: string
354+
stream_count:
355+
title: Stream Count
356+
description: Numbers of the streams to try reading from when running a check operation.
357+
type: integer
358+
default: 0
333359
CheckDynamicStream:
334360
title: Dynamic Streams to Check
335361
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.

0 commit comments

Comments
 (0)