From aea32ccc7fee265b83fe862c5163fe3e4c7895ad Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 28 Feb 2025 12:44:40 -0500 Subject: [PATCH 1/3] fix: Fix SSE delay reset handling The default retry policy has exponential backoff with jitter. This retry strategy is reset after some maximum retry interval. Previously, once this reset interval occurred, it would continue to reset every future attempt, effectively disabling the exponential backoff algorithm. This should no longer be the case, as we now reset the baseline used to determine when to reset the strategy each time it is replaced with the base strategy. --- ld_eventsource/sse_client.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/ld_eventsource/sse_client.py b/ld_eventsource/sse_client.py index 1ac774e..2ef5a15 100644 --- a/ld_eventsource/sse_client.py +++ b/ld_eventsource/sse_client.py @@ -111,7 +111,7 @@ def __init__( self.__connection_client = connect.create_client(logger) self.__connection_result: Optional[ConnectionResult] = None - self.__connected_time: float = 0 + self.__retry_reset_baseline: float = 0 self.__disconnected_time: float = 0 self.__closed = False @@ -248,10 +248,17 @@ def next_retry_delay(self) -> float: return self.__next_retry_delay def _compute_next_retry_delay(self): - if self.__retry_delay_reset_threshold > 0 and self.__connected_time != 0: - connection_duration = time.time() - self.__connected_time + # If the __retry_reset_baseline is 0, then we haven't successfully connected yet. + # + # In those situations, we don't want to reset the retry delay strategy; + # it should continue to double until the retry maximum, and then hold + # steady (- jitter). 
+ if self.__retry_delay_reset_threshold > 0 and self.__retry_reset_baseline != 0: + now = time.time() + connection_duration = now - self.__retry_reset_baseline if connection_duration >= self.__retry_delay_reset_threshold: self.__current_retry_delay_strategy = self.__base_retry_delay_strategy + self.__retry_reset_baseline = now self.__next_retry_delay, self.__current_retry_delay_strategy = ( self.__current_retry_delay_strategy.apply(self.__base_retry_delay) ) @@ -287,7 +294,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]: # If can_return_fault is false, it means the caller explicitly called start(), in # which case there's no way to return a Fault so we just keep retrying transparently. continue - self.__connected_time = time.time() + self.__retry_reset_baseline = time.time() self.__current_error_strategy = self.__base_error_strategy self.__interrupted = False return None From 1f248b26a2a2435662f71d8c70ea3cd59966c28d Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 28 Feb 2025 13:50:27 -0500 Subject: [PATCH 2/3] Add unit test --- ld_eventsource/sse_client.py | 10 ++-- .../testing/test_sse_client_retry.py | 52 +++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/ld_eventsource/sse_client.py b/ld_eventsource/sse_client.py index 2ef5a15..a2d0548 100644 --- a/ld_eventsource/sse_client.py +++ b/ld_eventsource/sse_client.py @@ -111,7 +111,7 @@ def __init__( self.__connection_client = connect.create_client(logger) self.__connection_result: Optional[ConnectionResult] = None - self.__retry_reset_baseline: float = 0 + self._retry_reset_baseline: float = 0 self.__disconnected_time: float = 0 self.__closed = False @@ -253,12 +253,12 @@ def _compute_next_retry_delay(self): # In those situations, we don't want to reset the retry delay strategy; # it should continue to double until the retry maximum, and then hold # steady (- jitter). 
- if self.__retry_delay_reset_threshold > 0 and self.__retry_reset_baseline != 0: + if self.__retry_delay_reset_threshold > 0 and self._retry_reset_baseline != 0: now = time.time() - connection_duration = now - self.__retry_reset_baseline + connection_duration = now - self._retry_reset_baseline if connection_duration >= self.__retry_delay_reset_threshold: self.__current_retry_delay_strategy = self.__base_retry_delay_strategy - self.__retry_reset_baseline = now + self._retry_reset_baseline = now self.__next_retry_delay, self.__current_retry_delay_strategy = ( self.__current_retry_delay_strategy.apply(self.__base_retry_delay) ) @@ -294,7 +294,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]: # If can_return_fault is false, it means the caller explicitly called start(), in # which case there's no way to return a Fault so we just keep retrying transparently. continue - self.__retry_reset_baseline = time.time() + self._retry_reset_baseline = time.time() self.__current_error_strategy = self.__base_error_strategy self.__interrupted = False return None diff --git a/ld_eventsource/testing/test_sse_client_retry.py b/ld_eventsource/testing/test_sse_client_retry.py index 39783de..8e57f19 100644 --- a/ld_eventsource/testing/test_sse_client_retry.py +++ b/ld_eventsource/testing/test_sse_client_retry.py @@ -2,6 +2,7 @@ from ld_eventsource.actions import * from ld_eventsource.config import * from ld_eventsource.testing.helpers import * +from time import sleep def test_retry_during_initial_connect_succeeds(): @@ -101,6 +102,57 @@ def test_all_iterator_continues_after_retry(): assert client.next_retry_delay == initial_delay * 2 +def test_retry_delay_gets_reset_after_threshold(): + initial_delay = 0.005 + retry_delay_reset_threshold = 0.1 + mock = MockConnectStrategy( + RespondWithData("data: data1\n\n"), + RejectConnection(HTTPStatusError(503)), + ) + with SSEClient( + connect=mock, + error_strategy=ErrorStrategy.always_continue(), + 
+ initial_retry_delay=initial_delay, + retry_delay_reset_threshold=retry_delay_reset_threshold, + retry_delay_strategy=RetryDelayStrategy.default(jitter_multiplier=None), + ) as client: + assert client._retry_reset_baseline == 0 + all = client.all + + # Establish a successful connection + item1 = next(all) + assert isinstance(item1, Start) + assert client._retry_reset_baseline != 0 + + item2 = next(all) + assert isinstance(item2, Event) + assert item2.data == 'data1' + + # Stream is dropped and then fails to re-connect, resulting in backoff. + item3 = next(all) + assert isinstance(item3, Fault) + assert client.next_retry_delay == initial_delay + + item4 = next(all) + assert isinstance(item4, Fault) + assert client.next_retry_delay == initial_delay * 2 + + # Sleeping for the full threshold duration should reset the retry delay + sleep(retry_delay_reset_threshold) + + # Which we see it does here + item5 = next(all) + assert isinstance(item5, Fault) + assert client.next_retry_delay == initial_delay + + # And if we don't sleep long enough, it doesn't get reset. 
+ sleep(retry_delay_reset_threshold / 2) + + item6 = next(all) + assert isinstance(item6, Fault) + assert client.next_retry_delay == initial_delay * 2 + + def test_can_interrupt_and_restart_stream(): initial_delay = 0.005 mock = MockConnectStrategy( From 906be34d31c1e5a41c6653c43dc8a21b66bfac4f Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 28 Feb 2025 14:29:38 -0500 Subject: [PATCH 3/3] Fix sort --- ld_eventsource/testing/test_sse_client_retry.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ld_eventsource/testing/test_sse_client_retry.py b/ld_eventsource/testing/test_sse_client_retry.py index 8e57f19..fba2fe8 100644 --- a/ld_eventsource/testing/test_sse_client_retry.py +++ b/ld_eventsource/testing/test_sse_client_retry.py @@ -1,8 +1,9 @@ +from time import sleep + from ld_eventsource import * from ld_eventsource.actions import * from ld_eventsource.config import * from ld_eventsource.testing.helpers import * -from time import sleep def test_retry_during_initial_connect_succeeds():