Skip to content

Commit 52683a4

Browse files
committed
Update to complex type resolving
1 parent d21a122 commit 52683a4

File tree

6 files changed

+85
-58
lines changed

6 files changed

+85
-58
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,30 +1800,19 @@ definitions:
18001800
$parameters:
18011801
type: object
18021802
additionalProperties: true
1803-
SchemaFieldType:
1803+
ComplexFieldType:
18041804
title: Schema Field Type
1805-
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property.
1805+
description: (This component is experimental. Use at your own risk.) Represents a complex field type.
18061806
type: object
18071807
required:
18081808
- field_type
18091809
properties:
18101810
field_type:
18111811
type: string
18121812
items:
1813-
"$ref": "#/definitions/SchemaFieldType"
1814-
ItemsTypeMap:
1815-
title: Types Map
1816-
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type for property.
1817-
type: object
1818-
required:
1819-
- target_type
1820-
properties:
1821-
condition:
1822-
type: string
1823-
interpolation_context:
1824-
- raw_schema
1825-
target_type:
1826-
"$ref": "#/definitions/SchemaFieldType"
1813+
anyOf:
1814+
- type: string
1815+
- "$ref": "#/definitions/ComplexFieldType"
18271816
TypesMap:
18281817
title: Types Map
18291818
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
@@ -1838,6 +1827,7 @@ definitions:
18381827
- type: array
18391828
items:
18401829
type: string
1830+
- "$ref": "#/definitions/ComplexFieldType"
18411831
current_type:
18421832
anyOf:
18431833
- type: string
@@ -1848,8 +1838,6 @@ definitions:
18481838
type: string
18491839
interpolation_context:
18501840
- raw_schema
1851-
items_type:
1852-
"$ref": "#/definitions/ItemsTypeMap"
18531841
SchemaTypeIdentifier:
18541842
title: Schema Type Identifier
18551843
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -738,21 +738,15 @@ class HttpResponseFilter(BaseModel):
738738
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
739739

740740

741-
class SchemaFieldType(BaseModel):
741+
class ComplexFieldType(BaseModel):
742742
field_type: str
743-
items: Optional[SchemaFieldType] = None
744-
745-
746-
class ItemsTypeMap(BaseModel):
747-
condition: Optional[str] = None
748-
target_type: SchemaFieldType
743+
items: Optional[Union[str, ComplexFieldType]] = None
749744

750745

751746
class TypesMap(BaseModel):
752-
target_type: Union[str, List[str]]
747+
target_type: Union[str, List[str], ComplexFieldType]
753748
current_type: Union[str, List[str]]
754749
condition: Optional[str] = None
755-
items_type: Optional[ItemsTypeMap] = None
756750

757751

758752
class SchemaTypeIdentifier(BaseModel):
@@ -2297,7 +2291,7 @@ class DynamicDeclarativeStream(BaseModel):
22972291
)
22982292

22992293

2300-
SchemaFieldType.update_forward_refs()
2294+
ComplexFieldType.update_forward_refs()
23012295
CompositeErrorHandler.update_forward_refs()
23022296
DeclarativeSource1.update_forward_refs()
23032297
DeclarativeSource2.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@
245245
InlineSchemaLoader as InlineSchemaLoaderModel,
246246
)
247247
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
248-
ItemsTypeMap as ItemsTypeMapModel,
248+
ComplexFieldType as ComplexFieldTypeModel,
249249
)
250250
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
251251
IterableDecoder as IterableDecoderModel,
@@ -435,7 +435,7 @@
435435
DefaultSchemaLoader,
436436
DynamicSchemaLoader,
437437
InlineSchemaLoader,
438-
ItemsTypeMap,
438+
ComplexFieldType,
439439
JsonFileSchemaLoader,
440440
SchemaTypeIdentifier,
441441
TypesMap,
@@ -576,7 +576,7 @@ def _init_mappings(self) -> None:
576576
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
577577
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
578578
TypesMapModel: self.create_types_map,
579-
ItemsTypeMapModel: self.create_items_type_map,
579+
ComplexFieldTypeModel: self.create_complex_field_type,
580580
JwtAuthenticatorModel: self.create_jwt_authenticator,
581581
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
582582
ListPartitionRouterModel: self.create_list_partition_router,
@@ -1899,27 +1899,28 @@ def create_inline_schema_loader(
18991899
) -> InlineSchemaLoader:
19001900
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})
19011901

1902-
def create_items_type_map(
1903-
self, model: ItemsTypeMapModel, config: Config, **kwargs: Any
1904-
) -> ItemsTypeMap:
1905-
type_mapping = self._create_component_from_model(model=model.type_mapping, config=config)
1906-
model_items_type_pointer: List[Union[InterpolatedString, str]] = (
1907-
[x for x in model.items_type_pointer] if model.items_type_pointer else []
1902+
def create_complex_field_type(
1903+
self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
1904+
) -> ComplexFieldType:
1905+
items = (
1906+
self._create_component_from_model(model=model.items, config=config)
1907+
if isinstance(model.items, ComplexFieldTypeModel)
1908+
else model.items
19081909
)
1909-
return ItemsTypeMap(items_type_pointer=model_items_type_pointer, type_mapping=type_mapping)
1910+
1911+
return ComplexFieldType(field_type=model.field_type, items=items)
19101912

19111913
def create_types_map(self, model: TypesMapModel, config: Config, **kwargs: Any) -> TypesMap:
1912-
items_type = (
1913-
self._create_component_from_model(model=model.items_type, config=config)
1914-
if isinstance(model.items_type, ItemsTypeMapModel)
1915-
else model.items_type
1914+
target_type = (
1915+
self._create_component_from_model(model=model.target_type, config=config)
1916+
if isinstance(model.target_type, ComplexFieldTypeModel)
1917+
else model.target_type
19161918
)
19171919

19181920
return TypesMap(
1919-
target_type=model.target_type,
1921+
target_type=target_type,
19201922
current_type=model.current_type,
19211923
condition=model.condition if model.condition is not None else "True",
1922-
items_type=items_type,
19231924
)
19241925

19251926
def create_schema_type_identifier(

airbyte_cdk/sources/declarative/schema/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
66
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import (
77
DynamicSchemaLoader,
8-
ItemsTypeMap,
98
SchemaTypeIdentifier,
9+
ComplexFieldType,
1010
TypesMap,
1111
)
1212
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
@@ -19,7 +19,7 @@
1919
"SchemaLoader",
2020
"InlineSchemaLoader",
2121
"DynamicSchemaLoader",
22+
"ComplexFieldType",
2223
"TypesMap",
23-
"ItemsTypeMap",
2424
"SchemaTypeIdentifier",
2525
]

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from airbyte_cdk.sources.source import ExperimentalClassWarning
1919
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
2020

21-
AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
21+
AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
2222
"string": {"type": ["null", "string"]},
2323
"boolean": {"type": ["null", "boolean"]},
2424
"date": {"type": ["null", "string"], "format": "date"},
@@ -45,14 +45,33 @@
4545
}
4646

4747

48+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
49+
@dataclass(frozen=True)
50+
class ComplexFieldType:
51+
"""
52+
Identifies complex field type
53+
"""
54+
55+
field_type: str
56+
items: Optional[Union[str, "ComplexFieldType"]] = None
57+
58+
def __post_init__(self) -> None:
59+
"""
60+
Enforces that `items` is only used when `field_type` is an array
61+
"""
62+
# `items` is valid only for array field types
63+
if self.items and self.field_type != "array":
64+
raise ValueError("'items' can only be used when 'field_type' is an array.")
65+
66+
4867
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
4968
@dataclass(frozen=True)
5069
class TypesMap:
5170
"""
5271
Represents a mapping between a current type and its corresponding target type.
5372
"""
5473

55-
target_type: Union[List[str], str]
74+
target_type: Union[List[str], str, ComplexFieldType]
5675
current_type: Union[List[str], str]
5776
condition: Optional[str]
5877

@@ -188,13 +207,39 @@ def _get_type(
188207
first_type = self._get_airbyte_type(mapped_field_type[0])
189208
second_type = self._get_airbyte_type(mapped_field_type[1])
190209
return {"oneOf": [first_type, second_type]}
210+
191211
elif isinstance(mapped_field_type, str):
192212
return self._get_airbyte_type(mapped_field_type)
213+
214+
elif isinstance(mapped_field_type, ComplexFieldType):
215+
return self._resolve_complex_type(mapped_field_type)
216+
193217
else:
194218
raise ValueError(
195219
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
196220
)
197221

222+
def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
    """
    Resolves a (possibly nested) complex field type into an Airbyte type definition.

    The definition for `complex_type.field_type` is looked up through
    `_get_airbyte_type`. When `items` is set, the item definition is attached
    under the "items" key: a plain string is looked up directly, while a nested
    `ComplexFieldType` is resolved recursively so that every nesting level is
    kept (e.g. array -> array -> string).

    The previous stack-based loop dropped the outer definition whenever `items`
    was itself a `ComplexFieldType`, returning only the innermost level.

    Args:
        complex_type: The complex type to resolve.

    Returns:
        The type definition for `field_type`, with an "items" entry when
        `complex_type.items` is set.
    """
    # Shallow copy before adding "items": the body of `_get_airbyte_type` is not
    # visible here, so guard against it returning a definition shared with other fields.
    resolved_type = dict(self._get_airbyte_type(complex_type.field_type))

    items = complex_type.items
    if not items:
        return resolved_type

    if isinstance(items, str):
        resolved_type["items"] = self._get_airbyte_type(items)
    else:
        # Nested complex type: recurse so the outer wrapper is preserved.
        resolved_type["items"] = self._resolve_complex_type(items)

    return resolved_type
242+
198243
def _replace_type_if_not_valid(
199244
self,
200245
field_type: Union[List[str], str],
@@ -216,7 +261,7 @@ def _replace_type_if_not_valid(
216261
return field_type
217262

218263
@staticmethod
219-
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
264+
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
220265
"""
221266
Maps a field type to its corresponding Airbyte type definition.
222267
"""

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,16 @@
8585
"key_pointer": ["name"],
8686
"type_pointer": ["type"],
8787
"types_mapping": [
88-
{"target_type": "string", "current_type": "singleLineText"},
8988
{
90-
"target_type": "array",
91-
"current_type": "formula",
92-
"items_type": {
93-
"items_type_pointer": ["result", "type"],
94-
"type_mapping": {
95-
"target_type": "integer",
96-
"current_type": "customInteger",
97-
},
89+
"target_type": "string",
90+
"current_type": "singleLineText"
91+
},
92+
{
93+
"target_type": {
94+
"field_type": "array",
95+
"items": "integer"
9896
},
97+
"current_type": "formula",
9998
"condition": "{{ raw_schema['result']['type'] == 'customInteger' }}",
10099
},
101100
],

0 commit comments

Comments
 (0)