Skip to content

Commit be01553

Browse files
committed
ignore new data on disposed socket
1 parent 8f0e734 commit be01553

File tree

4 files changed

+29
-22
lines changed

4 files changed

+29
-22
lines changed

lib/src/core/engine.dart

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
259259
_reconnectAttempts++;
260260

261261
try {
262-
// isReconnecting = true;
263262
await signalClient.reconnect(
264263
url,
265264
token,
@@ -272,7 +271,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
272271

273272
subscriber!.restartingIce = true;
274273

275-
// await negotiate(iceRestart: true);
276274
if (_hasPublished) {
277275
logger.fine('Reconnect: negotiating publisher...');
278276
await publisher!.createAndSendOffer(const RTCOfferOptions(
@@ -604,8 +602,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
604602
..on<SignalStreamStateUpdatedEvent>((event) => events.emit(event))
605603
..on<SignalLeaveEvent>((event) async {
606604
if (connectionState == ConnectionState.reconnecting) {
607-
logger.fine('Ignoring leave signal since engine is reconnecting...');
608-
return;
605+
logger.warning('Received leave signal while engine is reconnecting.');
609606
}
610607
await close();
611608
events.emit(const EngineDisconnectedEvent());

lib/src/support/websocket.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import '../support/disposable.dart';
12
import 'websocket/io.dart' if (dart.library.html) 'websocket/web.dart';
23

34
class WebSocketException implements Exception {
@@ -29,9 +30,8 @@ class WebSocketEventHandlers {
2930
});
3031
}
3132

32-
abstract class LiveKitWebSocket {
33+
abstract class LiveKitWebSocket extends Disposable {
3334
void send(List<int> data);
34-
Future<void> dispose();
3535

3636
static Future<LiveKitWebSocket> connect(
3737
Uri uri, [

lib/src/support/websocket/io.dart

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Future<LiveKitWebSocketIO> lkWebSocketConnect(
1111
]) =>
1212
LiveKitWebSocketIO.connect(uri, options);
1313

14-
class LiveKitWebSocketIO implements LiveKitWebSocket {
14+
class LiveKitWebSocketIO extends LiveKitWebSocket {
1515
final io.WebSocket _ws;
1616
final WebSocketEventHandlers? options;
1717
late final StreamSubscription _subscription;
@@ -21,19 +21,24 @@ class LiveKitWebSocketIO implements LiveKitWebSocket {
2121
this.options,
2222
]) {
2323
_subscription = _ws.listen(
24-
(dynamic data) => options?.onData?.call(data),
24+
(dynamic data) {
25+
if (isDisposed) {
26+
logger.warning('$objectId already disposed, ignoring received data.');
27+
return;
28+
}
29+
options?.onData?.call(data);
30+
},
2531
onDone: () async {
2632
await _subscription.cancel();
2733
options?.onDispose?.call();
2834
},
2935
);
30-
}
3136

32-
@override
33-
Future<void> dispose() async {
34-
if (_ws.readyState != io.WebSocket.closed) {
35-
await _ws.close();
36-
}
37+
onDispose(() async {
38+
if (_ws.readyState != io.WebSocket.closed) {
39+
await _ws.close();
40+
}
41+
});
3742
}
3843

3944
@override

lib/src/support/websocket/web.dart

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import 'dart:async';
22
import 'dart:html' as html;
33
import 'dart:typed_data';
44

5+
import '../../extensions.dart';
6+
import '../../logger.dart';
57
import '../websocket.dart';
68

79
// ignore: avoid_web_libraries_in_flutter
@@ -12,7 +14,7 @@ Future<LiveKitWebSocketWeb> lkWebSocketConnect(
1214
]) =>
1315
LiveKitWebSocketWeb.connect(uri, options);
1416

15-
class LiveKitWebSocketWeb implements LiveKitWebSocket {
17+
class LiveKitWebSocketWeb extends LiveKitWebSocket {
1618
final html.WebSocket _ws;
1719
final WebSocketEventHandlers? options;
1820
late final StreamSubscription _messageSubscription;
@@ -24,6 +26,10 @@ class LiveKitWebSocketWeb implements LiveKitWebSocket {
2426
]) {
2527
_ws.binaryType = 'arraybuffer';
2628
_messageSubscription = _ws.onMessage.listen((_) {
29+
if (isDisposed) {
30+
logger.warning('$objectId already disposed, ignoring received data.');
31+
return;
32+
}
2733
dynamic _data = _.data is ByteBuffer ? _.data.asUint8List() : _.data;
2834
options?.onData?.call(_data);
2935
});
@@ -32,18 +38,17 @@ class LiveKitWebSocketWeb implements LiveKitWebSocket {
3238
await _closeSubscription.cancel();
3339
options?.onDispose?.call();
3440
});
41+
42+
onDispose(() async {
43+
if (_ws.readyState != html.WebSocket.CLOSED) {
44+
_ws.close();
45+
}
46+
});
3547
}
3648

3749
@override
3850
void send(List<int> data) => _ws.send(data);
3951

40-
@override
41-
Future<void> dispose() async {
42-
if (_ws.readyState != html.WebSocket.CLOSED) {
43-
_ws.close();
44-
}
45-
}
46-
4752
static Future<LiveKitWebSocketWeb> connect(
4853
Uri uri, [
4954
WebSocketEventHandlers? options,

0 commit comments

Comments
 (0)