@@ -417,6 +417,14 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
417417 }
418418
419419 Message _reconcileUnrecognizedMessage (Message incoming) {
420+ if (
421+ incoming is StreamMessage
422+ && subscriptions[incoming.streamId] == null
423+ ) {
424+ // The message is in an unsubscribed channel. It might grow stale;
425+ // add it to _maybeStaleChannelMessages.
426+ _maybeStaleChannelMessages.add (incoming.id);
427+ }
420428 return _stripMatchFields (incoming);
421429 }
422430
@@ -426,20 +434,57 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
426434 // overlaps another, e.g. a stream and a topic within it.
427435 //
428436 // Most often, the just-fetched message will look just like the one we
429- // already have. But they can differ: message fetching happens out of band
430- // from the event queue, so there's inherently a race.
431- //
432- // If the fetched message reflects changes we haven't yet heard from the
433- // event queue, then it doesn't much matter which version we use: we'll
434- // soon get the corresponding events and apply the changes anyway.
435- // But if it lacks changes we've already heard from the event queue, then
436- // we won't hear those events again; the only way to wind up with an
437- // updated message is to use the version we have, that already reflects
438- // those events' changes. So we always stick with the version we have.
439- // TODO(#1798) consider unsubscribed channels
440- return current;
437+ // already have. But not always, and we can choose intelligently whether
438+ // to keep the stored version or clobber it with the incoming one.
439+
440+ bool currentIsMaybeStale = false ;
441+ if (incoming is StreamMessage ) {
442+ if (subscriptions[incoming.streamId] != null ) {
443+ // The incoming version won't grow stale; it's in a subscribed channel.
444+ // Remove it from _maybeStaleChannelMessages if it was there.
445+ currentIsMaybeStale = _maybeStaleChannelMessages.remove (incoming.id);
446+ } else {
447+ assert (_maybeStaleChannelMessages.contains (incoming.id));
448+ currentIsMaybeStale = true ;
449+ }
450+ }
451+
452+ if (currentIsMaybeStale) {
453+ // The event queue is unreliable for this message; the message was in an
454+ // unsubscribed channel when we stored it or sometime since, so the stored
455+ // version might be stale. Refresh it with the fetched version.
456+ return _stripMatchFields (incoming);
457+ } else {
458+ // Message fetching happens out of band from the event queue, so there's
459+ // inherently a race.
460+ //
461+ // If the fetched message reflects changes we haven't yet heard from the
462+ // event queue, then it doesn't much matter which version we use: we'll
463+ // soon get the corresponding events and apply the changes anyway.
464+ // But if it lacks changes we've already heard from the event queue, then
465+ // we won't hear those events again; the only way to wind up with an
466+ // updated message is to use the version we have, that already reflects
467+ // those events' changes. So, stick with the version we have.
468+ return current;
469+ }
441470 }
442471
472+ /// Messages in [messages] whose data stream is or was presumably broken
473+ /// by the message being in an unsubscribed channel.
474+ ///
475+ /// This is the subset of [messages] where the message was
476+ /// in an unsubscribed channel when we added it or sometime since.
477+ ///
478+ /// We don't expect update events for messages in unsubscribed channels,
479+ /// so if some of these maybe-stale messages appear in a fetch,
480+ /// we'll always clobber our stored version with the fetched version.
481+ /// See [reconcileMessages] .
482+ ///
483+ /// (We have seen a few such events, actually --
484+ /// maybe because the channel only recently became unsubscribed? --
485+ /// but not consistently, and we're not supposed to rely on them.)
486+ final Set <int > _maybeStaleChannelMessages = {};
487+
443488 Message _stripMatchFields (Message message) {
444489 message.matchContent = null ;
445490 message.matchTopic = null ;
@@ -510,6 +555,30 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
510555 );
511556 }
512557
558+ void handleChannelDeleteEvent (ChannelDeleteEvent event) {
559+ final channelIds = event.streams.map ((channel) => channel.streamId);
560+ _handleSubscriptionsRemoved (channelIds);
561+ }
562+
563+ void handleSubscriptionRemoveEvent (SubscriptionRemoveEvent event) {
564+ _handleSubscriptionsRemoved (event.streamIds);
565+ }
566+
567+ void _handleSubscriptionsRemoved (Iterable <int > channelIds) {
568+ if (channelIds.length > 8 ) {
569+ assert (channelIds is ! Set );
570+ // optimization: https://github.com/zulip/zulip-flutter/pull/1912#discussion_r2479350329
571+ channelIds = Set .from (channelIds);
572+ }
573+
574+ // Linear in [messages].
575+ final affectedKnownMessageIds = messages.values
576+ .where ((message) => message is StreamMessage && channelIds.contains (message.streamId))
577+ .map ((message) => message.id);
578+
579+ _maybeStaleChannelMessages.addAll (affectedKnownMessageIds);
580+ }
581+
513582 void handleUserTopicEvent (UserTopicEvent event) {
514583 for (final view in _messageListViews) {
515584 view.handleUserTopicEvent (event);
@@ -523,10 +592,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
523592 }
524593
525594 void handleMessageEvent (MessageEvent event) {
595+ final message = event.message;
596+
526597 // If the message is one we already know about (from a fetch),
527598 // clobber it with the one from the event system.
528599 // See [reconcileMessages] for reasoning.
529- messages[event.message.id] = event.message;
600+ messages[message.id] = message;
601+
602+ if (message is StreamMessage && subscriptions[message.streamId] == null ) {
603+ // We didn't expect this event, because the channel is unsubscribed. But
604+ // that doesn't mean we should expect future events about this message.
605+ _maybeStaleChannelMessages.add (message.id);
606+ }
530607
531608 _handleMessageEventOutbox (event);
532609
@@ -615,6 +692,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
615692 // See [StreamConversation.displayRecipient] on why the invalidation is
616693 // needed.
617694 message.conversation.displayRecipient = null ;
695+
696+ if (subscriptions[newStreamId] == null ) {
697+ // The message was moved into an unsubscribed channel, which means
698+ // we expect our data on it to get stale.
699+ _maybeStaleChannelMessages.add (messageId);
700+ }
618701 }
619702
620703 if (newTopic != origTopic) {
@@ -637,6 +720,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
637720 void handleDeleteMessageEvent (DeleteMessageEvent event) {
638721 for (final messageId in event.messageIds) {
639722 messages.remove (messageId);
723+ _maybeStaleChannelMessages.remove (messageId);
640724 _editMessageRequests.remove (messageId);
641725 }
642726 for (final view in _messageListViews) {
0 commit comments