Skip to content

Commit 0ac0d5c

Browse files
committed
merge from main
2 parents 4658790 + bf998bd commit 0ac0d5c

File tree

19 files changed

+392
-44
lines changed

19 files changed

+392
-44
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
from dataclasses import asdict, dataclass, field
7-
from typing import Any, Dict, List, Mapping
7+
from typing import Any, ClassVar, Dict, List, Mapping
88

99
from airbyte_cdk.connector_builder.test_reader import TestReader
1010
from airbyte_cdk.models import (
@@ -37,6 +37,8 @@
3737

3838
@dataclass
3939
class TestLimits:
40+
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
41+
4042
max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
4143
max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
4244
max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
import logging
7-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Union
7+
from typing import Any, ClassVar, Dict, Iterator, List, Mapping, Optional, Union
88

99
from airbyte_cdk.connector_builder.models import (
1010
AuxiliaryRequest,
@@ -66,6 +66,8 @@ class TestReader:
6666
6767
"""
6868

69+
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
70+
6971
logger = logging.getLogger("airbyte.connector-builder")
7072

7173
def __init__(

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,6 +2343,27 @@ definitions:
23432343
$parameters:
23442344
type: object
23452345
additionalProperties: true
2346+
KeyTransformation:
2347+
title: Transformation to apply to object keys extracted by Dpath Flatten Fields
2348+
type: object
2349+
required:
2350+
- type
2351+
properties:
2352+
type:
2353+
type: string
2354+
enum: [ KeyTransformation ]
2355+
prefix:
2356+
title: Key Prefix
2357+
description: Prefix to add to object keys. If not provided, original keys remain unchanged.
2358+
type: string
2359+
examples:
2360+
- flattened_
2361+
suffix:
2362+
title: Key Suffix
2363+
description: Suffix to add to object keys. If not provided, original keys remain unchanged.
2364+
type: string
2365+
examples:
2366+
- _flattened
23462367
DpathFlattenFields:
23472368
title: Dpath Flatten Fields
23482369
description: A transformation that flattens field values to the top of the record.
@@ -2371,6 +2392,11 @@ definitions:
23712392
title: Replace Origin Record
23722393
description: Whether to replace the origin record or not. Default is False.
23732394
type: boolean
2395+
key_transformation:
2396+
title: Key transformation
2397+
description: Transformation for object keys. If not provided, original key will be used.
2398+
type: object
2399+
"$ref": "#/definitions/KeyTransformation"
23742400
$parameters:
23752401
type: object
23762402
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,25 @@ class FlattenFields(BaseModel):
879879
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
880880

881881

882+
class KeyTransformation(BaseModel):
883+
prefix: Optional[Union[str, None]] = Field(
884+
None,
885+
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
886+
examples=[
887+
"flattened_",
888+
],
889+
title="Key Prefix",
890+
)
891+
suffix: Optional[Union[str, None]] = Field(
892+
None,
893+
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
894+
examples=[
895+
"_flattened",
896+
],
897+
title="Key Suffix",
898+
)
899+
900+
882901
class DpathFlattenFields(BaseModel):
883902
type: Literal["DpathFlattenFields"]
884903
field_path: List[str] = Field(
@@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel):
897916
description="Whether to replace the origin record or not. Default is False.",
898917
title="Replace Origin Record",
899918
)
919+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
920+
None,
921+
description="Transformation for object keys. If not provided, original key will be used.",
922+
title="Key transformation",
923+
)
900924
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
901925

902926

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,7 @@
501501
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
502502
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
503503
DpathFlattenFields,
504+
KeyTransformation,
504505
)
505506
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
506507
FlattenFields,
@@ -794,13 +795,24 @@ def create_dpath_flatten_fields(
794795
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
795796
) -> DpathFlattenFields:
796797
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
798+
key_transformation = (
799+
KeyTransformation(
800+
config=config,
801+
prefix=model.key_transformation.prefix,
802+
suffix=model.key_transformation.suffix,
803+
parameters=model.parameters or {},
804+
)
805+
if model.key_transformation is not None
806+
else None
807+
)
797808
return DpathFlattenFields(
798809
config=config,
799810
field_path=model_field_path,
800811
delete_origin_value=model.delete_origin_value
801812
if model.delete_origin_value is not None
802813
else False,
803814
replace_record=model.replace_record if model.replace_record is not None else False,
815+
key_transformation=key_transformation,
804816
parameters=model.parameters or {},
805817
)
806818

@@ -2064,6 +2076,7 @@ def create_default_paginator(
20642076
config: Config,
20652077
*,
20662078
url_base: str,
2079+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
20672080
decoder: Optional[Decoder] = None,
20682081
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
20692082
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
@@ -2085,7 +2098,10 @@ def create_default_paginator(
20852098
else None
20862099
)
20872100
pagination_strategy = self._create_component_from_model(
2088-
model=model.pagination_strategy, config=config, decoder=decoder_to_use
2101+
model=model.pagination_strategy,
2102+
config=config,
2103+
decoder=decoder_to_use,
2104+
extractor_model=extractor_model,
20892105
)
20902106
if cursor_used_for_stop_condition:
20912107
pagination_strategy = StopConditionPaginationStrategyDecorator(
@@ -2582,7 +2598,12 @@ def create_oauth_authenticator(
25822598
)
25832599

25842600
def create_offset_increment(
2585-
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
2601+
self,
2602+
model: OffsetIncrementModel,
2603+
config: Config,
2604+
decoder: Decoder,
2605+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
2606+
**kwargs: Any,
25862607
) -> OffsetIncrement:
25872608
if isinstance(decoder, PaginationDecoderDecorator):
25882609
inner_decoder = decoder.decoder
@@ -2597,10 +2618,24 @@ def create_offset_increment(
25972618
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
25982619
)
25992620

2621+
# Ideally we would instantiate the runtime extractor from highest most level (in this case the SimpleRetriever)
2622+
# so that it can be shared by OffSetIncrement and RecordSelector. However, due to how we instantiate the
2623+
# decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing
2624+
# behavior by having two separate extractors with identical behavior since they use the same extractor model.
2625+
# When we have more time to investigate we can look into reusing the same component.
2626+
extractor = (
2627+
self._create_component_from_model(
2628+
model=extractor_model, config=config, decoder=decoder_to_use
2629+
)
2630+
if extractor_model
2631+
else None
2632+
)
2633+
26002634
return OffsetIncrement(
26012635
page_size=model.page_size,
26022636
config=config,
26032637
decoder=decoder_to_use,
2638+
extractor=extractor,
26042639
inject_on_first_request=model.inject_on_first_request or False,
26052640
parameters=model.parameters or {},
26062641
)
@@ -2968,6 +3003,7 @@ def create_simple_retriever(
29683003
model=model.paginator,
29693004
config=config,
29703005
url_base=url_base,
3006+
extractor_model=model.record_selector.extractor,
29713007
decoder=decoder,
29723008
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
29733009
)

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JsonDecoder,
1313
PaginationDecoderDecorator,
1414
)
15+
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor
1516
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1617
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1718
PaginationStrategy,
@@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy):
4647
config: Config
4748
page_size: Optional[Union[str, int]]
4849
parameters: InitVar[Mapping[str, Any]]
50+
extractor: Optional[RecordExtractor]
4951
decoder: Decoder = field(
5052
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
5153
)
@@ -75,6 +77,14 @@ def next_page_token(
7577
) -> Optional[Any]:
7678
decoded_response = next(self.decoder.decode(response))
7779

80+
if self.extractor:
81+
page_size_from_response = len(list(self.extractor.extract_records(response=response)))
82+
# The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
83+
# cases as the best effort option is to use the incoming last_page_size
84+
last_page_size = (
85+
page_size_from_response if page_size_from_response is not None else last_page_size
86+
)
87+
7888
# Stop paginating when there are fewer records than the page size or the current page has no records
7989
if (
8090
self._page_size

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ def _read_pages(
364364
pagination_complete = False
365365
initial_token = self._paginator.get_initial_token()
366366
next_page_token: Optional[Mapping[str, Any]] = (
367-
{"next_page_token": initial_token} if initial_token else None
367+
{"next_page_token": initial_token} if initial_token is not None else None
368368
)
369369
while not pagination_complete:
370370
response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,24 @@
88
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
99

1010

11+
@dataclass
12+
class KeyTransformation:
13+
config: Config
14+
parameters: InitVar[Mapping[str, Any]]
15+
prefix: Optional[str] = None
16+
suffix: Optional[str] = None
17+
18+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
19+
if self.prefix is not None:
20+
self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval(
21+
self.config
22+
)
23+
if self.suffix is not None:
24+
self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval(
25+
self.config
26+
)
27+
28+
1129
@dataclass
1230
class DpathFlattenFields(RecordTransformation):
1331
"""
@@ -16,6 +34,7 @@ class DpathFlattenFields(RecordTransformation):
1634
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1735
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
1836
replace_record: bool = False whether to replace origin record or not. Default is False.
37+
key_transformation: KeyTransformation = None how to transform extracted object keys
1938
2039
"""
2140

@@ -24,17 +43,35 @@ class DpathFlattenFields(RecordTransformation):
2443
parameters: InitVar[Mapping[str, Any]]
2544
delete_origin_value: bool = False
2645
replace_record: bool = False
46+
key_transformation: Optional[KeyTransformation] = None
2747

2848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
49+
self._parameters = parameters
2950
self._field_path = [
30-
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
51+
InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
3152
]
3253
for path_index in range(len(self.field_path)):
3354
if isinstance(self.field_path[path_index], str):
3455
self._field_path[path_index] = InterpolatedString.create(
35-
self.field_path[path_index], parameters=parameters
56+
self.field_path[path_index], parameters=self._parameters
3657
)
3758

59+
def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
60+
if self.key_transformation:
61+
if self.key_transformation.prefix:
62+
extracted = {
63+
f"{self.key_transformation.prefix}{key}": value
64+
for key, value in extracted.items()
65+
}
66+
67+
if self.key_transformation.suffix:
68+
extracted = {
69+
f"{key}{self.key_transformation.suffix}": value
70+
for key, value in extracted.items()
71+
}
72+
73+
return extracted
74+
3875
def transform(
3976
self,
4077
record: Dict[str, Any],
@@ -50,6 +87,8 @@ def transform(
5087
extracted = dpath.get(record, path, default=[])
5188

5289
if isinstance(extracted, dict):
90+
extracted = self._apply_key_transformation(extracted)
91+
5392
if self.replace_record and extracted:
5493
dpath.delete(record, "**")
5594
record.update(extracted)

pytest.ini

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,7 @@ log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno
55
log_cli_date_format=%Y-%m-%d %H:%M:%S
66
filterwarnings =
77
ignore::airbyte_cdk.sources.source.ExperimentalClassWarning
8+
markers =
9+
slow: mark tests as slow
10+
asyncio: mark test as asyncio
11+
requires_creds: mark test as requiring credentials

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from http.server import BaseHTTPRequestHandler, HTTPServer
99
from io import BytesIO, StringIO
1010
from threading import Thread
11-
from typing import Iterable
11+
from typing import ClassVar, Iterable
1212
from unittest.mock import Mock, patch
1313

1414
import pytest
@@ -259,6 +259,8 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
259259

260260

261261
class TestServer(BaseHTTPRequestHandler):
262+
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
263+
262264
def do_GET(self) -> None:
263265
self.send_response(200)
264266
self.end_headers()

0 commit comments

Comments
 (0)