Skip to content

Commit 970eb3f

Browse files
committed
remove schema normalization
1 parent 34f8796 commit 970eb3f

File tree

4 files changed

+266
-672
lines changed

4 files changed

+266
-672
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py

Lines changed: 63 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ def normalize(self) -> ManifestType:
108108
ManifestNormalizationException: Caught internally and handled by returning the original manifest.
109109
"""
110110
try:
111-
self._deduplicate_minifest()
112-
self._reference_schemas()
111+
self._deduplicate_manifest()
113112

114113
return self._normalized_manifest
115114
except ManifestNormalizationException:
@@ -131,7 +130,7 @@ def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
131130

132131
yield from []
133132

134-
def _deduplicate_minifest(self) -> None:
133+
def _deduplicate_manifest(self) -> None:
135134
"""
136135
Find commonalities in the input JSON structure and refactor it to avoid redundancy.
137136
"""
@@ -141,8 +140,69 @@ def _deduplicate_minifest(self) -> None:
141140
self._prepare_definitions()
142141
# replace duplicates with references, if any
143142
self._handle_duplicates(self._collect_duplicates())
143+
# clean dangling fields after resolving $refs
144+
self._clean_dangling_fields()
144145
except Exception as e:
145146
raise ManifestNormalizationException(str(e))
147+
148+
def _clean_dangling_fields(self) -> None:
149+
"""
150+
Clean the manifest by removing unused definitions and schemas.
151+
This method removes any definitions or schemas that are not referenced by any $ref in the manifest.
152+
"""
153+
def find_all_refs(obj: Dict[str, Any], refs: List[str]) -> None:
154+
"""
155+
Recursively find all $ref paths in the object.
156+
157+
Args:
158+
obj: The object to search through
159+
refs: List to store found reference paths
160+
"""
161+
if not isinstance(obj, dict):
162+
return
163+
164+
for key, value in obj.items():
165+
if key == "$ref" and isinstance(value, str):
166+
# Remove the leading #/ from the ref path
167+
refs.append(value[2:])
168+
elif isinstance(value, dict):
169+
find_all_refs(value, refs)
170+
elif isinstance(value, list):
171+
for item in value:
172+
if isinstance(item, dict):
173+
find_all_refs(item, refs)
174+
175+
def clean_section(section: Dict[str, Any], section_path: str) -> None:
176+
"""
177+
Clean a section by removing unreferenced fields.
178+
179+
Args:
180+
section: The section to clean
181+
section_path: The path to this section in the manifest
182+
"""
183+
for key in list(section.keys()):
184+
current_path = f"{section_path}/{key}"
185+
# Check if this path is referenced or is a parent of a referenced path
186+
if not any(ref.startswith(current_path) for ref in all_refs):
187+
del section[key]
188+
189+
# Find all references in the manifest
190+
all_refs: List[str] = []
191+
find_all_refs(self._normalized_manifest, all_refs)
192+
193+
# Clean definitions
194+
if DEF_TAG in self._normalized_manifest:
195+
clean_section(self._normalized_manifest[DEF_TAG], DEF_TAG)
196+
# Remove empty definitions section
197+
if not self._normalized_manifest[DEF_TAG]:
198+
del self._normalized_manifest[DEF_TAG]
199+
200+
# Clean schemas
201+
if SCHEMAS_TAG in self._normalized_manifest:
202+
clean_section(self._normalized_manifest[SCHEMAS_TAG], SCHEMAS_TAG)
203+
# Remove empty schemas section
204+
if not self._normalized_manifest[SCHEMAS_TAG]:
205+
del self._normalized_manifest[SCHEMAS_TAG]
146206

147207
def _prepare_definitions(self) -> None:
148208
"""
@@ -163,51 +223,6 @@ def _prepare_definitions(self) -> None:
163223
if key != LINKED_TAG:
164224
self._normalized_manifest[DEF_TAG].pop(key, None)
165225

166-
def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
167-
"""
168-
Extract the schema from the stream and add it to the `schemas` tag.
169-
"""
170-
171-
stream_name = stream["name"]
172-
173-
# copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
174-
schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
175-
176-
# if the schema is not found, do nothing
177-
if not schema:
178-
return
179-
180-
if not SCHEMAS_TAG in self._normalized_manifest.keys():
181-
self._normalized_manifest[SCHEMAS_TAG] = {}
182-
# add stream schema to the SCHEMAS_TAG
183-
if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys():
184-
# add the schema to the SCHEMAS_TAG with the stream name as key
185-
self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema
186-
187-
def _reference_schemas(self) -> None:
188-
"""
189-
Set the schema reference for the given stream in the manifest.
190-
This function modifies the manifest in place.
191-
"""
192-
193-
# reference the stream schema for the stream to where it's stored
194-
if SCHEMAS_TAG in self._normalized_manifest.keys():
195-
for stream in self._get_manifest_streams():
196-
self._extract_stream_schema(stream)
197-
self._set_stream_schema_ref(stream)
198-
199-
def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
200-
"""
201-
Set the schema reference for the given stream in the manifest.
202-
This function modifies the manifest in place.
203-
"""
204-
stream_name = stream["name"]
205-
if SCHEMAS_TAG in self._normalized_manifest.keys():
206-
if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
207-
if SCHEMA_LOADER_TAG in stream.keys():
208-
if SCHEMA_TAG in stream[SCHEMA_LOADER_TAG].keys():
209-
stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)
210-
211226
def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
212227
"""
213228
Process duplicate objects and replace them with references.
@@ -455,16 +470,3 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st
455470
"""
456471

457472
return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
458-
459-
def _create_schema_ref(self, key: str) -> Dict[str, str]:
460-
"""
461-
Create a reference object for stream schema using the specified key.
462-
463-
Args:
464-
key: The reference key to use
465-
466-
Returns:
467-
A reference object in the proper format
468-
"""
469-
470-
return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}

0 commit comments

Comments
 (0)