diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 6b6b31111..28e0fa578 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -56,12 +56,23 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits: return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams) +def should_migrate_manifest(config: Mapping[str, Any]) -> bool: + """ + Check if the manifest requires migration. + + :param config: The config to check + :return: True if the manifest requires migration, False otherwise + """ + return config.get("__should_migrate", False) + + def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( config=config, emit_connector_builder_messages=True, source_config=manifest, + migrate_manifest=should_migrate_manifest(config), component_factory=ModelToComponentFactory( emit_connector_builder_messages=True, limit_pages_fetched_per_slice=limits.max_pages_per_slice, diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index a125047b4..d8b94cd34 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -112,11 +112,16 @@ def run_test_read( record_limit = self._check_record_limit(record_limit) # The connector builder currently only supports reading from a single stream at a time stream = source.streams(config)[0] + + # get any deprecation warnings during the component creation + deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings() + schema_inferrer = SchemaInferrer( self._pk_to_nested_and_composite_field(stream.primary_key), self._cursor_field_to_nested_and_composite_field(stream.cursor_field), ) datetime_format_inferrer = DatetimeFormatInferrer() + message_group = get_message_groups( self._read_stream(source, config, configured_catalog, state), schema_inferrer, @@ -125,7 +130,7 @@ def run_test_read( ) slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( - message_group + message_group, deprecation_warnings ) schema, log_messages = self._get_infered_schema( configured_catalog, schema_inferrer, log_messages @@ -238,7 +243,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int: return record_limit - def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES: + def _categorise_groups( + self, + message_groups: MESSAGE_GROUPS, + deprecation_warnings: Optional[List[Any]] = None, + ) -> GROUPED_MESSAGES: """ Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update. @@ -269,6 +278,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES auxiliary_requests = [] latest_config_update: Optional[AirbyteControlMessage] = None + # process the message groups first for message_group in message_groups: match message_group: case AirbyteLogMessage(): @@ -298,6 +308,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES case _: raise ValueError(f"Unknown message group type: {type(message_group)}") + # process deprecation warnings, if present + if deprecation_warnings is not None: + for deprecation in deprecation_warnings: + match deprecation: + case AirbyteLogMessage(): + log_messages.append( + LogMessage(message=deprecation.message, level=deprecation.level.value) + ) + case _: + raise ValueError(f"Unknown message group type: {type(deprecation)}") + return slices, log_messages, auxiliary_requests, latest_config_update def _get_infered_schema( diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 2d36004a3..9d7f775e7 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1863,14 +1863,16 @@ definitions: type: object required: - type - - url_base properties: type: type: string enum: [HttpRequester] url_base: + deprecated: true + deprecation_message: "Use `url` field instead." + sharable: true title: API Base URL - description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. type: string interpolation_context: - config @@ -1886,9 +1888,30 @@ definitions: - "{{ config['base_url'] or 'https://app.posthog.com'}}/api" - "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups" - "https://example.com/api/v1/resource/{{ next_page_token['id'] }}" + url: + sharable: true + title: API URL + description: The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + type: string + interpolation_context: + - config + - next_page_token + - stream_interval + - stream_partition + - stream_slice + - creation_response + - polling_response + - download_target + examples: + - "https://connect.squareup.com/v2" + - "{{ config['url'] or 'https://app.posthog.com'}}/api" + - "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups" + - "https://example.com/api/v1/resource/{{ next_page_token['id'] }}" path: + deprecated: true + deprecation_message: "Use `url` field instead." title: URL Path - description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. + description: Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this. type: string interpolation_context: - config diff --git a/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte_cdk/sources/declarative/declarative_source.py index 77bf427a1..55f425e50 100644 --- a/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte_cdk/sources/declarative/declarative_source.py @@ -4,8 +4,11 @@ import logging from abc import abstractmethod -from typing import Any, Mapping, Tuple +from typing import Any, List, Mapping, Tuple +from airbyte_cdk.models import ( + AirbyteLogMessage, +) from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -34,3 +37,9 @@ def check_connection( The error object will be cast to string to display the problem to the user. """ return self.connection_checker.check_connection(self, logger, config) + + def deprecation_warnings(self) -> List[AirbyteLogMessage]: + """ + Returns a list of deprecation warnings for the source. + """ + return [] diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index cfd258c6c..d03066220 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -17,6 +17,7 @@ from airbyte_cdk.models import ( AirbyteConnectionStatus, + AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, @@ -26,6 +27,9 @@ from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.declarative.migrations.manifest.migration_handler import ( + ManifestMigrationHandler, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DeclarativeStream as DeclarativeStreamModel, ) @@ -68,16 +72,19 @@ def __init__( debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, - ): + migrate_manifest: Optional[bool] = False, + ) -> None: """ Args: config: The provided config dict. source_config: The manifest of low-code components that describe the source connector. - debug: True if debug mode is enabled. - emit_connector_builder_messages: True if messages should be emitted to the connector builder. - component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. + debug: bool True if debug mode is enabled. + emit_connector_builder_messages: Optional[bool] True if messages should be emitted to the connector builder. + component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked. + migrate_manifest: Optional[bool] if the manifest should be migrated to pick up the latest declarative component schema changes at runtime. """ self.logger = logging.getLogger(f"airbyte.{self.name}") + # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing manifest = dict(source_config) if "type" not in manifest: @@ -90,6 +97,12 @@ def __init__( propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( "", resolved_source_config, {} ) + + if migrate_manifest: + propagated_source_config = ManifestMigrationHandler( + propagated_source_config + ).apply_migrations() + self._source_config = propagated_source_config self._debug = debug self._emit_connector_builder_messages = emit_connector_builder_messages @@ -123,6 +136,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]: manifest=self._source_config, config=self._config, with_dynamic_stream_name=True ) + def deprecation_warnings(self) -> List[AirbyteLogMessage]: + return self._constructor.get_model_deprecations() or [] + @property def connection_checker(self) -> ConnectionChecker: check = self._source_config["check"] diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/README.md b/airbyte_cdk/sources/declarative/migrations/manifest/README.md new file mode 100644 index 000000000..79ddd4b19 --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/README.md @@ -0,0 +1,66 @@ +# Manifest Migrations + +This directory contains the logic and registry for manifest migrations in the Airbyte CDK. Migrations are used to update or transform manifest components to newer formats or schemas as the CDK evolves. + +## Adding a New Migration + +1. **Create a Migration File:** + - Add a new Python file in the `migrations/` subdirectory. + - Name the file using the pattern: `_v____.py`. + - Example: `http_requester_url_base_to_url_v6_45_2__0.py` + - The `` integer is used to determine the order of migrations for the same version. + +2. **Define the Migration Class:** + - The migration class must inherit from `ManifestMigration`. + - Name the class using the pattern: `V____ManifestMigration_`. + - Example: `V_6_45_2_ManifestMigration_HttpRequesterUrlBaseToUrl` + - Implement the following methods: + - `should_migrate(self, manifest: ManifestType) -> bool`: Return `True` if the migration should be applied to the given manifest. + - `migrate(self, manifest: ManifestType) -> None`: Perform the migration in-place. + +3. **Migration Versioning:** + - The migration version is extracted from the class name and used to determine applicability. + - Only manifests with a version less than or equal to the migration version will be migrated. + +4. **Component Type:** + - Use the `TYPE_TAG` constant to check the component type in your migration logic. + +5. **Examples:** + - See `migrations/http_requester_url_base_to_url_v6_45_2__0.py` and `migrations/http_requester_path_to_url_v6_45_2__1.py` for reference implementations. + +## Migration Registry + +- All migration classes in the `migrations/` folder are automatically discovered and registered in `migrations_registry.py`. +- Migrations are applied in order, determined by the `` suffix in the filename. + +## Testing + +- Ensure your migration is covered by unit tests. +- Tests should verify both `should_migrate` and `migrate` behaviors. + +## Example Migration Skeleton + +```python +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import TYPE_TAG, ManifestMigration, ManifestType + +class V_1_2_3_ManifestMigration_Example(ManifestMigration): + component_type = "ExampleComponent" + original_key = "old_key" + replacement_key = "new_key" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in manifest + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) +``` + +## Additional Notes + +- Do not modify the migration registry manually; it will pick up all valid migration classes automatically. +- If you need to skip certain component types, use the `NON_MIGRATABLE_TYPES` list in `manifest_migration.py`. + +--- + +For more details, see the docstrings in `manifest_migration.py` and the examples in the `migrations/` folder. diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/__init__.py b/airbyte_cdk/sources/declarative/migrations/manifest/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/exceptions.py b/airbyte_cdk/sources/declarative/migrations/manifest/exceptions.py new file mode 100644 index 000000000..7a140706f --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/exceptions.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +class ManifestMigrationException(Exception): + """ + Raised when a migration error occurs in the manifest. + """ + + def __init__(self, message: str) -> None: + super().__init__(f"Failed to migrate the manifest: {message}") diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/manifest_migration.py b/airbyte_cdk/sources/declarative/migrations/manifest/manifest_migration.py new file mode 100644 index 000000000..6e4b3bb2f --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/manifest_migration.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +import re +from abc import abstractmethod +from typing import Any, Dict + +ManifestType = Dict[str, Any] + + +TYPE_TAG = "type" + +NON_MIGRATABLE_TYPES = [ + "DynamicDeclarativeStream", +] + + +class ManifestMigration: + @abstractmethod + def should_migrate(self, manifest: ManifestType) -> bool: + """ + Check if the manifest should be migrated. + + :param manifest: The manifest to potentially migrate + :param kwargs: Additional arguments for migration + + :return: true if the manifest is of the expected format and should be migrated. False otherwise. + """ + + @abstractmethod + def migrate(self, manifest: ManifestType) -> None: + """ + Migrate the manifest. Assumes should_migrate(manifest) returned True. + + :param manifest: The manifest to migrate + :param kwargs: Additional arguments for migration + """ + + @property + def migration_version(self) -> str: + """ + Get the migration version. + + :return: The migration version as a string + """ + return self._get_migration_version() + + def _is_component(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a component. + + :param obj: The object to check + :return: True if the object is a component, False otherwise + """ + return TYPE_TAG in obj.keys() + + def _is_migratable(self, obj: Dict[str, Any]) -> bool: + """ + Check if the object is a migratable component, + based on the Type of the component and the migration version. + + :param obj: The object to check + :return: True if the object is a migratable component, False otherwise + """ + return ( + obj[TYPE_TAG] not in NON_MIGRATABLE_TYPES + and self._get_manifest_version(obj) <= self.migration_version + ) + + def _process_manifest(self, obj: Any) -> None: + """ + Recursively processes a manifest object, migrating components that match the migration criteria. + + This method traverses the entire manifest structure (dictionaries and lists) and applies + migrations to components that: + 1. Have a type tag + 2. Are not in the list of non-migratable types + 3. Meet the conditions defined in the should_migrate method + + Parameters: + obj (Any): The object to process, which can be a dictionary, list, or any other type. + Dictionary objects are checked for component type tags and potentially migrated. + List objects have each of their items processed recursively. + Other types are ignored. + + Returns: + None, since we process the manifest in place. + """ + if isinstance(obj, dict): + # Check if the object is a component + if self._is_component(obj): + # Check if the object is allowed to be migrated + if not self._is_migratable(obj): + return + + # Check if the object should be migrated + if self.should_migrate(obj): + # Perform the migration, if needed + self.migrate(obj) + + # Process all values in the dictionary + for value in list(obj.values()): + self._process_manifest(value) + + elif isinstance(obj, list): + # Process all items in the list + for item in obj: + self._process_manifest(item) + + def _get_manifest_version(self, manifest: ManifestType) -> str: + """ + Get the manifest version from the manifest. + + :param manifest: The manifest to get the version from + :return: The manifest version + """ + return str(manifest.get("version", "0.0.0")) + + def _get_migration_version(self) -> str: + """ + Get the migration version from the class name. + The migration version is extracted from the class name using a regular expression. + The expected format is "V____". + + For example, "V_6_45_2_ManifestMigration_HttpRequesterPathToUrl" -> "6.45.2" + + :return: The migration version as a string in the format "major.minor.patch" + :raises ValueError: If the class name does not match the expected format + """ + + class_name = self.__class__.__name__ + migration_version = re.search(r"V_(\d+_\d+_\d+)", class_name) + if migration_version: + return migration_version.group(1).replace("_", ".") + else: + raise ValueError( + f"Invalid migration class name, make sure the class name has the version (e.g `V_0_0_0_`): {class_name}" + ) diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/migration_handler.py b/airbyte_cdk/sources/declarative/migrations/manifest/migration_handler.py new file mode 100644 index 000000000..501672183 --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/migration_handler.py @@ -0,0 +1,62 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import copy +from typing import Type + +from airbyte_cdk.sources.declarative.migrations.manifest.exceptions import ( + ManifestMigrationException, +) +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import ( + ManifestMigration, + ManifestType, +) +from airbyte_cdk.sources.declarative.migrations.manifest.migrations_registry import ( + MIGRATIONS, +) + + +class ManifestMigrationHandler: + """ + This class is responsible for handling migrations in the manifest. + """ + + def __init__(self, manifest: ManifestType) -> None: + self._manifest = manifest + self._migrated_manifest: ManifestType = copy.deepcopy(self._manifest) + + def apply_migrations(self) -> ManifestType: + """ + Apply all registered migrations to the manifest. + + This method iterates through all migrations in the migrations registry and applies + them sequentially to the current manifest. If any migration fails with a + ManifestMigrationException, the original unmodified manifest is returned instead. + + Returns: + ManifestType: The migrated manifest if all migrations succeeded, or the original + manifest if any migration failed. + """ + try: + for migration_cls in MIGRATIONS: + self._handle_migration(migration_cls) + return self._migrated_manifest + except ManifestMigrationException: + # if any errors occur we return the original resolved manifest + return self._manifest + + def _handle_migration(self, migration_class: Type[ManifestMigration]) -> None: + """ + Handles a single manifest migration by instantiating the migration class and processing the manifest. + + Args: + migration_class (Type[ManifestMigration]): The migration class to apply to the manifest. + + Raises: + ManifestMigrationException: If the migration process encounters any errors. + """ + try: + migration_class()._process_manifest(self._migrated_manifest) + except Exception as e: + raise ManifestMigrationException(f"Failed to migrate the manifest: {e}") from e diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/migrations/__init__.py b/airbyte_cdk/sources/declarative/migrations/manifest/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_path_to_url_v6_45_2__1.py b/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_path_to_url_v6_45_2__1.py new file mode 100644 index 000000000..4ad67cfa4 --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_path_to_url_v6_45_2__1.py @@ -0,0 +1,42 @@ +from urllib.parse import urljoin + +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) +from airbyte_cdk.sources.types import EmptyString + + +class V_6_45_2_HttpRequesterPathToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `path` key to `url` in the HttpRequester component. + The `path` key is expected to be a relative path, and the `url` key is expected to be a full URL. + The migration will concatenate the `url_base` and `path` to form a full URL. + """ + + component_type = "HttpRequester" + original_key = "path" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + original_key_value = manifest[self.original_key].lstrip("/") + replacement_key_value = manifest[self.replacement_key] + + # return a full-url if provided directly from interpolation context + if original_key_value == EmptyString or original_key_value is None: + manifest[self.replacement_key] = replacement_key_value + manifest.pop(self.original_key, None) + else: + # since we didn't provide a full-url, the url_base might not have a trailing slash + # so we join the url_base and path correctly + if not replacement_key_value.endswith("/"): + replacement_key_value += "/" + + manifest[self.replacement_key] = urljoin(replacement_key_value, original_key_value) + manifest.pop(self.original_key, None) diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_url_base_to_url_v6_45_2__0.py b/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_url_base_to_url_v6_45_2__0.py new file mode 100644 index 000000000..c41b44371 --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/migrations/http_requester_url_base_to_url_v6_45_2__0.py @@ -0,0 +1,26 @@ +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import ( + TYPE_TAG, + ManifestMigration, + ManifestType, +) + + +class V_6_45_2_HttpRequesterUrlBaseToUrl(ManifestMigration): + """ + This migration is responsible for migrating the `url_base` key to `url` in the HttpRequester component. + The `url_base` key is expected to be a base URL, and the `url` key is expected to be a full URL. + The migration will copy the value of `url_base` to `url`. + """ + + component_type = "HttpRequester" + original_key = "url_base" + replacement_key = "url" + + def should_migrate(self, manifest: ManifestType) -> bool: + return manifest[TYPE_TAG] == self.component_type and self.original_key in list( + manifest.keys() + ) + + def migrate(self, manifest: ManifestType) -> None: + manifest[self.replacement_key] = manifest[self.original_key] + manifest.pop(self.original_key, None) diff --git a/airbyte_cdk/sources/declarative/migrations/manifest/migrations_registry.py b/airbyte_cdk/sources/declarative/migrations/manifest/migrations_registry.py new file mode 100644 index 000000000..1b0bb1aea --- /dev/null +++ b/airbyte_cdk/sources/declarative/migrations/manifest/migrations_registry.py @@ -0,0 +1,53 @@ +import importlib +import inspect +import pkgutil +import re +import sys +from typing import List, Type + +import airbyte_cdk.sources.declarative.migrations.manifest.migrations as migrations_pkg +from airbyte_cdk.sources.declarative.migrations.manifest.manifest_migration import ( + ManifestMigration, +) + +# Dynamically import all modules in the migrations package +for _, module_name, is_pkg in pkgutil.iter_modules(migrations_pkg.__path__): + if not is_pkg: + importlib.import_module(f"{migrations_pkg.__name__}.{module_name}") + + +def _migration_order_key(cls: object) -> int: + # Extract the migration order from the module name, e.g., http_requester_url_base_to_url_v6_45_2__0 + # The order is the integer after the double underscore at the end of the module name + module_name = cls.__module__.split(".")[-1] + match = re.search(r"__(\d+)$", module_name) + return int(match.group(1)) if match else 0 + + +def _discover_migrations() -> List[Type[ManifestMigration]]: + migration_classes = [] + for name, obj in inspect.getmembers(sys.modules[migrations_pkg.__name__], inspect.isclass): + if ( + issubclass(obj, ManifestMigration) + and obj is not ManifestMigration + and obj not in migration_classes + ): + migration_classes.append(obj) + + for _, module_name, _ in pkgutil.iter_modules(migrations_pkg.__path__): + module = sys.modules.get(f"{migrations_pkg.__name__}.{module_name}") + if module: + for name, obj in inspect.getmembers(module, inspect.isclass): + if ( + issubclass(obj, ManifestMigration) + and obj is not ManifestMigration + and obj not in migration_classes + ): + migration_classes.append(obj) + + # Sort by migration order key + migration_classes.sort(key=_migration_order_key) + return migration_classes + + +MIGRATIONS: List[Type[ManifestMigration]] = _discover_migrations() diff --git a/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py b/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py new file mode 100644 index 000000000..93300a1c8 --- /dev/null +++ b/airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py @@ -0,0 +1,114 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS +# WHEN DEPRECATED FIELDS ARE ACCESSED + +import warnings +from typing import Any, List + +from pydantic.v1 import BaseModel + +from airbyte_cdk.models import ( + AirbyteLogMessage, + Level, +) + +# format the warning message +warnings.formatwarning = ( + lambda message, category, *args, **kwargs: f"{category.__name__}: {message}" +) + +FIELDS_TAG = "__fields__" +DEPRECATED = "deprecated" +DEPRECATION_MESSAGE = "deprecation_message" +DEPRECATION_LOGS_TAG = "_deprecation_logs" + + +class BaseModelWithDeprecations(BaseModel): + """ + Pydantic BaseModel that warns when deprecated fields are accessed. + The deprecation message is stored in the field's extra attributes. + This class is used to create models that can have deprecated fields + and show warnings when those fields are accessed or initialized. + + The `_deprecation_logs` attribute is storred in the model itself. + The collected deprecation warnings are further proparated to the Airbyte log messages, + during the component creation process, in `model_to_component._collect_model_deprecations()`. + + The component implementation is not responsible for handling the deprecation warnings, + since the deprecation warnings are already handled in the model itself. + """ + + class Config: + """ + Allow extra fields in the model. In case the model restricts extra fields. + """ + + extra = "allow" + + _deprecation_logs: List[AirbyteLogMessage] = [] + + def __init__(self, **data: Any) -> None: + """ + Show warnings for deprecated fields during component initialization. + """ + model_fields = self.__fields__ + + for field_name in data: + if field_name in model_fields: + is_deprecated_field = model_fields[field_name].field_info.extra.get( + DEPRECATED, False + ) + if is_deprecated_field: + deprecation_message = model_fields[field_name].field_info.extra.get( + DEPRECATION_MESSAGE, "" + ) + self._deprecated_warning(field_name, deprecation_message) + + # Call the parent constructor + super().__init__(**data) + + def __getattribute__(self, name: str) -> Any: + """ + Show warnings for deprecated fields during field usage. + """ + + value = super().__getattribute__(name) + + try: + model_fields = super().__getattribute__(FIELDS_TAG) + field_info = model_fields.get(name) + is_deprecated_field = ( + field_info.field_info.extra.get(DEPRECATED, False) if field_info else False + ) + if is_deprecated_field: + deprecation_message = field_info.extra.get(DEPRECATION_MESSAGE, "") + self._deprecated_warning(name, deprecation_message) + except (AttributeError, KeyError): + pass + + return value + + def _deprecated_warning(self, field_name: str, message: str) -> None: + """ + Show a warning message for deprecated fields (to stdout). + Args: + field_name (str): Name of the deprecated field. + message (str): Warning message to be displayed. + """ + + # Emit a warning message for deprecated fields (to stdout) (Python Default behavior) + warnings.warn( + f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}", + DeprecationWarning, + ) + + # Add the deprecation message to the Airbyte log messages, + # this logs are displayed in the Connector Builder. + self._deprecation_logs.append( + AirbyteLogMessage( + level=Level.WARN, + message=f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}", + ), + ) + # dummy change to trigger CI tests. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 7f56ab92d..ed6e5e0ac 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -10,6 +8,10 @@ from pydantic.v1 import BaseModel, Extra, Field +from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( + BaseModelWithDeprecations, +) + class AuthFlowType(Enum): oauth2_0 = "oauth2.0" @@ -2171,11 +2173,13 @@ class SessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class HttpRequester(BaseModel): +class HttpRequester(BaseModelWithDeprecations): type: Literal["HttpRequester"] - url_base: str = Field( - ..., - description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + url_base: Optional[str] = Field( + None, + deprecated=True, + deprecation_message="Use `url` field instead.", + description="Deprecated, use the `url` instead. Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "https://connect.squareup.com/v2", "{{ config['base_url'] or 'https://app.posthog.com'}}/api", @@ -2184,9 +2188,22 @@ class HttpRequester(BaseModel): ], title="API Base URL", ) + url: Optional[str] = Field( + None, + description="The URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + examples=[ + "https://connect.squareup.com/v2", + "{{ config['url'] or 'https://app.posthog.com'}}/api", + "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups", + "https://example.com/api/v1/resource/{{ next_page_token['id'] }}", + ], + title="API URL", + ) path: Optional[str] = Field( None, - description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", + deprecated=True, + deprecation_message="Use `url` field instead.", + description="Deprecated, use the `url` instead. Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.", examples=[ "/products", "/quotes/{{ stream_partition['id'] }}/quote_line_groups", diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 6779b54ab..44f414343 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -4,7 +4,7 @@ import copy import typing -from typing import Any, Mapping, Optional +from typing import Any, Dict, Mapping, Optional PARAMETERS_STR = "$parameters" @@ -95,7 +95,7 @@ def propagate_types_and_parameters( declarative_component: Mapping[str, Any], parent_parameters: Mapping[str, Any], use_parent_parameters: Optional[bool] = None, - ) -> Mapping[str, Any]: + ) -> Dict[str, Any]: """ Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the default component type if it was not already present. The resulting transformed components are a deep copy of the input diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 064fb6c91..b97555d7d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -27,7 +27,7 @@ from isodate import parse_duration from pydantic.v1 import BaseModel -from airbyte_cdk.models import FailureType, Level +from airbyte_cdk.models import AirbyteLogMessage, FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker @@ -108,6 +108,10 @@ CustomStateMigration, GzipDecoder, ) +from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import ( + DEPRECATION_LOGS_TAG, + BaseModelWithDeprecations, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -584,6 +588,8 @@ def __init__( self._connector_state_manager = connector_state_manager or ConnectorStateManager() self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1) + # placeholder for deprecation warnings + self._deprecation_logs: List[AirbyteLogMessage] = [] def _init_mappings(self) -> None: self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { @@ -730,8 +736,26 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__) if not component_constructor: raise ValueError(f"Could not find constructor for {model.__class__}") + + # collect deprecation warnings for supported models. + if isinstance(model, BaseModelWithDeprecations): + self._collect_model_deprecations(model) + return component_constructor(model=model, config=config, **kwargs) + def get_model_deprecations(self) -> List[Any]: + """ + Returns the deprecation warnings that were collected during the creation of components. + """ + return self._deprecation_logs + + def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None: + if hasattr(model, DEPRECATION_LOGS_TAG) and model._deprecation_logs is not None: + for log in model._deprecation_logs: + # avoid duplicates for deprecation logs observed. + if log not in self._deprecation_logs: + self._deprecation_logs.append(log) + @staticmethod def create_added_field_definition( model: AddedFieldDefinitionModel, config: Config, **kwargs: Any @@ -2164,7 +2188,7 @@ def create_http_requester( self._create_component_from_model( model=model.authenticator, config=config, - url_base=model.url_base, + url_base=model.url or model.url_base, name=name, decoder=decoder, ) @@ -2201,6 +2225,7 @@ def create_http_requester( return HttpRequester( name=name, + url=model.url, url_base=model.url_base, path=model.path, authenticator=authenticator, @@ -2898,6 +2923,25 @@ def create_simple_retriever( use_cache: Optional[bool] = None, **kwargs: Any, ) -> SimpleRetriever: + def _get_url() -> str: + """ + Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever. + This is needed because the URL is not set until the requester is created. + """ + + _url = ( + model.requester.url + if hasattr(model.requester, "url") and model.requester.url is not None + else requester.get_url() + ) + _url_base = ( + model.requester.url_base + if hasattr(model.requester, "url_base") and model.requester.url_base is not None + else requester.get_url_base() + ) + + return _url or _url_base + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder @@ -2962,11 +3006,6 @@ def create_simple_retriever( use_cache=use_cache, config=config, ) - url_base = ( - model.requester.url_base - if hasattr(model.requester, "url_base") - else requester.get_url_base() - ) # Define cursor only if per partition or common incremental support is needed cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None @@ -2990,7 +3029,7 @@ def create_simple_retriever( self._create_component_from_model( model=model.paginator, config=config, - url_base=url_base, + url_base=_get_url(), extractor_model=model.record_selector.extractor, decoder=decoder, cursor_used_for_stop_condition=cursor_used_for_stop_condition, diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 78c07b725..6b0e65aab 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -3,7 +3,6 @@ # import logging -import os from dataclasses import InitVar, dataclass, field from typing import Any, Callable, Mapping, MutableMapping, Optional, Union from urllib.parse import urljoin @@ -53,10 +52,11 @@ class HttpRequester(Requester): """ name: str - url_base: Union[InterpolatedString, str] config: Config parameters: InitVar[Mapping[str, Any]] + url: Optional[Union[InterpolatedString, str]] = None + url_base: Optional[Union[InterpolatedString, str]] = None path: Optional[Union[InterpolatedString, str]] = None authenticator: Optional[DeclarativeAuthenticator] = None http_method: Union[str, HttpMethod] = HttpMethod.GET @@ -71,7 +71,14 @@ class HttpRequester(Requester): decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def __post_init__(self, parameters: Mapping[str, Any]) -> None: - self._url_base = InterpolatedString.create(self.url_base, parameters=parameters) + self._url = InterpolatedString.create( + self.url if self.url else EmptyString, parameters=parameters + ) + # deprecated + self._url_base = InterpolatedString.create( + self.url_base if self.url_base else EmptyString, parameters=parameters + ) + # deprecated self._path = InterpolatedString.create( self.path if self.path else EmptyString, parameters=parameters ) @@ -120,6 +127,51 @@ def exit_on_rate_limit(self, value: bool) -> None: def get_authenticator(self) -> DeclarativeAuthenticator: return self._authenticator + def get_url( + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> str: + interpolation_context = get_interpolation_context( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + return str(self._url.eval(self.config, **interpolation_context)) + + def _get_url( + self, + *, + path: Optional[str] = None, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> str: + url = self.get_url( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + url_base = self.get_url_base( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + path = path or self.get_path( + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, + ) + + full_url = self._join_url(url_base, path) if url_base else url + path if path else url + + return full_url + def get_url_base( self, *, @@ -349,7 +401,7 @@ def _request_body_json( return options @classmethod - def _join_url(cls, url_base: str, path: str) -> str: + def _join_url(cls, url_base: str, path: Optional[str] = None) -> str: """ Joins a base URL with a given path and returns the resulting URL with any trailing slash removed. @@ -358,7 +410,7 @@ def _join_url(cls, url_base: str, path: str) -> str: Args: url_base (str): The base URL to which the path will be appended. - path (str): The path to join with the base URL. + path (Optional[str]): The path to join with the base URL. Returns: str: The resulting joined URL. @@ -399,18 +451,11 @@ def send_request( ) -> Optional[requests.Response]: request, response = self._http_client.send_request( http_method=self.get_method().value, - url=self._join_url( - self.get_url_base( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - path - or self.get_path( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), + url=self._get_url( + path=path, + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ), request_kwargs={"stream": self.stream_response}, headers=self._request_headers( diff --git a/airbyte_cdk/sources/declarative/requesters/requester.py b/airbyte_cdk/sources/declarative/requesters/requester.py index ddda1ddba..97b31e884 100644 --- a/airbyte_cdk/sources/declarative/requesters/requester.py +++ b/airbyte_cdk/sources/declarative/requesters/requester.py @@ -34,6 +34,18 @@ def get_authenticator(self) -> DeclarativeAuthenticator: """ pass + @abstractmethod + def get_url( + self, + *, + stream_state: Optional[StreamState], + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + ) -> str: + """ + :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/" + """ + @abstractmethod def get_url_base( self, diff --git a/bin/generate_component_manifest_files.py b/bin/generate_component_manifest_files.py index 43f9b568e..51b3d8efb 100755 --- a/bin/generate_component_manifest_files.py +++ b/bin/generate_component_manifest_files.py @@ -1,5 +1,6 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. +import re import sys from glob import glob from pathlib import Path @@ -28,6 +29,63 @@ def generate_init_module_content() -> str: return header +def replace_base_model_for_classes_with_deprecated_fields(post_processed_content: str) -> str: + """ + Replace the base model for classes with deprecated fields. + This function looks for classes that inherit from `BaseModel` and have fields marked as deprecated. + It replaces the base model with `BaseModelWithDeprecations` for those classes. + """ + + # Find classes with deprecated fields + classes_with_deprecated_fields = set() + class_matches = re.finditer(r"class (\w+)\(BaseModel\):", post_processed_content) + + for class_match in class_matches: + class_name = class_match.group(1) + class_start = class_match.start() + # Find the next class definition or end of file + next_class_match = re.search( + r"class \w+\(", + post_processed_content[class_start + len(class_match.group(0)) :], + ) + class_end = ( + len(post_processed_content) + if next_class_match is None + else class_start + len(class_match.group(0)) + next_class_match.start() + ) + class_content = post_processed_content[class_start:class_end] + + # Check if any field has deprecated=True + if re.search(r"deprecated\s*=\s*True", class_content): + classes_with_deprecated_fields.add(class_name) + + # update the imports to include the new base model with deprecation warinings + # only if there are classes with the fields marked as deprecated. + if len(classes_with_deprecated_fields) > 0: + # Find where to insert the base model - after imports but before class definitions + imports_end = post_processed_content.find( + "\n\n", + post_processed_content.find("from pydantic.v1 import"), + ) + if imports_end > 0: + post_processed_content = ( + post_processed_content[:imports_end] + + "\n\n" + + "from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (\n" + + " BaseModelWithDeprecations,\n" + + ")" + + post_processed_content[imports_end:] + ) + + # Use the `BaseModelWithDeprecations` base model for the classes with deprecated fields + for class_name in classes_with_deprecated_fields: + pattern = rf"class {class_name}\(BaseModel\):" + replacement = f"class {class_name}(BaseModelWithDeprecations):" + post_processed_content = re.sub(pattern, replacement, post_processed_content) + + return post_processed_content + + async def post_process_codegen(codegen_container: dagger.Container): codegen_container = codegen_container.with_exec( ["mkdir", "/generated_post_processed"], use_entrypoint=True @@ -41,6 +99,11 @@ async def post_process_codegen(codegen_container: dagger.Container): post_processed_content = original_content.replace( " _parameters:", " parameters:" ).replace("from pydantic", "from pydantic.v1") + + post_processed_content = replace_base_model_for_classes_with_deprecated_fields( + post_processed_content + ) + codegen_container = codegen_container.with_new_file( f"/generated_post_processed/{generated_file}", contents=post_processed_content ) @@ -75,6 +138,12 @@ async def main(): "--set-default-enum-member", "--use-double-quotes", "--remove-special-field-name-prefix", + # allow usage of the extra key such as `deprecated`, etc. + "--field-extra-keys", + # account the `deprecated` flag provided for the field. + "deprecated", + # account the `deprecation_message` provided for the field. + "deprecation_message", ], use_entrypoint=True, ) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index d98a49a8c..4abff8f19 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -7,7 +7,7 @@ import json import logging import os -from typing import Literal +from typing import List, Literal from unittest import mock from unittest.mock import MagicMock, patch @@ -818,6 +818,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: connector_specification.connectionSpecification = {} return connector_specification + def deprecation_warnings(self) -> List[AirbyteLogMessage]: + return [] + @property def check_config_against_spec(self) -> Literal[False]: return False diff --git a/unit_tests/sources/declarative/migrations/conftest.py b/unit_tests/sources/declarative/migrations/conftest.py new file mode 100644 index 000000000..38cdbdfb1 --- /dev/null +++ b/unit_tests/sources/declarative/migrations/conftest.py @@ -0,0 +1,600 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Dict + +import pytest + + +@pytest.fixture +def manifest_with_url_base_to_migrate_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["A"], + }, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "/path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/A"}, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_A", + "path": "path_to_A", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/B"}, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/C"}, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + # ! the double-slash is intentional here for the test. + "path": "//path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/D"}, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "$ref": "#/definitions/requester_B", + "path": "/path_to_B", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": {"$ref": "#/schemas/E"}, + }, + }, + }, + # both requesters have duplicated `url_base`, + # which should be migrated to `url` in the new format + # and the `url_base` and `path` key should be removed + "requester_A": { + "type": "HttpRequester", + "url_base": "https://example.com/v1/", + }, + "requester_B": { + "type": "HttpRequester", + "url_base": "https://example.com/v2/", + }, + }, + "streams": [ + {"$ref": "#/definitions/streams/A"}, + {"$ref": "#/definitions/streams/B"}, + {"$ref": "#/definitions/streams/C"}, + {"$ref": "#/definitions/streams/D"}, + {"$ref": "#/definitions/streams/E"}, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_a1": { + "type": "string", + }, + }, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_b1": { + "type": "string", + }, + }, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_c1": { + "type": "string", + }, + }, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_d1": { + "type": "string", + }, + }, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": { + "field_e1": { + "type": "string", + }, + }, + }, + }, + } + + +@pytest.fixture +def expected_manifest_with_url_base_migrated_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": { + "streams": { + "A": { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + "B": { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + "C": { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + "D": { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + "E": { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + }, + "requester_A": {"type": "HttpRequester", "url": "https://example.com/v1/"}, + "requester_B": {"type": "HttpRequester", "url": "https://example.com/v2/"}, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "D", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "E", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + "D": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_d1": {"type": "string"}}, + }, + "E": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_e1": {"type": "string"}}, + }, + }, + } + + +@pytest.fixture +def manifest_with_migrated_url_base_and_path_is_joined_to_url() -> Dict[str, Any]: + return { + "version": "0.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["A"]}, + "definitions": {}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "A", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v1/path_to_A", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "B", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + }, + }, + { + "type": "DeclarativeStream", + "name": "C", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "http_method": "GET", + "url": "https://example.com/v2/path_to_B", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + }, + ], + "schemas": { + "A": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_a1": {"type": "string"}}, + }, + "B": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_b1": {"type": "string"}}, + }, + "C": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "properties": {"field_c1": {"type": "string"}}, + }, + }, + } diff --git a/unit_tests/sources/declarative/migrations/test_manifest_migration.py b/unit_tests/sources/declarative/migrations/test_manifest_migration.py new file mode 100644 index 000000000..6490ae7b7 --- /dev/null +++ b/unit_tests/sources/declarative/migrations/test_manifest_migration.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from airbyte_cdk.sources.declarative.migrations.manifest.migration_handler import ( + ManifestMigrationHandler, +) +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( + ManifestReferenceResolver, +) + +resolver = ManifestReferenceResolver() + + +def test_manifest_resolve_migrate( + manifest_with_url_base_to_migrate_to_url, + expected_manifest_with_url_base_migrated_to_url, +) -> None: + """ + This test is to check that the manifest is migrated and normalized + when the `url_base` is migrated to `url` and the `path` is joined to `url`. + """ + + resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_to_migrate_to_url) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + assert migrated_manifest == expected_manifest_with_url_base_migrated_to_url + + +def test_manifest_resolve_do_not_migrate( + manifest_with_migrated_url_base_and_path_is_joined_to_url, +) -> None: + """ + This test is to check that the manifest remains migrated already, + after the `url_base` and `path` is joined to `url`. + """ + + resolved_manifest = resolver.preprocess_manifest( + manifest_with_migrated_url_base_and_path_is_joined_to_url + ) + migrated_manifest = ManifestMigrationHandler(dict(resolved_manifest)).apply_migrations() + + # it's expected that the manifest is the same after the processing + assert migrated_manifest == manifest_with_migrated_url_base_and_path_is_joined_to_url