Skip to content

Commit 2ab661c

Browse files
chrisbobbegnprice
authored andcommitted
message: Consider unsubscribed/unknown channels in reconcileMessages
This fixes the "fourth buggy behavior" in #1798: #1798 (comment) Fixes-partly: #1798
1 parent 5469cbe commit 2ab661c

File tree

3 files changed

+242
-28
lines changed

3 files changed

+242
-28
lines changed

lib/model/message.dart

Lines changed: 97 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,14 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
417417
}
418418

419419
Message _reconcileUnrecognizedMessage(Message incoming) {
420+
if (
421+
incoming is StreamMessage
422+
&& subscriptions[incoming.streamId] == null
423+
) {
424+
// The message is in an unsubscribed channel. It might grow stale;
425+
// add it to _maybeStaleChannelMessages.
426+
_maybeStaleChannelMessages.add(incoming.id);
427+
}
420428
return _stripMatchFields(incoming);
421429
}
422430

@@ -426,20 +434,57 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
426434
// overlaps another, e.g. a stream and a topic within it.
427435
//
428436
// Most often, the just-fetched message will look just like the one we
429-
// already have. But they can differ: message fetching happens out of band
430-
// from the event queue, so there's inherently a race.
431-
//
432-
// If the fetched message reflects changes we haven't yet heard from the
433-
// event queue, then it doesn't much matter which version we use: we'll
434-
// soon get the corresponding events and apply the changes anyway.
435-
// But if it lacks changes we've already heard from the event queue, then
436-
// we won't hear those events again; the only way to wind up with an
437-
// updated message is to use the version we have, that already reflects
438-
// those events' changes. So we always stick with the version we have.
439-
// TODO(#1798) consider unsubscribed channels
440-
return current;
437+
// already have. But not always, and we can choose intelligently whether
438+
// to keep the stored version or clobber it with the incoming one.
439+
440+
bool currentIsMaybeStale = false;
441+
if (incoming is StreamMessage) {
442+
if (subscriptions[incoming.streamId] != null) {
443+
// The incoming version won't grow stale; it's in a subscribed channel.
444+
// Remove it from _maybeStaleChannelMessages if it was there.
445+
currentIsMaybeStale = _maybeStaleChannelMessages.remove(incoming.id);
446+
} else {
447+
assert(_maybeStaleChannelMessages.contains(incoming.id));
448+
currentIsMaybeStale = true;
449+
}
450+
}
451+
452+
if (currentIsMaybeStale) {
453+
// The event queue is unreliable for this message; the message was in an
454+
// unsubscribed channel when we stored it or sometime since, so the stored
455+
// version might be stale. Refresh it with the fetched version.
456+
return _stripMatchFields(incoming);
457+
} else {
458+
// Message fetching happens out of band from the event queue, so there's
459+
// inherently a race.
460+
//
461+
// If the fetched message reflects changes we haven't yet heard from the
462+
// event queue, then it doesn't much matter which version we use: we'll
463+
// soon get the corresponding events and apply the changes anyway.
464+
// But if it lacks changes we've already heard from the event queue, then
465+
// we won't hear those events again; the only way to wind up with an
466+
// updated message is to use the version we have, that already reflects
467+
// those events' changes. So, stick with the version we have.
468+
return current;
469+
}
441470
}
442471

472+
/// Messages in [messages] whose data stream is or was presumably broken
473+
/// by the message being in an unsubscribed channel.
474+
///
475+
/// This is the subset of [messages] where the message was
476+
/// in an unsubscribed channel when we added it or sometime since.
477+
///
478+
/// We don't expect update events for messages in unsubscribed channels,
479+
/// so if some of these maybe-stale messages appear in a fetch,
480+
/// we'll always clobber our stored version with the fetched version.
481+
/// See [reconcileMessages].
482+
///
483+
/// (We have seen a few such events, actually --
484+
/// maybe because the channel only recently became unsubscribed? --
485+
/// but not consistently, and we're not supposed to rely on them.)
486+
final Set<int> _maybeStaleChannelMessages = {};
487+
443488
Message _stripMatchFields(Message message) {
444489
message.matchContent = null;
445490
message.matchTopic = null;
@@ -510,6 +555,30 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
510555
);
511556
}
512557

558+
void handleChannelDeleteEvent(ChannelDeleteEvent event) {
559+
final channelIds = event.streams.map((channel) => channel.streamId);
560+
_handleSubscriptionsRemoved(channelIds);
561+
}
562+
563+
void handleSubscriptionRemoveEvent(SubscriptionRemoveEvent event) {
564+
_handleSubscriptionsRemoved(event.streamIds);
565+
}
566+
567+
void _handleSubscriptionsRemoved(Iterable<int> channelIds) {
568+
if (channelIds.length > 8) {
569+
assert(channelIds is! Set);
570+
// optimization: https://github.com/zulip/zulip-flutter/pull/1912#discussion_r2479350329
571+
channelIds = Set.from(channelIds);
572+
}
573+
574+
// Linear in [messages].
575+
final affectedKnownMessageIds = messages.values
576+
.where((message) => message is StreamMessage && channelIds.contains(message.streamId))
577+
.map((message) => message.id);
578+
579+
_maybeStaleChannelMessages.addAll(affectedKnownMessageIds);
580+
}
581+
513582
void handleUserTopicEvent(UserTopicEvent event) {
514583
for (final view in _messageListViews) {
515584
view.handleUserTopicEvent(event);
@@ -523,10 +592,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
523592
}
524593

525594
void handleMessageEvent(MessageEvent event) {
595+
final message = event.message;
596+
526597
// If the message is one we already know about (from a fetch),
527598
// clobber it with the one from the event system.
528599
// See [reconcileMessages] for reasoning.
529-
messages[event.message.id] = event.message;
600+
messages[message.id] = message;
601+
602+
if (message is StreamMessage && subscriptions[message.streamId] == null) {
603+
// We didn't expect this event, because the channel is unsubscribed. But
604+
// that doesn't mean we should expect future events about this message.
605+
_maybeStaleChannelMessages.add(message.id);
606+
}
530607

531608
_handleMessageEventOutbox(event);
532609

@@ -615,6 +692,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
615692
// See [StreamConversation.displayRecipient] on why the invalidation is
616693
// needed.
617694
message.conversation.displayRecipient = null;
695+
696+
if (subscriptions[newStreamId] == null) {
697+
// The message was moved into an unsubscribed channel, which means
698+
// we expect our data on it to get stale.
699+
_maybeStaleChannelMessages.add(messageId);
700+
}
618701
}
619702

620703
if (newTopic != origTopic) {
@@ -637,6 +720,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
637720
void handleDeleteMessageEvent(DeleteMessageEvent event) {
638721
for (final messageId in event.messageIds) {
639722
messages.remove(messageId);
723+
_maybeStaleChannelMessages.remove(messageId);
640724
_editMessageRequests.remove(messageId);
641725
}
642726
for (final view in _messageListViews) {

lib/model/store.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,11 +821,17 @@ class PerAccountStore extends PerAccountStoreBase with
821821

822822
case ChannelEvent():
823823
assert(debugLog("server event: stream/${event.op}"));
824+
if (event is ChannelDeleteEvent) {
825+
_messages.handleChannelDeleteEvent(event);
826+
}
824827
_channels.handleChannelEvent(event);
825828
notifyListeners();
826829

827830
case SubscriptionEvent():
828831
assert(debugLog("server event: subscription/${event.op}"));
832+
if (event is SubscriptionRemoveEvent) {
833+
_messages.handleSubscriptionRemoveEvent(event);
834+
}
829835
_channels.handleSubscriptionEvent(event);
830836
notifyListeners();
831837

test/model/message_test.dart

Lines changed: 139 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void main() {
3636

3737
// These "late" variables are the common state operated on by each test.
3838
// Each test case calls [prepare] to initialize them.
39-
late Subscription subscription;
39+
late Subscription? subscription;
4040
late PerAccountStore store;
4141
late FakeApiConnection connection;
4242
// [messageList] is here only for the sake of checking when it notifies.
@@ -54,15 +54,20 @@ void main() {
5454
/// Initialize [store] and the rest of the test state.
5555
Future<void> prepare({
5656
ZulipStream? stream,
57+
bool isChannelSubscribed = true,
5758
int? zulipFeatureLevel,
5859
}) async {
5960
stream ??= eg.stream(streamId: eg.defaultStreamMessageStreamId);
60-
subscription = eg.subscription(stream);
6161
final selfAccount = eg.selfAccount.copyWith(zulipFeatureLevel: zulipFeatureLevel);
6262
store = eg.store(account: selfAccount,
6363
initialSnapshot: eg.initialSnapshot(zulipFeatureLevel: zulipFeatureLevel));
6464
await store.addStream(stream);
65-
await store.addSubscription(subscription);
65+
if (isChannelSubscribed) {
66+
subscription = eg.subscription(stream);
67+
await store.addSubscription(subscription!);
68+
} else {
69+
subscription = null;
70+
}
6671
connection = store.connection as FakeApiConnection;
6772
notifiedCount = 0;
6873
messageList = MessageListView.init(store: store,
@@ -533,18 +538,137 @@ void main() {
533538
});
534539
});
535540

536-
test('on ID collision, new message does not clobber old in store.messages', () async {
537-
await prepare();
538-
final message = eg.streamMessage(id: 1, content: '<p>foo</p>');
539-
await addMessages([message]);
540-
check(store.messages).deepEquals({1: message});
541-
final newMessage = eg.streamMessage(id: 1, content: '<p>bar</p>');
542-
final messages = [newMessage];
543-
store.reconcileMessages(messages);
544-
check(messages).deepEquals(
545-
// (We'll check more messages in an upcoming commit.)
546-
[message].map(conditionIdentical));
547-
check(store.messages).deepEquals({1: message});
541+
group('fetched message with ID already in store.messages', () {
542+
/// Makes a copy of the single message in [MessageStore.messages]
543+
/// by round-tripping through [Message.fromJson] and [Message.toJson].
544+
///
545+
/// If that message's [StreamMessage.conversation.displayRecipient]
546+
/// is null, callers must provide a non-null [displayRecipient]
547+
/// to allow [StreamConversation.fromJson] to complete without throwing.
548+
Message copyStoredMessage({String? displayRecipient}) {
549+
final message = store.messages.values.single;
550+
551+
final json = message.toJson();
552+
if (
553+
message is StreamMessage
554+
&& message.conversation.displayRecipient == null
555+
) {
556+
if (displayRecipient == null) throw ArgumentError();
557+
json['display_recipient'] = displayRecipient;
558+
}
559+
560+
return Message.fromJson(json);
561+
}
562+
563+
/// Checks if the single message in [MessageStore.messages]
564+
/// is identical to [message].
565+
void checkStoredMessageIdenticalTo(Message message) {
566+
check(store.messages)
567+
.deepEquals({message.id: conditionIdentical(message)});
568+
}
569+
570+
void checkClobber({Message? withMessageCopy}) {
571+
final messageCopy = withMessageCopy ?? copyStoredMessage();
572+
store.reconcileMessages([messageCopy]);
573+
checkStoredMessageIdenticalTo(messageCopy);
574+
}
575+
576+
void checkNoClobber() {
577+
final messageBefore = store.messages.values.single;
578+
store.reconcileMessages([copyStoredMessage()]);
579+
checkStoredMessageIdenticalTo(messageBefore);
580+
}
581+
582+
test('DM', () async {
583+
await prepare();
584+
final message = eg.dmMessage(id: 1, from: eg.otherUser, to: [eg.selfUser]);
585+
586+
store.reconcileMessages([message]);
587+
588+
// Not clobbering, because the first call didn't mark stale.
589+
checkNoClobber();
590+
});
591+
592+
group('channel message; chooses correctly whether to clobber the stored version', () {
593+
// Exercise the ways we move the message in and out of the "maybe stale"
594+
// state. These include reconcileMessage itself, so sometimes we test
595+
// repeated calls to that with nothing else happening in between.
596+
597+
test('various conditions', () async {
598+
final channel = eg.stream();
599+
await prepare(stream: channel, isChannelSubscribed: true);
600+
final message = eg.streamMessage(stream: channel);
601+
602+
store.reconcileMessages([message]);
603+
604+
// Not clobbering, because the first call didn't mark stale,
605+
// because the message was in a subscribed channel.
606+
checkNoClobber();
607+
608+
await store.removeSubscription(channel.streamId);
609+
// Clobbering because the unsubscribe event marked the message stale.
610+
checkClobber();
611+
// (Check that reconcileMessage itself didn't unmark as stale.)
612+
checkClobber();
613+
614+
await store.addSubscription(eg.subscription(channel));
615+
// The channel became subscribed,
616+
// but the message's data hasn't been refreshed, so clobber…
617+
checkClobber();
618+
619+
// …Now it's been refreshed, by reconcileMessages, so don't clobber.
620+
checkNoClobber();
621+
622+
final otherChannel = eg.stream();
623+
await store.addStream(otherChannel);
624+
check(store.subscriptions[otherChannel.streamId]).isNull();
625+
await store.handleEvent(
626+
eg.updateMessageEventMoveFrom(origMessages: [message],
627+
newStreamId: otherChannel.streamId));
628+
// Message was moved to an unsubscribed channel, so clobber.
629+
checkClobber(
630+
withMessageCopy: copyStoredMessage(displayRecipient: otherChannel.name));
631+
// (Check that reconcileMessage itself didn't unmark as stale.)
632+
checkClobber();
633+
634+
// Subscribe, to mark message as not-stale, setting up another check…
635+
await store.addSubscription(eg.subscription(otherChannel));
636+
637+
await store.handleEvent(ChannelDeleteEvent(id: 1, streams: [otherChannel]));
638+
// Message was in a channel that became unknown, so clobber.
639+
checkClobber();
640+
});
641+
642+
test('in unsubscribed channel on first call', () async {
643+
await prepare(isChannelSubscribed: false);
644+
final message = eg.streamMessage();
645+
646+
store.reconcileMessages([message]);
647+
648+
checkClobber();
649+
checkClobber();
650+
});
651+
652+
test('new-message event when in unsubscribed channel', () async {
653+
await prepare(isChannelSubscribed: false);
654+
final message = eg.streamMessage();
655+
656+
await store.handleEvent(eg.messageEvent(message));
657+
658+
checkClobber();
659+
checkClobber();
660+
});
661+
662+
test('new-message event when in a subscribed channel', () async {
663+
await prepare(isChannelSubscribed: true);
664+
final message = eg.streamMessage();
665+
666+
await store.handleEvent(eg.messageEvent(message));
667+
668+
checkNoClobber();
669+
checkNoClobber();
670+
});
671+
});
548672
});
549673

550674
test('matchContent and matchTopic are removed', () async {

0 commit comments

Comments
 (0)