def _replace_parent_streams_with_refs(self) -> None:
    """
    Replace inlined parent streams with `$ref` pointers to the top-level streams.

    For every stream whose `retriever.partition_router` is (or is a list containing) a
    `SubstreamPartitionRouter`, each `parent_stream_configs[*].stream` dict whose
    `self._hash_object` value equals that of a top-level stream is replaced with
    `{"$ref": "#/<STREAMS_TAG>/<index>"}`. The manifest is modified in place.

    Entries that do not have the expected shape (non-dict streams, retrievers, routers or
    parent configs, a missing `parent_stream_configs`) are skipped instead of raising, so a
    single odd entry does not abort the whole normalization.
    """
    streams = self._normalized_manifest.get(STREAMS_TAG, [])
    if not isinstance(streams, list):
        return

    # Hash every stream up front, before any replacement: the inlined copies are compared
    # against the streams as they looked at this point. When several streams hash
    # identically, the last index wins.
    stream_hash_to_index: Dict[Any, int] = {}
    for index, stream in enumerate(streams):
        stream_hash_to_index[self._hash_object(stream)] = index

    for stream in streams:
        if not isinstance(stream, dict):
            continue
        retriever = stream.get("retriever")
        if not isinstance(retriever, dict):
            continue

        # `partition_router` may be a single router or a list of routers.
        partition_router = retriever.get("partition_router")
        routers = partition_router if isinstance(partition_router, list) else [partition_router]

        for router in routers:
            if not isinstance(router, dict) or router.get("type") != "SubstreamPartitionRouter":
                continue
            for parent_config in router.get("parent_stream_configs") or []:
                if not isinstance(parent_config, dict):
                    continue
                parent_stream = parent_config.get("stream")
                # Only inlined stream objects (dicts) are candidates for replacement.
                if not isinstance(parent_stream, dict):
                    continue
                index = stream_hash_to_index.get(self._hash_object(parent_stream))
                if index is not None:
                    parent_config["stream"] = {"$ref": f"#/{STREAMS_TAG}/{index}"}


def _clean_dangling_fields(self) -> None:
    """
    Remove top-level entries of the definitions and schemas sections that no `$ref` uses.

    An entry `<section>/<key>` is kept when some `$ref` in the manifest targets that entry,
    something nested beneath it, or one of its ancestors (i.e. the section itself). Only the
    first level of each section is pruned; a section that ends up empty is removed.
    The manifest is modified in place.

    References are collected from the whole manifest before pruning, including from the
    sections being pruned, so an entry referenced only by another (itself unreferenced)
    entry is still kept.
    """

    def collect_refs(node: Any, refs: List[str]) -> None:
        """
        Append the target path of every string `$ref` found anywhere under `node`.

        Args:
            node: The object to search through (dicts and lists are walked at any depth)
            refs: List to store found reference paths
        """
        if isinstance(node, dict):
            for key, value in node.items():
                if key == "$ref" and isinstance(value, str):
                    # Drop the leading "#/" only when it is actually there.
                    refs.append(value[2:] if value.startswith("#/") else value)
                else:
                    collect_refs(value, refs)
        elif isinstance(node, list):
            for item in node:
                collect_refs(item, refs)

    def is_referenced(path: str, refs: List[str]) -> bool:
        """
        Tell whether any ref targets `path`, a descendant of `path` or an ancestor of `path`.

        Matching is done on whole path segments, so `definitions/foo` is not considered
        referenced by `definitions/foobar`.
        """
        descendant_prefix = f"{path}/"
        return any(
            ref == path or ref.startswith(descendant_prefix) or path.startswith(f"{ref}/")
            for ref in refs
        )

    # Find all references in the manifest
    all_refs: List[str] = []
    collect_refs(self._normalized_manifest, all_refs)

    for section_tag in (DEF_TAG, SCHEMAS_TAG):
        section = self._normalized_manifest.get(section_tag)
        if not isinstance(section, dict):
            # Missing or malformed section: nothing to prune.
            continue
        # Iterate over a snapshot of the keys because entries are deleted while looping.
        for key in list(section):
            if not is_referenced(f"{section_tag}/{key}", all_refs):
                del section[key]
        # Remove the section altogether once nothing is left in it.
        if not section:
            del self._normalized_manifest[section_tag]
- """ - - stream_name = stream["name"] - # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key - schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG) - if not SCHEMAS_TAG in self._normalized_manifest.keys(): - self._normalized_manifest[SCHEMAS_TAG] = {} - # add stream schema to the SCHEMAS_TAG - if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys(): - # add the schema to the SCHEMAS_TAG with the stream name as key - self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema - - def _reference_schemas(self) -> None: - """ - Set the schema reference for the given stream in the manifest. - This function modifies the manifest in place. - """ - - # reference the stream schema for the stream to where it's stored - if SCHEMAS_TAG in self._normalized_manifest.keys(): - for stream in self._get_manifest_streams(): - self._extract_stream_schema(stream) - self._set_stream_schema_ref(stream) - - def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None: - """ - Set the schema reference for the given stream in the manifest. - This function modifies the manifest in place. - """ - stream_name = stream["name"] - if SCHEMAS_TAG in self._normalized_manifest.keys(): - if stream_name in self._normalized_manifest[SCHEMAS_TAG]: - stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) - def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: """ Process duplicate objects and replace them with references. @@ -447,16 +517,3 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st """ return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"} - - def _create_schema_ref(self, key: str) -> Dict[str, str]: - """ - Create a reference object for stream schema using the specified key. 
- - Args: - key: The reference key to use - - Returns: - A reference object in the proper format - """ - - return {"$ref": f"#/{SCHEMAS_TAG}/{key}"} diff --git a/unit_tests/sources/declarative/parsers/conftest.py b/unit_tests/sources/declarative/parsers/conftest.py index 3f653ebb1..d353c73b6 100644 --- a/unit_tests/sources/declarative/parsers/conftest.py +++ b/unit_tests/sources/declarative/parsers/conftest.py @@ -143,31 +143,31 @@ def manifest_with_multiple_url_base() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": {"a": 1}, }, "B": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": {"b": 2}, }, "C": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": {"c": 3}, }, "D": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": {"d": 4}, }, "E": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": {"e": 5}, }, }, } @@ -198,7 +198,12 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"a": 1}, + }, }, }, { @@ -220,7 +225,12 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"b": 2}, + }, }, }, { @@ -242,7 +252,12 @@ 
def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"c": 3}, + }, }, }, { @@ -264,7 +279,12 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"d": 4}, + }, }, }, { @@ -286,42 +306,15 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"e": 5}, + }, }, }, ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - }, } @@ -348,10 +341,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, - }, }, "B": { "type": 
"DeclarativeStream", @@ -369,10 +358,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, - }, }, "C": { "type": "DeclarativeStream", @@ -390,10 +375,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, - }, }, "D": { "type": "DeclarativeStream", @@ -411,10 +392,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, - }, }, "E": { "type": "DeclarativeStream", @@ -432,10 +409,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, - }, }, }, # dummy requesters to be resolved and deduplicated @@ -456,38 +429,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: {"$ref": "#/definitions/streams/D"}, {"$ref": "#/definitions/streams/E"}, ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - }, } @@ -514,10 
+455,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, - }, }, { "type": "DeclarativeStream", @@ -536,10 +473,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, - }, }, { "type": "DeclarativeStream", @@ -558,10 +491,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, - }, }, { "type": "DeclarativeStream", @@ -580,10 +509,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, - }, }, { "type": "DeclarativeStream", @@ -602,278 +527,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, - }, - }, - ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - 
"additionalProperties": True, - "properties": {}, - }, - }, - } - - -@pytest.fixture -def manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas() -> Dict[str, Any]: - with open( - "unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml", - "r", - ) as file: - return dict(yaml.safe_load(file)) - - -@pytest.fixture -def expected_manifest_with_linked_definitions_url_base_authenticator_normalized() -> Dict[str, Any]: - return { - "version": "6.44.0", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["pokemon"]}, - "definitions": { - "linked": { - "HttpRequester": { - "url_base": "https://pokeapi.co/api/v1/", - "authenticator": { - "type": "ApiKeyAuthenticator", - "api_token": '{{ config["api_key"] }}', - "inject_into": { - "type": "RequestOption", - "field_name": "API_KEY", - "inject_into": "header", - }, - }, - } - } - }, - "streams": [ - { - "type": "DeclarativeStream", - "name": "pokemon", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": { - "$ref": "#/definitions/linked/HttpRequester/url_base", - }, - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/pokemon"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "trainers", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": { - "$ref": "#/definitions/linked/HttpRequester/url_base", - }, - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator", - }, - }, - "record_selector": { - "type": 
"RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/trainers"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "items", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/items"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "location", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "location", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/location"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "berries", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "berries", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/berries"}, - }, }, ], - "spec": { - "type": 
"Spec", - "connection_specification": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "order": 0, - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - }, - "metadata": { - "assist": {}, - "testedStreams": { - "berries": {"streamHash": None}, - "pokemon": {"streamHash": None}, - "location": {"streamHash": None}, - "trainers": {"streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"}, - "items": {"streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"}, - }, - "autoImportSchema": {"berries": True, "pokemon": True, "location": True}, - }, - "schemas": { - "berries": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "berry_type": {"type": "integer"}, - }, - "additionalProperties": True, - }, - "pokemon": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - "location": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "location_type": {"type": "string"}, - }, - }, - "trainers": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - "items": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - }, } diff --git a/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml b/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml deleted file mode 100644 index 5b972334c..000000000 --- 
a/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml +++ /dev/null @@ -1,212 +0,0 @@ -version: 6.44.0 - -type: DeclarativeSource - -check: - type: CheckStream - stream_names: - - pokemon - -definitions: - linked: - HttpRequester: - url_base: https://pokeapi.co/api/v1/ - -streams: - - type: DeclarativeStream - name: pokemon - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: - $ref: "#/definitions/linked/HttpRequester/url_base" - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: trainers - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: - $ref: "#/definitions/linked/HttpRequester/url_base" - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: items - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - 
type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: location - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: location - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/location" - - type: DeclarativeStream - name: berries - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: berries - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/berries" - -spec: - type: Spec - connection_specification: - type: object - $schema: http://json-schema.org/draft-07/schema# - required: - - api_key - properties: - api_key: - type: string - order: 0 - title: API Key - airbyte_secret: true - additionalProperties: true - -metadata: - assist: {} - testedStreams: - berries: - streamHash: null - pokemon: - streamHash: null - location: - streamHash: null - trainers: - streamHash: ca4ee51a2aaa2a53b9c0b91881a84ad621da575f - items: - streamHash: 12e624ecf47c6357c74c27d6a65c72e437b1534a - autoImportSchema: - berries: true - pokemon: true - location: true - -schemas: - berries: - type: object - $schema: http://json-schema.org/draft-07/schema# - properties: - name: - type: string - berry_type: - type: integer - additionalProperties: true - pokemon: - type: 
def _normalizer_for(manifest):
    """Build a `ManifestNormalizer` for `manifest` using the declarative component schema."""
    return ManifestNormalizer(manifest, _get_declarative_component_schema())


def _assert_cleaned(manifest, expected) -> None:
    """Run `_clean_dangling_fields` on `manifest` and compare the result with `expected`."""
    normalizer = _normalizer_for(manifest)
    normalizer._clean_dangling_fields()
    assert normalizer._normalized_manifest == expected


def _assert_parents_replaced(manifest, expected) -> None:
    """Run `_replace_parent_streams_with_refs` on `manifest` and compare with `expected`."""
    normalizer = _normalizer_for(manifest)
    normalizer._replace_parent_streams_with_refs()
    assert normalizer._normalized_manifest == expected


def _declarative_stream(name, retriever=None):
    """
    Return a brand-new stream dict on every call.

    Building fresh objects (instead of sharing or shallow-copying one dict) keeps the input
    manifest and the expected manifest fully independent, so an in-place mutation of the
    input can never silently change the expectation as well.
    """
    return {
        "name": name,
        "type": "DeclarativeStream",
        "retriever": {} if retriever is None else retriever,
    }


def _parent_config(stream, partition_field):
    """Return a `ParentStreamConfig` dict pointing at `stream` (inlined dict or `$ref`)."""
    return {
        "stream": stream,
        "type": "ParentStreamConfig",
        "parent_key": "id",
        "partition_field": partition_field,
    }


def _substream_router(*parent_configs):
    """Return a `SubstreamPartitionRouter` dict holding the given parent configs."""
    return {
        "type": "SubstreamPartitionRouter",
        "parent_stream_configs": list(parent_configs),
    }


def _stream_ref(index):
    """Return the `$ref` expected for the top-level stream at `index`."""
    return {"$ref": f"#/streams/{index}"}


def test_clean_dangling_fields_removes_unreferenced_definitions() -> None:
    """
    Test that unreferenced definitions are removed while referenced ones are kept.
    """
    manifest = {
        "definitions": {
            "referenced": {"type": "object", "properties": {"a": 1}},
            "unreferenced": {"type": "object", "properties": {"b": 2}},
        },
        "streams": [
            {
                "name": "stream1",
                "type": "object",
                "properties": {"def": {"$ref": "#/definitions/referenced"}},
            }
        ],
    }
    expected = {
        "definitions": {"referenced": {"type": "object", "properties": {"a": 1}}},
        "streams": [
            {
                "name": "stream1",
                "type": "object",
                "properties": {"def": {"$ref": "#/definitions/referenced"}},
            }
        ],
    }
    _assert_cleaned(manifest, expected)


def test_clean_dangling_fields_removes_unreferenced_schemas() -> None:
    """
    Test that unreferenced schemas are removed while referenced ones are kept.
    """
    manifest = {
        "schemas": {
            "referenced": {"type": "object", "properties": {"a": 1}},
            "unreferenced": {"type": "object", "properties": {"b": 2}},
        },
        "streams": [
            {"name": "stream1", "schema_loader": {"schema": {"$ref": "#/schemas/referenced"}}}
        ],
    }
    expected = {
        "schemas": {"referenced": {"type": "object", "properties": {"a": 1}}},
        "streams": [
            {"name": "stream1", "schema_loader": {"schema": {"$ref": "#/schemas/referenced"}}}
        ],
    }
    _assert_cleaned(manifest, expected)


def test_clean_dangling_fields_keeps_parent_paths() -> None:
    """
    Test that parent paths of referenced fields are kept.
    """
    manifest = {
        "definitions": {
            "parent": {"child": {"grandchild": {"type": "object", "properties": {"a": 1}}}}
        },
        "streams": [
            {
                "name": "stream1",
                "type": "object",
                "properties": {"def": {"$ref": "#/definitions/parent/child/grandchild"}},
            }
        ],
    }
    expected = {
        "definitions": {
            "parent": {"child": {"grandchild": {"type": "object", "properties": {"a": 1}}}}
        },
        "streams": [
            {
                "name": "stream1",
                "type": "object",
                "properties": {"def": {"$ref": "#/definitions/parent/child/grandchild"}},
            }
        ],
    }
    _assert_cleaned(manifest, expected)


def test_clean_dangling_fields_removes_empty_sections() -> None:
    """
    Test that empty sections are removed after cleaning.
    """
    manifest = {
        "definitions": {"unreferenced": {"type": "object", "properties": {"b": 2}}},
        "schemas": {"unreferenced": {"type": "object", "properties": {"b": 2}}},
        "streams": [{"name": "stream1", "type": "object", "properties": {}}],
    }
    expected = {"streams": [{"name": "stream1", "type": "object", "properties": {}}]}
    _assert_cleaned(manifest, expected)


def test_replace_parent_streams_with_refs_replaces_with_ref() -> None:
    """
    If a parent_stream_config's stream field matches another stream object, it should be
    replaced with a $ref to the correct index.
    """
    manifest = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream("B"),
            _declarative_stream(
                "C",
                {
                    "partition_router": _substream_router(
                        _parent_config(_declarative_stream("A"), "aid"),
                        _parent_config(_declarative_stream("B"), "bid"),
                    )
                },
            ),
        ]
    }
    expected = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream("B"),
            _declarative_stream(
                "C",
                {
                    "partition_router": _substream_router(
                        _parent_config(_stream_ref(0), "aid"),
                        _parent_config(_stream_ref(1), "bid"),
                    )
                },
            ),
        ]
    }
    _assert_parents_replaced(manifest, expected)


def test_replace_parent_streams_with_refs_no_match() -> None:
    """
    If a parent_stream_config's stream field does not match any stream object, it should
    not be replaced.
    """
    manifest = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream(
                "C",
                {
                    "partition_router": _substream_router(
                        _parent_config(_declarative_stream("X", {"foo": 1}), "aid"),
                    )
                },
            ),
        ]
    }
    expected = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream(
                "C",
                {
                    "partition_router": _substream_router(
                        _parent_config(_declarative_stream("X", {"foo": 1}), "aid"),
                    )
                },
            ),
        ]
    }
    _assert_parents_replaced(manifest, expected)


def test_replace_parent_streams_with_refs_handles_multiple_partition_routers() -> None:
    """
    If there are multiple partition_routers (as a list), all should be checked.
    """
    manifest = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream("B"),
            _declarative_stream(
                "C",
                {
                    "partition_router": [
                        _substream_router(_parent_config(_declarative_stream("A"), "aid")),
                        _substream_router(_parent_config(_declarative_stream("B"), "bid")),
                    ]
                },
            ),
        ]
    }
    expected = {
        "streams": [
            _declarative_stream("A"),
            _declarative_stream("B"),
            _declarative_stream(
                "C",
                {
                    "partition_router": [
                        _substream_router(_parent_config(_stream_ref(0), "aid")),
                        _substream_router(_parent_config(_stream_ref(1), "bid")),
                    ]
                },
            ),
        ]
    }
    _assert_parents_replaced(manifest, expected)