Skip to content

Commit 65cb290

Browse files
authored
fix(llc): improve persistence updates for channel state and threads (#2401)
1 parent 008bece commit 65cb290

File tree

4 files changed

+75
-64
lines changed

4 files changed

+75
-64
lines changed

packages/stream_chat/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## Upcoming
2+
3+
🐞 Fixed
4+
5+
- Fixed `ChannelState.memberCount`, `ChannelState.config` and `ChannelState.extraData` getting reset
6+
on first load.
7+
18
## 9.17.0
29

310
🐞 Fixed

packages/stream_chat/lib/src/client/channel.dart

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2107,23 +2107,19 @@ class ChannelClientState {
21072107
ChannelClientState(
21082108
this._channel,
21092109
ChannelState channelState,
2110-
) : _debouncedUpdatePersistenceChannelState = debounce(
2111-
(ChannelState state) {
2112-
final persistenceClient = _channel._client.chatPersistenceClient;
2113-
return persistenceClient?.updateChannelState(state);
2114-
},
2115-
const Duration(seconds: 1),
2116-
) {
2110+
) {
21172111
_retryQueue = RetryQueue(
21182112
channel: _channel,
21192113
logger: _channel.client.detachedLogger(
21202114
'⟳ (${generateHash([_channel.cid])})',
21212115
),
21222116
);
21232117

2124-
_checkExpiredAttachmentMessages(channelState);
2125-
21262118
_channelStateController = BehaviorSubject.seeded(channelState);
2119+
// Update the persistence storage with the seeded channel state.
2120+
_debouncedUpdatePersistenceChannelState.call([channelState]);
2121+
2122+
_checkExpiredAttachmentMessages(channelState);
21272123

21282124
_listenTypingEvents();
21292125

@@ -2199,20 +2195,11 @@ class ChannelClientState {
21992195

22002196
_listenChannelPushPreferenceUpdated();
22012197

2202-
_channel._client.chatPersistenceClient
2203-
?.getChannelThreads(_channel.cid!)
2204-
.then((threads) {
2205-
_threads = threads;
2206-
}).then((_) {
2207-
_channel._client.chatPersistenceClient
2208-
?.getChannelStateByCid(_channel.cid!)
2209-
.then((state) {
2210-
// Replacing the persistence state members with the latest
2211-
// `channelState.members` as they may have changes over the time.
2212-
updateChannelState(state.copyWith(members: channelState.members));
2213-
retryFailedMessages();
2214-
});
2215-
});
2198+
final persistenceClient = _channel.client.chatPersistenceClient;
2199+
persistenceClient?.getChannelThreads(_channel.cid!).then((threads) {
2200+
// Load all the threads for the channel from the offline storage.
2201+
if (threads.isNotEmpty) _threads = threads;
2202+
}).then((_) => retryFailedMessages());
22162203
}
22172204

22182205
final Channel _channel;
@@ -3357,13 +3344,30 @@ class ChannelClientState {
33573344
ChannelState get channelState => _channelStateController.value;
33583345
late BehaviorSubject<ChannelState> _channelStateController;
33593346

3360-
final Debounce _debouncedUpdatePersistenceChannelState;
3347+
late final _debouncedUpdatePersistenceChannelState = debounce(
3348+
(ChannelState state) {
3349+
final persistenceClient = _channel._client.chatPersistenceClient;
3350+
return persistenceClient?.updateChannelState(state);
3351+
},
3352+
const Duration(seconds: 1),
3353+
);
33613354

33623355
set _channelState(ChannelState v) {
33633356
_channelStateController.safeAdd(v);
33643357
_debouncedUpdatePersistenceChannelState.call([v]);
33653358
}
33663359

3360+
late final _debouncedUpdatePersistenceChannelThreads = debounce(
3361+
(Map<String, List<Message>> threads) async {
3362+
final channelCid = _channel.cid;
3363+
if (channelCid == null) return;
3364+
3365+
final persistenceClient = _channel._client.chatPersistenceClient;
3366+
return persistenceClient?.updateChannelThreads(channelCid, threads);
3367+
},
3368+
const Duration(seconds: 1),
3369+
);
3370+
33673371
/// The channel threads related to this channel.
33683372
Map<String, List<Message>> get threads => {..._threadsController.value};
33693373

@@ -3372,10 +3376,7 @@ class ChannelClientState {
33723376
final _threadsController = BehaviorSubject.seeded(<String, List<Message>>{});
33733377
set _threads(Map<String, List<Message>> threads) {
33743378
_threadsController.safeAdd(threads);
3375-
_channel.client.chatPersistenceClient?.updateChannelThreads(
3376-
_channel.cid!,
3377-
threads,
3378-
);
3379+
_debouncedUpdatePersistenceChannelThreads.call([threads]);
33793380
}
33803381

33813382
/// Clears all the replies in the thread identified by [parentId].
@@ -3537,6 +3538,7 @@ class ChannelClientState {
35373538

35383539
/// Call this method to dispose this object.
35393540
void dispose() {
3541+
_debouncedUpdatePersistenceChannelThreads.cancel();
35403542
_debouncedUpdatePersistenceChannelState.cancel();
35413543
_retryQueue.dispose();
35423544
_subscriptions.cancel();

packages/stream_chat/lib/src/db/chat_persistence_client.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,11 @@ abstract class ChatPersistenceClient {
248248
String cid,
249249
Map<String, List<Message>> threads,
250250
) async {
251+
if (threads.isEmpty) return;
252+
253+
// Flattening the messages from threads
251254
final messages = threads.values.expand((it) => it).toList();
255+
if (messages.isEmpty) return;
252256

253257
// Removing old reactions before saving the new
254258
final oldReactions = messages.map((it) => it.id).toList();
@@ -276,6 +280,8 @@ abstract class ChatPersistenceClient {
276280

277281
/// Update list of channel states
278282
Future<void> updateChannelStates(List<ChannelState> channelStates) async {
283+
if (channelStates.isEmpty) return;
284+
279285
final reactionsToDelete = <String>[];
280286
final pinnedReactionsToDelete = <String>[];
281287
final membersToDelete = <String>[];

packages/stream_chat/test/src/client/client_test.dart

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ void main() {
606606
final persistentChannelStates = List.generate(
607607
3,
608608
(index) => ChannelState(
609-
channel: ChannelModel(cid: 'p-test-type-$index:p-test-id-$index'),
609+
channel: ChannelModel(cid: 'test-type-$index:test-id-$index'),
610610
),
611611
);
612612

@@ -636,18 +636,19 @@ void main() {
636636
(_) async => QueryChannelsResponse()..channels = channelStates,
637637
);
638638

639-
when(() => persistence.getChannelThreads(any()))
640-
.thenAnswer((_) async => {});
641-
when(() => persistence.updateChannelThreads(any(), any()))
642-
.thenAnswer((_) async => {});
643-
when(() => persistence.getChannelStateByCid(any(),
644-
messagePagination: any(named: 'messagePagination'),
645-
pinnedMessagePagination:
646-
any(named: 'pinnedMessagePagination'))).thenAnswer(
647-
(invocation) async => ChannelState(
648-
channel: ChannelModel(cid: invocation.positionalArguments.first),
649-
),
639+
when(() => persistence.getChannelThreads(any())).thenAnswer(
640+
(_) async => <String, List<Message>>{
641+
for (final channelState in channelStates)
642+
channelState.channel!.cid: [
643+
Message(id: 'test-message-id', text: 'Test message')
644+
],
645+
},
650646
);
647+
648+
when(() => persistence.updateChannelState(any()))
649+
.thenAnswer((_) async {});
650+
when(() => persistence.updateChannelThreads(any(), any()))
651+
.thenAnswer((_) async {});
651652
when(() => persistence.updateChannelQueries(any(), any(),
652653
clearQueryCache: any(named: 'clearQueryCache')))
653654
.thenAnswer((_) => Future.value());
@@ -664,7 +665,7 @@ void main() {
664665

665666
// Hack as `teardown` gets called even
666667
// before our stream starts emitting data
667-
await delay(300);
668+
await delay(1050);
668669

669670
verify(() => persistence.getChannelStates(
670671
filter: any(named: 'filter'),
@@ -684,14 +685,11 @@ void main() {
684685
)).called(1);
685686

686687
verify(() => persistence.getChannelThreads(any()))
687-
.called((persistentChannelStates + channelStates).length);
688+
.called(channelStates.length);
689+
verify(() => persistence.updateChannelState(any()))
690+
.called(channelStates.length);
688691
verify(() => persistence.updateChannelThreads(any(), any()))
689-
.called((persistentChannelStates + channelStates).length);
690-
verify(
691-
() => persistence.getChannelStateByCid(any(),
692-
messagePagination: any(named: 'messagePagination'),
693-
pinnedMessagePagination: any(named: 'pinnedMessagePagination')),
694-
).called((persistentChannelStates + channelStates).length);
692+
.called(channelStates.length);
695693
verify(() => persistence.updateChannelQueries(any(), any(),
696694
clearQueryCache: any(named: 'clearQueryCache'))).called(1);
697695
},
@@ -703,7 +701,7 @@ void main() {
703701
final persistentChannelStates = List.generate(
704702
3,
705703
(index) => ChannelState(
706-
channel: ChannelModel(cid: 'p-test-type-$index:p-test-id-$index'),
704+
channel: ChannelModel(cid: 'test-type-$index:test-id-$index'),
707705
),
708706
);
709707

@@ -724,18 +722,19 @@ void main() {
724722
paginationParams: any(named: 'paginationParams'),
725723
)).thenThrow(StreamChatNetworkError(ChatErrorCode.inputError));
726724

727-
when(() => persistence.getChannelThreads(any()))
725+
when(() => persistence.getChannelThreads(any())).thenAnswer(
726+
(_) async => <String, List<Message>>{
727+
for (final channelState in persistentChannelStates)
728+
channelState.channel!.cid: [
729+
Message(id: 'test-message-id', text: 'Test message')
730+
],
731+
},
732+
);
733+
734+
when(() => persistence.updateChannelState(any()))
728735
.thenAnswer((_) async => {});
729736
when(() => persistence.updateChannelThreads(any(), any()))
730737
.thenAnswer((_) async => {});
731-
when(() => persistence.getChannelStateByCid(any(),
732-
messagePagination: any(named: 'messagePagination'),
733-
pinnedMessagePagination:
734-
any(named: 'pinnedMessagePagination'))).thenAnswer(
735-
(invocation) async => ChannelState(
736-
channel: ChannelModel(cid: invocation.positionalArguments.first),
737-
),
738-
);
739738

740739
expectLater(
741740
client.queryChannels(),
@@ -747,7 +746,7 @@ void main() {
747746

748747
// Hack as `teardown` gets called even
749748
// before our stream starts emitting data
750-
await delay(300);
749+
await delay(1050);
751750

752751
verify(() => persistence.getChannelStates(
753752
filter: any(named: 'filter'),
@@ -768,13 +767,10 @@ void main() {
768767

769768
verify(() => persistence.getChannelThreads(any()))
770769
.called(persistentChannelStates.length);
770+
verify(() => persistence.updateChannelState(any()))
771+
.called(persistentChannelStates.length);
771772
verify(() => persistence.updateChannelThreads(any(), any()))
772773
.called(persistentChannelStates.length);
773-
verify(
774-
() => persistence.getChannelStateByCid(any(),
775-
messagePagination: any(named: 'messagePagination'),
776-
pinnedMessagePagination: any(named: 'pinnedMessagePagination')),
777-
).called(persistentChannelStates.length);
778774
},
779775
);
780776
});

0 commit comments

Comments
 (0)