Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions ld_eventsource/sse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def __init__(

self.__connection_client = connect.create_client(logger)
self.__connection_result: Optional[ConnectionResult] = None
self.__connected_time: float = 0
self._retry_reset_baseline: float = 0
self.__disconnected_time: float = 0

self.__closed = False
Expand Down Expand Up @@ -248,10 +248,17 @@ def next_retry_delay(self) -> float:
return self.__next_retry_delay

def _compute_next_retry_delay(self):
if self.__retry_delay_reset_threshold > 0 and self.__connected_time != 0:
connection_duration = time.time() - self.__connected_time
# If the __retry_reset_baseline is 0, then we haven't successfully connected yet.
#
# In those situations, we don't want to reset the retry delay strategy;
# it should continue to double until the retry maximum, and then hold
# steady (- jitter).
if self.__retry_delay_reset_threshold > 0 and self._retry_reset_baseline != 0:
now = time.time()
connection_duration = now - self._retry_reset_baseline
if connection_duration >= self.__retry_delay_reset_threshold:
self.__current_retry_delay_strategy = self.__base_retry_delay_strategy
self._retry_reset_baseline = now
self.__next_retry_delay, self.__current_retry_delay_strategy = (
self.__current_retry_delay_strategy.apply(self.__base_retry_delay)
)
Expand Down Expand Up @@ -287,7 +294,7 @@ def _try_start(self, can_return_fault: bool) -> Optional[Fault]:
# If can_return_fault is false, it means the caller explicitly called start(), in
# which case there's no way to return a Fault so we just keep retrying transparently.
continue
self.__connected_time = time.time()
self._retry_reset_baseline = time.time()
self.__current_error_strategy = self.__base_error_strategy
self.__interrupted = False
return None
Expand Down
53 changes: 53 additions & 0 deletions ld_eventsource/testing/test_sse_client_retry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from time import sleep

from ld_eventsource import *
from ld_eventsource.actions import *
from ld_eventsource.config import *
Expand Down Expand Up @@ -101,6 +103,57 @@ def test_all_iterator_continues_after_retry():
assert client.next_retry_delay == initial_delay * 2


def test_retry_delay_gets_reset_after_threshold():
initial_delay = 0.005
retry_delay_reset_threshold = 0.1
mock = MockConnectStrategy(
RespondWithData("data: data1\n\n"),
RejectConnection(HTTPStatusError(503)),
)
with SSEClient(
connect=mock,
error_strategy=ErrorStrategy.always_continue(),
initial_retry_delay=initial_delay,
retry_delay_reset_threshold=retry_delay_reset_threshold,
retry_delay_strategy=RetryDelayStrategy.default(jitter_multiplier=None),
) as client:
assert client._retry_reset_baseline == 0
all = client.all

# Establish a successful connection
item1 = next(all)
assert isinstance(item1, Start)
assert client._retry_reset_baseline != 0

item2 = next(all)
assert isinstance(item2, Event)
assert item2.data == 'data1'

# Stream is dropped and then fails to re-connect, resulting in backoff.
item3 = next(all)
assert isinstance(item3, Fault)
assert client.next_retry_delay == initial_delay

item4 = next(all)
assert isinstance(item4, Fault)
assert client.next_retry_delay == initial_delay * 2

# Sleeping the threshold should reset the retry thresholds
sleep(retry_delay_reset_threshold)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little scary, but I assume we don't have an ability to mock advance time.


# Which we see it does here
item5 = next(all)
assert isinstance(item5, Fault)
assert client.next_retry_delay == initial_delay

# And if we don't sleep long enough, it doesn't get reset.
sleep(retry_delay_reset_threshold / 2)

item6 = next(all)
assert isinstance(item6, Fault)
assert client.next_retry_delay == initial_delay * 2


def test_can_interrupt_and_restart_stream():
initial_delay = 0.005
mock = MockConnectStrategy(
Expand Down