-
Notifications
You must be signed in to change notification settings - Fork 33
feat: skip config validation during discovery for declarative sources that don't use DynamicSchemaLoader #464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
cdd1ac9
7490ad1
9e84e1c
47bd67c
36d7f1f
b002218
acbab7e
d33dcdd
4253f28
64610b9
b228857
77772c3
6ca213c
dce4f8c
24a0919
f920f04
f01525f
769d361
c3cbad8
3cb8faf
08397ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ def parse_args(args: List[str]) -> argparse.Namespace: | |
| ) | ||
| required_discover_parser = discover_parser.add_argument_group("required named arguments") | ||
| required_discover_parser.add_argument( | ||
| "--config", type=str, required=True, help="path to the json configuration file" | ||
| "--config", type=str, required=False, help="path to the json configuration file" | ||
| ) | ||
|
|
||
| # read | ||
|
|
@@ -141,19 +141,34 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: | |
| ) | ||
| if cmd == "spec": | ||
| message = AirbyteMessage(type=Type.SPEC, spec=source_spec) | ||
| yield from [ | ||
| yield from ( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aside - Note: I corrected Devin's implementation to use generator comprehension instead of list comprehension, and Devin applies in these adjacent locations as well. I think this is a positive change, calling out though to explain why other code paths are touched. More about generator comprehensions here: https://stackoverflow.com/a/47826 |
||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ] | ||
| ) | ||
| yield self.airbyte_message_to_string(message) | ||
| elif ( | ||
| cmd == "discover" | ||
| and not parsed_args.config | ||
| and not self.source.check_config_against_spec | ||
| ): | ||
| # Connector supports unprivileged discover | ||
| empty_config: dict[str, Any] = {} | ||
aaronsteers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| yield from ( | ||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ) | ||
| yield from map( | ||
| AirbyteEntrypoint.airbyte_message_to_string, | ||
| self.discover(source_spec, empty_config), | ||
| ) | ||
| else: | ||
| raw_config = self.source.read_config(parsed_args.config) | ||
| config = self.source.configure(raw_config, temp_dir) | ||
|
|
||
| yield from [ | ||
| yield from ( | ||
| self.airbyte_message_to_string(queued_message) | ||
| for queued_message in self._emit_queued_messages(self.source) | ||
| ] | ||
| ) | ||
| if cmd == "check": | ||
| yield from map( | ||
| AirbyteEntrypoint.airbyte_message_to_string, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| # | ||
| # | ||
aaronsteers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from airbyte_cdk.models import AirbyteCatalog | ||
| from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource | ||
| from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit | ||
|
|
||
|
|
||
| def test_check_config_against_spec_with_dynamic_schema_loader(): | ||
| """Test that check_config_against_spec is False when DynamicSchemaLoader is used.""" | ||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_stream", | ||
| "schema_loader": { | ||
| "type": "DynamicSchemaLoader", | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| "schema_type_identifier": { | ||
| "key_pointer": ["name"], | ||
| }, | ||
| }, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_against_spec is False | ||
|
|
||
|
|
||
| def test_check_config_against_spec_without_dynamic_schema_loader(): | ||
| """Test that check_config_against_spec is True when DynamicSchemaLoader is not used.""" | ||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_stream", | ||
| "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
|
|
||
| @patch( | ||
| "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" | ||
| ) | ||
| def test_discover_with_dynamic_schema_loader_no_config(mock_streams): | ||
| """Test that discovery works without config when DynamicSchemaLoader is used.""" | ||
| mock_stream = MagicMock() | ||
| mock_stream.name = "test_dynamic_stream" | ||
|
|
||
| mock_airbyte_stream = MagicMock() | ||
| type(mock_airbyte_stream).name = "test_dynamic_stream" | ||
| mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream | ||
|
|
||
| mock_streams.return_value = [mock_stream] | ||
|
|
||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_dynamic_stream", | ||
| "schema_loader": { | ||
| "type": "DynamicSchemaLoader", | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| "schema_type_identifier": { | ||
| "key_pointer": ["name"], | ||
| }, | ||
| }, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_against_spec is False | ||
|
|
||
| logger = MagicMock() | ||
| catalog = source.discover(logger, {}) | ||
|
|
||
| assert isinstance(catalog, AirbyteCatalog) | ||
| assert len(catalog.streams) == 1 | ||
| assert catalog.streams[0].name == "test_dynamic_stream" | ||
|
|
||
|
|
||
| @patch( | ||
| "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" | ||
| ) | ||
| def test_discover_without_dynamic_schema_loader_no_config(mock_streams): | ||
| """Test that discovery validates config when DynamicSchemaLoader is not used.""" | ||
| mock_stream = MagicMock() | ||
| mock_stream.name = "test_static_stream" | ||
|
|
||
| mock_airbyte_stream = MagicMock() | ||
| type(mock_airbyte_stream).name = "test_static_stream" | ||
| mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream | ||
|
|
||
| mock_streams.return_value = [mock_stream] | ||
|
|
||
| source_config = { | ||
| "type": "DeclarativeSource", | ||
| "check": {"type": "CheckStream"}, | ||
| "streams": [ | ||
| { | ||
| "name": "test_static_stream", | ||
| "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, | ||
| "retriever": { | ||
| "type": "SimpleRetriever", | ||
| "requester": {"url_base": "https://example.com", "http_method": "GET"}, | ||
| "record_selector": {"extractor": {"field_path": []}}, | ||
| }, | ||
| } | ||
| ], | ||
| "version": "0.1.0", | ||
| } | ||
|
|
||
| source = ManifestDeclarativeSource(source_config=source_config) | ||
|
|
||
| assert source.check_config_against_spec is True | ||
|
|
||
| logger = MagicMock() | ||
| catalog = source.discover(logger, {}) | ||
|
|
||
| assert isinstance(catalog, AirbyteCatalog) | ||
| assert len(catalog.streams) == 1 | ||
| assert catalog.streams[0].name == "test_static_stream" | ||
|
|
||
| assert source.check_config_against_spec is True | ||
Uh oh!
There was an error while loading. Please reload this page.