Skip to content

Commit 94b4b07

Browse files
committed
fix reconnect logic
1 parent a0c7744 commit 94b4b07

File tree

9 files changed

+157
-113
lines changed

9 files changed

+157
-113
lines changed

example/lib/exts.dart

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,20 @@ extension LKExampleExt on BuildContext {
8989
),
9090
);
9191

92+
Future<void> showReconnectSuccessDialog() => showDialog<void>(
93+
context: this,
94+
builder: (ctx) => AlertDialog(
95+
title: const Text('Reconnect'),
96+
content: const Text('Reconnection was successful.'),
97+
actions: [
98+
TextButton(
99+
onPressed: () => Navigator.pop(ctx),
100+
child: const Text('OK'),
101+
),
102+
],
103+
),
104+
);
105+
92106
Future<bool?> showSendDataDialog() => showDialog<bool>(
93107
context: this,
94108
builder: (ctx) => AlertDialog(

example/lib/widgets/controls.dart

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,14 @@ class _ControlsWidgetState extends State<ControlsWidget> {
126126

127127
void _onTapReconnect() async {
128128
final result = await context.showReconnectDialog();
129-
if (result == true) await widget.room.reconnect();
129+
if (result == true) {
130+
try {
131+
await widget.room.reconnect();
132+
await context.showReconnectSuccessDialog();
133+
} catch (error) {
134+
await context.showErrorDialog(error);
135+
}
136+
}
130137
}
131138

132139
void _onTapSendData() async {

lib/src/core/engine.dart

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
8686
}) : signalClient = signalClient ?? SignalClient() {
8787
if (kDebugMode) {
8888
// log all EngineEvents
89-
events.listen((event) =>
90-
logger.fine('[EngineEvent] $objectId ${event.runtimeType}'));
89+
events.listen((event) => logger.fine('[EngineEvent] $objectId ${event}'));
9190
}
9291

9392
_setUpListeners();
@@ -241,7 +240,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
241240
@internal
242241
Future<void> reconnect() async {
243242
if (_connectionState == ConnectionState.disconnected) {
244-
logger.fine('$objectId reconnect() already closed');
243+
logger.fine('Reconnect: Already closed.');
245244
return;
246245
}
247246

@@ -252,14 +251,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
252251
throw ConnectException('could not reconnect without url and token');
253252
}
254253

254+
_connectionState = ConnectionState.reconnecting;
255+
255256
if (_reconnectAttempts == 0) {
256257
events.emit(const EngineReconnectingEvent());
257258
}
258259
_reconnectAttempts++;
259260

260261
try {
261262
// isReconnecting = true;
262-
_connectionState = ConnectionState.reconnecting;
263263
await signalClient.reconnect(
264264
url,
265265
token,
@@ -274,29 +274,35 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
274274

275275
// await negotiate(iceRestart: true);
276276
if (_hasPublished) {
277-
logger.fine('reconnect: publisher.createAndSendOffer');
278-
await publisher!
279-
.createAndSendOffer(const RTCOfferOptions(iceRestart: true));
277+
logger.fine('Reconnect: negotiating publisher...');
278+
await publisher!.createAndSendOffer(const RTCOfferOptions(
279+
iceRestart: true,
280+
));
280281
}
281282

282-
if (!(primary?.pc.iceConnectionState?.isConnected() ?? false)) {
283-
logger.fine('reconnect: waiting for primary to ice-connect...');
283+
final iceConnected =
284+
primary?.pc.iceConnectionState?.isConnected() ?? false;
285+
286+
logger.fine('Reconnect: iceConnected: $iceConnected');
287+
288+
if (!iceConnected) {
289+
logger.fine('Reconnect: Waiting for primary to connect...');
284290

285291
await events.waitFor<EngineIceStateUpdatedEvent>(
286292
filter: (event) => event.isPrimary && event.iceState.isConnected(),
287293
duration: Timeouts.iceRestart,
288294
);
289295
}
290296

291-
logger.fine('reconnect: success');
297+
logger.fine('Reconnect: success');
298+
_connectionState = ConnectionState.connected;
292299
events.emit(const EngineReconnectedEvent());
293300
_reconnectAttempts = 0;
294-
295-
// don't catch and pass up any exception
296-
} finally {
297-
// always set reconnecting to false
298-
// isReconnecting = false;
301+
} catch (error) {
302+
logger.fine('Reconnect: error ${error}');
303+
// Pass up all exceptions
299304
_connectionState = ConnectionState.disconnected;
305+
rethrow;
300306
}
301307
}
302308

@@ -344,7 +350,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
344350

345351
subscriber?.pc.onIceConnectionState =
346352
(state) => events.emit(EngineSubscriberIceStateUpdatedEvent(
347-
state: state,
353+
iceState: state,
348354
isPrimary: _subscriberPrimary,
349355
));
350356

@@ -491,6 +497,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
491497
}
492498

493499
Future<void> _onDisconnected(String reason) async {
500+
logger.info('onDisconnected reason: $reason');
494501
if (_connectionState == ConnectionState.disconnected) {
495502
logger.fine('[$objectId] Already disconnected $reason');
496503
return;
@@ -596,6 +603,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
596603
// relay
597604
..on<SignalStreamStateUpdatedEvent>((event) => events.emit(event))
598605
..on<SignalLeaveEvent>((event) async {
606+
if (connectionState == ConnectionState.reconnecting) {
607+
logger.fine('Ignoring leave signal since engine is reconnecting...');
608+
return;
609+
}
599610
await close();
600611
events.emit(const EngineDisconnectedEvent());
601612
})

lib/src/core/room.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import '../proto/livekit_rtc.pb.dart' as lk_rtc;
1818
import '../support/disposable.dart';
1919
import '../track/track.dart';
2020
import '../types.dart';
21+
import '../core/signal_client.dart';
2122
import 'engine.dart';
2223

2324
/// Room is the primary construct for LiveKit conferences. It contains a

lib/src/core/signal_client.dart

Lines changed: 96 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ import '../types.dart';
1818
import '../utils.dart';
1919

2020
class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
21-
//
22-
bool _connected = false;
21+
// Connection state of the socket conection.
22+
ConnectionState _connectionState = ConnectionState.disconnected;
23+
2324
LiveKitWebSocket? _ws;
2425

2526
SignalClient() {
@@ -33,8 +34,6 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
3334
});
3435
}
3536

36-
bool get connected => _connected;
37-
3837
Future<void> connect(
3938
String uriString,
4039
String token, {
@@ -51,7 +50,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
5150
rtcUri,
5251
WebSocketEventHandlers(
5352
onData: _onSocketData,
54-
onDispose: _onSocketDone,
53+
onDispose: _onSocketDispose,
5554
onError: _handleError,
5655
),
5756
);
@@ -86,7 +85,8 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
8685
String token, {
8786
ConnectOptions? connectOptions,
8887
}) async {
89-
_connected = false;
88+
logger.fine('SignalClient reconnecting...');
89+
_connectionState = ConnectionState.reconnecting;
9090
await _ws?.dispose();
9191
_ws = null;
9292

@@ -101,19 +101,106 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
101101
rtcUri,
102102
WebSocketEventHandlers(
103103
onData: _onSocketData,
104-
onDispose: _onSocketDone,
104+
onDispose: _onSocketDispose,
105105
onError: _handleError,
106106
),
107107
);
108108

109-
_connected = true;
109+
logger.fine('SignalClient socket reconnected');
110+
_connectionState = ConnectionState.connected;
110111
}
111112

112113
Future<void> close() async {
113-
_connected = false;
114+
logger.fine('SignalClient close');
114115
await _ws?.dispose();
116+
_ws = null;
117+
}
118+
119+
void _sendRequest(lk_rtc.SignalRequest req) {
120+
if (_ws == null || isDisposed) {
121+
logger.warning(
122+
'[$objectId] Could not send message, not connected or already disposed');
123+
return;
124+
}
125+
126+
final buf = req.writeToBuffer();
127+
_ws?.send(buf);
128+
}
129+
130+
Future<void> _onSocketData(dynamic message) async {
131+
if (message is! List<int>) return;
132+
final msg = lk_rtc.SignalResponse.fromBuffer(message);
133+
134+
switch (msg.whichMessage()) {
135+
case lk_rtc.SignalResponse_Message.join:
136+
events.emit(SignalConnectedEvent(response: msg.join));
137+
break;
138+
case lk_rtc.SignalResponse_Message.answer:
139+
events.emit(SignalAnswerEvent(sd: msg.answer.toSDKType()));
140+
break;
141+
case lk_rtc.SignalResponse_Message.offer:
142+
events.emit(SignalOfferEvent(sd: msg.offer.toSDKType()));
143+
break;
144+
case lk_rtc.SignalResponse_Message.trickle:
145+
events.emit(SignalTrickleEvent(
146+
candidate: RTCIceCandidateExt.fromJson(msg.trickle.candidateInit),
147+
target: msg.trickle.target,
148+
));
149+
break;
150+
case lk_rtc.SignalResponse_Message.update:
151+
events.emit(SignalParticipantUpdateEvent(
152+
participants: msg.update.participants));
153+
break;
154+
case lk_rtc.SignalResponse_Message.trackPublished:
155+
events.emit(SignalLocalTrackPublishedEvent(
156+
cid: msg.trackPublished.cid,
157+
track: msg.trackPublished.track,
158+
));
159+
break;
160+
case lk_rtc.SignalResponse_Message.speakersChanged:
161+
events.emit(
162+
SignalSpeakersChangedEvent(speakers: msg.speakersChanged.speakers));
163+
break;
164+
case lk_rtc.SignalResponse_Message.connectionQuality:
165+
events.emit(SignalConnectionQualityUpdateEvent(
166+
updates: msg.connectionQuality.updates,
167+
));
168+
break;
169+
case lk_rtc.SignalResponse_Message.leave:
170+
events.emit(SignalLeaveEvent(canReconnect: msg.leave.canReconnect));
171+
break;
172+
case lk_rtc.SignalResponse_Message.mute:
173+
events.emit(SignalMuteTrackEvent(
174+
sid: msg.mute.sid,
175+
muted: msg.mute.muted,
176+
));
177+
break;
178+
case lk_rtc.SignalResponse_Message.streamStateUpdate:
179+
events.emit(SignalStreamStateUpdatedEvent(
180+
updates: msg.streamStateUpdate.streamStates,
181+
));
182+
break;
183+
default:
184+
logger.warning('skipping unsupported signal message');
185+
}
115186
}
116187

188+
void _handleError(dynamic error) {
189+
logger.warning('received websocket error $error');
190+
}
191+
192+
void _onSocketDispose() {
193+
logger.fine('SignalClient onSocketDispose $_connectionState');
194+
// don't emit event's when reconnecting state
195+
if (_connectionState != ConnectionState.reconnecting) {
196+
logger.fine('SignalClient did disconnect ${_connectionState}');
197+
_connectionState = ConnectionState.disconnected;
198+
events.emit(const SignalCloseEvent());
199+
}
200+
}
201+
}
202+
203+
extension SignalClientRequests on SignalClient {
117204
void sendOffer(rtc.RTCSessionDescription offer) =>
118205
_sendRequest(lk_rtc.SignalRequest(
119206
offer: offer.toSDKType(),
@@ -206,87 +293,4 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
206293
void sendLeave() => _sendRequest(lk_rtc.SignalRequest(
207294
leave: lk_rtc.LeaveRequest(),
208295
));
209-
210-
void _sendRequest(lk_rtc.SignalRequest req) {
211-
if (_ws == null || isDisposed) {
212-
logger.warning(
213-
'[$objectId] Could not send message, not connected or already disposed');
214-
return;
215-
}
216-
217-
final buf = req.writeToBuffer();
218-
_ws?.send(buf);
219-
}
220-
221-
Future<void> _onSocketData(dynamic message) async {
222-
if (message is! List<int>) return;
223-
final msg = lk_rtc.SignalResponse.fromBuffer(message);
224-
225-
switch (msg.whichMessage()) {
226-
case lk_rtc.SignalResponse_Message.join:
227-
if (!_connected) {
228-
_connected = true;
229-
events.emit(SignalConnectedEvent(response: msg.join));
230-
}
231-
break;
232-
case lk_rtc.SignalResponse_Message.answer:
233-
events.emit(SignalAnswerEvent(sd: msg.answer.toSDKType()));
234-
break;
235-
case lk_rtc.SignalResponse_Message.offer:
236-
events.emit(SignalOfferEvent(sd: msg.offer.toSDKType()));
237-
break;
238-
case lk_rtc.SignalResponse_Message.trickle:
239-
events.emit(SignalTrickleEvent(
240-
candidate: RTCIceCandidateExt.fromJson(msg.trickle.candidateInit),
241-
target: msg.trickle.target,
242-
));
243-
break;
244-
case lk_rtc.SignalResponse_Message.update:
245-
events.emit(SignalParticipantUpdateEvent(
246-
participants: msg.update.participants));
247-
break;
248-
case lk_rtc.SignalResponse_Message.trackPublished:
249-
events.emit(SignalLocalTrackPublishedEvent(
250-
cid: msg.trackPublished.cid,
251-
track: msg.trackPublished.track,
252-
));
253-
break;
254-
case lk_rtc.SignalResponse_Message.speakersChanged:
255-
events.emit(
256-
SignalSpeakersChangedEvent(speakers: msg.speakersChanged.speakers));
257-
break;
258-
case lk_rtc.SignalResponse_Message.connectionQuality:
259-
events.emit(SignalConnectionQualityUpdateEvent(
260-
updates: msg.connectionQuality.updates,
261-
));
262-
break;
263-
case lk_rtc.SignalResponse_Message.leave:
264-
events.emit(SignalLeaveEvent(canReconnect: msg.leave.canReconnect));
265-
break;
266-
case lk_rtc.SignalResponse_Message.mute:
267-
events.emit(SignalMuteTrackEvent(
268-
sid: msg.mute.sid,
269-
muted: msg.mute.muted,
270-
));
271-
break;
272-
case lk_rtc.SignalResponse_Message.streamStateUpdate:
273-
events.emit(SignalStreamStateUpdatedEvent(
274-
updates: msg.streamStateUpdate.streamStates,
275-
));
276-
break;
277-
default:
278-
logger.warning('skipping unsupported signal message');
279-
}
280-
}
281-
282-
void _handleError(dynamic error) {
283-
logger.warning('received websocket error $error');
284-
}
285-
286-
void _onSocketDone() {
287-
if (!_connected) return;
288-
_ws = null;
289-
_connected = false;
290-
events.emit(const SignalCloseEvent());
291-
}
292296
}

0 commit comments

Comments
 (0)