Skip to content

Commit 96ee457

Browse files
committed
Add GroupingPartitionRouter
1 parent 978be1b commit 96ee457

File tree

5 files changed

+229
-3
lines changed

5 files changed

+229
-3
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3130,12 +3130,14 @@ definitions:
31303130
- "$ref": "#/definitions/CustomPartitionRouter"
31313131
- "$ref": "#/definitions/ListPartitionRouter"
31323132
- "$ref": "#/definitions/SubstreamPartitionRouter"
3133+
- "$ref": "#/definitions/GroupingPartitionRouter"
31333134
- type: array
31343135
items:
31353136
anyOf:
31363137
- "$ref": "#/definitions/CustomPartitionRouter"
31373138
- "$ref": "#/definitions/ListPartitionRouter"
31383139
- "$ref": "#/definitions/SubstreamPartitionRouter"
3140+
- "$ref": "#/definitions/GroupingPartitionRouter"
31393141
decoder:
31403142
title: Decoder
31413143
description: Component decoding the response so records can be extracted.
@@ -3290,12 +3292,14 @@ definitions:
32903292
- "$ref": "#/definitions/CustomPartitionRouter"
32913293
- "$ref": "#/definitions/ListPartitionRouter"
32923294
- "$ref": "#/definitions/SubstreamPartitionRouter"
3295+
- "$ref": "#/definitions/GroupingPartitionRouter"
32933296
- type: array
32943297
items:
32953298
anyOf:
32963299
- "$ref": "#/definitions/CustomPartitionRouter"
32973300
- "$ref": "#/definitions/ListPartitionRouter"
32983301
- "$ref": "#/definitions/SubstreamPartitionRouter"
3302+
- "$ref": "#/definitions/GroupingPartitionRouter"
32993303
decoder:
33003304
title: Decoder
33013305
description: Component decoding the response so records can be extracted.
@@ -3412,6 +3416,45 @@ definitions:
34123416
$parameters:
34133417
type: object
34143418
additionalProperties: true
3419+
GroupingPartitionRouter:
3420+
title: Grouping Partition Router
3421+
description: >
3422+
A decorator on top of a partition router that groups partitions into batches of a specified size.
3423+
This is useful for APIs that support filtering by multiple partition keys in a single request.
3424+
Note that per-partition incremental syncs may not work as expected because the grouping
3425+
of partitions might change between syncs, potentially leading to inconsistent state tracking.
3426+
type: object
3427+
required:
3428+
- type
3429+
- group_size
3430+
- underlying_partition_router
3431+
properties:
3432+
type:
3433+
type: string
3434+
enum: [GroupingPartitionRouter]
3435+
group_size:
3436+
title: Group Size
3437+
description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
3438+
type: integer
3439+
minimum: 1
3440+
examples:
3441+
- 10
3442+
- 50
3443+
partition_router:
3444+
title: Underlying Partition Router
3445+
description: The partition router whose output will be grouped. This can be any valid partition router component.
3446+
anyOf:
3447+
- "$ref": "#/definitions/CustomPartitionRouter"
3448+
- "$ref": "#/definitions/ListPartitionRouter"
3449+
- "$ref": "#/definitions/SubstreamPartitionRouter"
3450+
deduplicate:
3451+
title: Deduplicate Partitions
3452+
description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
3453+
type: boolean
3454+
default: true
3455+
$parameters:
3456+
type: object
3457+
additionalProperties: true
34153458
WaitUntilTimeFromHeader:
34163459
title: Wait Until Time Defined In Response Header
34173460
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from enum import Enum
77
from typing import Any, Dict, List, Literal, Optional, Union
88

9-
from pydantic.v1 import BaseModel, Extra, Field
9+
from pydantic.v1 import BaseModel, Extra, Field, conint
1010

1111

1212
class AuthFlowType(Enum):
@@ -2225,7 +2225,15 @@ class SimpleRetriever(BaseModel):
22252225
CustomPartitionRouter,
22262226
ListPartitionRouter,
22272227
SubstreamPartitionRouter,
2228-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2228+
GroupingPartitionRouter,
2229+
List[
2230+
Union[
2231+
CustomPartitionRouter,
2232+
ListPartitionRouter,
2233+
SubstreamPartitionRouter,
2234+
GroupingPartitionRouter,
2235+
]
2236+
],
22292237
]
22302238
] = Field(
22312239
[],
@@ -2303,7 +2311,15 @@ class AsyncRetriever(BaseModel):
23032311
CustomPartitionRouter,
23042312
ListPartitionRouter,
23052313
SubstreamPartitionRouter,
2306-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2314+
GroupingPartitionRouter,
2315+
List[
2316+
Union[
2317+
CustomPartitionRouter,
2318+
ListPartitionRouter,
2319+
SubstreamPartitionRouter,
2320+
GroupingPartitionRouter,
2321+
]
2322+
],
23072323
]
23082324
] = Field(
23092325
[],
@@ -2355,6 +2371,29 @@ class SubstreamPartitionRouter(BaseModel):
23552371
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
23562372

23572373

2374+
class GroupingPartitionRouter(BaseModel):
2375+
type: Literal["GroupingPartitionRouter"]
2376+
group_size: conint(ge=1) = Field(
2377+
...,
2378+
description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
2379+
examples=[10, 50],
2380+
title="Group Size",
2381+
)
2382+
partition_router: Optional[
2383+
Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]
2384+
] = Field(
2385+
None,
2386+
description="The partition router whose output will be grouped. This can be any valid partition router component.",
2387+
title="Underlying Partition Router",
2388+
)
2389+
deduplicate: Optional[bool] = Field(
2390+
True,
2391+
description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
2392+
title="Deduplicate Partitions",
2393+
)
2394+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2395+
2396+
23582397
class HttpComponentsResolver(BaseModel):
23592398
type: Literal["HttpComponentsResolver"]
23602399
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@
227227
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
228228
FlattenFields as FlattenFieldsModel,
229229
)
230+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231+
GroupingPartitionRouter as GroupingPartitionRouterModel,
232+
)
230233
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231234
GzipDecoder as GzipDecoderModel,
232235
)
@@ -379,6 +382,7 @@
379382
)
380383
from airbyte_cdk.sources.declarative.partition_routers import (
381384
CartesianProductStreamSlicer,
385+
GroupingPartitionRouter,
382386
ListPartitionRouter,
383387
PartitionRouter,
384388
SinglePartitionRouter,
@@ -3044,3 +3048,23 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
30443048
self._api_budget = self.create_component(
30453049
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
30463050
)
3051+
3052+
def create_grouping_partition_router(
3053+
self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
3054+
) -> GroupingPartitionRouter:
3055+
underlying_router = self._create_component_from_model(
3056+
model=model.partition_router, config=config
3057+
)
3058+
3059+
if not isinstance(underlying_router, PartitionRouter):
3060+
raise ValueError(
3061+
f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
3062+
)
3063+
3064+
return GroupingPartitionRouter(
3065+
group_size=model.group_size,
3066+
underlying_partition_router=underlying_router,
3067+
deduplicate=model.deduplicate if model.deduplicate is not None else True,
3068+
parameters=model.parameters or {},
3069+
config=config,
3070+
)

airbyte_cdk/sources/declarative/partition_routers/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
99
CartesianProductStreamSlicer,
1010
)
11+
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
12+
GroupingPartitionRouter,
13+
)
1114
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
1215
ListPartitionRouter,
1316
)
@@ -22,6 +25,7 @@
2225
__all__ = [
2326
"AsyncJobPartitionRouter",
2427
"CartesianProductStreamSlicer",
28+
"GroupingPartitionRouter",
2529
"ListPartitionRouter",
2630
"SinglePartitionRouter",
2731
"SubstreamPartitionRouter",
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Iterable, Mapping, Optional
7+
8+
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
9+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
10+
11+
12+
@dataclass
13+
class GroupingPartitionRouter(PartitionRouter):
14+
"""
15+
A partition router that groups partitions from an underlying partition router into batches of a specified size.
16+
This is useful for APIs that support filtering by multiple partition keys in a single request.
17+
18+
Attributes:
19+
group_size (int): The number of partitions to include in each group.
20+
underlying_partition_router (SinglePartitionRouter): The partition router whose output will be grouped.
21+
deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
22+
config (Config): The connector configuration.
23+
parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
24+
"""
25+
26+
group_size: int
27+
underlying_partition_router: PartitionRouter
28+
config: Config
29+
deduplicate: bool = True
30+
31+
def stream_slices(self) -> Iterable[StreamSlice]:
32+
"""
33+
Lazily groups partitions from the underlying partition router into batches of size `group_size`.
34+
35+
This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
36+
When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
37+
If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
38+
39+
Yields:
40+
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
41+
"""
42+
batch = []
43+
seen_keys = set() if self.deduplicate else None
44+
45+
# Iterate over partitions lazily from the underlying router
46+
for partition in self.underlying_partition_router.stream_slices():
47+
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
48+
key = next(iter(partition.partition.values()), None)
49+
50+
# Skip duplicates if deduplication is enabled
51+
if self.deduplicate and key in seen_keys:
52+
continue
53+
54+
# Add partition to the batch
55+
batch.append(partition)
56+
if self.deduplicate:
57+
seen_keys.add(key)
58+
59+
# Yield the batch when it reaches the group_size
60+
if len(batch) == self.group_size:
61+
yield self._create_grouped_slice(batch)
62+
batch = [] # Reset the batch
63+
64+
# Yield any remaining partitions if the batch isn't empty
65+
if batch:
66+
yield self._create_grouped_slice(batch)
67+
68+
def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
69+
# Combine partition values into a single dict with lists
70+
grouped_partition = {
71+
key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
72+
}
73+
return StreamSlice(
74+
partition=grouped_partition,
75+
cursor_slice={}, # Cursor is managed by the underlying router or incremental sync
76+
)
77+
78+
def get_request_params(
79+
self,
80+
stream_state: Optional[StreamState] = None,
81+
stream_slice: Optional[StreamSlice] = None,
82+
next_page_token: Optional[Mapping[str, Any]] = None,
83+
) -> Mapping[str, Any]:
84+
return {}
85+
86+
def get_request_headers(
87+
self,
88+
stream_state: Optional[StreamState] = None,
89+
stream_slice: Optional[StreamSlice] = None,
90+
next_page_token: Optional[Mapping[str, Any]] = None,
91+
) -> Mapping[str, Any]:
92+
return {}
93+
94+
def get_request_body_data(
95+
self,
96+
stream_state: Optional[StreamState] = None,
97+
stream_slice: Optional[StreamSlice] = None,
98+
next_page_token: Optional[Mapping[str, Any]] = None,
99+
) -> Mapping[str, Any]:
100+
return {}
101+
102+
def get_request_body_json(
103+
self,
104+
stream_state: Optional[StreamState] = None,
105+
stream_slice: Optional[StreamSlice] = None,
106+
next_page_token: Optional[Mapping[str, Any]] = None,
107+
) -> Mapping[str, Any]:
108+
return {}
109+
110+
def set_initial_state(self, stream_state: StreamState) -> None:
111+
"""Delegate state initialization to the underlying partition router."""
112+
self.underlying_partition_router.set_initial_state(stream_state)
113+
114+
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
115+
"""Delegate state retrieval to the underlying partition router."""
116+
return self.underlying_partition_router.get_stream_state()

0 commit comments

Comments
 (0)