Skip to content

Commit a488ab3

Browse files
author
Oleksandr Bazarnov
committed
deduplication version 1
1 parent d7516ec commit a488ab3

File tree

4 files changed

+582
-2
lines changed

4 files changed

+582
-2
lines changed

airbyte_cdk/sources/declarative/parsers/custom_exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,12 @@ class UndefinedReferenceException(Exception):
1919

2020
def __init__(self, path: str, reference: str) -> None:
2121
super().__init__(f"Undefined reference {reference} from {path}")
22+
23+
24+
class ManifestDeduplicationException(Exception):
    """
    Raised when the manifest deduplication process fails.

    Wraps the underlying error so callers see why deduplication could not
    be completed. (The previous docstring incorrectly described a circular
    reference; that case belongs to CircularReferenceException.)
    """

    def __init__(self, exception: str) -> None:
        # Keep the underlying error text in the message for diagnostics.
        super().__init__(f"Failed to deduplicate manifest: {exception}")
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import copy
6+
import hashlib
7+
import json
8+
from collections import defaultdict
9+
from typing import Any, DefaultDict, Dict, Hashable, List, Optional, Tuple
10+
11+
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestDeduplicationException
12+
13+
# Type definitions for better readability
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
# (field name, hashable value) -> list of (path, parent object) occurrences
FieldDuplicatesType = DefaultDict[Tuple[str, Any], List[Tuple[List[str], Dict]]]
# component content hash -> list of (path, parent object, component value) occurrences
ComponentDuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict, Dict]]]

# Configuration constants
# Minimum number of occurrences for a value/component to be treated as a duplicate.
N_OCCURANCES = 2

# Manifest keys: the top-level "definitions" section, and the "shared"
# sub-section where deduplicated values are collected.
DEF_TAG = "definitions"
SHARED_TAG = "shared"

# SPECIFY COMPONENT TAGS FOR DEDUPLICATION
# Only dict-valued keys listed here are considered for component deduplication.
COMPONENT_TAGS = [
    "authenticator",
]

# SPECIFY FIELD TAGS FOR DEDUPLICATION
# Only primitive-valued keys listed here are considered for field deduplication.
FIELD_TAGS = [
    "url_base",
]
35+
36+
def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
    """
    Factor common values and components out of the manifest's definitions.

    Works on a deep copy of the input, so the caller's manifest is never
    mutated.

    Args:
        resolved_manifest: A dictionary representing a JSON structure to be analyzed.

    Returns:
        A refactored manifest with common properties extracted into a shared
        section, or the original manifest unchanged if deduplication fails.
    """
    manifest_copy = copy.deepcopy(resolved_manifest)
    try:
        target_definitions = manifest_copy.get(DEF_TAG, {})
        fields, components = _collect_all_duplicates(target_definitions)
        _process_duplicates(target_definitions, fields, components)
    except ManifestDeduplicationException:
        # Manifest layouts vary too widely to handle every possible error;
        # on any failure we simply fall back to the original manifest.
        return resolved_manifest
    return manifest_copy
58+
59+
60+
def _process_duplicates(
    definitions: DefinitionsType,
    field_duplicates: FieldDuplicatesType,
    component_duplicates: ComponentDuplicatesType,
) -> None:
    """
    Replace collected duplicates with `$ref` pointers into the shared section.

    Args:
        definitions: The manifest's definitions dictionary, modified in place.
        field_duplicates: Dictionary of duplicate primitive values.
        component_duplicates: Dictionary of duplicate objects.

    Raises:
        ManifestDeduplicationException: If any error occurs while rewriting.
    """
    # Process duplicates only if there are any.
    if not (field_duplicates or component_duplicates):
        return

    # Ensure the shared section exists before adding entries to it.
    if SHARED_TAG not in definitions:
        definitions[SHARED_TAG] = {}

    try:
        _process_component_duplicates(definitions, component_duplicates)
        _process_field_duplicates(definitions, field_duplicates)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise ManifestDeduplicationException(str(e)) from e
82+
83+
84+
def _is_allowed_component(key: str) -> bool:
    """
    Tell whether *key* names a component eligible for deduplication.

    Args:
        key: The manifest key to check.

    Returns:
        True if the key is one of the configured component tags, False otherwise.
    """
    return any(key == tag for tag in COMPONENT_TAGS)
95+
96+
97+
def _is_allowed_field(key: str) -> bool:
    """
    Tell whether *key* names a field eligible for deduplication.

    Args:
        key: The manifest key to check.

    Returns:
        True if the key is one of the configured field tags, False otherwise.
    """
    return any(key == tag for tag in FIELD_TAGS)
108+
109+
110+
def _collect_all_duplicates(
    node: ManifestType,
) -> Tuple[FieldDuplicatesType, ComponentDuplicatesType]:
    """
    Traverse the JSON object and collect all potential duplicate values and objects.

    Candidates are keyed so that equal content lands in the same bucket:
    components by a content hash, fields by (name, value). Whether a bucket
    actually counts as a duplicate (>= N_OCCURANCES entries) is decided later.

    Args:
        node: The JSON object to analyze

    Returns:
        A tuple of (field_duplicates, component_duplicates)

    Raises:
        ManifestDeduplicationException: If traversal fails for any reason.
    """

    field_duplicates: FieldDuplicatesType = defaultdict(list)
    component_duplicates: ComponentDuplicatesType = defaultdict(list)

    def collect_duplicates(obj: Dict, path: Optional[List[str]] = None) -> None:
        # Only dictionaries are traversed; any other node type is ignored here.
        if not isinstance(obj, dict):
            return

        path = [] if path is None else path
        for key, value in obj.items():
            current_path = path + [key]

            if isinstance(value, dict):
                # First process nested dictionaries (depth-first).
                collect_duplicates(value, current_path)

                # Process allowed-only component tags
                if _is_allowed_component(key):
                    obj_hash = _hash_object(value)
                    if obj_hash:
                        # Record path, parent (for later in-place replacement) and value.
                        component_duplicates[obj_hash].append((current_path, obj, value))

            # handle list[dict] cases — recurse into each element with its index in the path
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    collect_duplicates(item, current_path + [str(i)])

            # Process allowed-only field tags (primitive values)
            elif _is_allowed_field(key):
                hashable_value = _make_hashable(value)
                field_duplicates[(key, hashable_value)].append((current_path, obj))

    try:
        collect_duplicates(node)
    except Exception as e:
        raise ManifestDeduplicationException(str(e))

    return field_duplicates, component_duplicates
161+
162+
163+
def _hash_object(node: Dict) -> Optional[str]:
164+
"""
165+
Create a unique hash for a dictionary object.
166+
167+
Args:
168+
node: The dictionary to hash
169+
170+
Returns:
171+
A hash string or None if not hashable
172+
"""
173+
if isinstance(node, Dict):
174+
# Sort keys to ensure consistent hash for same content
175+
return hashlib.md5(json.dumps(node, sort_keys=True).encode()).hexdigest()
176+
177+
return None
178+
179+
180+
def _make_hashable(value: Any) -> Any:
181+
"""
182+
Convert a value to a hashable representation.
183+
184+
Args:
185+
value: The value to make hashable
186+
187+
Returns:
188+
A hashable representation of the value
189+
"""
190+
return json.dumps(value) if not isinstance(value, Hashable) else value
191+
192+
193+
def _create_reference_key(
    definitions: DefinitionsType, key: str, value: Optional[Any] = None
) -> str:
    """
    Create a unique reference key in the shared section, handling collisions.

    Args:
        definitions: The definitions dictionary containing the shared section.
        key: The base key to use.
        value: Optional value; when the shared section already maps a candidate
            key to an equal value, that existing key is reused instead of renamed.

    Returns:
        A unique (or reused) reference key.
    """
    base_key = key
    counter = 1
    while key in definitions[SHARED_TAG]:
        # If the value is already in shared definitions with this key, no need to rename.
        if value is not None and _is_same_value(definitions[SHARED_TAG].get(key), value):
            return key
        # Suffix the ORIGINAL key (key_1, key_2, ...); the previous version fed
        # the suffixed key back in, compounding suffixes (key_1, key_1_2, ...).
        key = f"{base_key}_{counter}"
        counter += 1
    return key
215+
216+
217+
def _create_ref_object(ref_key: str) -> Dict[str, str]:
    """
    Build a manifest `$ref` object pointing into the shared definitions.

    Args:
        ref_key: The key under the shared section to point at.

    Returns:
        A reference object of the form {"$ref": "#/definitions/shared/<ref_key>"}.
    """
    ref_path = "#/{}/{}/{}".format(DEF_TAG, SHARED_TAG, ref_key)
    return {"$ref": ref_path}
228+
229+
230+
def _is_same_value(val1: Any, val2: Any) -> bool:
231+
"""
232+
Check if two values are the same by comparing their JSON representation.
233+
234+
Args:
235+
val1: First value
236+
val2: Second value
237+
238+
Returns:
239+
True if the values are the same, False otherwise
240+
"""
241+
return json.dumps(val1, sort_keys=True) == json.dumps(val2, sort_keys=True)
242+
243+
244+
def _process_component_duplicates(
    definitions: ManifestType,
    component_duplicates: ComponentDuplicatesType,
) -> None:
    """
    Move duplicated component objects into the shared section and replace
    every occurrence with a `$ref` to the shared entry.

    Args:
        definitions: The definitions dictionary to modify in place.
        component_duplicates: Mapping of component content hash to occurrences.
    """
    for occurrences in component_duplicates.values():
        # Skip buckets that do not meet the duplicate threshold.
        if len(occurrences) < N_OCCURANCES:
            continue

        # All occurrences in a bucket share identical content, so the first one
        # supplies both the component's name (last path segment) and its value.
        first_path, _, component_value = occurrences[0]
        ref_key = _create_reference_key(definitions, first_path[-1])
        _add_to_shared_definitions(definitions, ref_key, component_value)

        # Rewrite each occurrence in its parent object as a reference.
        for occurrence_path, parent, _ in occurrences:
            if occurrence_path:  # guard against an empty path
                parent[occurrence_path[-1]] = _create_ref_object(ref_key)
274+
275+
276+
def _add_to_shared_definitions(
    definitions: DefinitionsType,
    key: str,
    value: Any,
) -> DefinitionsType:
    """
    Store *value* under *key* in the shared section unless the key is taken.

    Args:
        definitions: The definitions dictionary to modify.
        key: The key to store under.
        value: The value to store.

    Returns:
        The (possibly modified) definitions dictionary.
    """
    shared = definitions[SHARED_TAG]
    if key not in shared:
        shared[key] = value
    return definitions
294+
295+
296+
def _process_field_duplicates(
    definitions: ManifestType,
    field_duplicates: FieldDuplicatesType,
) -> None:
    """
    Move duplicated primitive field values into the shared section and
    replace every occurrence with a `$ref` to the shared entry.

    Args:
        definitions: The definitions dictionary to modify in place.
        field_duplicates: Mapping of (field name, value) to occurrences.
    """
    for (field_name, field_value), occurrences in field_duplicates.items():
        # Skip buckets that do not meet the duplicate threshold.
        if len(occurrences) < N_OCCURANCES:
            continue

        # Passing the value lets an existing identical shared entry be reused.
        ref_key = _create_reference_key(definitions, field_name, field_value)
        _add_to_shared_definitions(definitions, ref_key, field_value)

        # Rewrite each occurrence in its parent object as a reference.
        for occurrence_path, parent in occurrences:
            if occurrence_path:  # guard against an empty path
                parent[occurrence_path[-1]] = _create_ref_object(ref_key)

airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
CircularReferenceException,
1010
UndefinedReferenceException,
1111
)
12+
from airbyte_cdk.sources.declarative.parsers.manifest_deduplicator import deduplicate_definitions
1213

1314
REF_TAG = "$ref"
1415

@@ -102,9 +103,14 @@ class ManifestReferenceResolver:
102103
def preprocess_manifest(self, manifest: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Resolve all `$ref` references in the manifest, then deduplicate its definitions.

    :param manifest: incoming manifest that could have references to previously defined components
    :return: the resolved manifest, with common definitions factored into a shared section
    """

    preprocessed_manifest = self._evaluate_node(manifest, manifest, set())

    # we need to reduce commonalities in the manifest after the references have been resolved
    reduced_manifest = deduplicate_definitions(preprocessed_manifest)

    return reduced_manifest
108114

109115
def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set[Any]) -> Any:
110116
if isinstance(node, dict):

0 commit comments

Comments
 (0)