@@ -57,7 +57,7 @@ use ruma::{
5757} ;
5858use tokio:: sync:: {
5959 broadcast:: { error:: RecvError , Receiver } ,
60- Mutex , RwLock ,
60+ mpsc , Mutex , RwLock ,
6161} ;
6262use tracing:: { error, info, info_span, instrument, trace, warn, Instrument as _, Span } ;
6363
@@ -129,6 +129,9 @@ pub struct EventCacheDropHandles {
129129
130130 /// Task that listens to updates to the user's ignored list.
131131 ignore_user_list_update_task : JoinHandle < ( ) > ,
132+
133+ /// The task used to automatically shrink the linked chunks.
134+ auto_shrink_linked_chunk_task : JoinHandle < ( ) > ,
132135}
133136
134137impl Debug for EventCacheDropHandles {
@@ -141,6 +144,7 @@ impl Drop for EventCacheDropHandles {
141144 fn drop ( & mut self ) {
142145 self . listen_updates_task . abort ( ) ;
143146 self . ignore_user_list_update_task . abort ( ) ;
147+ self . auto_shrink_linked_chunk_task . abort ( ) ;
144148 }
145149}
146150
@@ -172,6 +176,7 @@ impl EventCache {
172176 by_room : Default :: default ( ) ,
173177 drop_handles : Default :: default ( ) ,
174178 all_events : Default :: default ( ) ,
179+ auto_shrink_sender : Default :: default ( ) ,
175180 } ) ,
176181 }
177182 }
@@ -213,7 +218,19 @@ impl EventCache {
213218 client. subscribe_to_ignore_user_list_changes ( ) ,
214219 ) ) ;
215220
216- Arc :: new ( EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task } )
221+ let ( tx, rx) = mpsc:: channel ( 32 ) ;
222+
223+ // Force-initialize the sender in the [`RoomEventCacheInner`].
224+ self . inner . auto_shrink_sender . get_or_init ( || tx) ;
225+
226+ let auto_shrink_linked_chunk_tasks =
227+ spawn ( Self :: auto_shrink_linked_chunk_task ( self . inner . clone ( ) , rx) ) ;
228+
229+ Arc :: new ( EventCacheDropHandles {
230+ listen_updates_task,
231+ ignore_user_list_update_task,
232+ auto_shrink_linked_chunk_task : auto_shrink_linked_chunk_tasks,
233+ } )
217234 } ) ;
218235
219236 Ok ( ( ) )
@@ -309,6 +326,60 @@ impl EventCache {
309326 }
310327 }
311328
329+ /// Spawns the task that will listen to auto-shrink notifications.
330+ ///
331+ /// The auto-shrink mechanism works this way:
332+ ///
333+ /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
334+ /// increment the active number of listeners to that room, aka
335+ /// [`RoomEventCacheState::listener_count`].
336+ /// - When that subscriber is dropped, it will decrement that count; and
337+ /// notify the task below if it reached 0.
338+ /// - The task spawned here, owned by the [`EventCacheInner`], will listen
339+ /// to such notifications that a room may be shrunk. It will attempt an
340+ /// auto-shrink, by letting the inner state decide whether this is a good
341+ /// time to do so (new listeners might have spawned in the meanwhile).
342+ #[ instrument( skip_all) ]
343+ async fn auto_shrink_linked_chunk_task (
344+ inner : Arc < EventCacheInner > ,
345+ mut rx : mpsc:: Receiver < AutoShrinkChannelPayload > ,
346+ ) {
347+ while let Some ( room_id) = rx. recv ( ) . await {
348+ let room = match inner. for_room ( & room_id) . await {
349+ Ok ( room) => room,
350+ Err ( err) => {
351+ warn ! ( for_room = %room_id, "error when getting a RoomEventCache: {err}" ) ;
352+ continue ;
353+ }
354+ } ;
355+
356+ let mut state = room. inner . state . write ( ) . await ;
357+
358+ match state. auto_shrink_if_no_listeners ( ) . await {
359+ Ok ( diffs) => {
360+ if let Some ( diffs) = diffs {
361+ // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
362+ // listeners, right? RIGHT? Especially because the state is guarded behind
363+ // a lock.
364+ //
365+ // However, better safe than sorry, and it's cheap to send an update here,
366+ // so let's do it!
367+ let _ =
368+ room. inner . sender . send ( RoomEventCacheUpdate :: UpdateTimelineEvents {
369+ diffs,
370+ origin : EventsOrigin :: Cache ,
371+ } ) ;
372+ }
373+ }
374+
375+ Err ( err) => {
376+ // There's not much we can do here, unfortunately.
377+ warn ! ( for_room = %room_id, "error when attempting to shrink linked chunk: {err}" ) ;
378+ }
379+ }
380+ }
381+ }
382+
312383 /// Return a room-specific view over the [`EventCache`].
313384 pub ( crate ) async fn for_room (
314385 & self ,
@@ -534,8 +605,18 @@ struct EventCacheInner {
534605
535606 /// Handles to keep alive the task listening to updates.
536607 drop_handles : OnceLock < Arc < EventCacheDropHandles > > ,
608+
609+ /// A sender for notifications that a room *may* need to be auto-shrunk.
610+ ///
611+ /// Needs to live here, so it may be passed to each [`RoomEventCache`]
612+ /// instance.
613+ ///
614+ /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
615+ auto_shrink_sender : OnceLock < mpsc:: Sender < AutoShrinkChannelPayload > > ,
537616}
538617
618+ type AutoShrinkChannelPayload = OwnedRoomId ;
619+
539620impl EventCacheInner {
540621 fn client ( & self ) -> Result < Client > {
541622 self . client . get ( ) . ok_or ( EventCacheError :: ClientDropped )
@@ -644,12 +725,20 @@ impl EventCacheInner {
644725 RoomVersionId :: V1
645726 } ) ;
646727
728+ // SAFETY: we must have subscribed before reaching this coed, otherwise
729+ // something is very wrong.
730+ let auto_shrink_sender =
731+ self . auto_shrink_sender . get ( ) . cloned ( ) . expect (
732+ "we must have called `EventCache::subscribe()` before calling here." ,
733+ ) ;
734+
647735 let room_event_cache = RoomEventCache :: new (
648736 self . client . clone ( ) ,
649737 room_state,
650738 room_id. to_owned ( ) ,
651739 room_version,
652740 self . all_events . clone ( ) ,
741+ auto_shrink_sender,
653742 ) ;
654743
655744 by_room_guard. insert ( room_id. to_owned ( ) , room_event_cache. clone ( ) ) ;
@@ -718,6 +807,9 @@ pub enum EventsOrigin {
718807
719808 /// Events are coming from pagination.
720809 Pagination ,
810+
811+ /// The cause of the change is purely internal to the cache.
812+ Cache ,
721813}
722814
723815#[ cfg( test) ]
0 commit comments