Skip to content

Commit ce5b2f1

Browse files
d3xvnkanat
andauthored
feat: SFU migration (#538)
* added basics for sfu migration * handled session migration event and tries reconnect in call.dart * fixed invocation of function * fix SFU migration flow --------- Co-authored-by: kanat <[email protected]>
1 parent fb499da commit ce5b2f1

File tree

15 files changed

+184
-16
lines changed

15 files changed

+184
-16
lines changed

packages/stream_video/lib/src/call/call.dart

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,16 @@ class Call {
546546
return joinedResult as Failure;
547547
}
548548

549-
Future<Result<None>> _startSession(CallCredentials credentials) async {
550-
_logger.d(() => '[startSession] credentials: $credentials');
549+
Future<Result<None>> _startSession(
550+
CallCredentials credentials, [
551+
String? sessionId,
552+
]) async {
553+
_logger.d(
554+
() => '[startSession] credentials: $credentials, sessionId: $sessionId',
555+
);
551556
_credentials = null;
552557
final session = await _sessionFactory.makeCallSession(
558+
sessionId: sessionId,
553559
credentials: credentials,
554560
stateManager: _stateManager,
555561
);
@@ -574,11 +580,24 @@ class Call {
574580
return result;
575581
}
576582

583+
Future<Result<None>> _stopSession() async {
584+
_subscriptions.cancel(_idSessionEvents);
585+
_subscriptions.cancel(_idSessionStats);
586+
587+
await _session?.dispose();
588+
_session = null;
589+
_credentials = null;
590+
591+
return const Result.success(none);
592+
}
593+
577594
Future<void> _onSfuEvent(SfuEvent sfuEvent) async {
578595
if (sfuEvent is SfuSocketDisconnected) {
579596
await _reconnect(sfuEvent.reason);
580597
} else if (sfuEvent is SfuSocketFailed) {
581598
await _reconnect(sfuEvent.error);
599+
} else if (sfuEvent is SfuGoAwayEvent) {
600+
await _switchSfu(sfuEvent.goAwayReason);
582601
}
583602
}
584603

@@ -697,6 +716,46 @@ class Call {
697716
return const Result.success(none);
698717
}
699718

719+
Future<void> _switchSfu(SfuGoAwayReason reason) async {
720+
_logger.d(() => '[switchSfu] reason: $reason');
721+
final migratingFrom = _session?.config.sfuName;
722+
final sessionId = _session?.sessionId;
723+
724+
if (_stateManager.callState.status is CallStatusMigrating) {
725+
_logger.d(() => '[switchSfu] rejected (call already migrating)');
726+
return;
727+
}
728+
await _stopSession();
729+
_stateManager.lifecycleCallMigrating();
730+
_logger.d(() => '[switchSfu] migratingFrom: $migratingFrom($sessionId)');
731+
final joinedResult = await _joinCall(migratingFrom: migratingFrom);
732+
733+
if (joinedResult is! Success<CallJoinedData>) {
734+
final failedResult = joinedResult as Failure;
735+
_logger.e(() => '[switchSfu] failed: $failedResult');
736+
final error = failedResult.error;
737+
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
738+
return;
739+
}
740+
741+
_logger.v(() => '[switchSfu] starting sfu session');
742+
final sessionResult = await _startSession(
743+
joinedResult.data.credentials,
744+
sessionId,
745+
);
746+
if (sessionResult is! Success<None>) {
747+
_logger.w(() => '[switchSfu] sfu session start failed: $sessionResult');
748+
final error = (sessionResult as Failure).error;
749+
_stateManager.lifecycleCallConnectFailed(ConnectFailed(error));
750+
return;
751+
}
752+
_logger.v(() => '[switchSfu] started session');
753+
_stateManager.lifecycleCallConnected(const CallConnected());
754+
await _applyConnectOptions();
755+
756+
_logger.v(() => '[switchSfu] completed');
757+
}
758+
700759
Future<void> _clear(String src) async {
701760
_logger.d(() => '[clear] src: $src');
702761
_status.value = _ConnectionStatus.disconnected;
@@ -956,10 +1015,14 @@ class Call {
9561015
/// and joins the call immediately.
9571016
Future<Result<CallJoinedData>> _joinCall({
9581017
bool create = false,
1018+
String? migratingFrom,
9591019
}) async {
960-
_logger.d(() => '[joinCall] cid: $callCid');
961-
final joinResult =
962-
await _coordinatorClient.joinCall(callCid: callCid, create: create);
1020+
_logger.d(() => '[joinCall] cid: $callCid, migratingFrom: $migratingFrom');
1021+
final joinResult = await _coordinatorClient.joinCall(
1022+
callCid: callCid,
1023+
create: create,
1024+
migratingFrom: migratingFrom,
1025+
);
9631026
if (joinResult is! Success<CoordinatorJoined>) {
9641027
_logger.e(() => '[joinCall] join failed: $joinResult');
9651028
return joinResult as Failure;
@@ -1154,8 +1217,12 @@ class Call {
11541217
Result.error('Session is null');
11551218

11561219
if (result.isSuccess) {
1157-
_stateManager
1158-
.participantSetCameraEnabled(SetCameraEnabled(enabled: enabled));
1220+
_stateManager.participantSetCameraEnabled(
1221+
SetCameraEnabled(enabled: enabled),
1222+
);
1223+
_connectOptions = _connectOptions.copyWith(
1224+
camera: enabled ? TrackOption.enabled() : TrackOption.disabled(),
1225+
);
11591226
}
11601227

11611228
return result;
@@ -1175,6 +1242,9 @@ class Call {
11751242
_stateManager.participantSetMicrophoneEnabled(
11761243
SetMicrophoneEnabled(enabled: enabled),
11771244
);
1245+
_connectOptions = _connectOptions.copyWith(
1246+
microphone: enabled ? TrackOption.enabled() : TrackOption.disabled(),
1247+
);
11781248
}
11791249

11801250
return result;
@@ -1194,6 +1264,9 @@ class Call {
11941264
_stateManager.participantSetScreenShareEnabled(
11951265
SetScreenShareEnabled(enabled: enabled),
11961266
);
1267+
_connectOptions = _connectOptions.copyWith(
1268+
screenShare: enabled ? TrackOption.enabled() : TrackOption.disabled(),
1269+
);
11971270
}
11981271

11991272
return result;

packages/stream_video/lib/src/call/session/call_session_config.dart

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,23 @@ import '../../types/other.dart';
22

33
class CallSessionConfig {
44
const CallSessionConfig({
5+
required this.sfuName,
56
required this.sfuToken,
67
required this.sfuUrl,
78
required this.sfuWsEndpoint,
89
required this.rtcConfig,
910
});
1011

12+
final String sfuName;
1113
final String sfuToken;
1214
final String sfuUrl;
1315
final String sfuWsEndpoint;
1416
final RTCConfiguration rtcConfig;
1517

1618
@override
1719
String toString() {
18-
return 'CallSessionConfig{sfuToken: $sfuToken, sfuUrl: $sfuUrl, '
19-
'sfuWsEndpoint: $sfuWsEndpoint, rtcConfig: $rtcConfig}';
20+
return 'CallSessionConfig{sfuName: $sfuName, sfuToken: $sfuToken, '
21+
'sfuUrl: $sfuUrl, sfuWsEndpoint: $sfuWsEndpoint, '
22+
'rtcConfig: $rtcConfig}';
2023
}
2124
}

packages/stream_video/lib/src/call/session/call_session_factory.dart

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,28 @@ class CallSessionFactory {
2424
final SdpEditor sdpEditor;
2525

2626
Future<CallSession> makeCallSession({
27+
String? sessionId,
2728
required CallCredentials credentials,
2829
required CallStateNotifier stateManager,
2930
}) async {
30-
final sessionId = const Uuid().v4();
31-
_logger.d(() => '[makeCallSession] sessionId: $sessionId');
31+
final finalSessionId = sessionId ?? const Uuid().v4();
32+
_logger.d(() => '[makeCallSession] sessionId: $finalSessionId($sessionId)');
3233
final rtcConfig = _makeRtcConfig(credentials.iceServers) ??
3334
defaultRtcConfiguration(credentials.sfuServer.url);
3435
final sessionConfig = CallSessionConfig(
36+
sfuName: credentials.sfuServer.name,
3537
sfuUrl: credentials.sfuServer.url,
3638
sfuWsEndpoint: credentials.sfuServer.wsEndpoint,
3739
sfuToken: credentials.sfuToken,
3840
rtcConfig: rtcConfig,
3941
);
40-
_logger.v(() => '[makeCallSession] sfuUrl: ${sessionConfig.sfuUrl}');
42+
final sfuName = sessionConfig.sfuName;
43+
final sfuUrl = sessionConfig.sfuUrl;
44+
_logger.v(() => '[makeCallSession] sfuName: $sfuName, sfuUrl: $sfuUrl');
4145
return CallSession(
4246
sessionSeq: _sessionSeq++,
4347
callCid: callCid,
44-
sessionId: sessionId,
48+
sessionId: finalSessionId,
4549
config: sessionConfig,
4650
stateManager: stateManager,
4751
sdpEditor: sdpEditor,

packages/stream_video/lib/src/call/state/mixins/state_lifecycle_mixin.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,14 @@ mixin StateLifecycleMixin on StateNotifier<CallState> {
239239
status: CallStatus.connected(),
240240
);
241241
}
242+
243+
void lifecycleCallMigrating() {
244+
_logger.d(() => '[lifecycleCallMigrating] state: $state');
245+
state = state.copyWith(
246+
status: const CallStatusMigrating(),
247+
callParticipants: const [],
248+
);
249+
}
242250
}
243251

244252
extension on CallMetadata {

packages/stream_video/lib/src/coordinator/coordinator_client.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ abstract class CoordinatorClient {
7070
String? datacenterId,
7171
bool? ringing,
7272
bool? create,
73+
String? migratingFrom,
7374
});
7475

7576
Future<Result<None>> acceptCall({required StreamCallCid cid});

packages/stream_video/lib/src/coordinator/open_api/coordinator_client_open_api.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ class CoordinatorClientOpenApi extends CoordinatorClient {
446446
String? datacenterId,
447447
bool? ringing,
448448
bool? create,
449+
String? migratingFrom,
449450
}) async {
450451
try {
451452
_logger.d(
@@ -466,6 +467,7 @@ class CoordinatorClientOpenApi extends CoordinatorClient {
466467
create: create,
467468
ring: ringing,
468469
location: location,
470+
migratingFrom: migratingFrom,
469471
),
470472
);
471473
_logger.v(() => '[joinCall] completed: $result');

packages/stream_video/lib/src/coordinator/retry/coordinator_client_retry.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,13 +236,15 @@ class CoordinatorClientRetry extends CoordinatorClient {
236236
String? datacenterId,
237237
bool? ringing,
238238
bool? create,
239+
String? migratingFrom,
239240
}) {
240241
return _retryManager.execute(
241242
() => _delegate.joinCall(
242243
callCid: callCid,
243244
datacenterId: datacenterId,
244245
ringing: ringing,
245246
create: create,
247+
migratingFrom: migratingFrom,
246248
),
247249
(error, nextAttemptDelay) async {
248250
_logRetry('joinCall', error, nextAttemptDelay);

packages/stream_video/lib/src/models/call_status.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ abstract class CallStatus extends Equatable {
6464

6565
bool get isReconnecting => this is CallStatusReconnecting;
6666

67+
bool get isMigrating => this is CallStatusMigrating;
68+
6769
bool get isConnected => this is CallStatusConnected;
6870

6971
bool get isDisconnected => this is CallStatusDisconnected;
@@ -144,6 +146,15 @@ class CallStatusReconnecting extends CallStatusConnecting {
144146
}
145147
}
146148

149+
class CallStatusMigrating extends CallStatusConnecting {
150+
const CallStatusMigrating() : super._internal();
151+
152+
@override
153+
String toString() {
154+
return 'Migrating';
155+
}
156+
}
157+
147158
class CallStatusConnected extends CallStatusActive {
148159
factory CallStatusConnected() {
149160
return _instance;

packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import '../models/sfu_codec.dart';
88
import '../models/sfu_connection_info.dart';
99
import '../models/sfu_connection_quality.dart';
1010
import '../models/sfu_error.dart';
11+
import '../models/sfu_goaway_reason.dart';
1112
import '../models/sfu_model_mapper_extensions.dart';
1213
import '../models/sfu_participant.dart';
1314
import '../models/sfu_track_type.dart';
@@ -137,6 +138,11 @@ extension SfuEventMapper on sfu_events.SfuEvent {
137138
),
138139
message: payload.message,
139140
);
141+
case sfu_events.SfuEvent_EventPayload.goAway:
142+
final payload = goAway;
143+
return SfuGoAwayEvent(
144+
goAwayReason: payload.reason.toDomain(),
145+
);
140146
default:
141147
return const SfuUnknownEvent();
142148
}
@@ -206,6 +212,22 @@ extension SfuConnectionQualityExtension on sfu_models.ConnectionQuality {
206212
}
207213
}
208214

215+
/// TODO
216+
extension SfuGoAwayReasonExtension on sfu_models.GoAwayReason {
217+
SfuGoAwayReason toDomain() {
218+
switch (this) {
219+
case sfu_models.GoAwayReason.GO_AWAY_REASON_REBALANCE:
220+
return SfuGoAwayReason.rebalance;
221+
case sfu_models.GoAwayReason.GO_AWAY_REASON_SHUTTING_DOWN:
222+
return SfuGoAwayReason.shuttingDown;
223+
case sfu_models.GoAwayReason.GO_AWAY_REASON_UNSPECIFIED:
224+
return SfuGoAwayReason.unspecified;
225+
default:
226+
throw StateError('unexpected go away reason: $this');
227+
}
228+
}
229+
}
230+
209231
/// TODO
210232
extension SfuTrackTypeExtension on sfu_models.TrackType {
211233
SfuTrackType toDomain() {

packages/stream_video/lib/src/sfu/data/events/sfu_events.dart

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import '../models/sfu_call_grants.dart';
1010
import '../models/sfu_call_state.dart';
1111
import '../models/sfu_connection_info.dart';
1212
import '../models/sfu_error.dart';
13+
import '../models/sfu_goaway_reason.dart';
1314
import '../models/sfu_participant.dart';
1415
import '../models/sfu_track_type.dart';
1516
import '../models/sfu_video_sender.dart';
@@ -235,6 +236,17 @@ class SfuSocketConnected extends SfuSocketEvent {
235236
List<Object?> get props => [sessionId, url];
236237
}
237238

239+
class SfuGoAwayEvent extends SfuSocketEvent {
240+
const SfuGoAwayEvent({
241+
required this.goAwayReason,
242+
});
243+
244+
final SfuGoAwayReason goAwayReason;
245+
246+
@override
247+
List<Object?> get props => [goAwayReason];
248+
}
249+
238250
class SfuSocketDisconnected extends SfuSocketEvent {
239251
const SfuSocketDisconnected({
240252
required this.sessionId,

0 commit comments

Comments
 (0)