diff --git a/Packages/StreamVideo/.agents/skills/write-test/SKILL.md b/Packages/StreamVideo/.agents/skills/write-test/SKILL.md index 721bb83b..315f3af8 100644 --- a/Packages/StreamVideo/.agents/skills/write-test/SKILL.md +++ b/Packages/StreamVideo/.agents/skills/write-test/SKILL.md @@ -53,33 +53,23 @@ This is a master kill-switch that prevents test code from compiling in productio ## Test Method Naming -### Runtime / integration tests - -Use the **`When__expect_`** pattern with underscores: +**All tests** (editor and runtime) use the same pattern: **`When__expect_`** with underscores: ``` +When_new_instance_expect_empty +When_regenerate_expect_version_increments +When_clear_expect_version_unchanged When_two_clients_join_same_call_expect_no_errors When_setting_call_custom_data_expect_custom_data_set When_participant_pinned_expect_pinned_participants_changed_event_fired ``` -The private async companion uses the same name with an `_Async` suffix: +For runtime tests that use the async bridge, the private async companion uses the same name with an `_Async` suffix: ``` When_two_clients_join_same_call_expect_no_errors_Async ``` -### Editor / unit tests - -Use **`__`** or a descriptive phrase: - -``` -NewInstance_IsEmpty -Regenerate_IncrementsVersion -Clear_DoesNotResetVersion -Regenerate_AfterClear_RestoresNonEmptyStateAndIncrementsVersion -``` - ## Writing an Editor Test (no client connection) 1. Create `{Feature}Tests.cs` in `Tests/Editor/`. @@ -109,7 +99,7 @@ namespace StreamVideo.Tests.Editor } [Test] - public void SomeProperty_DefaultValue_IsExpected() + public void When_default_state_expect_some_property_has_expected_value() { Assert.That(_myFeature.SomeProperty, Is.EqualTo(expectedValue), "Descriptive failure message explaining what went wrong."); @@ -225,7 +215,7 @@ These constants are defined in `TestsBase`. 1. **Always** wrap files in `#if STREAM_TESTS_ENABLED` / `#endif`. 2. **Never** create a new `.asmdef` — use the existing `Tests/Editor/` or `Tests/Runtime/` folder. 3. **Inherit `TestsBase`** for runtime tests; **don't** for pure-logic editor tests. -4. **Follow the naming patterns**: `When__expect_` for runtime; `__` for unit tests. +4. **Follow the naming pattern**: `When__expect_` for all tests (editor and runtime). 5. **Use the async-bridge pattern**: public `[UnityTest]` → `IEnumerator` → `ConnectAndExecute` → private `async Task` with `_Async` suffix. 6. **Add descriptive assertion messages** to every assertion. 7. **Don't manage client lifecycle manually** — let `TestsBase` and `StreamTestClientProvider` handle it. diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs new file mode 100644 index 00000000..ae71ee25 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs @@ -0,0 +1,55 @@ +namespace StreamVideo.Core.LowLevelClient +{ + /// + /// Guards reconnection attempts based on the current . + /// Prevents reconnection in states where it would be unsafe or redundant + /// (e.g. already reconnecting, joining, leaving, or left). + /// + internal class ReconnectGuard + { + /// + /// Whether a reconnection attempt is currently in progress. + /// This flag protects against race conditions when multiple peer connections + /// (Publisher and Subscriber) trigger reconnection simultaneously, before + /// the has been updated. + /// + public bool IsReconnecting { get; private set; } + + /// + /// Attempts to begin a reconnection. Returns true if the guard allows it, + /// false if the request should be silently ignored. + /// + public bool TryBeginReconnection(CallingState currentState) + { + if (IsIgnoredState(currentState)) + { + return false; + } + + if (IsReconnecting) + { + return false; + } + + IsReconnecting = true; + return true; + } + + /// + /// Marks the current reconnection attempt as complete. + /// Must be called in a finally block to ensure the guard is released. + /// + public void EndReconnection() + { + IsReconnecting = false; + } + + private static bool IsIgnoredState(CallingState state) + => state == CallingState.Reconnecting + || state == CallingState.Migrating + || state == CallingState.Joining + || state == CallingState.Leaving + || state == CallingState.Left + || state == CallingState.ReconnectingFailed; + } +} diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs.meta b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs.meta new file mode 100644 index 00000000..05614d2d --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/ReconnectGuard.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: b0547954dbe31ba439f6511b40235ee9 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs index eeee1ccc..145c89db 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs @@ -6,7 +6,6 @@ using System.Linq; using System.Net.Http; using System.Net.WebSockets; -using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using StreamVideo.v1.Sfu.Events; @@ -55,7 +54,7 @@ namespace StreamVideo.Core.LowLevelClient //StreamTodo: reconnect flow needs to send `UpdateSubscription` https://getstream.slack.com/archives/C022N8JNQGZ/p1691139853890859?thread_ts=1691139571.281779&cid=C022N8JNQGZ //StreamTodo: decide lifetime, if the obj persists across session maybe it should be named differently and only return struct handle to a session - internal sealed class RtcSession : IMediaInputProvider, ISfuClient, IDisposable + internal class RtcSession : IMediaInputProvider, ISfuClient, IDisposable { // Static session counter to track the number of sessions created private static int _sessionCounter = 0; @@ -130,7 +129,7 @@ private set public CallingState CallState { get => _callState; - private set + internal set { if (_callState == value) { @@ -144,7 +143,7 @@ private set } } - public StreamCall ActiveCall { get; private set; } + public StreamCall ActiveCall { get; internal set; } public SubscriberPeerConnection Subscriber { get; private set; } public PublisherPeerConnection Publisher { get; private set; } @@ -579,7 +578,7 @@ public async Task DoJoin(JoinCallData joinCallData, CancellationToken cancellati reconnectDetails.Subscriptions.AddRange(desiredTracks); } - var joinRequest = new SfuWebSocket.ConnectRequest + var joinRequest = new SfuConnectRequest { ReconnectDetails = reconnectDetails }; @@ -922,12 +921,140 @@ public void ResumeAndroidAudioPlayback() /// public void TrySetPublisherVideoTrackEnabled(bool isEnabled) => PublisherVideoTrackIsEnabled = isEnabled; + protected virtual bool ArePeerConnectionsHealthy() + => (Publisher?.IsHealthy ?? false) && (Subscriber?.IsHealthy ?? false); + + protected virtual bool IsOfflineTimeWithinFastReconnectDeadline() + { + var offlineTime = _timeService.UtcNow - _lastTimeOffline; + return offlineTime.TotalSeconds <= _fastReconnectDeadlineSeconds; + } + + //StreamTODO: add triggering from SFU WS closed. + // In JS -> Call.ts -> onSignalClose -> handleSfuSignalClose -> reconnect + //StreamTODO: add triggering from network changed -> js Call.ts "network.changed" + protected virtual async Task Reconnect(WebsocketReconnectStrategy strategy, string reason) + { + if (!AssertMainThread()) + { + _pendingReconnectRequest = new ValueTuple(strategy, reason); + return; + } + + if (!_reconnectGuard.TryBeginReconnection(CallState)) + { + _logs.WarningIfDebug($"[Reconnect] Ignoring reconnect request. CallState: {CallState}, IsReconnecting: {_reconnectGuard.IsReconnecting}"); + return; + } + + _logs.WarningIfDebug($"--------- Reconnection FLOW TRIGGERED ---------- strategy: {strategy}, reason: {reason}"); + + try + { + _reconnectStrategy = strategy; + _reconnectReason = reason; + + var finishedStates = new[] { CallingState.Joined, CallingState.ReconnectingFailed, CallingState.Left, CallingState.Offline }; + + var attempt = 0; + var reconnectStartTime = _timeService.UtcNow; + + //StreamTODO: we should handle cancellation token between each await + + do + { + // StreamTODO: consider give up timeout. JS has it + + // Only increment attempts if the strategy is not FAST + if (_reconnectStrategy != WebsocketReconnectStrategy.Fast) + { + _reconnectAttempts++; + } + + try + { + _logs.Info("Reconnect with strategy: " + _reconnectStrategy); + + switch (_reconnectStrategy) + { + case WebsocketReconnectStrategy.Unspecified: + case WebsocketReconnectStrategy.Disconnect: + + // Log warning + + break; + case WebsocketReconnectStrategy.Fast: + await ReconnectFast(); + break; + case WebsocketReconnectStrategy.Rejoin: + await ReconnectRejoin(); + break; + case WebsocketReconnectStrategy.Migrate: + await ReconnectMigrate(); + break; + default: + throw new ArgumentOutOfRangeException(); + } + } + catch (Exception e) + { + _logs.ExceptionIfDebug(e); + + // We don't check for Offline CallState here because the call will remain Offline until connecting back again, even if network is back online + if (!_networkMonitor.IsNetworkAvailable) + { + _logs.WarningIfDebug("[Reconnect] Can't reconnect while network is down, stopping reconnection attempts"); + break; + } + + if (e is StreamApiException apiException && apiException.Unrecoverable) + { + _logs.Error("Can't reconnect due to coordinator unrecoverable error: " + apiException); + CallState = CallingState.ReconnectingFailed; + throw; + } + + await ReconnectRetryDelay(); + + var wasMigrating = _reconnectStrategy == WebsocketReconnectStrategy.Migrate; + + var fastReconnectTimeout = (_timeService.UtcNow - reconnectStartTime).TotalSeconds > + _fastReconnectDeadlineSeconds; + + var peerConnectionsHealthy = ArePeerConnectionsHealthy(); + + // don't immediately switch to the REJOIN strategy, but instead attempt + // to reconnect with the FAST strategy for a few times before switching. + // in some cases, we immediately switch to the REJOIN strategy. + var shouldRejoin = fastReconnectTimeout || wasMigrating || attempt >= CallRejoinMaxFastAttempts || + !peerConnectionsHealthy; + + attempt++; + _reconnectStrategy + = shouldRejoin ? WebsocketReconnectStrategy.Rejoin : WebsocketReconnectStrategy.Fast; + + _logs.WarningIfDebug( + $"Reconnect failed, attempt: {attempt}, next strategy: {_reconnectStrategy}, wasMigrating: {wasMigrating}, " + + $"fastReconnectTimeout: {fastReconnectTimeout}, arePeerConnectionsHealthy: {peerConnectionsHealthy}"); + + //StreamTODO: handle cancellation token + } + } while (finishedStates.All(s => s != CallState)); + + _reconnectAttempts = 0; + } + finally + { + _reconnectGuard.EndReconnection(); + } + } + private const float TrackSubscriptionDebounceTime = 0.1f; private const int CallJoinMaxRetries = 3; - private const int CallRejoinMaxFastAttempts = 3; + internal const int CallRejoinMaxFastAttempts = 3; private readonly ISfuWebSocketFactory _sfuWebSocketFactory; - private SfuWebSocket _sfuWebSocket; + private ISfuWebSocket _sfuWebSocket; private readonly ISerializer _serializer; private readonly ILogs _logs; private readonly ITimeService _timeService; @@ -977,14 +1104,10 @@ private readonly Dictionary _incomingAudioRequestedByParticipantSe private CancellationTokenSource _joinCallCts; private CancellationTokenSource _activeCallCts; - /// - /// Flag to track if a reconnection is in progress. This prevents parallel reconnection - /// attempts from both Publisher and Subscriber peer connections. - /// - private bool _isReconnecting; + private readonly ReconnectGuard _reconnectGuard = new ReconnectGuard(); private TaskCompletionSource _joinTaskCompletionSource; - private int _fastReconnectDeadlineSeconds; + internal int _fastReconnectDeadlineSeconds; private Task _ongoingStopTask; private WebsocketReconnectStrategy _reconnectStrategy = WebsocketReconnectStrategy.Unspecified; @@ -1558,141 +1681,7 @@ private void TryExecutePendingReconnectRequest() } } - //StreamTODO: add triggering from SFU WS closed. - // In JS -> Call.ts -> onSignalClose -> handleSfuSignalClose -> reconnect - //StreamTODO: add triggering from network changed -> js Call.ts "network.changed" - private async Task Reconnect(WebsocketReconnectStrategy strategy, string reason) - { - if (!AssertMainThread()) - { - _pendingReconnectRequest = new ValueTuple(strategy, reason); - return; - } - - // Ignore reconnection requests if we're already reconnecting, migrating, or joining - // This prevents parallel reconnection attempts from both Publisher and Subscriber - var ignoredStates = new[] - { - CallingState.Reconnecting, CallingState.Migrating, CallingState.Joining, - CallingState.Leaving, CallingState.Left, CallingState.ReconnectingFailed - }; - if (ignoredStates.Any(s => s == CallState)) - { - _logs.WarningIfDebug($"[Reconnect] Ignoring reconnect request because CallState is {CallState}"); - return; - } - - // Use a flag to track if we're in the process of reconnecting - // This protects against race conditions before CallState is updated - if (_isReconnecting) - { - _logs.WarningIfDebug($"[Reconnect] Ignoring reconnect request because reconnection is already in progress"); - return; - } - - _isReconnecting = true; - _logs.WarningIfDebug($"--------- Reconnection FLOW TRIGGERED ---------- strategy: {strategy}, reason: {reason}"); - - try - { - _reconnectStrategy = strategy; - _reconnectReason = reason; - - var finishedStates = new[] { CallingState.Joined, CallingState.ReconnectingFailed, CallingState.Left, CallingState.Offline }; - - var attempt = 0; - var reconnectStartTime = DateTime.UtcNow; - - //StreamTODO: we should handle cancellation token between each await - - do - { - // StreamTODO: consider give up timeout. JS has it - - // Only increment attempts if the strategy is not FAST - if (_reconnectStrategy != WebsocketReconnectStrategy.Fast) - { - _reconnectAttempts++; - } - - try - { - _logs.Info("Reconnect with strategy: " + _reconnectStrategy); - - switch (_reconnectStrategy) - { - case WebsocketReconnectStrategy.Unspecified: - case WebsocketReconnectStrategy.Disconnect: - - // Log warning - - break; - case WebsocketReconnectStrategy.Fast: - await ReconnectFast(); - break; - case WebsocketReconnectStrategy.Rejoin: - await ReconnectRejoin(); - break; - case WebsocketReconnectStrategy.Migrate: - await ReconnectMigrate(); - break; - default: - throw new ArgumentOutOfRangeException(); - } - } - catch (Exception e) - { - _logs.ExceptionIfDebug(e); - - if (CallState == CallingState.Offline) - { - _logs.WarningIfDebug("[Reconnect] Can't reconnect while offline, stopping reconnection attempts"); - break; - } - - if (e is StreamApiException apiException && apiException.Unrecoverable) - { - _logs.Error("Can't reconnect due to coordinator unrecoverable error: " + apiException); - CallState = CallingState.ReconnectingFailed; - throw; - } - - await Task.Delay(500, GetCurrentCancellationTokenOrDefault()); - - var wasMigrating = _reconnectStrategy == WebsocketReconnectStrategy.Migrate; - - var fastReconnectTimeout = (DateTime.UtcNow - reconnectStartTime).TotalSeconds > - _fastReconnectDeadlineSeconds; - - var arePeerConnectionsHealthy = (Publisher?.IsHealthy ?? false) && (Subscriber?.IsHealthy ?? false); - - // don't immediately switch to the REJOIN strategy, but instead attempt - // to reconnect with the FAST strategy for a few times before switching. - // in some cases, we immediately switch to the REJOIN strategy. - var shouldRejoin = fastReconnectTimeout || wasMigrating || attempt >= CallRejoinMaxFastAttempts || - !arePeerConnectionsHealthy; - - attempt++; - _reconnectStrategy - = shouldRejoin ? WebsocketReconnectStrategy.Rejoin : WebsocketReconnectStrategy.Fast; - - _logs.WarningIfDebug( - $"Reconnect failed, attempt: {attempt}, next strategy: {_reconnectStrategy}, wasMigrating: {wasMigrating}, " + - $"fastReconnectTimeout: {fastReconnectTimeout}, arePeerConnectionsHealthy: {arePeerConnectionsHealthy}"); - - //StreamTODO: handle cancellation token - } - } while (finishedStates.All(s => s != CallState)); - - _reconnectAttempts = 0; - } - finally - { - _isReconnecting = false; - } - } - - private async Task ReconnectFast() + protected virtual async Task ReconnectFast() { _reconnectStrategy = WebsocketReconnectStrategy.Fast; CallState = CallingState.Reconnecting; @@ -1706,7 +1695,7 @@ var getCallResponse } - private async Task ReconnectRejoin() + protected virtual async Task ReconnectRejoin() { _reconnectStrategy = WebsocketReconnectStrategy.Rejoin; CallState = CallingState.Reconnecting; @@ -1716,7 +1705,11 @@ private async Task ReconnectRejoin() RestoreSubscribedTracks(); } - private Task ReconnectMigrate() + // Separated to skip in tests. StreamTODO: better change to configurable delay + protected virtual Task ReconnectRetryDelay() + => Task.Delay(500, GetCurrentCancellationTokenOrDefault()); + + protected virtual Task ReconnectMigrate() { throw new NotImplementedException("Sfu migration is not yet implemented."); } @@ -2235,9 +2228,9 @@ private void DisposePublisher() /// Creates a new SfuWebSocket instance using the factory. /// Disposes the previous instance if one exists. /// - /// Returns the previous SfuWebSocket instance (if any) for cleanup after transition. - /// The newly created SfuWebSocket instance. - private SfuWebSocket CreateNewSfuWebSocket(out SfuWebSocket previousSfuWebSocket) + /// Returns the previous ISfuWebSocket instance (if any) for cleanup after transition. + /// The newly created ISfuWebSocket instance. + internal ISfuWebSocket CreateNewSfuWebSocket(out ISfuWebSocket previousSfuWebSocket) { previousSfuWebSocket = _sfuWebSocket; previousSfuWebSocket?.DebugMarkAsOld(); @@ -2271,7 +2264,7 @@ private void DisposeSfuWebSocket() } } - private async Task ClosePreviousSfuWebSocketAsync(SfuWebSocket previousSfuWebSocket, string reason) + private async Task ClosePreviousSfuWebSocketAsync(ISfuWebSocket previousSfuWebSocket, string reason) { if (previousSfuWebSocket == null) { @@ -2359,8 +2352,8 @@ private void OnSfuWebSocketDisconnected() SfuDisconnected?.Invoke(); } - var arePeerConnectionsHealthy = (Publisher?.IsHealthy ?? false) && (Subscriber?.IsHealthy ?? false); - var strategy = arePeerConnectionsHealthy + var peerConnectionsHealthy = ArePeerConnectionsHealthy(); + var strategy = peerConnectionsHealthy ? WebsocketReconnectStrategy.Fast : WebsocketReconnectStrategy.Rejoin; @@ -2378,7 +2371,7 @@ private async void OnNetworkAvailabilityChanged(bool isNetworkAvailable) _logs.WarningIfDebug("Going Online"); if (CallState == CallingState.Joining || CallState == CallingState.Reconnecting || - CallState == CallingState.Migrating || _isReconnecting) + CallState == CallingState.Migrating || _reconnectGuard.IsReconnecting) { _logs.WarningIfDebug( $"{nameof(OnNetworkAvailabilityChanged)} skipped - reconnection already in progress. CallState: {CallState}"); @@ -2398,12 +2391,11 @@ await _sfuWebSocket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, return; } - var offlineTime = DateTime.UtcNow - _lastTimeOffline; - var strategy = offlineTime.TotalSeconds > _fastReconnectDeadlineSeconds - ? WebsocketReconnectStrategy.Rejoin - : WebsocketReconnectStrategy.Fast; + var strategy = IsOfflineTimeWithinFastReconnectDeadline() + ? WebsocketReconnectStrategy.Fast + : WebsocketReconnectStrategy.Rejoin; - _logs.WarningIfDebug($"Reconnect triggered by {nameof(OnNetworkAvailabilityChanged)}. Strategy: {strategy}, offline time: {offlineTime}"); + _logs.WarningIfDebug($"Reconnect triggered by {nameof(OnNetworkAvailabilityChanged)}. Strategy: {strategy}"); await Reconnect(strategy, "Going online"); } catch (Exception e) @@ -2414,7 +2406,7 @@ await _sfuWebSocket.DisconnectAsync(WebSocketCloseStatus.NormalClosure, else { _logs.WarningIfDebug("Going Offline"); - _lastTimeOffline = DateTime.UtcNow; + _lastTimeOffline = _timeService.UtcNow; if (ActiveCall != null) { @@ -2437,7 +2429,7 @@ private static bool AssertCallIdMatch(IStreamCall activeCall, string callId, ILo return true; } - private void SubscribeToSfuEvents(SfuWebSocket sfuWebSocket) + private void SubscribeToSfuEvents(ISfuWebSocket sfuWebSocket) { sfuWebSocket.SubscriberOffer += OnSfuSubscriberOffer; sfuWebSocket.PublisherAnswer += OnSfuPublisherAnswer; @@ -2466,7 +2458,7 @@ private void SubscribeToSfuEvents(SfuWebSocket sfuWebSocket) sfuWebSocket.Disconnected += OnSfuWebSocketDisconnected; } - private void UnsubscribeFromSfuEvents(SfuWebSocket sfuWebSocket) + private void UnsubscribeFromSfuEvents(ISfuWebSocket sfuWebSocket) { sfuWebSocket.SubscriberOffer -= OnSfuSubscriberOffer; sfuWebSocket.PublisherAnswer -= OnSfuPublisherAnswer; diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs new file mode 100644 index 00000000..d1707ba4 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs @@ -0,0 +1,56 @@ +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using StreamVideo.v1.Sfu.Events; +using ICETrickle = StreamVideo.v1.Sfu.Models.ICETrickle; +using Error = StreamVideo.v1.Sfu.Events.Error; + +namespace StreamVideo.Core.LowLevelClient.WebSockets +{ + /// + /// Represents the WebSocket connection to the SFU (Selective Forwarding Unit). + /// Manages the signaling channel used to exchange session descriptions, ICE candidates, + /// participant events, and health checks between the client and the SFU server. + /// + internal interface ISfuWebSocket : IDisposable + { + event Action Connected; + event Action Disconnected; + + event Action SubscriberOffer; + event Action PublisherAnswer; + event Action ConnectionQualityChanged; + event Action AudioLevelChanged; + event Action IceTrickle; + event Action ChangePublishQuality; + event Action ParticipantJoined; + event Action ParticipantLeft; + event Action DominantSpeakerChanged; + event Action JoinResponse; + event Action HealthCheck; + event Action TrackPublished; + event Action TrackUnpublished; + event Action Error; + event Action CallGrantsUpdated; + event Action GoAway; + event Action IceRestart; + event Action PinsUpdated; + event Action CallEnded; + event Action ParticipantUpdated; + event Action ParticipantMigrationComplete; + event Action ChangePublishOptions; + event Action InboundStateNotification; + + bool IsLeaving { get; } + bool IsClosingClean { get; } + bool IsHealthy { get; } + + void Update(); + void InitNewSession(string sessionId, string sfuUrl, string sfuToken, + string subscriberOfferSdp, string publisherOfferSdp); + Task ConnectAsync(SfuConnectRequest request, CancellationToken cancellationToken = default); + Task DisconnectAsync(WebSocketCloseStatus closeStatus, string closeMessage); + void DebugMarkAsOld(); + } +} diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs.meta b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs.meta new file mode 100644 index 00000000..de4221cb --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocket.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 9270875e67e54524c916e9d16571cc51 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocketFactory.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocketFactory.cs index c49afc3c..be595d28 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocketFactory.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/ISfuWebSocketFactory.cs @@ -1,16 +1,16 @@ namespace StreamVideo.Core.LowLevelClient.WebSockets { /// - /// Factory for creating instances. + /// Factory for creating instances. /// This enables to create new SFU WebSocket connections /// when needed (e.g., for reconnection or migration scenarios). /// internal interface ISfuWebSocketFactory { /// - /// Creates a new instance. + /// Creates a new instance. /// - SfuWebSocket Create(); + ISfuWebSocket Create(); } } diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs new file mode 100644 index 00000000..ced01f5a --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs @@ -0,0 +1,13 @@ +using StreamVideo.v1.Sfu.Events; + +namespace StreamVideo.Core.LowLevelClient.WebSockets +{ + /// + /// Encapsulates the data needed to establish or re-establish a connection to the SFU, + /// including reconnect details such as strategy, attempt count, and previous session information. + /// + internal struct SfuConnectRequest + { + public ReconnectDetails ReconnectDetails; + } +} diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs.meta b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs.meta new file mode 100644 index 00000000..3c9f28b6 --- /dev/null +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuConnectRequest.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: fb03f28cd49f6374191c2e3a9aae694c +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs index de4f5b4f..fdbf7c39 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs @@ -19,13 +19,8 @@ namespace StreamVideo.Core.LowLevelClient.WebSockets { - internal class SfuWebSocket : BasePersistentWebSocket + internal class SfuWebSocket : BasePersistentWebSocket, ISfuWebSocket { - public struct ConnectRequest - { - public ReconnectDetails ReconnectDetails; - } - public event Action SubscriberOffer; public event Action PublisherAnswer; public event Action ConnectionQualityChanged; @@ -135,7 +130,7 @@ protected override void SendHealthCheck() WebsocketClient.Send(sfuRequestByteArray); } - protected override async Task ExecuteConnectAsync(ConnectRequest request, CancellationToken cancellationToken = default) + protected override async Task ExecuteConnectAsync(SfuConnectRequest request, CancellationToken cancellationToken = default) { if (ConnectionState == ConnectionState.Disconnecting || ConnectionState == ConnectionState.Closing) { diff --git a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocketFactory.cs b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocketFactory.cs index 6f017f81..c956df8d 100644 --- a/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocketFactory.cs +++ b/Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocketFactory.cs @@ -32,7 +32,7 @@ public SfuWebSocketFactory(Func websocketClientFactory, IAuthP } /// - public SfuWebSocket Create() + public ISfuWebSocket Create() { var websocketClient = _websocketClientFactory(); diff --git a/Packages/StreamVideo/Runtime/Libs/Time/ITimeService.cs b/Packages/StreamVideo/Runtime/Libs/Time/ITimeService.cs index 0edf7f89..ba3188c9 100644 --- a/Packages/StreamVideo/Runtime/Libs/Time/ITimeService.cs +++ b/Packages/StreamVideo/Runtime/Libs/Time/ITimeService.cs @@ -1,4 +1,4 @@ -namespace StreamVideo.Libs.Time +namespace StreamVideo.Libs.Time { /// /// Provides time information @@ -7,5 +7,6 @@ public interface ITimeService { float Time { get; } float DeltaTime { get; } + System.DateTime UtcNow { get; } } } \ No newline at end of file diff --git a/Packages/StreamVideo/Runtime/Libs/Time/UnityTime.cs b/Packages/StreamVideo/Runtime/Libs/Time/UnityTime.cs index c2314737..f0ca9c49 100644 --- a/Packages/StreamVideo/Runtime/Libs/Time/UnityTime.cs +++ b/Packages/StreamVideo/Runtime/Libs/Time/UnityTime.cs @@ -1,4 +1,4 @@ -namespace StreamVideo.Libs.Time +namespace StreamVideo.Libs.Time { /// /// based on @@ -7,5 +7,6 @@ public class UnityTime : ITimeService { public float Time => UnityEngine.Time.time; public float DeltaTime => UnityEngine.Time.deltaTime; + public System.DateTime UtcNow => System.DateTime.UtcNow; } } \ No newline at end of file diff --git a/Packages/StreamVideo/Tests/Editor/PackageXmlInstallerTests.cs b/Packages/StreamVideo/Tests/Editor/PackageXmlInstallerTests.cs index 9ff3c8bc..2d5753b0 100644 --- a/Packages/StreamVideo/Tests/Editor/PackageXmlInstallerTests.cs +++ b/Packages/StreamVideo/Tests/Editor/PackageXmlInstallerTests.cs @@ -1,4 +1,4 @@ -#if STREAM_TESTS_ENABLED +#if STREAM_TESTS_ENABLED using System; using System.Collections.Generic; using System.Linq; @@ -13,7 +13,7 @@ namespace StreamVideo.Tests.Editor public class PackageXmlInstallerTests { [Test] - public void All_package_xml_installer_inheritors_should_have_valid_link_xml_asset() + public void When_package_xml_installer_inheritors_expect_valid_link_xml_asset() { var inheritorTypes = FindAllPackageXmlInstallerBaseInheritors(); @@ -81,14 +81,14 @@ public void All_package_xml_installer_inheritors_should_have_valid_link_xml_asse [Test] - public void Package_xml_installer_base_should_exist() + public void When_looking_for_package_xml_installer_base_expect_type_exists() { var baseType = typeof(PackageXmlInstallerBase); Assert.IsNotNull(baseType, nameof(PackageXmlInstallerBase) + " type should exist"); } [Test] - public void All_package_xml_installer_inheritors_should_be_instantiable() + public void When_package_xml_installer_inheritors_expect_all_instantiable() { var inheritorTypes = FindAllPackageXmlInstallerBaseInheritors(); var nonInstantiableTypes = new List(); diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs b/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs new file mode 100644 index 00000000..c6746e22 --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs @@ -0,0 +1,342 @@ +#if STREAM_TESTS_ENABLED +using System; +using System.Net.Http; +using System.Net.WebSockets; +using System.Threading.Tasks; +using NSubstitute; +using NUnit.Framework; +using StreamVideo.Core.Configs; +using StreamVideo.Core.LowLevelClient; +using StreamVideo.Core.LowLevelClient.WebSockets; +using StreamVideo.Core.State; +using StreamVideo.Core.State.Caches; +using StreamVideo.Core.StatefulModels; +using StreamVideo.Libs.Logs; +using StreamVideo.Libs.NetworkMonitors; +using StreamVideo.Libs.Serialization; +using StreamVideo.Libs.Time; +using StreamVideo.v1.Sfu.Events; +using StreamVideo.v1.Sfu.Models; +using SfuErrorEvent = StreamVideo.v1.Sfu.Events.Error; + +namespace StreamVideo.Tests.Editor +{ + /// + /// Tests for the reconnection strategy selection in . + /// Validates that chooses the correct + /// in response to SFU WebSocket + /// and network availability events, based on peer connection health, + /// calling state, and offline duration. + /// + internal sealed class ReconnectFlowTests + { + [SetUp] + public void SetUp() + { + _sfuWebSocket = Substitute.For(); + _sfuWebSocket.IsLeaving.Returns(false); + _sfuWebSocket.IsClosingClean.Returns(false); + _sfuWebSocket.DisconnectAsync(Arg.Any(), Arg.Any()) + .Returns(Task.CompletedTask); + + var factory = Substitute.For(); + factory.Create().Returns(_sfuWebSocket); + + _networkMonitor = Substitute.For(); + _timeService = Substitute.For(); + + _session = new TestableRtcSession( + sfuWebSocketFactory: factory, + httpClientFactory: _ => null, + logs: Substitute.For(), + serializer: Substitute.For(), + timeService: _timeService, + lowLevelClient: null, + config: Substitute.For(), + networkMonitor: _networkMonitor + ); + + _session.CreateNewSfuWebSocket(out _); + } + + [TearDown] + public void TearDown() + { + _session?.Dispose(); + } + + [Test] + public void When_sfu_websocket_disconnects_and_peer_connections_healthy_expect_fast_reconnect() + { + _session.CallState = CallingState.Joined; + _session.PeerConnectionsHealthy = true; + + _sfuWebSocket.Disconnected += Raise.Event(); + + Assert.That(_session.LastReconnectStrategy, Is.EqualTo(WebsocketReconnectStrategy.Fast), + "When both peer connections are healthy, the SFU WS disconnect should trigger " + + "a Fast reconnect that reuses existing peer connections and establishes a new WebSocket only."); + } + + [Test] + public void When_sfu_websocket_disconnects_and_peer_connections_unhealthy_expect_rejoin() + { + _session.CallState = CallingState.Joined; + _session.PeerConnectionsHealthy = false; + + _sfuWebSocket.Disconnected += Raise.Event(); + + Assert.That(_session.LastReconnectStrategy, Is.EqualTo(WebsocketReconnectStrategy.Rejoin), + "When peer connections are unhealthy, the SFU WS disconnect should trigger " + + "a Rejoin that creates new peer connections and a new WebSocket."); + } + + [Test] + public void When_network_comes_back_online_within_fast_deadline_expect_fast_reconnect() + { + _session.CallState = CallingState.Joined; + _session.ActiveCall = CreateDummyCall(); + _session._fastReconnectDeadlineSeconds = 30; + + var baseTime = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc); + + _timeService.UtcNow.Returns(baseTime); + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(false); + + _timeService.UtcNow.Returns(baseTime.AddSeconds(10)); + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(true); + + Assert.That(_session.LastReconnectStrategy, Is.EqualTo(WebsocketReconnectStrategy.Fast), + "When the device was offline for 10 seconds with a 30-second deadline, " + + "a Fast reconnect should be used because the SFU session is still alive."); + } + + [Test] + public void When_network_comes_back_online_after_fast_deadline_expect_rejoin() + { + _session.CallState = CallingState.Joined; + _session.ActiveCall = CreateDummyCall(); + _session._fastReconnectDeadlineSeconds = 30; + + var baseTime = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc); + + _timeService.UtcNow.Returns(baseTime); + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(false); + + _timeService.UtcNow.Returns(baseTime.AddSeconds(60)); + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(true); + + Assert.That(_session.LastReconnectStrategy, Is.EqualTo(WebsocketReconnectStrategy.Rejoin), + "When the device was offline for 60 seconds with a 30-second deadline, " + + "a Rejoin is required because the SFU will have already cleaned up the session."); + } + + [Test] + public void When_network_goes_offline_expect_state_transitions_to_offline() + { + _session.CallState = CallingState.Joined; + _session.ActiveCall = CreateDummyCall(); + _timeService.UtcNow.Returns(DateTime.UtcNow); + + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(false); + + Assert.That(_session.CallState, Is.EqualTo(CallingState.Offline), + "Losing network connectivity should immediately transition " + + "the call state to Offline when there is an active call."); + } + + [Test] + public void When_sfu_error_with_unspecified_strategy_expect_no_reconnect_and_no_stop() + { + _session.CallState = CallingState.Joined; + + _sfuWebSocket.Error += Raise.Event>( + CreateSfuError(WebsocketReconnectStrategy.Unspecified)); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + "Unspecified reconnect strategy should not trigger any reconnection attempt."); + Assert.That(_session.CallState, Is.EqualTo(CallingState.Joined), + "Unspecified strategy should not call StopAsync — " + + "the call state should remain Joined."); + } + + [Test] + public void When_sfu_error_with_disconnect_strategy_expect_stop_instead_of_reconnect() + { + _session.CallState = CallingState.Joined; + + _sfuWebSocket.Error += Raise.Event>( + CreateSfuError(WebsocketReconnectStrategy.Disconnect)); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + "Disconnect strategy should not trigger Reconnect."); + Assert.That(_session.CallState, + Is.EqualTo(CallingState.Leaving).Or.EqualTo(CallingState.Left), + "Disconnect strategy should call StopAsync, which transitions " + + "the call state to Leaving or Left."); + } + + [TestCase(WebsocketReconnectStrategy.Fast)] + [TestCase(WebsocketReconnectStrategy.Rejoin)] + [TestCase(WebsocketReconnectStrategy.Migrate)] + public void When_sfu_error_with_reconnectable_strategy_expect_reconnect_with_matching_strategy( + WebsocketReconnectStrategy strategy) + { + _session.CallState = CallingState.Joined; + + _sfuWebSocket.Error += Raise.Event>(CreateSfuError(strategy)); + + Assert.That(_session.LastReconnectStrategy, Is.EqualTo(strategy), + $"SFU error with {strategy} strategy should trigger Reconnect " + + "using the same strategy the SFU instructed."); + } + + [Test] + public void When_sfu_go_away_received_expect_migrate_reconnect() + { + _session.CallState = CallingState.Joined; + + _sfuWebSocket.GoAway += Raise.Event>( + new GoAway { Reason = GoAwayReason.ShuttingDown }); + + Assert.That(_session.LastReconnectStrategy, + Is.EqualTo(WebsocketReconnectStrategy.Migrate), + "GoAway from the SFU should trigger a Migrate reconnect " + + "so the client moves to a different SFU instance."); + } + + [TestCase(CallingState.Joining)] + [TestCase(CallingState.Idle)] + [TestCase(CallingState.Left)] + [TestCase(CallingState.Reconnecting)] + public void When_sfu_websocket_disconnects_in_guarded_state_expect_no_reconnect(CallingState state) + { + _session.CallState = state; + _session.PeerConnectionsHealthy = true; + + _sfuWebSocket.Disconnected += Raise.Event(); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + $"SFU WS disconnect should be ignored when CallState is {state} " + + "because reconnection is either unsafe or redundant in this state."); + } + + [Test] + public void When_sfu_websocket_disconnects_while_leaving_expect_no_reconnect() + { + _session.CallState = CallingState.Joined; + _session.PeerConnectionsHealthy = true; + _sfuWebSocket.IsLeaving.Returns(true); + + _sfuWebSocket.Disconnected += Raise.Event(); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + "SFU WS disconnect should be ignored when the client is intentionally " + + "leaving the call (IsLeaving=true)."); + } + + [Test] + public void When_sfu_websocket_disconnects_while_closing_clean_expect_no_reconnect() + { + _session.CallState = CallingState.Joined; + _session.PeerConnectionsHealthy = true; + _sfuWebSocket.IsClosingClean.Returns(true); + + _sfuWebSocket.Disconnected += Raise.Event(); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + "SFU WS disconnect should be ignored when the WebSocket is being " + + "closed cleanly (IsClosingClean=true), e.g. during a planned reconnection."); + } + + [TestCase(CallingState.Joining)] + [TestCase(CallingState.Reconnecting)] + [TestCase(CallingState.Migrating)] + public void When_network_comes_back_online_in_guarded_state_expect_no_reconnect(CallingState state) + { + _session.CallState = state; + _session.ActiveCall = CreateDummyCall(); + + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(true); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + $"Going online should not trigger reconnection when CallState is {state} " + + "because a connection attempt is already in progress."); + } + + [Test] + public void When_network_comes_back_online_after_call_ended_expect_no_reconnect() + { + _session.CallState = CallingState.Left; + _session.ActiveCall = null; + _session._fastReconnectDeadlineSeconds = 30; + _timeService.UtcNow.Returns(DateTime.UtcNow); + + _networkMonitor.NetworkAvailabilityChanged + += Raise.Event(true); + + Assert.That(_session.LastReconnectStrategy, Is.Null, + "Going online should not trigger reconnection when the call has already ended " + + "because there is nothing to reconnect to."); + } + + private ISfuWebSocket _sfuWebSocket; + private INetworkMonitor _networkMonitor; + private ITimeService _timeService; + private TestableRtcSession _session; + + private static StreamCall CreateDummyCall() + => new StreamCall("test:dummy", + Substitute.For>(), + Substitute.For()); + + private static SfuErrorEvent CreateSfuError(WebsocketReconnectStrategy strategy) + => new SfuErrorEvent + { + Error_ = new StreamVideo.v1.Sfu.Models.Error + { + Message = "test error", + ShouldRetry = true, + }, + ReconnectStrategy = strategy, + }; + + /// + /// Test subclass of that overrides peer connection + /// health and reconnect execution, allowing tests to control inputs and verify + /// the chosen strategy without real networking or WebRTC dependencies. + /// + private class TestableRtcSession : RtcSession + { + public bool PeerConnectionsHealthy { get; set; } + public WebsocketReconnectStrategy? LastReconnectStrategy { get; private set; } + public string LastReconnectReason { get; private set; } + + public TestableRtcSession(ISfuWebSocketFactory sfuWebSocketFactory, + Func httpClientFactory, + ILogs logs, ISerializer serializer, ITimeService timeService, + StreamVideoLowLevelClient lowLevelClient, + IStreamClientConfig config, INetworkMonitor networkMonitor) + : base(sfuWebSocketFactory, httpClientFactory, logs, serializer, + timeService, lowLevelClient, config, networkMonitor) + { + } + + protected override bool ArePeerConnectionsHealthy() => PeerConnectionsHealthy; + + protected override Task Reconnect(WebsocketReconnectStrategy strategy, string reason) + { + LastReconnectStrategy = strategy; + LastReconnectReason = reason; + return Task.CompletedTask; + } + } + } +} +#endif diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs.meta b/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs.meta new file mode 100644 index 00000000..6bf1c385 --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectFlowTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 6e3b9ff668ca0e942abf0efe30e8fdce +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs b/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs new file mode 100644 index 00000000..c0cb763d --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs @@ -0,0 +1,85 @@ +#if STREAM_TESTS_ENABLED +using NUnit.Framework; +using StreamVideo.Core.LowLevelClient; + +namespace StreamVideo.Tests.Editor +{ + /// + /// Tests for — the guard logic that prevents + /// reconnection in states where it would be unsafe or redundant. + /// A bug here could cause parallel reconnect attempts, corrupt an in-progress + /// join, or resurrect a call after the user has left. + /// + internal sealed class ReconnectGuardTests + { + [SetUp] + public void SetUp() + { + _guard = new ReconnectGuard(); + } + + [TestCase(CallingState.Joined)] + [TestCase(CallingState.Offline)] + public void When_reconnect_triggered_in_allowed_state_expect_request_approved(CallingState state) + { + var result = _guard.TryBeginReconnection(state); + + Assert.That(result, Is.True, + $"Reconnection should be approved when CallingState is {state}."); + Assert.That(_guard.IsReconnecting, Is.True, + "IsReconnecting should be true after an approved request."); + } + + [TestCase(CallingState.Left)] + [TestCase(CallingState.Joining)] + [TestCase(CallingState.Leaving)] + [TestCase(CallingState.Reconnecting)] + [TestCase(CallingState.Migrating)] + [TestCase(CallingState.ReconnectingFailed)] + public void When_reconnect_triggered_in_ignored_state_expect_request_rejected(CallingState state) + { + var result = _guard.TryBeginReconnection(state); + + Assert.That(result, Is.False, + $"Reconnection should be rejected when CallingState is {state}."); + Assert.That(_guard.IsReconnecting, Is.False, + "IsReconnecting should remain false when the request is rejected."); + } + + [Test] + public void When_reconnect_triggered_while_already_reconnecting_expect_request_ignored() + { + _guard.TryBeginReconnection(CallingState.Joined); + + var secondResult = _guard.TryBeginReconnection(CallingState.Joined); + + Assert.That(secondResult, Is.False, + "A second reconnection request should be rejected while one is already in progress."); + Assert.That(_guard.IsReconnecting, Is.True, + "IsReconnecting should remain true from the first request."); + } + + [Test] + public void When_end_reconnection_called_expect_is_reconnecting_resets_and_new_attempt_allowed() + { + _guard.TryBeginReconnection(CallingState.Joined); + Assert.That(_guard.IsReconnecting, Is.True, + "Precondition: IsReconnecting should be true after a successful begin."); + + _guard.EndReconnection(); + + Assert.That(_guard.IsReconnecting, Is.False, + "EndReconnection should reset IsReconnecting to false."); + + var secondResult = _guard.TryBeginReconnection(CallingState.Joined); + + Assert.That(secondResult, Is.True, + "After EndReconnection, a new reconnection request should be approved."); + Assert.That(_guard.IsReconnecting, Is.True, + "IsReconnecting should be true again after the second approved request."); + } + + private ReconnectGuard _guard; + } +} +#endif diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs.meta b/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs.meta new file mode 100644 index 00000000..8e6af513 --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectGuardTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: bfc18924a8213dd4a80a803f99314f71 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs b/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs new file mode 100644 index 00000000..4ba4033a --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs @@ -0,0 +1,255 @@ +#if STREAM_TESTS_ENABLED +using System; +using System.Collections; +using System.Net.Http; +using System.Threading.Tasks; +using NSubstitute; +using NUnit.Framework; +using StreamVideo.Core.Configs; +using StreamVideo.Core.Exceptions; +using StreamVideo.Core.InternalDTO.Models; +using StreamVideo.Core.LowLevelClient; +using StreamVideo.Core.LowLevelClient.WebSockets; +using StreamVideo.Core.State; +using StreamVideo.Core.StatefulModels; +using StreamVideo.Libs.Logs; +using StreamVideo.Libs.NetworkMonitors; +using StreamVideo.Libs.Serialization; +using StreamVideo.Libs.Time; +using StreamVideo.Tests.Shared; +using StreamVideo.v1.Sfu.Models; +using UnityEngine.TestTools; + +namespace StreamVideo.Tests.Editor +{ + /// + /// Tests for the retry-loop escalation logic inside . + /// Validates that the loop correctly escalates from FAST to REJOIN based on + /// attempt count, peer connection health, fast-reconnect deadline, and error severity, + /// and that the loop terminates on offline state or unrecoverable errors. + /// + internal sealed class ReconnectRetryTests + { + [SetUp] + public void SetUp() + { + _timeService = Substitute.For(); + _timeService.UtcNow.Returns(new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc)); + + _networkMonitor = Substitute.For(); + _networkMonitor.IsNetworkAvailable.Returns(true); + + var factory = Substitute.For(); + factory.Create().Returns(Substitute.For()); + + _session = new RetryTestableRtcSession( + sfuWebSocketFactory: factory, + httpClientFactory: _ => null, + logs: Substitute.For(), + serializer: Substitute.For(), + timeService: _timeService, + lowLevelClient: null, + config: Substitute.For(), + networkMonitor: _networkMonitor + ); + } + + [TearDown] + public void TearDown() + { + _session?.Dispose(); + } + + [UnityTest] + public IEnumerator When_fast_reconnect_fails_max_allowed_times_expect_escalation_to_rejoin() + => When_fast_reconnect_fails_max_allowed_times_expect_escalation_to_rejoin_Async().RunAsIEnumerator(); + + private async Task When_fast_reconnect_fails_max_allowed_times_expect_escalation_to_rejoin_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 999; + _session.PeerConnectionsHealthy = true; + + await _session.CallReconnect(WebsocketReconnectStrategy.Fast, "test"); + + Assert.That(_session.FastReconnectCallCount, + Is.GreaterThanOrEqualTo(RtcSession.CallRejoinMaxFastAttempts), + "FAST should be attempted at least CallRejoinMaxFastAttempts times before escalating."); + Assert.That(_session.FastReconnectCallCount, + Is.LessThanOrEqualTo(RtcSession.CallRejoinMaxFastAttempts + 1), + "FAST attempts should be bounded by CallRejoinMaxFastAttempts."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(1), + "After exhausting FAST attempts, exactly one REJOIN should be triggered to complete the reconnection."); + } + + [UnityTest] + public IEnumerator When_fast_reconnect_fails_and_peer_connections_unhealthy_expect_immediate_rejoin() + => When_fast_reconnect_fails_and_peer_connections_unhealthy_expect_immediate_rejoin_Async().RunAsIEnumerator(); + + private async Task When_fast_reconnect_fails_and_peer_connections_unhealthy_expect_immediate_rejoin_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 999; + _session.PeerConnectionsHealthy = true; + _session.OnFastReconnectCalled = session => session.PeerConnectionsHealthy = false; + + await _session.CallReconnect(WebsocketReconnectStrategy.Fast, "test"); + + Assert.That(_session.FastReconnectCallCount, Is.EqualTo(1), + "FAST should be attempted exactly once; when peer connections become unhealthy " + + "during the attempt, there is no point retrying FAST."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(1), + "Unhealthy peer connections should cause immediate escalation to REJOIN " + + "without waiting for CallRejoinMaxFastAttempts."); + } + + [UnityTest] + public IEnumerator When_fast_reconnect_deadline_exceeded_expect_escalation_to_rejoin() + => When_fast_reconnect_deadline_exceeded_expect_escalation_to_rejoin_Async().RunAsIEnumerator(); + + private async Task When_fast_reconnect_deadline_exceeded_expect_escalation_to_rejoin_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 30; + _session.PeerConnectionsHealthy = true; + + var baseTime = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc); + _timeService.UtcNow.Returns(baseTime, baseTime.AddSeconds(31)); + + await _session.CallReconnect(WebsocketReconnectStrategy.Fast, "test"); + + Assert.That(_session.FastReconnectCallCount, Is.EqualTo(1), + "FAST should be attempted exactly once; after the first failure the elapsed time " + + "exceeds the 30-second deadline, triggering immediate escalation."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(1), + "Exceeding the fast reconnect deadline should escalate to REJOIN " + + "because the SFU session has likely expired."); + } + + [UnityTest] + public IEnumerator When_device_goes_offline_during_reconnect_expect_loop_stops() + => When_device_goes_offline_during_reconnect_expect_loop_stops_Async().RunAsIEnumerator(); + + private async Task When_device_goes_offline_during_reconnect_expect_loop_stops_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 999; + _session.OnFastReconnectCalled = _ => _networkMonitor.IsNetworkAvailable.Returns(false); + + await _session.CallReconnect(WebsocketReconnectStrategy.Fast, "test"); + + Assert.That(_session.FastReconnectCallCount, Is.EqualTo(1), + "Reconnection should stop after detecting network is down without additional retry attempts."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(0), + "Going offline should stop the reconnection loop entirely, not escalate to REJOIN."); + } + + [UnityTest] + public IEnumerator When_migrate_reconnect_fails_expect_immediate_escalation_to_rejoin() + => When_migrate_reconnect_fails_expect_immediate_escalation_to_rejoin_Async().RunAsIEnumerator(); + + private async Task When_migrate_reconnect_fails_expect_immediate_escalation_to_rejoin_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 999; + _session.PeerConnectionsHealthy = true; + + await _session.CallReconnect(WebsocketReconnectStrategy.Migrate, "test"); + + Assert.That(_session.FastReconnectCallCount, Is.EqualTo(0), + "Migration failure should escalate directly to REJOIN, never to FAST."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(1), + "When Migrate reconnect fails (NotImplementedException), wasMigrating=true " + + "should cause immediate escalation to REJOIN."); + } + + [UnityTest] + public IEnumerator When_unrecoverable_api_error_during_reconnect_expect_reconnecting_failed() + => When_unrecoverable_api_error_during_reconnect_expect_reconnecting_failed_Async().RunAsIEnumerator(); + + private async Task When_unrecoverable_api_error_during_reconnect_expect_reconnecting_failed_Async() + { + _session.CallState = CallingState.Joined; + _session._fastReconnectDeadlineSeconds = 999; + _session.FastReconnectException = new StreamApiException( + new APIErrorInternalDTO { Unrecoverable = true, Message = "Unrecoverable test error" }); + + try + { + await _session.CallReconnect(WebsocketReconnectStrategy.Fast, "test"); + Assert.Fail("Expected StreamApiException to propagate from Reconnect."); + } + catch (StreamApiException) + { + } + + Assert.That(_session.CallState, Is.EqualTo(CallingState.ReconnectingFailed), + "An unrecoverable API error during reconnection should set CallState to ReconnectingFailed."); + Assert.That(_session.FastReconnectCallCount, Is.EqualTo(1), + "Reconnection should stop after the first unrecoverable error without retrying."); + Assert.That(_session.RejoinCallCount, Is.EqualTo(0), + "Unrecoverable errors should not escalate to REJOIN — there is no point retrying."); + } + + private ITimeService _timeService; + private INetworkMonitor _networkMonitor; + private RetryTestableRtcSession _session; + + /// + /// Test subclass of that lets the real + /// retry loop run, but stubs out the + /// actual FAST / REJOIN operations and the inter-retry delay. This allows + /// tests to verify the escalation logic (attempt counting, deadline, PC health) + /// in isolation without real networking, WebRTC, or wall-clock delays. + /// + private class RetryTestableRtcSession : RtcSession + { + public bool PeerConnectionsHealthy { get; set; } = true; + public int FastReconnectCallCount { get; private set; } + public int RejoinCallCount { get; private set; } + public int MigrateCallCount { get; private set; } + public Action OnFastReconnectCalled { get; set; } + public Exception FastReconnectException { get; set; } = new Exception("Simulated FAST reconnect failure"); + + public RetryTestableRtcSession(ISfuWebSocketFactory sfuWebSocketFactory, + Func httpClientFactory, + ILogs logs, ISerializer serializer, ITimeService timeService, + StreamVideoLowLevelClient lowLevelClient, + IStreamClientConfig config, INetworkMonitor networkMonitor) + : base(sfuWebSocketFactory, httpClientFactory, logs, serializer, + timeService, lowLevelClient, config, networkMonitor) + { + } + + public Task CallReconnect(WebsocketReconnectStrategy strategy, string reason) + => Reconnect(strategy, reason); + + protected override bool ArePeerConnectionsHealthy() => PeerConnectionsHealthy; + + protected override Task ReconnectRetryDelay() => Task.CompletedTask; + + protected override Task ReconnectFast() + { + FastReconnectCallCount++; + CallState = CallingState.Reconnecting; + OnFastReconnectCalled?.Invoke(this); + throw FastReconnectException; + } + + protected override Task ReconnectRejoin() + { + RejoinCallCount++; + CallState = CallingState.Joined; + return Task.CompletedTask; + } + + protected override Task ReconnectMigrate() + { + MigrateCallCount++; + CallState = CallingState.Migrating; + throw new NotImplementedException("Simulated Migrate failure"); + } + } + } +} +#endif diff --git a/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs.meta b/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs.meta new file mode 100644 index 00000000..8e57f015 --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/ReconnectRetryTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 299b609c29954b949a7c07bd2f18f74f +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Tests/Editor/RepositoryTests.cs b/Packages/StreamVideo/Tests/Editor/RepositoryTests.cs index ab75d4bc..ab281487 100644 --- a/Packages/StreamVideo/Tests/Editor/RepositoryTests.cs +++ b/Packages/StreamVideo/Tests/Editor/RepositoryTests.cs @@ -1,4 +1,4 @@ -#if STREAM_TESTS_ENABLED +#if STREAM_TESTS_ENABLED using System; using System.Collections; using System.IO; @@ -17,10 +17,10 @@ namespace StreamVideo.Tests.Editor internal class RepositoryTests { [UnityTest] - public IEnumerator Imported_samples_match_package_source_samples() - => Imported_samples_match_package_source_samples_Async().RunAsIEnumerator(); + public IEnumerator When_imported_samples_expect_match_package_source_samples() + => When_imported_samples_expect_match_package_source_samples_Async().RunAsIEnumerator(); - public async Task Imported_samples_match_package_source_samples_Async() + private async Task When_imported_samples_expect_match_package_source_samples_Async() { var fileComparer = new SimpleFileCompare(); @@ -90,10 +90,10 @@ var sourceSampleFileListing } [UnityTest] - public IEnumerator Dtos_do_not_contain_json_required_always_flag() - => Dtos_do_not_contain_json_required_always_flag_Async().RunAsIEnumerator(); + public IEnumerator When_dtos_checked_expect_no_json_required_always_flag() + => When_dtos_checked_expect_no_json_required_always_flag_Async().RunAsIEnumerator(); - private async Task Dtos_do_not_contain_json_required_always_flag_Async() + private async Task When_dtos_checked_expect_no_json_required_always_flag_Async() { var streamVideoUnityPackage = await TestUtils.GetStreamVideoPackageInfo(); var packageSourcePath = streamVideoUnityPackage.resolvedPath; diff --git a/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs b/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs new file mode 100644 index 00000000..7e940857 --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs @@ -0,0 +1,159 @@ +#if STREAM_TESTS_ENABLED +using NUnit.Framework; +using StreamVideo.Core.LowLevelClient; + +namespace StreamVideo.Tests.Editor +{ + /// + /// Tests for — the foundation of stale-operation detection. + /// A bug in SessionID would break all version guards used to invalidate stale ICE restarts + /// and other operations that were queued before a rejoin completed. + /// + internal sealed class SessionIDTests + { + [SetUp] + public void SetUp() + { + _sessionId = new SessionID(); + } + + [Test] + public void When_new_instance_expect_empty() + { + Assert.That(_sessionId.IsEmpty, Is.True, + "A freshly created SessionID should be empty."); + } + + [Test] + public void When_new_instance_expect_version_zero() + { + Assert.That(_sessionId.Version, Is.EqualTo(0), + "A freshly created SessionID should have Version == 0."); + } + + [Test] + public void When_new_instance_expect_to_string_returns_empty() + { + Assert.That(_sessionId.ToString(), Is.EqualTo(string.Empty), + "A freshly created SessionID should return an empty string from ToString()."); + } + + [Test] + public void When_regenerate_expect_non_empty_id() + { + _sessionId.Regenerate(); + + Assert.That(_sessionId.IsEmpty, Is.False, + "After Regenerate(), IsEmpty should be false."); + Assert.That(_sessionId.ToString(), Is.Not.Empty, + "After Regenerate(), ToString() should return a non-empty string."); + } + + [Test] + public void When_regenerate_multiple_times_expect_version_increments() + { + Assert.That(_sessionId.Version, Is.EqualTo(0), + "Precondition: new instance should have Version 0 before Regenerate()."); + + _sessionId.Regenerate(); + Assert.That(_sessionId.Version, Is.EqualTo(1), + "First Regenerate() should set Version to 1."); + + _sessionId.Regenerate(); + Assert.That(_sessionId.Version, Is.EqualTo(2), + "Second Regenerate() should set Version to 2."); + + _sessionId.Regenerate(); + Assert.That(_sessionId.Version, Is.EqualTo(3), + "Third Regenerate() should set Version to 3."); + } + + [Test] + public void When_regenerate_twice_expect_different_ids() + { + _sessionId.Regenerate(); + var firstId = _sessionId.ToString(); + + _sessionId.Regenerate(); + var secondId = _sessionId.ToString(); + + Assert.That(secondId, Is.Not.EqualTo(firstId), + "Each call to Regenerate() should produce a different ID (new GUID)."); + } + + [Test] + public void When_regenerate_many_times_expect_all_unique_ids() + { + const int iterations = 100; + var ids = new System.Collections.Generic.HashSet(); + + for (var i = 0; i < iterations; i++) + { + _sessionId.Regenerate(); + var id = _sessionId.ToString(); + Assert.That(ids.Add(id), Is.True, + $"Regenerate() produced a duplicate ID '{id}' on iteration {i + 1}."); + } + + Assert.That(_sessionId.Version, Is.EqualTo(iterations), + $"After {iterations} calls to Regenerate(), Version should be {iterations}."); + } + + [Test] + public void When_regenerate_expect_valid_guid_format() + { + _sessionId.Regenerate(); + var id = _sessionId.ToString(); + + Assert.That(System.Guid.TryParse(id, out _), Is.True, + $"Regenerate() should produce a valid GUID string, but got '{id}'."); + } + + [Test] + public void When_clear_after_regenerate_expect_empty() + { + _sessionId.Regenerate(); + Assert.That(_sessionId.IsEmpty, Is.False, "Precondition: should not be empty after Regenerate()."); + + _sessionId.Clear(); + + Assert.That(_sessionId.IsEmpty, Is.True, + "After Clear(), IsEmpty should be true."); + Assert.That(_sessionId.ToString(), Is.EqualTo(string.Empty), + "After Clear(), ToString() should return an empty string."); + } + + [Test] + public void When_clear_expect_version_unchanged() + { + _sessionId.Regenerate(); + _sessionId.Regenerate(); + Assert.That(_sessionId.Version, Is.EqualTo(2), "Precondition: Version should be 2 after two Regenerate() calls."); + + _sessionId.Clear(); + + Assert.That(_sessionId.Version, Is.EqualTo(2), + "Clear() should not reset the Version counter. Version tracks the number of regenerations, not the current session state."); + } + + [Test] + public void When_regenerate_after_clear_expect_non_empty_and_version_incremented() + { + _sessionId.Regenerate(); + var versionBeforeClear = _sessionId.Version; + + _sessionId.Clear(); + Assert.That(_sessionId.IsEmpty, Is.True, "Precondition: should be empty after Clear()."); + + _sessionId.Regenerate(); + + Assert.That(_sessionId.IsEmpty, Is.False, + "After Clear() followed by Regenerate(), IsEmpty should be false."); + Assert.That(_sessionId.Version, Is.EqualTo(versionBeforeClear + 1), + "Regenerate() after Clear() should continue incrementing the Version counter."); + } + + private SessionID _sessionId; + } +} +#endif diff --git a/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs.meta b/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs.meta new file mode 100644 index 00000000..f21deeaf --- /dev/null +++ b/Packages/StreamVideo/Tests/Editor/SessionIDTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: b17f8be9f9df7574ca386a18121c45af +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Packages/StreamVideo/Tests/Shared/StreamTestClientProvider.cs b/Packages/StreamVideo/Tests/Shared/StreamTestClientProvider.cs index a99a0885..8687da00 100644 --- a/Packages/StreamVideo/Tests/Shared/StreamTestClientProvider.cs +++ b/Packages/StreamVideo/Tests/Shared/StreamTestClientProvider.cs @@ -1,4 +1,4 @@ -#if STREAM_TESTS_ENABLED +#if STREAM_TESTS_ENABLED using System; using System.Collections.Generic; using System.Linq; @@ -127,7 +127,7 @@ private IStreamVideoClient CreateStateClient() private async Task DisposeStateClientsAsync() { - TryStopUpdateTask(); + await TryStopUpdateTaskAsync(); var tasks = new List(); tasks.AddRange(_spawnedClients.Select(async c => @@ -159,11 +159,23 @@ private void TryStartUpdateTask() }); } - private void TryStopUpdateTask() + private async Task TryStopUpdateTaskAsync() { Debug.LogWarning("TryStopUpdateTask"); + var updateTask = _updateTask; _updateTaskCts?.Cancel(); _updateTask = null; + + if (updateTask != null) + { + try + { + await updateTask; + } + catch (OperationCanceledException) + { + } + } } private async Task UpdateTaskAsync()