-
Notifications
You must be signed in to change notification settings - Fork 33
feat: skip config validation during discovery for declarative sources that don't use DynamicSchemaLoader #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
cdd1ac9
7490ad1
9e84e1c
47bd67c
36d7f1f
b002218
acbab7e
d33dcdd
4253f28
64610b9
b228857
77772c3
6ca213c
dce4f8c
24a0919
f920f04
f01525f
769d361
c3cbad8
3cb8faf
08397ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ def parse_args(args: List[str]) -> argparse.Namespace: | |
| ) | ||
| required_discover_parser = discover_parser.add_argument_group("required named arguments") | ||
| required_discover_parser.add_argument( | ||
| "--config", type=str, required=True, help="path to the json configuration file" | ||
| "--config", type=str, required=False, help="path to the json configuration file" | ||
| ) | ||
|
|
||
| # read | ||
|
|
@@ -141,19 +141,35 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: | |
| ) | ||
| if cmd == "spec": | ||
| message = AirbyteMessage(type=Type.SPEC, spec=source_spec) | ||
| yield from [ | ||
| yield from ( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Aside - Note: I corrected Devin's implementation to use a generator comprehension instead of a list comprehension, and Devin applied the same change in these adjacent locations as well. I think this is a positive change; I'm calling it out to explain why other code paths are touched. More about generator comprehensions here: https://stackoverflow.com/a/47826 |
||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ] | ||
| ) | ||
| yield self.airbyte_message_to_string(message) | ||
| elif ( | ||
| cmd == "discover" | ||
| and not parsed_args.config | ||
| and hasattr(self.source, "check_config_during_discover") | ||
| and self.source.check_config_during_discover | ||
aaronsteers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ): | ||
| # Connector supports unprivileged discover | ||
| empty_config: dict[str, Any] = {} | ||
aaronsteers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| yield from ( | ||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ) | ||
| yield from map( | ||
| AirbyteEntrypoint.airbyte_message_to_string, | ||
| self.discover(source_spec, empty_config), | ||
| ) | ||
| else: | ||
| raw_config = self.source.read_config(parsed_args.config) | ||
| config = self.source.configure(raw_config, temp_dir) | ||
|
|
||
| yield from [ | ||
| yield from ( | ||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ] | ||
| ) | ||
| if cmd == "check": | ||
| yield from map( | ||
| AirbyteEntrypoint.airbyte_message_to_string, | ||
|
|
@@ -225,7 +241,7 @@ def discover( | |
| self, source_spec: ConnectorSpecification, config: TConfig | ||
| ) -> Iterable[AirbyteMessage]: | ||
| self.set_up_secret_filter(config, source_spec.connectionSpecification) | ||
| if self.source.check_config_against_spec: | ||
| if not self.source.check_config_during_discover: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this changing? |
||
| self.validate_connection(source_spec, config) | ||
| catalog = self.source.discover(self.logger, config) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| # | ||
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| """Tests for the ManifestDeclarativeSource with DynamicSchemaLoader.""" | ||
|
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from airbyte_cdk.models import AirbyteCatalog | ||
| from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource | ||
| from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit | ||
|
|
||
|
|
||
| def test_check_config_during_discover_with_dynamic_schema_loader(): | ||
| """Test that check_config_during_discover is True when DynamicSchemaLoader is used.""" | ||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_stream", | ||
| "schema_loader": { | ||
| "type": "DynamicSchemaLoader", | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| "schema_type_identifier": { | ||
| "key_pointer": ["name"], | ||
| }, | ||
| }, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_during_discover is True | ||
| assert source.check_config_against_spec is True | ||
|
|
||
|
|
||
| def test_check_config_during_discover_without_dynamic_schema_loader(): | ||
| """Test that check_config_during_discover is False when DynamicSchemaLoader is not used.""" | ||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_stream", | ||
| "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_during_discover is False | ||
| assert source.check_config_against_spec is True | ||
|
|
||
|
|
||
| @patch( | ||
| "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" | ||
| ) | ||
| def test_discover_with_dynamic_schema_loader_no_config(mock_streams): | ||
| """Test that discovery works without config when DynamicSchemaLoader is used.""" | ||
| mock_stream = MagicMock() | ||
| mock_stream.name = "test_dynamic_stream" | ||
|
|
||
| mock_airbyte_stream = MagicMock() | ||
| type(mock_airbyte_stream).name = "test_dynamic_stream" | ||
| mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream | ||
|
|
||
| mock_streams.return_value = [mock_stream] | ||
|
|
||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_dynamic_stream", | ||
| "schema_loader": { | ||
| "type": "DynamicSchemaLoader", | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| "schema_type_identifier": { | ||
| "key_pointer": ["name"], | ||
| }, | ||
| }, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_during_discover is True | ||
| assert source.check_config_against_spec is True | ||
|
|
||
| logger = MagicMock() | ||
| catalog = source.discover(logger, {}) | ||
|
|
||
| assert isinstance(catalog, AirbyteCatalog) | ||
| assert len(catalog.streams) == 1 | ||
| assert catalog.streams[0].name == "test_dynamic_stream" | ||
|
|
||
|
|
||
| @patch( | ||
| "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" | ||
| ) | ||
| def test_discover_without_dynamic_schema_loader_no_config(mock_streams): | ||
| """Test that discovery validates config when DynamicSchemaLoader is not used.""" | ||
| mock_stream = MagicMock() | ||
| mock_stream.name = "test_static_stream" | ||
|
|
||
| mock_airbyte_stream = MagicMock() | ||
| type(mock_airbyte_stream).name = "test_static_stream" | ||
| mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream | ||
|
|
||
| mock_streams.return_value = [mock_stream] | ||
|
|
||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_static_stream", | ||
| "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_during_discover is False | ||
| assert source.check_config_against_spec is True | ||
|
|
||
| logger = MagicMock() | ||
| catalog = source.discover(logger, {}) | ||
|
|
||
| assert isinstance(catalog, AirbyteCatalog) | ||
| assert len(catalog.streams) == 1 | ||
| assert catalog.streams[0].name == "test_static_stream" | ||
|
|
||
| assert source.check_config_during_discover is False | ||
| assert source.check_config_against_spec is True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value and the default mentioned in the comment don't align.