Skip to content

Commit 57e7bff

Browse files
author
Oleksandr Bazarnov
committed
add
1 parent bf998bd commit 57e7bff

File tree

13 files changed

+1061
-6
lines changed

13 files changed

+1061
-6
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@
5555
SliceLogger,
5656
)
5757
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
58+
from manifest_migrations.migration_handler import (
59+
ManifestMigrationHandler,
60+
)
5861

5962

6063
class ManifestDeclarativeSource(DeclarativeSource):
@@ -68,16 +71,19 @@ def __init__(
6871
debug: bool = False,
6972
emit_connector_builder_messages: bool = False,
7073
component_factory: Optional[ModelToComponentFactory] = None,
71-
):
74+
migrate_manifest: Optional[bool] = False,
75+
) -> None:
7276
"""
7377
Args:
7478
config: The provided config dict.
7579
source_config: The manifest of low-code components that describe the source connector.
76-
debug: True if debug mode is enabled.
77-
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
78-
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
80+
debug: bool True if debug mode is enabled.
81+
emit_connector_builder_messages: Optional[bool] True if messages should be emitted to the connector builder.
82+
component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
83+
migrate_manifest: Optional[bool] if the manifest should be migrated to pick up the latest declarative component schema changes at runtime.
7984
"""
8085
self.logger = logging.getLogger(f"airbyte.{self.name}")
86+
8187
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
8288
manifest = dict(source_config)
8389
if "type" not in manifest:
@@ -90,6 +96,12 @@ def __init__(
9096
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
9197
"", resolved_source_config, {}
9298
)
99+
100+
if migrate_manifest:
101+
propagated_source_config = ManifestMigrationHandler(
102+
propagated_source_config
103+
).apply_migrations()
104+
93105
self._source_config = propagated_source_config
94106
self._debug = debug
95107
self._emit_connector_builder_messages = emit_connector_builder_messages

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import copy
66
import typing
7-
from typing import Any, Mapping, Optional
7+
from typing import Any, Dict, Mapping, Optional
88

99
PARAMETERS_STR = "$parameters"
1010

@@ -95,7 +95,7 @@ def propagate_types_and_parameters(
9595
declarative_component: Mapping[str, Any],
9696
parent_parameters: Mapping[str, Any],
9797
use_parent_parameters: Optional[bool] = None,
98-
) -> Mapping[str, Any]:
98+
) -> Dict[str, Any]:
9999
"""
100100
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
101101
default component type if it was not already present. The resulting transformed components are a deep copy of the input

manifest_migrations/README.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Manifest Migrations
2+
3+
This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves.
4+
5+
## Adding a New Migration
6+
7+
1. **Create a Migration File:**
8+
- Add a new Python file in the `migrations/` subdirectory.
9+
- Name the file using the pattern: `<description>_v<major>_<minor>_<patch>__<order>.py`.
10+
- Example: `http_requester_url_base_to_url_v6_45_2__0.py`
11+
- The `<order>` integer is used to determine the order of migrations for the same version.
12+
13+
2. **Define the Migration Class:**
14+
- The migration class must inherit from `ManifestMigration`.
15+
- Name the class using the pattern: `V_<major>_<minor>_<patch>_ManifestMigration_<Description>`.
16+
- Example: `V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl`
17+
- Implement the following methods:
18+
- `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest.
19+
- `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place.
20+
21+
3. **Migration Versioning:**
22+
- The migration version is extracted from the class name and used to determine applicability.
23+
- Only manifests with a version less than or equal to the migration version will be migrated.
24+
25+
4. **Component Type:**
26+
- Use the `TYPE_TAG` constant to check the component type in your migration logic.
27+
28+
5. **Examples:**
29+
- See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations.
30+
31+
## Migration Registry
32+
33+
- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`.
34+
- Migrations are applied in order, determined by the `<order>` suffix in the filename.
35+
36+
## Testing
37+
38+
- Ensure your migration is covered by unit tests.
39+
- Tests should verify both `should_migrate` and `migrate` behaviors.
40+
41+
## Example Migration Skeleton
42+
43+
```python
44+
from manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType
45+
46+
class V_1_2_3_ManifestMigration_Example(ManifestMigration):
47+
component_type = "ExampleComponent"
48+
original_key = "old_key"
49+
replacement_key = "new_key"
50+
51+
def should_migrate(self, manifest: ManifestType) -> bool:
52+
return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest
53+
54+
def migrate(self, manifest: ManifestType) -> None:
55+
manifest[self.replacement_key] = manifest[self.original_key]
56+
manifest.pop(self.original_key, None)
57+
```
58+
59+
## Additional Notes
60+
61+
- Do not modify the migration registry manually; it will pick up all valid migration classes automatically.
62+
- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.
63+
64+
---
65+
66+
For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder.

manifest_migrations/__init__.py

Whitespace-only changes.

manifest_migrations/exceptions.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
class ManifestMigrationException(Exception):
    """Error signalling that a manifest migration could not be completed.

    The caller-supplied message is appended to a fixed
    "Failed to migrate the manifest: " prefix, so callers should pass only
    the specific reason for the failure.
    """

    def __init__(self, message: str) -> None:
        full_message = f"Failed to migrate the manifest: {message}"
        super().__init__(full_message)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
import re
4+
from abc import abstractmethod
5+
from typing import Any, Dict
6+
7+
# A manifest (or any nested component of it) is handled as a plain mutable dict.
ManifestType = Dict[str, Any]


# Key that marks a dict as a declarative component and holds its component type.
TYPE_TAG = "type"

# Component types that are never migrated; their children are skipped as well,
# because `_process_manifest` stops descending once a component is rejected.
NON_MIGRATABLE_TYPES = [
    "DynamicDeclarativeStream",
]


class ManifestMigration:
    """
    Base class for a single manifest migration.

    Subclasses implement `should_migrate` and `migrate`, and must be named
    `V_<major>_<minor>_<patch>_<migration_name>` because the migration version
    is parsed from the class name.

    NOTE: this class does not derive from `abc.ABC`, so `@abstractmethod` is not
    enforced at instantiation time; a subclass that forgets to override a method
    inherits a no-op that returns `None`.
    """

    @abstractmethod
    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        Check if the manifest should be migrated.

        :param manifest: The manifest (component) to potentially migrate

        :return: true if the manifest is of the expected format and should be migrated. False otherwise.
        """

    @abstractmethod
    def migrate(self, manifest: ManifestType) -> None:
        """
        Migrate the manifest in place. Assumes should_migrate(manifest) returned True.

        :param manifest: The manifest (component) to migrate
        """

    @property
    def migration_version(self) -> str:
        """
        Get the migration version.

        :return: The migration version as a string in the format "major.minor.patch"
        :raises ValueError: If the class name does not carry a version
        """
        return self._get_migration_version()

    @staticmethod
    def _version_key(version: str) -> tuple:
        """
        Convert a version string into a tuple of three integers for comparison.

        Comparing the raw strings is lexicographic and orders "6.5.0" after
        "6.45.2", so the numeric components are compared instead. Only the first
        three numeric groups are used and missing ones are padded with zeros;
        a string with no digits at all therefore maps to (0, 0, 0).

        :param version: The version string, e.g. "6.45.2"
        :return: The version as a (major, minor, patch) tuple of ints
        """
        numbers = [int(part) for part in re.findall(r"\d+", version)[:3]]
        numbers.extend([0] * (3 - len(numbers)))
        return tuple(numbers)

    def _is_component(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a component.

        :param obj: The object to check
        :return: True if the object carries a type tag, False otherwise
        """
        return TYPE_TAG in obj

    def _is_migratable(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a migratable component,
        based on the Type of the component and the migration version.

        The version is read from the object itself; objects without a
        "version" key are treated as "0.0.0" and always pass the version check.

        :param obj: The object to check
        :return: True if the object is a migratable component, False otherwise
        """
        if obj[TYPE_TAG] in NON_MIGRATABLE_TYPES:
            return False

        manifest_version = self._version_key(self._get_manifest_version(obj))
        return manifest_version <= self._version_key(self.migration_version)

    def _process_manifest(self, obj: Any) -> None:
        """
        Recursively processes a manifest object, migrating components that match the migration criteria.

        This method traverses the entire manifest structure (dictionaries and lists) and applies
        migrations to components that:
        1. Have a type tag
        2. Are not in the list of non-migratable types and are not newer than the migration version
        3. Meet the conditions defined in the should_migrate method

        A component that fails check 2 is skipped together with everything nested inside it.

        Parameters:
            obj (Any): The object to process, which can be a dictionary, list, or any other type.
                Dictionary objects are checked for component type tags and potentially migrated.
                List objects have each of their items processed recursively.
                Other types are ignored.

        Returns:
            None, since we process the manifest in place.
        """
        if isinstance(obj, dict):
            if self._is_component(obj):
                # A rejected component stops the descent: its children are not visited.
                if not self._is_migratable(obj):
                    return

                if self.should_migrate(obj):
                    self.migrate(obj)

            # Iterate over a snapshot of the values rather than the live dict view.
            for value in list(obj.values()):
                self._process_manifest(value)

        elif isinstance(obj, list):
            for item in obj:
                self._process_manifest(item)

    def _get_manifest_version(self, manifest: ManifestType) -> str:
        """
        Get the manifest version from the manifest.

        :param manifest: The manifest to get the version from
        :return: The manifest version, or "0.0.0" when no "version" key is present
        """
        return str(manifest.get("version", "0.0.0"))

    def _get_migration_version(self) -> str:
        """
        Get the migration version from the class name.
        The migration version is extracted from the class name using a regular expression.
        The expected format is "V_<major>_<minor>_<patch>_<migration_name>".

        For example, "V_6_45_2_ManifestMigration_HttpRequesterPathToUrl" -> "6.45.2"

        :return: The migration version as a string in the format "major.minor.patch"
        :raises ValueError: If the class name does not match the expected format
        """
        class_name = self.__class__.__name__
        migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name)
        if migration_version is None:
            raise ValueError(
                f"Invalid migration class name, make sure the class name has the version (e.g `V_0_0_0_`): {class_name}"
            )
        return migration_version.group(1).replace("_", ".")
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import copy
6+
from typing import Type
7+
8+
from manifest_migrations.exceptions import (
9+
ManifestMigrationException,
10+
)
11+
from manifest_migrations.manifest_migration import (
12+
ManifestMigration,
13+
ManifestType,
14+
)
15+
from manifest_migrations.migrations_registry import (
16+
MIGRATIONS,
17+
)
18+
19+
20+
class ManifestMigrationHandler:
    """
    This class is responsible for handling migrations in the manifest.

    The handler keeps the manifest it was given untouched and runs every
    migration against a deep copy, so a failed run can fall back to the original.
    """

    def __init__(self, manifest: ManifestType) -> None:
        self._manifest = manifest
        # Migrations mutate in place, so they operate on a private deep copy.
        self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest)

    def apply_migrations(self) -> ManifestType:
        """
        Apply all registered migrations to the manifest.

        This method iterates through all migrations in the migrations registry and applies
        them sequentially to the current manifest. If any migration fails with a
        ManifestMigrationException, the original unmodified manifest is returned instead.

        Returns:
            ManifestType: The migrated manifest if all migrations succeeded, or the original
                manifest if any migration failed.
        """
        try:
            for migration_cls in MIGRATIONS:
                self._handle_migration(migration_cls)
        except ManifestMigrationException:
            # Best effort: if any migration fails we return the original resolved manifest,
            # discarding whatever partial changes were made to the copy.
            return self._manifest
        return self._migrated_manifest

    def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None:
        """
        Handles a single manifest migration by instantiating the migration class and processing the manifest.

        Args:
            migration_class (Type[ManifestMigration]): The migration class to apply to the manifest.

        Raises:
            ManifestMigrationException: If the migration process encounters any errors.
        """
        try:
            migration_class()._process_manifest(self._migrated_manifest)
        except Exception as e:
            # ManifestMigrationException already prepends "Failed to migrate the manifest: ",
            # so only the underlying reason is passed to avoid a duplicated prefix.
            raise ManifestMigrationException(str(e)) from e

manifest_migrations/migrations/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from urllib.parse import urljoin
2+
3+
from airbyte_cdk.sources.types import EmptyString
4+
from manifest_migrations.manifest_migration import (
5+
TYPE_TAG,
6+
ManifestMigration,
7+
ManifestType,
8+
)
9+
10+
11+
class V_6_45_2_ManifestMigration_HttpRequesterPathToUrl(ManifestMigration):
    """
    This migration is responsible for migrating the `path` key to `url` in the HttpRequester component.
    The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL.
    The migration appends the `path` to the existing `url` value to form a full URL and removes `path`.

    The component must already carry a `url` key when this migration runs;
    `migrate` raises `KeyError` otherwise.
    """

    component_type = "HttpRequester"
    original_key = "path"
    replacement_key = "url"

    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        Return True for `HttpRequester` components that still define a `path` key.

        :param manifest: The component to inspect
        """
        return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest

    def migrate(self, manifest: ManifestType) -> None:
        """
        Fold the `path` value into `url` and drop the `path` key, in place.

        :param manifest: The `HttpRequester` component to migrate
        :raises KeyError: If the component has no `url` key
        """
        raw_path = manifest[self.original_key]
        replacement_key_value = manifest[self.replacement_key]

        # `path` may be explicitly null in the manifest; guard before calling str
        # methods so that case is handled as "nothing to append" instead of raising.
        original_key_value = raw_path.lstrip("/") if raw_path is not None else None

        if original_key_value is not None and original_key_value != EmptyString:
            # the url might not have a trailing slash, and urljoin would then replace
            # its last path segment instead of appending to it
            if not replacement_key_value.endswith("/"):
                replacement_key_value += "/"

            manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value)

        # With an empty or null path the existing url is already the full url;
        # in every case the legacy key is removed.
        manifest.pop(self.original_key, None)

0 commit comments

Comments
 (0)