Skip to content

Commit f9a245b

Browse files
aleksandar-apostolovrahul-lohraCopilot
authored
Improved reconnection flow when network is lost for a brief period of time (#1521)
* fix: Correct the retry logic when fail to join call * temp 1 * Improve handling on fast reconnect * Spotless * Api dump * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt Co-authored-by: Copilot <[email protected]> * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt Co-authored-by: Copilot <[email protected]> * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt Co-authored-by: Copilot <[email protected]> * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/connection/Publisher.kt Co-authored-by: Copilot <[email protected]> * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/common/StreamWebSocket.kt Co-authored-by: Copilot <[email protected]> * Update tests * Update stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt Co-authored-by: Copilot <[email protected]> * Update * Fix the test base * Update tests * Spotless & ApiDump * Remove unused code and fix test * Spotless --------- Co-authored-by: rahullohra <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent e31ac8c commit f9a245b

File tree

21 files changed

+387
-167
lines changed

21 files changed

+387
-167
lines changed

stream-video-android-core/api/stream-video-android-core.api

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6532,8 +6532,8 @@ public abstract interface class io/getstream/video/android/core/call/audio/Input
65326532
}
65336533

65346534
public class io/getstream/video/android/core/call/connection/StreamPeerConnection : org/webrtc/PeerConnection$Observer {
6535-
public fun <init> (Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function0;IZLio/getstream/video/android/core/trace/Tracer;)V
6536-
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function0;IZLio/getstream/video/android/core/trace/Tracer;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
6535+
public fun <init> (Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function0;IZLio/getstream/video/android/core/trace/Tracer;Ljava/lang/String;)V
6536+
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function0;IZLio/getstream/video/android/core/trace/Tracer;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
65376537
public final fun addAudioTransceiver (Lorg/webrtc/MediaStreamTrack;Ljava/util/List;)V
65386538
public final fun addIceCandidate (Lio/getstream/video/android/core/model/IceCandidate;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
65396539
public final fun addVideoTransceiver (Lorg/webrtc/MediaStreamTrack;Ljava/util/List;Z)V
@@ -6582,8 +6582,8 @@ public final class io/getstream/video/android/core/call/connection/StreamPeerCon
65826582
public final fun makeAudioSource (Lorg/webrtc/MediaConstraints;)Lorg/webrtc/AudioSource;
65836583
public static synthetic fun makeAudioSource$default (Lio/getstream/video/android/core/call/connection/StreamPeerConnectionFactory;Lorg/webrtc/MediaConstraints;ILjava/lang/Object;)Lorg/webrtc/AudioSource;
65846584
public final fun makeAudioTrack (Lorg/webrtc/AudioSource;Ljava/lang/String;)Lorg/webrtc/AudioTrack;
6585-
public final fun makePeerConnection (Lkotlinx/coroutines/CoroutineScope;Lorg/webrtc/PeerConnection$RTCConfiguration;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;I)Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
6586-
public static synthetic fun makePeerConnection$default (Lio/getstream/video/android/core/call/connection/StreamPeerConnectionFactory;Lkotlinx/coroutines/CoroutineScope;Lorg/webrtc/PeerConnection$RTCConfiguration;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;IILjava/lang/Object;)Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
6585+
public final fun makePeerConnection (Lkotlinx/coroutines/CoroutineScope;Lorg/webrtc/PeerConnection$RTCConfiguration;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;ILjava/lang/String;)Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
6586+
public static synthetic fun makePeerConnection$default (Lio/getstream/video/android/core/call/connection/StreamPeerConnectionFactory;Lkotlinx/coroutines/CoroutineScope;Lorg/webrtc/PeerConnection$RTCConfiguration;Lio/getstream/video/android/core/model/StreamPeerType;Lorg/webrtc/MediaConstraints;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;ILjava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
65876587
public final fun makeVideoTrack (Lorg/webrtc/VideoSource;Ljava/lang/String;)Lorg/webrtc/VideoTrack;
65886588
public final fun setAudioProcessingEnabled (Z)V
65896589
public final fun setAudioRecordDataCallback (Lkotlin/jvm/functions/Function4;)V

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ import io.getstream.video.android.core.socket.common.scope.ClientScope
8383
import io.getstream.video.android.core.socket.common.scope.UserScope
8484
import io.getstream.video.android.core.utils.AtomicUnitCall
8585
import io.getstream.video.android.core.utils.RampValueUpAndDownHelper
86+
import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl
8687
import io.getstream.video.android.core.utils.safeCallWithDefault
8788
import io.getstream.video.android.core.utils.toQueriedMembers
8889
import io.getstream.video.android.model.User
@@ -258,13 +259,16 @@ public class Call(
258259
private val listener = object : NetworkStateProvider.NetworkStateListener {
259260
override suspend fun onConnected() {
260261
leaveTimeoutAfterDisconnect?.cancel()
261-
logger.d { "[NetworkStateListener#onConnected] #network; no args" }
262+
262263
val elapsedTimeMils = System.currentTimeMillis() - lastDisconnect
264+
logger.d {
265+
"[NetworkStateListener#onConnected] #network; no args, elapsedTimeMils:$elapsedTimeMils, lastDisconnect:$lastDisconnect, reconnectDeadlineMils:$reconnectDeadlineMils"
266+
}
263267
if (lastDisconnect > 0 && elapsedTimeMils < reconnectDeadlineMils) {
264268
logger.d {
265269
"[NetworkStateListener#onConnected] #network; Reconnecting (fast). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds"
266270
}
267-
rejoin()
271+
fastReconnect()
268272
} else {
269273
logger.d {
270274
"[NetworkStateListener#onConnected] #network; Reconnecting (full). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds"
@@ -275,7 +279,13 @@ public class Call(
275279

276280
override suspend fun onDisconnected() {
277281
state._connection.value = RealtimeConnection.Reconnecting
282+
logger.d {
283+
"[NetworkStateListener#onDisconnected] #network; old lastDisconnect:$lastDisconnect, clientImpl.leaveAfterDisconnectSeconds:${clientImpl.leaveAfterDisconnectSeconds}"
284+
}
278285
lastDisconnect = System.currentTimeMillis()
286+
logger.d {
287+
"[NetworkStateListener#onDisconnected] #network; new lastDisconnect:$lastDisconnect"
288+
}
279289
leaveTimeoutAfterDisconnect = scope.launch {
280290
delay(clientImpl.leaveAfterDisconnectSeconds * 1000)
281291
logger.d {
@@ -295,6 +305,7 @@ public class Call(
295305
private var monitorSubscriberPCStateJob: Job? = null
296306
private var sfuListener: Job? = null
297307
private var sfuEvents: Job? = null
308+
private val streamSingleFlightProcessorImpl = StreamSingleFlightProcessorImpl(scope)
298309

299310
init {
300311
scope.launch {
@@ -549,19 +560,6 @@ public class Call(
549560
}
550561
}
551562
monitorPublisherPCStateJob?.cancel()
552-
monitorPublisherPCStateJob = scope.launch {
553-
session?.publisher?.iceState?.collect {
554-
when (it) {
555-
PeerConnection.IceConnectionState.FAILED, PeerConnection.IceConnectionState.DISCONNECTED -> {
556-
session?.publisher?.restartIce()
557-
}
558-
559-
else -> {
560-
logger.d { "[monitorConnectionState] Ice connection state is $it" }
561-
}
562-
}
563-
}
564-
}
565563

566564
monitorSubscriberPCStateJob?.cancel()
567565
monitorSubscriberPCStateJob = scope.launch {
@@ -622,8 +620,8 @@ public class Call(
622620
/**
623621
* Fast reconnect to the same SFU with the same participant session.
624622
*/
625-
suspend fun fastReconnect() = schedule {
626-
logger.d { "[fastReconnect] Reconnecting" }
623+
suspend fun fastReconnect() = schedule("fast") {
624+
logger.d { "[fastReconnect] Reconnecting, reconnectAttepmts:$reconnectAttepmts" }
627625
session?.prepareReconnect()
628626
this@Call.state._connection.value = RealtimeConnection.Reconnecting
629627
if (session != null) {
@@ -648,7 +646,7 @@ public class Call(
648646
/**
649647
* Rejoin a call. Creates a new session and joins as a new participant.
650648
*/
651-
suspend fun rejoin() = schedule {
649+
suspend fun rejoin() = schedule("rejoin") {
652650
logger.d { "[rejoin] Rejoining" }
653651
reconnectAttepmts++
654652
state._connection.value = RealtimeConnection.Reconnecting
@@ -704,7 +702,7 @@ public class Call(
704702
/**
705703
* Migrate to another SFU.
706704
*/
707-
suspend fun migrate() = schedule {
705+
suspend fun migrate() = schedule("migrate") {
708706
logger.d { "[migrate] Migrating" }
709707
state._connection.value = RealtimeConnection.Migrating
710708
location?.let {
@@ -763,12 +761,10 @@ public class Call(
763761

764762
private var reconnectJob: Job? = null
765763

766-
private suspend fun schedule(block: suspend () -> Unit) = synchronized(this) {
767-
logger.d { "[schedule] #reconnect; no args" }
768-
reconnectJob?.cancel()
769-
reconnectJob = scope.launch {
770-
block()
771-
}
764+
private suspend fun schedule(key: String, block: suspend () -> Unit) {
765+
logger.d { "[schedule] #reconnect; no args" } // noob 4
766+
767+
streamSingleFlightProcessorImpl.run(key, block)
772768
}
773769

774770
/** Leave the call, but don't end it for other users */

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -877,7 +877,7 @@ public class CallState(
877877
}
878878

879879
is ChangePublishQualityEvent -> {
880-
call.session!!.handleEvent(event)
880+
call.session?.handleEvent(event)
881881
}
882882

883883
is ErrorEvent -> {

0 commit comments

Comments
 (0)