Skip to content

Commit 7048b30

Browse files
feat: Add unprivileged discover support for declarative sources with static schemas
- Move check_config_during_discover logic to ConcurrentDeclarativeSource (the non-deprecated class)
- Add _uses_dynamic_schema_loader() method to detect DynamicSchemaLoader usage
- Set check_config_during_discover=False for sources without DynamicSchemaLoader
- Fix docstring in ManifestDeclarativeSource to correctly describe DynamicSchemaLoader behavior
- Add entrypoint unit tests for discover with/without config
- Improve error message when config is required but not provided
- Update test imports to use legacy module location

This enables declarative sources with static schemas (InlineSchemaLoader) to run discover without credentials, while sources with DynamicSchemaLoader still require config for authentication.

Co-Authored-By: AJ Steers <[email protected]>
1 parent a69c252 commit 7048b30

File tree

5 files changed

+121
-5
lines changed

5 files changed

+121
-5
lines changed

airbyte_cdk/entrypoint.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,11 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
199199
)
200200
elif parsed_args.config is None:
201201
# Raise a helpful error message if we reach here with no config.
202-
raise ValueError("The '--config' arg is required but was not provided.")
202+
raise ValueError(
203+
"The '--config' argument is required but was not provided. "
204+
"This connector does not support unprivileged discovery. "
205+
"Please provide a valid configuration file using the --config flag."
206+
)
203207
else:
204208
raw_config = self.source.read_config(parsed_args.config)
205209
config = self.source.configure(raw_config, temp_dir)

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -624,11 +624,12 @@ def _uses_dynamic_schema_loader(self) -> bool:
624624
Determines if any stream in the source uses a DynamicSchemaLoader.
625625
626626
DynamicSchemaLoader makes a separate call to retrieve schema information,
627-
which might not require authentication, so we can skip config validation
628-
during discovery when it's used.
627+
which might require authentication. When present, config validation cannot
628+
be skipped during discovery.
629629
630630
Returns:
631-
bool: True if any stream uses a DynamicSchemaLoader, False otherwise.
631+
bool: True if any stream uses a DynamicSchemaLoader (config required for discover),
632+
False otherwise (unprivileged discover may be supported).
632633
"""
633634
empty_config: Dict[str, Any] = {}
634635
for stream_config in self._stream_configs(self._source_config, empty_config):

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,8 @@ def __init__(
242242
message_repository=self._message_repository,
243243
)
244244

245+
self.check_config_during_discover = self._uses_dynamic_schema_loader()
246+
245247
def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
246248
"""
247249
Preprocesses the provided manifest dictionary by resolving any manifest references.
@@ -651,3 +653,36 @@ def _select_streams(
651653
as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
652654
)
653655
return abstract_streams
656+
657+
def _uses_dynamic_schema_loader(self) -> bool:
    """
    Determine whether any stream in the source uses a DynamicSchemaLoader.

    DynamicSchemaLoader makes a separate call to retrieve schema information,
    which might require authentication. When present, config validation cannot
    be skipped during discovery.

    Both the stream definitions returned by ``_stream_configs`` and the
    ``stream_template`` of every ``dynamic_streams`` entry are inspected. A
    ``schema_loader`` given as a list of loader definitions is searched
    element by element, so a DynamicSchemaLoader combined with other loaders
    is still detected.

    Returns:
        bool: True if any stream uses a DynamicSchemaLoader (config required for discover),
            False otherwise (unprivileged discover may be supported).
    """

    def _is_dynamic(schema_loader) -> bool:
        # Normalise to a list so a single mapping and a list of mappings share one code path.
        candidates = schema_loader if isinstance(schema_loader, list) else [schema_loader]
        return any(
            isinstance(candidate, dict) and candidate.get("type") == "DynamicSchemaLoader"
            for candidate in candidates
        )

    for stream_config in self._stream_configs(self._source_config):
        if _is_dynamic(stream_config.get("schema_loader", {})):
            return True

    # `or []` / `or {}` tolerate keys that are present but explicitly null.
    for dynamic_definition in self._source_config.get("dynamic_streams") or []:
        stream_template = dynamic_definition.get("stream_template") or {}
        if _is_dynamic(stream_template.get("schema_loader", {})):
            return True

    return False

unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88

99
import pytest
1010

11+
from airbyte_cdk.legacy.sources.declarative.manifest_declarative_source import (
12+
ManifestDeclarativeSource,
13+
)
1114
from airbyte_cdk.models import AirbyteCatalog
12-
from airbyte_cdk.legacy.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
1315
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
1416

1517

unit_tests/test_entrypoint.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,3 +855,77 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
855855
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
856856
record_messages = list(filter(lambda message: "RECORD" in message, messages))
857857
assert len(record_messages) == 2
858+
859+
def test_run_discover_without_config_when_supported(mocker):
    """Discover succeeds with no --config when the source sets check_config_during_discover to False."""
    # Source that opts in to unprivileged discover.
    source = MockSource()
    source.check_config_during_discover = False

    # Message repository with nothing queued.
    repository = MagicMock()
    repository.consume_queue.return_value = []
    mocker.patch.object(
        MockSource,
        "message_repository",
        new_callable=mocker.PropertyMock,
        return_value=repository,
    )

    entrypoint = AirbyteEntrypoint(source)

    # Patch spec and discover on the class so the entrypoint sees canned results.
    test_stream = AirbyteStream(
        name="test_stream",
        json_schema={"type": "object"},
        supported_sync_modes=[SyncMode.full_refresh],
    )
    expected_catalog = AirbyteCatalog(streams=[test_stream])
    mocker.patch.object(
        MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification={})
    )
    mocker.patch.object(MockSource, "discover", return_value=expected_catalog)

    emitted = list(entrypoint.run(Namespace(command="discover", config=None)))

    # Exactly one message is emitted: the catalog, produced without any config.
    assert len(emitted) == 1
    assert _wrap_message(expected_catalog) == emitted[0]

    # discover ran once, and its second positional argument (the config) was an empty dict.
    MockSource.discover.assert_called_once()
    positional_args = MockSource.discover.call_args[0]
    assert positional_args[1] == {}
901+
902+
903+
def test_run_discover_without_config_when_not_supported(mocker):
    """Discover raises a descriptive ValueError when config is required but --config is absent."""
    # Source that insists on a config during discover.
    source = MockSource()
    source.check_config_during_discover = True

    # Message repository with nothing queued.
    repository = MagicMock()
    repository.consume_queue.return_value = []
    mocker.patch.object(
        MockSource,
        "message_repository",
        new_callable=mocker.PropertyMock,
        return_value=repository,
    )

    entrypoint = AirbyteEntrypoint(source)
    args_without_config = Namespace(command="discover", config=None)

    mocker.patch.object(
        MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification={})
    )

    # run() is consumed with list() inside the raises block so the error surfaces there.
    with pytest.raises(ValueError) as exc_info:
        list(entrypoint.run(args_without_config))

    raised_text = str(exc_info.value)
    expected_fragments = (
        "The '--config' argument is required but was not provided",
        "does not support unprivileged discovery",
    )
    for fragment in expected_fragments:
        assert fragment in raised_text

0 commit comments

Comments
 (0)