66 changes: 66 additions & 0 deletions airbyte_cdk/manifest_migrations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Manifest Migrations

This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves.

## Adding a New Migration

1. **Create a Migration File:**
   - Add a new Python file in the `migrations/` subdirectory.
   - Name the file using the pattern: `<description>_v<major>_<minor>_<patch>__<order>.py`.
     - Example: `http_requester_url_base_to_url_v6_45_2__0.py`
   - The `<order>` integer is used to determine the order of migrations for the same version.

2. **Define the Migration Class:**
   - The migration class must inherit from `ManifestMigration`.
   - Name the class using the pattern: `V_<major>_<minor>_<patch>_<Description>`.
     - Example: `V_6_45_2_HttpRequesterUrlBaseToUrl`
   - Implement the following methods:
     - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest.
     - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in place.

3. **Migration Versioning:**
   - The migration version is extracted from the class name and used to determine applicability.
   - Only manifests with a version less than or equal to the migration version will be migrated.

4. **Component Type:**
   - Use the `TYPE_TAG` constant to check the component type in your migration logic.

5. **Examples:**
   - See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations.
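
The version gate described in step 3 can be sketched with the standard library alone. This is a simplified stand-in, not the CDK's code: the CDK compares `packaging.version.Version` objects, while the sketch compares plain integer tuples, and the helper names (`version_from_class_name`, `parse_manifest_version`, `is_applicable`) are hypothetical.

```python
import re
from typing import Tuple

VersionTuple = Tuple[int, int, int]


def version_from_class_name(class_name: str) -> VersionTuple:
    """Extract (major, minor, patch) from a name like `V_6_45_2_HttpRequesterPathToUrl`."""
    match = re.search(r"V_(\d+)_(\d+)_(\d+)", class_name)
    if match is None:
        raise ValueError(f"Invalid migration class name: {class_name}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def parse_manifest_version(version: str) -> VersionTuple:
    """Parse a plain `major.minor.patch` string (assumption: no pre-release suffixes)."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_applicable(manifest_version: str, class_name: str) -> bool:
    """A migration applies when the manifest version is <= the migration version."""
    return parse_manifest_version(manifest_version) <= version_from_class_name(class_name)


print(is_applicable("6.44.0", "V_6_45_2_HttpRequesterPathToUrl"))  # True
print(is_applicable("6.45.2", "V_6_45_2_HttpRequesterPathToUrl"))  # True
print(is_applicable("6.46.0", "V_6_45_2_HttpRequesterPathToUrl"))  # False
```

Note that a manifest whose version equals the migration version is still migrated, because the comparison is inclusive.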

## Migration Registry

- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`.
- Migrations are applied in order, determined by the `<order>` suffix in the filename.
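
As a rough illustration of the ordering rule, the sketch below sorts migration filenames by version and then by the `<order>` suffix. The registry's discovery code is not shown in this README, so the regular expression and the `sort_key` and `ordered` helpers are assumptions made for illustration only.

```python
import re
from typing import List, Tuple

# Hypothetical pattern for `<description>_v<major>_<minor>_<patch>__<order>.py`.
FILENAME_PATTERN = re.compile(r"_v(\d+)_(\d+)_(\d+)__(\d+)\.py$")


def sort_key(filename: str) -> Tuple[int, int, int, int]:
    """Return (major, minor, patch, order) parsed from a migration filename."""
    match = FILENAME_PATTERN.search(filename)
    if match is None:
        raise ValueError(f"Unexpected migration filename: {filename}")
    major, minor, patch, order = (int(group) for group in match.groups())
    return major, minor, patch, order


def ordered(filenames: List[str]) -> List[str]:
    """Sort filenames by version first, then by the `<order>` suffix."""
    return sorted(filenames, key=sort_key)


files = [
    "http_requester_path_to_url_v6_45_2__1.py",
    "http_requester_url_base_to_url_v6_45_2__0.py",
]
print(ordered(files))
# ['http_requester_url_base_to_url_v6_45_2__0.py', 'http_requester_path_to_url_v6_45_2__1.py']
```

The order matters for these two reference migrations: the `path` migration reads the `url` key, which only exists once the `url_base` migration has run.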

## Testing

- Ensure your migration is covered by unit tests.
- Tests should verify both `should_migrate` and `migrate` behaviors.
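
A minimal sketch of such tests is shown below. To stay self-contained it defines a stand-in `ExampleMigration` class that mirrors the skeleton in this README; a real test would import the migration class from the `migrations/` folder instead. The test function names are hypothetical, and plain `assert` statements are used so the functions can be collected by pytest.

```python
from typing import Any, Dict

ManifestType = Dict[str, Any]
TYPE_TAG = "type"


class ExampleMigration:
    """Stand-in for a real migration class (assumption: renames `old_key` to `new_key`)."""

    component_type = "ExampleComponent"
    original_key = "old_key"
    replacement_key = "new_key"

    def should_migrate(self, manifest: ManifestType) -> bool:
        return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        manifest[self.replacement_key] = manifest[self.original_key]
        manifest.pop(self.original_key, None)


def test_should_migrate_matches_only_target_component() -> None:
    migration = ExampleMigration()
    # Matching type and old key present: migrate.
    assert migration.should_migrate({"type": "ExampleComponent", "old_key": 1})
    # Matching type but already migrated: skip.
    assert not migration.should_migrate({"type": "ExampleComponent", "new_key": 1})
    # Different component type: skip.
    assert not migration.should_migrate({"type": "OtherComponent", "old_key": 1})


def test_migrate_renames_key_in_place() -> None:
    migration = ExampleMigration()
    manifest = {"type": "ExampleComponent", "old_key": "value"}
    migration.migrate(manifest)
    assert manifest == {"type": "ExampleComponent", "new_key": "value"}
```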

## Example Migration Skeleton

```python
from airbyte_cdk.manifest_migrations.manifest_migration import (
    TYPE_TAG,
    ManifestMigration,
    ManifestType,
)


class V_1_2_3_Example(ManifestMigration):
    component_type = "ExampleComponent"
    original_key = "old_key"
    replacement_key = "new_key"

    def should_migrate(self, manifest: ManifestType) -> bool:
        # Only touch the target component, and only if it still uses the old key.
        return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        # Move the value to the new key, in place.
        manifest[self.replacement_key] = manifest[self.original_key]
        manifest.pop(self.original_key, None)
```

## Additional Notes

- Do not modify the migration registry manually; it will pick up all valid migration classes automatically.
- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.
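
The effect of `NON_MIGRATABLE_TYPES` on traversal can be illustrated with a small standalone sketch. It mirrors the shape of the recursive walk in `manifest_migration.py` but only records which component types would be offered to a migration; the `collect_visited_types` helper and the sample manifest are made up for illustration.

```python
from typing import Any, List

TYPE_TAG = "type"
NON_MIGRATABLE_TYPES = ["DynamicDeclarativeStream"]


def collect_visited_types(obj: Any, visited: List[str]) -> None:
    """Walk dicts and lists, recording each component type a migration would see."""
    if isinstance(obj, dict):
        if TYPE_TAG in obj:
            if obj[TYPE_TAG] in NON_MIGRATABLE_TYPES:
                # Skip this component and everything nested inside it.
                return
            visited.append(obj[TYPE_TAG])
        for value in list(obj.values()):
            collect_visited_types(value, visited)
    elif isinstance(obj, list):
        for item in obj:
            collect_visited_types(item, visited)


# Illustrative manifest fragment, not a complete or valid connector manifest.
manifest = {
    "type": "DeclarativeSource",
    "streams": [
        {
            "type": "DeclarativeStream",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {"type": "HttpRequester", "url_base": "https://example.com"},
            },
        }
    ],
    "dynamic_streams": [
        {
            "type": "DynamicDeclarativeStream",
            "stream_template": {
                "type": "DeclarativeStream",
                "retriever": {"type": "SimpleRetriever"},
            },
        }
    ],
}

visited: List[str] = []
collect_visited_types(manifest, visited)
print(visited)
# ['DeclarativeSource', 'DeclarativeStream', 'SimpleRetriever', 'HttpRequester']
```

Because the walk returns as soon as it meets a non-migratable type, components nested under that type are skipped as well.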

---

For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder.
Empty file.
12 changes: 12 additions & 0 deletions airbyte_cdk/manifest_migrations/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


class ManifestMigrationException(Exception):
    """
    Raised when a migration error occurs in the manifest.
    """

    def __init__(self, message: str) -> None:
        super().__init__(f"Failed to migrate the manifest: {message}")
127 changes: 127 additions & 0 deletions airbyte_cdk/manifest_migrations/manifest_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import re
from abc import abstractmethod
from typing import Any, Dict

from packaging.version import Version

ManifestType = Dict[str, Any]


TYPE_TAG = "type"

NON_MIGRATABLE_TYPES = [
    "DynamicDeclarativeStream",
]


class ManifestMigration:
    @abstractmethod
    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        Check if the manifest should be migrated.

        :param manifest: The manifest to potentially migrate

        :return: True if the manifest is of the expected format and should be migrated, False otherwise.
        """

    @abstractmethod
    def migrate(self, manifest: ManifestType) -> None:
        """
        Migrate the manifest in place. Assumes should_migrate(manifest) returned True.

        :param manifest: The manifest to migrate
        """

    @property
    def migration_version(self) -> Version:
        """
        Get the migration version.

        :return: The migration version as a Version object
        """
        return Version(self._get_migration_version())

    def _is_component(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a component.

        :param obj: The object to check
        :return: True if the object is a component, False otherwise
        """
        return TYPE_TAG in obj.keys()

    def _is_migratable_type(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a migratable component,
        i.e. its type is not listed in NON_MIGRATABLE_TYPES.

        :param obj: The object to check
        :return: True if the object is a migratable component, False otherwise
        """
        return obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES

    def _process_manifest(self, obj: Any) -> None:
        """
        Recursively processes a manifest object, migrating components that match the migration criteria.

        This method traverses the entire manifest structure (dictionaries and lists) and applies
        migrations to components that:
        1. Have a type tag
        2. Are not in the list of non-migratable types
        3. Meet the conditions defined in the should_migrate method

        Parameters:
            obj (Any): The object to process, which can be a dictionary, list, or any other type.
                Dictionary objects are checked for component type tags and potentially migrated.
                List objects have each of their items processed recursively.
                Other types are ignored.

        Returns:
            None, since we process the manifest in place.
        """
        if isinstance(obj, dict):
            # Check if the object is a component
            if self._is_component(obj):
                # Check if the object is allowed to be migrated
                if not self._is_migratable_type(obj):
                    return

                # Check if the object should be migrated
                if self.should_migrate(obj):
                    # Perform the migration, if needed
                    self.migrate(obj)

            # Process all values in the dictionary
            for value in list(obj.values()):
                self._process_manifest(value)

        elif isinstance(obj, list):
            # Process all items in the list
            for item in obj:
                self._process_manifest(item)

    def _get_migration_version(self) -> str:
        """
        Get the migration version from the class name.
        The migration version is extracted from the class name using a regular expression.
        The expected format is "V_<major>_<minor>_<patch>_<migration_name>".

        For example, "V_6_45_2_HttpRequesterPathToUrl" -> "6.45.2"

        :return: The migration version as a string in the format "major.minor.patch"
        :raises ValueError: If the class name does not match the expected format
        """

        class_name = self.__class__.__name__
        migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name)
        if migration_version:
            return migration_version.group(1).replace("_", ".")
        else:
            raise ValueError(
                f"Invalid migration class name, make sure the class name has the version (e.g `V_<major>_<minor>_<patch>_<migration_name>`): {class_name}"
            )
84 changes: 84 additions & 0 deletions airbyte_cdk/manifest_migrations/migration_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from typing import Type

from packaging.version import Version

from airbyte_cdk.manifest_migrations.exceptions import (
    ManifestMigrationException,
)
from airbyte_cdk.manifest_migrations.manifest_migration import (
    ManifestMigration,
    ManifestType,
)
from airbyte_cdk.manifest_migrations.migrations_registry import (
    MIGRATIONS,
)

LOGGER = logging.getLogger("airbyte.cdk.manifest_migrations")


class ManifestMigrationHandler:
    """
    This class is responsible for handling migrations in the manifest.
    """

    def __init__(self, manifest: ManifestType) -> None:
        self._manifest = manifest
        self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest)
        self._manifest_version: Version = self._get_manifest_version()

    def apply_migrations(self) -> ManifestType:
        """
        Apply all registered migrations to the manifest.

        This method iterates through all migrations in the migrations registry and applies
        them sequentially to the current manifest. If any migration fails with a
        ManifestMigrationException, the original unmodified manifest is returned instead.

        Returns:
            ManifestType: The migrated manifest if all migrations succeeded, or the original
                manifest if any migration failed.
        """
        try:
            for migration_cls in MIGRATIONS:
                self._handle_migration(migration_cls)
            return self._migrated_manifest
        except ManifestMigrationException:
            # if any errors occur we return the original resolved manifest
            return self._manifest

    def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None:
        """
        Handles a single manifest migration by instantiating the migration class and processing the manifest.

        Args:
            migration_class (Type[ManifestMigration]): The migration class to apply to the manifest.

        Raises:
            ManifestMigrationException: If the migration process encounters any errors.
        """
        try:
            migration_instance = migration_class()
            # check if the migration is supported for the given manifest version
            if self._manifest_version <= migration_instance.migration_version:
                migration_instance._process_manifest(self._migrated_manifest)
            else:
                LOGGER.info(
                    f"Manifest migration: `{migration_class.__name__}` is not supported for the given manifest version `{self._manifest_version}`.",
                )
        except Exception as e:
            raise ManifestMigrationException(str(e)) from e

    def _get_manifest_version(self) -> Version:
        """
        Get the manifest version from the manifest.
        Falls back to "0.0.0" when the manifest has no `version` key.

        :return: The manifest version
        """
        return Version(str(self._migrated_manifest.get("version", "0.0.0")))
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from urllib.parse import urljoin

from airbyte_cdk.manifest_migrations.manifest_migration import (
    TYPE_TAG,
    ManifestMigration,
    ManifestType,
)
from airbyte_cdk.sources.types import EmptyString


class V_6_45_2_HttpRequesterPathToUrl(ManifestMigration):
    """
    This migration is responsible for migrating the `path` key to `url` in the HttpRequester component.
    The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL.
    The migration will concatenate the `url_base` and `path` to form a full URL.
    """

    component_type = "HttpRequester"
    original_key = "path"
    replacement_key = "url"

    def should_migrate(self, manifest: ManifestType) -> bool:
        return manifest[TYPE_TAG] == self.component_type and self.original_key in list(
            manifest.keys()
        )

    def migrate(self, manifest: ManifestType) -> None:
        # `path` may be present but null; treat that the same as an empty path
        # (calling `.lstrip` on None would raise an AttributeError)
        original_key_value = (manifest.get(self.original_key) or EmptyString).lstrip("/")
        replacement_key_value = manifest[self.replacement_key]

        # return a full-url if provided directly from interpolation context
        if original_key_value == EmptyString:
            manifest[self.replacement_key] = replacement_key_value
            manifest.pop(self.original_key, None)
        else:
            # since we didn't provide a full-url, the url_base might not have a trailing slash
            # so we join the url_base and path correctly
            if not replacement_key_value.endswith("/"):
                replacement_key_value += "/"

            manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value)
            manifest.pop(self.original_key, None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from airbyte_cdk.manifest_migrations.manifest_migration import (
    TYPE_TAG,
    ManifestMigration,
    ManifestType,
)


class V_6_45_2_HttpRequesterUrlBaseToUrl(ManifestMigration):
    """
    This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component.
    The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL.
    The migration will copy the value of `url_base` to `url`.
    """

    component_type = "HttpRequester"
    original_key = "url_base"
    replacement_key = "url"

    def should_migrate(self, manifest: ManifestType) -> bool:
        return manifest[TYPE_TAG] == self.component_type and self.original_key in list(
            manifest.keys()
        )

    def migrate(self, manifest: ManifestType) -> None:
        manifest[self.replacement_key] = manifest[self.original_key]
        manifest.pop(self.original_key, None)