Skip to content

Commit e1d79f3

Browse files
Brazolrenefloor
andauthored
fix(llc): fixes for fast reconnect flow (#918)
* fixes for fast reconnect flow * tweaks to timeouts * tweak * using unique participant key * tweaks * fix: don't get member participants on reconnect * fix * small refactor + ignoring normal closure of ws * fixes * better handling of disconnect state * cleanup * tweak * updated webrtc dependencies * fix for muting call on Android --------- Co-authored-by: Rene Floor <[email protected]>
1 parent f75d5b0 commit e1d79f3

27 files changed

+332
-176
lines changed

melos.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ command:
1818

1919
# Dependencies used in the project.
2020
dependencies:
21-
dart_webrtc: ^1.4.10
21+
dart_webrtc: ^1.5.3
2222
device_info_plus: ">=10.1.2 <12.0.0"
2323
stream_chat_flutter: ^9.6.0
24-
stream_webrtc_flutter: ^1.0.0-dev.1
24+
stream_webrtc_flutter: ^1.0.1
2525

2626
scripts:
2727
postclean:

packages/stream_video/lib/src/call/call.dart

Lines changed: 100 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ const _idSessionEvents = 4;
8080
const _idSessionStats = 5;
8181
const _idConnect = 6;
8282
const _idAwait = 7;
83-
const _idReconnect = 7;
8483
const _idFastReconnectTimeout = 8;
84+
const _idReconnect = 9;
8585

8686
const _tag = 'SV:Call';
8787
int _callSeq = 1;
@@ -95,6 +95,7 @@ class Call {
9595
required StreamCallCid callCid,
9696
required CoordinatorClient coordinatorClient,
9797
required StreamVideo streamVideo,
98+
required InternetConnection networkMonitor,
9899
RetryPolicy? retryPolicy,
99100
SdpPolicy? sdpPolicy,
100101
CallPreferences? preferences,
@@ -104,6 +105,7 @@ class Call {
104105
callCid: callCid,
105106
coordinatorClient: coordinatorClient,
106107
streamVideo: streamVideo,
108+
networkMonitor: networkMonitor,
107109
retryPolicy: retryPolicy,
108110
sdpPolicy: sdpPolicy,
109111
preferences: preferences,
@@ -117,6 +119,7 @@ class Call {
117119
required CallCreatedData data,
118120
required CoordinatorClient coordinatorClient,
119121
required StreamVideo streamVideo,
122+
required InternetConnection networkMonitor,
120123
RetryPolicy? retryPolicy,
121124
SdpPolicy? sdpPolicy,
122125
CallPreferences? preferences,
@@ -126,6 +129,7 @@ class Call {
126129
callCid: data.callCid,
127130
coordinatorClient: coordinatorClient,
128131
streamVideo: streamVideo,
132+
networkMonitor: networkMonitor,
129133
retryPolicy: retryPolicy,
130134
sdpPolicy: sdpPolicy,
131135
preferences: preferences,
@@ -144,6 +148,7 @@ class Call {
144148
required CallRingingData data,
145149
required CoordinatorClient coordinatorClient,
146150
required StreamVideo streamVideo,
151+
required InternetConnection networkMonitor,
147152
RetryPolicy? retryPolicy,
148153
SdpPolicy? sdpPolicy,
149154
CallPreferences? preferences,
@@ -153,6 +158,7 @@ class Call {
153158
callCid: data.callCid,
154159
coordinatorClient: coordinatorClient,
155160
streamVideo: streamVideo,
161+
networkMonitor: networkMonitor,
156162
retryPolicy: retryPolicy,
157163
sdpPolicy: sdpPolicy,
158164
preferences: preferences,
@@ -163,6 +169,7 @@ class Call {
163169
required StreamCallCid callCid,
164170
required CoordinatorClient coordinatorClient,
165171
required StreamVideo streamVideo,
172+
required InternetConnection networkMonitor,
166173
RetryPolicy? retryPolicy,
167174
SdpPolicy? sdpPolicy,
168175
CallPreferences? preferences,
@@ -189,6 +196,7 @@ class Call {
189196
return Call._(
190197
coordinatorClient: coordinatorClient,
191198
streamVideo: streamVideo,
199+
networkMonitor: networkMonitor,
192200
stateManager: stateManager,
193201
credentials: credentials,
194202
retryPolicy: finalRetryPolicy,
@@ -202,6 +210,7 @@ class Call {
202210
required StreamVideo streamVideo,
203211
required CallStateNotifier stateManager,
204212
required PermissionsManager permissionManager,
213+
required this.networkMonitor,
205214
required RetryPolicy retryPolicy,
206215
required SdpPolicy sdpPolicy,
207216
CallCredentials? credentials,
@@ -240,6 +249,7 @@ class Call {
240249
final CallStateNotifier _stateManager;
241250
final PermissionsManager _permissionsManager;
242251
final DynascaleManager dynascaleManager;
252+
final InternetConnection networkMonitor;
243253

244254
CallCredentials? _credentials;
245255
CallSession? _session;
@@ -253,6 +263,7 @@ class Call {
253263
SfuReconnectionStrategy _reconnectStrategy =
254264
SfuReconnectionStrategy.unspecified;
255265
Future<InternetStatus>? _awaitNetworkAvailableFuture;
266+
Future<Result<None>>? _awaitMigrationCompleteFuture;
256267
bool _initialized = false;
257268

258269
final List<Timer> _reactionTimers = [];
@@ -352,11 +363,10 @@ class Call {
352363
void _observeReconnectEvents() {
353364
_subscriptions.add(
354365
_idReconnect,
355-
InternetConnection.createInstance().onStatusChange.listen(
366+
networkMonitor.onStatusChange.listen(
356367
(status) {
357368
if (status == InternetStatus.disconnected) {
358369
_logger.d(() => '[observeReconnectEvents] network disconnected');
359-
_awaitNetworkAvailableFuture = _awaitNetworkAvailable();
360370
_reconnect(SfuReconnectionStrategy.fast);
361371
}
362372
},
@@ -685,17 +695,15 @@ class Call {
685695
_credentials = joinedResult.data;
686696
_previousSession = _session;
687697

688-
final isWsHealthy = _previousSession?.sfuWS.isConnected ?? false;
689-
690698
final reconnectDetails =
691699
_reconnectStrategy == SfuReconnectionStrategy.unspecified
692700
? null
693701
: await _previousSession?.getReconnectDetails(_reconnectStrategy);
694702

695-
if (performingRejoin || performingMigration || !isWsHealthy) {
703+
if (!performingFastReconnect) {
696704
_logger.v(
697705
() =>
698-
'[join] creating new sfu session (rejoin: $performingRejoin, migration: $performingMigration, wsHealthy: $isWsHealthy)',
706+
'[join] creating new sfu session (rejoin: $performingRejoin, migration: $performingMigration)',
699707
);
700708

701709
_session = await _sessionFactory.makeCallSession(
@@ -705,6 +713,7 @@ class Call {
705713
credentials: _credentials!,
706714
stateManager: _stateManager,
707715
dynascaleManager: dynascaleManager,
716+
networkMonitor: networkMonitor,
708717
onPeerConnectionFailure: (pc) async {
709718
if (state.value.status is! CallStatusReconnecting) {
710719
await pc.pc.restartIce().onError((_, __) {
@@ -716,20 +725,15 @@ class Call {
716725
_stateManager.callState.preferences.clientPublishOptions,
717726
);
718727

728+
if (performingMigration) {
729+
_awaitMigrationCompleteFuture = _session!.waitForMigrationComplete();
730+
}
731+
719732
dynascaleManager.init(
720733
sfuClient: _session!.sfuClient,
721734
sessionId: _session!.sessionId,
722735
);
723-
} else {
724-
_logger.v(
725-
() =>
726-
'[join] reusing previous sfu session (rejoin: $performingRejoin, migration: $performingMigration, wsHealthy: $isWsHealthy)',
727-
);
728736

729-
_session = _previousSession;
730-
}
731-
732-
if (_session?.sessionSeq != _previousSession?.sessionSeq) {
733737
_logger.d(() => '[join] starting sfu session');
734738

735739
final sessionResult = await _startSession(
@@ -744,23 +748,27 @@ class Call {
744748
_stateManager.lifecycleCallConnectFailed(error: error);
745749
return sessionResult;
746750
}
747-
}
751+
} else {
752+
_logger.v(
753+
() =>
754+
'[join] reusing previous sfu session (rejoin: $performingRejoin, migration: $performingMigration)',
755+
);
756+
757+
_session = _previousSession;
748758

749-
if (performingFastReconnect && isWsHealthy) {
750759
_logger.d(() => '[join] fast reconnecting');
760+
final result = await _session!.fastReconnect();
751761

752-
final result = await _session?.fastReconnect();
762+
if (result.isFailure) {
763+
_logger.e(() => '[join] fast reconnecting failed: $result');
764+
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
765+
return Result.error('fast reconnecting failed');
766+
}
753767

754-
result?.fold(
755-
success: (success) {
756-
_logger.v(() => '[join] fast reconnecting success');
757-
_fastReconnectDeadline = success.data.fastReconnectDeadline;
758-
},
759-
failure: (failure) {
760-
_logger.e(() => '[join] fast reconnecting failed: $failure');
761-
return failure;
762-
},
763-
);
768+
_logger.v(() => '[join] fast reconnecting success');
769+
_fastReconnectDeadline =
770+
result.getDataOrNull()?.fastReconnectDeadline ??
771+
_fastReconnectDeadline;
764772
}
765773

766774
// make sure we only track connection timing if we are not calling this method as part of a migration flow
@@ -779,16 +787,11 @@ class Call {
779787
'Closing previous WS after reconnect with strategy: ${_reconnectStrategy.name}',
780788
);
781789
await _previousSession?.dispose();
782-
} else if (!isWsHealthy) {
783-
_logger.v(() => '[join] closing unhealthy WS');
784-
await _previousSession?.close(
785-
StreamWebSocketCloseCode.disposeOldSocket,
786-
closeReason: 'Closing unhealthy WS after reconnect',
787-
);
788790
}
789791

790792
// For migration we have to wait for confirmation before we can complete the flow
791793
if (_reconnectStrategy != SfuReconnectionStrategy.migrate) {
794+
_logger.v(() => '[join] connected');
792795
_previousSession = null;
793796
_stateManager.lifecycleCallConnected();
794797
}
@@ -1061,9 +1064,15 @@ class Call {
10611064
}
10621065

10631066
if (sfuEvent is SfuSocketDisconnected) {
1064-
_logger.w(() => '[onSfuEvent] socket disconnected');
1067+
if (!StreamWebSocketCloseCode.isIntentionalClosure(
1068+
sfuEvent.reason.closeCode,
1069+
)) {
1070+
_logger.w(() => '[onSfuEvent] socket disconnected');
1071+
await _reconnect(SfuReconnectionStrategy.fast);
1072+
}
10651073
} else if (sfuEvent is SfuSocketFailed) {
10661074
_logger.w(() => '[onSfuEvent] socket failed');
1075+
await _reconnect(SfuReconnectionStrategy.fast);
10671076
} else if (sfuEvent is SfuGoAwayEvent) {
10681077
_logger.w(() => '[onSfuEvent] go away, migrating sfu');
10691078
await _reconnect(SfuReconnectionStrategy.migrate);
@@ -1094,6 +1103,11 @@ class Call {
10941103
}
10951104

10961105
Future<void> _reconnect(SfuReconnectionStrategy strategy) async {
1106+
if (state.value.status is CallStatusDisconnected) {
1107+
_logger.w(() => '[reconnect] rejected (call is already disconnected)');
1108+
return;
1109+
}
1110+
10971111
if (_callReconnectLock.locked) {
10981112
_logger.w(
10991113
() =>
@@ -1104,6 +1118,7 @@ class Call {
11041118

11051119
await _callReconnectLock.synchronized(() async {
11061120
_reconnectStrategy = strategy;
1121+
_awaitNetworkAvailableFuture = _awaitNetworkAvailable();
11071122

11081123
do {
11091124
if (strategy != SfuReconnectionStrategy.migrate) {
@@ -1183,25 +1198,42 @@ class Call {
11831198
final migrateTimeStopwatch = Stopwatch()..start();
11841199

11851200
_reconnectStrategy = SfuReconnectionStrategy.migrate;
1186-
await _join();
1187-
final result = await _session?.waitForMigrationComplete();
1201+
final joinResult = await _join();
1202+
1203+
if (joinResult.isFailure) {
1204+
_logger.e(() => '[reconnectMigrate] join failed: $joinResult');
1205+
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
1206+
return;
1207+
}
11881208

11891209
await _previousSession?.close(StreamWebSocketCloseCode.disposeOldSocket);
11901210

1191-
result?.fold(
1211+
final migrationResult = await _awaitMigrationCompleteFuture;
1212+
if (migrationResult == null) {
1213+
_logger.e(() => '[reconnectMigrate] migration failed');
1214+
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
1215+
return;
1216+
}
1217+
1218+
migrationResult.fold(
11921219
success: (_) {
11931220
_stateManager.lifecycleCallConnected();
11941221
},
11951222
failure: (_) {
1223+
_logger.e(
1224+
() => '[reconnectMigrate] migration did not complete correctly',
1225+
);
11961226
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
11971227
},
11981228
);
11991229

1200-
migrateTimeStopwatch.stop();
1201-
await _sfuStatsReporter?.sendSfuStats(
1202-
connectionTimeMs: migrateTimeStopwatch.elapsedMilliseconds,
1203-
reconnectionStrategy: _reconnectStrategy,
1204-
);
1230+
if (migrationResult.isSuccess) {
1231+
migrateTimeStopwatch.stop();
1232+
await _sfuStatsReporter?.sendSfuStats(
1233+
connectionTimeMs: migrateTimeStopwatch.elapsedMilliseconds,
1234+
reconnectionStrategy: _reconnectStrategy,
1235+
);
1236+
}
12051237
}
12061238

12071239
Future<InternetStatus> _awaitNetworkAvailable() async {
@@ -1213,10 +1245,14 @@ class Call {
12131245
}
12141246
});
12151247

1248+
final previousCheckInterval = networkMonitor.checkInterval;
1249+
networkMonitor.setIntervalAndResetTimer(const Duration(seconds: 1));
1250+
12161251
final connectionStatus = await InternetConnection.createInstance(
12171252
checkInterval: const Duration(seconds: 1),
12181253
)
12191254
.onStatusChange
1255+
.startWithFuture(networkMonitor.internetStatus)
12201256
.firstWhere((status) => status == InternetStatus.connected)
12211257
.timeout(
12221258
_retryPolicy.config.callRejoinTimeout,
@@ -1230,6 +1266,8 @@ class Call {
12301266
.valueOrDefault(InternetStatus.disconnected);
12311267

12321268
fastReconnectTimer.cancel();
1269+
networkMonitor.setIntervalAndResetTimer(previousCheckInterval);
1270+
12331271
return connectionStatus;
12341272
}
12351273

@@ -1280,6 +1318,7 @@ class Call {
12801318
}
12811319

12821320
_stateManager.lifecycleCallDisconnected(reason: reason);
1321+
12831322
_logger.v(() => '[leave] finished');
12841323

12851324
return const Result.success(none);
@@ -2393,6 +2432,11 @@ class Call {
23932432
required SfuTrackTypeVideo trackType,
23942433
RtcVideoDimension? videoDimension,
23952434
}) async {
2435+
if (state.value.status.isDisconnected) {
2436+
_logger.d(() => '[updateSubscription] rejected (disconnected)');
2437+
return const Result.success(none);
2438+
}
2439+
23962440
final result = await dynascaleManager.updateSubscription(
23972441
SubscriptionChange.update(
23982442
userId: userId,
@@ -2423,6 +2467,11 @@ class Call {
24232467
required SfuTrackTypeVideo trackType,
24242468
RtcVideoDimension? videoDimension,
24252469
}) async {
2470+
if (state.value.status.isDisconnected) {
2471+
_logger.d(() => '[removeSubscription] rejected (disconnected)');
2472+
return const Result.success(none);
2473+
}
2474+
24262475
final result = await dynascaleManager.updateSubscription(
24272476
SubscriptionChange.update(
24282477
userId: userId,
@@ -2626,3 +2675,10 @@ enum TrackType {
26262675
}
26272676
}
26282677
}
2678+
2679+
extension FutureStartWithEx<T> on Stream<T> {
2680+
Stream<T> startWithFuture(Future<T> futureValue) async* {
2681+
yield await futureValue;
2682+
yield* this;
2683+
}
2684+
}

0 commit comments

Comments
 (0)