diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 6b6b31111..b70e2c85f 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -56,12 +56,22 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) +def should_normalize_manifest(config: Mapping[str, Any]) -> bool: + """ + Check if the manifest should be normalized. + :param config: The configuration to check + :return: True if the manifest should be normalized, False otherwise. + """ + return config.get("__should_normalize", False) + + def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( config=config, emit_connector_builder_messages=True, source_config=manifest, + normalize_manifest=should_normalize_manifest(config), component_factory=ModelToComponentFactory( emit_connector_builder_messages=True, limit_pages_fetched_per_slice=limits.max_pages_per_slice, diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2de404598..5376505c8 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1917,8 +1917,9 @@ definitions: type: string enum: [HttpRequester] url_base: + linkable: true title: API Base URL - description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. type: string interpolation_context: - config @@ -1936,7 +1937,7 @@ definitions: - "https://example.com/api/v1/resource/{{ next_page_token['id'] }}" path: title: URL Path - description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. type: string interpolation_context: - config @@ -1964,6 +1965,7 @@ definitions: - POST authenticator: title: Authenticator + linkable: true description: Authentication method to use for requests sent to the API. anyOf: - "$ref": "#/definitions/NoAuth" diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 903caf83c..31895abeb 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -29,7 +29,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DeclarativeStream as DeclarativeStreamModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + Spec as SpecModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( StateDelegatingStream as StateDelegatingStreamModel, ) @@ -39,6 +41,9 @@ from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( ManifestComponentTransformer, ) +from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import ( + ManifestNormalizer, +) from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( ManifestReferenceResolver, ) @@ -57,6 +62,24 @@ from airbyte_cdk.utils.traced_exception import AirbyteTracedException +def _get_declarative_component_schema() -> Dict[str, Any]: + try: + raw_component_schema = pkgutil.get_data( + "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml" + ) + if raw_component_schema is not None: + declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader) + return declarative_component_schema # type: ignore + else: + raise RuntimeError( + "Failed to read manifest component json schema required for deduplication" + ) + except FileNotFoundError as e: + raise FileNotFoundError( + f"Failed to read manifest component json schema required for deduplication: {e}" + ) + + class ManifestDeclarativeSource(DeclarativeSource): """Declarative source defined by a manifest of low-code components that define source connector behavior""" @@ -68,7 +91,8 @@ def __init__( debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, - ): + normalize_manifest: Optional[bool] = False, + ) -> None: """ Args: config: The provided config dict. @@ -76,21 +100,16 @@ def __init__( debug: True if debug mode is enabled. emit_connector_builder_messages: True if messages should be emitted to the connector builder. component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. + normalize_manifest: Optional flag to indicate if the manifest should be normalized. """ self.logger = logging.getLogger(f"airbyte.{self.name}") - # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing - manifest = dict(source_config) - if "type" not in manifest: - manifest["type"] = "DeclarativeSource" - + self._should_normalize = normalize_manifest + self._declarative_component_schema = _get_declarative_component_schema() # If custom components are needed, locate and/or register them. self.components_module: ModuleType | None = get_registered_components_module(config=config) + # resolve all components in the manifest + self._source_config = self._preprocess_manifest(dict(source_config)) - resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest) - propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( - "", resolved_source_config, {} - ) - self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages self._constructor = ( @@ -105,14 +124,81 @@ def __init__( self._slice_logger: SliceLogger = ( AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() ) - self._config = config or {} + + # validate resolved manifest against the declarative component schema self._validate_source() + # apply additional post-processing to the manifest + self._postprocess_manifest() + @property def resolved_manifest(self) -> Mapping[str, Any]: + """ + Returns the resolved manifest configuration for the source. + + This property provides access to the internal source configuration as a mapping, + which contains all settings and parameters required to define the source's behavior. + + Returns: + Mapping[str, Any]: The resolved source configuration manifest. + """ return self._source_config + def _preprocess_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: + """ + Preprocesses the provided manifest dictionary by resolving any manifest references. + + This method modifies the input manifest in place, resolving references using the + ManifestReferenceResolver to ensure all references within the manifest are properly handled. + + Args: + manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in. + + Returns: + None + """ + # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing + manifest = self._fix_source_type(manifest) + # Resolve references in the manifest + resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest) + # Propagate types and parameters throughout the manifest + propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters( + "", resolved_manifest, {} + ) + + return propagated_manifest + + def _postprocess_manifest(self) -> None: + """ + Post-processes the manifest after validation. + This method is responsible for any additional modifications or transformations needed + after the manifest has been validated and before it is used in the source. + """ + # apply manifest normalization, if required + self._normalize_manifest() + + def _normalize_manifest(self) -> None: + """ + This method is used to normalize the manifest. It should be called after the manifest has been validated. + + Connector Builder UI rendering requires the manifest to be in a specific format. + - references have been resolved + - the commonly used definitions are extracted to the `definitions.linked.*` + """ + if self._should_normalize: + normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) + self._source_config = normalizer.normalize() + + def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]: + """ + Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest. + """ + if "type" not in manifest: + manifest["type"] = "DeclarativeSource" + + return manifest + @property def message_repository(self) -> MessageRepository: return self._message_repository @@ -120,7 +206,9 @@ def message_repository(self) -> MessageRepository: @property def dynamic_streams(self) -> List[Dict[str, Any]]: return self._dynamic_stream_configs( - manifest=self._source_config, config=self._config, with_dynamic_stream_name=True + manifest=self._source_config, + config=self._config, + with_dynamic_stream_name=True, ) @property @@ -143,7 +231,10 @@ def connection_checker(self) -> ConnectionChecker: def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message( - extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)} + extra_args={ + "source_name": self.name, + "parsed_config": json.dumps(self._source_config), + } ) stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs( @@ -156,9 +247,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: source_streams = [ self._constructor.create_component( - StateDelegatingStreamModel - if stream_config.get("type") == StateDelegatingStreamModel.__name__ - else DeclarativeStreamModel, + ( + StateDelegatingStreamModel + if stream_config.get("type") == StateDelegatingStreamModel.__name__ + else DeclarativeStreamModel + ), stream_config, config, emit_connector_builder_messages=self._emit_connector_builder_messages, @@ -174,7 +267,9 @@ def _initialize_cache_for_parent_streams( ) -> List[Dict[str, Any]]: parent_streams = set() - def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None: + def update_with_cache_parent_configs( + parent_configs: list[dict[str, Any]], + ) -> None: for parent_config in parent_configs: parent_streams.add(parent_config["stream"]["name"]) if parent_config["stream"]["type"] == "StateDelegatingStream": @@ -229,7 +324,10 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: """ self._configure_logger_level(logger) self._emit_manifest_debug_message( - extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)} + extra_args={ + "source_name": self.name, + "parsed_config": json.dumps(self._source_config), + } ) spec = self._source_config.get("spec") @@ -266,25 +364,9 @@ def _validate_source(self) -> None: """ Validates the connector manifest against the declarative component schema """ - try: - raw_component_schema = pkgutil.get_data( - "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml" - ) - if raw_component_schema is not None: - declarative_component_schema = yaml.load( - raw_component_schema, Loader=yaml.SafeLoader - ) - else: - raise RuntimeError( - "Failed to read manifest component json schema required for validation" - ) - except FileNotFoundError as e: - raise FileNotFoundError( - f"Failed to read manifest component json schema required for validation: {e}" - ) try: - validate(self._source_config, declarative_component_schema) + validate(self._source_config, self._declarative_component_schema) except ValidationError as e: raise ValidationError( "Validation against json schema defined in declarative_component_schema.yaml schema failed" @@ -382,7 +464,9 @@ def _dynamic_stream_configs( # Create a resolver for dynamic components based on type components_resolver = self._constructor.create_component( - COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config + COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], + components_resolver_config, + config, ) stream_template_config = dynamic_definition["stream_template"] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 2762a1f70..ebed3dbae 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2209,7 +2209,7 @@ class HttpRequester(BaseModel): type: Literal["HttpRequester"] url_base: str = Field( ..., - description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + description="The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "https://connect.squareup.com/v2", "{{ config['base_url'] or 'https://app.posthog.com'}}/api", @@ -2220,7 +2220,7 @@ class HttpRequester(BaseModel): ) path: Optional[str] = Field( None, - description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + description="The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "/products", "/quotes/{{ stream_partition['id'] }}/quote_line_groups", diff --git a/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py b/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py index d6fdee695..6c5847d3f 100644 --- a/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py +++ b/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py @@ -19,3 +19,12 @@ class UndefinedReferenceException(Exception): def __init__(self, path: str, reference: str) -> None: super().__init__(f"Undefined reference {reference} from {path}") + + +class ManifestNormalizationException(Exception): + """ + Raised when a circular reference is detected in a manifest. + """ + + def __init__(self, message: str) -> None: + super().__init__(f"Failed to deduplicate manifest: {message}") diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 6779b54ab..44f414343 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -4,7 +4,7 @@ import copy import typing -from typing import Any, Mapping, Optional +from typing import Any, Dict, Mapping, Optional PARAMETERS_STR = "$parameters" @@ -95,7 +95,7 @@ def propagate_types_and_parameters( declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None, - ) -> Mapping[str, Any]: + ) -> Dict[str, Any]: """ Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py new file mode 100644 index 000000000..ad6de6ac1 --- /dev/null +++ b/airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py @@ -0,0 +1,462 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import copy +import hashlib +import json +from collections import defaultdict +from itertools import chain +from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple + +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ManifestNormalizationException + +# Type definitions for better readability +ManifestType = Dict[str, Any] +DefinitionsType = Dict[str, Any] +DuplicateOccurancesType = List[Tuple[List[str], Dict[str, Any], Dict[str, Any]]] +DuplicatesType = DefaultDict[str, DuplicateOccurancesType] + +# Configuration constants +N_OCCURANCES = 2 + +DEF_TAG = "definitions" +LINKABLE_TAG = "linkable" +LINKED_TAG = "linked" +PROPERTIES_TAG = "properties" +SCHEMA_LOADER_TAG = "schema_loader" +SCHEMA_TAG = "schema" +SCHEMAS_TAG = "schemas" +STREAMS_TAG = "streams" + + +def _get_linkable_schema_tags(schema: DefinitionsType) -> List[str]: + """ + Extracts linkable tags from schema definitions. + This function identifies properties within a schema's definitions that are marked as linkable. + It traverses through each definition in the schema, examines its properties, and collects + the keys of properties that contain the LINKABLE_TAG. + + Args: + schema (DefinitionsType): The schema definition dictionary to process + + Returns: + List[str]: A deduplicated list of property keys that are marked as linkable + """ + + # the linkable scope: ['definitions.*'] + schema_definitions = schema.get(DEF_TAG, {}) + + linkable_tags: List[str] = [] + # Extract linkable keys from properties + + extract_linkable_keys: Callable[[Dict[str, Dict[str, Any]]], List[str]] = lambda properties: [ + key for key, value in properties.items() if LINKABLE_TAG in value.keys() + ] + + # Process each root value to get its linkable keys + process_root: Callable[[Dict[str, Any]], List[str]] = lambda root_value: extract_linkable_keys( + root_value.get(PROPERTIES_TAG, {}) + ) + + # Map the process_root function over all schema values and flatten the results + all_linkable_tags = chain.from_iterable(map(process_root, schema_definitions.values())) + + # Add all found linkable tags to the tags list + linkable_tags.extend(all_linkable_tags) + + # return unique tags only + return list(set(linkable_tags)) + + +class ManifestNormalizer: + """ + This class is responsible for normalizing the manifest by appliying processing such as: + - removing duplicated definitions + - replacing them with references. + + To extend the functionality, use the `normilize()` method to include any additional processing steps. + """ + + def __init__( + self, + resolved_manifest: ManifestType, + declarative_schema: DefinitionsType, + ) -> None: + self._resolved_manifest = resolved_manifest + self._declarative_schema = declarative_schema + self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest) + # get the tags marked as `linkable` in the component schema + self._linkable_tags = _get_linkable_schema_tags(self._declarative_schema) + + def to_json_str(self) -> str: + return json.dumps(self._normalized_manifest, indent=2) + + def normalize(self) -> ManifestType: + """ + Normalizes the manifest by deduplicating and resolving schema references. + + This method processes the manifest in two steps: + 1. Deduplicates elements within the manifest + 2. Resolves and references schemas + + Returns: + ManifestType: The normalized manifest if processing succeeds, + or the original resolved manifest if normalization fails. + + Raises: + ManifestNormalizationException: Caught internally and handled by returning the original manifest. + """ + try: + self._deduplicate_minifest() + self._reference_schemas() + + return self._normalized_manifest + except ManifestNormalizationException: + # if any error occurs, we just return the original manifest. + # TODO: enable debug logging + return self._resolved_manifest + + def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]: + """ + Get the streams from the manifest. + + Returns: + An Iterable of streams. + """ + + if STREAMS_TAG in self._normalized_manifest.keys(): + for stream in self._normalized_manifest[STREAMS_TAG]: + yield stream + + yield from [] + + def _deduplicate_minifest(self) -> None: + """ + Find commonalities in the input JSON structure and refactor it to avoid redundancy. + """ + + try: + # prepare the `definitions` tag + self._prepare_definitions() + # replace duplicates with references, if any + self._handle_duplicates(self._collect_duplicates()) + except Exception as e: + raise ManifestNormalizationException(str(e)) + + def _prepare_definitions(self) -> None: + """ + Clean the definitions in the manifest by removing unnecessary properties. + This function modifies the manifest in place. + """ + + # Check if the definitions tag exists + if not DEF_TAG in self._normalized_manifest: + self._normalized_manifest[DEF_TAG] = {} + + # Check if the linked tag exists + if not LINKED_TAG in self._normalized_manifest[DEF_TAG]: + self._normalized_manifest[DEF_TAG][LINKED_TAG] = {} + + # remove everything from definitions tag except of `linked`, after processing + for key in list(self._normalized_manifest[DEF_TAG].keys()): + if key != LINKED_TAG: + self._normalized_manifest[DEF_TAG].pop(key, None) + + def _extract_stream_schema(self, stream: Dict[str, Any]) -> None: + """ + Extract the schema from the stream and add it to the `schemas` tag. + """ + + stream_name = stream["name"] + # copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key + schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG) + if not SCHEMAS_TAG in self._normalized_manifest.keys(): + self._normalized_manifest[SCHEMAS_TAG] = {} + # add stream schema to the SCHEMAS_TAG + if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys(): + # add the schema to the SCHEMAS_TAG with the stream name as key + self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema + + def _reference_schemas(self) -> None: + """ + Set the schema reference for the given stream in the manifest. + This function modifies the manifest in place. + """ + + # reference the stream schema for the stream to where it's stored + if SCHEMAS_TAG in self._normalized_manifest.keys(): + for stream in self._get_manifest_streams(): + self._extract_stream_schema(stream) + self._set_stream_schema_ref(stream) + + def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None: + """ + Set the schema reference for the given stream in the manifest. + This function modifies the manifest in place. + """ + stream_name = stream["name"] + if SCHEMAS_TAG in self._normalized_manifest.keys(): + if stream_name in self._normalized_manifest[SCHEMAS_TAG]: + stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name) + + def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None: + """ + Process duplicate objects and replace them with references. + + Args: + duplicates: The duplicates dictionary collected from the given manifest. + """ + + for _, occurrences in duplicates.items(): + type_key, key, value = self._get_occurance_samples(occurrences) + is_linked_def = self._is_linked_definition(type_key, key) + + # Add to definitions if not there already + if not is_linked_def: + self._add_to_linked_definitions(type_key, key, value) + + # Replace occurrences with references + for _, parent_obj, value in occurrences: + if is_linked_def: + if value == self._get_linked_definition_value(type_key, key): + parent_obj[key] = self._create_linked_definition_ref(type_key, key) + else: + parent_obj[key] = self._create_linked_definition_ref(type_key, key) + + def _handle_duplicates(self, duplicates: DuplicatesType) -> None: + """ + Process the duplicates and replace them with references. + + Args: + duplicates: The duplicates dictionary collected from the given manifest. + """ + + if len(duplicates) > 0: + self._replace_duplicates_with_refs(duplicates) + + def _add_duplicate( + self, + duplicates: DuplicatesType, + current_path: List[str], + obj: Dict[str, Any], + value: Any, + key: Optional[str] = None, + ) -> None: + """ + Adds a duplicate record of an observed object by computing a unique hash for the provided value. + + This function computes a hash for the given value (or a dictionary composed of the key and value if a key is provided) + and appends a tuple containing the current path, the original object, and the value to the duplicates + dictionary under the corresponding hash. + + Parameters: + duplicates (DuplicatesType): The dictionary to store duplicate records. + current_path (List[str]): The list of keys or indices representing the current location in the object hierarchy. + obj (Dict): The original dictionary object where the duplicate is observed. + value (Any): The value to be hashed and used for identifying duplicates. + key (Optional[str]): An optional key that, if provided, wraps the value in a dictionary before hashing. + """ + + # create hash for each duplicate observed + value_to_hash = {key: value} if key is not None else value + duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value)) + + def _add_to_linked_definitions( + self, + type_key: str, + key: str, + value: Any, + ) -> None: + """ + Add a value to the linked definitions under the specified key. + + Args: + definitions: The definitions dictionary to modify + key: The key to use + value: The value to add + """ + if type_key not in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): + self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key] = {} + + if key not in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): + self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] = value + + def _collect_duplicates(self) -> DuplicatesType: + """ + Traverse the JSON object and collect all potential duplicate values and objects. + + Returns: + duplicates: A dictionary of duplicate objects. + """ + + def _collect(obj: Dict[str, Any], path: Optional[List[str]] = None) -> None: + """ + The closure to recursively collect duplicates in the JSON object. + + Args: + obj: The current object being analyzed. + path: The current path in the object hierarchy. + """ + + if not isinstance(obj, dict): + return + + path = [] if path is None else path + # Check if the object is empty + for key, value in obj.items(): + # do not collect duplicates from `definitions` tag + if key == DEF_TAG: + continue + + current_path = path + [key] + + if isinstance(value, dict): + # First process nested dictionaries + _collect(value, current_path) + # Process allowed-only component tags + if key in self._linkable_tags: + self._add_duplicate(duplicates, current_path, obj, value) + + # handle primitive types + elif isinstance(value, (str, int, float, bool)): + # Process allowed-only field tags + if key in self._linkable_tags: + self._add_duplicate(duplicates, current_path, obj, value, key) + + # handle list cases + elif isinstance(value, list): + for i, item in enumerate(value): + _collect(item, current_path + [str(i)]) + + duplicates: DuplicatesType = defaultdict(list, {}) + try: + if self._linkable_tags: + _collect(self._normalized_manifest) + # clean non-duplicates and sort based on the count of occurrences + return self._clean_and_sort_duplicates(duplicates) + return duplicates + except Exception as e: + raise ManifestNormalizationException(str(e)) + + def _clean_and_sort_duplicates(self, duplicates: DuplicatesType) -> DuplicatesType: + """ + Clean non-duplicates and sort the duplicates by their occurrences. + + Args: + duplicates: The duplicates dictionary to sort + + Returns: + A sorted duplicates dictionary. + """ + + # clean non-duplicates + duplicates = defaultdict( + list, + {k: v for k, v in duplicates.items() if len(v) >= N_OCCURANCES}, + ) + + # sort the duplicates by their occurrences, more frequent ones go first + duplicates = defaultdict( + list, + {k: v for k, v in sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)}, + ) + + return duplicates + + def _hash_object(self, obj: Dict[str, Any]) -> str: + """ + Create a unique hash for a dictionary object. + + Args: + node: The dictionary to hash + + Returns: + A hashed string + """ + + # Sort keys to ensure consistent hash for same content + return hashlib.md5(json.dumps(obj, sort_keys=True).encode()).hexdigest() + + def _is_linked_definition(self, type_key: str, key: str) -> bool: + """ + Check if the key already exists in the linked definitions. + + Args: + key: The key to check + definitions: The definitions dictionary with definitions + + Returns: + True if the key exists in the linked definitions, False otherwise + """ + + if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): + # Check if the key exists in the linked definitions + if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): + return True + + return False + + def _get_linked_definition_value(self, type_key: str, key: str) -> Any: + """ + Get the value of a linked definition by its key. + + Args: + key: The key to check + definitions: The definitions dictionary with definitions + + Returns: + The value of the linked definition + """ + if type_key in self._normalized_manifest[DEF_TAG][LINKED_TAG].keys(): + if key in self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key].keys(): + return self._normalized_manifest[DEF_TAG][LINKED_TAG][type_key][key] + else: + raise ManifestNormalizationException( + f"Key {key} not found in linked definitions. Please check the manifest." + ) + + def _get_occurance_samples(self, occurrences: DuplicateOccurancesType) -> Tuple[str, str, Any]: + """ + Get the key from the occurrences list. + + Args: + occurrences: The occurrences list + + Returns: + The key, type and value from the occurrences + """ + + # Take the value from the first occurrence, as they are the same + path, obj, value = occurrences[0] + return ( + obj["type"], + path[-1], + value, + ) # Return the component's name as the last part of its path + + def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, str]: + """ + Create a reference object for the linked definitions using the specified key. + + Args: + ref_key: The reference key to use + + Returns: + A reference object in the proper format + """ + + return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"} + + def _create_schema_ref(self, key: str) -> Dict[str, str]: + """ + Create a reference object for stream schema using the specified key. + + Args: + key: The reference key to use + + Returns: + A reference object in the proper format + """ + + return {"$ref": f"#/{SCHEMAS_TAG}/{key}"} diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index 045ea9a2c..1c5ae0485 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -3,7 +3,7 @@ # import re -from typing import Any, Mapping, Set, Tuple, Union +from typing import Any, Dict, Mapping, Set, Tuple, Union from airbyte_cdk.sources.declarative.parsers.custom_exceptions import ( CircularReferenceException, @@ -99,7 +99,7 @@ class ManifestReferenceResolver: until we find a key with the given path, or until there is nothing to traverse. """ - def preprocess_manifest(self, manifest: Mapping[str, Any]) -> Mapping[str, Any]: + def preprocess_manifest(self, manifest: Mapping[str, Any]) -> Dict[str, Any]: """ :param manifest: incoming manifest that could have references to previously defined components :return: diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index d98a49a8c..36680a6bb 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -503,7 +503,7 @@ def test_handle_resolve_manifest(valid_resolve_manifest_config_file, dummy_catal str(valid_resolve_manifest_config_file), "--catalog", str(dummy_catalog), - ] + ], ) assert patched_handle.call_count == 1 @@ -515,7 +515,13 @@ def test_handle_test_read(valid_read_config_file, configured_catalog): return_value=AirbyteMessage(type=MessageType.RECORD), ) as patch: handle_request( - ["read", "--config", str(valid_read_config_file), "--catalog", str(configured_catalog)] + [ + "read", + "--config", + str(valid_read_config_file), + "--catalog", + str(configured_catalog), + ], ) assert patch.call_count == 1 @@ -919,7 +925,13 @@ def test_missing_config(valid_resolve_manifest_config_file): def test_invalid_config_command(invalid_config_file, dummy_catalog): with pytest.raises(ValueError): handle_request( - ["read", "--config", str(invalid_config_file), "--catalog", str(dummy_catalog)] + [ + "read", + "--config", + str(invalid_config_file), + "--catalog", + str(dummy_catalog), + ], ) diff --git a/unit_tests/sources/declarative/parsers/conftest.py b/unit_tests/sources/declarative/parsers/conftest.py new file mode 100644 index 000000000..3f653ebb1 --- /dev/null +++ b/unit_tests/sources/declarative/parsers/conftest.py @@ -0,0 +1,879 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Dict + +import pytest +import yaml + + +@pytest.fixture +def manifest_with_multiple_url_base() -> Dict[str, Any]: + return { + "type": "DeclarativeSource", + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "C", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "D", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "E", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # dummy requesters to be resolved and deduplicated + # to the linked `url_base` in the `definitions.linked` section + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_multiple_url_base_normalized() -> Dict[str, Any]: + return { + "type": "DeclarativeSource", + "definitions": {"linked": {"HttpRequester": {"url_base": "https://example.com/v2/"}}}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + "path": "A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + "path": "C", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "D", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "E", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + }, + } + + +@pytest.fixture +def manifest_with_url_base_linked_definition() -> Dict[str, Any]: + return { + "type": "DeclarativeSource", + "definitions": { + "linked": {"HttpRequester": {"url_base": "https://example.com/v2/"}}, + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "C", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "D", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "E", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # dummy requesters to be resolved and deduplicated + # to the linked `url_base` in the `definitions.linked` section + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + }, + "requester_B": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str, Any]: + return { + "type": "DeclarativeSource", + "definitions": {"linked": {"HttpRequester": {"url_base": "https://example.com/v2/"}}}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + "path": "A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + "path": "C", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "D", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": {"$ref": "#/definitions/linked/HttpRequester/url_base"}, + "path": "E", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "decoder": {"type": "JsonDecoder"}, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {}, + }, + }, + } + + +@pytest.fixture +def manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas() -> Dict[str, Any]: + with open( + "unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml", + "r", + ) as file: + return dict(yaml.safe_load(file)) + + +@pytest.fixture +def expected_manifest_with_linked_definitions_url_base_authenticator_normalized() -> Dict[str, Any]: + return { + "version": "6.44.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["pokemon"]}, + "definitions": { + "linked": { + "HttpRequester": { + "url_base": "https://pokeapi.co/api/v1/", + "authenticator": { + "type": "ApiKeyAuthenticator", + "api_token": '{{ config["api_key"] }}', + "inject_into": { + "type": "RequestOption", + "field_name": "API_KEY", + "inject_into": "header", + }, + }, + } + } + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "pokemon", + "retriever": { + "type": "SimpleRetriever", + "decoder": {"type": "JsonDecoder"}, + "requester": { + "type": "HttpRequester", + "path": "pokemon", + "url_base": { + "$ref": "#/definitions/linked/HttpRequester/url_base", + }, + "http_method": "GET", + "authenticator": { + "$ref": "#/definitions/linked/HttpRequester/authenticator", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/pokemon"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "trainers", + "retriever": { + "type": "SimpleRetriever", + "decoder": {"type": "JsonDecoder"}, + "requester": { + "type": "HttpRequester", + "path": "pokemon", + "url_base": { + "$ref": "#/definitions/linked/HttpRequester/url_base", + }, + "http_method": "GET", + "authenticator": { + "$ref": "#/definitions/linked/HttpRequester/authenticator", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/trainers"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "items", + "retriever": { + "type": "SimpleRetriever", + "decoder": {"type": "JsonDecoder"}, + "requester": { + "type": "HttpRequester", + "path": "pokemon", + "url_base": "https://pokeapi.co/api/v2/", + "http_method": "GET", + "authenticator": { + "$ref": "#/definitions/linked/HttpRequester/authenticator" + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/items"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "location", + "retriever": { + "type": "SimpleRetriever", + "decoder": {"type": "JsonDecoder"}, + "requester": { + "type": "HttpRequester", + "path": "location", + "url_base": "https://pokeapi.co/api/v2/", + "http_method": "GET", + "authenticator": { + "$ref": "#/definitions/linked/HttpRequester/authenticator" + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/location"}, + }, + }, + { + "type": "DeclarativeStream", + "name": "berries", + "retriever": { + "type": "SimpleRetriever", + "decoder": {"type": "JsonDecoder"}, + "requester": { + "type": "HttpRequester", + "path": "berries", + "url_base": "https://pokeapi.co/api/v2/", + "http_method": "GET", + "authenticator": { + "$ref": "#/definitions/linked/HttpRequester/authenticator" + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/berries"}, + }, + }, + ], + "spec": { + "type": "Spec", + "connection_specification": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "order": 0, + "title": "API Key", + "airbyte_secret": True, + } + }, + "additionalProperties": True, + }, + }, + "metadata": { + "assist": {}, + "testedStreams": { + "berries": {"streamHash": None}, + "pokemon": {"streamHash": None}, + "location": {"streamHash": None}, + "trainers": {"streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"}, + "items": {"streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"}, + }, + "autoImportSchema": {"berries": True, "pokemon": True, "location": True}, + }, + "schemas": { + "berries": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "name": {"type": "string"}, + "berry_type": {"type": "integer"}, + }, + "additionalProperties": True, + }, + "pokemon": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "name": {"type": "string"}, + "pokemon_type": {"type": "integer"}, + }, + }, + "location": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "name": {"type": "string"}, + "location_type": {"type": "string"}, + }, + }, + "trainers": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "name": {"type": "string"}, + "pokemon_type": {"type": "integer"}, + }, + }, + "items": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "name": {"type": "string"}, + "pokemon_type": {"type": "integer"}, + }, + }, + }, + } diff --git a/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml b/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml new file mode 100644 index 000000000..5b972334c --- /dev/null +++ b/unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml @@ -0,0 +1,212 @@ +version: 6.44.0 + +type: DeclarativeSource + +check: + type: CheckStream + stream_names: + - pokemon + +definitions: + linked: + HttpRequester: + url_base: https://pokeapi.co/api/v1/ + +streams: + - type: DeclarativeStream + name: pokemon + retriever: + type: SimpleRetriever + decoder: + type: JsonDecoder + requester: + type: HttpRequester + path: pokemon + url_base: + $ref: "#/definitions/linked/HttpRequester/url_base" + http_method: GET + authenticator: + type: ApiKeyAuthenticator + api_token: "{{ config[\"api_key\"] }}" + inject_into: + type: RequestOption + field_name: API_KEY + inject_into: header + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + $ref: "#/schemas/pokemon" + - type: DeclarativeStream + name: trainers + retriever: + type: SimpleRetriever + decoder: + type: JsonDecoder + requester: + type: HttpRequester + path: pokemon + url_base: + $ref: "#/definitions/linked/HttpRequester/url_base" + http_method: GET + authenticator: + type: ApiKeyAuthenticator + api_token: "{{ config[\"api_key\"] }}" + inject_into: + type: RequestOption + field_name: API_KEY + inject_into: header + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + $ref: "#/schemas/pokemon" + - type: DeclarativeStream + name: items + retriever: + type: SimpleRetriever + decoder: + type: JsonDecoder + requester: + type: HttpRequester + path: pokemon + url_base: https://pokeapi.co/api/v2/ + http_method: GET + authenticator: + type: ApiKeyAuthenticator + api_token: "{{ config[\"api_key\"] }}" + inject_into: + type: RequestOption + field_name: API_KEY + inject_into: header + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + $ref: "#/schemas/pokemon" + - type: DeclarativeStream + name: location + retriever: + type: SimpleRetriever + decoder: + type: JsonDecoder + requester: + type: HttpRequester + path: location + url_base: https://pokeapi.co/api/v2/ + http_method: GET + authenticator: + type: ApiKeyAuthenticator + api_token: "{{ config[\"api_key\"] }}" + inject_into: + type: RequestOption + field_name: API_KEY + inject_into: header + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + $ref: "#/schemas/location" + - type: DeclarativeStream + name: berries + retriever: + type: SimpleRetriever + decoder: + type: JsonDecoder + requester: + type: HttpRequester + path: berries + url_base: https://pokeapi.co/api/v2/ + http_method: GET + authenticator: + type: ApiKeyAuthenticator + api_token: "{{ config[\"api_key\"] }}" + inject_into: + type: RequestOption + field_name: API_KEY + inject_into: header + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + $ref: "#/schemas/berries" + +spec: + type: Spec + connection_specification: + type: object + $schema: http://json-schema.org/draft-07/schema# + required: + - api_key + properties: + api_key: + type: string + order: 0 + title: API Key + airbyte_secret: true + additionalProperties: true + +metadata: + assist: {} + testedStreams: + berries: + streamHash: null + pokemon: + streamHash: null + location: + streamHash: null + trainers: + streamHash: ca4ee51a2aaa2a53b9c0b91881a84ad621da575f + items: + streamHash: 12e624ecf47c6357c74c27d6a65c72e437b1534a + autoImportSchema: + berries: true + pokemon: true + location: true + +schemas: + berries: + type: object + $schema: http://json-schema.org/draft-07/schema# + properties: + name: + type: string + berry_type: + type: integer + additionalProperties: true + pokemon: + type: object + $schema: http://json-schema.org/draft-07/schema# + properties: + name: + type: string + pokemon_type: + type: integer + location: + type: object + $schema: http://json-schema.org/draft-07/schema# + properties: + name: + type: string + location_type: + type: string \ No newline at end of file diff --git a/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py new file mode 100644 index 000000000..f25c1bbd6 --- /dev/null +++ b/unit_tests/sources/declarative/parsers/test_manifest_normalizer.py @@ -0,0 +1,73 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.manifest_declarative_source import ( + _get_declarative_component_schema, +) +from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import ( + ManifestNormalizer, +) +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( + ManifestReferenceResolver, +) + +resolver = ManifestReferenceResolver() + + +def test_when_multiple_url_base_are_resolved_and_most_frequent_is_shared( + manifest_with_multiple_url_base, + expected_manifest_with_multiple_url_base_normalized, +) -> None: + """ + This test is to check that the manifest is normalized when multiple url_base are resolved + and the most frequent one is shared. + """ + + schema = _get_declarative_component_schema() + resolved_manifest = resolver.preprocess_manifest(manifest_with_multiple_url_base) + normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize() + + assert normalized_manifest == expected_manifest_with_multiple_url_base_normalized + + +def test_with_shared_definitions_url_base_are_present( + manifest_with_url_base_linked_definition, + expected_manifest_with_url_base_linked_definition_normalized, +) -> None: + """ + This test is to check that the manifest is normalized when the `url_base` is shared + between the definitions and the `url_base` is present in the manifest. + """ + + schema = _get_declarative_component_schema() + resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_linked_definition) + normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize() + + assert normalized_manifest == expected_manifest_with_url_base_linked_definition_normalized + + +def test_with_linked_definitions_url_base_authenticator_when_multiple_streams_reference_the_same_schema( + manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas, + expected_manifest_with_linked_definitions_url_base_authenticator_normalized, +) -> None: + """ + This test is to check that the manifest is normalized when the `url_base` and the `authenticator` is linked + between the definitions and the `url_base` is present in the manifest. + The `authenticator` is not a normal schema, but a reference to another schema. + + The test also verifies the `stream.schema_loader.schema` is properly extracted to + the `schemas.`. + """ + + schema = _get_declarative_component_schema() + resolved_manifest = resolver.preprocess_manifest( + manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas + ) + normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize() + + assert ( + normalized_manifest + == expected_manifest_with_linked_definitions_url_base_authenticator_normalized + ) diff --git a/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index 1fdbf2d55..29afa0e70 100644 --- a/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -16,7 +16,6 @@ resolver = ManifestReferenceResolver() -# @ def test_refer(): content = {"limit": 50, "limit_ref": "#/limit"} config = resolver.preprocess_manifest(content) @@ -152,7 +151,10 @@ def test_list_of_dicts(): def test_multiple_levels_of_indexing(): - content = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "#/list/1/B/0"} + content = { + "list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], + "elem_ref": "#/list/1/B/0", + } config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "b1"