diff --git a/ld_eventsource/sse_client.py b/ld_eventsource/sse_client.py index 1ac774e..a2d0548 100644 --- a/ld_eventsource/sse_client.py +++ b/ld_eventsource/sse_client.py @@ -111,7 +111,7 @@ def __init__( self.__connection_client = connect.create_client(logger) self.__connection_result: Optional[ConnectionResult] = None - self.__connected_time: float = 0 + self._retry_reset_baseline: float = 0 self.__disconnected_time: float = 0 self.__closed = False @@ -248,10 +248,17 @@ def next_retry_delay(self) -> float: return self.__next_retry_delay def _compute_next_retry_delay(self): - if self.__retry_delay_reset_threshold > 0 and self.__connected_time != 0: - connection_duration = time.time() - self.__connected_time + # If the __retry_reset_baseline is 0, then we haven't successfully connected yet. + # + # In those situations, we don't want to reset the retry delay strategy; + # it should continue to double until the retry maximum, and then hold + # steady (- jitter). + if self.__retry_delay_reset_threshold > 0 and self._retry_reset_baseline != 0: + now = time.time() + connection_duration = now - self._retry_reset_baseline if connection_duration >= self.__retry_delay_reset_threshold: self.__current_retry_delay_strategy = self.__base_retry_delay_strategy + self._retry_reset_baseline = now self.__next_retry_delay, self.__current_retry_delay_strategy = ( self.__current_retry_delay_strategy.apply(self.__base_retry_delay) ) @@ -287,7 +294,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]: # If can_return_fault is false, it means the caller explicitly called start(), in # which case there's no way to return a Fault so we just keep retrying transparently. continue - self.__connected_time = time.time() + self._retry_reset_baseline = time.time() self.__current_error_strategy = self.__base_error_strategy self.__interrupted = False return None diff --git a/ld_eventsource/testing/test_sse_client_retry.py b/ld_eventsource/testing/test_sse_client_retry.py index 39783de..fba2fe8 100644 --- a/ld_eventsource/testing/test_sse_client_retry.py +++ b/ld_eventsource/testing/test_sse_client_retry.py @@ -1,3 +1,5 @@ +from time import sleep + from ld_eventsource import * from ld_eventsource.actions import * from ld_eventsource.config import * @@ -101,6 +103,57 @@ def test_all_iterator_continues_after_retry(): assert client.next_retry_delay == initial_delay * 2 +def test_retry_delay_gets_reset_after_threshold(): + initial_delay = 0.005 + retry_delay_reset_threshold = 0.1 + mock = MockConnectStrategy( + RespondWithData("data: data1\n\n"), + RejectConnection(HTTPStatusError(503)), + ) + with SSEClient( + connect=mock, + error_strategy=ErrorStrategy.always_continue(), + initial_retry_delay=initial_delay, + retry_delay_reset_threshold=retry_delay_reset_threshold, + retry_delay_strategy=RetryDelayStrategy.default(jitter_multiplier=None), + ) as client: + assert client._retry_reset_baseline == 0 + all = client.all + + # Establish a successful connection + item1 = next(all) + assert isinstance(item1, Start) + assert client._retry_reset_baseline != 0 + + item2 = next(all) + assert isinstance(item2, Event) + assert item2.data == 'data1' + + # Stream is dropped and then fails to re-connect, resulting in backoff. + item3 = next(all) + assert isinstance(item3, Fault) + assert client.next_retry_delay == initial_delay + + item4 = next(all) + assert isinstance(item4, Fault) + assert client.next_retry_delay == initial_delay * 2 + + # Sleeping the threshold should reset the retry thresholds + sleep(retry_delay_reset_threshold) + + # Which we see it does here + item5 = next(all) + assert isinstance(item5, Fault) + assert client.next_retry_delay == initial_delay + + # And if we don't sleep long enough, it doesn't get reset. + sleep(retry_delay_reset_threshold / 2) + + item6 = next(all) + assert isinstance(item6, Fault) + assert client.next_retry_delay == initial_delay * 2 + + def test_can_interrupt_and_restart_stream(): initial_delay = 0.005 mock = MockConnectStrategy(