Skip to content

Commit 1166a7a

Browse files
committed
Fix unit tests
1 parent 52b95e3 commit 1166a7a

File tree

2 files changed

+73
-12
lines changed

2 files changed

+73
-12
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,21 @@ def ensure_at_least_one_state_emitted(self) -> None:
171171
self._parent_state = self._partition_router.get_stream_state()
172172
self._emit_state_message(throttle=False)
173173

174-
def _emit_state_message(self, throttle: bool = True) -> None:
174+
def _throttle_state_message(self) -> Optional[float]:
175+
"""
176+
Throttles state message emission: returns None if 60 seconds or less have passed since the last emission, otherwise returns the current time.
177+
"""
175178
current_time = time.time()
176-
if throttle and current_time - self._last_emission_time <= 60:
177-
return
179+
if current_time - self._last_emission_time <= 60:
180+
return None
181+
return current_time
182+
183+
def _emit_state_message(self, throttle: bool = True) -> None:
184+
if throttle:
185+
current_time = self._throttle_state_message()
186+
if current_time is None:
187+
return
188+
self._last_emission_time = current_time
178189
self._connector_state_manager.update_state_for_stream(
179190
self._stream_name,
180191
self._stream_namespace,
@@ -184,7 +195,6 @@ def _emit_state_message(self, throttle: bool = True) -> None:
184195
self._stream_name, self._stream_namespace
185196
)
186197
self._message_repository.emit_message(state_message)
187-
self._last_emission_time = current_time
188198

189199
def stream_slices(self) -> Iterable[StreamSlice]:
190200
if self._timer.is_running():
@@ -358,6 +368,7 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
358368
self._new_global_cursor = deepcopy(fixed_global_state)
359369

360370
def observe(self, record: Record) -> None:
371+
# ToDo: check number of partitions
361372
if not self._use_global_cursor and self.limit_reached():
362373
logger.info(
363374
f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of {self.SWITCH_TO_GLOBAL_LIMIT}. "

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from copy import deepcopy
44
from datetime import datetime, timedelta
55
from typing import Any, List, Mapping, MutableMapping, Optional, Union
6+
from unittest.mock import MagicMock, patch
67
from urllib.parse import unquote
78

89
import pytest
@@ -18,6 +19,7 @@
1819
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
1920
ConcurrentDeclarativeSource,
2021
)
22+
from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
2123
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
2224
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
2325

@@ -1181,14 +1183,18 @@ def test_incremental_parent_state(
11811183
initial_state,
11821184
expected_state,
11831185
):
1184-
run_incremental_parent_state_test(
1185-
manifest,
1186-
mock_requests,
1187-
expected_records,
1188-
num_intermediate_states,
1189-
initial_state,
1190-
[expected_state],
1191-
)
1186+
# Patch `_throttle_state_message` so it always returns a float (indicating "no throttle")
1187+
with patch.object(
1188+
ConcurrentPerPartitionCursor, "_throttle_state_message", return_value=9999999.0
1189+
):
1190+
run_incremental_parent_state_test(
1191+
manifest,
1192+
mock_requests,
1193+
expected_records,
1194+
num_intermediate_states,
1195+
initial_state,
1196+
[expected_state],
1197+
)
11921198

11931199

11941200
STATE_MIGRATION_EXPECTED_STATE = {
@@ -2967,3 +2973,47 @@ def test_incremental_substream_request_options_provider(
29672973
expected_records,
29682974
expected_state,
29692975
)
2976+
2977+
2978+
def test_state_throttling(mocker):
2979+
"""
2980+
Verifies that _emit_state_message does not emit a new state if 60s or less
2981+
have passed since last emission, and does emit once more than 60s have passed.
2982+
"""
2983+
cursor = ConcurrentPerPartitionCursor(
2984+
cursor_factory=MagicMock(),
2985+
partition_router=MagicMock(),
2986+
stream_name="test_stream",
2987+
stream_namespace=None,
2988+
stream_state={},
2989+
message_repository=MagicMock(),
2990+
connector_state_manager=MagicMock(),
2991+
connector_state_converter=MagicMock(),
2992+
cursor_field=MagicMock(),
2993+
)
2994+
2995+
mock_connector_manager = cursor._connector_state_manager
2996+
mock_repo = cursor._message_repository
2997+
2998+
# Set the last emission time to "0" so we can control offset from that
2999+
cursor._last_emission_time = 0
3000+
3001+
mock_time = mocker.patch("time.time")
3002+
3003+
# First attempt: only 10 seconds passed => NO emission
3004+
mock_time.return_value = 10
3005+
cursor._emit_state_message()
3006+
mock_connector_manager.update_state_for_stream.assert_not_called()
3007+
mock_repo.emit_message.assert_not_called()
3008+
3009+
# Second attempt: 30 seconds passed => still NO emission
3010+
mock_time.return_value = 30
3011+
cursor._emit_state_message()
3012+
mock_connector_manager.update_state_for_stream.assert_not_called()
3013+
mock_repo.emit_message.assert_not_called()
3014+
3015+
# Advance time: 70 seconds => exceed 60s => MUST emit
3016+
mock_time.return_value = 70
3017+
cursor._emit_state_message()
3018+
mock_connector_manager.update_state_for_stream.assert_called_once()
3019+
mock_repo.emit_message.assert_called_once()

0 commit comments

Comments
 (0)