Skip to content

Commit a0bc96e

Browse files
author
Oleksandr Bazarnov
committed
updated the migrations approach
1 parent 1a53161 commit a0bc96e

File tree

13 files changed

+847
-621
lines changed

13 files changed

+847
-621
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,20 +1858,6 @@ definitions:
18581858
type:
18591859
enum: [Bearer]
18601860
HttpRequester:
1861-
migrations:
1862-
- type: replace_field
1863-
description: The `url_base` has been deprecated, in favor of the `url` field.
1864-
original_key: url_base
1865-
replacement_key: url
1866-
- type: stack
1867-
migrations:
1868-
- type: handle_url_parts
1869-
description: The `path` has been deprecated, in favor of the `url` field. The value from the `path` field will be joined to the `url` field.
1870-
original_key: path
1871-
replacement_key: url
1872-
- type: remove_field
1873-
description: The `path` has been deprecated, in favor of the `url` field.
1874-
original_key: path
18751861
title: HTTP Requester
18761862
description: Requester submitting HTTP requests and extracting records from the response.
18771863
type: object

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
2727
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
2828
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
29+
from airbyte_cdk.sources.declarative.migrations.manifest.migration_handler import (
30+
ManifestMigrationHandler,
31+
)
2932
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3033
DeclarativeStream as DeclarativeStreamModel,
3134
)
@@ -39,9 +42,6 @@
3942
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
4043
ManifestComponentTransformer,
4144
)
42-
from airbyte_cdk.sources.declarative.parsers.manifest_migration_handler import (
43-
ManifestMigrationHandler,
44-
)
4545
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
4646
ManifestReferenceResolver,
4747
)
@@ -114,10 +114,9 @@ def __init__(
114114
"", resolved_source_config, {}
115115
)
116116

117-
# migrate definitions to the new format, if any are present
118117
migrated_source_config = ManifestMigrationHandler(
119-
propagated_source_config, self._declarative_component_schema
120-
).migrate()
118+
propagated_source_config
119+
).apply_migrations()
121120

122121
self._source_config = migrated_source_config
123122
self._debug = debug

airbyte_cdk/sources/declarative/migrations/manifest/__init__.py

Whitespace-only changes.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from abc import abstractmethod
4+
from typing import Any, Dict
5+
6+
ManifestType = Dict[str, Any]

# Key under which a manifest component declares its component type.
TYPE_TAG = "type"

# Component types that are never migrated; traversal also stops at them,
# so nothing nested below such a component is migrated either.
NON_MIGRATABLE_TYPES = [
    "DynamicDeclarativeStream",
]


class ManifestMigration:
    """
    Base class for a single, in-place manifest migration.

    Subclasses implement `should_migrate` and `migrate`; `_process_manifest` walks the
    manifest and applies them to every matching component.

    NOTE: this class does not derive from `abc.ABC`, so `@abstractmethod` is not enforced
    at instantiation time. A subclass that omits one of the methods inherits the empty
    implementation below, which returns None.
    """

    @abstractmethod
    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        Check if the manifest should be migrated.

        :param manifest: The component (a dict carrying a `type` key) to potentially migrate

        :return: true if the manifest is of the expected format and should be migrated. False otherwise.
        """

    @abstractmethod
    def migrate(self, manifest: ManifestType) -> None:
        """
        Migrate the manifest in place. Assumes should_migrate(manifest) returned True.

        :param manifest: The component (a dict carrying a `type` key) to migrate
        """

    def _process_manifest(self, obj: Any) -> None:
        """
        Recursively processes a manifest object, migrating components that match the migration criteria.

        This method traverses the entire manifest structure (dictionaries and lists) and applies
        migrations to components that:
        1. Have a type tag
        2. Are not in the list of non-migratable types
        3. Meet the conditions defined in the should_migrate method

        Parameters:
            obj (Any): The object to process, which can be a dictionary, list, or any other type.
                Dictionary objects are checked for component type tags and potentially migrated.
                List objects have each of their items processed recursively.
                Other types are ignored.

        Returns:
            None, since we process the manifest in place.
        """
        if isinstance(obj, dict):
            # only dictionaries that declare a component type are migration candidates
            if TYPE_TAG in obj:
                # do not migrate (and do not descend into) non-migratable component types
                if obj[TYPE_TAG] in NON_MIGRATABLE_TYPES:
                    return

                if self.should_migrate(obj):
                    self.migrate(obj)

            # Recurse over a snapshot of the values, taken after the migration above,
            # so the traversal sees the already-migrated dictionary.
            for value in list(obj.values()):
                self._process_manifest(value)
        elif isinstance(obj, list):
            # Process all items in the list
            for item in obj:
                self._process_manifest(item)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import copy
6+
from typing import Type
7+
8+
from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import (
9+
ManifestMigration,
10+
ManifestType,
11+
)
12+
from airbyte_cdk.sources.declarative.migrations.manifest.migrations_registry import (
13+
migrations_registry,
14+
)
15+
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import (
16+
ManifestMigrationException,
17+
)
18+
19+
20+
class ManifestMigrationHandler:
    """
    This class is responsible for handling migrations in the manifest.

    Migrations run against a deep copy of the manifest given to the constructor,
    so the input manifest itself is never mutated by this class.
    """

    def __init__(self, manifest: ManifestType) -> None:
        """
        Args:
            manifest (ManifestType): The manifest to migrate. It is kept untouched and
                returned as-is by `apply_migrations` when a migration fails.
        """
        self._manifest = manifest
        # migrations mutate their input in place, so they operate on a private copy
        self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest)

    def apply_migrations(self) -> ManifestType:
        """
        Apply all registered migrations to the manifest.

        This method iterates through all migrations in the migrations registry and applies
        them sequentially to the current manifest. If any migration fails with a
        ManifestMigrationException, the original unmodified manifest is returned instead.

        Returns:
            ManifestType: The migrated manifest if all migrations succeeded, or the original
                manifest if any migration failed.
        """
        try:
            for migration_class in migrations_registry:
                self._handle_migration(migration_class)
        except ManifestMigrationException:
            # if any errors occur we return the original resolved manifest
            return self._manifest
        else:
            return self._migrated_manifest

    def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None:
        """
        Handles a single manifest migration by instantiating the migration class and processing the manifest.

        Args:
            migration_class (Type[ManifestMigration]): The migration class to apply to the manifest.

        Raises:
            ManifestMigrationException: If the migration process encounters any errors.
        """
        try:
            migration_class()._process_manifest(self._migrated_manifest)
        except Exception as e:
            # any failure inside a migration is normalised to one exception type,
            # which `apply_migrations` uses to fall back to the original manifest
            raise ManifestMigrationException(f"Failed to migrate the manifest: {e}") from e
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from airbyte_cdk.sources.declarative.migrations.manifest.migrations.http_requester_path_to_url_migration import (
2+
HttpRequesterPathToUrlMigration,
3+
)
4+
from airbyte_cdk.sources.declarative.migrations.manifest.migrations.http_requester_url_base_to_url_migration import (
5+
HttpRequesterUrlBaseToUrlMigration,
6+
)
7+
8+
# Public migrations exported by this package.
__all__ = [
    "HttpRequesterUrlBaseToUrlMigration",
    "HttpRequesterPathToUrlMigration",
]
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from urllib.parse import urljoin
2+
3+
from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import (
4+
TYPE_TAG,
5+
ManifestMigration,
6+
ManifestType,
7+
)
8+
from airbyte_cdk.sources.types import EmptyString
9+
10+
11+
class HttpRequesterPathToUrlMigration(ManifestMigration):
    """
    Folds the `path` field of an `HttpRequester` component into its `url` field
    and removes `path` from the component.
    """

    component_type = "HttpRequester"
    original_key = "path"
    replacement_key = "url"

    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        :param manifest: The component to inspect; must carry a `type` key

        :return: True when the component is an `HttpRequester` that still has a `path` key.
        """
        return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        """
        Join `path` onto `url` and drop the `path` key, in place.

        An empty or null `path` leaves `url` unchanged. A missing `url` key raises KeyError.

        :param manifest: The `HttpRequester` component to migrate
        """
        raw_path = manifest[self.original_key]
        # a null `path` has nothing to append; calling .lstrip() on it would raise
        path_value = raw_path.lstrip("/") if raw_path is not None else EmptyString
        url_value = manifest[self.replacement_key]

        if path_value != EmptyString:
            # the url might not have a trailing slash, and without one urljoin would
            # replace its last path segment instead of appending to it
            if not url_value.endswith("/"):
                url_value += "/"

            # NOTE(review): urljoin resolves `path` as a relative reference, so a `path`
            # that parses as having its own scheme (an absolute URL, or presumably a first
            # segment such as `items:search`) replaces the base url - confirm this is intended.
            manifest[self.replacement_key] = urljoin(url_value, path_value)

        manifest.pop(self.original_key, None)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import (
2+
TYPE_TAG,
3+
ManifestMigration,
4+
ManifestType,
5+
)
6+
7+
8+
class HttpRequesterUrlBaseToUrlMigration(ManifestMigration):
    """
    Renames the `url_base` field of an `HttpRequester` component to `url`.
    """

    component_type = "HttpRequester"
    original_key = "url_base"
    replacement_key = "url"

    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        :param manifest: The component to inspect; must carry a `type` key

        :return: True when the component is an `HttpRequester` that still has a `url_base` key.
        """
        return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        """
        Move the value of `url_base` to `url`, in place. An existing `url` value is overwritten.

        :param manifest: The `HttpRequester` component to migrate
        """
        manifest[self.replacement_key] = manifest.pop(self.original_key)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from typing import List, Type
2+
3+
from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import (
4+
ManifestMigration,
5+
)
6+
from airbyte_cdk.sources.declarative.migrations.manifest.migrations import (
7+
HttpRequesterPathToUrlMigration,
8+
HttpRequesterUrlBaseToUrlMigration,
9+
)
10+
11+
# This is the registry of all the migrations that are available.
# Add new migrations to the bottom of the list,
# ( ! ) make sure the order of the migrations is correct.
# `url_base` -> `url` runs first because the `path` -> `url` migration reads the `url` key.
migrations_registry: List[Type[ManifestMigration]] = [
    HttpRequesterUrlBaseToUrlMigration,
    HttpRequesterPathToUrlMigration,
]

airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@
88

99
from pydantic.v1 import BaseModel
1010

11+
from airbyte_cdk.models import (
12+
AirbyteLogMessage,
13+
AirbyteMessage,
14+
Level,
15+
Type,
16+
)
17+
1118
# format the warning message
1219
warnings.formatwarning = (
1320
lambda message, category, *args, **kwargs: f"{category.__name__}: {message}"
@@ -36,6 +43,16 @@ def _deprecated_warning(self, field_name: str, message: str) -> None:
3643
DeprecationWarning,
3744
)
3845

46+
# print(
47+
# AirbyteMessage(
48+
# type=Type.LOG,
49+
# log=AirbyteLogMessage(
50+
# level=Level.WARN,
51+
# message=f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}",
52+
# ),
53+
# )
54+
# )
55+
3956
def __init__(self, **data: Any) -> None:
4057
"""
4158
Show warnings for deprecated fields during component initialization.

0 commit comments

Comments
 (0)