Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 110 additions & 53 deletions airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ def normalize(self) -> ManifestType:
ManifestNormalizationException: Caught internally and handled by returning the original manifest.
"""
try:
self._deduplicate_minifest()
self._reference_schemas()
self._deduplicate_manifest()

return self._normalized_manifest
except ManifestNormalizationException:
Expand All @@ -131,7 +130,7 @@ def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:

yield from []

def _deduplicate_minifest(self) -> None:
def _deduplicate_manifest(self) -> None:
"""
Find commonalities in the input JSON structure and refactor it to avoid redundancy.
"""
Expand All @@ -141,9 +140,117 @@ def _deduplicate_minifest(self) -> None:
self._prepare_definitions()
# replace duplicates with references, if any
self._handle_duplicates(self._collect_duplicates())
# replace parent streams with $refs
self._replace_parent_streams_with_refs()
# clean dangling fields after resolving $refs
self._clean_dangling_fields()
except Exception as e:
raise ManifestNormalizationException(str(e))

def _replace_parent_streams_with_refs(self) -> None:
"""
For each stream in the manifest, if it has a retriever.partition_router with parent_stream_configs,
replace any 'stream' fields in those configs that are dicts and deeply equal to another stream object
with a $ref to the correct stream index.
"""
streams = self._normalized_manifest.get(STREAMS_TAG, [])

# Build a hash-to-index mapping for O(1) lookups
stream_hash_to_index = {}
for idx, stream in enumerate(streams):
stream_hash = self._hash_object(stream)
stream_hash_to_index[stream_hash] = idx

for idx, stream in enumerate(streams):
retriever = stream.get("retriever")
if not retriever:
continue
partition_router = retriever.get("partition_router")
routers = (
partition_router
if isinstance(partition_router, list)
else [partition_router]
if partition_router
else []
)
for router in routers:
if not isinstance(router, dict):
continue
if router.get("type") != "SubstreamPartitionRouter":
continue
parent_stream_configs = router.get("parent_stream_configs", [])
for parent_config in parent_stream_configs:
if not isinstance(parent_config, dict):
continue
stream_ref = parent_config.get("stream")
# Only replace if it's a dict and matches any stream in the manifest
if stream_ref is not None and isinstance(stream_ref, dict):
stream_ref_hash = self._hash_object(stream_ref)
if stream_ref_hash in stream_hash_to_index:
parent_config["stream"] = {
"$ref": f"#/streams/{stream_hash_to_index[stream_ref_hash]}"
}

def _clean_dangling_fields(self) -> None:
"""
Clean the manifest by removing unused definitions and schemas.
This method removes any definitions or schemas that are not referenced by any $ref in the manifest.
"""

def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None:
"""
Recursively find all $ref paths in the object.

Args:
obj: The object to search through
refs: List to store found reference paths
"""
if not isinstance(obj, dict):
return

for key, value in obj.items():
if key == "$ref" and isinstance(value, str):
# Remove the leading #/ from the ref path
refs.append(value[2:])
elif isinstance(value, dict):
find_all_refs(value, refs)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
find_all_refs(item, refs)

def clean_section(section: Dict[str, Any], section_path: str) -> None:
"""
Clean a section by removing unreferenced fields.

Args:
section: The section to clean
section_path: The path to this section in the manifest
"""
for key in list(section.keys()):
current_path = f"{section_path}/{key}"
# Check if this path is referenced or is a parent of a referenced path
if not any(ref.startswith(current_path) for ref in all_refs):
del section[key]

# Find all references in the manifest
all_refs: List[str] = []
find_all_refs(self._normalized_manifest, all_refs)

# Clean definitions
if DEF_TAG in self._normalized_manifest:
clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG)
# Remove empty definitions section
if not self._normalized_manifest[DEF_TAG]:
del self._normalized_manifest[DEF_TAG]

# Clean schemas
if SCHEMAS_TAG in self._normalized_manifest:
clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG)
# Remove empty schemas section
if not self._normalized_manifest[SCHEMAS_TAG]:
del self._normalized_manifest[SCHEMAS_TAG]

def _prepare_definitions(self) -> None:
"""
Clean the definitions in the manifest by removing unnecessary properties.
Expand All @@ -163,43 +270,6 @@ def _prepare_definitions(self) -> None:
if key != LINKED_TAG:
self._normalized_manifest[DEF_TAG].pop(key, None)

def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
"""
Extract the schema from the stream and add it to the `schemas` tag.
"""

stream_name = stream["name"]
# copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
if not SCHEMAS_TAG in self._normalized_manifest.keys():
self._normalized_manifest[SCHEMAS_TAG] = {}
# add stream schema to the SCHEMAS_TAG
if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys():
# add the schema to the SCHEMAS_TAG with the stream name as key
self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema

def _reference_schemas(self) -> None:
"""
Set the schema reference for the given stream in the manifest.
This function modifies the manifest in place.
"""

# reference the stream schema for the stream to where it's stored
if SCHEMAS_TAG in self._normalized_manifest.keys():
for stream in self._get_manifest_streams():
self._extract_stream_schema(stream)
self._set_stream_schema_ref(stream)

def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
"""
Set the schema reference for the given stream in the manifest.
This function modifies the manifest in place.
"""
stream_name = stream["name"]
if SCHEMAS_TAG in self._normalized_manifest.keys():
if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)

def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
"""
Process duplicate objects and replace them with references.
Expand Down Expand Up @@ -447,16 +517,3 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st
"""

return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}

def _create_schema_ref(self, key: str) -> Dict[str, str]:
"""
Create a reference object for stream schema using the specified key.

Args:
key: The reference key to use

Returns:
A reference object in the proper format
"""

return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}
Loading
Loading