diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index b70e2c85f..bb6b0929a 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -56,6 +56,16 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) +def should_migrate_manifest(config: Mapping[str, Any]) -> bool: + """ + Determines whether the manifest should be migrated, + based on the presence of the "__should_migrate" key in the config. + + This flag is set by the UI. + """ + return config.get("__should_migrate", False) + + def should_normalize_manifest(config: Mapping[str, Any]) -> bool: """ Check if the manifest should be normalized. @@ -71,6 +81,7 @@ def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDecl config=config, emit_connector_builder_messages=True, source_config=manifest, + migrate_manifest=should_migrate_manifest(config), normalize_manifest=should_normalize_manifest(config), component_factory=ModelToComponentFactory( emit_connector_builder_messages=True, diff --git a/airbyte_cdk/manifest_migrations/README.md b/airbyte_cdk/manifest_migrations/README.md new file mode 100644 index 000000000..12ec41837 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/README.md @@ -0,0 +1,73 @@ +# Manifest Migrations + +This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves. + +## Adding a New Migration + +1. **Create a Migration File:** + - Add a new Python file in the `migrations/` subdirectory. + - Name the file using the pattern: `.py`. + - Example: `http_requester_url_base_to_url.py` + - The filename should be unique and descriptive. + +2. **Define the Migration Class:** + - The migration class must inherit from `ManifestMigration`. + - Name the class using a descriptive name (e.g., `HttpRequesterUrlBaseToUrl`). + - Implement the following methods: + - `should_migrate(self, manifest: ManifestType) -> bool` + - `migrate(self, manifest: ManifestType) -> None` + - `validate(self, manifest: ManifestType) -> bool` + +3. **Register the Migration:** + - Open `migrations/registry.yaml`. + - Add an entry under the appropriate version, or create a new version section if needed. + - Each migration entry should include: + - `name`: The filename (without `.py`) + - `order`: The order in which this migration should be applied for the version + - `description`: A short description of the migration + + Example: + ```yaml + manifest_migrations: + - version: 6.45.2 + migrations: + - name: http_requester_url_base_to_url + order: 1 + description: | + This migration updates the `url_base` field in the `HttpRequester` component spec to `url`. + ``` + +4. **Testing:** + - Ensure your migration is covered by unit tests. + - Tests should verify both `should_migrate`, `migrate`, and `validate` behaviors. + +## Migration Discovery + +- Migrations are discovered and registered automatically based on the entries in `migrations/registry.yaml`. +- Do not modify the migration registry in code manually. +- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`. + +## Example Migration Skeleton + +```python +from airbyte_cdk.manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType + +class ExampleMigration(ManifestMigration): + component_type = "ExampleComponent" + original_key = "old_key" + replacement_key = "new_key" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) + + def validate(self, manifest: ManifestType) -> bool: + return self.replacement_key in manifest and self.original_key not in manifest +``` + +--- + +For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder. \ No newline at end of file diff --git a/airbyte_cdk/manifest_migrations/__init__.py b/airbyte_cdk/manifest_migrations/__init__.py new file mode 100644 index 000000000..2acb8555b --- /dev/null +++ b/airbyte_cdk/manifest_migrations/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte_cdk/manifest_migrations/exceptions.py b/airbyte_cdk/manifest_migrations/exceptions.py new file mode 100644 index 000000000..43c88a334 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/exceptions.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +class ManifestMigrationException(Exception): + """ + Raised when a migration error occurs in the manifest. + """ + + def __init__(self, message: str) -> None: + super().__init__(f"Failed to migrate the manifest: {message}") diff --git a/airbyte_cdk/manifest_migrations/manifest_migration.py b/airbyte_cdk/manifest_migrations/manifest_migration.py new file mode 100644 index 000000000..2cf26f6d5 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/manifest_migration.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC, abstractmethod +from dataclasses import asdict, dataclass +from typing import Any, Dict + +ManifestType = Dict[str, Any] + + +TYPE_TAG = "type" + +NON_MIGRATABLE_TYPES = [ + # more info here: https://github.com/airbytehq/airbyte-internal-issues/issues/12423 + "DynamicDeclarativeStream", +] + + +@dataclass +class MigrationTrace: + """ + This class represents a migration that has been applied to the manifest. + It contains information about the migration, including the version it was applied from, + the version it was applied to, and the time it was applied. + """ + + from_version: str + to_version: str + migration: str + migrated_at: str + + def as_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class ManifestMigration(ABC): + """ + Base class for manifest migrations. + This class provides a framework for migrating manifest components. + It defines the structure for migration classes, including methods for checking if a migration is needed, + performing the migration, and validating the migration. + """ + + def __init__(self) -> None: + self.is_migrated: bool = False + + @abstractmethod + def should_migrate(self, manifest: ManifestType) -> bool: + """ + Check if the manifest should be migrated. + + :param manifest: The manifest to potentially migrate + + :return: true if the manifest is of the expected format and should be migrated. False otherwise. + """ + + @abstractmethod + def migrate(self, manifest: ManifestType) -> None: + """ + Migrate the manifest. Assumes should_migrate(manifest) returned True. + + :param manifest: The manifest to migrate + """ + + @abstractmethod + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the manifest to ensure the migration was successfully applied. + + :param manifest: The manifest to validate + """ + + def _is_component(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a component. + + :param obj: The object to check + :return: True if the object is a component, False otherwise + """ + return TYPE_TAG in obj.keys() + + def _is_migratable_type(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a migratable component, + based on the Type of the component and the migration version. + + :param obj: The object to check + :return: True if the object is a migratable component, False otherwise + """ + return obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES + + def _process_manifest(self, obj: Any) -> None: + """ + Recursively processes a manifest object, migrating components that match the migration criteria. + + This method traverses the entire manifest structure (dictionaries and lists) and applies + migrations to components that: + 1. Have a type tag + 2. Are not in the list of non-migratable types + 3. Meet the conditions defined in the should_migrate method + + Parameters: + obj (Any): The object to process, which can be a dictionary, list, or any other type. + Dictionary objects are checked for component type tags and potentially migrated. + List objects have each of their items processed recursively. + Other types are ignored. + + Returns: + None, since we process the manifest in place. + """ + if isinstance(obj, dict): + # Check if the object is a component + if self._is_component(obj): + # Check if the object is allowed to be migrated + if not self._is_migratable_type(obj): + return + + # Check if the object should be migrated + if self.should_migrate(obj): + # Perform the migration, if needed + self.migrate(obj) + # validate the migration + self.is_migrated = self.validate(obj) + + # Process all values in the dictionary + for value in list(obj.values()): + self._process_manifest(value) + + elif isinstance(obj, list): + # Process all items in the list + for item in obj: + self._process_manifest(item) diff --git a/airbyte_cdk/manifest_migrations/migration_handler.py b/airbyte_cdk/manifest_migrations/migration_handler.py new file mode 100644 index 000000000..f843c25ce --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migration_handler.py @@ -0,0 +1,163 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +import copy +import logging +from datetime import datetime, timezone +from typing import Type + +from packaging.version import Version + +from airbyte_cdk.manifest_migrations.exceptions import ( + ManifestMigrationException, +) +from airbyte_cdk.manifest_migrations.manifest_migration import ( + ManifestMigration, + ManifestType, + MigrationTrace, +) +from airbyte_cdk.manifest_migrations.migrations_registry import ( + MANIFEST_MIGRATIONS, +) + +METADATA_TAG = "metadata" +MANIFEST_VERSION_TAG = "version" +APPLIED_MIGRATIONS_TAG = "applied_migrations" + +LOGGER = logging.getLogger("airbyte.cdk.manifest_migrations") + + +class ManifestMigrationHandler: + """ + This class is responsible for handling migrations in the manifest. + """ + + def __init__(self, manifest: ManifestType) -> None: + self._manifest = manifest + self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest) + + def apply_migrations(self) -> ManifestType: + """ + Apply all registered migrations to the manifest. + + This method iterates through all migrations in the migrations registry and applies + them sequentially to the current manifest. If any migration fails with a + ManifestMigrationException, the original unmodified manifest is returned instead. + + Returns: + ManifestType: The migrated manifest if all migrations succeeded, or the original + manifest if any migration failed. + """ + try: + manifest_version = self._get_manifest_version() + for migration_version, migrations in MANIFEST_MIGRATIONS.items(): + for migration_cls in migrations: + self._handle_migration(migration_cls, manifest_version, migration_version) + return self._migrated_manifest + except ManifestMigrationException: + # if any errors occur we return the original resolved manifest + return self._manifest + + def _handle_migration( + self, + migration_class: Type[ManifestMigration], + manifest_version: str, + migration_version: str, + ) -> None: + """ + Handles a single manifest migration by instantiating the migration class and processing the manifest. + + Args: + migration_class (Type[ManifestMigration]): The migration class to apply to the manifest. + + Raises: + ManifestMigrationException: If the migration process encounters any errors. + """ + try: + migration_instance = migration_class() + if self._version_is_valid_for_migration(manifest_version, migration_version): + migration_instance._process_manifest(self._migrated_manifest) + if migration_instance.is_migrated: + # set the updated manifest version, after migration has been applied + self._set_manifest_version(migration_version) + self._set_migration_trace(migration_class, manifest_version, migration_version) + else: + LOGGER.info( + f"Manifest migration: `{self._get_migration_name(migration_class)}` is not supported for the given manifest version `{manifest_version}`.", + ) + except Exception as e: + raise ManifestMigrationException(str(e)) from e + + def _get_migration_name(self, migration_class: Type[ManifestMigration]) -> str: + """ + Get the name of the migration instance. + + Returns: + str: The name of the migration. + """ + return migration_class.__name__ + + def _get_manifest_version(self) -> str: + """ + Get the manifest version from the manifest. + + :param manifest: The manifest to get the version from + :return: The manifest version + """ + return str(self._migrated_manifest.get(MANIFEST_VERSION_TAG, "0.0.0")) + + def _version_is_valid_for_migration( + self, + manifest_version: str, + migration_version: str, + ) -> bool: + """ + Checks if the given manifest version is less than or equal to the specified migration version. + + Args: + manifest_version (str): The version of the manifest to check. + migration_version (str): The migration version to compare against. + + Returns: + bool: True if the manifest version is less than or equal to the migration version, False otherwise. + """ + return Version(manifest_version) <= Version(migration_version) + + def _set_manifest_version(self, version: str) -> None: + """ + Set the manifest version in the manifest. + + :param version: The version to set + """ + self._migrated_manifest[MANIFEST_VERSION_TAG] = version + + def _set_migration_trace( + self, + migration_instance: Type[ManifestMigration], + manifest_version: str, + migration_version: str, + ) -> None: + """ + Set the migration trace in the manifest, under the `metadata.applied_migrations` property object. + + :param migration_instance: The migration instance to set + :param manifest_version: The manifest version before migration + :param migration_version: The manifest version after migration + """ + + if METADATA_TAG not in self._migrated_manifest: + self._migrated_manifest[METADATA_TAG] = {} + if APPLIED_MIGRATIONS_TAG not in self._migrated_manifest[METADATA_TAG]: + self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG] = [] + + migration_trace = MigrationTrace( + from_version=manifest_version, + to_version=migration_version, + migration=self._get_migration_name(migration_instance), + migrated_at=datetime.now(tz=timezone.utc).isoformat(), + ).as_dict() + + if migration_version not in self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG]: + self._migrated_manifest[METADATA_TAG][APPLIED_MIGRATIONS_TAG].append(migration_trace) diff --git a/airbyte_cdk/manifest_migrations/migrations/__init__.py b/airbyte_cdk/manifest_migrations/migrations/__init__.py new file mode 100644 index 000000000..5a567c032 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/__init__.py @@ -0,0 +1,4 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py new file mode 100644 index 000000000..12d7d2b75 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_path_to_url.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +from urllib.parse import urljoin + +from airbyte_cdk.manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) +from airbyte_cdk.sources.types import EmptyString + + +class HttpRequesterPathToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `path` key to `url` in the HttpRequester component. + The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL. + The migration will concatenate the `url_base` and `path` to form a full URL. + """ + + component_type = "HttpRequester" + original_key = "path" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + original_key_value = manifest.get(self.original_key, EmptyString).lstrip("/") + replacement_key_value = manifest[self.replacement_key] + + # return a full-url if provided directly from interpolation context + if original_key_value == EmptyString or original_key_value is None: + manifest[self.replacement_key] = replacement_key_value + manifest.pop(self.original_key, None) + else: + # since we didn't provide a full-url, the url_base might not have a trailing slash + # so we join the url_base and path correctly + if not replacement_key_value.endswith("/"): + replacement_key_value += "/" + + manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value) + manifest.pop(self.original_key, None) + + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the migration by checking if the `url` key is present and the `path` key is not. + """ + return ( + self.replacement_key in manifest + and self.original_key not in manifest + and manifest[self.replacement_key] is not None + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py new file mode 100644 index 000000000..648b8d9a3 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_request_body_json_data_to_request_body.py @@ -0,0 +1,51 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class HttpRequesterRequestBodyJsonDataToRequestBody(ManifestMigration): + """ + This migration is responsible for migrating the `request_body_json` and `request_body_data` keys + to a unified `request_body` key in the HttpRequester component. + The migration will copy the value of either original key to `request_body` and remove the original key. + """ + + component_type = "HttpRequester" + + body_json_key = "request_body_json" + body_data_key = "request_body_data" + original_keys = (body_json_key, body_data_key) + + replacement_key = "request_body" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and any( + key in list(manifest.keys()) for key in self.original_keys + ) + + def migrate(self, manifest: ManifestType) -> None: + for key in self.original_keys: + if key == self.body_json_key and key in manifest: + manifest[self.replacement_key] = { + "type": "RequestBodyJson", + "value": manifest[key], + } + manifest.pop(key, None) + elif key == self.body_data_key and key in manifest: + manifest[self.replacement_key] = { + "type": "RequestBodyData", + "value": manifest[key], + } + manifest.pop(key, None) + + def validate(self, manifest: ManifestType) -> bool: + return self.replacement_key in manifest and all( + key not in manifest for key in self.original_keys + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py new file mode 100644 index 000000000..14ffa4141 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/http_requester_url_base_to_url.py @@ -0,0 +1,41 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.manifest_migrations.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class HttpRequesterUrlBaseToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. + The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. + The migration will copy the value of `url_base` to `url`. + """ + + component_type = "HttpRequester" + original_key = "url_base" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) + + def validate(self, manifest: ManifestType) -> bool: + """ + Validate the migration by checking if the `url` key is present and the `url_base` key is not. + """ + return ( + self.replacement_key in manifest + and self.original_key not in manifest + and manifest[self.replacement_key] is not None + ) diff --git a/airbyte_cdk/manifest_migrations/migrations/registry.yaml b/airbyte_cdk/manifest_migrations/migrations/registry.yaml new file mode 100644 index 000000000..6beb1667c --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations/registry.yaml @@ -0,0 +1,22 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +manifest_migrations: + - version: 6.47.1 + migrations: + - name: http_requester_url_base_to_url + order: 1 + description: | + This migration updates the `url_base` field in the `http_requester` spec to `url`. + The `url_base` field is deprecated and will be removed in a future version. + - name: http_requester_path_to_url + order: 2 + description: | + This migration updates the `path` field in the `http_requester` spec to `url`. + The `path` field is deprecated and will be removed in a future version. + - name: http_requester_request_body_json_data_to_request_body + order: 3 + description: | + This migration updates the `request_body_json_data` field in the `http_requester` spec to `request_body`. + The `request_body_json_data` field is deprecated and will be removed in a future version. diff --git a/airbyte_cdk/manifest_migrations/migrations_registry.py b/airbyte_cdk/manifest_migrations/migrations_registry.py new file mode 100644 index 000000000..4a57e2a35 --- /dev/null +++ b/airbyte_cdk/manifest_migrations/migrations_registry.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + + +import importlib +import inspect +import os +from pathlib import Path +from types import ModuleType +from typing import Dict, List, Type + +import yaml + +from airbyte_cdk.manifest_migrations.manifest_migration import ( + ManifestMigration, +) + +DiscoveredMigrations = Dict[str, List[Type[ManifestMigration]]] + +MIGRATIONS_PATH = Path(__file__).parent / "migrations" +REGISTRY_PATH = MIGRATIONS_PATH / "registry.yaml" + + +def _find_migration_module(name: str) -> str: + """ + Finds the migration module by name in the migrations directory. + The name should match the file name of the migration module (without the .py extension). + Raises ImportError if the module is not found. + """ + + for migration_file in os.listdir(MIGRATIONS_PATH): + migration_name = name + ".py" + if migration_file == migration_name: + return migration_file.replace(".py", "") + + raise ImportError(f"Migration module '{name}' not found in {MIGRATIONS_PATH}.") + + +def _get_migration_class(module: ModuleType) -> Type[ManifestMigration]: + """ + Returns the ManifestMigration subclass defined in the module. + """ + for _, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, ManifestMigration): + return obj + + raise ImportError(f"No ManifestMigration subclass found in module {module.__name__}.") + + +def _discover_migrations() -> DiscoveredMigrations: + """ + Discovers and returns a list of ManifestMigration subclasses in the order specified by registry.yaml. + """ + with open(REGISTRY_PATH, "r") as f: + registry = yaml.safe_load(f) + migrations: DiscoveredMigrations = {} + # Iterate through the registry and import the migration classes + # based on the version and order specified in the registry.yaml + for version_entry in registry.get("manifest_migrations", []): + migration_version = version_entry.get("version", "0.0.0") + if not migration_version in migrations: + migrations[migration_version] = [] + + for migration in sorted(version_entry.get("migrations", []), key=lambda m: m["order"]): + module = importlib.import_module( + f"airbyte_cdk.manifest_migrations.migrations.{_find_migration_module(migration['name'])}" + ) + migration_class = _get_migration_class(module) + migrations[migration_version].append(migration_class) + + return migrations + + +# registered migrations +MANIFEST_MIGRATIONS: DiscoveredMigrations = _discover_migrations() diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 31895abeb..03e488895 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -15,6 +15,9 @@ from jsonschema.validators import validate from packaging.version import InvalidVersion, Version +from airbyte_cdk.manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) from airbyte_cdk.models import ( AirbyteConnectionStatus, AirbyteMessage, @@ -91,6 +94,7 @@ def __init__( debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, + migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, ) -> None: """ @@ -104,12 +108,11 @@ def __init__( """ self.logger = logging.getLogger(f"airbyte.{self.name}") self._should_normalize = normalize_manifest + self._should_migrate = migrate_manifest self._declarative_component_schema = _get_declarative_component_schema() # If custom components are needed, locate and/or register them. self.components_module: ModuleType | None = get_registered_components_module(config=config) - # resolve all components in the manifest - self._source_config = self._preprocess_manifest(dict(source_config)) - + # set additional attributes self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages self._constructor = ( @@ -126,11 +129,12 @@ def __init__( ) self._config = config or {} + # resolve all components in the manifest + self._source_config = self._pre_process_manifest(dict(source_config)) # validate resolved manifest against the declarative component schema self._validate_source() - # apply additional post-processing to the manifest - self._postprocess_manifest() + self._post_process_manifest() @property def resolved_manifest(self) -> Mapping[str, Any]: @@ -145,7 +149,7 @@ def resolved_manifest(self) -> Mapping[str, Any]: """ return self._source_config - def _preprocess_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: + def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: """ Preprocesses the provided manifest dictionary by resolving any manifest references. @@ -169,12 +173,14 @@ def _preprocess_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: return propagated_manifest - def _postprocess_manifest(self) -> None: + def _post_process_manifest(self) -> None: """ Post-processes the manifest after validation. This method is responsible for any additional modifications or transformations needed after the manifest has been validated and before it is used in the source. """ + # apply manifest migration, if required + self._migrate_manifest() # apply manifest normalization, if required self._normalize_manifest() @@ -190,6 +196,19 @@ def _normalize_manifest(self) -> None: normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) self._source_config = normalizer.normalize() + def _migrate_manifest(self) -> None: + """ + This method is used to migrate the manifest. It should be called after the manifest has been validated. + The migration is done in place, so the original manifest is modified. + + The original manifest is returned if any error occurs during migration. + """ + if self._should_migrate: + manifest_migrator = ManifestMigrationHandler(self._source_config) + self._source_config = manifest_migrator.apply_migrations() + # validate migrated manifest against the declarative component schema + self._validate_source() + def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]: """ Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest. diff --git a/unit_tests/manifest_migrations/conftest.py b/unit_tests/manifest_migrations/conftest.py new file mode 100644 index 000000000..303e31178 --- /dev/null +++ b/unit_tests/manifest_migrations/conftest.py @@ -0,0 +1,1199 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Dict + +import pytest + + +@pytest.fixture +def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["A"], + }, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "/path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + # ! the double-slash is intentional here for the test. + "path": "//path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "/path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # both requesters have duplicated `url_base`, + # which should be migrated to `url` in the new format + # and the `url_base` and `path` key should be removed + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_a1": { + "type": "string", + }, + }, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_b1": { + "type": "string", + }, + }, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_c1": { + "type": "string", + }, + }, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_d1": { + "type": "string", + }, + }, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_e1": { + "type": "string", + }, + }, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: + return { + "version": "6.47.1", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + }, + "requester_A": {"type": "HttpRequester", "url": "https://example.com/v1/"}, + "requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + "metadata": { + "applied_migrations": [ + { + "from_version": "0.0.0", + "to_version": "6.47.1", + "migration": "HttpRequesterUrlBaseToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test + }, + { + "from_version": "0.0.0", + "to_version": "6.47.1", + "migration": "HttpRequesterPathToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", # time freezed in the test + }, + ] + }, + } + + +@pytest.fixture +def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: + return { + "version": "6.47.1", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": {}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + } + + +@pytest.fixture +def manifest_with_request_body_json_and_data_to_migrate_to_request_body() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["A"], + }, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "/path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "path_to_B", + "http_method": "GET", + # the `request_body_json` is expected to be migrated to the `request_body` key + "request_body_json": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + # ! the double-slash is intentional here for the test. + "path": "//path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "/path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # both requesters have duplicated `url_base`, + # which should be migrated to `url` in the new format + # and the `url_base` and `path` key should be removed + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + # this requester has a `request_body_json` key, + # to be migrated to the `request_body` key + "request_body_data": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + # for this requester, the `request_body_json` key is not present, + # but the `request_body_data` key is present in the stream `C` itself. + # it should also be migrated to the `request_body` key + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_a1": { + "type": "string", + }, + }, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_b1": { + "type": "string", + }, + }, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_c1": { + "type": "string", + }, + }, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_d1": { + "type": "string", + }, + }, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_e1": { + "type": "string", + }, + }, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_migrated_to_request_body() -> Dict[str, Any]: + return { + "version": "6.47.1", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + "request_body": { + "type": "RequestBodyJson", + "value": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + }, + "requester_A": { + "type": "HttpRequester", + "url": "https://example.com/v1/", + "request_body": { + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + }, + "requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + "request_body": { + "type": "RequestBodyData", + "value": { + "test_key": "{{ config['config_key'] }}", + "test_key_2": "test_value_2", + "test_key_3": 123, + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + "request_body": { + "type": "RequestBodyJson", + "value": { + "reportType": "test_report", + "groupBy": "GROUP", + "metrics": "{{ ','.join( ['a-b','cd','e-f-g-h'] ) }}", + }, + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + "metadata": { + "applied_migrations": [ + { + "from_version": "0.0.0", + "to_version": "6.47.1", + "migration": "HttpRequesterUrlBaseToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", + }, + { + "from_version": "0.0.0", + "to_version": "6.47.1", + "migration": "HttpRequesterPathToUrl", + "migrated_at": "2025-04-01T00:00:00+00:00", + }, + { + "from_version": "0.0.0", + "to_version": "6.47.1", + "migration": "HttpRequesterRequestBodyJsonDataToRequestBody", + "migrated_at": "2025-04-01T00:00:00+00:00", + }, + ] + }, + } diff --git a/unit_tests/manifest_migrations/test_manifest_migration.py b/unit_tests/manifest_migrations/test_manifest_migration.py new file mode 100644 index 000000000..0ad897375 --- /dev/null +++ b/unit_tests/manifest_migrations/test_manifest_migration.py @@ -0,0 +1,67 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from freezegun import freeze_time + +from airbyte_cdk.manifest_migrations.migration_handler import ( + ManifestMigrationHandler, +) +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( + ManifestReferenceResolver, +) + +resolver = ManifestReferenceResolver() + + +@freeze_time("2025-04-01") +def test_manifest_resolve_migrate_url_base_and_path_to_url( + manifest_with_url_base_to_migrate_to_url, + expected_manifest_with_url_base_migrated_to_url, +) -> None: + """ + This test is to check that the manifest is migrated and normalized + when the `url_base` is migrated to `url` and the `path` is joined to `url`. + """ + + resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_to_migrate_to_url) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + assert migrated_manifest == expected_manifest_with_url_base_migrated_to_url + + +@freeze_time("2025-04-01") +def test_manifest_resolve_migrate_request_body_json_and_data_to_request_body( + manifest_with_request_body_json_and_data_to_migrate_to_request_body, + expected_manifest_with_migrated_to_request_body, +) -> None: + """ + This test is to check that the manifest is migrated correctly, + after the `request_body_json` and `request_body_data` are migrated to `request_body`. + """ + + resolved_manifest = resolver.preprocess_manifest( + manifest_with_request_body_json_and_data_to_migrate_to_request_body + ) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + assert migrated_manifest == expected_manifest_with_migrated_to_request_body + + +@freeze_time("2025-04-01") +def test_manifest_resolve_do_not_migrate( + manifest_with_migrated_url_base_and_path_is_joined_to_url, +) -> None: + """ + This test is to check that the manifest remains migrated already, + after the `url_base` and `path` is joined to `url`. + """ + + resolved_manifest = resolver.preprocess_manifest( + manifest_with_migrated_url_base_and_path_is_joined_to_url + ) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + # it's expected that the manifest is the same after the processing, because the manifest version is higher. + assert migrated_manifest == manifest_with_migrated_url_base_and_path_is_joined_to_url