Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,22 @@ class RealtimeClient {
final conn = this.conn;
if (conn != null) {
final oldState = connState;
connState = SocketStates.disconnecting;
log('transport', 'disconnecting', {'code': code, 'reason': reason},
Level.FINE);
final shouldCloseSink =
oldState == SocketStates.open || oldState == SocketStates.connecting;
if (shouldCloseSink) {
// Don't set the state to `disconnecting` if the connection is already closed.
connState = SocketStates.disconnecting;
log('transport', 'disconnecting', {'code': code, 'reason': reason},
Level.FINE);
}

// Connection cannot be closed while it's still connecting. Wait for connection to
// be ready and then close it.
if (oldState == SocketStates.connecting) {
await conn.ready.catchError((_) {});
}

if (oldState == SocketStates.open ||
oldState == SocketStates.connecting) {
if (shouldCloseSink) {
if (code != null) {
await conn.sink.close(code, reason ?? '');
} else {
Expand Down Expand Up @@ -319,7 +323,7 @@ class RealtimeClient {
}
}

/// Retuns `true` is the connection is open.
/// Returns `true` is the connection is open.
bool get isConnected => connState == SocketStates.open;

/// Removes a subscription from the socket.
Expand Down
52 changes: 46 additions & 6 deletions packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ void main() {

setUp(() async {
mockServer = await HttpServer.bind('localhost', 0);
WebSocketChannel? channel;

mockServer.transform(WebSocketTransformer()).listen((webSocket) {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen((request) {
channel.sink.add(request);
channel = IOWebSocketChannel(webSocket);
channel!.stream.listen((request) {
channel!.sink.add(request);
});
}, onDone: () {
channel?.sink.close();
});
});

Expand Down Expand Up @@ -170,11 +174,15 @@ void main() {
socket.disconnect();
});

test('establishes websocket connection with endpoint', () {
socket.connect();
test('establishes websocket connection with endpoint', () async {
final connFuture = socket.connect();
expect(socket.connState, SocketStates.connecting);

final conn = socket.conn;

await connFuture;
expect(socket.connState, SocketStates.open);

expect(conn, isA<IOWebSocketChannel>());
//! Not verifying connection url
});
Expand Down Expand Up @@ -239,9 +247,11 @@ void main() {

test('removes existing connection', () async {
await socket.connect();

expect(socket.conn, isNotNull);
await socket.disconnect();

expect(socket.conn, null);
expect(socket.conn, isNull);
});

test('calls callback', () async {
Expand Down Expand Up @@ -284,6 +294,36 @@ void main() {
).called(1);
});

test('disconnecting a closed connections stays closed', () async {
await socket.connect();
expect(socket.connState, SocketStates.open);
await mockServer.close();
await Future.delayed(const Duration(milliseconds: 200));
expect(socket.connState, SocketStates.closed);
expect(socket.conn, isNotNull);

final disconnectFuture = socket.disconnect();

// `connState` stays `closed` during disconnect
expect(socket.connState, SocketStates.closed);
await disconnectFuture;
expect(socket.connState, SocketStates.closed);
expect(socket.conn, isNull);
});

test('disconnecting an open connection', () async {
await socket.connect();
expect(socket.connState, SocketStates.open);

final disconnectFuture = socket.disconnect();

// `connState` stays `closed` during disconnect
expect(socket.connState, SocketStates.disconnecting);
await disconnectFuture;
expect(socket.connState, SocketStates.disconnected);
expect(socket.conn, isNull);
});

test('does not throw when no connection', () {
expect(() => socket.disconnect(), returnsNormally);
});
Expand Down
3 changes: 2 additions & 1 deletion packages/supabase_flutter/lib/src/supabase.dart
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ class Supabase with WidgetsBindingObserver {
Future<void> onResumed() async {
final realtime = Supabase.instance.client.realtime;
if (realtime.channels.isNotEmpty) {
if (realtime.connState == SocketStates.disconnecting) {
if (realtime.connState == SocketStates.disconnecting &&
realtime.conn != null) {
// If the socket is still disconnecting from e.g.
// [AppLifecycleState.paused] we should wait for it to finish before
// reconnecting.
Expand Down