Skip to content

Commit 8e9bc0d

Browse files
authored
fix(llc): improve the handling of SFU join errors (#1191)
* handle sfu join errors * changelog * increase heap size for example app * tweak * unit tests
1 parent 862a4b5 commit 8e9bc0d

File tree

18 files changed

+637
-39
lines changed

18 files changed

+637
-39
lines changed

packages/stream_video/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## Upcoming
22

33
### 🐞 Fixed
4+
* Improved SFU join, recovery and migration logic to reduce failed joins with reconnect loops when joining full or shutting-down SFU
45
* Fixed race condition in `Call.join` when another connect is already in progress, with proper timeout handling.
56
* Fixed `consumeAndAcceptActiveCall` to ensure the coordinator WS is connected before consuming incoming calls during cold start.
67

packages/stream_video/lib/globals.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import 'protobuf/video/sfu/models/models.pb.dart';
66
const String streamSdkName = 'stream-flutter';
77
const String streamVideoVersion = '1.3.0';
88
const String openapiModelsVersion = '219.11.0';
9-
const String protocolModelsVersion = '1.40.1';
9+
const String protocolModelsVersion = '1.46.1';
1010
const String androidWebRTCVersion = webrtc.androidWebRTCVersion;
1111
const String iosWebRTCVersion = webrtc.iosWebRTCVersion;
1212

packages/stream_video/lib/open_api/video/coordinator/model/join_call_request.dart

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class JoinCallRequest {
1818
required this.location,
1919
this.membersLimit,
2020
this.migratingFrom,
21+
this.migratingFromList = const [],
2122
this.notify,
2223
this.ring,
2324
this.video,
@@ -60,6 +61,9 @@ class JoinCallRequest {
6061
///
6162
String? migratingFrom;
6263

64+
/// List of SFU IDs to exclude when picking a new SFU for the participant
65+
List<String> migratingFromList;
66+
6367
///
6468
/// Please note: This property should have been non-nullable! Since the specification file
6569
/// does not include a default value (using the "default:" property), however, the generated
@@ -94,6 +98,7 @@ class JoinCallRequest {
9498
other.location == location &&
9599
other.membersLimit == membersLimit &&
96100
other.migratingFrom == migratingFrom &&
101+
_deepEquality.equals(other.migratingFromList, migratingFromList) &&
97102
other.notify == notify &&
98103
other.ring == ring &&
99104
other.video == video;
@@ -106,13 +111,14 @@ class JoinCallRequest {
106111
(location.hashCode) +
107112
(membersLimit == null ? 0 : membersLimit!.hashCode) +
108113
(migratingFrom == null ? 0 : migratingFrom!.hashCode) +
114+
(migratingFromList.hashCode) +
109115
(notify == null ? 0 : notify!.hashCode) +
110116
(ring == null ? 0 : ring!.hashCode) +
111117
(video == null ? 0 : video!.hashCode);
112118

113119
@override
114120
String toString() =>
115-
'JoinCallRequest[create=$create, data=$data, location=$location, membersLimit=$membersLimit, migratingFrom=$migratingFrom, notify=$notify, ring=$ring, video=$video]';
121+
'JoinCallRequest[create=$create, data=$data, location=$location, membersLimit=$membersLimit, migratingFrom=$migratingFrom, migratingFromList=$migratingFromList, notify=$notify, ring=$ring, video=$video]';
116122

117123
Map<String, dynamic> toJson() {
118124
final json = <String, dynamic>{};
@@ -137,6 +143,7 @@ class JoinCallRequest {
137143
} else {
138144
json[r'migrating_from'] = null;
139145
}
146+
json[r'migrating_from_list'] = this.migratingFromList;
140147
if (this.notify != null) {
141148
json[r'notify'] = this.notify;
142149
} else {
@@ -181,6 +188,11 @@ class JoinCallRequest {
181188
location: mapValueOfType<String>(json, r'location')!,
182189
membersLimit: mapValueOfType<int>(json, r'members_limit'),
183190
migratingFrom: mapValueOfType<String>(json, r'migrating_from'),
191+
migratingFromList: json[r'migrating_from_list'] is Iterable
192+
? (json[r'migrating_from_list'] as Iterable)
193+
.cast<String>()
194+
.toList(growable: false)
195+
: const [],
184196
notify: mapValueOfType<bool>(json, r'notify'),
185197
ring: mapValueOfType<bool>(json, r'ring'),
186198
video: mapValueOfType<bool>(json, r'video'),

packages/stream_video/lib/protobuf/video/sfu/models/models.pb.dart

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1477,12 +1477,14 @@ class ClientDetails extends $pb.GeneratedMessage {
14771477
OS? os,
14781478
Browser? browser,
14791479
Device? device,
1480+
$core.String? webrtcVersion,
14801481
}) {
14811482
final result = create();
14821483
if (sdk != null) result.sdk = sdk;
14831484
if (os != null) result.os = os;
14841485
if (browser != null) result.browser = browser;
14851486
if (device != null) result.device = device;
1487+
if (webrtcVersion != null) result.webrtcVersion = webrtcVersion;
14861488
return result;
14871489
}
14881490

@@ -1505,6 +1507,7 @@ class ClientDetails extends $pb.GeneratedMessage {
15051507
..aOM<Browser>(3, _omitFieldNames ? '' : 'browser',
15061508
subBuilder: Browser.create)
15071509
..aOM<Device>(4, _omitFieldNames ? '' : 'device', subBuilder: Device.create)
1510+
..aOS(5, _omitFieldNames ? '' : 'webrtcVersion')
15081511
..hasRequiredFields = false;
15091512

15101513
@$core.Deprecated('See https://github.com/google/protobuf.dart/issues/998.')
@@ -1569,6 +1572,15 @@ class ClientDetails extends $pb.GeneratedMessage {
15691572
void clearDevice() => $_clearField(4);
15701573
@$pb.TagNumber(4)
15711574
Device ensureDevice() => $_ensure(3);
1575+
1576+
@$pb.TagNumber(5)
1577+
$core.String get webrtcVersion => $_getSZ(4);
1578+
@$pb.TagNumber(5)
1579+
set webrtcVersion($core.String value) => $_setString(4, value);
1580+
@$pb.TagNumber(5)
1581+
$core.bool hasWebrtcVersion() => $_has(4);
1582+
@$pb.TagNumber(5)
1583+
void clearWebrtcVersion() => $_clearField(5);
15721584
}
15731585

15741586
class Sdk extends $pb.GeneratedMessage {

packages/stream_video/lib/protobuf/video/sfu/models/models.pbenum.dart

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ class SdkType extends $pb.ProtobufEnum {
283283
SdkType._(8, _omitEnumNames ? '' : 'SDK_TYPE_GO');
284284
static const SdkType SDK_TYPE_PLAIN_JAVASCRIPT =
285285
SdkType._(9, _omitEnumNames ? '' : 'SDK_TYPE_PLAIN_JAVASCRIPT');
286+
static const SdkType SDK_TYPE_PYTHON =
287+
SdkType._(10, _omitEnumNames ? '' : 'SDK_TYPE_PYTHON');
288+
static const SdkType SDK_TYPE_VISION_AGENTS =
289+
SdkType._(11, _omitEnumNames ? '' : 'SDK_TYPE_VISION_AGENTS');
286290

287291
static const $core.List<SdkType> values = <SdkType>[
288292
SDK_TYPE_UNSPECIFIED,
@@ -295,10 +299,12 @@ class SdkType extends $pb.ProtobufEnum {
295299
SDK_TYPE_UNITY,
296300
SDK_TYPE_GO,
297301
SDK_TYPE_PLAIN_JAVASCRIPT,
302+
SDK_TYPE_PYTHON,
303+
SDK_TYPE_VISION_AGENTS,
298304
];
299305

300306
static final $core.List<SdkType?> _byValue =
301-
$pb.ProtobufEnum.$_initByValueList(values, 9);
307+
$pb.ProtobufEnum.$_initByValueList(values, 11);
302308
static SdkType? valueOf($core.int value) =>
303309
value < 0 || value >= _byValue.length ? null : _byValue[value];
304310

packages/stream_video/lib/protobuf/video/sfu/models/models.pbjson.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ const SdkType$json = {
180180
{'1': 'SDK_TYPE_UNITY', '2': 7},
181181
{'1': 'SDK_TYPE_GO', '2': 8},
182182
{'1': 'SDK_TYPE_PLAIN_JAVASCRIPT', '2': 9},
183+
{'1': 'SDK_TYPE_PYTHON', '2': 10},
184+
{'1': 'SDK_TYPE_VISION_AGENTS', '2': 11},
183185
],
184186
};
185187

@@ -189,7 +191,8 @@ final $typed_data.Uint8List sdkTypeDescriptor = $convert.base64Decode(
189191
'IUChBTREtfVFlQRV9BTkdVTEFSEAISFAoQU0RLX1RZUEVfQU5EUk9JRBADEhAKDFNES19UWVBF'
190192
'X0lPUxAEEhQKEFNES19UWVBFX0ZMVVRURVIQBRIZChVTREtfVFlQRV9SRUFDVF9OQVRJVkUQBh'
191193
'ISCg5TREtfVFlQRV9VTklUWRAHEg8KC1NES19UWVBFX0dPEAgSHQoZU0RLX1RZUEVfUExBSU5f'
192-
'SkFWQVNDUklQVBAJ');
194+
'SkFWQVNDUklQVBAJEhMKD1NES19UWVBFX1BZVEhPThAKEhoKFlNES19UWVBFX1ZJU0lPTl9BR0'
195+
'VOVFMQCw==');
193196

194197
@$core.Deprecated('Use trackUnpublishReasonDescriptor instead')
195198
const TrackUnpublishReason$json = {
@@ -831,6 +834,7 @@ const ClientDetails$json = {
831834
'6': '.stream.video.sfu.models.Device',
832835
'10': 'device'
833836
},
837+
{'1': 'webrtc_version', '3': 5, '4': 1, '5': 9, '10': 'webrtcVersion'},
834838
],
835839
};
836840

@@ -840,7 +844,7 @@ final $typed_data.Uint8List clientDetailsDescriptor = $convert.base64Decode(
840844
'Nka1IDc2RrEisKAm9zGAIgASgLMhsuc3RyZWFtLnZpZGVvLnNmdS5tb2RlbHMuT1NSAm9zEjoK'
841845
'B2Jyb3dzZXIYAyABKAsyIC5zdHJlYW0udmlkZW8uc2Z1Lm1vZGVscy5Ccm93c2VyUgdicm93c2'
842846
'VyEjcKBmRldmljZRgEIAEoCzIfLnN0cmVhbS52aWRlby5zZnUubW9kZWxzLkRldmljZVIGZGV2'
843-
'aWNl');
847+
'aWNlEiUKDndlYnJ0Y192ZXJzaW9uGAUgASgJUg13ZWJydGNWZXJzaW9u');
844848

845849
@$core.Deprecated('Use sdkDescriptor instead')
846850
const Sdk$json = {

packages/stream_video/lib/src/call/call.dart

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -906,13 +906,15 @@ class Call {
906906
return _callJoinLock.synchronized(() async {
907907
final sfuJoinFailures = <String, int>{};
908908
String? sfuToForceExclude;
909+
final sfusToExclude = <String>[];
909910

910911
for (var attempt = 0; attempt < max(maxJoinRetries, 1); attempt++) {
911912
final result = await runCatchingResult(
912913
() => _doJoin(
913914
connectOptions: connectOptions,
914915
membersLimit: membersLimit,
915916
sfuToForceExclude: sfuToForceExclude,
917+
sfusToExclude: List.unmodifiable(sfusToExclude),
916918
reconnectReason: reconnectReason,
917919
),
918920
);
@@ -929,6 +931,19 @@ class Call {
929931
final error = result.getErrorOrNull();
930932
if (error is VideoErrorWithCause &&
931933
error.cause is SessionConnectionFailure) {
934+
final connectionFailure = error.cause as SessionConnectionFailure;
935+
936+
if (_isUnrecoverableSfuError(connectionFailure)) {
937+
_logger.e(
938+
() => '[join] unrecoverable SFU error, not retrying',
939+
);
940+
await leave(
941+
reason: DisconnectReason.failure(error),
942+
);
943+
return result;
944+
}
945+
946+
final switchSfu = _isJoinErrorCode(connectionFailure);
932947
final sfuName = _credentials?.sfuServer.name ?? '';
933948

934949
sfuJoinFailures.update(
@@ -937,17 +952,25 @@ class Call {
937952
ifAbsent: () => 1,
938953
);
939954

940-
if (sfuJoinFailures[sfuName]! >= 2) {
955+
if (switchSfu || sfuJoinFailures[sfuName]! >= 2) {
956+
final sfuMigrateReason = switchSfu
957+
? 'join error code'
958+
: 'too many failures';
959+
941960
_logger.e(
942961
() =>
943-
'[join] too many failures for SFU: $sfuName, migrating...',
962+
'[join] $sfuMigrateReason for SFU: $sfuName, migrating...',
944963
);
945964

946965
_session?.trace('call_join_migrate', {
947966
'migrateFrom': sfuName,
967+
'reason': sfuMigrateReason,
948968
});
949969

950970
sfuToForceExclude = sfuName;
971+
sfusToExclude
972+
..clear()
973+
..addAll(sfuJoinFailures.keys);
951974
}
952975
}
953976
}
@@ -971,10 +994,28 @@ class Call {
971994
});
972995
}
973996

997+
SfuError? _extractSfuError(SessionConnectionFailure failure) {
998+
final innerError = failure.error;
999+
if (innerError is VideoErrorWithCause && innerError.cause is SfuError) {
1000+
return innerError.cause as SfuError;
1001+
}
1002+
return null;
1003+
}
1004+
1005+
bool _isJoinErrorCode(SessionConnectionFailure failure) {
1006+
return _extractSfuError(failure)?.code.isJoinErrorCode ?? false;
1007+
}
1008+
1009+
bool _isUnrecoverableSfuError(SessionConnectionFailure failure) {
1010+
return _extractSfuError(failure)?.reconnectStrategy ==
1011+
SfuReconnectionStrategy.disconnect;
1012+
}
1013+
9741014
Future<Result<None>> _doJoin({
9751015
CallConnectOptions? connectOptions,
9761016
int? membersLimit,
9771017
String? sfuToForceExclude,
1018+
List<String> sfusToExclude = const [],
9781019
String? reconnectReason,
9791020
}) async {
9801021
_logger.d(() => '[join] options: $_connectOptions');
@@ -1021,6 +1062,7 @@ class Call {
10211062
connectOptions: connectOptions,
10221063
membersLimit: membersLimit,
10231064
forceMigratingFrom: sfuToForceExclude,
1065+
migratingFromList: sfusToExclude,
10241066
);
10251067

10261068
if (joinedResult is! Success<CallCredentials>) {
@@ -1180,6 +1222,7 @@ class Call {
11801222
CallConnectOptions? connectOptions,
11811223
int? membersLimit,
11821224
String? forceMigratingFrom,
1225+
List<String> migratingFromList = const [],
11831226
}) async {
11841227
_logger.d(
11851228
() =>
@@ -1197,14 +1240,25 @@ class Call {
11971240
_reconnectStrategy == SfuReconnectionStrategy.migrate) {
11981241
_logger.d(() => '[joinIfNeeded] joining');
11991242

1243+
final migratingFrom =
1244+
forceMigratingFrom ??
1245+
(_reconnectStrategy == SfuReconnectionStrategy.migrate
1246+
? _session?.config.sfuName
1247+
: null);
1248+
1249+
// When migrating, include the current SFU in the exclusion list
1250+
// so the coordinator picks a different SFU.
1251+
final effectiveMigratingFromList = [
1252+
...migratingFromList,
1253+
if (migratingFrom != null && !migratingFromList.contains(migratingFrom))
1254+
migratingFrom,
1255+
];
1256+
12001257
final joinedResult = await _performJoinCallRequest(
12011258
create: true,
12021259
connectOptions: connectOptions,
1203-
migratingFrom:
1204-
forceMigratingFrom ??
1205-
(_reconnectStrategy == SfuReconnectionStrategy.migrate
1206-
? _session?.config.sfuName
1207-
: null),
1260+
migratingFrom: migratingFrom,
1261+
migratingFromList: effectiveMigratingFromList,
12081262
membersLimit: membersLimit,
12091263
);
12101264

@@ -1239,10 +1293,14 @@ class Call {
12391293
bool create = false,
12401294
bool video = false,
12411295
String? migratingFrom,
1296+
List<String> migratingFromList = const [],
12421297
int? membersLimit,
12431298
CallConnectOptions? connectOptions,
12441299
}) async {
1245-
_logger.d(() => '[joinCall] cid: $callCid, migratingFrom: $migratingFrom');
1300+
_logger.d(
1301+
() =>
1302+
'[joinCall] cid: $callCid, migratingFrom: $migratingFrom, migratingFromList: $migratingFromList',
1303+
);
12461304

12471305
if (_callLifecycleCompleter.isCompleted) {
12481306
_logger.w(() => '[joinCall] rejected (call was left)');
@@ -1253,6 +1311,7 @@ class Call {
12531311
callCid: callCid,
12541312
create: create,
12551313
migratingFrom: migratingFrom,
1314+
migratingFromList: migratingFromList,
12561315
video: video,
12571316
membersLimit: membersLimit,
12581317
);
@@ -1557,6 +1616,22 @@ class Call {
15571616
}
15581617
// error event
15591618
else if (sfuEvent is SfuErrorEvent) {
1619+
_session?.trace('sfu_error', {
1620+
'code': sfuEvent.error.code.name,
1621+
'error': sfuEvent.error.message,
1622+
'strategy': sfuEvent.error.reconnectStrategy.name,
1623+
});
1624+
1625+
// SFU_FULL, SFU_SHUTTING_DOWN, CALL_PARTICIPANT_LIMIT_REACHED are join
1626+
// errors. Although they may specify a `migrate` strategy, they should be
1627+
// handled by the join retry logic in the join flow with a REJOIN to a new SFU instead.
1628+
if (sfuEvent.error.code.isJoinErrorCode) {
1629+
_logger.w(
1630+
() => '[onSfuEvent] skipping join error code: ${sfuEvent.error.code}',
1631+
);
1632+
return;
1633+
}
1634+
15601635
switch (sfuEvent.error.reconnectStrategy) {
15611636
case SfuReconnectionStrategy.rejoin:
15621637
case SfuReconnectionStrategy.fast:
@@ -1565,9 +1640,7 @@ class Call {
15651640
() =>
15661641
'[onSfuEvent] SFU error: ${sfuEvent.error}, reconnect strategy: ${sfuEvent.error.reconnectStrategy}',
15671642
);
1568-
_session?.trace('sfu_error', {
1569-
'error': sfuEvent.error.message,
1570-
});
1643+
15711644
await _reconnect(
15721645
sfuEvent.error.reconnectStrategy,
15731646
reconnectReason: 'sfu error: ${sfuEvent.error.message}',

packages/stream_video/lib/src/coordinator/coordinator_client.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ abstract class CoordinatorClient {
8787
bool? ringing,
8888
bool? create,
8989
String? migratingFrom,
90+
List<String> migratingFromList = const [],
9091
bool? video,
9192
int? membersLimit,
9293
});

0 commit comments

Comments
 (0)