Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3130,12 +3130,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3290,12 +3292,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3412,6 +3416,45 @@ definitions:
$parameters:
type: object
additionalProperties: true
GroupingPartitionRouter:
title: Grouping Partition Router
description: >
A decorator on top of a partition router that groups partitions into batches of a specified size.
This is useful for APIs that support filtering by multiple partition keys in a single request.
Note that per-partition incremental syncs may not work as expected because the grouping
of partitions might change between syncs, potentially leading to inconsistent state tracking.
type: object
required:
- type
- group_size
- underlying_partition_router
properties:
type:
type: string
enum: [GroupingPartitionRouter]
group_size:
title: Group Size
description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
type: integer
minimum: 1
examples:
- 10
- 50
partition_router:
title: Underlying Partition Router
description: The partition router whose output will be grouped. This can be any valid partition router component.
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
deduplicate:
title: Deduplicate Partitions
description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
type: boolean
default: true
$parameters:
type: object
additionalProperties: true
WaitUntilTimeFromHeader:
title: Wait Until Time Defined In Response Header
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic.v1 import BaseModel, Extra, Field
from pydantic.v1 import BaseModel, Extra, Field, conint


class AuthFlowType(Enum):
Expand Down Expand Up @@ -2225,7 +2225,15 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2303,7 +2311,15 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2355,6 +2371,29 @@ class SubstreamPartitionRouter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GroupingPartitionRouter(BaseModel):
type: Literal["GroupingPartitionRouter"]
group_size: conint(ge=1) = Field(
...,
description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
examples=[10, 50],
title="Group Size",
)
partition_router: Optional[
Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]
] = Field(
None,
description="The partition router whose output will be grouped. This can be any valid partition router component.",
title="Underlying Partition Router",
)
deduplicate: Optional[bool] = Field(
True,
description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
title="Deduplicate Partitions",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GroupingPartitionRouter as GroupingPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipDecoder as GzipDecoderModel,
)
Expand Down Expand Up @@ -379,6 +382,7 @@
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
ListPartitionRouter,
PartitionRouter,
SinglePartitionRouter,
Expand Down Expand Up @@ -3044,3 +3048,23 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

def create_grouping_partition_router(
self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
) -> GroupingPartitionRouter:
underlying_router = self._create_component_from_model(
model=model.partition_router, config=config
)

if not isinstance(underlying_router, PartitionRouter):
raise ValueError(
f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
)

return GroupingPartitionRouter(
group_size=model.group_size,
underlying_partition_router=underlying_router,
deduplicate=model.deduplicate if model.deduplicate is not None else True,
parameters=model.parameters or {},
config=config,
)
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/partition_routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
CartesianProductStreamSlicer,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
ListPartitionRouter,
)
Expand All @@ -22,6 +25,7 @@
__all__ = [
"AsyncJobPartitionRouter",
"CartesianProductStreamSlicer",
"GroupingPartitionRouter",
"ListPartitionRouter",
"SinglePartitionRouter",
"SubstreamPartitionRouter",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class GroupingPartitionRouter(PartitionRouter):
"""
A partition router that groups partitions from an underlying partition router into batches of a specified size.
This is useful for APIs that support filtering by multiple partition keys in a single request.

Attributes:
group_size (int): The number of partitions to include in each group.
underlying_partition_router (SinglePartitionRouter): The partition router whose output will be grouped.
deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
config (Config): The connector configuration.
parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
"""

group_size: int
underlying_partition_router: PartitionRouter
config: Config
deduplicate: bool = True

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Lazily groups partitions from the underlying partition router into batches of size `group_size`.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

Yields:
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
"""
batch = []
seen_keys = set() if self.deduplicate else None

# Iterate over partitions lazily from the underlying router
for partition in self.underlying_partition_router.stream_slices():
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
key = next(iter(partition.partition.values()), None)

# Skip duplicates if deduplication is enabled
if self.deduplicate and key in seen_keys:
continue

# Add partition to the batch
batch.append(partition)
if self.deduplicate:
seen_keys.add(key)

# Yield the batch when it reaches the group_size
if len(batch) == self.group_size:
yield self._create_grouped_slice(batch)
batch = [] # Reset the batch

# Yield any remaining partitions if the batch isn't empty
if batch:
yield self._create_grouped_slice(batch)

def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
# Combine partition values into a single dict with lists
grouped_partition = {
key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
}
return StreamSlice(
partition=grouped_partition,
cursor_slice={}, # Cursor is managed by the underlying router or incremental sync
)

def get_request_params(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_headers(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_data(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_json(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def set_initial_state(self, stream_state: StreamState) -> None:
"""Delegate state initialization to the underlying partition router."""
self.underlying_partition_router.set_initial_state(stream_state)

def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
"""Delegate state retrieval to the underlying partition router."""
return self.underlying_partition_router.get_stream_state()
Loading