-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathgrouping_partition_router.py
More file actions
116 lines (95 loc) · 4.69 KB
/
grouping_partition_router.py
File metadata and controls
116 lines (95 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
@dataclass
class GroupingPartitionRouter(PartitionRouter):
    """
    A partition router that groups partitions from an underlying partition router into batches of a specified size.

    This is useful for APIs that support filtering by multiple partition keys in a single request.

    Attributes:
        group_size (int): The maximum number of partitions to include in each group. Must be at least 1.
        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
        config (Config): The connector configuration.
        deduplicate (bool): If True, a partition whose key was already seen earlier in the same
            `stream_slices()` run is skipped, so each key is emitted at most once per run.
    """

    group_size: int
    underlying_partition_router: PartitionRouter
    config: Config
    deduplicate: bool = True

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Lazily groups partitions from the underlying partition router into batches of size `group_size`.

        Partitions are pulled one at a time from the underlying router and buffered. A grouped slice is
        yielded whenever the buffer holds `group_size` partitions, and once more for any remainder after
        the underlying router is exhausted.

        When deduplication is enabled, the set of seen keys is kept for the whole run (it is not reset
        between batches), so a key that appeared in an earlier batch is not emitted again in a later one.

        Yields:
            StreamSlice: A slice whose partition maps each partition key to the list of values in the batch.

        Raises:
            ValueError: If `group_size` is less than 1. Because this is a generator, the error surfaces
                when iteration starts, not when the method is called.
        """
        # A non-positive group size could never be reached by the buffer, which would silently
        # collapse the entire stream into one unbounded batch; fail loudly instead.
        if self.group_size < 1:
            raise ValueError(
                f"GroupingPartitionRouter requires group_size >= 1, got {self.group_size}"
            )

        batch: list[StreamSlice] = []
        # Always a real set so the dedup branch never has to deal with None.
        seen_keys: set[Any] = set()

        for partition in self.underlying_partition_router.stream_slices():
            # The first value of the partition mapping is used as the dedup key
            # (e.g. {"board_ids": value} -> value); an empty partition yields None.
            # The value must be hashable for the set membership test below.
            key = next(iter(partition.partition.values()), None)

            if self.deduplicate:
                if key in seen_keys:
                    continue
                seen_keys.add(key)

            batch.append(partition)

            # Flush as soon as the buffer is full.
            if len(batch) >= self.group_size:
                yield self._create_grouped_slice(batch)
                batch = []

        # Flush whatever is left once the underlying router is exhausted.
        if batch:
            yield self._create_grouped_slice(batch)

    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
        """
        Merge a non-empty batch of slices into a single slice with list-valued partition fields.

        The set of keys is taken from the first slice in the batch. For every such key the values of
        all slices are collected in batch order; a slice that lacks the key contributes None.
        Keys that only appear in later slices are not included.

        Args:
            batch: The buffered slices to merge. Must contain at least one element.

        Returns:
            StreamSlice: A slice with the grouped partition and an empty cursor slice.
        """
        # Combine partition values into a single dict with lists
        grouped_partition = {
            key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
        }
        return StreamSlice(
            partition=grouped_partition,
            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
        )

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping; this router adds no request parameters."""
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping; this router adds no request headers."""
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping; this router adds no request body data."""
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping; this router adds no JSON request body."""
        return {}

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Delegate state initialization to the underlying partition router."""
        self.underlying_partition_router.set_initial_state(stream_state)

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """Delegate state retrieval to the underlying partition router."""
        return self.underlying_partition_router.get_stream_state()