-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathproperty_chunking.py
More file actions
71 lines (60 loc) · 2.91 KB
/
property_chunking.py
File metadata and controls
71 lines (60 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, List, Mapping, Optional
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
RecordMergeStrategy,
)
from airbyte_cdk.sources.types import Config, Record
class PropertyLimitType(Enum):
    """
    The heuristic that determines when the maximum size of the current chunk of properties has been reached
    and a new one should be started.
    """

    # The limit is measured in characters: each property contributes its length plus delimiter overhead.
    characters = "characters"
    # The limit is measured as a plain count of properties: each property contributes 1.
    property_count = "property_count"
@dataclass
class PropertyChunking:
    """
    Defines the behavior for how the complete list of properties to query for are broken down into smaller groups
    that will be used for multiple requests to the target API.

    Fields:
        property_limit_type: How the size of a chunk is measured (characters or number of properties).
        property_limit: Maximum size of a chunk. When falsy (None or 0), no chunking is performed and all
            properties are emitted as a single chunk.
        record_merge_strategy: Strategy used to derive the key under which records are merged. When not
            provided, a GroupByKey strategy on the "id" key is used.
        parameters: Init-only value forwarded to the default GroupByKey strategy.
        config: Connector configuration forwarded to the default GroupByKey strategy.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Resolve the effective merge strategy, falling back to grouping records by their "id" key."""
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Split property_fields into chunks that respect the configured property limit.

        :param property_fields: The properties to distribute across chunks.
        :param always_include_properties: Properties added to every chunk. They are not counted against
            the property limit.
        :return: A generator of property lists, one list per request. At least one chunk is always produced,
            even when property_fields is empty.
        """
        if not self.property_limit:
            # No limit configured: everything goes into one chunk, with the always-included properties last.
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # The limit type does not change while iterating, so evaluate it once.
        # If property_limit_type is not characters, we default to property_count which is just an incrementing count
        limit_by_characters = self.property_limit_type == PropertyLimitType.characters

        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # TODO: Add ability to specify parameter delimiter representation and take into account in property_field_size
            if limit_by_characters:
                # The +3 represents the extra characters for encoding the delimiter in between properties
                property_field_size = len(property_field) + 3
            else:
                property_field_size = 1

            # Only start a new chunk when the current one already holds at least one regular property.
            # Without the chunk_size guard, a property whose size alone exceeds the limit would first flush a
            # chunk containing no regular properties, resulting in a pointless extra request.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0

            current_chunk.append(property_field)
            chunk_size += property_field_size

        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key of the record as computed by the effective merge strategy."""
        return self._record_merge_strategy.get_group_key(record=record)