Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f68bbf7
feat: preserve SFU error code in SignalingError
aliev Mar 17, 2026
bd33f67
feat: add SfuJoinError and retryable error detection in connect_webso…
aliev Mar 17, 2026
369a085
feat: pass migrating_from to coordinator join request
aliev Mar 17, 2026
8678b80
feat: retry connect() on SFU full by requesting a different SFU
aliev Mar 17, 2026
8f73de8
chore: remove dead retry code replaced by error-code-based detection
aliev Mar 17, 2026
6a352b9
style: remove extra blank line left after dead code removal
aliev Mar 17, 2026
536f95d
style: apply ruff formatting
aliev Mar 17, 2026
311447d
style: apply ruff formatting to test_signaling.py
aliev Mar 17, 2026
61da611
refactor: extract _handle_join_failure from connect() retry loop
aliev Mar 17, 2026
2cf9b73
refactor: use exp_backoff with sleep parameter in connect() retry loop
aliev Mar 17, 2026
bacbce1
refactor: move ConnectionManager import to module level in tests
aliev Mar 17, 2026
5af1ac6
Merge branch 'main' into feat/retry-connect-on-sfu-full
aliev Mar 17, 2026
9db988d
fix: close ws_client on connect_websocket failure to prevent thread leak
aliev Mar 17, 2026
6a34c05
test: mock exp_backoff in connect() tests to avoid real sleep delays
aliev Mar 17, 2026
857d3c7
chore: update uv.lock
aliev Mar 17, 2026
efd9b09
refactor: use pytest fixture for ConnectionManager setup in tests
aliev Mar 17, 2026
ab8a27c
refactor: use fixtures in test_connection_utils, snapshot mutable lis…
aliev Mar 17, 2026
df9a07c
test: assert ws_client cleanup behavior, not just retry count
aliev Mar 17, 2026
de6d889
refactor: remove sleep param from exp_backoff, keep sleep in caller
aliev Mar 18, 2026
23dc2f2
refactor: remove redundant _instant_backoff test helper
aliev Mar 18, 2026
dd99813
test: assert retry count in exhausted-retries test
aliev Mar 18, 2026
66708cd
refactor: extract _connect_with_sfu_reassignment from connect()
aliev Mar 18, 2026
41ec538
chore: add utility script for testing SFU connection and retry behavior
aliev Mar 18, 2026
33d18b9
refactor: extract last_failed variable for clarity in _connect_internal
aliev Mar 19, 2026
2a75703
refactor: raise directly from except instead of tracking last_error
aliev Mar 19, 2026
ab392f2
feat: validate max_join_retries and extract patched_dependencies helper
aliev Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions getstream/video/rtc/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
from getstream.video.rtc.connection_utils import (
ConnectionState,
SfuConnectionError,
SfuJoinError,
ConnectionOptions,
connect_websocket,
join_call,
watch_call,
)
from getstream.video.rtc.coordinator.backoff import exp_backoff
from getstream.video.rtc.track_util import (
fix_sdp_msid_semantic,
fix_sdp_rtcp_fb,
Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(
user_id: Optional[str] = None,
create: bool = True,
subscription_config: Optional[SubscriptionConfig] = None,
max_join_retries: int = 3,
**kwargs: Any,
):
super().__init__()
Expand All @@ -68,6 +71,7 @@ def __init__(
self.session_id: str = str(uuid.uuid4())
self.join_response: Optional[JoinCallResponse] = None
self.local_sfu: bool = False # Local SFU flag for development
self._max_join_retries: int = max_join_retries

# Private attributes
self._connection_state: ConnectionState = ConnectionState.IDLE
Expand Down Expand Up @@ -282,6 +286,7 @@ async def _connect_internal(
ws_url: Optional[str] = None,
token: Optional[str] = None,
session_id: Optional[str] = None,
migrating_from_list: Optional[list] = None,
) -> None:
"""
Internal connection method that handles the core connection logic.
Expand Down Expand Up @@ -324,6 +329,10 @@ async def _connect_internal(
"auto",
self.create,
self.local_sfu,
migrating_from=migrating_from_list[-1]
if migrating_from_list
else None,
migrating_from_list=migrating_from_list,
**self.kwargs,
)
ws_url = join_response.data.credentials.server.ws_endpoint
Expand Down Expand Up @@ -395,6 +404,8 @@ async def _connect_internal(
logger.exception(f"No join response from WebSocket: {sfu_event}")

logger.debug(f"WebSocket connected successfully to {ws_url}")
except SfuJoinError:
raise
except Exception as e:
logger.exception(f"Failed to connect WebSocket to {ws_url}: {e}")
raise SfuConnectionError(f"WebSocket connection failed: {e}") from e
Expand Down Expand Up @@ -427,7 +438,8 @@ async def connect(self):
Connect to SFU.

This method automatically handles retry logic for transient errors
like "server is full" and network issues.
like "server is full" by requesting a different SFU from the
coordinator.
"""
logger.info("Connecting to SFU")
# Fire-and-forget the coordinator WS connection so we don't block here
Expand All @@ -445,7 +457,52 @@ def _on_coordinator_task_done(task: asyncio.Task):
logger.exception("Coordinator WS task failed")

self._coordinator_task.add_done_callback(_on_coordinator_task_done)
await self._connect_internal()

failed_sfus: list[str] = []
last_error: Optional[SfuJoinError] = None

# First attempt without delay
attempt = 0
try:
await self._connect_internal()
return
except SfuJoinError as e:
last_error = e
self._handle_join_failure(e, attempt, failed_sfus)

# Retries with exponential backoff, requesting a different SFU
async for delay in exp_backoff(
max_retries=self._max_join_retries, base=0.5, sleep=True
):
attempt += 1
logger.info(f"Retrying with different SFU (waited {delay}s)...")
try:
await self._connect_internal(
migrating_from_list=failed_sfus if failed_sfus else None,
)
return
except SfuJoinError as e:
last_error = e
self._handle_join_failure(e, attempt, failed_sfus)

raise last_error # type: ignore[misc]

def _handle_join_failure(
    self, error: SfuJoinError, attempt: int, failed_sfus: list[str]
) -> None:
    """Track a failed SFU and clean up partial connection state.

    Invoked from the connect() retry loop after an SfuJoinError so the
    next attempt can ask the coordinator for a different SFU.

    Args:
        error: The join failure; its error_code is included in the warning log.
        attempt: Zero-based attempt index (logged as ``attempt + 1``).
        failed_sfus: Mutable list of SFU edge names that have already
            failed; this method appends the current edge name in place.
    """
    # Record which SFU edge just failed so it can be excluded on retry.
    # NOTE(review): this reads self.join_response.credentials directly,
    # while other code paths go through join_response.data — confirm the
    # stored object's shape.
    if self.join_response and self.join_response.credentials:
        edge = self.join_response.credentials.server.edge_name
        if edge and edge not in failed_sfus:
            failed_sfus.append(edge)
    logger.warning(
        f"SFU join failed (attempt {attempt + 1}/{1 + self._max_join_retries}, "
        f"code={error.error_code}). Failed SFUs: {failed_sfus}"
    )
    # Tear down any half-open WebSocket client so the retry starts clean
    # (avoids leaking the client's background thread).
    if self._ws_client:
        self._ws_client.close()
        self._ws_client = None
    self.connection_state = ConnectionState.IDLE

async def wait(self):
"""
Expand Down
74 changes: 34 additions & 40 deletions getstream/video/rtc/connection_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@
"connect_websocket",
]

# Private constants - internal use only
# Lowercase substrings matched against str(error).lower() in _is_retryable
# to classify an error message as transient and therefore worth retrying.
_RETRYABLE_ERROR_PATTERNS = [
    "server is full",
    "server overloaded",
    "capacity exceeded",
    "try again later",
    "service unavailable",
    "connection timeout",
    "network error",
    "temporary failure",
    "connection refused",
    "connection reset",
]


# Public classes and exceptions
class ConnectionState(Enum):
Expand All @@ -94,6 +80,22 @@ class SfuConnectionError(Exception):
pass


class SfuJoinError(SfuConnectionError):
    """SFU join failure carrying the SFU-reported error code.

    Extends SfuConnectionError with the numeric code returned by the SFU
    and a flag saying whether the caller may retry against a different SFU.
    """

    def __init__(self, message: str, error_code: int = 0, should_retry: bool = False):
        super().__init__(message)
        # True for transient conditions (e.g. SFU full) worth retrying.
        self.should_retry = should_retry
        # Numeric SFU error code; 0 when no code was provided.
        self.error_code = error_code


# Numeric SFU error codes treated as transient: a join that fails with one
# of these may be retried against a different SFU. Names in the comments
# presumably mirror the SFU's ErrorCode enum — confirm against the protobuf.
_RETRYABLE_SFU_ERROR_CODES = {
    700,  # ERROR_CODE_SFU_FULL
    600,  # ERROR_CODE_SFU_SHUTTING_DOWN
    301,  # ERROR_CODE_CALL_PARTICIPANT_LIMIT_REACHED
}


@dataclass
class ConnectionOptions:
"""Options for the connection process."""
Expand Down Expand Up @@ -175,6 +177,8 @@ async def join_call_coordinator_request(
notify: Optional[bool] = None,
video: Optional[bool] = None,
location: Optional[str] = None,
migrating_from: Optional[str] = None,
migrating_from_list: Optional[list] = None,
) -> StreamResponse[JoinCallResponse]:
"""Make a request to join a call via the coordinator.

Expand Down Expand Up @@ -208,6 +212,10 @@ async def join_call_coordinator_request(
video=video,
data=data,
)
if migrating_from:
json_body["migrating_from"] = migrating_from
if migrating_from_list:
json_body["migrating_from_list"] = migrating_from_list

# Make the POST request to join the call
return await client.post(
Expand Down Expand Up @@ -450,32 +458,18 @@ async def connect_websocket(
logger.debug("WebSocket connection established")
return ws_client, sfu_event

except SignalingError as e:
if (
e.error
and hasattr(e.error, "code")
and e.error.code in _RETRYABLE_SFU_ERROR_CODES
):
raise SfuJoinError(
str(e),
error_code=e.error.code,
should_retry=True,
) from e
raise
except Exception as e:
logger.error(f"Failed to connect WebSocket to {ws_url}: {e}")
raise SignalingError(f"WebSocket connection failed: {e}")


# Private functions
def _is_retryable(retry_state: Any) -> bool:
    """Check if an error should be retried.

    Intended as a retry predicate for tenacity: ``retry_state`` is expected
    to expose an ``outcome`` attribute holding the attempt's result.

    Args:
        retry_state: The retry state object from tenacity

    Returns:
        True if the error should be retried, False otherwise
    """
    # Extract the actual exception from the retry state
    if hasattr(retry_state, "outcome") and retry_state.outcome.failed:
        error = retry_state.outcome.exception()
    else:
        # No outcome, or the attempt succeeded: nothing to retry.
        return False

    # Import here to avoid circular imports
    from getstream.video.rtc.signaling import SignalingError

    # Only signaling/connection failures are candidates for retry.
    if not isinstance(error, (SignalingError, SfuConnectionError)):
        return False

    # Transient if the (case-insensitively compared) message contains any
    # known retryable pattern.
    error_message = str(error).lower()
    return any(pattern in error_message for pattern in _RETRYABLE_ERROR_PATTERNS)
9 changes: 8 additions & 1 deletion getstream/video/rtc/coordinator/backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
when reconnecting to failed WebSocket connections.
"""

import asyncio
import logging
from typing import AsyncIterator

logger = logging.getLogger(__name__)


async def exp_backoff(
max_retries: int, base: float = 1.0, factor: float = 2.0
max_retries: int,
base: float = 1.0,
factor: float = 2.0,
sleep: bool = False,
) -> AsyncIterator[float]:
"""
Generate exponential backoff delays for retry attempts.
Expand All @@ -21,6 +25,7 @@ async def exp_backoff(
max_retries: Maximum number of retry attempts
base: Base delay in seconds for the first retry
factor: Multiplicative factor for each subsequent retry
sleep: If True, sleep for the delay before yielding

Yields:
float: Delay in seconds for each retry attempt
Expand All @@ -39,4 +44,6 @@ async def exp_backoff(
for attempt in range(max_retries):
delay = base * (factor**attempt)
logger.debug(f"Backoff attempt {attempt + 1}/{max_retries}: {delay}s delay")
if sleep:
await asyncio.sleep(delay)
yield delay
10 changes: 7 additions & 3 deletions getstream/video/rtc/signaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
class SignalingError(Exception):
    """Exception raised for errors in the signaling process.

    Attributes:
        error: Optional structured error payload reported by the SFU
            (e.g. the error object from the first WebSocket message),
            preserved so callers can inspect the numeric error code.
            None when no structured error is available.
    """

    # The redundant `pass` statement (leftover from the bodiless version of
    # this class) was removed; __init__ alone forms the class body.
    def __init__(self, message: str, error=None):
        """Create a signaling error.

        Args:
            message: Human-readable description of the failure.
            error: Optional structured error payload from the SFU.
        """
        super().__init__(message)
        self.error = error


class WebSocketClient(StreamAsyncIOEventEmitter):
Expand Down Expand Up @@ -111,8 +113,10 @@ async def connect(self):

# Check if the first message is an error
if self.first_message and self.first_message.HasField("error"):
error_msg = self.first_message.error.error.message
raise SignalingError(f"Connection failed: {error_msg}")
sfu_error = self.first_message.error.error
raise SignalingError(
f"Connection failed: {sfu_error.message}", error=sfu_error
)

# Check if we got join_response
if self.first_message and self.first_message.HasField("join_response"):
Expand Down
33 changes: 33 additions & 0 deletions tests/rtc/coordinator/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import pytest
from unittest.mock import patch, AsyncMock
from getstream.video.rtc.coordinator.backoff import exp_backoff


Expand Down Expand Up @@ -84,3 +85,35 @@ async def test_exp_backoff_fractional_factor():
assert actual_delays == expected_delays, (
f"Expected delays {expected_delays}, but got {actual_delays}"
)


@pytest.mark.asyncio
async def test_exp_backoff_sleep():
    """With sleep=True, exp_backoff awaits asyncio.sleep once per delay."""
    expected_delays = [0.5, 1.0, 2.0]

    with patch(
        "getstream.video.rtc.coordinator.backoff.asyncio.sleep",
        new_callable=AsyncMock,
    ) as mock_sleep:
        observed = [
            d async for d in exp_backoff(max_retries=3, base=0.5, sleep=True)
        ]

    # Yielded delays follow the exponential schedule...
    assert observed == expected_delays
    # ...and each one was slept for, in order, before being yielded.
    assert mock_sleep.await_count == 3
    slept_for = [call.args[0] for call in mock_sleep.await_args_list]
    assert slept_for == expected_delays


@pytest.mark.asyncio
async def test_exp_backoff_no_sleep_by_default():
    """Without sleep=True, exp_backoff must never await asyncio.sleep."""
    with patch(
        "getstream.video.rtc.coordinator.backoff.asyncio.sleep",
        new_callable=AsyncMock,
    ) as mock_sleep:
        # Drain the generator; delays themselves are covered elsewhere.
        async for _delay in exp_backoff(max_retries=3):
            pass

    mock_sleep.assert_not_awaited()
Loading