Skip to content

Commit 956fe13

Browse files
committed
message: Consider unsubscribed/unknown channels in reconcileMessages
This fixes the "fourth buggy behavior" in zulip#1798: zulip#1798 (comment) Fixes-partly: zulip#1798
1 parent b50b954 commit 956fe13

File tree

3 files changed

+242
-28
lines changed

3 files changed

+242
-28
lines changed

lib/model/message.dart

Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,14 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
409409
}
410410

411411
Message _reconcileUnrecognizedMessage(Message incoming) {
412+
if (
413+
incoming is StreamMessage
414+
&& subscriptions[incoming.streamId] == null
415+
) {
416+
// The message is in an unsubscribed channel. It might grow stale;
417+
// add it to _maybeStaleChannelMessages.
418+
_maybeStaleChannelMessages.add(incoming.id);
419+
}
412420
return _stripMatchFields(incoming);
413421
}
414422

@@ -418,20 +426,57 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
418426
// overlaps another, e.g. a stream and a topic within it.
419427
//
420428
// Most often, the just-fetched message will look just like the one we
421-
// already have. But they can differ: message fetching happens out of band
422-
// from the event queue, so there's inherently a race.
423-
//
424-
// If the fetched message reflects changes we haven't yet heard from the
425-
// event queue, then it doesn't much matter which version we use: we'll
426-
// soon get the corresponding events and apply the changes anyway.
427-
// But if it lacks changes we've already heard from the event queue, then
428-
// we won't hear those events again; the only way to wind up with an
429-
// updated message is to use the version we have, that already reflects
430-
// those events' changes. So we always stick with the version we have.
431-
// TODO(#1798) consider unsubscribed channels
432-
return current;
429+
// already have. But not always, and we can choose intelligently whether
430+
// to keep the stored version or clobber it with the incoming one.
431+
432+
bool currentIsMaybeStale = false;
433+
if (incoming is StreamMessage) {
434+
if (subscriptions[incoming.streamId] != null) {
435+
// The incoming version won't grow stale; it's in a subscribed channel.
436+
// Remove it from _maybeStaleChannelMessages if it was there.
437+
currentIsMaybeStale = _maybeStaleChannelMessages.remove(incoming.id);
438+
} else {
439+
assert(_maybeStaleChannelMessages.contains(incoming.id));
440+
currentIsMaybeStale = true;
441+
}
442+
}
443+
444+
if (currentIsMaybeStale) {
445+
// The event queue is unreliable for this message; the message was in an
446+
// unsubscribed channel when we stored it or sometime since, so the stored
447+
// version might be stale. Refresh it with the fetched version.
448+
return _stripMatchFields(incoming);
449+
} else {
450+
// Message fetching happens out of band from the event queue, so there's
451+
// inherently a race.
452+
//
453+
// If the fetched message reflects changes we haven't yet heard from the
454+
// event queue, then it doesn't much matter which version we use: we'll
455+
// soon get the corresponding events and apply the changes anyway.
456+
// But if it lacks changes we've already heard from the event queue, then
457+
// we won't hear those events again; the only way to wind up with an
458+
// updated message is to use the version we have, that already reflects
459+
// those events' changes. So, stick with the version we have.
460+
return current;
461+
}
433462
}
434463

464+
/// Messages in [messages] whose data stream is or was presumably broken
465+
/// by the message being in an unsubscribed channel.
466+
///
467+
/// This is the subset of [messages] where the message was
468+
/// in an unsubscribed channel when we added it or sometime since.
469+
///
470+
/// We don't expect update events for messages in unsubscribed channels,
471+
/// so if some of these maybe-stale messages appear in a fetch,
472+
/// we'll always clobber our stored version with the fetched version.
473+
/// See [reconcileMessages].
474+
///
475+
/// (We have seen a few such events, actually --
476+
/// maybe because the channel only recently became unsubscribed? --
477+
/// but not consistently, and we're not supposed to rely on them.)
478+
final Set<int> _maybeStaleChannelMessages = {};
479+
435480
Message _stripMatchFields(Message message) {
436481
message.matchContent = null;
437482
message.matchTopic = null;
@@ -502,6 +547,30 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
502547
);
503548
}
504549

550+
void handleChannelDeleteEvent(ChannelDeleteEvent event) {
551+
final channelIds = event.streams.map((channel) => channel.streamId);
552+
_handleSubscriptionsRemoved(channelIds);
553+
}
554+
555+
void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
556+
_handleSubscriptionsRemoved(event.streamIds);
557+
}
558+
559+
void _handleSubscriptionsRemoved(Iterable<int> channelIds) {
560+
if (channelIds.length > 8) {
561+
assert(channelIds is! Set);
562+
// optimization: https://github.com/zulip/zulip-flutter/pull/1912#discussion_r2479350329
563+
channelIds = Set.from(channelIds);
564+
}
565+
566+
// Linear in [messages].
567+
final affectedKnownMessageIds = messages.values
568+
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
569+
.map((message) => message.id);
570+
571+
_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
572+
}
573+
505574
void handleUserTopicEvent(UserTopicEvent event) {
506575
for (final view in _messageListViews) {
507576
view.handleUserTopicEvent(event);
@@ -515,10 +584,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
515584
}
516585

517586
void handleMessageEvent(MessageEvent event) {
587+
final message = event.message;
588+
518589
// If the message is one we already know about (from a fetch),
519590
// clobber it with the one from the event system.
520591
// See [reconcileMessages] for reasoning.
521-
messages[event.message.id] = event.message;
592+
messages[message.id] = message;
593+
594+
if (message is StreamMessage && subscriptions[message.streamId] == null) {
595+
// We didn't expect this event, because the channel is unsubscribed. But
596+
// that doesn't mean we should expect future events about this message.
597+
_maybeStaleChannelMessages.add(message.id);
598+
}
522599

523600
_handleMessageEventOutbox(event);
524601

@@ -607,6 +684,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
607684
// See [StreamConversation.displayRecipient] on why the invalidation is
608685
// needed.
609686
message.conversation.displayRecipient = null;
687+
688+
if (subscriptions[newStreamId] == null) {
689+
// The message was moved into an unsubscribed channel, which means
690+
// we expect our data on it to get stale.
691+
_maybeStaleChannelMessages.add(messageId);
692+
}
610693
}
611694

612695
if (newTopic != origTopic) {
@@ -629,6 +712,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
629712
void handleDeleteMessageEvent(DeleteMessageEvent event) {
630713
for (final messageId in event.messageIds) {
631714
messages.remove(messageId);
715+
_maybeStaleChannelMessages.remove(messageId);
632716
_editMessageRequests.remove(messageId);
633717
}
634718
for (final view in _messageListViews) {

lib/model/store.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,17 @@ class PerAccountStore extends PerAccountStoreBase with
821821

822822
case ChannelEvent():
823823
assert(debugLog("server event: stream/${event.op}"));
824+
if (event is ChannelDeleteEvent) {
825+
_messages.handleChannelDeleteEvent(event);
826+
}
824827
_channels.handleChannelEvent(event);
825828
notifyListeners();
826829

827830
case SubscriptionEvent():
828831
assert(debugLog("server event: subscription/${event.op}"));
832+
if (event is SubscriptionRemoveEvent) {
833+
_messages.handleSubscriptionRemoveEvent(event);
834+
}
829835
_channels.handleSubscriptionEvent(event);
830836
notifyListeners();
831837

test/model/message_test.dart

Lines changed: 139 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void main() {
3636

3737
// These "late" variables are the common state operated on by each test.
3838
// Each test case calls [prepare] to initialize them.
39-
late Subscription subscription;
39+
late Subscription? subscription;
4040
late PerAccountStore store;
4141
late FakeApiConnection connection;
4242
// [messageList] is here only for the sake of checking when it notifies.
@@ -54,15 +54,20 @@ void main() {
5454
/// Initialize [store] and the rest of the test state.
5555
Future<void> prepare({
5656
ZulipStream? stream,
57+
bool isChannelSubscribed = true,
5758
int? zulipFeatureLevel,
5859
}) async {
5960
stream ??= eg.stream(streamId: eg.defaultStreamMessageStreamId);
60-
subscription = eg.subscription(stream);
6161
final selfAccount = eg.selfAccount.copyWith(zulipFeatureLevel: zulipFeatureLevel);
6262
store = eg.store(account: selfAccount,
6363
initialSnapshot: eg.initialSnapshot(zulipFeatureLevel: zulipFeatureLevel));
6464
await store.addStream(stream);
65-
await store.addSubscription(subscription);
65+
if (isChannelSubscribed) {
66+
subscription = eg.subscription(stream);
67+
await store.addSubscription(subscription!);
68+
} else {
69+
subscription = null;
70+
}
6671
connection = store.connection as FakeApiConnection;
6772
notifiedCount = 0;
6873
messageList = MessageListView.init(store: store,
@@ -533,18 +538,137 @@ void main() {
533538
});
534539
});
535540

536-
test('on ID collision, new message does not clobber old in store.messages', () async {
537-
await prepare();
538-
final message = eg.streamMessage(id: 1, content: '<p>foo</p>');
539-
await addMessages([message]);
540-
check(store.messages).deepEquals({1: message});
541-
final newMessage = eg.streamMessage(id: 1, content: '<p>bar</p>');
542-
final messages = [newMessage];
543-
store.reconcileMessages(messages);
544-
check(messages).deepEquals(
545-
// (We'll check more messages in an upcoming commit.)
546-
[message].map(conditionIdentical));
547-
check(store.messages).deepEquals({1: message});
541+
group('fetched message with ID already in store.messages', () {
542+
/// Makes a copy of the single message in [MessageStore.messages]
543+
/// by round-tripping through [Message.fromJson] and [Message.toJson].
544+
///
545+
/// If that message's [StreamMessage.conversation.displayRecipient]
546+
/// is null, callers must provide a non-null [displayRecipient]
547+
/// to allow [StreamConversation.fromJson] to complete without throwing.
548+
Message copyStoredMessage({String? displayRecipient}) {
549+
final message = store.messages.values.single;
550+
551+
final json = message.toJson();
552+
if (
553+
message is StreamMessage
554+
&& message.conversation.displayRecipient == null
555+
) {
556+
if (displayRecipient == null) throw ArgumentError();
557+
json['display_recipient'] = displayRecipient;
558+
}
559+
560+
return Message.fromJson(json);
561+
}
562+
563+
/// Checks if the single message in [MessageStore.messages]
564+
/// is identical to [message].
565+
void checkStoredMessageIdenticalTo(Message message) {
566+
check(store.messages)
567+
.deepEquals({message.id: conditionIdentical(message)});
568+
}
569+
570+
void checkClobber({Message? withMessageCopy}) {
571+
final messageCopy = withMessageCopy ?? copyStoredMessage();
572+
store.reconcileMessages([messageCopy]);
573+
checkStoredMessageIdenticalTo(messageCopy);
574+
}
575+
576+
void checkNoClobber() {
577+
final messageBefore = store.messages.values.single;
578+
store.reconcileMessages([copyStoredMessage()]);
579+
checkStoredMessageIdenticalTo(messageBefore);
580+
}
581+
582+
test('DM', () async {
583+
await prepare();
584+
final message = eg.dmMessage(id: 1, from: eg.otherUser, to: [eg.selfUser]);
585+
586+
store.reconcileMessages([message]);
587+
588+
// Not clobbering, because the first call didn't mark stale.
589+
checkNoClobber();
590+
});
591+
592+
group('channel message; chooses correctly whether to clobber the stored version', () {
593+
// Exercise the ways we move the message in and out of the "maybe stale"
594+
// state. These include reconcileMessage itself, so sometimes we test
595+
// repeated calls to that with nothing else happening in between.
596+
597+
test('various conditions', () async {
598+
final channel = eg.stream();
599+
await prepare(stream: channel, isChannelSubscribed: true);
600+
final message = eg.streamMessage(stream: channel);
601+
602+
store.reconcileMessages([message]);
603+
604+
// Not clobbering, because the first call didn't mark stale,
605+
// because the message was in a subscribed channel.
606+
checkNoClobber();
607+
608+
await store.removeSubscription(channel.streamId);
609+
// Clobbering because the unsubscribe event marked the message stale.
610+
checkClobber();
611+
// (Check that reconcileMessage itself didn't unmark as stale.)
612+
checkClobber();
613+
614+
await store.addSubscription(eg.subscription(channel));
615+
// The channel became subscribed,
616+
// but the message's data hasn't been refreshed, so clobber…
617+
checkClobber();
618+
619+
// …Now it's been refreshed, by reconcileMessages, so don't clobber.
620+
checkNoClobber();
621+
622+
final otherChannel = eg.stream();
623+
await store.addStream(otherChannel);
624+
check(store.subscriptions[otherChannel.streamId]).isNull();
625+
await store.handleEvent(
626+
eg.updateMessageEventMoveFrom(origMessages: [message],
627+
newStreamId: otherChannel.streamId));
628+
// Message was moved to an unsubscribed channel, so clobber.
629+
checkClobber(
630+
withMessageCopy: copyStoredMessage(displayRecipient: otherChannel.name));
631+
// (Check that reconcileMessage itself didn't unmark as stale.)
632+
checkClobber();
633+
634+
// Subscribe, to mark message as not-stale, setting up another check…
635+
await store.addSubscription(eg.subscription(otherChannel));
636+
637+
await store.handleEvent(ChannelDeleteEvent(id: 1, streams: [otherChannel]));
638+
// Message was in a channel that became unknown, so clobber.
639+
checkClobber();
640+
});
641+
642+
test('in unsubscribed channel on first call', () async {
643+
await prepare(isChannelSubscribed: false);
644+
final message = eg.streamMessage();
645+
646+
store.reconcileMessages([message]);
647+
648+
checkClobber();
649+
checkClobber();
650+
});
651+
652+
test('new-message event when in unsubscribed channel', () async {
653+
await prepare(isChannelSubscribed: false);
654+
final message = eg.streamMessage();
655+
656+
await store.handleEvent(eg.messageEvent(message));
657+
658+
checkClobber();
659+
checkClobber();
660+
});
661+
662+
test('new-message event when in a subscribed channel', () async {
663+
await prepare(isChannelSubscribed: true);
664+
final message = eg.streamMessage();
665+
666+
await store.handleEvent(eg.messageEvent(message));
667+
668+
checkNoClobber();
669+
checkNoClobber();
670+
});
671+
});
548672
});
549673

550674
test('matchContent and matchTopic are removed', () async {

0 commit comments

Comments
 (0)