Skip to content

Commit 52b95e3

Browse files
committed
Add throttling for state emitting in ConcurrentPerPartitionCursor
1 parent c0bc645 commit 52b95e3

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
import copy
66
import logging
77
import threading
8+
import time
89
from collections import OrderedDict
910
from copy import deepcopy
1011
from datetime import timedelta
@@ -59,7 +60,7 @@ class ConcurrentPerPartitionCursor(Cursor):
5960
"""
6061

6162
DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
62-
SWITCH_TO_GLOBAL_LIMIT = 1000
63+
SWITCH_TO_GLOBAL_LIMIT = 10_000
6364
_NO_STATE: Mapping[str, Any] = {}
6465
_NO_CURSOR_STATE: Mapping[str, Any] = {}
6566
_GLOBAL_STATE_KEY = "state"
@@ -103,6 +104,8 @@ def __init__(
103104
self._number_of_partitions: int = 0
104105
self._use_global_cursor: bool = False
105106
self._partition_serializer = PerPartitionKeySerializer()
107+
# Track the last time a state message was emitted
108+
self._last_emission_time: float = 0.0
106109

107110
self._set_initial_state(stream_state)
108111

@@ -166,9 +169,12 @@ def ensure_at_least_one_state_emitted(self) -> None:
166169
self._global_cursor = self._new_global_cursor
167170
self._lookback_window = self._timer.finish()
168171
self._parent_state = self._partition_router.get_stream_state()
169-
self._emit_state_message()
172+
self._emit_state_message(throttle=False)
170173

171-
def _emit_state_message(self) -> None:
174+
def _emit_state_message(self, throttle: bool = True) -> None:
175+
current_time = time.time()
176+
if throttle and current_time - self._last_emission_time <= 60:
177+
return
172178
self._connector_state_manager.update_state_for_stream(
173179
self._stream_name,
174180
self._stream_namespace,
@@ -178,6 +184,7 @@ def _emit_state_message(self) -> None:
178184
self._stream_name, self._stream_namespace
179185
)
180186
self._message_repository.emit_message(state_message)
187+
self._last_emission_time = current_time
181188

182189
def stream_slices(self) -> Iterable[StreamSlice]:
183190
if self._timer.is_running():
@@ -242,7 +249,7 @@ def _ensure_partition_limit(self) -> None:
242249
partition_key
243250
) # Remove the oldest partition
244251
logger.warning(
245-
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
252+
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
246253
)
247254
break
248255
else:
@@ -251,7 +258,7 @@ def _ensure_partition_limit(self) -> None:
251258
1
252259
] # Remove the oldest partition
253260
logger.warning(
254-
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions}."
261+
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._number_of_partitions - self.DEFAULT_MAX_PARTITIONS_NUMBER}."
255262
)
256263

257264
def _set_initial_state(self, stream_state: StreamState) -> None:
@@ -372,7 +379,7 @@ def observe(self, record: Record) -> None:
372379
self._to_partition_key(record.associated_slice.partition)
373380
].observe(record)
374381

375-
def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
382+
def _update_global_cursor(self, value: Any) -> None:
376383
if (
377384
self._new_global_cursor is None
378385
or self._new_global_cursor[self.cursor_field.cursor_field_key] < value

0 commit comments

Comments
 (0)