Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions airbyte_cdk/sources/utils/transform.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import importlib
import logging
import sys
from enum import Flag, auto
from typing import Any, Callable, Dict, Generator, Mapping, Optional, cast
from functools import lru_cache
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Callable, Dict, Generator, Mapping, Optional, Type, cast

from datamodel_code_generator import DataModelType, InputFileType, generate
from jsonschema import Draft7Validator, RefResolver, ValidationError, Validator, validators
from pydantic import BaseModel

MAX_NESTING_DEPTH = 3
json_to_python_simple = {
Expand Down Expand Up @@ -275,3 +281,121 @@ def _get_type_structure(self, input_data: Any, current_depth: int = 0) -> Any:

else:
return python_to_json[type(input_data)]


class PydanticTypeTransformer:
"""
A utility class for dynamically generating Pydantic models from JSON schemas
and transforming records using these models.

This class allows users to generate a Pydantic model from a given JSON schema
at runtime, apply optional modifications to the generated model, and use it
to transform input records by ensuring they conform to the schema.

Features:
- Dynamically generates Pydantic models from JSON schemas.
- Allows users to register a custom model normalizer for modifying the generated models.
- Provides a caching mechanism for model generation to improve performance.
- Supports transformation of input records to ensure schema compliance.

Attributes:
_custom_model_normalizer (Callable[[Type[BaseModel]], Type[BaseModel]]):
An optional callback function that allows users to modify the generated model.

Methods:
register_custom_model(custom_model_callback):
Registers a custom callback function to modify the generated model.

stream_model(json_schema, model_modifier):
Generates a Pydantic model from a JSON schema and applies an optional model modification function.

transform(record, schema):
Transforms a given record using a dynamically generated Pydantic model.
"""

_custom_model_normalizer: Optional[Callable[[Type[BaseModel]], Type[BaseModel]]] = None

def register_custom_model(
self, custom_model_callback: Callable[[Type[BaseModel]], Type[BaseModel]]
) -> Callable[[Type[BaseModel]], Type[BaseModel]]:
"""
Registers a custom normalization callback to modify the dynamically generated model.

This function allows users to define a transformation function that modifies
the generated Pydantic model before it is used for data validation.

Args:
custom_model_callback (Callable[[Type[BaseModel]], Type[BaseModel]]):
A function that takes a generated Pydantic model as input and returns a modified version.

Returns:
Callable[[Type[BaseModel]], Type[BaseModel]]: The registered callback function.
"""

self._custom_model_normalizer = custom_model_callback
return custom_model_callback

@lru_cache
def stream_model(
self,
json_schema: str,
model_modifier: Optional[Callable[[Type[BaseModel]], Type[BaseModel]]] = None,
) -> Type[BaseModel]:
"""
Generates a Pydantic model from a given JSON schema.

This method dynamically creates a Pydantic model using the provided JSON schema.
Optionally, a model modification function can be applied to customize the generated model.

Args:
json_schema (str): The JSON schema definition as a string.
model_modifier (Optional[Callable[[Type[BaseModel]], Type[BaseModel]]], optional):
A function that modifies the generated model before returning it.

Returns:
Type[BaseModel]: The generated Pydantic model with optional modifications.
"""
with TemporaryDirectory() as temporary_directory_name:
temporary_directory = Path(temporary_directory_name)
output = Path(temporary_directory / "models.py")
generate(
str(json_schema),
input_file_type=InputFileType.Auto,
input_filename="example.json",
output=output,
class_name="NormalizationModel",
output_model_type=DataModelType.PydanticV2BaseModel,
)

# Load the generated models.py dynamically
spec = importlib.util.spec_from_file_location("models", output)
module = importlib.util.module_from_spec(spec) # type: ignore
sys.modules["models"] = module
spec.loader.exec_module(module) # type: ignore

normalization_model: Type[BaseModel] = getattr(module, "NormalizationModel")

# Apply the user-provided modifier
if model_modifier:
return model_modifier(normalization_model)

return normalization_model

def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
"""
Transforms a given record using a dynamically generated Pydantic model.

This method ensures that the input record conforms to the provided schema by:
- Generating a Pydantic model from the schema.
- Validating and normalizing the record based on the generated model.
- Updating the record with the validated and transformed values.

Args:
record (Dict[str, Any]): The data record to be transformed.
schema (Mapping[str, Any]): The schema that defines the expected structure of the record.

Returns:
None: The function updates the `record` in place.
"""
model: Type[BaseModel] = self.stream_model(str(schema))
record.update(model(**record).model_dump())
153 changes: 148 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ python = ">=3.10,<3.13"
airbyte-protocol-models-dataclasses = "^0.14"
backoff = "*"
cachetools = "*"
datamodel-code-generator = "^0.28.2"
dpath = "^2.1.6"
dunamai = "^1.22.0"
genson = "1.3.0"
Expand Down
Loading