Skip to content

Commit 9f777f1

Browse files
authored
fix(core): improve thread reply pagination logic (#2348)
1 parent d666475 commit 9f777f1

File tree

10 files changed

+179
-61
lines changed

10 files changed

+179
-61
lines changed

packages/stream_chat/lib/src/client/channel.dart

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,22 +1671,40 @@ class Channel {
16711671
PaginationParams? options,
16721672
bool preferOffline = false,
16731673
}) async {
1674-
final cachedReplies = await _client.chatPersistenceClient?.getReplies(
1675-
parentId,
1676-
options: options,
1677-
);
1678-
if (cachedReplies != null && cachedReplies.isNotEmpty) {
1679-
state?.updateThreadInfo(parentId, cachedReplies);
1680-
if (preferOffline) {
1681-
return QueryRepliesResponse()..messages = cachedReplies;
1674+
QueryRepliesResponse? response;
1675+
1676+
// If we prefer offline, we first try to get the replies from the
1677+
// offline storage.
1678+
if (preferOffline) {
1679+
if (_client.chatPersistenceClient case final persistenceClient?) {
1680+
final cachedReplies = await persistenceClient.getReplies(
1681+
parentId,
1682+
options: options,
1683+
);
1684+
1685+
// If the cached replies are not empty, we can use them.
1686+
if (cachedReplies.isNotEmpty) {
1687+
response = QueryRepliesResponse()..messages = cachedReplies;
1688+
}
16821689
}
16831690
}
1684-
final repliesResponse = await _client.getReplies(
1685-
parentId,
1686-
options: options,
1687-
);
1688-
state?.updateThreadInfo(parentId, repliesResponse.messages);
1689-
return repliesResponse;
1691+
1692+
// If we still don't have the replies, we try to get them from the API.
1693+
response ??= await _client.getReplies(parentId, options: options);
1694+
1695+
// Before updating the state, we check if we are querying around a
1696+
// reply, If we are, we have to clear the state to avoid potential
1697+
// gaps in the message sequence.
1698+
final isQueryingAround = switch (options) {
1699+
PaginationParams(idAround: _?) => true,
1700+
PaginationParams(createdAtAround: _?) => true,
1701+
_ => false,
1702+
};
1703+
1704+
if (isQueryingAround) state?.clearThread(parentId);
1705+
state?.updateThreadInfo(parentId, response.messages);
1706+
1707+
return response;
16901708
}
16911709

16921710
/// List the reactions for a message in the channel.
@@ -3339,6 +3357,16 @@ class ChannelClientState {
33393357
);
33403358
}
33413359

3360+
/// Clears all the replies in the thread identified by [parentId].
3361+
void clearThread(String parentId) {
3362+
final updatedThreads = {
3363+
...threads,
3364+
parentId: <Message>[],
3365+
};
3366+
3367+
_threads = updatedThreads;
3368+
}
3369+
33423370
/// Update threads with updated information about messages.
33433371
void updateThreadInfo(String parentId, List<Message> messages) {
33443372
final newThreads = {...threads}..update(

packages/stream_chat/lib/src/db/chat_persistence_client.dart

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,12 +257,17 @@ abstract class ChatPersistenceClient {
257257
// Adding new reactions and users data
258258
final reactions = messages.expand(_expandReactions).toList();
259259
final users = messages.map((it) => it.user).withNullifyer.toList();
260+
await updateUsers(users);
260261

261-
await Future.wait([
262-
updateMessages(cid, messages),
263-
updateReactions(reactions),
264-
updateUsers(users),
265-
]);
262+
final channel = await getChannelByCid(cid);
263+
if (channel == null) {
264+
// If the channel does not yet exist, we create a new one otherwise
265+
// the db will throw an error due to foreign key constraint.
266+
await updateChannels([ChannelModel(cid: cid)]);
267+
}
268+
269+
await updateMessages(cid, messages);
270+
await updateReactions(reactions);
266271
}
267272

268273
/// Update the channel state data using [channelState]

packages/stream_chat_flutter/lib/src/message_list_view/message_list_view.dart

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -462,10 +462,6 @@ class _StreamMessageListViewState extends State<StreamMessageListView> {
462462
_firstUnreadMessage = streamChannel?.getFirstUnreadMessage();
463463
}),
464464
);
465-
466-
if (_isThreadConversation) {
467-
streamChannel!.getReplies(widget.parentMessage!.id);
468-
}
469465
}
470466
}
471467

packages/stream_chat_flutter_core/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
## Upcoming
2+
3+
🐞 Fixed
4+
5+
- Fixed `MessageListCore` not properly loading and paginating thread replies.
6+
7+
✅ Added
8+
9+
- Added methods for paginating thread replies in `StreamChannel`.
10+
111
## 9.15.0
212

313
✅ Added

packages/stream_chat_flutter_core/lib/src/message_list_core.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,9 @@ class MessageListCoreState extends State<MessageListCore> {
178178
limit: widget.paginationLimit,
179179
);
180180
} else {
181-
return _streamChannel!.getReplies(
181+
return _streamChannel!.queryReplies(
182182
widget.parentMessage!.id,
183+
direction: direction,
183184
limit: widget.paginationLimit,
184185
);
185186
}

packages/stream_chat_flutter_core/lib/src/stream_channel.dart

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,7 @@ class StreamChannelState extends State<StreamChannel> {
281281
return _queryBottomMessages(limit: limit);
282282
}
283283

284-
/// Calls [channel.getReplies] updating [queryMessage] stream
285-
Future<void> getReplies(
284+
Future<void> _queryTopReplies(
286285
String parentId, {
287286
int limit = 30,
288287
bool preferOffline = false,
@@ -293,18 +292,17 @@ class StreamChannelState extends State<StreamChannel> {
293292

294293
_queryTopMessagesController.safeAdd(true);
295294

296-
Message? message;
297-
if (channel.state!.threads.containsKey(parentId)) {
298-
final thread = channel.state!.threads[parentId]!;
299-
if (thread.isNotEmpty) {
300-
message = thread.first;
301-
}
295+
final threadReplies = channel.state!.threads[parentId];
296+
if (threadReplies == null || threadReplies.isEmpty) {
297+
return _queryTopMessagesController.safeAdd(false);
302298
}
303299

300+
final oldestReply = threadReplies.first;
301+
304302
try {
305303
final pagination = PaginationParams(
306304
limit: limit,
307-
lessThan: message?.id,
305+
lessThan: oldestReply.id,
308306
);
309307

310308
final response = await channel.getReplies(
@@ -316,8 +314,8 @@ class StreamChannelState extends State<StreamChannel> {
316314
final messages = response.messages;
317315
final limitNotMatched = messages.length < pagination.limit;
318316

319-
// If we didn't get enough messages before the parent message, that means
320-
// there are no more messages before the parent message.
317+
// If we didn't get enough messages in the response, that means there are
318+
// no more messages before the oldest reply.
321319
if (limitNotMatched) _topPaginationEnded = true;
322320

323321
_queryTopMessagesController.safeAdd(false);
@@ -326,6 +324,87 @@ class StreamChannelState extends State<StreamChannel> {
326324
}
327325
}
328326

327+
Future<void> _queryBottomReplies(
328+
String parentId, {
329+
int limit = 30,
330+
bool preferOffline = false,
331+
}) async {
332+
if (channel.state == null) return;
333+
if (_bottomPaginationEnded) return;
334+
if (_queryBottomMessagesController.value) return;
335+
336+
_queryBottomMessagesController.safeAdd(true);
337+
338+
final threadReplies = channel.state!.threads[parentId];
339+
if (threadReplies == null || threadReplies.isEmpty) {
340+
return _queryBottomMessagesController.safeAdd(false);
341+
}
342+
343+
final recentReply = threadReplies.last;
344+
345+
try {
346+
final pagination = PaginationParams(
347+
limit: limit,
348+
greaterThan: recentReply.id,
349+
);
350+
351+
final response = await channel.getReplies(
352+
parentId,
353+
options: pagination,
354+
preferOffline: preferOffline,
355+
);
356+
357+
final messages = response.messages;
358+
final limitNotMatched = messages.length < pagination.limit;
359+
360+
// If we didn't get enough messages in the response, that means there are
361+
// no more messages after the recent reply.
362+
if (limitNotMatched) _bottomPaginationEnded = true;
363+
364+
_queryBottomMessagesController.safeAdd(false);
365+
} catch (e, stk) {
366+
_queryBottomMessagesController.safeAddError(e, stk);
367+
}
368+
}
369+
370+
/// Calls [channel.getReplies] updating [queryMessage] stream
371+
Future<void> queryReplies(
372+
String parentId, {
373+
int limit = 30,
374+
QueryDirection direction = QueryDirection.top,
375+
}) async {
376+
if (direction == QueryDirection.top) {
377+
return _queryTopReplies(parentId, limit: limit);
378+
}
379+
return _queryBottomReplies(parentId, limit: limit);
380+
}
381+
382+
/// Calls [channel.getReplies] updating [queryMessage] stream
383+
Future<void> getReplies(
384+
String parentId, {
385+
int limit = 30,
386+
bool preferOffline = false,
387+
}) async {
388+
if (channel.state == null) return;
389+
390+
final pagination = PaginationParams(limit: limit);
391+
392+
final response = await channel.getReplies(
393+
parentId,
394+
options: pagination,
395+
preferOffline: preferOffline,
396+
);
397+
398+
final messages = response.messages;
399+
final limitNotMatched = messages.length < pagination.limit;
400+
401+
// We can assume that the bottom pagination is ended as we are querying
402+
// latest replies, and if we didn't get enough messages, that means
403+
// there are no more messages to load in the top direction.
404+
_bottomPaginationEnded = true;
405+
_topPaginationEnded = limitNotMatched;
406+
}
407+
329408
/// Query the channel members and watchers
330409
Future<void> queryMembersAndWatchers() async {
331410
final members = channel.state?.members;
@@ -428,25 +507,17 @@ class StreamChannelState extends State<StreamChannel> {
428507
messagesPagination: pagination,
429508
);
430509

431-
// If the messageId is null, it means we are loading the latest messages
432-
// and the bottom pagination is ended.
433-
if (messageId == null) {
434-
_bottomPaginationEnded = true;
435-
channel.state?.isUpToDate = _bottomPaginationEnded;
436-
437-
return state;
438-
}
439-
440510
final messages = state.messages ?? [];
441511
final limitNotMatched = messages.length < pagination.limit;
442512

443-
// Otherwise, if we are loading messages around a specific messageId
444-
// and we didn't get enough messages, that means there are no more
445-
// messages around that messageId.
446-
if (limitNotMatched) {
447-
_topPaginationEnded = true;
513+
// If the messageId is null, it means we are loading the latest messages
514+
// therefore we can assume that the bottom pagination is ended, and if we
515+
// didn't get enough messages, that means there are no more messages
516+
// to load in the top direction.
517+
if (messageId == null || limitNotMatched) {
448518
_bottomPaginationEnded = true;
449519
channel.state?.isUpToDate = _bottomPaginationEnded;
520+
_topPaginationEnded = limitNotMatched;
450521

451522
return state;
452523
}
@@ -580,7 +651,7 @@ class StreamChannelState extends State<StreamChannel> {
580651
}) {
581652
final pagination = PaginationParams(
582653
limit: limit,
583-
greaterThanOrEqual: messageId,
654+
greaterThan: messageId,
584655
);
585656

586657
return channel.query(

packages/stream_chat_flutter_core/test/message_list_core_test.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,13 @@ void main() {
412412
.thenAnswer((_) => Stream.value(threads));
413413
when(() => mockChannel.state.unreadCount).thenReturn(0);
414414

415+
when(
416+
() => mockChannel.getReplies(
417+
parentMessage.id,
418+
options: any(named: 'options'),
419+
),
420+
).thenAnswer((_) async => QueryRepliesResponse()..messages = []);
421+
415422
await tester.pumpWidget(
416423
Directionality(
417424
textDirection: TextDirection.ltr,

packages/stream_chat_persistence/lib/src/dao/message_dao.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,9 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
148148
msgList.removeRange(lessThanIndex, msgList.length);
149149
}
150150
}
151-
if (options?.greaterThanOrEqual != null) {
151+
if (options?.greaterThan != null) {
152152
final greaterThanIndex = msgList.indexWhere(
153-
(m) => m.id == options!.greaterThanOrEqual,
153+
(m) => m.id == options!.greaterThan,
154154
);
155155
if (greaterThanIndex != -1) {
156156
msgList.removeRange(0, greaterThanIndex);
@@ -203,9 +203,9 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
203203
msgList.removeRange(lessThanIndex, msgList.length);
204204
}
205205
}
206-
if (messagePagination?.greaterThanOrEqual != null) {
206+
if (messagePagination?.greaterThan != null) {
207207
final greaterThanIndex = msgList.indexWhere(
208-
(m) => m.id == messagePagination!.greaterThanOrEqual,
208+
(m) => m.id == messagePagination!.greaterThan,
209209
);
210210
if (greaterThanIndex != -1) {
211211
msgList.removeRange(0, greaterThanIndex);

packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
146146
msgList.removeRange(lessThanIndex, msgList.length);
147147
}
148148
}
149-
if (options?.greaterThanOrEqual != null) {
149+
if (options?.greaterThan != null) {
150150
final greaterThanIndex = msgList.indexWhere(
151-
(m) => m.id == options!.greaterThanOrEqual,
151+
(m) => m.id == options!.greaterThan,
152152
);
153153
if (greaterThanIndex != -1) {
154154
msgList.removeRange(0, greaterThanIndex);
@@ -201,9 +201,9 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
201201
msgList.removeRange(lessThanIndex, msgList.length);
202202
}
203203
}
204-
if (messagePagination?.greaterThanOrEqual != null) {
204+
if (messagePagination?.greaterThan != null) {
205205
final greaterThanIndex = msgList.indexWhere(
206-
(m) => m.id == messagePagination!.greaterThanOrEqual,
206+
(m) => m.id == messagePagination!.greaterThan,
207207
);
208208
if (greaterThanIndex != -1) {
209209
msgList.removeRange(0, greaterThanIndex);

packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,11 +353,11 @@ void main() {
353353
const cid = 'test:Cid';
354354
const limit = 15;
355355
const lessThan = 'testMessageId${cid}25';
356-
const greaterThanOrEqual = 'testMessageId${cid}5';
356+
const greaterThan = 'testMessageId${cid}5';
357357
const pagination = PaginationParams(
358358
limit: limit,
359359
lessThan: lessThan,
360-
greaterThanOrEqual: greaterThanOrEqual,
360+
greaterThan: greaterThan,
361361
);
362362

363363
// Should be empty initially
@@ -377,7 +377,7 @@ void main() {
377377
messagePagination: pagination,
378378
);
379379
expect(fetchedMessages.length, limit);
380-
expect(fetchedMessages.first.id, greaterThanOrEqual);
380+
expect(fetchedMessages.first.id, greaterThan);
381381
expect(fetchedMessages.last.id != lessThan, true);
382382
});
383383

0 commit comments

Comments
 (0)