Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
export 'src/constants.dart'
show RealtimeConstants, RealtimeLogLevel, SocketStates;
export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
Expand Down
4 changes: 2 additions & 2 deletions packages/realtime_client/lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ enum SocketStates {
/// Connection is live and connected
open,

/// Socket is closing.
closing,
/// Socket is closing by the user
disconnecting,

/// Socket being close not by the user. Realtime should attempt to reconnect.
closed,
Expand Down
19 changes: 16 additions & 3 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RealtimeChannel {
socket.remove(this);
});

_onError((String? reason) {
_onError((reason) {
if (isLeaving || isClosed) {
return;
}
Expand Down Expand Up @@ -260,9 +260,9 @@ class RealtimeChannel {
}

/// Registers a callback that will be executed when the channel encounteres an error.
void _onError(void Function(String?) callback) {
void _onError(Function callback) {
onEvents(ChannelEvents.error.eventName(), ChannelFilter(),
(reason, [ref]) => callback(reason?.toString()));
(reason, [ref]) => callback(reason));
}

/// Sets up a listener on your Supabase database.
Expand Down Expand Up @@ -646,6 +646,19 @@ class RealtimeChannel {
joinPush.resend(timeout ?? _timeout);
}

/// Usually a rejoin only happens when the channel timeouts or errors out.
/// When manually disconnecting, the channel is still marked as
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
/// unsubscribe itself, which causes issues when trying to rejoin. This method
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
void forceRejoin([Duration? timeout]) {
if (isLeaving) {
return;
}
_state = ChannelStates.joining;
joinPush.resend(timeout ?? _timeout);
}

void trigger(String type, [dynamic payload, String? ref]) {
final typeLower = type.toLowerCase();

Expand Down
76 changes: 53 additions & 23 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent(code: $code, reason: $reason)';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -134,17 +139,17 @@ class RealtimeClient {
(String payload, Function(dynamic result) callback) =>
callback(json.decode(payload));
reconnectTimer = RetryTimer(
() {
disconnect();
connect();
() async {
await disconnect();
await connect();
},
this.reconnectAfterMs,
);
}

/// Connects the socket.
@internal
void connect() async {
Future<void> connect() async {
if (conn != null) {
return;
}
Expand All @@ -153,8 +158,20 @@ class RealtimeClient {
connState = SocketStates.connecting;
conn = transport(endPointURL, headers);

// handle connection errors
conn!.ready.catchError(_onConnError);
try {
await conn!.ready;
} catch (error) {
// Don't schedule a reconnect and emit error if connection has been
// closed by the user or [disconnect] waits for the connection to be
// ready before closing it.
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
_onConnError(error);
reconnectTimer.scheduleTimeout();
}
return;
}

connState = SocketStates.open;

Expand All @@ -166,7 +183,8 @@ class RealtimeClient {
onError: _onConnError,
onDone: () {
// communication has been closed
if (connState != SocketStates.disconnected) {
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
}
_onConnClose();
Expand All @@ -179,20 +197,32 @@ class RealtimeClient {
}

/// Disconnects the socket with status [code] and [reason] for the disconnect
void disconnect({int? code, String? reason}) {
Future<void> disconnect({int? code, String? reason}) async {
final conn = this.conn;
if (conn != null) {
connState = SocketStates.disconnected;
if (code != null) {
conn.sink.close(code, reason ?? '');
} else {
conn.sink.close();
final oldState = connState;
connState = SocketStates.disconnecting;

// Connection cannot be closed while it's still connecting. Wait for connection to
// be ready and then close it.
if (oldState == SocketStates.connecting) {
await conn.ready.catchError((_) {});
}

if (oldState == SocketStates.open ||
oldState == SocketStates.connecting) {
if (code != null) {
await conn.sink.close(code, reason ?? '');
} else {
await conn.sink.close();
}
connState = SocketStates.disconnected;
reconnectTimer.reset();
}
this.conn = null;

// remove open handles
if (heartbeatTimer != null) heartbeatTimer?.cancel();
reconnectTimer.reset();
}
}

Expand Down Expand Up @@ -251,8 +281,8 @@ class RealtimeClient {
return 'connecting';
case SocketStates.open:
return 'open';
case SocketStates.closing:
return 'closing';
case SocketStates.disconnecting:
return 'disconnecting';
case SocketStates.disconnected:
return 'disconnected';
case SocketStates.closed:
Expand All @@ -262,7 +292,7 @@ class RealtimeClient {
}

/// Retuns `true` is the connection is open.
bool get isConnected => connectionState == 'open';
bool get isConnected => connState == SocketStates.open;

/// Removes a subscription from the socket.
@internal
Expand Down Expand Up @@ -353,15 +383,15 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
}
}
}

/// Unsubscribe from channels with the specified topic.
/// Unsubscribe from joined or joining channels with the specified topic.
@internal
void leaveOpenTopic(String topic) {
final dupChannel = channels.firstWhereOrNull(
Expand Down Expand Up @@ -399,7 +429,7 @@ class RealtimeClient {
/// SocketStates.disconnected: by user with socket.disconnect()
/// SocketStates.closed: NOT by user, should try to reconnect
if (connState == SocketStates.closed) {
_triggerChanError();
_triggerChanError(event);
reconnectTimer.scheduleTimeout();
}
if (heartbeatTimer != null) heartbeatTimer!.cancel();
Expand All @@ -410,15 +440,15 @@ class RealtimeClient {

void _onConnError(dynamic error) {
log('transport', error.toString());
_triggerChanError();
_triggerChanError(error);
for (final callback in stateChangeCallbacks['error']!) {
callback(error);
}
}

void _triggerChanError() {
void _triggerChanError([dynamic error]) {
for (final channel in channels) {
channel.trigger(ChannelEvents.error.eventName());
channel.trigger(ChannelEvents.error.eventName(), error);
}
}

Expand Down
8 changes: 5 additions & 3 deletions packages/realtime_client/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ void main() {
final subscribeCallback =
expectAsync2((RealtimeSubscribeStatus event, error) {
if (event == RealtimeSubscribeStatus.channelError) {
expect(error, isNull);
expect(error, isA<RealtimeCloseEvent>());
error as RealtimeCloseEvent;
expect(error.reason, "heartbeat timeout");
} else {
expect(event, RealtimeSubscribeStatus.closed);
}
Expand All @@ -285,8 +287,8 @@ void main() {

channel.subscribe(subscribeCallback);

await client.conn!.sink
.close(Constants.wsCloseNormal, "heartbeat timeout");
await Future.delayed(Duration(milliseconds: 200));
await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout");
});
});

Expand Down
12 changes: 8 additions & 4 deletions packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void main() {
});

socket.connect();
await Future.delayed(const Duration(milliseconds: 200));
expect(opens, 1);

socket.sendHeartbeat();
Expand Down Expand Up @@ -214,8 +215,8 @@ void main() {
});

test('removes existing connection', () async {
socket.connect();
socket.disconnect();
await socket.connect();
await socket.disconnect();

expect(socket.conn, null);
});
Expand All @@ -229,7 +230,7 @@ void main() {
expect(closes, 1);
});

test('calls connection close callback', () {
test('calls connection close callback', () async {
final mockedSocketChannel = MockIOWebSocketChannel();
final mockedSocket = RealtimeClient(
socketEndpoint,
Expand All @@ -247,7 +248,10 @@ void main() {
const tReason = 'reason';

mockedSocket.connect();
mockedSocket.connState = SocketStates.open;
await Future.delayed(const Duration(milliseconds: 200));
mockedSocket.disconnect(code: tCode, reason: tReason);
await Future.delayed(const Duration(milliseconds: 200));

verify(
() => mockedSink.close(
Expand Down Expand Up @@ -423,7 +427,7 @@ void main() {
});

group('setAuth', () {
final updateJoinPayload = {'user_token': 'token123'};
final updateJoinPayload = {'access_token': 'token123'};
final pushPayload = {'access_token': 'token123'};

test(
Expand Down
50 changes: 44 additions & 6 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class _Order {
final bool ascending;
}

class RealtimeSubscribeException implements Exception {
RealtimeSubscribeException(this.status, [this.details]);

final RealtimeSubscribeStatus status;
final Object? details;

@override
String toString() {
return 'RealtimeSubscribeException(status: $status, details: $details)';
}
}

typedef SupabaseStreamEvent = List<Map<String, dynamic>>;

class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
Expand Down Expand Up @@ -64,6 +76,9 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
/// Count of record to be returned
int? _limit;

/// Flag that the stream has at least one time been subscribed to realtime
bool _wasSubscribed = false;

SupabaseStreamBuilder({
required PostgrestQueryBuilder queryBuilder,
required String realtimeTopic,
Expand Down Expand Up @@ -195,12 +210,31 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
}
})
.subscribe((status, [error]) {
if (error != null) {
_addException(error);
switch (status) {
case RealtimeSubscribeStatus.subscribed:
// Reload all data after a reconnect from postgrest
// First data from postgrest gets loaded before the realtime connect
if (_wasSubscribed) {
_getPostgrestData();
}
_wasSubscribed = true;
break;
case RealtimeSubscribeStatus.closed:
_streamController?.close();
break;
case RealtimeSubscribeStatus.timedOut:
_addException(RealtimeSubscribeException(status, error));
break;
case RealtimeSubscribeStatus.channelError:
_addException(RealtimeSubscribeException(status, error));
break;
}
});
_getPostgrestData();
}

PostgrestFilterBuilder query = _queryBuilder.select();
Future<void> _getPostgrestData() async {
PostgrestFilterBuilder<PostgrestList> query = _queryBuilder.select();
if (_streamFilter != null) {
switch (_streamFilter!.type) {
case PostgresChangeFilterType.eq:
Expand All @@ -226,7 +260,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
break;
}
}
PostgrestTransformBuilder? transformQuery;
PostgrestTransformBuilder<PostgrestList>? transformQuery;
if (_orderBy != null) {
transformQuery =
query.order(_orderBy!.column, ascending: _orderBy!.ascending);
Expand All @@ -237,11 +271,15 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

try {
final data = await (transformQuery ?? query);
final rows = SupabaseStreamEvent.from(data as List);
_streamData.addAll(rows);
final rows = SupabaseStreamEvent.from(data);
_streamData = rows;
_addStream();
} catch (error, stackTrace) {
_addException(error, stackTrace);
// In case the postgrest call fails, there is no need to keep the
// realtime connection open
_channel?.unsubscribe();
_streamController?.close();
}
}

Expand Down
Loading