Skip to content

Commit b2693d2

Browse files
committed
merge from main
2 parents a0a2ea6 + a5273a5 commit b2693d2

31 files changed

+3108
-76
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits:
5656
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)
5757

5858

def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determine whether the manifest should be migrated.

    The decision is based on the "__should_migrate" key in the config.
    This flag is set by the UI.

    :param config: The config to inspect
    :return: True if "__should_migrate" is present and truthy, False otherwise
    """
    # Coerce to a real bool so the annotated return type holds even when the
    # stored value is None, an int, or any other non-bool object.
    return bool(config.get("__should_migrate", False))
67+
68+
5969
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
6070
"""
6171
Check if the manifest should be normalized.
@@ -71,6 +81,7 @@ def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDecl
7181
config=config,
7282
emit_connector_builder_messages=True,
7383
source_config=manifest,
84+
migrate_manifest=should_migrate_manifest(config),
7485
normalize_manifest=should_normalize_manifest(config),
7586
component_factory=ModelToComponentFactory(
7687
emit_connector_builder_messages=True,

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55

66
import logging
7+
from math import log
78
from typing import Any, ClassVar, Dict, Iterator, List, Mapping, Optional, Union
89

910
from airbyte_cdk.connector_builder.models import (
@@ -112,11 +113,16 @@ def run_test_read(
112113
record_limit = self._check_record_limit(record_limit)
113114
# The connector builder currently only supports reading from a single stream at a time
114115
stream = source.streams(config)[0]
116+
117+
# get any deprecation warnings during the component creation
118+
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
119+
115120
schema_inferrer = SchemaInferrer(
116121
self._pk_to_nested_and_composite_field(stream.primary_key),
117122
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
118123
)
119124
datetime_format_inferrer = DatetimeFormatInferrer()
125+
120126
message_group = get_message_groups(
121127
self._read_stream(source, config, configured_catalog, state),
122128
schema_inferrer,
@@ -127,6 +133,10 @@ def run_test_read(
127133
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
128134
message_group
129135
)
136+
137+
# extend log messages with deprecation warnings
138+
log_messages.extend(deprecation_warnings)
139+
130140
schema, log_messages = self._get_infered_schema(
131141
configured_catalog, schema_inferrer, log_messages
132142
)
@@ -269,6 +279,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
269279
auxiliary_requests = []
270280
latest_config_update: Optional[AirbyteControlMessage] = None
271281

282+
# process the message groups first
272283
for message_group in message_groups:
273284
match message_group:
274285
case AirbyteLogMessage():
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Manifest Migrations
2+
3+
This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves.
4+
5+
## Adding a New Migration
6+
7+
1. **Create a Migration File:**
8+
- Add a new Python file in the `migrations/` subdirectory.
9+
- Name the file using the pattern: `<description_of_the_migration>.py`.
10+
- Example: `http_requester_url_base_to_url.py`
11+
- The filename should be unique and descriptive.
12+
13+
2. **Define the Migration Class:**
14+
- The migration class must inherit from `ManifestMigration`.
15+
- Name the class using a descriptive name (e.g., `HttpRequesterUrlBaseToUrl`).
16+
- Implement the following methods:
17+
- `should_migrate(self, manifest: ManifestType) -> bool`
18+
- `migrate(self, manifest: ManifestType) -> None`
19+
- `validate(self, manifest: ManifestType) -> bool`
20+
21+
3. **Register the Migration:**
22+
- Open `migrations/registry.yaml`.
23+
- Add an entry under the appropriate version, or create a new version section if needed.
24+
- Each migration entry should include:
25+
- `name`: The filename (without `.py`)
26+
- `order`: The order in which this migration should be applied for the version
27+
- `description`: A short description of the migration
28+
29+
Example:
30+
```yaml
31+
manifest_migrations:
32+
- version: 6.45.2
33+
migrations:
34+
- name: http_requester_url_base_to_url
35+
order: 1
36+
description: |
37+
This migration updates the `url_base` field in the `HttpRequester` component spec to `url`.
38+
```
39+
40+
4. **Testing:**
41+
- Ensure your migration is covered by unit tests.
42+
- Tests should verify the `should_migrate`, `migrate`, and `validate` behaviors.
43+
44+
## Migration Discovery
45+
46+
- Migrations are discovered and registered automatically based on the entries in `migrations/registry.yaml`.
47+
- Do not register migrations manually in code; add them to `migrations/registry.yaml` instead.
48+
- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`.
49+
50+
## Example Migration Skeleton
51+
52+
```python
53+
from airbyte_cdk.manifest_migrations.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType
54+
55+
class ExampleMigration(ManifestMigration):
56+
component_type = "ExampleComponent"
57+
original_key = "old_key"
58+
replacement_key = "new_key"
59+
60+
def should_migrate(self, manifest: ManifestType) -> bool:
61+
return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest
62+
63+
def migrate(self, manifest: ManifestType) -> None:
64+
manifest[self.replacement_key] = manifest[self.original_key]
65+
manifest.pop(self.original_key, None)
66+
67+
def validate(self, manifest: ManifestType) -> bool:
68+
return self.replacement_key in manifest and self.original_key not in manifest
69+
```
70+
71+
---
72+
73+
For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
class ManifestMigrationException(Exception):
    """
    Raised when a migration error occurs in the manifest.

    The supplied message is prefixed with "Failed to migrate the manifest: ".
    """

    def __init__(self, message: str) -> None:
        """
        :param message: Details of the migration failure
        """
        super().__init__(f"Failed to migrate the manifest: {message}")
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
from abc import ABC, abstractmethod
7+
from dataclasses import asdict, dataclass
8+
from typing import Any, Dict
9+
# Alias for a manifest (or a nested component mapping) handled by migrations
ManifestType = Dict[str, Any]

# Key under which a component declares its type
TYPE_TAG = "type"

# Component types that migrations must leave untouched
NON_MIGRATABLE_TYPES = [
    # more info here: https://github.com/airbytehq/airbyte-internal-issues/issues/12423
    "DynamicDeclarativeStream",
]
19+
20+
@dataclass
class MigrationTrace:
    """
    This class represents a migration that has been applied to the manifest.
    It contains information about the migration, including the version it was applied from,
    the version it was applied to, and the time it was applied.
    """

    from_version: str
    to_version: str
    migration: str
    migrated_at: str

    def as_dict(self) -> Dict[str, Any]:
        """
        Convert the trace to a plain dictionary.

        :return: A dictionary keyed by field name
        """
        return asdict(self)
36+
37+
class ManifestMigration(ABC):
    """
    Base class for manifest migrations.
    This class provides a framework for migrating manifest components.
    It defines the structure for migration classes, including methods for checking if a migration is needed,
    performing the migration, and validating the migration.
    """

    def __init__(self) -> None:
        # Set by _process_manifest to the result of validate() for the most
        # recently migrated component.
        self.is_migrated: bool = False

    @abstractmethod
    def should_migrate(self, manifest: ManifestType) -> bool:
        """
        Check if the manifest should be migrated.

        :param manifest: The manifest to potentially migrate

        :return: true if the manifest is of the expected format and should be migrated. False otherwise.
        """

    @abstractmethod
    def migrate(self, manifest: ManifestType) -> None:
        """
        Migrate the manifest. Assumes should_migrate(manifest) returned True.

        :param manifest: The manifest to migrate
        """

    @abstractmethod
    def validate(self, manifest: ManifestType) -> bool:
        """
        Validate the manifest to ensure the migration was successfully applied.

        :param manifest: The manifest to validate

        :return: True if the migration was successfully applied, False otherwise
        """

    def _is_component(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a component, i.e. it carries a type tag.

        :param obj: The object to check
        :return: True if the object is a component, False otherwise
        """
        return TYPE_TAG in obj

    def _is_migratable_type(self, obj: Dict[str, Any]) -> bool:
        """
        Check if the object is a migratable component, based on the type of the component.

        Assumes the object is a component (it must contain the type tag).

        :param obj: The object to check
        :return: True if the component type is not listed in NON_MIGRATABLE_TYPES, False otherwise
        """
        return obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES

    def _process_manifest(self, obj: Any) -> None:
        """
        Recursively processes a manifest object, migrating components that match the migration criteria.

        This method traverses the entire manifest structure (dictionaries and lists) and applies
        migrations to components that:
        1. Have a type tag
        2. Are not in the list of non-migratable types
        3. Meet the conditions defined in the should_migrate method

        Parameters:
            obj (Any): The object to process, which can be a dictionary, list, or any other type.
                Dictionary objects are checked for component type tags and potentially migrated.
                List objects have each of their items processed recursively.
                Other types are ignored.

        Returns:
            None, since we process the manifest in place.
        """
        if isinstance(obj, dict):
            # Check if the object is a component
            if self._is_component(obj):
                # Check if the object is allowed to be migrated.
                # A non-migratable component is skipped together with everything nested in it.
                if not self._is_migratable_type(obj):
                    return

                # Check if the object should be migrated
                if self.should_migrate(obj):
                    # Perform the migration, if needed
                    self.migrate(obj)
                    # validate the migration
                    # NOTE(review): this overwrites the flag for every migrated component, so the
                    # final value reflects only the last one validated - confirm this is intended.
                    self.is_migrated = self.validate(obj)

            # Process all values in the dictionary (snapshot taken before recursing)
            for value in list(obj.values()):
                self._process_manifest(value)

        elif isinstance(obj, list):
            # Process all items in the list
            for item in obj:
                self._process_manifest(item)

0 commit comments

Comments
 (0)