Skip to content

Commit c532a92

Browse files
authored
Merge branch 'main' into aj/feat/mini-cat-test-suites
2 parents da442a9 + bf998bd commit c532a92

File tree

9 files changed

+309
-22
lines changed

9 files changed

+309
-22
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,27 @@ definitions:
23072307
$parameters:
23082308
type: object
23092309
additionalProperties: true
2310+
KeyTransformation:
2311+
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
2312+
type: object
2313+
required:
2314+
- type
2315+
properties:
2316+
type:
2317+
type: string
2318+
enum: [ KeyTransformation ]
2319+
prefix:
2320+
title: Key Prefix
2321+
description: Prefix to add for object keys. If not provided original keys remain unchanged.
2322+
type: string
2323+
examples:
2324+
- flattened_
2325+
suffix:
2326+
title: Key Suffix
2327+
description: Suffix to add for object keys. If not provided original keys remain unchanged.
2328+
type: string
2329+
examples:
2330+
- _flattened
23102331
DpathFlattenFields:
23112332
title: Dpath Flatten Fields
23122333
description: A transformation that flattens field values to the top of the record.
@@ -2335,6 +2356,11 @@ definitions:
23352356
title: Replace Origin Record
23362357
description: Whether to replace the origin record or not. Default is False.
23372358
type: boolean
2359+
key_transformation:
2360+
title: Key transformation
2361+
description: Transformation for object keys. If not provided, original key will be used.
2362+
type: object
2363+
"$ref": "#/definitions/KeyTransformation"
23382364
$parameters:
23392365
type: object
23402366
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,25 @@ class FlattenFields(BaseModel):
879879
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
880880

881881

882+
class KeyTransformation(BaseModel):
    # Declarative model for the optional key prefix/suffix settings referenced by
    # DpathFlattenFields.key_transformation. Both fields default to None.
    # NOTE(review): the YAML definition of KeyTransformation lists `type` as a
    # required property, but no `type` field is declared here - confirm intended.
    prefix: Optional[str] = Field(
        default=None,
        title="Key Prefix",
        description="Prefix to add for object keys. If not provided original keys remain unchanged.",
        examples=["flattened_"],
    )
    suffix: Optional[str] = Field(
        default=None,
        title="Key Suffix",
        description="Suffix to add for object keys. If not provided original keys remain unchanged.",
        examples=["_flattened"],
    )
899+
900+
882901
class DpathFlattenFields(BaseModel):
883902
type: Literal["DpathFlattenFields"]
884903
field_path: List[str] = Field(
@@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel):
897916
description="Whether to replace the origin record or not. Default is False.",
898917
title="Replace Origin Record",
899918
)
919+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
920+
None,
921+
description="Transformation for object keys. If not provided, original key will be used.",
922+
title="Key transformation",
923+
)
900924
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
901925

902926

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@
498498
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
499499
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
500500
DpathFlattenFields,
501+
KeyTransformation,
501502
)
502503
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
503504
FlattenFields,
@@ -790,13 +791,24 @@ def create_dpath_flatten_fields(
790791
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
791792
) -> DpathFlattenFields:
792793
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
794+
key_transformation = (
795+
KeyTransformation(
796+
config=config,
797+
prefix=model.key_transformation.prefix,
798+
suffix=model.key_transformation.suffix,
799+
parameters=model.parameters or {},
800+
)
801+
if model.key_transformation is not None
802+
else None
803+
)
793804
return DpathFlattenFields(
794805
config=config,
795806
field_path=model_field_path,
796807
delete_origin_value=model.delete_origin_value
797808
if model.delete_origin_value is not None
798809
else False,
799810
replace_record=model.replace_record if model.replace_record is not None else False,
811+
key_transformation=key_transformation,
800812
parameters=model.parameters or {},
801813
)
802814

@@ -2054,6 +2066,7 @@ def create_default_paginator(
20542066
config: Config,
20552067
*,
20562068
url_base: str,
2069+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
20572070
decoder: Optional[Decoder] = None,
20582071
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
20592072
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
@@ -2075,7 +2088,10 @@ def create_default_paginator(
20752088
else None
20762089
)
20772090
pagination_strategy = self._create_component_from_model(
2078-
model=model.pagination_strategy, config=config, decoder=decoder_to_use
2091+
model=model.pagination_strategy,
2092+
config=config,
2093+
decoder=decoder_to_use,
2094+
extractor_model=extractor_model,
20792095
)
20802096
if cursor_used_for_stop_condition:
20812097
pagination_strategy = StopConditionPaginationStrategyDecorator(
@@ -2572,7 +2588,12 @@ def create_oauth_authenticator(
25722588
)
25732589

25742590
def create_offset_increment(
2575-
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
2591+
self,
2592+
model: OffsetIncrementModel,
2593+
config: Config,
2594+
decoder: Decoder,
2595+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
2596+
**kwargs: Any,
25762597
) -> OffsetIncrement:
25772598
if isinstance(decoder, PaginationDecoderDecorator):
25782599
inner_decoder = decoder.decoder
@@ -2587,10 +2608,24 @@ def create_offset_increment(
25872608
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
25882609
)
25892610

2611+
# Ideally we would instantiate the runtime extractor at the highest level (in this case the SimpleRetriever)
2612+
# so that it can be shared by OffsetIncrement and RecordSelector. However, due to how we instantiate the
2613+
# decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing
2614+
# behavior by having two separate extractors with identical behavior since they use the same extractor model.
2615+
# When we have more time to investigate we can look into reusing the same component.
2616+
extractor = (
2617+
self._create_component_from_model(
2618+
model=extractor_model, config=config, decoder=decoder_to_use
2619+
)
2620+
if extractor_model
2621+
else None
2622+
)
2623+
25902624
return OffsetIncrement(
25912625
page_size=model.page_size,
25922626
config=config,
25932627
decoder=decoder_to_use,
2628+
extractor=extractor,
25942629
inject_on_first_request=model.inject_on_first_request or False,
25952630
parameters=model.parameters or {},
25962631
)
@@ -2954,6 +2989,7 @@ def create_simple_retriever(
29542989
model=model.paginator,
29552990
config=config,
29562991
url_base=url_base,
2992+
extractor_model=model.record_selector.extractor,
29572993
decoder=decoder,
29582994
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
29592995
)

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JsonDecoder,
1313
PaginationDecoderDecorator,
1414
)
15+
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor
1516
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1617
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1718
PaginationStrategy,
@@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy):
4647
config: Config
4748
page_size: Optional[Union[str, int]]
4849
parameters: InitVar[Mapping[str, Any]]
50+
extractor: Optional[RecordExtractor]
4951
decoder: Decoder = field(
5052
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
5153
)
@@ -75,6 +77,14 @@ def next_page_token(
7577
) -> Optional[Any]:
7678
decoded_response = next(self.decoder.decode(response))
7779

80+
if self.extractor:
81+
page_size_from_response = len(list(self.extractor.extract_records(response=response)))
82+
# The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
83+
# cases as the best effort option is to use the incoming last_page_size
84+
last_page_size = (
85+
page_size_from_response if page_size_from_response is not None else last_page_size
86+
)
87+
7888
# Stop paginating when there are fewer records than the page size or the current page has no records
7989
if (
8090
self._page_size

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,24 @@
88
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
99

1010

11+
@dataclass
class KeyTransformation:
    """
    Prefix/suffix settings used when renaming keys of a flattened object.

    config: Config the configuration the affixes are evaluated against.
    parameters: Mapping[str, Any] interpolation parameters (init-only, not stored).
    prefix: Optional[str] = None value prepended to keys; None means not configured.
    suffix: Optional[str] = None value appended to keys; None means not configured.

    Provided affixes are interpolated once, at construction time, and the
    attributes are overwritten with the evaluated result.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Resolve prefix first, then suffix; an affix left as None is skipped.
        for attribute in ("prefix", "suffix"):
            template = getattr(self, attribute)
            if template is None:
                continue
            interpolated = InterpolatedString.create(template, parameters=parameters)
            setattr(self, attribute, interpolated.eval(self.config))
27+
28+
1129
@dataclass
1230
class DpathFlattenFields(RecordTransformation):
1331
"""
@@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation):
1634
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1735
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1836
replace_record: bool = False whether to replace origin record or not. Default is False.
37+
key_transformation: KeyTransformation = None how to transform extracted object keys
1938
2039
"""
2140

@@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation):
2443
parameters: InitVar[Mapping[str, Any]]
2544
delete_origin_value: bool = False
2645
replace_record: bool = False
46+
key_transformation: Optional[KeyTransformation] = None
2747

2848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49+
self._parameters = parameters
2950
self._field_path = [
30-
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
51+
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
3152
]
3253
for path_index in range(len(self.field_path)):
3354
if isinstance(self.field_path[path_index], str):
3455
self._field_path[path_index] = InterpolatedString.create(
35-
self.field_path[path_index], parameters=parameters
56+
self.field_path[path_index], parameters=self._parameters
3657
)
3758

59+
def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Return `extracted` with the configured prefix and/or suffix added to every key.

    extracted: Mapping[str, Any] the object pulled out of the record.

    When no key transformation is configured, or both its prefix and suffix are
    empty/None, the input mapping is returned as-is (same object, keys untouched).
    Otherwise a new dict is built in a single pass; values are carried over unchanged.
    """
    transformation = self.key_transformation
    if not transformation:
        return extracted

    # Treat a missing affix as the empty string so one comprehension covers
    # prefix-only, suffix-only and prefix+suffix configurations.
    prefix = transformation.prefix or ""
    suffix = transformation.suffix or ""
    if not prefix and not suffix:
        # Nothing to add: skip the rebuild so keys are not re-formatted.
        return extracted

    return {f"{prefix}{key}{suffix}": value for key, value in extracted.items()}
74+
3875
def transform(
3976
self,
4077
record: Dict[str, Any],
@@ -50,6 +87,8 @@ def transform(
5087
extracted = dpath.get(record, path, default=[])
5188

5289
if isinstance(extracted, dict):
90+
extracted = self._apply_key_transformation(extracted)
91+
5392
if self.replace_record and extracted:
5493
dpath.delete(record, "**")
5594
record.update(extracted)

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
6767
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
6868
from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel
69+
from airbyte_cdk.sources.declarative.models import DpathExtractor as DpathExtractorModel
6970
from airbyte_cdk.sources.declarative.models import (
7071
GroupingPartitionRouter as GroupingPartitionRouterModel,
7172
)
@@ -1831,6 +1832,7 @@ def test_create_default_paginator():
18311832
component_definition=paginator_manifest,
18321833
config=input_config,
18331834
url_base="https://airbyte.io",
1835+
extractor_model=DpathExtractor(field_path=["results"], config=input_config, parameters={}),
18341836
decoder=JsonDecoder(parameters={}),
18351837
)
18361838

@@ -1968,6 +1970,7 @@ def test_create_default_paginator():
19681970
DefaultPaginator(
19691971
pagination_strategy=OffsetIncrement(
19701972
page_size=10,
1973+
extractor=None,
19711974
config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]},
19721975
parameters={},
19731976
),
@@ -2641,18 +2644,31 @@ def test_create_offset_increment():
26412644
page_size=10,
26422645
inject_on_first_request=True,
26432646
)
2647+
2648+
expected_extractor = DpathExtractor(field_path=["results"], config=input_config, parameters={})
2649+
extractor_model = DpathExtractorModel(
2650+
type="DpathExtractor", field_path=expected_extractor.field_path
2651+
)
2652+
26442653
expected_strategy = OffsetIncrement(
2645-
page_size=10, inject_on_first_request=True, parameters={}, config=input_config
2654+
page_size=10,
2655+
inject_on_first_request=True,
2656+
extractor=expected_extractor,
2657+
parameters={},
2658+
config=input_config,
26462659
)
26472660

26482661
strategy = factory.create_offset_increment(
2649-
model, input_config, decoder=JsonDecoder(parameters={})
2662+
model, input_config, extractor_model=extractor_model, decoder=JsonDecoder(parameters={})
26502663
)
26512664

26522665
assert strategy.page_size == expected_strategy.page_size
26532666
assert strategy.inject_on_first_request == expected_strategy.inject_on_first_request
26542667
assert strategy.config == input_config
26552668

2669+
assert isinstance(strategy.extractor, DpathExtractor)
2670+
assert strategy.extractor.field_path == expected_extractor.field_path
2671+
26562672

26572673
class MyCustomSchemaLoader(SchemaLoader):
26582674
def get_json_schema(self) -> Mapping[str, Any]:

0 commit comments

Comments
 (0)