Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a488ab3
deduplication version 1
Mar 26, 2025
7d910ee
deduplication version 2
Mar 26, 2025
691d16a
updated duplicates collection
Mar 27, 2025
081e7a8
deduplicate most frequent tags, use existing refs if definitions.shar…
Mar 31, 2025
180af86
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Mar 31, 2025
138b607
formatted"
Mar 31, 2025
f10e601
updated to account type for the given duplicated key
Mar 31, 2025
66fe38e
add the reduce_commons: true, for Connector Builder case
Mar 31, 2025
8798042
enabled the reduce_commons: True for Connector Builder case
Mar 31, 2025
1d425ee
refactorred and cleaned up the code, moved to use the class instead
Apr 1, 2025
06b183a
formatted
Apr 1, 2025
1fa891c
formatted
Apr 1, 2025
00e31a7
cleaned up
Apr 1, 2025
a5aba82
added the dedicated tests
Apr 1, 2025
e017e92
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 1, 2025
0e8394f
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 2, 2025
9f7d498
formatted
Apr 2, 2025
6ec240a
updated normalizer
Apr 8, 2025
acdecdb
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 8, 2025
5f5c6b1
attempt to fix the Connector Builder tests
Apr 8, 2025
e97afa5
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 11, 2025
be3bab1
revert test
Apr 11, 2025
748892d
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 15, 2025
b10d7a1
removed post_resolve_manifest flag
Apr 15, 2025
0587481
nit
Apr 15, 2025
d929167
add _-should_normalize flag handling
Apr 17, 2025
3859c5b
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 17, 2025
9de27ef
formatted
Apr 17, 2025
c403a0e
rename sharable > linkable, shared > linked
Apr 17, 2025
297ae37
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 19, 2025
38f7da6
updated the order of operations; normalization should go after pre-pr…
Apr 19, 2025
7d71f4b
fixed
Apr 19, 2025
304235c
add schema extraction + unit test
Apr 21, 2025
348aaae
Merge branch 'main' into baz/cdk/extract-common-manifest-parts
Apr 23, 2025
2c8d164
updated test comments
Apr 24, 2025
2010419
Merge remote-tracking branch 'origin/main' into baz/cdk/extract-commo…
Apr 25, 2025
8d7be4e
updated linked
Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/custom_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ class UndefinedReferenceException(Exception):

def __init__(self, path: str, reference: str) -> None:
super().__init__(f"Undefined reference {reference} from {path}")


class ManifestDeduplicationException(Exception):
    """
    Raised when the manifest deduplication process fails for any reason.

    Note: this is distinct from CircularReferenceException; it wraps the
    underlying error message from the deduplication step.
    """

    def __init__(self, exception: str) -> None:
        super().__init__(f"Failed to deduplicate manifest: {exception}")
262 changes: 262 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/manifest_deduplicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import hashlib
import json
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Tuple

from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestDeduplicationException

# Type definitions for better readability
ManifestType = Dict[str, Any]
DefinitionsType = Dict[str, Any]
# Maps a content hash to every observation of that content:
# (path in the hierarchy, parent object holding it, the value itself).
DuplicatesType = DefaultDict[str, List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]]]

# Configuration constants
# Minimum number of observations for a value to be treated as a duplicate.
# NOTE(review): name is misspelled ("OCCURRENCES"); keep as-is, it is referenced elsewhere.
N_OCCURANCES = 2

# Manifest section names used when extracting shared components.
DEF_TAG = "definitions"
SHARED_TAG = "shared"

# SPECIFY TAGS FOR DEDUPLICATION
# Only values stored under these keys are candidates for extraction.
TAGS = [
    "authenticator",
    "url_base",
]


def deduplicate_definitions(resolved_manifest: ManifestType) -> ManifestType:
    """
    Find commonalities in the input JSON structure and refactor it to avoid redundancy.

    Args:
        resolved_manifest: A dictionary representing a JSON structure to be analyzed.

    Returns:
        A refactored JSON structure with common properties extracted to `definitions.shared`
        and the duplicated properties replaced with references. On any deduplication
        error the original manifest is returned untouched.
    """
    try:
        manifest_copy = copy.deepcopy(resolved_manifest)
        definitions = manifest_copy.get(DEF_TAG, {})
        _handle_duplicates(definitions, _collect_duplicates(definitions))
        return manifest_copy
    except ManifestDeduplicationException:
        # If any error occurs, fall back to the unmodified input manifest.
        return resolved_manifest


def _replace_duplicates_with_refs(definitions: ManifestType, duplicates: DuplicatesType) -> None:
    """
    Extract each duplicated value into `definitions.shared` and replace every
    occurrence with a `$ref` object pointing at the shared entry.

    Args:
        definitions: The definitions dictionary to modify in place.
        duplicates: Mapping of content hashes to observed occurrences.
    """
    for occurrences in duplicates.values():
        # A value seen fewer than N_OCCURANCES times is not a duplicate.
        if len(occurrences) < N_OCCURANCES:
            continue

        # All occurrences carry identical content; use the first one.
        first_path, _, shared_value = occurrences[0]
        # The component's name is the last segment of its path.
        ref_key = _create_reference_key(definitions, first_path[-1])
        _add_to_shared_definitions(definitions, ref_key, shared_value)

        # Point every occurrence at the shared definition.
        for occurrence_path, holder, _ in occurrences:
            if occurrence_path:  # guard against an empty (invalid) path
                holder[occurrence_path[-1]] = _create_ref_object(ref_key)


def _handle_duplicates(definitions: DefinitionsType, duplicates: DuplicatesType) -> None:
    """
    Process the duplicates and replace them with references.

    Args:
        definitions: The definitions dictionary to modify in place.
        duplicates: Dictionary of duplicate objects.

    Raises:
        ManifestDeduplicationException: If replacing the duplicates fails.
    """
    # process duplicates only if there are any
    if not duplicates:
        return

    # ensure the shared section exists before writing into it
    if SHARED_TAG not in definitions:
        definitions[SHARED_TAG] = {}

    try:
        _replace_duplicates_with_refs(definitions, duplicates)
    except Exception as e:
        # chain the original error so the root cause is preserved
        raise ManifestDeduplicationException(str(e)) from e


def _is_allowed_tag(key: str) -> bool:
    """
    Tell whether a manifest key participates in deduplication.

    Args:
        key: The manifest key to test.

    Returns:
        True when the key is one of the configured TAGS, False otherwise.
    """
    return key in TAGS


def _add_duplicate(
    duplicates: DuplicatesType,
    current_path: List[str],
    obj: Dict[str, Any],
    value: Any,
    key: Optional[str] = None,
) -> None:
    """
    Register one observation of a potentially duplicated value.

    The value is hashed (wrapped as ``{key: value}`` when ``key`` is given, so
    primitive fields hash together with their field name) and the observation
    tuple of (path, parent object, value) is appended to ``duplicates`` under
    that hash. Unhashable values are silently skipped.

    Args:
        duplicates: Accumulator mapping content hashes to observations.
        current_path: Keys/indices locating the value in the object hierarchy.
        obj: The parent dictionary in which the value was observed.
        value: The candidate duplicate value.
        key: Optional field name used to wrap the value before hashing.
    """
    hash_target = {key: value} if key is not None else value
    digest = _hash_object(hash_target)
    if digest:
        duplicates[digest].append((current_path, obj, value))


def _add_to_shared_definitions(
    definitions: DefinitionsType,
    key: str,
    value: Any,
) -> DefinitionsType:
    """
    Add a value to the shared definitions under the specified key.

    An existing entry is preserved (first writer wins), matching the
    collision handling in `_create_reference_key`.

    Args:
        definitions: The definitions dictionary to modify.
        key: The key to use.
        value: The value to add.

    Returns:
        The same definitions dictionary, for chaining.
    """
    # setdefault keeps any existing entry — first writer wins
    definitions[SHARED_TAG].setdefault(key, value)
    return definitions


def _collect_duplicates(node: ManifestType) -> DuplicatesType:
    """
    Traverse the JSON object and collect all potential duplicate values and objects.

    Only values stored under the allowed TAGS are recorded. The traversal order
    matters: nested dictionaries are processed before their parent is recorded,
    so the "first occurrence" seen by the replacement step reflects this order.

    Args:
        node: The JSON object to analyze.

    Returns:
        duplicates: A dictionary of duplicate objects.

    Raises:
        ManifestDeduplicationException: If the traversal fails for any reason.
    """

    def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None:
        """
        The closure to recursively collect duplicates in the JSON object.

        Args:
            obj: The current object being analyzed.
            path: The current path in the object hierarchy.
        """
        # Non-dict nodes (e.g. list items that are primitives) have nothing to collect.
        if not isinstance(obj, dict):
            return

        # Avoid a mutable default argument; start a fresh path at the root.
        path = [] if path is None else path
        for key, value in obj.items():
            current_path = path + [key]

            if isinstance(value, dict):
                # First process nested dictionaries
                _collect(value, current_path)
                # Process allowed-only component tags
                if _is_allowed_tag(key):
                    _add_duplicate(duplicates, current_path, obj, value)

            # handle primitive types
            elif isinstance(value, (str, int, float, bool)):
                # Process allowed-only field tags; pass the key so the primitive
                # is hashed together with its field name.
                if _is_allowed_tag(key):
                    _add_duplicate(duplicates, current_path, obj, value, key)

            # handle list cases: recurse into items, using the index as a path segment
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    _collect(item, current_path + [str(i)])

    duplicates: DuplicatesType = defaultdict(list, {})

    try:
        _collect(node)
        return duplicates
    except Exception as e:
        raise ManifestDeduplicationException(str(e))


def _hash_object(node: Dict[str, Any]) -> Optional[str]:
"""
Create a unique hash for a dictionary object.

Args:
node: The dictionary to hash

Returns:
A hash string or None if not hashable
"""
if isinstance(node, Dict):
# Sort keys to ensure consistent hash for same content
return hashlib.md5(json.dumps(node, sort_keys=True).encode()).hexdigest()
return None


def _create_reference_key(definitions: DefinitionsType, key: str) -> str:
    """
    Create a unique reference key and handle collisions.

    Collisions yield `key_1`, `key_2`, ... rather than the accumulating
    suffixes (`key_1_2`, `key_1_2_3`, ...) produced by re-suffixing the
    already-suffixed key.

    Args:
        definitions: The definitions dictionary with the shared section.
        key: The base key to use.

    Returns:
        A unique reference key within `definitions.shared`.
    """
    if key not in definitions[SHARED_TAG]:
        return key

    # Probe numbered suffixes against the original base key.
    counter = 1
    while f"{key}_{counter}" in definitions[SHARED_TAG]:
        counter += 1
    return f"{key}_{counter}"


def _create_ref_object(ref_key: str) -> Dict[str, str]:
    """
    Create a reference object using the specified key.

    Args:
        ref_key: The reference key to use

    Returns:
        A dictionary of the form {"$ref": "#/definitions/shared/<ref_key>"}.
    """
    ref_path = "/".join(("#", DEF_TAG, SHARED_TAG, ref_key))
    return {"$ref": ref_path}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CircularReferenceException,
UndefinedReferenceException,
)
from airbyte_cdk.sources.declarative.parsers.manifest_deduplicator import deduplicate_definitions

REF_TAG = "$ref"

Expand Down Expand Up @@ -102,9 +103,14 @@ class ManifestReferenceResolver:
def preprocess_manifest(self, manifest: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Resolve every reference in the manifest, then deduplicate common definitions.

    :param manifest: incoming manifest that could have references to previously defined components
    :return: the resolved manifest with repeated components extracted to `definitions.shared`
    """
    preprocessed_manifest = self._evaluate_node(manifest, manifest, set())

    # we need to reduce commonalities in the manifest after the references have been resolved
    deduplicated_manifest = deduplicate_definitions(preprocessed_manifest)

    return deduplicated_manifest

def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set[Any]) -> Any:
if isinstance(node, dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
},
"record_selector": {"extractor": {"field_path": ["result"]}},
},
"shared": {},
},
"streams": [
{
Expand Down
2 changes: 2 additions & 0 deletions unit_tests/sources/declarative/interpolation/test_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime

import pytest
from freezegun import freeze_time

from airbyte_cdk.sources.declarative.interpolation.macros import macros

Expand All @@ -29,6 +30,7 @@ def test_macros_export(test_name, fn_name, found_in_macros):
assert fn_name not in macros


@freeze_time("2022-01-01")
@pytest.mark.parametrize(
"input_value, format, input_format, expected_output",
[
Expand Down
Loading
Loading