Skip to content

Commit bb7b66a

Browse files
committed
message: Consider unsubscribed/unknown channels in reconcileMessages
This fixes the "fourth buggy behavior" in zulip#1798: zulip#1798 (comment) Fixes-partly: zulip#1798
1 parent 97f4676 commit bb7b66a

File tree

3 files changed

+235
-28
lines changed

3 files changed

+235
-28
lines changed

lib/model/message.dart

Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,14 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
409409
}
410410

411411
Message _reconcileUnrecognizedMessage(Message incoming) {
412+
if (
413+
incoming is StreamMessage
414+
&& subscriptions[incoming.streamId] == null
415+
) {
416+
// The message is in an unsubscribed channel. It might grow stale;
417+
// add it to _maybeStaleChannelMessages.
418+
_maybeStaleChannelMessages.add(incoming.id);
419+
}
412420
return _stripMatchFields(incoming);
413421
}
414422

@@ -418,20 +426,57 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
418426
// overlaps another, e.g. a stream and a topic within it.
419427
//
420428
// Most often, the just-fetched message will look just like the one we
421-
// already have. But they can differ: message fetching happens out of band
422-
// from the event queue, so there's inherently a race.
423-
//
424-
// If the fetched message reflects changes we haven't yet heard from the
425-
// event queue, then it doesn't much matter which version we use: we'll
426-
// soon get the corresponding events and apply the changes anyway.
427-
// But if it lacks changes we've already heard from the event queue, then
428-
// we won't hear those events again; the only way to wind up with an
429-
// updated message is to use the version we have, that already reflects
430-
// those events' changes. So we always stick with the version we have.
431-
// TODO(#1798) consider unsubscribed channels
432-
return current;
429+
// already have. But not always, and we can choose intelligently whether
430+
// to keep the stored version or clobber it with the incoming one.
431+
432+
bool currentIsMaybeStale = false;
433+
if (incoming is StreamMessage) {
434+
if (subscriptions[incoming.streamId] != null) {
435+
// The incoming version won't grow stale; it's in a subscribed channel.
436+
// Remove it from _maybeStaleChannelMessages if it was there.
437+
currentIsMaybeStale = _maybeStaleChannelMessages.remove(incoming.id);
438+
} else {
439+
assert(_maybeStaleChannelMessages.contains(incoming.id));
440+
currentIsMaybeStale = true;
441+
}
442+
}
443+
444+
if (currentIsMaybeStale) {
445+
// The event queue is unreliable for this message; the message was in an
446+
// unsubscribed channel when we stored it or sometime since, so the stored
447+
// version might be stale. Refresh it with the fetched version.
448+
return _stripMatchFields(incoming);
449+
} else {
450+
// Message fetching happens out of band from the event queue, so there's
451+
// inherently a race.
452+
//
453+
// If the fetched message reflects changes we haven't yet heard from the
454+
// event queue, then it doesn't much matter which version we use: we'll
455+
// soon get the corresponding events and apply the changes anyway.
456+
// But if it lacks changes we've already heard from the event queue, then
457+
// we won't hear those events again; the only way to wind up with an
458+
// updated message is to use the version we have, that already reflects
459+
// those events' changes. So, stick with the version we have.
460+
return current;
461+
}
433462
}
434463

464+
/// Messages in [messages] whose data stream is or was presumably broken
465+
/// by the message being in an unsubscribed channel.
466+
///
467+
/// This is the subset of [messages] where the message was
468+
/// in an unsubscribed channel when we added it or sometime since.
469+
///
470+
/// We don't expect update events for messages in unsubscribed channels,
471+
/// so if some of these maybe-stale messages appear in a fetch,
472+
/// we'll always clobber our stored version with the fetched version.
473+
/// See [reconcileMessages].
474+
///
475+
/// (We have seen a few such events, actually --
476+
/// maybe because the channel only recently became unsubscribed? --
477+
/// but not consistently, and we're not supposed to rely on them.)
478+
final Set<int> _maybeStaleChannelMessages = {};
479+
435480
Message _stripMatchFields(Message message) {
436481
message.matchContent = null;
437482
message.matchTopic = null;
@@ -502,6 +547,30 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
502547
);
503548
}
504549

550+
void handleChannelDeleteEvent(ChannelDeleteEvent event) {
551+
final channelIds = event.streams.map((channel) => channel.streamId);
552+
_handleSubscriptionsRemoved(channelIds);
553+
}
554+
555+
void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
556+
_handleSubscriptionsRemoved(event.streamIds);
557+
}
558+
559+
void _handleSubscriptionsRemoved(Iterable<int> channelIds) {
560+
if (channelIds.length > 8) {
561+
assert(channelIds is! Set);
562+
// optimization: https://github.com/zulip/zulip-flutter/pull/1912#discussion_r2479350329
563+
channelIds = Set.from(channelIds);
564+
}
565+
566+
// Linear in [messages].
567+
final affectedKnownMessageIds = messages.values
568+
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
569+
.map((message) => message.id);
570+
571+
_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
572+
}
573+
505574
void handleUserTopicEvent(UserTopicEvent event) {
506575
for (final view in _messageListViews) {
507576
view.handleUserTopicEvent(event);
@@ -515,10 +584,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
515584
}
516585

517586
void handleMessageEvent(MessageEvent event) {
587+
final message = event.message;
588+
518589
// If the message is one we already know about (from a fetch),
519590
// clobber it with the one from the event system.
520591
// See [reconcileMessages] for reasoning.
521-
messages[event.message.id] = event.message;
592+
messages[message.id] = message;
593+
594+
if (message is StreamMessage && subscriptions[message.streamId] == null) {
595+
// We didn't expect this event, because the channel is unsubscribed. But
596+
// that doesn't mean we should expect future events about this message.
597+
_maybeStaleChannelMessages.add(message.id);
598+
}
522599

523600
_handleMessageEventOutbox(event);
524601

@@ -607,6 +684,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
607684
// See [StreamConversation.displayRecipient] on why the invalidation is
608685
// needed.
609686
message.conversation.displayRecipient = null;
687+
688+
if (subscriptions[newStreamId] == null) {
689+
// The message was moved into an unsubscribed channel, which means
690+
// we expect our data on it to get stale.
691+
_maybeStaleChannelMessages.add(messageId);
692+
}
610693
}
611694

612695
if (newTopic != origTopic) {
@@ -629,6 +712,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
629712
void handleDeleteMessageEvent(DeleteMessageEvent event) {
630713
for (final messageId in event.messageIds) {
631714
messages.remove(messageId);
715+
_maybeStaleChannelMessages.remove(messageId);
632716
_editMessageRequests.remove(messageId);
633717
}
634718
for (final view in _messageListViews) {

lib/model/store.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,17 @@ class PerAccountStore extends PerAccountStoreBase with
821821

822822
case ChannelEvent():
823823
assert(debugLog("server event: stream/${event.op}"));
824+
if (event is ChannelDeleteEvent) {
825+
_messages.handleChannelDeleteEvent(event);
826+
}
824827
_channels.handleChannelEvent(event);
825828
notifyListeners();
826829

827830
case SubscriptionEvent():
828831
assert(debugLog("server event: subscription/${event.op}"));
832+
if (event is SubscriptionRemoveEvent) {
833+
_messages.handleSubscriptionRemoveEvent(event);
834+
}
829835
_channels.handleSubscriptionEvent(event);
830836
notifyListeners();
831837

test/model/message_test.dart

Lines changed: 132 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void main() {
3636

3737
// These "late" variables are the common state operated on by each test.
3838
// Each test case calls [prepare] to initialize them.
39-
late Subscription subscription;
39+
late Subscription? subscription;
4040
late PerAccountStore store;
4141
late FakeApiConnection connection;
4242
// [messageList] is here only for the sake of checking when it notifies.
@@ -54,15 +54,20 @@ void main() {
5454
/// Initialize [store] and the rest of the test state.
5555
Future<void> prepare({
5656
ZulipStream? stream,
57+
bool isChannelSubscribed = true,
5758
int? zulipFeatureLevel,
5859
}) async {
5960
stream ??= eg.stream(streamId: eg.defaultStreamMessageStreamId);
60-
subscription = eg.subscription(stream);
6161
final selfAccount = eg.selfAccount.copyWith(zulipFeatureLevel: zulipFeatureLevel);
6262
store = eg.store(account: selfAccount,
6363
initialSnapshot: eg.initialSnapshot(zulipFeatureLevel: zulipFeatureLevel));
6464
await store.addStream(stream);
65-
await store.addSubscription(subscription);
65+
if (isChannelSubscribed) {
66+
subscription = eg.subscription(stream);
67+
await store.addSubscription(subscription!);
68+
} else {
69+
subscription = null;
70+
}
6671
connection = store.connection as FakeApiConnection;
6772
notifiedCount = 0;
6873
messageList = MessageListView.init(store: store,
@@ -533,18 +538,130 @@ void main() {
533538
});
534539
});
535540

536-
test('on ID collision, new message does not clobber old in store.messages', () async {
537-
await prepare();
538-
final message = eg.streamMessage(id: 1, content: '<p>foo</p>');
539-
await addMessages([message]);
540-
check(store.messages).deepEquals({1: message});
541-
final newMessage = eg.streamMessage(id: 1, content: '<p>bar</p>');
542-
final messages = [newMessage];
543-
store.reconcileMessages(messages);
544-
check(messages).deepEquals(
545-
// (We'll check more messages in an upcoming commit.)
546-
[message].map(conditionIdentical));
547-
check(store.messages).deepEquals({1: message});
541+
group('fetched message with ID already in store.messages', () {
542+
/// Makes a copy of the single message in [MessageStore.messages]
543+
/// by round-tripping through [Message.fromJson] and [Message.toJson].
544+
///
545+
/// If that message's [StreamMessage.conversation.displayRecipient]
546+
/// is null, callers must provide a non-null [displayRecipient]
547+
/// to allow [StreamConversation.fromJson] to complete without throwing.
548+
Message copyStoredMessage({String? displayRecipient}) {
549+
final message = store.messages.values.single;
550+
551+
final json = message.toJson();
552+
if (
553+
message is StreamMessage
554+
&& message.conversation.displayRecipient == null
555+
) {
556+
if (displayRecipient == null) throw ArgumentError();
557+
json['display_recipient'] = displayRecipient;
558+
}
559+
560+
return Message.fromJson(json);
561+
}
562+
563+
/// Checks if the single message in [MessageStore.messages]
564+
/// is identical to [message].
565+
void checkStoredMessageIdenticalTo(Message message) {
566+
check(store.messages)
567+
.deepEquals({message.id: conditionIdentical(message)});
568+
}
569+
570+
test('DM', () async {
571+
await prepare();
572+
final message = eg.dmMessage(id: 1, from: eg.otherUser, to: [eg.selfUser]);
573+
574+
store.reconcileMessages([message]);
575+
checkStoredMessageIdenticalTo(message);
576+
store.reconcileMessages([copyStoredMessage()]);
577+
// Not clobbering, because the first call didn't mark stale.
578+
checkStoredMessageIdenticalTo(message);
579+
});
580+
581+
group('channel message; chooses correctly whether to clobber the stored version', () {
582+
// Exercise the ways we move the message in and out of the "maybe stale"
583+
// state. These include reconcileMessage itself, so sometimes we test
584+
// repeated calls to that with nothing else happening in between.
585+
586+
Message checkClobber({Message? withMessageCopy}) {
587+
final messageCopy = withMessageCopy ?? copyStoredMessage();
588+
store.reconcileMessages([messageCopy]);
589+
checkStoredMessageIdenticalTo(messageCopy);
590+
return messageCopy;
591+
}
592+
593+
void checkNoClobber({required Message messageBefore}) {
594+
store.reconcileMessages([copyStoredMessage()]);
595+
checkStoredMessageIdenticalTo(messageBefore);
596+
}
597+
598+
test('various conditions', () async {
599+
final channel = eg.stream();
600+
await prepare(stream: channel, isChannelSubscribed: true);
601+
final message = eg.streamMessage(id: 1, stream: channel);
602+
603+
final otherChannel = eg.stream();
604+
await store.addStream(otherChannel);
605+
606+
store.reconcileMessages([message]);
607+
checkStoredMessageIdenticalTo(message);
608+
// Not clobbering, because the first call didn't mark stale,
609+
// because the message was in a subscribed channel.
610+
checkNoClobber(messageBefore: message);
611+
612+
await store.removeSubscription(channel.streamId);
613+
// Clobbering because the unsubscribe event marked the message stale.
614+
Message messageCopy = checkClobber();
615+
// (Check that reconcileMessage itself didn't unmark as stale.)
616+
messageCopy = checkClobber();
617+
618+
await store.addSubscription(eg.subscription(channel));
619+
// The channel became subscribed,
620+
// but the message's data hasn't been refreshed, so clobber…
621+
messageCopy = checkClobber();
622+
623+
// …Now it's been refreshed, by reconcileMessages, so don't clobber.
624+
checkNoClobber(messageBefore: messageCopy);
625+
626+
check(store.subscriptions[otherChannel.streamId]).isNull();
627+
await store.handleEvent(
628+
eg.updateMessageEventMoveFrom(origMessages: [message],
629+
newStreamId: otherChannel.streamId));
630+
// Message was moved to an unsubscribed channel, so clobber.
631+
messageCopy = checkClobber(
632+
withMessageCopy: copyStoredMessage(displayRecipient: otherChannel.name));
633+
// (Check that reconcileMessage itself didn't unmark as stale.)
634+
messageCopy = checkClobber();
635+
636+
// Subscribe, to mark message as not-stale, setting up another check…
637+
await store.addSubscription(eg.subscription(otherChannel));
638+
639+
await store.handleEvent(ChannelDeleteEvent(id: 1, streams: [otherChannel]));
640+
// Message was in a channel that became unknown, so clobber.
641+
checkClobber();
642+
});
643+
644+
test('in unsubscribed channel on first call', () async {
645+
await prepare(isChannelSubscribed: false);
646+
final message = eg.streamMessage(id: 1);
647+
648+
store.reconcileMessages([message]);
649+
checkStoredMessageIdenticalTo(message);
650+
651+
checkClobber();
652+
checkClobber();
653+
});
654+
655+
test('new-message event when in unsubscribed channel', () async {
656+
await prepare(isChannelSubscribed: false);
657+
final message = eg.streamMessage(id: 1);
658+
659+
await store.handleEvent(eg.messageEvent(message));
660+
661+
checkClobber();
662+
checkClobber();
663+
});
664+
});
548665
});
549666

550667
test('matchContent and matchTopic are removed', () async {

0 commit comments

Comments
 (0)