Skip to content

Commit 9c41d67

Browse files
committed
Add RecordMergeStrategy interface and apply it to existing types
1 parent cb99b1e commit 9c41d67

File tree

14 files changed

+91
-16
lines changed

14 files changed

+91
-16
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,9 @@ definitions:
10321032
type:
10331033
type: string
10341034
enum: [EmitPartialRecordMergeStrategy]
1035+
$parameters:
1036+
type: object
1037+
additionalProperties: true
10351038
JwtAuthenticator:
10361039
title: JWT Authenticator
10371040
description: Authenticator for requests using JWT authentication flow.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ class Clamping(BaseModel):
347347

348348
class EmitPartialRecordMergeStrategy(BaseModel):
349349
type: Literal["EmitPartialRecordMergeStrategy"]
350+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
350351

351352

352353
class Algorithm(Enum):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,10 @@
218218
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
219219
DynamicSchemaLoader as DynamicSchemaLoaderModel,
220220
)
221+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
222+
EmitPartialRecordMergeStrategy,
223+
ValueType,
224+
)
221225
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
222226
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
223227
)
@@ -384,7 +388,6 @@
384388
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
385389
UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel,
386390
)
387-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
388391
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
389392
WaitTimeFromHeader as WaitTimeFromHeaderModel,
390393
)
@@ -441,14 +444,17 @@
441444
StopConditionPaginationStrategyDecorator,
442445
)
443446
from airbyte_cdk.sources.declarative.requesters.query_properties import (
444-
GroupByKey,
445447
PropertiesFromEndpoint,
446448
PropertyChunking,
447449
QueryProperties,
448450
)
449451
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
450452
PropertyLimitType,
451453
)
454+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
455+
EmitPartialRecord,
456+
GroupByKey,
457+
)
452458
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
453459
from airbyte_cdk.sources.declarative.requesters.request_options import (
454460
DatetimeBasedRequestOptionsProvider,
@@ -609,6 +615,7 @@ def _init_mappings(self) -> None:
609615
DefaultErrorHandlerModel: self.create_default_error_handler,
610616
DefaultPaginatorModel: self.create_default_paginator,
611617
DpathExtractorModel: self.create_dpath_extractor,
618+
EmitPartialRecordMergeStrategy: self.create_emit_partial_record,
612619
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
613620
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
614621
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
@@ -790,6 +797,12 @@ def create_dpath_flatten_fields(
790797
parameters=model.parameters or {},
791798
)
792799

800+
@staticmethod
801+
def create_emit_partial_record(
802+
model: EmitPartialRecordMergeStrategy, config: Config, **kwargs: Any
803+
) -> EmitPartialRecord:
804+
return EmitPartialRecord(config=config, parameters=model.parameters or {})
805+
793806
@staticmethod
794807
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
795808
if not value_type:

airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

3-
from airbyte_cdk.sources.declarative.requesters.query_properties.group_by_key import GroupByKey
43
from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import (
54
PropertiesFromEndpoint,
65
)
@@ -11,4 +10,4 @@
1110
QueryProperties,
1211
)
1312

14-
__all__ = ["GroupByKey", "PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]
13+
__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
from enum import Enum
55
from typing import Any, Iterable, List, Mapping, Optional
66

7-
from airbyte_cdk.sources.declarative.requesters.query_properties import GroupByKey
7+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
8+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
9+
RecordMergeStrategy,
10+
)
811
from airbyte_cdk.sources.types import Config, Record
912

1013

@@ -26,9 +29,7 @@ class PropertyChunking:
2629

2730
property_limit_type: PropertyLimitType
2831
property_limit: Optional[int]
29-
record_merge_strategy: Optional[
30-
GroupByKey
31-
] # This should eventually be some sort of interface or type
32+
record_merge_strategy: Optional[RecordMergeStrategy]
3233
parameters: InitVar[Mapping[str, Any]]
3334
config: Config
3435

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.emit_partial_record import (
4+
EmitPartialRecord,
5+
)
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import (
7+
GroupByKey,
8+
)
9+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
10+
RecordMergeStrategy,
11+
)
12+
13+
__all__ = ["EmitPartialRecord", "GroupByKey", "RecordMergeStrategy"]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import InitVar, dataclass
4+
from typing import Any, Mapping, Optional
5+
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
7+
RecordMergeStrategy,
8+
)
9+
from airbyte_cdk.sources.types import Config, Record
10+
11+
12+
@dataclass
13+
class EmitPartialRecord(RecordMergeStrategy):
14+
"""
15+
Record merge strategy that emits partial records as they are, without merging them together. This is
16+
typically used when there is no suitable primary key to merge on.
17+
"""
18+
19+
parameters: InitVar[Mapping[str, Any]]
20+
config: Config
21+
22+
def get_group_key(self, record: Record) -> Optional[str]:
23+
return None

airbyte_cdk/sources/declarative/requesters/query_properties/group_by_key.py renamed to airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
from dataclasses import InitVar, dataclass
44
from typing import Any, List, Mapping, Optional, Union
55

6+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
7+
RecordMergeStrategy,
8+
)
69
from airbyte_cdk.sources.types import Config, Record
710

811

912
@dataclass
10-
class GroupByKey:
13+
class GroupByKey(RecordMergeStrategy):
1114
"""
1215
Record merge strategy that combines records together according to values on the record for one or many keys.
1316
"""
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from dataclasses import dataclass
5+
from typing import Optional
6+
7+
from airbyte_cdk.sources.types import Record
8+
9+
10+
@dataclass
11+
class RecordMergeStrategy(ABC):
12+
"""
13+
Describes the interface for how records that required multiple requests to get the complete set of fields
14+
should be merged back into a single record.
15+
"""
16+
17+
@abstractmethod
18+
def get_group_key(self, record: Record) -> Optional[str]:
19+
pass

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import freezegun
1010
import pytest
1111
import requests
12-
from pydantic.v1 import ValidationError
1312

1413
from airbyte_cdk import AirbyteTracedException
1514
from airbyte_cdk.models import (
@@ -127,14 +126,14 @@
127126
StopConditionPaginationStrategyDecorator,
128127
)
129128
from airbyte_cdk.sources.declarative.requesters.query_properties import (
130-
GroupByKey,
131129
PropertiesFromEndpoint,
132130
PropertyChunking,
133131
QueryProperties,
134132
)
135133
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
136134
PropertyLimitType,
137135
)
136+
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
138137
from airbyte_cdk.sources.declarative.requesters.request_option import (
139138
RequestOption,
140139
RequestOptionType,
@@ -4376,7 +4375,7 @@ def test_create_property_chunking_invalid_property_limit_type():
43764375

43774376
connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True)
43784377

4379-
with pytest.raises(ValidationError):
4378+
with pytest.raises:
43804379
connector_builder_factory.create_component(
43814380
model_type=PropertyChunkingModel,
43824381
component_definition=property_chunking_model,

0 commit comments

Comments
 (0)