Skip to content

Commit d42b930

Browse files
committed
proof of concept for property chunking
1 parent b9f84d5 commit d42b930

File tree

10 files changed

+708
-40
lines changed

10 files changed

+708
-40
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1023,6 +1023,15 @@ definitions:
10231023
$parameters:
10241024
type: object
10251025
additionalProperties: true
1026+
EmitPartialRecordMergeStrategy:
1027+
title: Emit Partial Record
1028+
description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key.
1029+
required:
1030+
- type
1031+
properties:
1032+
type:
1033+
type: string
1034+
enum: [EmitPartialRecordMergeStrategy]
10261035
JwtAuthenticator:
10271036
title: JWT Authenticator
10281037
description: Authenticator for requests using JWT authentication flow.
@@ -1731,6 +1740,30 @@ definitions:
17311740
$parameters:
17321741
type: object
17331742
additionalProperties: true
1743+
GroupByKeyMergeStrategy:
1744+
title: Group by Key
1745+
description: Record merge strategy that combines records according to fields on the record.
1746+
required:
1747+
- type
1748+
- key
1749+
properties:
1750+
type:
1751+
type: string
1752+
enum: [GroupByKeyMergeStrategy]
1753+
key:
1754+
title: Key
1755+
description: The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.
1756+
anyOf:
1757+
- type: string
1758+
- type: array
1759+
items:
1760+
type: string
1761+
examples:
1762+
- "id"
1763+
- ["parent_id", "end_date"]
1764+
$parameters:
1765+
type: object
1766+
additionalProperties: true
17341767
SessionTokenAuthenticator:
17351768
type: object
17361769
required:
@@ -1950,7 +1983,9 @@ definitions:
19501983
- type: string
19511984
- type: object
19521985
additionalProperties:
1953-
type: string
1986+
anyOf:
1987+
- type: string
1988+
- $ref": "#/definitions/QueryProperties"
19541989
interpolation_context:
19551990
- next_page_token
19561991
- stream_interval
@@ -2968,6 +3003,98 @@ definitions:
29683003
examples:
29693004
- id
29703005
- ["code", "type"]
3006+
PropertiesFromEndpoint:
3007+
title: Properties from Endpoint
3008+
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
3009+
type: object
3010+
required:
3011+
- type
3012+
- property_field_path
3013+
- retriever
3014+
properties:
3015+
type:
3016+
type: string
3017+
enum: [PropertiesFromEndpoint]
3018+
property_field_path:
3019+
description: Describes the path to the field that should be extracted
3020+
type: array
3021+
items:
3022+
type: string
3023+
examples:
3024+
- ["name"]
3025+
interpolation_context:
3026+
- config
3027+
- parameters
3028+
retriever:
3029+
description: Requester component that describes how to fetch the properties to query from a remote API endpoint.
3030+
anyOf:
3031+
- "$ref": "#/definitions/CustomRetriever"
3032+
- "$ref": "#/definitions/SimpleRetriever"
3033+
$parameters:
3034+
type: object
3035+
additionalProperties: true
3036+
PropertyChunking:
3037+
title: Property Chunking
3038+
description: For APIs with restrictions on the amount of properties that can be requester per request, property chunking can be applied to make multiple requests with a subset of the properties.
3039+
type: object
3040+
required:
3041+
- type
3042+
- property_limit_type
3043+
properties:
3044+
type:
3045+
type: string
3046+
enum: [PropertyChunking]
3047+
property_limit_type:
3048+
title: Property Limit Type
3049+
description: The type used to determine the maximum number of properties per chunk
3050+
enum:
3051+
- characters
3052+
- property_count
3053+
property_limit:
3054+
title: Property Limit
3055+
description: The maximum amount of properties that can be retrieved per request according to the limit type.
3056+
type: integer
3057+
record_merge_strategy:
3058+
title: Record Merge Strategy
3059+
description: Dictates how to records that require multiple requests to get all properties should be emitted to the destination
3060+
anyOf:
3061+
- "$ref": "#/definitions/EmitPartialRecordMergeStrategy"
3062+
- "$ref": "#/definitions/GroupByKeyMergeStrategy"
3063+
$parameters:
3064+
type: object
3065+
additionalProperties: true
3066+
QueryProperties:
3067+
title: Query Properties
3068+
description: For APIs that require explicit specification of the properties to query for, this component specifies which property fields and how they are supplied to outbound requests.
3069+
type: object
3070+
required:
3071+
- type
3072+
- property_list
3073+
properties:
3074+
type:
3075+
type: string
3076+
enum: [QueryProperties]
3077+
property_list:
3078+
title: Property List
3079+
description: The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint
3080+
anyOf:
3081+
- type: array
3082+
items:
3083+
type: string
3084+
- "$ref": "#/definitions/PropertiesFromEndpoint"
3085+
always_include_properties:
3086+
title: Always Include Properties
3087+
description: The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.
3088+
type: array
3089+
items:
3090+
type: string
3091+
property_chunking:
3092+
title: Property Chunking
3093+
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
3094+
"$ref": "#/definitions/PropertyChunking"
3095+
$parameters:
3096+
type: object
3097+
additionalProperties: true
29713098
RecordFilter:
29723099
title: Record Filter
29733100
description: Filter applied on a list of records.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -343,6 +345,10 @@ class Clamping(BaseModel):
343345
target_details: Optional[Dict[str, Any]] = None
344346

345347

348+
class EmitPartialRecordMergeStrategy(BaseModel):
349+
type: Literal["EmitPartialRecordMergeStrategy"]
350+
351+
346352
class Algorithm(Enum):
347353
HS256 = "HS256"
348354
HS384 = "HS384"
@@ -716,6 +722,17 @@ class ExponentialBackoffStrategy(BaseModel):
716722
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
717723

718724

725+
class GroupByKeyMergeStrategy(BaseModel):
726+
type: Literal["GroupByKeyMergeStrategy"]
727+
key: Union[str, List[str]] = Field(
728+
...,
729+
description="The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.",
730+
examples=["id", ["parent_id", "end_date"]],
731+
title="Key",
732+
)
733+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
734+
735+
719736
class SessionTokenRequestBearerAuthenticator(BaseModel):
720737
type: Literal["Bearer"]
721738

@@ -1187,6 +1204,33 @@ class PrimaryKey(BaseModel):
11871204
)
11881205

11891206

1207+
class PropertyLimitType(Enum):
1208+
characters = "characters"
1209+
property_count = "property_count"
1210+
1211+
1212+
class PropertyChunking(BaseModel):
1213+
type: Literal["PropertyChunking"]
1214+
property_limit_type: PropertyLimitType = Field(
1215+
...,
1216+
description="The type used to determine the maximum number of properties per chunk",
1217+
title="Property Limit Type",
1218+
)
1219+
property_limit: Optional[int] = Field(
1220+
None,
1221+
description="The maximum amount of properties that can be retrieved per request according to the limit type.",
1222+
title="Property Limit",
1223+
)
1224+
record_merge_strategy: Optional[
1225+
Union[EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy]
1226+
] = Field(
1227+
None,
1228+
description="Dictates how to records that require multiple requests to get all properties should be emitted to the destination",
1229+
title="Record Merge Strategy",
1230+
)
1231+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1232+
1233+
11901234
class RecordFilter(BaseModel):
11911235
type: Literal["RecordFilter"]
11921236
condition: Optional[str] = Field(
@@ -2174,7 +2218,7 @@ class HttpRequester(BaseModel):
21742218
examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}],
21752219
title="Request Headers",
21762220
)
2177-
request_parameters: Optional[Union[str, Dict[str, str]]] = Field(
2221+
request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field(
21782222
None,
21792223
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
21802224
examples=[
@@ -2264,6 +2308,40 @@ class ParentStreamConfig(BaseModel):
22642308
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22652309

22662310

2311+
class PropertiesFromEndpoint(BaseModel):
2312+
type: Literal["PropertiesFromEndpoint"]
2313+
property_field_path: List[str] = Field(
2314+
...,
2315+
description="Describes the path to the field that should be extracted",
2316+
examples=[["name"]],
2317+
)
2318+
retriever: Union[CustomRetriever, SimpleRetriever] = Field(
2319+
...,
2320+
description="Requester component that describes how to fetch the properties to query from a remote API endpoint.",
2321+
)
2322+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2323+
2324+
2325+
class QueryProperties(BaseModel):
2326+
type: Literal["QueryProperties"]
2327+
property_list: Union[List[str], PropertiesFromEndpoint] = Field(
2328+
...,
2329+
description="The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint",
2330+
title="Property List",
2331+
)
2332+
always_include_properties: Optional[List[str]] = Field(
2333+
None,
2334+
description="The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.",
2335+
title="Always Include Properties",
2336+
)
2337+
property_chunking: Optional[PropertyChunking] = Field(
2338+
None,
2339+
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
2340+
title="Property Chunking",
2341+
)
2342+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2343+
2344+
22672345
class StateDelegatingStream(BaseModel):
22682346
type: Literal["StateDelegatingStream"]
22692347
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
@@ -2512,5 +2590,6 @@ class DynamicDeclarativeStream(BaseModel):
25122590
SessionTokenAuthenticator.update_forward_refs()
25132591
DynamicSchemaLoader.update_forward_refs()
25142592
ParentStreamConfig.update_forward_refs()
2593+
PropertiesFromEndpoint.update_forward_refs()
25152594
SimpleRetriever.update_forward_refs()
25162595
AsyncRetriever.update_forward_refs()

0 commit comments

Comments
 (0)