@@ -409,6 +409,14 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
409409 }
410410
411411 Message _reconcileUnrecognizedMessage (Message incoming) {
412+ if (
413+ incoming is StreamMessage
414+ && subscriptions[incoming.streamId] == null
415+ ) {
416+ // The message is in an unsubscribed channel. It might grow stale;
417+ // add it to _maybeStaleChannelMessages.
418+ _maybeStaleChannelMessages.add (incoming.id);
419+ }
412420 return _stripMatchFields (incoming);
413421 }
414422
@@ -418,20 +426,57 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
418426 // overlaps another, e.g. a stream and a topic within it.
419427 //
420428 // Most often, the just-fetched message will look just like the one we
421- // already have. But they can differ: message fetching happens out of band
422- // from the event queue, so there's inherently a race.
423- //
424- // If the fetched message reflects changes we haven't yet heard from the
425- // event queue, then it doesn't much matter which version we use: we'll
426- // soon get the corresponding events and apply the changes anyway.
427- // But if it lacks changes we've already heard from the event queue, then
428- // we won't hear those events again; the only way to wind up with an
429- // updated message is to use the version we have, that already reflects
430- // those events' changes. So we always stick with the version we have.
431- // TODO(#1798) consider unsubscribed channels
432- return current;
429+ // already have. But not always, and we can choose intelligently whether
430+ // to keep the stored version or clobber it with the incoming one.
431+
432+ bool currentIsMaybeStale = false ;
433+ if (incoming is StreamMessage ) {
434+ if (subscriptions[incoming.streamId] != null ) {
435+ // The incoming version won't grow stale; it's in a subscribed channel.
436+ // Remove it from _maybeStaleChannelMessages if it was there.
437+ currentIsMaybeStale = _maybeStaleChannelMessages.remove (incoming.id);
438+ } else {
439+ assert (_maybeStaleChannelMessages.contains (incoming.id));
440+ currentIsMaybeStale = true ;
441+ }
442+ }
443+
444+ if (currentIsMaybeStale) {
445+ // The event queue is unreliable for this message; the message was in an
446+ // unsubscribed channel when we stored it or sometime since, so the stored
447+ // version might be stale. Refresh it with the fetched version.
448+ return _stripMatchFields (incoming);
449+ } else {
450+ // Message fetching happens out of band from the event queue, so there's
451+ // inherently a race.
452+ //
453+ // If the fetched message reflects changes we haven't yet heard from the
454+ // event queue, then it doesn't much matter which version we use: we'll
455+ // soon get the corresponding events and apply the changes anyway.
456+ // But if it lacks changes we've already heard from the event queue, then
457+ // we won't hear those events again; the only way to wind up with an
458+ // updated message is to use the version we have, that already reflects
459+ // those events' changes. So, stick with the version we have.
460+ return current;
461+ }
433462 }
434463
464+ /// Messages in [messages] whose data stream is or was presumably broken
465+ /// by the message being in an unsubscribed channel.
466+ ///
467+ /// This is the subset of [messages] where the message was
468+ /// in an unsubscribed channel when we added it or sometime since.
469+ ///
470+ /// We don't expect update events for messages in unsubscribed channels,
471+ /// so if some of these maybe-stale messages appear in a fetch,
472+ /// we'll always clobber our stored version with the fetched version.
473+ /// See [reconcileMessages] .
474+ ///
475+ /// (We have seen a few such events, actually --
476+ /// maybe because the channel only recently became unsubscribed? --
477+ /// but not consistently, and we're not supposed to rely on them.)
478+ final Set <int > _maybeStaleChannelMessages = {};
479+
435480 Message _stripMatchFields (Message message) {
436481 message.matchContent = null ;
437482 message.matchTopic = null ;
@@ -502,6 +547,30 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
502547 );
503548 }
504549
550+ void handleChannelDeleteEvent (ChannelDeleteEvent event) {
551+ final channelIds = event.streams.map ((channel) => channel.streamId);
552+ _handleSubscriptionsRemoved (channelIds);
553+ }
554+
555+ void handleSubscriptionRemoveEvent (SubscriptionRemoveEvent event) {
556+ _handleSubscriptionsRemoved (event.streamIds);
557+ }
558+
559+ void _handleSubscriptionsRemoved (Iterable <int > channelIds) {
560+ if (channelIds.length > 8 ) {
561+ assert (channelIds is ! Set );
562+ // optimization: https://github.com/zulip/zulip-flutter/pull/1912#discussion_r2479350329
563+ channelIds = Set .from (channelIds);
564+ }
565+
566+ // Linear in [messages].
567+ final affectedKnownMessageIds = messages.values
568+ .where ((message) => message is StreamMessage && channelIds.contains (message.streamId))
569+ .map ((message) => message.id);
570+
571+ _maybeStaleChannelMessages.addAll (affectedKnownMessageIds);
572+ }
573+
505574 void handleUserTopicEvent (UserTopicEvent event) {
506575 for (final view in _messageListViews) {
507576 view.handleUserTopicEvent (event);
@@ -515,10 +584,18 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
515584 }
516585
517586 void handleMessageEvent (MessageEvent event) {
587+ final message = event.message;
588+
518589 // If the message is one we already know about (from a fetch),
519590 // clobber it with the one from the event system.
520591 // See [reconcileMessages] for reasoning.
521- messages[event.message.id] = event.message;
592+ messages[message.id] = message;
593+
594+ if (message is StreamMessage && subscriptions[message.streamId] == null ) {
595+ // We didn't expect this event, because the channel is unsubscribed. But
596+ // that doesn't mean we should expect future events about this message.
597+ _maybeStaleChannelMessages.add (message.id);
598+ }
522599
523600 _handleMessageEventOutbox (event);
524601
@@ -607,6 +684,12 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
607684 // See [StreamConversation.displayRecipient] on why the invalidation is
608685 // needed.
609686 message.conversation.displayRecipient = null ;
687+
688+ if (subscriptions[newStreamId] == null ) {
689+ // The message was moved into an unsubscribed channel, which means
690+ // we expect our data on it to get stale.
691+ _maybeStaleChannelMessages.add (messageId);
692+ }
610693 }
611694
612695 if (newTopic != origTopic) {
@@ -629,6 +712,7 @@ class MessageStoreImpl extends HasChannelStore with MessageStore, _OutboxMessage
629712 void handleDeleteMessageEvent (DeleteMessageEvent event) {
630713 for (final messageId in event.messageIds) {
631714 messages.remove (messageId);
715+ _maybeStaleChannelMessages.remove (messageId);
632716 _editMessageRequests.remove (messageId);
633717 }
634718 for (final view in _messageListViews) {
0 commit comments