@@ -31,6 +31,18 @@ class _Order {
3131 final bool ascending;
3232}
3333
34+ class RealtimeSubscribeException implements Exception {
35+ RealtimeSubscribeException (this .status, [this .details]);
36+
37+ final RealtimeSubscribeStatus status;
38+ final Object ? details;
39+
40+ @override
41+ String toString () {
42+ return 'RealtimeSubscribeException(status: $status , details: $details )' ;
43+ }
44+ }
45+
3446typedef SupabaseStreamEvent = List <Map <String , dynamic >>;
3547
3648class SupabaseStreamBuilder extends Stream <SupabaseStreamEvent > {
@@ -195,12 +207,29 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
195207 }
196208 })
197209 .subscribe ((status, [error]) {
198- if (error != null ) {
199- _addException (error);
210+ switch (status) {
211+ case RealtimeSubscribeStatus .subscribed:
212+ // Get first data when realtime is subscribed and reload all data
213+ // from postgrest if e.g. got a channel error and is resubscribed
214+ _getPostgrestData ();
215+ break ;
216+ case RealtimeSubscribeStatus .closed:
217+ if (! (_streamController? .isClosed ?? true )) {
218+ _streamController? .close ();
219+ }
220+ break ;
221+ case RealtimeSubscribeStatus .timedOut:
222+ _addException (RealtimeSubscribeException (status, error));
223+ break ;
224+ case RealtimeSubscribeStatus .channelError:
225+ _addException (RealtimeSubscribeException (status, error));
226+ break ;
200227 }
201228 });
229+ }
202230
203- PostgrestFilterBuilder query = _queryBuilder.select ();
231+ Future <void > _getPostgrestData () async {
232+ PostgrestFilterBuilder <PostgrestList > query = _queryBuilder.select ();
204233 if (_streamFilter != null ) {
205234 switch (_streamFilter! .type) {
206235 case PostgresChangeFilterType .eq:
@@ -226,7 +255,7 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
226255 break ;
227256 }
228257 }
229- PostgrestTransformBuilder ? transformQuery;
258+ PostgrestTransformBuilder < PostgrestList > ? transformQuery;
230259 if (_orderBy != null ) {
231260 transformQuery =
232261 query.order (_orderBy! .column, ascending: _orderBy! .ascending);
@@ -237,8 +266,8 @@ class SupabaseStreamBuilder extends Stream<SupabaseStreamEvent> {
237266
238267 try {
239268 final data = await (transformQuery ?? query);
240- final rows = SupabaseStreamEvent .from (data as List );
241- _streamData. addAll ( rows) ;
269+ final rows = SupabaseStreamEvent .from (data);
270+ _streamData = rows;
242271 _addStream ();
243272 } catch (error, stackTrace) {
244273 _addException (error, stackTrace);
0 commit comments