Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent{code: $code, reason: $reason}';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -353,7 +358,7 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
Expand Down
41 changes: 35 additions & 6 deletions packages/supabase/lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ class _Order {
final bool ascending;
}

class RealtimeSubscribeException implements Exception {
RealtimeSubscribeException(this.status, [this.details]);

final RealtimeSubscribeStatus status;
final Object? details;

@override
String toString() {
return 'RealtimeSubscribeException(status: $status, details: $details)';
}
}

typedef SupabaseStreamEvent = List<Map<String, dynamic>>;

class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
Expand Down Expand Up @@ -195,12 +207,29 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
}
})
.subscribe((status, [error]) {
if (error != null) {
_addException(error);
switch (status) {
case RealtimeSubscribeStatus.subscribed:
// Get first data when realtime is subscribed and reload all data
// from postgrest if e.g. got a channel error and is resubscribed
_getPostgrestData();
break;
case RealtimeSubscribeStatus.closed:
if (!(_streamController?.isClosed ?? true)) {
_streamController?.close();
}
break;
case RealtimeSubscribeStatus.timedOut:
_addException(RealtimeSubscribeException(status, error));
break;
case RealtimeSubscribeStatus.channelError:
_addException(RealtimeSubscribeException(status, error));
break;
}
});
}

PostgrestFilterBuilder query = _queryBuilder.select();
Future<void> _getPostgrestData() async {
PostgrestFilterBuilder<PostgrestList> query = _queryBuilder.select();
if (_streamFilter != null) {
switch (_streamFilter!.type) {
case PostgresChangeFilterType.eq:
Expand All @@ -226,7 +255,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
break;
}
}
PostgrestTransformBuilder? transformQuery;
PostgrestTransformBuilder<PostgrestList>? transformQuery;
if (_orderBy != null) {
transformQuery =
query.order(_orderBy!.column, ascending: _orderBy!.ascending);
Expand All @@ -237,8 +266,8 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {

try {
final data = await (transformQuery ?? query);
final rows = SupabaseStreamEvent.from(data as List);
_streamData.addAll(rows);
final rows = SupabaseStreamEvent.from(data);
_streamData = rows;
_addStream();
} catch (error, stackTrace) {
_addException(error, stackTrace);
Expand Down
2 changes: 1 addition & 1 deletion packages/supabase/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void main() {
webSocket!.add(replyString);

// Send an insert event
await Future.delayed(Duration(milliseconds: 10));
await Future.delayed(Duration(milliseconds: 100));
final insertString = jsonEncode({
'topic': topic,
'event': 'postgres_changes',
Expand Down
2 changes: 1 addition & 1 deletion packages/supabase_flutter/lib/src/supabase.dart
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class Supabase {
accessToken: accessToken,
);
_instance._debugEnable = debug ?? kDebugMode;
_instance.log('***** Supabase init completed $_instance');
_instance.log('***** Supabase init completed *****');

_instance._supabaseAuth = SupabaseAuth();
await _instance._supabaseAuth.initialize(options: authOptions);
Expand Down
7 changes: 6 additions & 1 deletion packages/supabase_flutter/lib/src/supabase_auth.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ class SupabaseAuth with WidgetsBindingObserver {
case AppLifecycleState.detached:
case AppLifecycleState.inactive:
case AppLifecycleState.paused:
Supabase.instance.client.auth.stopAutoRefresh();
// Realtime channels are kept alive in the background for some amount
// of time after the app is paused. If we stop refreshing the token
// here, the channels will be closed.
if (Supabase.instance.client.realtime.getChannels().isEmpty) {
Supabase.instance.client.auth.stopAutoRefresh();
}
default:
}
}
Expand Down