Skip to content

Commit 8f96aee

Browse files
authored
fix(concurrent perpartition cursor): Increase throttle time to 10 minutes (#542)
1 parent dc10839 commit 8f96aee

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
229229

230230
def _throttle_state_message(self) -> Optional[float]:
231231
"""
232-
Throttles the state message emission to once every 60 seconds.
232+
Throttles the state message emission to once every 600 seconds.
233233
"""
234234
current_time = time.time()
235-
if current_time - self._last_emission_time <= 60:
235+
if current_time - self._last_emission_time <= 600:
236236
return None
237237
return current_time
238238

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,8 +3265,8 @@ def test_incremental_substream_request_options_provider(
32653265

32663266
def test_state_throttling(mocker):
32673267
"""
3268-
Verifies that _emit_state_message does not emit a new state if less than 60s
3269-
have passed since last emission, and does emit once 60s or more have passed.
3268+
Verifies that _emit_state_message does not emit a new state if less than 600s
3269+
have passed since last emission, and does emit once 600s or more have passed.
32703270
"""
32713271
cursor = ConcurrentPerPartitionCursor(
32723272
cursor_factory=MagicMock(),
@@ -3288,20 +3288,20 @@ def test_state_throttling(mocker):
32883288

32893289
mock_time = mocker.patch("time.time")
32903290

3291-
# First attempt: only 10 seconds passed => NO emission
3292-
mock_time.return_value = 10
3291+
# First attempt: only 100 seconds passed => NO emission
3292+
mock_time.return_value = 100
32933293
cursor._emit_state_message()
32943294
mock_connector_manager.update_state_for_stream.assert_not_called()
32953295
mock_repo.emit_message.assert_not_called()
32963296

3297-
# Second attempt: 30 seconds passed => still NO emission
3298-
mock_time.return_value = 30
3297+
# Second attempt: 300 seconds passed => still NO emission
3298+
mock_time.return_value = 300
32993299
cursor._emit_state_message()
33003300
mock_connector_manager.update_state_for_stream.assert_not_called()
33013301
mock_repo.emit_message.assert_not_called()
33023302

3303-
# Advance time: 70 seconds => exceed 60s => MUST emit
3304-
mock_time.return_value = 70
3303+
# Advance time: 700 seconds => exceed 600s => MUST emit
3304+
mock_time.return_value = 700
33053305
cursor._emit_state_message()
33063306
mock_connector_manager.update_state_for_stream.assert_called_once()
33073307
mock_repo.emit_message.assert_called_once()

0 commit comments

Comments
 (0)