Skip to content

Commit 67e4aca

Browse files
committed
Merge remote-tracking branch 'origin/main' into aj/feat/add-standard-tests-cli
2 parents 1b45851 + 58a89d6 commit 67e4aca

File tree

25 files changed

+970
-218
lines changed

25 files changed

+970
-218
lines changed

airbyte_cdk/sources/declarative/auth/oauth.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ def get_token_expiry_date(self) -> AirbyteDateTime:
239239
def _has_access_token_been_initialized(self) -> bool:
240240
return self._access_token is not None
241241

242-
def set_token_expiry_date(self, value: Union[str, int]) -> None:
243-
self._token_expiry_date = self._parse_token_expiration_date(value)
242+
def set_token_expiry_date(self, value: AirbyteDateTime) -> None:
243+
self._token_expiry_date = value
244244

245245
def get_assertion_name(self) -> str:
246246
return self.assertion_name

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from airbyte_cdk.sources.declarative.extractors.record_filter import (
2020
ClientSideIncrementalRecordFilterDecorator,
2121
)
22-
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
22+
from airbyte_cdk.sources.declarative.incremental import (
23+
ConcurrentPerPartitionCursor,
24+
GlobalSubstreamCursor,
25+
)
2326
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
2427
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
2528
PerPartitionWithGlobalCursor,
@@ -361,7 +364,8 @@ def _group_streams(
361364
== DatetimeBasedCursorModel.__name__
362365
and hasattr(declarative_stream.retriever, "stream_slicer")
363366
and isinstance(
364-
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
367+
declarative_stream.retriever.stream_slicer,
368+
(GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
365369
)
366370
):
367371
stream_state = self._connector_state_manager.get_stream_state(

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,27 @@ definitions:
23072307
$parameters:
23082308
type: object
23092309
additionalProperties: true
2310+
KeyTransformation:
2311+
title: Transformation to apply for extracted object keys by Dpath Flatten Fields
2312+
type: object
2313+
required:
2314+
- type
2315+
properties:
2316+
type:
2317+
type: string
2318+
enum: [ KeyTransformation ]
2319+
prefix:
2320+
title: Key Prefix
2321+
description: Prefix to add for object keys. If not provided original keys remain unchanged.
2322+
type: string
2323+
examples:
2324+
- flattened_
2325+
suffix:
2326+
title: Key Suffix
2327+
description: Suffix to add for object keys. If not provided original keys remain unchanged.
2328+
type: string
2329+
examples:
2330+
- _flattened
23102331
DpathFlattenFields:
23112332
title: Dpath Flatten Fields
23122333
description: A transformation that flatten field values to the to top of the record.
@@ -2335,6 +2356,11 @@ definitions:
23352356
title: Replace Origin Record
23362357
description: Whether to replace the origin record or not. Default is False.
23372358
type: boolean
2359+
key_transformation:
2360+
title: Key transformation
2361+
description: Transformation for object keys. If not provided, original key will be used.
2362+
type: object
2363+
"$ref": "#/definitions/KeyTransformation"
23382364
$parameters:
23392365
type: object
23402366
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -879,6 +879,25 @@ class FlattenFields(BaseModel):
879879
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
880880

881881

882+
class KeyTransformation(BaseModel):
883+
prefix: Optional[Union[str, None]] = Field(
884+
None,
885+
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
886+
examples=[
887+
"flattened_",
888+
],
889+
title="Key Prefix",
890+
)
891+
suffix: Optional[Union[str, None]] = Field(
892+
None,
893+
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
894+
examples=[
895+
"_flattened",
896+
],
897+
title="Key Suffix",
898+
)
899+
900+
882901
class DpathFlattenFields(BaseModel):
883902
type: Literal["DpathFlattenFields"]
884903
field_path: List[str] = Field(
@@ -897,6 +916,11 @@ class DpathFlattenFields(BaseModel):
897916
description="Whether to replace the origin record or not. Default is False.",
898917
title="Replace Origin Record",
899918
)
919+
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
920+
None,
921+
description="Transformation for object keys. If not provided, original key will be used.",
922+
title="Key transformation",
923+
)
900924
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
901925

902926

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@
498498
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
499499
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
500500
DpathFlattenFields,
501+
KeyTransformation,
501502
)
502503
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
503504
FlattenFields,
@@ -790,13 +791,24 @@ def create_dpath_flatten_fields(
790791
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
791792
) -> DpathFlattenFields:
792793
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
794+
key_transformation = (
795+
KeyTransformation(
796+
config=config,
797+
prefix=model.key_transformation.prefix,
798+
suffix=model.key_transformation.suffix,
799+
parameters=model.parameters or {},
800+
)
801+
if model.key_transformation is not None
802+
else None
803+
)
793804
return DpathFlattenFields(
794805
config=config,
795806
field_path=model_field_path,
796807
delete_origin_value=model.delete_origin_value
797808
if model.delete_origin_value is not None
798809
else False,
799810
replace_record=model.replace_record if model.replace_record is not None else False,
811+
key_transformation=key_transformation,
800812
parameters=model.parameters or {},
801813
)
802814

@@ -1427,7 +1439,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
14271439
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
14281440

14291441
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
1430-
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
1442+
use_global_cursor = isinstance(
1443+
partition_router, GroupingPartitionRouter
1444+
) or component_definition.get("global_substream_cursor", False)
14311445

14321446
# Return the concurrent cursor and state converter
14331447
return ConcurrentPerPartitionCursor(
@@ -2054,6 +2068,7 @@ def create_default_paginator(
20542068
config: Config,
20552069
*,
20562070
url_base: str,
2071+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
20572072
decoder: Optional[Decoder] = None,
20582073
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
20592074
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
@@ -2075,7 +2090,10 @@ def create_default_paginator(
20752090
else None
20762091
)
20772092
pagination_strategy = self._create_component_from_model(
2078-
model=model.pagination_strategy, config=config, decoder=decoder_to_use
2093+
model=model.pagination_strategy,
2094+
config=config,
2095+
decoder=decoder_to_use,
2096+
extractor_model=extractor_model,
20792097
)
20802098
if cursor_used_for_stop_condition:
20812099
pagination_strategy = StopConditionPaginationStrategyDecorator(
@@ -2572,7 +2590,12 @@ def create_oauth_authenticator(
25722590
)
25732591

25742592
def create_offset_increment(
2575-
self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any
2593+
self,
2594+
model: OffsetIncrementModel,
2595+
config: Config,
2596+
decoder: Decoder,
2597+
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
2598+
**kwargs: Any,
25762599
) -> OffsetIncrement:
25772600
if isinstance(decoder, PaginationDecoderDecorator):
25782601
inner_decoder = decoder.decoder
@@ -2587,10 +2610,24 @@ def create_offset_increment(
25872610
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
25882611
)
25892612

2613+
# Ideally we would instantiate the runtime extractor from highest most level (in this case the SimpleRetriever)
2614+
# so that it can be shared by OffSetIncrement and RecordSelector. However, due to how we instantiate the
2615+
# decoder with various decorators here, but not in create_record_selector, it is simpler to retain existing
2616+
# behavior by having two separate extractors with identical behavior since they use the same extractor model.
2617+
# When we have more time to investigate we can look into reusing the same component.
2618+
extractor = (
2619+
self._create_component_from_model(
2620+
model=extractor_model, config=config, decoder=decoder_to_use
2621+
)
2622+
if extractor_model
2623+
else None
2624+
)
2625+
25902626
return OffsetIncrement(
25912627
page_size=model.page_size,
25922628
config=config,
25932629
decoder=decoder_to_use,
2630+
extractor=extractor,
25942631
inject_on_first_request=model.inject_on_first_request or False,
25952632
parameters=model.parameters or {},
25962633
)
@@ -2954,6 +2991,7 @@ def create_simple_retriever(
29542991
model=model.paginator,
29552992
config=config,
29562993
url_base=url_base,
2994+
extractor_model=model.record_selector.extractor,
29572995
decoder=decoder,
29582996
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
29592997
)

airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
JsonDecoder,
1313
PaginationDecoderDecorator,
1414
)
15+
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import RecordExtractor
1516
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1617
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
1718
PaginationStrategy,
@@ -46,6 +47,7 @@ class OffsetIncrement(PaginationStrategy):
4647
config: Config
4748
page_size: Optional[Union[str, int]]
4849
parameters: InitVar[Mapping[str, Any]]
50+
extractor: Optional[RecordExtractor]
4951
decoder: Decoder = field(
5052
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
5153
)
@@ -75,6 +77,14 @@ def next_page_token(
7577
) -> Optional[Any]:
7678
decoded_response = next(self.decoder.decode(response))
7779

80+
if self.extractor:
81+
page_size_from_response = len(list(self.extractor.extract_records(response=response)))
82+
# The extractor could return 0 records which is valid, but evaluates to False. Our fallback in other
83+
# cases as the best effort option is to use the incoming last_page_size
84+
last_page_size = (
85+
page_size_from_response if page_size_from_response is not None else last_page_size
86+
)
87+
7888
# Stop paginating when there are fewer records than the page size or the current page has no records
7989
if (
8090
self._page_size

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ def get_request_property_chunks(
5252
chunk_size = 0
5353
for property_field in property_fields:
5454
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
55+
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
5556
property_field_size = (
5657
len(property_field)
58+
+ 1 # The +1 represents the extra character for the delimiter in between properties
5759
if self.property_limit_type == PropertyLimitType.characters
5860
else 1
5961
)

0 commit comments

Comments
 (0)