Skip to content

Commit a73d162

Browse files
author
Oleksandr Bazarnov
committed
add stack of migrations
1 parent 6593490 commit a73d162

File tree

2 files changed

+69
-25
lines changed

2 files changed

+69
-25
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1863,13 +1863,15 @@ definitions:
18631863
description: The `url_base` has been deprecated, in favor of the `url` field.
18641864
original_key: url_base
18651865
replacement_key: url
1866-
- type: handle_url_parts
1867-
description: The `path` has been deprecated, in favor of the `url` field. The value from the `path` field will be joined to the `url` field.
1868-
original_key: path
1869-
replacement_key: url
1870-
- type: remove_field
1871-
description: The `path` has been deprecated, in favor of the `url` field.
1872-
original_key: path
1866+
- type: stack
1867+
migrations:
1868+
- type: handle_url_parts
1869+
description: The `path` has been deprecated, in favor of the `url` field. The value from the `path` field will be joined to the `url` field.
1870+
original_key: path
1871+
replacement_key: url
1872+
- type: remove_field
1873+
description: The `path` has been deprecated, in favor of the `url` field.
1874+
original_key: path
18731875
title: HTTP Requester
18741876
description: Requester submitting HTTP requests and extracting records from the response.
18751877
type: object

airbyte_cdk/sources/declarative/parsers/manifest_migration_handler.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,17 @@
1212
# Type definitions for better readability
1313
ManifestType = Dict[str, Any]
1414
DefinitionsType = Dict[str, Any]
15-
MigrationsType = List[Tuple[str, str, Optional[str]]]
1615
MigrationType = Tuple[str, str, Optional[str]]
17-
MigratedTagsType = Dict[str, List[Tuple[str, str, Optional[str]]]]
16+
MigrationsType = List[MigrationType]
17+
MigratedTagsType = Dict[str, MigrationsType]
1818
MigrationFunctionType = Callable[[Any, MigrationType], None]
1919

2020

2121
# Configuration constants
2222
TYPE_TAG = "type"
2323
DEF_TAG = "definitions"
2424
MIGRATIONS_TAG = "migrations"
25+
MIGRATIONS_STACK_TAG = "stack"
2526
ORIGINAL_KEY = "original_key"
2627
REPLACEMENT_KEY = "replacement_key"
2728

@@ -67,18 +68,28 @@ def __init__(
6768
) -> None:
6869
self._manifest = manifest
6970
self._declarative_schema = declarative_schema
70-
7171
self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest)
7272
# get the declared migrations from schema
7373
self._migration_tags = self._get_migration_schema_tags(self._declarative_schema)
7474

7575
def migrate(self) -> ManifestType:
76+
"""
77+
Migrates the manifest by applying configured migrations to different component types.
78+
79+
This method iterates through all registered component types and their associated
80+
migrations in `_migration_tags`, applying them sequentially by calling
81+
`_handle_migrations` for each component type.
82+
83+
Returns:
84+
ManifestType: The migrated manifest if migration succeeds, or the original
85+
manifest if a ManifestMigrationException occurs during migration.
86+
"""
7687
try:
7788
for component_type, migrations in self._migration_tags.items():
7889
self._handle_migrations(component_type, migrations)
7990
return self._migrated_manifest
80-
except ManifestMigrationException as e:
81-
# if any errors occurs we return the original resolved manifest
91+
except ManifestMigrationException:
92+
# if any errors occur we return the original resolved manifest
8293
return self._manifest
8394

8495
def _get_migration_schema_tags(self, schema: DefinitionsType) -> MigratedTagsType:
@@ -99,23 +110,37 @@ def _get_migration_schema_tags(self, schema: DefinitionsType) -> MigratedTagsTyp
99110

100111
for component_name, component_declaration in schema_definitions.items():
101112
if MIGRATIONS_TAG in component_declaration.keys():
102-
# create the placeholder for the migrations
103113
migrations_tags[component_name] = []
104-
# iterate over the migrations
114+
105115
for migration in component_declaration[MIGRATIONS_TAG]:
106-
migrations_tags[component_name].append(
107-
(
108-
# type of migration
109-
migration.get(TYPE_TAG),
110-
# what is the migrated key
111-
migration.get(ORIGINAL_KEY),
112-
# (optional) what is the new key to be used
113-
migration.get(REPLACEMENT_KEY),
114-
),
115-
)
116+
# register the stack of migrations
117+
if migration[TYPE_TAG] == MIGRATIONS_STACK_TAG:
118+
for migration in migration[MIGRATIONS_TAG]:
119+
self._register_migration(migrations_tags, component_name, migration)
120+
# register a single migration
121+
else:
122+
self._register_migration(migrations_tags, component_name, migration)
116123

117124
return migrations_tags
118125

126+
def _register_migration(
127+
self,
128+
migrations_tags: MigratedTagsType,
129+
component_name: str,
130+
migration: Dict[str, Any],
131+
) -> None:
132+
"""
133+
Registers the migration in the migrations_tags dictionary.
134+
"""
135+
136+
migrations_tags[component_name].append(
137+
(
138+
migration[TYPE_TAG], # type of migration
139+
migration[ORIGINAL_KEY], # what is the migrated key
140+
migration.get(REPLACEMENT_KEY), # (optional) what is the new key to be used
141+
),
142+
)
143+
119144
def _handle_migrations(
120145
self,
121146
component_type: str,
@@ -138,6 +163,21 @@ def _handle_migrations(
138163
raise ManifestMigrationException(f"Failed to migrate the manifest: {e}") from e
139164

140165
def _process_migration(self, obj: Any, component_type: str, migration: MigrationType) -> None:
166+
"""
167+
Process a migration rule by recursively traversing through a nested data structure.
168+
169+
This method applies migrations to components of a specified type that contain the migrated key.
170+
It recursively processes dictionaries and lists, looking for components that match the criteria.
171+
Migration is skipped for component types listed in NON_MIGRATABLE_TYPES.
172+
173+
Args:
174+
obj: The object to process, which can be a dictionary, list, or other type.
175+
component_type: The type of component to apply the migration to.
176+
migration: A tuple containing migration type, migrated key, and additional migration info.
177+
178+
Returns:
179+
None
180+
"""
141181
migration_type, migrated_key, _ = migration
142182

143183
if isinstance(obj, dict):
@@ -146,9 +186,11 @@ def _process_migration(self, obj: Any, component_type: str, migration: Migration
146186
# check for component type match the designed migration
147187
if TYPE_TAG in obj_keys:
148188
obj_type = obj[TYPE_TAG]
149-
# do not migrate if the type is not in the list of migratable types
189+
190+
# do not migrate if the particular type is in the list of non-migratable types
150191
if obj_type in NON_MIGRATABLE_TYPES:
151192
return
193+
152194
if obj_type == component_type and migrated_key in obj_keys:
153195
if migration_type in self._migration_type_mapping.keys():
154196
# Call the appropriate function based on the migration type

0 commit comments

Comments
 (0)