Skip to content

Commit 342e5c9

Browse files
committed
relisten to websocket events after reconnect
1 parent 8ca0f19 commit 342e5c9

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

packages/stream_feeds/lib/src/client/feeds_client_impl.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:rxdart/rxdart.dart';
34
import 'package:stream_core/stream_core.dart';
45

56
import '../cdn/cdn_api.dart';
@@ -270,6 +271,7 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
270271
feedsRepository: _feedsRepository,
271272
pollsRepository: _pollsRepository,
272273
eventsEmitter: events,
274+
onReconnectEmitter: onReconnectEmitter,
273275
);
274276
}
275277

@@ -474,4 +476,25 @@ class StreamFeedsClientImpl implements StreamFeedsClient {
474476
Future<Result<void>> deleteImage({required String url}) {
475477
return _cdnClient.deleteImage(url);
476478
}
479+
480+
Stream<void> get onReconnectEmitter {
481+
return connectionState.stream.scan(
482+
(state, connectionStatus, i) => switch (connectionStatus) {
483+
Initialized() || Connecting() => (
484+
wasDisconnected: state.wasDisconnected,
485+
reconnected: false,
486+
),
487+
Disconnecting() || Disconnected() => (
488+
wasDisconnected: true,
489+
reconnected: false,
490+
),
491+
Connected() => (
492+
wasDisconnected: false,
493+
reconnected: state.wasDisconnected,
494+
),
495+
_ => state,
496+
},
497+
(wasDisconnected: false, reconnected: false),
498+
).mapNotNull((state) => state.reconnected ? () : null);
499+
}
477500
}

packages/stream_feeds/lib/src/state/feed.dart

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import 'dart:async';
22

33
import 'package:freezed_annotation/freezed_annotation.dart';
4+
import 'package:rxdart/rxdart.dart';
45
import 'package:state_notifier/state_notifier.dart';
56
import 'package:stream_core/stream_core.dart';
67

8+
import '../../stream_feeds.dart';
79
import '../generated/api/api.dart' as api;
810
import '../models/activity_data.dart';
911
import '../models/bookmark_data.dart';
@@ -48,6 +50,7 @@ class Feed with Disposable {
4850
required this.feedsRepository,
4951
required this.pollsRepository,
5052
required this.eventsEmitter,
53+
required Stream<void> onReconnectEmitter,
5154
}) {
5255
final fid = query.fid;
5356

@@ -65,7 +68,12 @@ class Feed with Disposable {
6568

6669
// Attach event handlers for the feed events
6770
final handler = FeedEventHandler(fid: fid, state: _stateNotifier);
68-
_eventsSubscription = eventsEmitter.listen(handler.handleEvent);
71+
_feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent));
72+
73+
// Automatically refetch data on reconnection
74+
if (query.watch) {
75+
_subscribeToReconnectionUpdates(onReconnectEmitter: onReconnectEmitter);
76+
}
6977
}
7078

7179
FeedId get fid => query.fid;
@@ -86,11 +94,11 @@ class Feed with Disposable {
8694
late final FeedStateNotifier _stateNotifier;
8795

8896
final SharedEmitter<WsEvent> eventsEmitter;
89-
StreamSubscription<WsEvent>? _eventsSubscription;
97+
final CompositeSubscription _feedSubscriptions = CompositeSubscription();
9098

9199
@override
92100
void dispose() {
93-
_eventsSubscription?.cancel();
101+
_feedSubscriptions.cancel();
94102
_stateNotifier.dispose();
95103
_memberList.dispose();
96104
super.dispose();
@@ -628,4 +636,14 @@ class Feed with Disposable {
628636
}
629637

630638
// endregion
639+
640+
void _subscribeToReconnectionUpdates({
641+
required Stream<void> onReconnectEmitter,
642+
}) {
643+
_feedSubscriptions.add(
644+
onReconnectEmitter.listen((_) {
645+
getOrCreate();
646+
}),
647+
);
648+
}
631649
}

0 commit comments

Comments
 (0)