Skip to content

Commit 13454a4

Browse files
Improve stats collection and reconnect flow (#1587)
* Debugging session * Debugging session * Spotless * Add back code that got deleted mistakenly, causing weird issues * add mockSfuConnectionModule to SubsriberTest * api dump * Update traces * api dump * Fix unit test * change comparison to assignment * Fix typo * Read the correct jobName whose execution faced a crash * spotless * Fix tag name for trace * Remove TODO * 1. Calling session.leaveWithReason() when rejoining, so that the mediascope is cancelled 2. Some renaming of variables and method * fix --------- Co-authored-by: pratimmallick <[email protected]>
1 parent 45f7088 commit 13454a4

File tree

18 files changed

+258
-164
lines changed

18 files changed

+258
-164
lines changed

stream-video-android-core/api/stream-video-android-core.api

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7571,7 +7571,8 @@ public final class io/getstream/video/android/core/Call {
75717571
public final fun disableClientCapabilities (Ljava/util/List;)V
75727572
public final fun enableClientCapabilities (Ljava/util/List;)V
75737573
public final fun end (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7574-
public final fun fastReconnect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7574+
public final fun fastReconnect (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7575+
public static synthetic fun fastReconnect$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
75757576
public final fun fireEvent (Lio/getstream/android/video/generated/models/VideoEvent;)V
75767577
public final fun get (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
75777578
public final fun getAudioFilter ()Lio/getstream/video/android/core/call/audio/InputAudioFilter;
@@ -7608,7 +7609,8 @@ public final class io/getstream/video/android/core/Call {
76087609
public static synthetic fun joinAndRing$default (Lio/getstream/video/android/core/Call;Ljava/util/List;Lio/getstream/video/android/core/CreateCallOptions;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
76097610
public final fun kickUser (Ljava/lang/String;ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
76107611
public static synthetic fun kickUser$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
7611-
public final fun leave ()V
7612+
public final fun leave (Ljava/lang/String;)V
7613+
public static synthetic fun leave$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;ILjava/lang/Object;)V
76127614
public final fun listRecordings (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
76137615
public static synthetic fun listRecordings$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
76147616
public final fun listTranscription (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -7626,7 +7628,8 @@ public final class io/getstream/video/android/core/Call {
76267628
public static synthetic fun queryMembers$default (Lio/getstream/video/android/core/Call;Ljava/util/Map;Ljava/util/List;ILjava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
76277629
public final fun reject (Lio/getstream/video/android/core/model/RejectReason;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
76287630
public static synthetic fun reject$default (Lio/getstream/video/android/core/Call;Lio/getstream/video/android/core/model/RejectReason;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
7629-
public final fun rejoin (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7631+
public final fun rejoin (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
7632+
public static synthetic fun rejoin$default (Lio/getstream/video/android/core/Call;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
76307633
public final fun removeMembers (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
76317634
public final fun requestPermissions ([Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
76327635
public final fun revokePermissions (Ljava/lang/String;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -12978,8 +12981,8 @@ public abstract interface class io/getstream/video/android/core/socket/common/So
1297812981
public abstract fun sendEvent (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1297912982
public abstract fun state ()Lkotlinx/coroutines/flow/StateFlow;
1298012983
public abstract fun updateToken (Ljava/lang/Object;)V
12981-
public abstract fun whenConnected (JLkotlin/jvm/functions/Function2;)V
12982-
public static synthetic fun whenConnected$default (Lio/getstream/video/android/core/socket/common/SocketActions;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
12984+
public abstract fun whenConnected (JLkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
12985+
public static synthetic fun whenConnected$default (Lio/getstream/video/android/core/socket/common/SocketActions;JLkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
1298312986
}
1298412987

1298512988
public final class io/getstream/video/android/core/socket/common/SocketActions$Companion {
@@ -13157,7 +13160,7 @@ public class io/getstream/video/android/core/socket/coordinator/CoordinatorSocke
1315713160
public fun state ()Lkotlinx/coroutines/flow/StateFlow;
1315813161
public synthetic fun updateToken (Ljava/lang/Object;)V
1315913162
public fun updateToken (Ljava/lang/String;)V
13160-
public fun whenConnected (JLkotlin/jvm/functions/Function2;)V
13163+
public fun whenConnected (JLkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
1316113164
}
1316213165

1316313166
public final class io/getstream/video/android/core/socket/coordinator/state/RestartReason : java/lang/Enum {
@@ -13367,7 +13370,7 @@ public final class io/getstream/video/android/core/socket/sfu/SfuSocketConnectio
1336713370
public fun state ()Lkotlinx/coroutines/flow/StateFlow;
1336813371
public synthetic fun updateToken (Ljava/lang/Object;)V
1336913372
public fun updateToken (Ljava/lang/String;)V
13370-
public fun whenConnected (JLkotlin/jvm/functions/Function2;)V
13373+
public fun whenConnected (JLkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V
1337113374
}
1337213375

1337313376
public final class io/getstream/video/android/core/socket/sfu/SfuSocketConnection$Companion {

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import io.getstream.video.android.core.socket.common.scope.UserScope
9191
import io.getstream.video.android.core.utils.AtomicUnitCall
9292
import io.getstream.video.android.core.utils.RampValueUpAndDownHelper
9393
import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl
94+
import io.getstream.video.android.core.utils.safeCall
9495
import io.getstream.video.android.core.utils.safeCallWithDefault
9596
import io.getstream.video.android.core.utils.toQueriedMembers
9697
import io.getstream.video.android.model.User
@@ -362,12 +363,12 @@ public class Call(
362363
logger.d {
363364
"[NetworkStateListener#onConnected] #network; Reconnecting (fast). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds"
364365
}
365-
fastReconnect()
366+
fastReconnect("NetworkStateListener#onConnected")
366367
} else {
367368
logger.d {
368369
"[NetworkStateListener#onConnected] #network; Reconnecting (full). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds"
369370
}
370-
rejoin()
371+
rejoin("NetworkStateListener#onConnected")
371372
}
372373
}
373374

@@ -576,7 +577,7 @@ public class Call(
576577
}.onError {
577578
logger.e { "[joinAndRing] Ring failed #ringing; #track; error: $it" }
578579
state.toggleRingingStateUpdates(false)
579-
leave()
580+
leave("ring-failed (${it.message})")
580581
}
581582
}
582583
}
@@ -678,6 +679,19 @@ public class Call(
678679
}
679680
}
680681
monitorPublisherPCStateJob?.cancel()
682+
monitorPublisherPCStateJob = scope.launch {
683+
session?.publisher?.iceState?.collect {
684+
when (it) {
685+
PeerConnection.IceConnectionState.FAILED, PeerConnection.IceConnectionState.DISCONNECTED -> {
686+
session?.publisher?.connection?.restartIce()
687+
}
688+
689+
else -> {
690+
logger.d { "[monitorPubConnectionState] Ice connection state is $it" }
691+
}
692+
}
693+
}
694+
}
681695

682696
monitorSubscriberPCStateJob?.cancel()
683697
monitorSubscriberPCStateJob = scope.launch {
@@ -688,7 +702,7 @@ public class Call(
688702
}
689703

690704
else -> {
691-
logger.d { "[monitorConnectionState] Ice connection state is $it" }
705+
logger.d { "[monitorSubConnectionState] Ice connection state is $it" }
692706
}
693707
}
694708
}
@@ -738,7 +752,7 @@ public class Call(
738752
/**
739753
* Fast reconnect to the same SFU with the same participant session.
740754
*/
741-
suspend fun fastReconnect() = schedule("fast") {
755+
suspend fun fastReconnect(reason: String = "unknown") = schedule("fast") {
742756
logger.d { "[fastReconnect] Reconnecting, reconnectAttepmts:$reconnectAttepmts" }
743757
session?.prepareReconnect()
744758
this@Call.state._connection.value = RealtimeConnection.Reconnecting
@@ -753,8 +767,11 @@ public class Call(
753767
announced_tracks = publishingInfo,
754768
subscriptions = subscriptionsInfo,
755769
reconnect_attempt = reconnectAttepmts,
770+
reason = reason,
756771
)
757772
session.fastReconnect(reconnectDetails)
773+
val oldSessionStats = collectStats()
774+
session.sendCallStats(oldSessionStats)
758775
} else {
759776
logger.d { "[fastReconnect] [RealtimeConnection.Disconnected], call_id:$id" }
760777
this@Call.state._connection.value = RealtimeConnection.Disconnected
@@ -764,7 +781,7 @@ public class Call(
764781
/**
765782
* Rejoin a call. Creates a new session and joins as a new participant.
766783
*/
767-
suspend fun rejoin() = schedule("rejoin") {
784+
suspend fun rejoin(reason: String = "unknown") = schedule("rejoin") {
768785
logger.d { "[rejoin] Rejoining" }
769786
reconnectAttepmts++
770787
state._connection.value = RealtimeConnection.Reconnecting
@@ -775,21 +792,23 @@ public class Call(
775792
if (joinResponse is Success) {
776793
// switch to the new SFU
777794
val cred = joinResponse.value.credentials
778-
val session = this.session!!
795+
val oldSession = this.session!!
796+
val oldSessionStats = collectStats()
779797
val currentOptions = this.session?.publisher?.currentOptions()
780-
logger.i { "Rejoin SFU ${session?.sfuUrl} to ${cred.server.url}" }
798+
logger.i { "Rejoin SFU ${oldSession?.sfuUrl} to ${cred.server.url}" }
781799

782800
this.sessionId = UUID.randomUUID().toString()
783-
val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo()
801+
val (prevSessionId, subscriptionsInfo, publishingInfo) = oldSession.currentSfuInfo()
784802
val reconnectDetails = ReconnectDetails(
785803
previous_session_id = prevSessionId,
786804
strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN,
787805
announced_tracks = publishingInfo,
788806
subscriptions = subscriptionsInfo,
789807
reconnect_attempt = reconnectAttepmts,
808+
reason = reason,
790809
)
791810
this.state.removeParticipant(prevSessionId)
792-
session.prepareRejoin()
811+
oldSession.prepareRejoin()
793812
try {
794813
this.session = RtcSession(
795814
clientImpl,
@@ -807,7 +826,10 @@ public class Call(
807826
},
808827
)
809828
this.session?.connect(reconnectDetails, currentOptions)
810-
session.cleanup()
829+
this.session?.sfuTracer?.trace("rejoin", reason)
830+
oldSession.sendCallStats(oldSessionStats)
831+
oldSession.leaveWithReason("Rejoin :: $reason")
832+
oldSession.cleanup()
811833
monitorSession(joinResponse.value)
812834
} catch (ex: Exception) {
813835
logger.e(ex) {
@@ -901,14 +923,18 @@ public class Call(
901923
}
902924

903925
/** Leave the call, but don't end it for other users */
904-
fun leave() {
926+
fun leave(reason: String = "user") {
905927
logger.d { "[leave] #ringing; no args, call_cid:$cid" }
906-
leave(disconnectionReason = null)
928+
internalLeave(null, reason)
907929
}
908930

909-
private fun leave(disconnectionReason: Throwable?) = atomicLeave {
931+
private fun internalLeave(disconnectionReason: Throwable?, reason: String) = atomicLeave {
910932
val callId = id
911-
session?.leaveWithReason(disconnectionReason?.message ?: "user")
933+
monitorSubscriberPCStateJob?.cancel()
934+
monitorPublisherPCStateJob?.cancel()
935+
monitorPublisherPCStateJob = null
936+
monitorSubscriberPCStateJob = null
937+
session?.leaveWithReason("[reason=$reason, error=${disconnectionReason?.message}]")
912938
leaveTimeoutAfterDisconnect?.cancel()
913939
network.unsubscribe(listener)
914940
sfuListener?.cancel()
@@ -942,15 +968,26 @@ public class Call(
942968
.leaveCall(this)
943969

944970
(client as StreamVideoClient).onCallCleanUp(this)
945-
cleanup()
971+
972+
clientImpl.scope.launch {
973+
safeCall {
974+
session?.sfuTracer?.trace(
975+
"leave-call",
976+
"[reason=$reason, error=${disconnectionReason?.message}]",
977+
)
978+
val stats = collectStats()
979+
session?.sendCallStats(stats)
980+
}
981+
cleanup()
982+
}
946983
}
947984

948985
/** ends the call for yourself as well as other users */
949986
suspend fun end(): Result<Unit> {
950987
// end the call for everyone
951988
val result = clientImpl.endCall(type, id)
952989
// cleanup
953-
leave()
990+
leave("call-ended")
954991
return result
955992
}
956993

@@ -1703,9 +1740,9 @@ public class Call(
17031740
}
17041741
}
17051742

1706-
fun fastReconnect() {
1743+
fun fastReconnect(reason: String = "Debug") {
17071744
call.scope.launch {
1708-
call.fastReconnect()
1745+
call.fastReconnect(reason)
17091746
}
17101747
}
17111748
}

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ public class CallState(
745745
} else if (callRingState is RingingState.Incoming && event.user.id == client.userId) {
746746
// Call accepted by me + this device is Incoming => I accepted on another device
747747
// Then leave the call on this device
748-
if (!acceptedOnThisDevice) call.leave()
748+
if (!acceptedOnThisDevice) call.leave("accepted-on-another-device")
749749
}
750750
}
751751

@@ -773,7 +773,7 @@ public class CallState(
773773
}
774774
_rejectedBy.value = newRejectedBySet.toSet()
775775
_ringingState.value = RingingState.RejectedByAll
776-
call.leave()
776+
call.leave("LocalCallMissedEvent")
777777
}
778778
}
779779

@@ -782,12 +782,12 @@ public class CallState(
782782
updateFromResponse(event.call)
783783
_endedAt.value = OffsetDateTime.now(Clock.systemUTC())
784784
_endedByUser.value = event.user?.toUser()
785-
call.leave()
785+
call.leave("CallEndedEvent")
786786
}
787787

788788
is CallEndedSfuEvent -> {
789789
_endedAt.value = OffsetDateTime.now(Clock.systemUTC())
790-
call.leave()
790+
call.leave("CallEndedSfuEvent")
791791
}
792792

793793
is CallMemberUpdatedEvent -> {
@@ -1189,7 +1189,7 @@ public class CallState(
11891189
} else if ((rejectedBy.isNotEmpty() && rejectedBy.size >= outgoingMembersCount) ||
11901190
(rejectedBy.contains(createdBy?.id) && hasRingingCall)
11911191
) {
1192-
call.leave()
1192+
call.leave("updateRingingState-rejected")
11931193
cancelTimeout()
11941194

11951195
if (rejectReason?.alias == REJECT_REASON_TIMEOUT) {
@@ -1301,7 +1301,7 @@ public class CallState(
13011301
if (_ringingState.value is RingingState.Outgoing || _ringingState.value is RingingState.Incoming && client.state.activeCall.value == null) {
13021302
ringingStateUpdatesStopped = false
13031303
call.reject(reason = RejectReason.Custom(alias = REJECT_REASON_TIMEOUT))
1304-
call.leave()
1304+
call.leave("start-ringing-timeout")
13051305
}
13061306
} else {
13071307
logger.w { "[startRingingTimer] No autoCancelTimeoutMs set - call ring with no timeout" }

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ExternalCallRejectionHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ internal class ExternalCallRejectionHandler() {
6868
* onSuccess: (suspend (Call) -> Unit)?,
6969
* onError: (suspend (Exception) -> Unit)?,)
7070
*/
71-
call.leave()
71+
call.leave("rejected-on-wearable")
7272
}
7373
}
7474
}

stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ internal class StreamVideoClient internal constructor(
239239
}
240240
}
241241
}
242-
activeCall?.leave()
242+
activeCall?.leave("client-cleanup")
243243
}
244244

245245
/**

0 commit comments

Comments
 (0)