Skip to content

Commit 6694cd1

Browse files
committed
implement validators and transformer
1 parent 2f11bfa commit 6694cd1

File tree

10 files changed

+284
-1
lines changed

10 files changed

+284
-1
lines changed

airbyte_cdk/sources/declarative/transformations/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
# isort: split
1414
from .add_fields import AddFields
15+
from .remap_field import RemapField
1516
from .remove_fields import RemoveFields
1617

17-
__all__ = ["AddFields", "RecordTransformation", "RemoveFields"]
18+
__all__ = ["AddFields", "RecordTransformation", "RemapField", "RemoveFields"]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Dict, Mapping, Optional
7+
8+
from airbyte_cdk.sources.declarative.transformations.transformation import RecordTransformation
9+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
10+
11+
12+
@dataclass
13+
class RemapField(RecordTransformation):
14+
"""
15+
Transformation that remaps a field's value to another value based on a static map.
16+
"""
17+
18+
map: Mapping[str, Any]
19+
field_path: str
20+
21+
def transform(
22+
self,
23+
record: Dict[str, Any],
24+
config: Optional[Config] = None,
25+
stream_state: Optional[StreamState] = None,
26+
stream_slice: Optional[StreamSlice] = None,
27+
) -> None:
28+
"""
29+
Transforms a record by remapping a field value based on the provided map.
30+
If the original value is found in the map, it's replaced with the mapped value.
31+
If the value is not in the map, the field remains unchanged.
32+
33+
:param record: The input record to be transformed
34+
:param config: The user-provided configuration as specified by the source's spec
35+
:param stream_state: The stream state
36+
:param stream_slice: The stream slice
37+
"""
38+
# Extract path components
39+
path_components = self.field_path.split(".")
40+
41+
# Navigate to the parent object containing the field to remap
42+
current = record
43+
for i, component in enumerate(path_components[:-1]):
44+
if component not in current:
45+
# Path doesn't exist, so nothing to remap
46+
return
47+
current = current[component]
48+
49+
# If we encounter a non-dict, we can't continue navigating
50+
if not isinstance(current, dict):
51+
return
52+
53+
# The last component is the field name to remap
54+
field_name = path_components[-1]
55+
56+
# Check if the field exists and remap its value if it's in the map
57+
if field_name in current and current[field_name] in self.map:
58+
current[field_name] = self.map[current[field_name]]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from airbyte_cdk.sources.declarative.validators.dpath_validator import DpathValidator
6+
from airbyte_cdk.sources.declarative.validators.predicate_validator import PredicateValidator
7+
from airbyte_cdk.sources.declarative.validators.validate_adheres_to_schema import (
8+
ValidateAdheresToSchema,
9+
)
10+
from airbyte_cdk.sources.declarative.validators.validate_is_in_list import ValidateIsInList
11+
from airbyte_cdk.sources.declarative.validators.validate_is_of_type import ValidateIsOfType
12+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
13+
from airbyte_cdk.sources.declarative.validators.validator import Validator
14+
15+
__all__ = [
16+
"Validator",
17+
"DpathValidator",
18+
"ValidationStrategy",
19+
"ValidateIsInList",
20+
"ValidateIsOfType",
21+
"ValidateAdheresToSchema",
22+
"PredicateValidator",
23+
]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, List, Union
7+
8+
import dpath.util
9+
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
11+
from airbyte_cdk.sources.declarative.validators.validator import Validator
12+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
13+
14+
15+
@dataclass
16+
class DpathValidator(Validator):
17+
"""
18+
Validator that extracts a value at a specific path in the input data
19+
and applies a validation strategy to it.
20+
"""
21+
22+
field_path: List[Union[InterpolatedString, str]]
23+
strategy: ValidationStrategy
24+
25+
def __post_init__(self) -> None:
26+
self._field_path = [
27+
InterpolatedString.create(path, parameters={}) for path in self.field_path
28+
]
29+
for path_index in range(len(self.field_path)):
30+
if isinstance(self.field_path[path_index], str):
31+
self._field_path[path_index] = InterpolatedString.create(
32+
self.field_path[path_index], parameters={}
33+
)
34+
35+
def validate(self, input_data: dict[str, Any]) -> None:
36+
"""
37+
Extracts the value at the specified path and applies the validation strategy.
38+
39+
:param input_data: Dictionary containing the data to validate
40+
:raises ValueError: If the path doesn't exist or validation fails
41+
"""
42+
try:
43+
path = [path.eval(input_data) for path in self._field_path]
44+
value = dpath.util.get(input_data, path)
45+
self.strategy.validate(value)
46+
except KeyError:
47+
raise ValueError(f"Path '{self.field_path}' not found in the input data")
48+
except Exception as e:
49+
raise ValueError(f"Error validating path '{self.field_path}': {e}")
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any
7+
8+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
9+
10+
11+
@dataclass
12+
class PredicateValidator:
13+
"""
14+
Validator that applies a validation strategy to a value.
15+
"""
16+
17+
value: Any
18+
strategy: ValidationStrategy
19+
20+
def validate(self) -> None:
21+
"""
22+
Applies the validation strategy to the value.
23+
24+
:raises ValueError: If validation fails
25+
"""
26+
self.strategy.validate(self.value)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any
7+
8+
import jsonschema
9+
10+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
11+
12+
13+
@dataclass
14+
class ValidateAdheresToSchema(ValidationStrategy):
15+
"""
16+
Validates that a value adheres to a specified JSON schema.
17+
"""
18+
19+
schema: dict[str, Any]
20+
21+
def validate(self, value: Any) -> None:
22+
"""
23+
Validates the value against the JSON schema.
24+
25+
:param value: The value to validate
26+
:raises ValueError: If the value does not adhere to the schema
27+
"""
28+
try:
29+
jsonschema.validate(instance=value, schema=self.schema)
30+
except jsonschema.ValidationError as e:
31+
raise ValueError(f"JSON schema validation error: {e.message}")
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any
7+
8+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
9+
10+
11+
@dataclass
12+
class ValidateIsInList(ValidationStrategy):
13+
"""
14+
Validates that a value is in a list of supported values.
15+
"""
16+
17+
supported_values: list[Any]
18+
19+
def validate(self, value: Any) -> None:
20+
"""
21+
Checks if the value is in the list of supported values.
22+
23+
:param value: The value to validate
24+
:raises ValueError: If the value is not in the list of supported values
25+
"""
26+
if value not in self.supported_values:
27+
supported_values_str = ", ".join(str(v) for v in self.supported_values)
28+
raise ValueError(f"Value '{value}' not in supported values: [{supported_values_str}]")
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any
7+
8+
from airbyte_cdk.sources.declarative.validators.validation_strategy import ValidationStrategy
9+
10+
11+
@dataclass
12+
class ValidateIsOfType(ValidationStrategy):
13+
"""
14+
Validates that a value is of a specified type.
15+
"""
16+
17+
expected_type: Any
18+
19+
def validate(self, value: Any) -> None:
20+
"""
21+
Checks if the value is of the expected type.
22+
23+
:param value: The value to validate
24+
:raises ValueError: If the value is not of the expected type
25+
"""
26+
if not isinstance(value, self.expected_type):
27+
raise ValueError(f"Value '{value}' is not of type {self.expected_type.__name__}")
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Any
7+
8+
9+
class ValidationStrategy(ABC):
10+
"""
11+
Base class for validation strategies.
12+
"""
13+
14+
@abstractmethod
15+
def validate(self, value: Any) -> None:
16+
"""
17+
Validates a value according to a specific strategy.
18+
19+
:param value: The value to validate
20+
:raises ValueError: If validation fails
21+
"""
22+
pass
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Any
7+
8+
9+
class Validator(ABC):
10+
@abstractmethod
11+
def validate(self, input_data: Any) -> None:
12+
"""
13+
Validates the input data.
14+
15+
:param input_data: The data to validate
16+
:raises ValueError: If validation fails
17+
"""
18+
pass

0 commit comments

Comments
 (0)