Skip to content

Commit c3f5ff2

Browse files
authored
feat: Prepare connection/region pinning. (#574)
* feat: Prepare connection/region pinning. * dart run import_sorter. * update. * update. * update. * fix. * update. * update. * fix scheme replace for url. * update README.md * fix. * update. * fix.
1 parent 253b7b5 commit c3f5ff2

19 files changed

+1269
-169
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ final roomOptions = RoomOptions(
194194
195195
final room = Room();
196196
197+
// you can use `prepareConnection` to speed up connection.
198+
await room.prepareConnection(url, token);
199+
197200
await room.connect(url, token, roomOptions: roomOptions);
198201

199202
try {

example/lib/pages/prejoin.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,12 @@ class _PreJoinPageState extends State<PreJoinPage> {
165165

166166
try {
167167
//create new room
168-
var cameraEncoding = VideoEncoding(
168+
var cameraEncoding = const VideoEncoding(
169169
maxBitrate: 5 * 1000 * 1000,
170170
maxFramerate: 30,
171171
);
172172

173-
var screenEncoding = VideoEncoding(
173+
var screenEncoding = const VideoEncoding(
174174
maxBitrate: 3 * 1000 * 1000,
175175
maxFramerate: 15,
176176
);
@@ -189,10 +189,10 @@ class _PreJoinPageState extends State<PreJoinPage> {
189189
defaultAudioPublishOptions: const AudioPublishOptions(
190190
name: 'custom_audio_track_name',
191191
),
192-
defaultCameraCaptureOptions: CameraCaptureOptions(
192+
defaultCameraCaptureOptions: const CameraCaptureOptions(
193193
maxFrameRate: 30,
194194
params: VideoParameters(
195-
dimensions: const VideoDimensions(1280, 720),
195+
dimensions: VideoDimensions(1280, 720),
196196
)),
197197
defaultScreenShareCaptureOptions: const ScreenShareCaptureOptions(
198198
useiOSBroadcastExtension: true,
@@ -214,6 +214,8 @@ class _PreJoinPageState extends State<PreJoinPage> {
214214
// Create a Listener before connecting
215215
final listener = room.createListener();
216216

217+
await room.prepareConnection(args.url, args.token);
218+
217219
// Try to connect to the room
218220
// This will throw an Exception if it fails for any reason.
219221
await room.connect(

example/lib/pages/room.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ class _RoomPageState extends State<RoomPage> {
120120
String decoded = 'Failed to decode';
121121
try {
122122
decoded = utf8.decode(event.data);
123-
} catch (_) {
124-
print('Failed to decode: $_');
123+
} catch (err) {
124+
print('Failed to decode: $err');
125125
}
126126
context.showDataReceivedDialog(decoded);
127127
})

example/lib/widgets/controls.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class _ControlsWidgetState extends State<ControlsWidget> {
169169
const androidConfig = FlutterBackgroundAndroidConfig(
170170
notificationTitle: 'Screen Sharing',
171171
notificationText: 'LiveKit Example is sharing the screen.',
172-
notificationImportance: AndroidNotificationImportance.Default,
172+
notificationImportance: AndroidNotificationImportance.normal,
173173
notificationIcon: AndroidResource(
174174
name: 'livekit_ic_launcher', defType: 'mipmap'),
175175
);

lib/src/core/engine.dart

Lines changed: 95 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
3636
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
3737
import '../publication/local.dart';
3838
import '../support/disposable.dart';
39+
import '../support/region_url_provider.dart';
3940
import '../support/websocket.dart';
4041
import '../track/local/video.dart';
4142
import '../types/internal.dart';
@@ -130,6 +131,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
130131

131132
bool attemptingReconnect = false;
132133

134+
RegionUrlProvider? _regionUrlProvider;
135+
133136
void clearReconnectTimeout() {
134137
if (reconnectTimeout != null) {
135138
reconnectTimeout?.cancel();
@@ -171,6 +174,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
171174
ConnectOptions? connectOptions,
172175
RoomOptions? roomOptions,
173176
FastConnectOptions? fastConnectOptions,
177+
RegionUrlProvider? regionUrlProvider,
174178
}) async {
175179
this.url = url;
176180
this.token = token;
@@ -179,6 +183,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
179183
this.roomOptions = roomOptions ?? this.roomOptions;
180184
this.fastConnectOptions = fastConnectOptions;
181185

186+
if (regionUrlProvider != null) {
187+
_regionUrlProvider = regionUrlProvider;
188+
}
189+
182190
try {
183191
// wait for socket to connect rtc server
184192
await signalClient.connect(
@@ -192,7 +200,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
192200
await _signalListener.waitFor<SignalJoinResponseEvent>(
193201
duration: this.connectOptions.timeouts.connection,
194202
onTimeout: () => throw ConnectException(
195-
'Timed out waiting for SignalJoinResponseEvent'),
203+
'Timed out waiting for SignalJoinResponseEvent',
204+
reason: ConnectionErrorReason.Timeout),
196205
);
197206

198207
logger.fine('Waiting for engine to connect...');
@@ -663,6 +672,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
663672
));
664673

665674
clearReconnectTimeout();
675+
if (token != null && _regionUrlProvider != null) {
676+
// token may have been refreshed, we do not want to recreate the regionUrlProvider
677+
// since the current engine may have inherited a regional url
678+
_regionUrlProvider!.updateToken(token!);
679+
}
666680
logger.fine(
667681
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
668682
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
@@ -700,7 +714,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
700714
duration: connectOptions.timeouts.connection * 10,
701715
filter: (event) => !event.state.contains(ConnectivityResult.none),
702716
onTimeout: () => throw ConnectException(
703-
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent'),
717+
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent',
718+
reason: ConnectionErrorReason.Timeout),
704719
);
705720
}
706721

@@ -756,7 +771,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
756771
await events.waitFor<SignalReconnectedEvent>(
757772
duration: connectOptions.timeouts.connection,
758773
onTimeout: () => throw ConnectException(
759-
'resumeConnection: Timed out waiting for SignalReconnectedEvent'),
774+
'resumeConnection: Timed out waiting for SignalReconnectedEvent',
775+
reason: ConnectionErrorReason.Timeout),
760776
);
761777

762778
logger.fine('resumeConnection: reason: ${reason.name}');
@@ -789,53 +805,65 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
789805
}
790806

791807
@internal
792-
Future<void> restartConnection([bool signalEvents = false]) async {
808+
Future<void> restartConnection({String? regionUrl}) async {
793809
if (_isClosed) {
794810
return;
795811
}
796812

797-
events.emit(const EngineFullRestartingEvent());
813+
try {
814+
events.emit(const EngineFullRestartingEvent());
798815

799-
if (signalClient.connectionState == ConnectionState.connected) {
800-
await signalClient.sendLeave();
801-
}
816+
if (signalClient.connectionState == ConnectionState.connected) {
817+
await signalClient.sendLeave();
818+
}
802819

803-
await publisher?.dispose();
804-
publisher = null;
820+
await publisher?.dispose();
821+
publisher = null;
805822

806-
await subscriber?.dispose();
807-
subscriber = null;
823+
await subscriber?.dispose();
824+
subscriber = null;
808825

809-
_reliableDCSub = null;
810-
_reliableDCPub = null;
811-
_lossyDCSub = null;
812-
_lossyDCPub = null;
826+
_reliableDCSub = null;
827+
_reliableDCPub = null;
828+
_lossyDCSub = null;
829+
_lossyDCPub = null;
813830

814-
await _signalListener.cancelAll();
831+
await _signalListener.cancelAll();
815832

816-
_signalListener = signalClient.createListener(synchronized: true);
817-
_setUpSignalListeners();
833+
_signalListener = signalClient.createListener(synchronized: true);
834+
_setUpSignalListeners();
818835

819-
await connect(
820-
url!,
821-
token!,
822-
roomOptions: roomOptions,
823-
connectOptions: connectOptions,
824-
fastConnectOptions: fastConnectOptions,
825-
);
826-
827-
if (_hasPublished) {
828-
await negotiate();
829-
logger.fine('restartConnection: Waiting for publisher to ice-connect...');
830-
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
831-
filter: (event) => event.state.isConnected(),
832-
duration: connectOptions.timeouts.peerConnection,
836+
await connect(
837+
regionUrl ?? url!,
838+
token!,
839+
roomOptions: roomOptions,
840+
connectOptions: connectOptions,
841+
fastConnectOptions: fastConnectOptions,
833842
);
834-
}
835-
836-
fullReconnectOnNext = false;
837843

838-
events.emit(const EngineRestartedEvent());
844+
if (_hasPublished) {
845+
await negotiate();
846+
logger
847+
.fine('restartConnection: Waiting for publisher to ice-connect...');
848+
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
849+
filter: (event) => event.state.isConnected(),
850+
duration: connectOptions.timeouts.peerConnection,
851+
);
852+
}
853+
fullReconnectOnNext = false;
854+
_regionUrlProvider?.resetAttempts();
855+
events.emit(const EngineRestartedEvent());
856+
} catch (error) {
857+
final nextRegionUrl = await _regionUrlProvider?.getNextBestRegionUrl();
858+
if (nextRegionUrl != null) {
859+
await restartConnection(regionUrl: nextRegionUrl);
860+
return;
861+
} else {
862+
// no more regions to try (or we're not on cloud)
863+
_regionUrlProvider?.resetAttempts();
864+
rethrow;
865+
}
866+
}
839867
}
840868

841869
@internal
@@ -992,19 +1020,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
9921020
token = event.token;
9931021
})
9941022
..on<SignalLeaveEvent>((event) async {
995-
if (event.canReconnect) {
996-
fullReconnectOnNext = true;
997-
// reconnect immediately instead of waiting for next attempt
998-
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
999-
} else {
1000-
if (connectionState == ConnectionState.reconnecting) {
1001-
logger.warning(
1002-
'[Signal] Received Leave while engine is reconnecting, ignoring...');
1003-
return;
1004-
}
1005-
await signalClient.cleanUp();
1006-
await cleanUp();
1007-
events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
1023+
if (event.regions != null && _regionUrlProvider != null) {
1024+
logger.fine('updating regions');
1025+
_regionUrlProvider?.setServerReportedRegions(event.regions!);
1026+
}
1027+
switch (event.action) {
1028+
case lk_rtc.LeaveRequest_Action.DISCONNECT:
1029+
if (connectionState == ConnectionState.reconnecting) {
1030+
logger.warning(
1031+
'[Signal] Received Leave while engine is reconnecting, ignoring...');
1032+
return;
1033+
}
1034+
await signalClient.cleanUp();
1035+
await cleanUp();
1036+
events
1037+
.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
1038+
break;
1039+
case lk_rtc.LeaveRequest_Action.RECONNECT:
1040+
fullReconnectOnNext = true;
1041+
// reconnect immediately instead of waiting for next attempt
1042+
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
1043+
break;
1044+
case lk_rtc.LeaveRequest_Action.RESUME:
1045+
// reconnect immediately instead of waiting for next attempt
1046+
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
1047+
default:
1048+
break;
10081049
}
10091050
});
10101051

@@ -1016,6 +1057,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
10161057
await cleanUp();
10171058
}
10181059
}
1060+
1061+
void setRegionUrlProvider(RegionUrlProvider provider) {
1062+
_regionUrlProvider = provider;
1063+
}
10191064
}
10201065

10211066
extension EnginePrivateMethods on Engine {

0 commit comments

Comments
 (0)