Skip to content

Commit cdd1ac9

Browse files
feat: skip config validation during discovery for sources with DynamicSchemaLoader
Co-Authored-By: Aaron <AJ> Steers <[email protected]>
1 parent 5366b13 commit cdd1ac9

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby
44

55
# Changelog
66

7+
## Unreleased
8+
9+
- Added automatic detection of DynamicSchemaLoader to skip config validation during discovery
10+
711
## 6.5.2
812

913
bugfix: Ensure that streams with partition router are not executed concurrently

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@
5858

5959

6060
class ManifestDeclarativeSource(DeclarativeSource):
61-
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""
61+
"""Declarative source defined by a manifest of low-code components that define source connector behavior
62+
63+
If any stream in the source uses a DynamicSchemaLoader, config validation will be skipped during
64+
discovery. This allows sources with dynamic schemas to run discovery without requiring authentication
65+
when the schema endpoint doesn't need auth to provide catalog information.
66+
"""
6267

6368
def __init__(
6469
self,
@@ -108,6 +113,8 @@ def __init__(
108113

109114
self._config = config or {}
110115
self._validate_source()
116+
117+
self.check_config_against_spec = not self._uses_dynamic_schema_loader()
111118

112119
@property
113120
def resolved_manifest(self) -> Mapping[str, Any]:
@@ -440,3 +447,29 @@ def _dynamic_stream_configs(
440447

441448
def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
442449
self.logger.debug("declarative source created from manifest", extra=extra_args)
450+
451+
def _uses_dynamic_schema_loader(self) -> bool:
452+
"""
453+
Determines if any stream in the source uses a DynamicSchemaLoader.
454+
455+
DynamicSchemaLoader makes a separate call to retrieve schema information,
456+
which might not require authentication, so we can skip config validation
457+
during discovery when it's used.
458+
459+
Returns:
460+
bool: True if any stream uses a DynamicSchemaLoader, False otherwise.
461+
"""
462+
for stream_config in self._stream_configs(self._source_config):
463+
schema_loader = stream_config.get("schema_loader", {})
464+
if isinstance(schema_loader, dict) and schema_loader.get("type") == "DynamicSchemaLoader":
465+
return True
466+
467+
dynamic_streams = self._source_config.get("dynamic_streams", [])
468+
if dynamic_streams:
469+
for dynamic_stream in dynamic_streams:
470+
stream_template = dynamic_stream.get("stream_template", {})
471+
schema_loader = stream_template.get("schema_loader", {})
472+
if isinstance(schema_loader, dict) and schema_loader.get("type") == "DynamicSchemaLoader":
473+
return True
474+
475+
return False
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#
2+
#
3+
4+
from unittest.mock import MagicMock
5+
6+
import pytest
7+
8+
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
9+
10+
11+
def test_check_config_against_spec_with_dynamic_schema_loader():
12+
"""Test that check_config_against_spec is False when DynamicSchemaLoader is used."""
13+
source_config = {
14+
"type": "DeclarativeSource",
15+
"check": {"type": "CheckStream"},
16+
"streams": [
17+
{
18+
"name": "test_stream",
19+
"schema_loader": {
20+
"type": "DynamicSchemaLoader",
21+
"retriever": {
22+
"type": "SimpleRetriever",
23+
"requester": {"url_base": "https://example.com", "http_method": "GET"},
24+
"record_selector": {"extractor": {"field_path": []}},
25+
},
26+
"schema_type_identifier": {
27+
"key_pointer": ["name"],
28+
}
29+
},
30+
"retriever": {
31+
"type": "SimpleRetriever",
32+
"requester": {"url_base": "https://example.com", "http_method": "GET"},
33+
"record_selector": {"extractor": {"field_path": []}},
34+
}
35+
}
36+
],
37+
"version": "0.1.0"
38+
}
39+
40+
source = ManifestDeclarativeSource(source_config=source_config)
41+
42+
assert source.check_config_against_spec is False
43+
44+
45+
def test_check_config_against_spec_without_dynamic_schema_loader():
46+
"""Test that check_config_against_spec is True when DynamicSchemaLoader is not used."""
47+
source_config = {
48+
"type": "DeclarativeSource",
49+
"check": {"type": "CheckStream"},
50+
"streams": [
51+
{
52+
"name": "test_stream",
53+
"schema_loader": {
54+
"type": "InlineSchemaLoader",
55+
"schema": {}
56+
},
57+
"retriever": {
58+
"type": "SimpleRetriever",
59+
"requester": {"url_base": "https://example.com", "http_method": "GET"},
60+
"record_selector": {"extractor": {"field_path": []}},
61+
}
62+
}
63+
],
64+
"version": "0.1.0"
65+
}
66+
67+
source = ManifestDeclarativeSource(source_config=source_config)
68+
69+
assert source.check_config_against_spec is True

0 commit comments

Comments
 (0)