diff --git a/airbyte_cdk/connector.py b/airbyte_cdk/connector.py index 342ecee2d..46f02f161 100644 --- a/airbyte_cdk/connector.py +++ b/airbyte_cdk/connector.py @@ -31,8 +31,15 @@ def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: class BaseConnector(ABC, Generic[TConfig]): - # configure whether the `check_config_against_spec_or_exit()` needs to be called check_config_against_spec: bool = True + """Configure whether `check_config_against_spec_or_exit()` needs to be called.""" + + check_config_during_discover: bool = False + """Determines whether config validation should be skipped during discovery. + + By default, config validation is not skipped during discovery. This can be overridden + by sources that can provide catalog information without requiring authentication. + """ @abstractmethod def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 0a13cfebe..4c737007b 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -93,7 +93,7 @@ def parse_args(args: List[str]) -> argparse.Namespace: ) required_discover_parser = discover_parser.add_argument_group("required named arguments") required_discover_parser.add_argument( - "--config", type=str, required=True, help="path to the json configuration file" + "--config", type=str, required=False, help="path to the json configuration file" ) # read @@ -141,19 +141,34 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: ) if cmd == "spec": message = AirbyteMessage(type=Type.SPEC, spec=source_spec) - yield from [ + yield from ( self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source) - ] + ) yield self.airbyte_message_to_string(message) + elif ( + cmd == "discover" + and not parsed_args.config + and not self.source.check_config_during_discover + ): + # Connector supports unprivileged discover + empty_config: dict[str, Any] = {} + yield from ( + self.airbyte_message_to_string(queued_message) + for queued_message in self._emit_queued_messages(self.source) + ) + yield from map( + AirbyteEntrypoint.airbyte_message_to_string, + self.discover(source_spec, empty_config), + ) else: raw_config = self.source.read_config(parsed_args.config) config = self.source.configure(raw_config, temp_dir) - yield from [ + yield from ( self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source) - ] + ) if cmd == "check": yield from map( AirbyteEntrypoint.airbyte_message_to_string, @@ -225,7 +240,7 @@ def discover( self, source_spec: ConnectorSpecification, config: TConfig ) -> Iterable[AirbyteMessage]: self.set_up_secret_filter(config, source_spec.connectionSpecification) - if self.source.check_config_against_spec: + if not self.source.check_config_during_discover: self.validate_connection(source_spec, config) catalog = self.source.discover(self.logger, config) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index cfd258c6c..3ff6a824c 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -47,6 +47,7 @@ ) from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.source import Source from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.types import ConnectionDefinition from airbyte_cdk.sources.utils.slice_logger import ( @@ -109,6 +110,8 @@ def __init__( self._config = config or {} self._validate_source() + self.check_config_during_discover = self._uses_dynamic_schema_loader() + @property def resolved_manifest(self) -> Mapping[str, Any]: return self._source_config @@ -440,3 +443,35 @@ def _dynamic_stream_configs( def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: self.logger.debug("declarative source created from manifest", extra=extra_args) + + def _uses_dynamic_schema_loader(self) -> bool: + """ + Determines if any stream in the source uses a DynamicSchemaLoader. + + DynamicSchemaLoader makes a separate call to retrieve schema information, + which might not require authentication, so we can skip config validation + during discovery when it's used. + + Returns: + bool: True if any stream uses a DynamicSchemaLoader, False otherwise. + """ + for stream_config in self._stream_configs(self._source_config): + schema_loader = stream_config.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + dynamic_streams = self._source_config.get("dynamic_streams", []) + if dynamic_streams: + for dynamic_stream in dynamic_streams: + stream_template = dynamic_stream.get("stream_template", {}) + schema_loader = stream_template.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + return False diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py b/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py new file mode 100644 index 000000000..db91132f0 --- /dev/null +++ b/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py @@ -0,0 +1,174 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +"""Tests for the ManifestDeclarativeSource with DynamicSchemaLoader.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from airbyte_cdk.models import AirbyteCatalog +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit + + +def test_check_config_during_discover_with_dynamic_schema_loader(): + """Test that check_config_during_discover is True when DynamicSchemaLoader is used.""" + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_stream", + "schema_loader": { + "type": "DynamicSchemaLoader", + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + "schema_type_identifier": { + "key_pointer": ["name"], + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is True + assert source.check_config_against_spec is True + + +def test_check_config_during_discover_without_dynamic_schema_loader(): + """Test that check_config_during_discover is False when DynamicSchemaLoader is not used.""" + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_stream", + "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True + + +@patch( + "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" +) +def test_discover_with_dynamic_schema_loader_no_config(mock_streams): + """Test that discovery works without config when DynamicSchemaLoader is used.""" + mock_stream = MagicMock() + mock_stream.name = "test_dynamic_stream" + + mock_airbyte_stream = MagicMock() + type(mock_airbyte_stream).name = "test_dynamic_stream" + mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream + + mock_streams.return_value = [mock_stream] + + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_dynamic_stream", + "schema_loader": { + "type": "DynamicSchemaLoader", + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + "schema_type_identifier": { + "key_pointer": ["name"], + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is True + assert source.check_config_against_spec is True + + logger = MagicMock() + catalog = source.discover(logger, {}) + + assert isinstance(catalog, AirbyteCatalog) + assert len(catalog.streams) == 1 + assert catalog.streams[0].name == "test_dynamic_stream" + + +@patch( + "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" +) +def test_discover_without_dynamic_schema_loader_no_config(mock_streams): + """Test that discovery validates config when DynamicSchemaLoader is not used.""" + mock_stream = MagicMock() + mock_stream.name = "test_static_stream" + + mock_airbyte_stream = MagicMock() + type(mock_airbyte_stream).name = "test_static_stream" + mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream + + mock_streams.return_value = [mock_stream] + + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_static_stream", + "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True + + logger = MagicMock() + catalog = source.discover(logger, {}) + + assert isinstance(catalog, AirbyteCatalog) + assert len(catalog.streams) == 1 + assert catalog.streams[0].name == "test_static_stream" + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index e906e8b39..6a5c9ce13 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -172,14 +172,13 @@ def test_parse_valid_args( ["cmd", "args"], [ ("check", {"config": "config_path"}), - ("discover", {"config": "config_path"}), ("read", {"config": "config_path", "catalog": "catalog_path"}), ], ) def test_parse_missing_required_args( cmd: str, args: MutableMapping[str, Any], entrypoint: AirbyteEntrypoint ): - required_args = {"check": ["config"], "discover": ["config"], "read": ["config", "catalog"]} + required_args = {"check": ["config"], "read": ["config", "catalog"]} for required_arg in required_args[cmd]: argcopy = deepcopy(args) del argcopy[required_arg]