Skip to content

Commit 811ded7

Browse files
authored
Enqueue Signal requests while reconnecting (#81)
* implement queue to events emitter * check `isDisposed` instead * update message * implement queue to events emitter * update lock * impl
1 parent da8fe67 commit 811ded7

File tree

2 files changed

+62
-10
lines changed

2 files changed

+62
-10
lines changed

lib/src/core/engine.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,12 +545,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
545545
if (_connectionState == ConnectionState.connected) {
546546
if (didReconnect) {
547547
events.emit(const EngineReconnectedEvent());
548+
// send queued requests if engine re-connected
549+
signalClient.sendQueuedRequests();
548550
} else {
549551
events.emit(const EngineConnectedEvent());
550552
}
551553
} else if (_connectionState == ConnectionState.reconnecting) {
552554
events.emit(const EngineReconnectingEvent());
553555
} else if (_connectionState == ConnectionState.disconnected) {
556+
signalClient.cleanUp();
554557
events.emit(const EngineDisconnectedEvent());
555558
}
556559
}

lib/src/core/signal_client.dart

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:collection';
23

34
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
45
import 'package:http/http.dart' as http;
@@ -26,6 +27,8 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
2627
final WebSocketConnector _wsConnector;
2728
LiveKitWebSocket? _ws;
2829

30+
final _queue = Queue<lk_rtc.SignalRequest>();
31+
2932
@internal
3033
SignalClient(WebSocketConnector wsConnector) : _wsConnector = wsConnector {
3134
events.listen((event) {
@@ -34,7 +37,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
3437

3538
onDispose(() async {
3639
await events.dispose();
37-
await _cleanUp();
40+
await cleanUp();
3841
});
3942
}
4043

@@ -59,7 +62,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
5962
? ConnectionState.reconnecting
6063
: ConnectionState.connecting);
6164
// Clean up existing socket
62-
await _cleanUp();
65+
await cleanUp();
6366
// Attempt to connect
6467
_ws = await _wsConnector(
6568
rtcUri,
@@ -103,26 +106,41 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
103106
}
104107
}
105108

106-
Future<void> _cleanUp() async {
109+
@internal
110+
Future<void> cleanUp() async {
107111
await _ws?.dispose();
108112
_ws = null;
113+
_queue.clear();
109114
}
110115

111116
@internal
112117
Future<void> disconnect() async {
113118
logger.fine('SignalClient disconnect');
114-
await _cleanUp();
119+
await cleanUp();
115120
}
116121

117-
void _sendRequest(lk_rtc.SignalRequest req) {
118-
if (_ws == null || isDisposed) {
119-
logger.warning(
120-
'[$objectId] Could not send message, not connected or already disposed');
122+
void _sendRequest(
123+
lk_rtc.SignalRequest req, {
124+
bool enqueueIfReconnecting = true,
125+
}) {
126+
if (isDisposed) {
127+
logger.warning('[$objectId] Could not send message, already disposed');
128+
return;
129+
}
130+
131+
if (_connectionState == ConnectionState.reconnecting &&
132+
req._canQueue() &&
133+
enqueueIfReconnecting) {
134+
_queue.add(req);
121135
return;
122136
}
123137

124-
final buf = req.writeToBuffer();
125-
_ws?.send(buf);
138+
if (_ws == null) {
139+
logger.warning('[$objectId] Could not send message, socket is null');
140+
return;
141+
}
142+
143+
_ws?.send(req.writeToBuffer());
126144
}
127145

128146
void _updateConnectionState(ConnectionState newValue) {
@@ -379,3 +397,34 @@ extension SignalClientRequests on SignalClient {
379397
),
380398
));
381399
}
400+
401+
// private methods
402+
extension on lk_rtc.SignalRequest {
403+
// returns if this request can be queued
404+
bool _canQueue() => ![
405+
// list of types that cannot be queued
406+
lk_rtc.SignalRequest_Message.syncState,
407+
lk_rtc.SignalRequest_Message.trickle,
408+
lk_rtc.SignalRequest_Message.offer,
409+
lk_rtc.SignalRequest_Message.answer,
410+
lk_rtc.SignalRequest_Message.simulate
411+
].contains(whichMessage());
412+
}
413+
414+
// internal methods
415+
416+
extension SignalClientInternalMethods on SignalClient {
417+
@internal
418+
void sendQueuedRequests() {
419+
// queue is empty
420+
if (_queue.isEmpty) return;
421+
// send requests
422+
for (final request in _queue) {
423+
_sendRequest(request, enqueueIfReconnecting: false);
424+
}
425+
_queue.clear();
426+
}
427+
428+
@internal
429+
void clearQueue() => _queue.clear();
430+
}

0 commit comments

Comments
 (0)