From 34f87967e5cd37866438e0285d2e72eb7370b72b Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 12 Jun 2025 13:35:25 -0700 Subject: [PATCH 1/7] ignore streams without schemas when normalizing them --- .../sources/declarative/parsers/manifest_normalizer.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index ad6de6ac1..661acc77f 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -169,8 +169,14 @@ def _extract_stream_schema(self, stream: Dict[str, Any]) -> None: """ stream_name = stream["name"] + # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG) + + # if the schema is not found, do nothing + if not schema: + return + if not SCHEMAS_TAG in self._normalized_manifest.keys(): self._normalized_manifest[SCHEMAS_TAG] = {} # add stream schema to the SCHEMAS_TAG @@ -198,7 +204,9 @@ def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None: stream_name = stream["name"] if SCHEMAS_TAG in self._normalized_manifest.keys(): if stream_name in self._normalized_manifest[SCHEMAS_TAG]: - stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) + if SCHEMA_LOADER_TAG in stream.keys(): + if SCHEMA_TAG in stream[SCHEMA_LOADER_TAG].keys(): + stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: """ From 970eb3f1eb08c3a1fb4036e391806e264a5de0a6 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 13 Jun 2025 10:01:57 -0700 Subject: [PATCH 2/7] remove schema normalization --- .../parsers/manifest_normalizer.py | 124 ++--- .../sources/declarative/parsers/conftest.py | 437 +++--------------- .../resources/abnormal_schemas_manifest.yaml | 212 --------- .../parsers/test_manifest_normalizer.py | 165 ++++++- 4 files changed, 266 insertions(+), 672 deletions(-) delete mode 100644 unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index 661acc77f..1b02e3948 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -108,8 +108,7 @@ def normalize(self) -> ManifestType: ManifestNormalizationException: Caught internally and handled by returning the original manifest. """ try: - self._deduplicate_minifest() - self._reference_schemas() + self._deduplicate_manifest() return self._normalized_manifest except ManifestNormalizationException: @@ -131,7 +130,7 @@ def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]: yield from [] - def _deduplicate_minifest(self) -> None: + def _deduplicate_manifest(self) -> None: """ Find commonalities in the input JSON structure and refactor it to avoid redundancy. """ @@ -141,8 +140,69 @@ def _deduplicate_minifest(self) -> None: self._prepare_definitions() # replace duplicates with references, if any self._handle_duplicates(self._collect_duplicates()) + # clean dangling fields after resolving $refs + self._clean_dangling_fields() except Exception as e: raise ManifestNormalizationException(str(e)) + + def _clean_dangling_fields(self) -> None: + """ + Clean the manifest by removing unused definitions and schemas. + This method removes any definitions or schemas that are not referenced by any $ref in the manifest. + """ + def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None: + """ + Recursively find all $ref paths in the object. + + Args: + obj: The object to search through + refs: List to store found reference paths + """ + if not isinstance(obj, dict): + return + + for key, value in obj.items(): + if key == "$ref" and isinstance(value, str): + # Remove the leading #/ from the ref path + refs.append(value[2:]) + elif isinstance(value, dict): + find_all_refs(value, refs) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + find_all_refs(item, refs) + + def clean_section(section: Dict[str, Any], section_path: str) -> None: + """ + Clean a section by removing unreferenced fields. + + Args: + section: The section to clean + section_path: The path to this section in the manifest + """ + for key in list(section.keys()): + current_path = f"{section_path}/{key}" + # Check if this path is referenced or is a parent of a referenced path + if not any(ref.startswith(current_path) for ref in all_refs): + del section[key] + + # Find all references in the manifest + all_refs: List[str] = [] + find_all_refs(self._normalized_manifest, all_refs) + + # Clean definitions + if DEF_TAG in self._normalized_manifest: + clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG) + # Remove empty definitions section + if not self._normalized_manifest[DEF_TAG]: + del self._normalized_manifest[DEF_TAG] + + # Clean schemas + if SCHEMAS_TAG in self._normalized_manifest: + clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG) + # Remove empty schemas section + if not self._normalized_manifest[SCHEMAS_TAG]: + del self._normalized_manifest[SCHEMAS_TAG] def _prepare_definitions(self) -> None: """ @@ -163,51 +223,6 @@ def _prepare_definitions(self) -> None: if key != LINKED_TAG: self._normalized_manifest[DEF_TAG].pop(key, None) - def _extract_stream_schema(self, stream: Dict[str, Any]) -> None: - """ - Extract the schema from the stream and add it to the `schemas` tag. - """ - - stream_name = stream["name"] - - # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key - schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG) - - # if the schema is not found, do nothing - if not schema: - return - - if not SCHEMAS_TAG in self._normalized_manifest.keys(): - self._normalized_manifest[SCHEMAS_TAG] = {} - # add stream schema to the SCHEMAS_TAG - if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys(): - # add the schema to the SCHEMAS_TAG with the stream name as key - self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema - - def _reference_schemas(self) -> None: - """ - Set the schema reference for the given stream in the manifest. - This function modifies the manifest in place. - """ - - # reference the stream schema for the stream to where it's stored - if SCHEMAS_TAG in self._normalized_manifest.keys(): - for stream in self._get_manifest_streams(): - self._extract_stream_schema(stream) - self._set_stream_schema_ref(stream) - - def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None: - """ - Set the schema reference for the given stream in the manifest. - This function modifies the manifest in place. - """ - stream_name = stream["name"] - if SCHEMAS_TAG in self._normalized_manifest.keys(): - if stream_name in self._normalized_manifest[SCHEMAS_TAG]: - if SCHEMA_LOADER_TAG in stream.keys(): - if SCHEMA_TAG in stream[SCHEMA_LOADER_TAG].keys(): - stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) - def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: """ Process duplicate objects and replace them with references. @@ -455,16 +470,3 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st """ return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"} - - def _create_schema_ref(self, key: str) -> Dict[str, str]: - """ - Create a reference object for stream schema using the specified key. - - Args: - key: The reference key to use - - Returns: - A reference object in the proper format - """ - - return {"$ref": f"#/{SCHEMAS_TAG}/{key}"} diff --git a/unit_tests/sources/declarative/parsers/conftest.py b/unit_tests/sources/declarative/parsers/conftest.py index 3f653ebb1..8837f8d9f 100644 --- a/unit_tests/sources/declarative/parsers/conftest.py +++ b/unit_tests/sources/declarative/parsers/conftest.py @@ -143,31 +143,41 @@ def manifest_with_multiple_url_base() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": { + "a": 1 + }, }, "B": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": { + "b": 2 + }, }, "C": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": { + "c": 3 + }, }, "D": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": { + "d": 4 + }, }, "E": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": {}, + "properties": { + "e": 5 + }, }, }, } @@ -198,7 +208,14 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "a": 1 + }, + }, }, }, { @@ -220,7 +237,14 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "b": 2 + }, + }, }, }, { @@ -242,7 +266,14 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "c": 3 + }, + }, }, }, { @@ -264,7 +295,14 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "d": 4 + }, + }, }, }, { @@ -286,42 +324,17 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: }, "schema_loader": { "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "e": 5 + } + } }, }, ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - }, } @@ -348,10 +361,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, - }, }, "B": { "type": "DeclarativeStream", @@ -369,10 +378,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, - }, }, "C": { "type": "DeclarativeStream", @@ -390,10 +395,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, - }, }, "D": { "type": "DeclarativeStream", @@ -411,10 +412,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, - }, }, "E": { "type": "DeclarativeStream", @@ -432,10 +429,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, - }, }, }, # dummy requesters to be resolved and deduplicated @@ -456,38 +449,6 @@ def manifest_with_url_base_linked_definition() -> Dict[str, Any]: {"$ref": "#/definitions/streams/D"}, {"$ref": "#/definitions/streams/E"}, ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - }, } @@ -514,10 +475,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/A"}, - }, }, { "type": "DeclarativeStream", @@ -536,10 +493,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/B"}, - }, }, { "type": "DeclarativeStream", @@ -558,10 +511,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/C"}, - }, }, { "type": "DeclarativeStream", @@ -580,10 +529,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/D"}, - }, }, { "type": "DeclarativeStream", @@ -602,278 +547,6 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, }, "decoder": {"type": "JsonDecoder"}, }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/E"}, - }, }, ], - "schemas": { - "A": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "B": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "C": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "D": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - "E": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": True, - "properties": {}, - }, - }, - } - - -@pytest.fixture -def manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas() -> Dict[str, Any]: - with open( - "unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml", - "r", - ) as file: - return dict(yaml.safe_load(file)) - - -@pytest.fixture -def expected_manifest_with_linked_definitions_url_base_authenticator_normalized() -> Dict[str, Any]: - return { - "version": "6.44.0", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["pokemon"]}, - "definitions": { - "linked": { - "HttpRequester": { - "url_base": "https://pokeapi.co/api/v1/", - "authenticator": { - "type": "ApiKeyAuthenticator", - "api_token": '{{ config["api_key"] }}', - "inject_into": { - "type": "RequestOption", - "field_name": "API_KEY", - "inject_into": "header", - }, - }, - } - } - }, - "streams": [ - { - "type": "DeclarativeStream", - "name": "pokemon", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": { - "$ref": "#/definitions/linked/HttpRequester/url_base", - }, - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/pokemon"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "trainers", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": { - "$ref": "#/definitions/linked/HttpRequester/url_base", - }, - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator", - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/trainers"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "items", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "pokemon", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/items"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "location", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "location", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/location"}, - }, - }, - { - "type": "DeclarativeStream", - "name": "berries", - "retriever": { - "type": "SimpleRetriever", - "decoder": {"type": "JsonDecoder"}, - "requester": { - "type": "HttpRequester", - "path": "berries", - "url_base": "https://pokeapi.co/api/v2/", - "http_method": "GET", - "authenticator": { - "$ref": "#/definitions/linked/HttpRequester/authenticator" - }, - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": {"$ref": "#/schemas/berries"}, - }, - }, - ], - "spec": { - "type": "Spec", - "connection_specification": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "required": ["api_key"], - "properties": { - "api_key": { - "type": "string", - "order": 0, - "title": "API Key", - "airbyte_secret": True, - } - }, - "additionalProperties": True, - }, - }, - "metadata": { - "assist": {}, - "testedStreams": { - "berries": {"streamHash": None}, - "pokemon": {"streamHash": None}, - "location": {"streamHash": None}, - "trainers": {"streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"}, - "items": {"streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"}, - }, - "autoImportSchema": {"berries": True, "pokemon": True, "location": True}, - }, - "schemas": { - "berries": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "berry_type": {"type": "integer"}, - }, - "additionalProperties": True, - }, - "pokemon": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - "location": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "location_type": {"type": "string"}, - }, - }, - "trainers": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - "items": { - "type": "object", - "$schema": "http://json-schema.org/draft-07/schema#", - "properties": { - "name": {"type": "string"}, - "pokemon_type": {"type": "integer"}, - }, - }, - }, } diff --git a/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml b/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml deleted file mode 100644 index 5b972334c..000000000 --- a/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml +++ /dev/null @@ -1,212 +0,0 @@ -version: 6.44.0 - -type: DeclarativeSource - -check: - type: CheckStream - stream_names: - - pokemon - -definitions: - linked: - HttpRequester: - url_base: https://pokeapi.co/api/v1/ - -streams: - - type: DeclarativeStream - name: pokemon - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: - $ref: "#/definitions/linked/HttpRequester/url_base" - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: trainers - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: - $ref: "#/definitions/linked/HttpRequester/url_base" - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: items - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: pokemon - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/pokemon" - - type: DeclarativeStream - name: location - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: location - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/location" - - type: DeclarativeStream - name: berries - retriever: - type: SimpleRetriever - decoder: - type: JsonDecoder - requester: - type: HttpRequester - path: berries - url_base: https://pokeapi.co/api/v2/ - http_method: GET - authenticator: - type: ApiKeyAuthenticator - api_token: "{{ config[\"api_key\"] }}" - inject_into: - type: RequestOption - field_name: API_KEY - inject_into: header - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: [] - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/berries" - -spec: - type: Spec - connection_specification: - type: object - $schema: http://json-schema.org/draft-07/schema# - required: - - api_key - properties: - api_key: - type: string - order: 0 - title: API Key - airbyte_secret: true - additionalProperties: true - -metadata: - assist: {} - testedStreams: - berries: - streamHash: null - pokemon: - streamHash: null - location: - streamHash: null - trainers: - streamHash: ca4ee51a2aaa2a53b9c0b91881a84ad621da575f - items: - streamHash: 12e624ecf47c6357c74c27d6a65c72e437b1534a - autoImportSchema: - berries: true - pokemon: true - location: true - -schemas: - berries: - type: object - $schema: http://json-schema.org/draft-07/schema# - properties: - name: - type: string - berry_type: - type: integer - additionalProperties: true - pokemon: - type: object - $schema: http://json-schema.org/draft-07/schema# - properties: - name: - type: string - pokemon_type: - type: integer - location: - type: object - $schema: http://json-schema.org/draft-07/schema# - properties: - name: - type: string - location_type: - type: string \ No newline at end of file diff --git a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py index f25c1bbd6..9258bfef3 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py @@ -48,26 +48,157 @@ def test_with_shared_definitions_url_base_are_present( assert normalized_manifest == expected_manifest_with_url_base_linked_definition_normalized -def test_with_linked_definitions_url_base_authenticator_when_multiple_streams_reference_the_same_schema( - manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas, - expected_manifest_with_linked_definitions_url_base_authenticator_normalized, -) -> None: +def test_clean_dangling_fields_removes_unreferenced_definitions() -> None: """ - This test is to check that the manifest is normalized when the `url_base` and the `authenticator` is linked - between the definitions and the `url_base` is present in the manifest. - The `authenticator` is not a normal schema, but a reference to another schema. + Test that unreferenced definitions are removed while referenced ones are kept. + """ + manifest = { + "definitions": { + "referenced": {"type": "object", "properties": {"a": 1}}, + "unreferenced": {"type": "object", "properties": {"b": 2}} + }, + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": { + "def": {"$ref": "#/definitions/referenced"} + } + } + ] + } + expected = { + "definitions": { + "referenced": {"type": "object", "properties": {"a": 1}} + }, + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": { + "def": {"$ref": "#/definitions/referenced"} + } + } + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._clean_dangling_fields() + assert normalizer._normalized_manifest == expected - The test also verifies the `stream.schema_loader.schema` is properly extracted to - the `schemas.`. + +def test_clean_dangling_fields_removes_unreferenced_schemas() -> None: + """ + Test that unreferenced schemas are removed while referenced ones are kept. """ + manifest = { + "schemas": { + "referenced": {"type": "object", "properties": {"a": 1}}, + "unreferenced": {"type": "object", "properties": {"b": 2}} + }, + "streams": [ + { + "name": "stream1", + "schema_loader": { + "schema": {"$ref": "#/schemas/referenced"} + } + } + ] + } + expected = { + "schemas": { + "referenced": {"type": "object", "properties": {"a": 1}} + }, + "streams": [ + { + "name": "stream1", + "schema_loader": { + "schema": {"$ref": "#/schemas/referenced"} + } + } + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._clean_dangling_fields() + assert normalizer._normalized_manifest == expected + +def test_clean_dangling_fields_keeps_parent_paths() -> None: + """ + Test that parent paths of referenced fields are kept. + """ + manifest = { + "definitions": { + "parent": { + "child": { + "grandchild": {"type": "object", "properties": {"a": 1}} + } + } + }, + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": { + "def": {"$ref": "#/definitions/parent/child/grandchild"} + } + } + ] + } + expected = { + "definitions": { + "parent": { + "child": { + "grandchild": {"type": "object", "properties": {"a": 1}} + } + } + }, + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": { + "def": {"$ref": "#/definitions/parent/child/grandchild"} + } + } + ] + } schema = _get_declarative_component_schema() - resolved_manifest = resolver.preprocess_manifest( - manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas - ) - normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._clean_dangling_fields() + assert normalizer._normalized_manifest == expected + - assert ( - normalized_manifest - == expected_manifest_with_linked_definitions_url_base_authenticator_normalized - ) +def test_clean_dangling_fields_removes_empty_sections() -> None: + """ + Test that empty sections are removed after cleaning. + """ + manifest = { + "definitions": { + "unreferenced": {"type": "object", "properties": {"b": 2}} + }, + "schemas": { + "unreferenced": {"type": "object", "properties": {"b": 2}} + }, + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": {} + } + ] + } + expected = { + "streams": [ + { + "name": "stream1", + "type": "object", + "properties": {} + } + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._clean_dangling_fields() + assert normalizer._normalized_manifest == expected From 78c092b096955333dfd2e80348c74d5e617f1bc8 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 13 Jun 2025 11:46:29 -0700 Subject: [PATCH 3/7] add refs for parent streams --- .../parsers/manifest_normalizer.py | 38 ++++- .../parsers/test_manifest_normalizer.py | 160 ++++++++++++++++++ 2 files changed, 197 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index 1b02e3948..3cf328db4 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -140,11 +140,47 @@ def _deduplicate_manifest(self) -> None: self._prepare_definitions() # replace duplicates with references, if any self._handle_duplicates(self._collect_duplicates()) + # replace parent streams with $refs + # self._replace_parent_streams_with_refs() # clean dangling fields after resolving $refs self._clean_dangling_fields() except Exception as e: raise ManifestNormalizationException(str(e)) - + + def _replace_parent_streams_with_refs(self) -> None: + """ + For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs, + replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object + with a $ref to the correct stream index. + """ + import copy + + streams = self._normalized_manifest.get(STREAMS_TAG, []) + # Use deep copy for comparison to avoid mutation issues + stream_copies = [copy.deepcopy(s) for s in streams] + + for idx, stream in enumerate(streams): + retriever = stream.get("retriever") + if not retriever: + continue + partition_router = retriever.get("partition_router") + routers = partition_router if isinstance(partition_router, list) else [partition_router] if partition_router else [] + for router in routers: + if not isinstance(router, dict): + continue + if router.get("type") != "SubstreamPartitionRouter": + continue + parent_stream_configs = router.get("parent_stream_configs", []) + for parent_config in parent_stream_configs: + if not isinstance(parent_config, dict): + continue + stream_ref = parent_config.get("stream") + # Only replace if it's a dict and matches any stream in the manifest + for other_idx, other_stream in enumerate(stream_copies): + if stream_ref is not None and isinstance(stream_ref, dict) and stream_ref == other_stream: + parent_config["stream"] = {"$ref": f"#/streams/{other_idx}"} + break + def _clean_dangling_fields(self) -> None: """ Clean the manifest by removing unused definitions and schemas. diff --git a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py index 9258bfef3..34b8793f9 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py @@ -202,3 +202,163 @@ def test_clean_dangling_fields_removes_empty_sections() -> None: normalizer = ManifestNormalizer(manifest, schema) normalizer._clean_dangling_fields() assert normalizer._normalized_manifest == expected + + +def test_replace_parent_streams_with_refs_replaces_with_ref(): + """ + If a parent_stream_config's stream field matches another stream object, it should be replaced with a $ref to the correct index. + """ + stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}} + stream_b = {"name": "B", "type": "DeclarativeStream", "retriever": {}} + manifest = { + "streams": [ + stream_a, + stream_b, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": stream_a.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + {"stream": stream_b.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, + ] + } + } + }, + ] + } + expected = { + "streams": [ + stream_a, + stream_b, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": {"$ref": "#/streams/0"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + {"stream": {"$ref": "#/streams/1"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, + ] + } + } + }, + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._replace_parent_streams_with_refs() + assert normalizer._normalized_manifest == expected + + +def test_replace_parent_streams_with_refs_no_match(): + """ + If a parent_stream_config's stream field does not match any stream object, it should not be replaced. + """ + stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}} + unrelated_stream = {"name": "X", "type": "DeclarativeStream", "retriever": {"foo": 1}} + manifest = { + "streams": [ + stream_a, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": unrelated_stream.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + ] + } + } + }, + ] + } + expected = { + "streams": [ + stream_a, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": unrelated_stream.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + ] + } + } + }, + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._replace_parent_streams_with_refs() + assert normalizer._normalized_manifest == expected + + +def test_replace_parent_streams_with_refs_handles_multiple_partition_routers(): + """ + If there are multiple partition_routers (as a list), all should be checked. + """ + stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}} + stream_b = {"name": "B", "type": "DeclarativeStream", "retriever": {}} + manifest = { + "streams": [ + stream_a, + stream_b, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": [ + { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": stream_a.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + ] + }, + { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": stream_b.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, + ] + } + ] + } + }, + ] + } + expected = { + "streams": [ + stream_a, + stream_b, + { + "name": "C", + "type": "DeclarativeStream", + "retriever": { + "partition_router": [ + { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": {"$ref": "#/streams/0"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, + ] + }, + { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + {"stream": {"$ref": "#/streams/1"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, + ] + } + ] + } + }, + ] + } + schema = _get_declarative_component_schema() + normalizer = ManifestNormalizer(manifest, schema) + normalizer._replace_parent_streams_with_refs() + assert normalizer._normalized_manifest == expected From b3b39f85f11a7aeeda2e85ee27f16e8615ddd04c Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 13 Jun 2025 17:18:09 -0700 Subject: [PATCH 4/7] uncomment replace parent streams with refs --- airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index 3cf328db4..d8456e899 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -141,7 +141,7 @@ def _deduplicate_manifest(self) -> None: # replace duplicates with references, if any self._handle_duplicates(self._collect_duplicates()) # replace parent streams with $refs - # self._replace_parent_streams_with_refs() + self._replace_parent_streams_with_refs() # clean dangling fields after resolving $refs self._clean_dangling_fields() except Exception as e: From a4f1267b9a9f8d427396d209fff461f8f1ae0f8a Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 13 Jun 2025 17:28:01 -0700 Subject: [PATCH 5/7] run poetry run ruff format --- .../parsers/manifest_normalizer.py | 21 +- .../sources/declarative/parsers/conftest.py | 42 +--- .../parsers/test_manifest_normalizer.py | 196 +++++++++--------- 3 files changed, 126 insertions(+), 133 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index d8456e899..fdeddf329 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -164,7 +164,13 @@ def _replace_parent_streams_with_refs(self) -> None: if not retriever: continue partition_router = retriever.get("partition_router") - routers = partition_router if isinstance(partition_router, list) else [partition_router] if partition_router else [] + routers = ( + partition_router + if isinstance(partition_router, list) + else [partition_router] + if partition_router + else [] + ) for router in routers: if not isinstance(router, dict): continue @@ -177,7 +183,11 @@ def _replace_parent_streams_with_refs(self) -> None: stream_ref = parent_config.get("stream") # Only replace if it's a dict and matches any stream in the manifest for other_idx, other_stream in enumerate(stream_copies): - if stream_ref is not None and isinstance(stream_ref, dict) and stream_ref == other_stream: + if ( + stream_ref is not None + and isinstance(stream_ref, dict) + and stream_ref == other_stream + ): parent_config["stream"] = {"$ref": f"#/streams/{other_idx}"} break @@ -186,17 +196,18 @@ def _clean_dangling_fields(self) -> None: Clean the manifest by removing unused definitions and schemas. This method removes any definitions or schemas that are not referenced by any $ref in the manifest. """ + def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None: """ Recursively find all $ref paths in the object. - + Args: obj: The object to search through refs: List to store found reference paths """ if not isinstance(obj, dict): return - + for key, value in obj.items(): if key == "$ref" and isinstance(value, str): # Remove the leading #/ from the ref path @@ -211,7 +222,7 @@ def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None: def clean_section(section: Dict[str, Any], section_path: str) -> None: """ Clean a section by removing unreferenced fields. - + Args: section: The section to clean section_path: The path to this section in the manifest diff --git a/unit_tests/sources/declarative/parsers/conftest.py b/unit_tests/sources/declarative/parsers/conftest.py index 8837f8d9f..d353c73b6 100644 --- a/unit_tests/sources/declarative/parsers/conftest.py +++ b/unit_tests/sources/declarative/parsers/conftest.py @@ -143,41 +143,31 @@ def manifest_with_multiple_url_base() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "a": 1 - }, + "properties": {"a": 1}, }, "B": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "b": 2 - }, + "properties": {"b": 2}, }, "C": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "c": 3 - }, + "properties": {"c": 3}, }, "D": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "d": 4 - }, + "properties": {"d": 4}, }, "E": { "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "e": 5 - }, + "properties": {"e": 5}, }, }, } @@ -212,9 +202,7 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "a": 1 - }, + "properties": {"a": 1}, }, }, }, @@ -241,9 +229,7 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "b": 2 - }, + "properties": {"b": 2}, }, }, }, @@ -270,9 +256,7 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "c": 3 - }, + "properties": {"c": 3}, }, }, }, @@ -299,9 +283,7 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "d": 4 - }, + "properties": {"d": 4}, }, }, }, @@ -328,10 +310,8 @@ def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: "type": "object", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": True, - "properties": { - "e": 5 - } - } + "properties": {"e": 5}, + }, }, }, ], diff --git a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py index 34b8793f9..99c2da43a 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py @@ -55,31 +55,25 @@ def test_clean_dangling_fields_removes_unreferenced_definitions() -> None: manifest = { "definitions": { "referenced": {"type": "object", "properties": {"a": 1}}, - "unreferenced": {"type": "object", "properties": {"b": 2}} + "unreferenced": {"type": "object", "properties": {"b": 2}}, }, "streams": [ { "name": "stream1", "type": "object", - "properties": { - "def": {"$ref": "#/definitions/referenced"} - } + "properties": {"def": {"$ref": "#/definitions/referenced"}}, } - ] + ], } expected = { - "definitions": { - "referenced": {"type": "object", "properties": {"a": 1}} - }, + "definitions": {"referenced": {"type": "object", "properties": {"a": 1}}}, "streams": [ { "name": "stream1", "type": "object", - "properties": { - "def": {"$ref": "#/definitions/referenced"} - } + "properties": {"def": {"$ref": "#/definitions/referenced"}}, } - ] + ], } schema = _get_declarative_component_schema() normalizer = ManifestNormalizer(manifest, schema) @@ -94,29 +88,17 @@ def test_clean_dangling_fields_removes_unreferenced_schemas() -> None: manifest = { "schemas": { "referenced": {"type": "object", "properties": {"a": 1}}, - "unreferenced": {"type": "object", "properties": {"b": 2}} + "unreferenced": {"type": "object", "properties": {"b": 2}}, }, "streams": [ - { - "name": "stream1", - "schema_loader": { - "schema": {"$ref": "#/schemas/referenced"} - } - } - ] + {"name": "stream1", "schema_loader": {"schema": {"$ref": "#/schemas/referenced"}}} + ], } expected = { - "schemas": { - "referenced": {"type": "object", "properties": {"a": 1}} - }, + "schemas": {"referenced": {"type": "object", "properties": {"a": 1}}}, "streams": [ - { - "name": "stream1", - "schema_loader": { - "schema": {"$ref": "#/schemas/referenced"} - } - } - ] + {"name": "stream1", "schema_loader": {"schema": {"$ref": "#/schemas/referenced"}}} + ], } schema = _get_declarative_component_schema() normalizer = ManifestNormalizer(manifest, schema) @@ -130,39 +112,27 @@ def test_clean_dangling_fields_keeps_parent_paths() -> None: """ manifest = { "definitions": { - "parent": { - "child": { - "grandchild": {"type": "object", "properties": {"a": 1}} - } - } + "parent": {"child": {"grandchild": {"type": "object", "properties": {"a": 1}}}} }, "streams": [ { "name": "stream1", "type": "object", - "properties": { - "def": {"$ref": "#/definitions/parent/child/grandchild"} - } + "properties": {"def": {"$ref": "#/definitions/parent/child/grandchild"}}, } - ] + ], } expected = { "definitions": { - "parent": { - "child": { - "grandchild": {"type": "object", "properties": {"a": 1}} - } - } + "parent": {"child": {"grandchild": {"type": "object", "properties": {"a": 1}}}} }, "streams": [ { "name": "stream1", "type": "object", - "properties": { - "def": {"$ref": "#/definitions/parent/child/grandchild"} - } + "properties": {"def": {"$ref": "#/definitions/parent/child/grandchild"}}, } - ] + ], } schema = _get_declarative_component_schema() normalizer = ManifestNormalizer(manifest, schema) @@ -175,29 +145,11 @@ def test_clean_dangling_fields_removes_empty_sections() -> None: Test that empty sections are removed after cleaning. """ manifest = { - "definitions": { - "unreferenced": {"type": "object", "properties": {"b": 2}} - }, - "schemas": { - "unreferenced": {"type": "object", "properties": {"b": 2}} - }, - "streams": [ - { - "name": "stream1", - "type": "object", - "properties": {} - } - ] - } - expected = { - "streams": [ - { - "name": "stream1", - "type": "object", - "properties": {} - } - ] + "definitions": {"unreferenced": {"type": "object", "properties": {"b": 2}}}, + "schemas": {"unreferenced": {"type": "object", "properties": {"b": 2}}}, + "streams": [{"name": "stream1", "type": "object", "properties": {}}], } + expected = {"streams": [{"name": "stream1", "type": "object", "properties": {}}]} schema = _get_declarative_component_schema() normalizer = ManifestNormalizer(manifest, schema) normalizer._clean_dangling_fields() @@ -221,18 +173,28 @@ def test_replace_parent_streams_with_refs_replaces_with_ref(): "partition_router": { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": stream_a.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - {"stream": stream_b.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, - ] + { + "stream": stream_a.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + { + "stream": stream_b.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "bid", + }, + ], } - } + }, }, ] } expected = { "streams": [ stream_a, - stream_b, + stream_b, { "name": "C", "type": "DeclarativeStream", @@ -240,11 +202,21 @@ def test_replace_parent_streams_with_refs_replaces_with_ref(): "partition_router": { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": {"$ref": "#/streams/0"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - {"stream": {"$ref": "#/streams/1"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, - ] + { + "stream": {"$ref": "#/streams/0"}, + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + { + "stream": {"$ref": "#/streams/1"}, + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "bid", + }, + ], } - } + }, }, ] } @@ -270,10 +242,15 @@ def test_replace_parent_streams_with_refs_no_match(): "partition_router": { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": unrelated_stream.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - ] + { + "stream": unrelated_stream.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + ], } - } + }, }, ] } @@ -287,10 +264,15 @@ def test_replace_parent_streams_with_refs_no_match(): "partition_router": { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": unrelated_stream.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - ] + { + "stream": unrelated_stream.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + ], } - } + }, }, ] } @@ -318,17 +300,27 @@ def test_replace_parent_streams_with_refs_handles_multiple_partition_routers(): { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": stream_a.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - ] + { + "stream": stream_a.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + ], }, { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": stream_b.copy(), "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, - ] - } + { + "stream": stream_b.copy(), + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "bid", + }, + ], + }, ] - } + }, }, ] } @@ -344,17 +336,27 @@ def test_replace_parent_streams_with_refs_handles_multiple_partition_routers(): { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": {"$ref": "#/streams/0"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "aid"}, - ] + { + "stream": {"$ref": "#/streams/0"}, + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "aid", + }, + ], }, { "type": "SubstreamPartitionRouter", "parent_stream_configs": [ - {"stream": {"$ref": "#/streams/1"}, "type": "ParentStreamConfig", "parent_key": "id", "partition_field": "bid"}, - ] - } + { + "stream": {"$ref": "#/streams/1"}, + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "bid", + }, + ], + }, ] - } + }, }, ] } From 2a1d874f0dfde1fe6d3421a4a806ccecd8c6de2b Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 25 Jun 2025 10:25:39 -0700 Subject: [PATCH 6/7] use stream hashing instead of direct comparisons --- .../parsers/manifest_normalizer.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index fdeddf329..aa05e7894 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -153,11 +153,13 @@ def _replace_parent_streams_with_refs(self) -> None: replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object with a $ref to the correct stream index. """ - import copy - streams = self._normalized_manifest.get(STREAMS_TAG, []) - # Use deep copy for comparison to avoid mutation issues - stream_copies = [copy.deepcopy(s) for s in streams] + + # Build a hash-to-index mapping for O(1) lookups + stream_hash_to_index = {} + for idx, stream in enumerate(streams): + stream_hash = self._hash_object(stream) + stream_hash_to_index[stream_hash] = idx for idx, stream in enumerate(streams): retriever = stream.get("retriever") @@ -182,14 +184,10 @@ def _replace_parent_streams_with_refs(self) -> None: continue stream_ref = parent_config.get("stream") # Only replace if it's a dict and matches any stream in the manifest - for other_idx, other_stream in enumerate(stream_copies): - if ( - stream_ref is not None - and isinstance(stream_ref, dict) - and stream_ref == other_stream - ): - parent_config["stream"] = {"$ref": f"#/streams/{other_idx}"} - break + if stream_ref is not None and isinstance(stream_ref, dict): + stream_ref_hash = self._hash_object(stream_ref) + if stream_ref_hash in stream_hash_to_index: + parent_config["stream"] = {"$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"} def _clean_dangling_fields(self) -> None: """ From ff36d87076451a69f7f1709459a86d4ebe061bbd Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 25 Jun 2025 10:33:06 -0700 Subject: [PATCH 7/7] run format --- .../sources/declarative/parsers/manifest_normalizer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py index aa05e7894..2098d78fb 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -154,7 +154,7 @@ def _replace_parent_streams_with_refs(self) -> None: with a $ref to the correct stream index. """ streams = self._normalized_manifest.get(STREAMS_TAG, []) - + # Build a hash-to-index mapping for O(1) lookups stream_hash_to_index = {} for idx, stream in enumerate(streams): @@ -187,7 +187,9 @@ def _replace_parent_streams_with_refs(self) -> None: if stream_ref is not None and isinstance(stream_ref, dict): stream_ref_hash = self._hash_object(stream_ref) if stream_ref_hash in stream_hash_to_index: - parent_config["stream"] = {"$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"} + parent_config["stream"] = { + "$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}" + } def _clean_dangling_fields(self) -> None: """