Skip to content

Commit a069db6

Browse files
committed
fix: disconnect when in background and await connecting to be ready
1 parent 7a018dd commit a069db6

File tree

6 files changed

+109
-30
lines changed

6 files changed

+109
-30
lines changed

packages/realtime_client/lib/realtime_client.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
1+
export 'src/constants.dart'
2+
show RealtimeConstants, RealtimeLogLevel, SocketStates;
23
export 'src/realtime_channel.dart';
34
export 'src/realtime_client.dart';
45
export 'src/realtime_presence.dart';

packages/realtime_client/lib/src/constants.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ enum SocketStates {
1818
/// Connection is live and connected
1919
open,
2020

21-
/// Socket is closing.
22-
closing,
21+
/// Socket is closing by the user
22+
disconnecting,
2323

2424
/// Socket being close not by the user. Realtime should attempt to reconnect.
2525
closed,

packages/realtime_client/lib/src/realtime_channel.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,19 @@ class RealtimeChannel {
646646
joinPush.resend(timeout ?? _timeout);
647647
}
648648

649+
/// Usually a rejoin only happens when the channel timeouts or errors out.
650+
/// When manually disconnecting, the channel is still marked as
651+
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
652+
/// unsubscribe itself, which causes issues when trying to rejoin. This method
653+
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
654+
void forceRejoin([Duration? timeout]) {
655+
if (isLeaving) {
656+
return;
657+
}
658+
_state = ChannelStates.joining;
659+
joinPush.resend(timeout ?? _timeout);
660+
}
661+
649662
void trigger(String type, [dynamic payload, String? ref]) {
650663
final typeLower = type.toLowerCase();
651664

packages/realtime_client/lib/src/realtime_client.dart

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,17 @@ class RealtimeClient {
139139
(String payload, Function(dynamic result) callback) =>
140140
callback(json.decode(payload));
141141
reconnectTimer = RetryTimer(
142-
() {
143-
disconnect();
144-
connect();
142+
() async {
143+
await disconnect();
144+
await connect();
145145
},
146146
this.reconnectAfterMs,
147147
);
148148
}
149149

150150
/// Connects the socket.
151151
@internal
152-
void connect() async {
152+
Future<void> connect() async {
153153
if (conn != null) {
154154
return;
155155
}
@@ -161,8 +161,15 @@ class RealtimeClient {
161161
try {
162162
await conn!.ready;
163163
} catch (error) {
164-
_onConnError(error);
165-
reconnectTimer.scheduleTimeout();
164+
// Don't schedule a reconnect and emit error if connection has been
165+
// closed by the user or [disconnect] waits for the connection to be
166+
// ready before closing it.
167+
if (connState != SocketStates.disconnected &&
168+
connState != SocketStates.disconnecting) {
169+
connState = SocketStates.closed;
170+
_onConnError(error);
171+
reconnectTimer.scheduleTimeout();
172+
}
166173
return;
167174
}
168175

@@ -176,7 +183,8 @@ class RealtimeClient {
176183
onError: _onConnError,
177184
onDone: () {
178185
// communication has been closed
179-
if (connState != SocketStates.disconnected) {
186+
if (connState != SocketStates.disconnected &&
187+
connState != SocketStates.disconnecting) {
180188
connState = SocketStates.closed;
181189
}
182190
_onConnClose();
@@ -189,17 +197,26 @@ class RealtimeClient {
189197
}
190198

191199
/// Disconnects the socket with status [code] and [reason] for the disconnect
192-
void disconnect({int? code, String? reason}) {
200+
Future<void> disconnect({int? code, String? reason}) async {
193201
final conn = this.conn;
194202
if (conn != null) {
195-
final connectionWasOpen = connState == SocketStates.open;
196-
connState = SocketStates.disconnected;
197-
if (connectionWasOpen) {
203+
final oldState = connState;
204+
connState = SocketStates.disconnecting;
205+
206+
// Connection cannot be closed while it's still connecting. Wait for connection to
207+
// be ready and then close it.
208+
if (oldState == SocketStates.connecting) {
209+
await conn.ready.catchError((_) {});
210+
}
211+
212+
if (oldState == SocketStates.open ||
213+
oldState == SocketStates.connecting) {
198214
if (code != null) {
199-
conn.sink.close(code, reason ?? '');
215+
await conn.sink.close(code, reason ?? '');
200216
} else {
201-
conn.sink.close();
217+
await conn.sink.close();
202218
}
219+
connState = SocketStates.disconnected;
203220
reconnectTimer.reset();
204221
}
205222
this.conn = null;
@@ -264,8 +281,8 @@ class RealtimeClient {
264281
return 'connecting';
265282
case SocketStates.open:
266283
return 'open';
267-
case SocketStates.closing:
268-
return 'closing';
284+
case SocketStates.disconnecting:
285+
return 'disconnecting';
269286
case SocketStates.disconnected:
270287
return 'disconnected';
271288
case SocketStates.closed:
@@ -275,7 +292,7 @@ class RealtimeClient {
275292
}
276293

277294
/// Retuns `true` is the connection is open.
278-
bool get isConnected => connectionState == 'open';
295+
bool get isConnected => connState == SocketStates.open;
279296

280297
/// Removes a subscription from the socket.
281298
@internal
@@ -374,7 +391,7 @@ class RealtimeClient {
374391
}
375392
}
376393

377-
/// Unsubscribe from channels with the specified topic.
394+
/// Unsubscribe from joined or joining channels with the specified topic.
378395
@internal
379396
void leaveOpenTopic(String topic) {
380397
final dupChannel = channels.firstWhereOrNull(

packages/realtime_client/test/socket_test.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ void main() {
215215
});
216216

217217
test('removes existing connection', () async {
218-
socket.connect();
219-
socket.disconnect();
218+
await socket.connect();
219+
await socket.disconnect();
220220

221221
expect(socket.conn, null);
222222
});

packages/supabase_flutter/lib/src/supabase_auth.dart

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'dart:io' show Platform;
44
import 'dart:math';
55

66
import 'package:app_links/app_links.dart';
7+
import 'package:async/async.dart';
78
import 'package:flutter/foundation.dart' show kIsWeb;
89
import 'package:flutter/material.dart';
910
import 'package:flutter/services.dart';
@@ -29,6 +30,8 @@ class SupabaseAuth with WidgetsBindingObserver {
2930

3031
StreamSubscription<Uri?>? _deeplinkSubscription;
3132

33+
CancelableOperation<void>? _realtimeReconnectOperation;
34+
3235
final _appLinks = AppLinks();
3336

3437
/// - Obtains session from local storage and sets it as the current session
@@ -113,22 +116,67 @@ class SupabaseAuth with WidgetsBindingObserver {
113116
void didChangeAppLifecycleState(AppLifecycleState state) {
114117
switch (state) {
115118
case AppLifecycleState.resumed:
116-
if (_autoRefreshToken) {
117-
Supabase.instance.client.auth.startAutoRefresh();
118-
}
119+
onResumed();
119120
case AppLifecycleState.detached:
120-
case AppLifecycleState.inactive:
121121
case AppLifecycleState.paused:
122-
// Realtime channels are kept alive in the background for some amount
123-
// of time after the app is paused. If we stop refreshing the token
124-
// here, the channels will be closed.
125-
if (Supabase.instance.client.realtime.getChannels().isEmpty) {
122+
if (kIsWeb || Platform.isAndroid || Platform.isIOS) {
126123
Supabase.instance.client.auth.stopAutoRefresh();
124+
_realtimeReconnectOperation?.cancel();
125+
Supabase.instance.client.realtime.disconnect();
127126
}
128127
default:
129128
}
130129
}
131130

131+
Future<void> onResumed() async {
132+
if (_autoRefreshToken) {
133+
Supabase.instance.client.auth.startAutoRefresh();
134+
}
135+
final realtime = Supabase.instance.client.realtime;
136+
if (realtime.channels.isNotEmpty) {
137+
if (realtime.connState == SocketStates.disconnecting) {
138+
// If the socket is still disconnecting from e.g.
139+
// [AppLifecycleState.paused] we should wait for it to finish before
140+
// reconnecting.
141+
142+
bool cancel = false;
143+
final connectFuture = realtime.conn!.sink.done.then(
144+
(_) {
145+
// Make this connect cancelable so that it does not connect if the
146+
// disconnect took so long that the app is already in background
147+
// again.
148+
149+
// ignore: invalid_use_of_internal_member
150+
if (!cancel) return realtime.connect();
151+
},
152+
onError: (error) {},
153+
);
154+
_realtimeReconnectOperation = CancelableOperation.fromFuture(
155+
connectFuture,
156+
onCancel: () => cancel = true,
157+
);
158+
} else if (!realtime.isConnected) {
159+
// Reconnect if the socket is currently not connected.
160+
// When coming from [AppLifecycleState.paused] this should be the case,
161+
// but when coming from [AppLifecycleState.inactive] no disconnect
162+
// happened and therefore connection should still be intanct and we
163+
// should not reconnect.
164+
165+
// ignore: invalid_use_of_internal_member
166+
await realtime.connect();
167+
for (final channel in realtime.channels) {
168+
// Only rejoin channels that think they are still joined and not
169+
// which were manually unsubscribed by the user while in background
170+
171+
// ignore: invalid_use_of_internal_member
172+
if (channel.isJoined) {
173+
channel.forceRejoin();
174+
}
175+
}
176+
}
177+
}
178+
}
179+
132180
void _onAuthStateChange(AuthChangeEvent event, Session? session) {
133181
Supabase.instance.log('**** onAuthStateChange: $event');
134182
if (session != null) {

0 commit comments

Comments
 (0)