@@ -1971,3 +1971,175 @@ async fn test_lazy_loading() {
 
     assert!(updates_stream.is_empty());
 }
+
+#[async_test]
+async fn test_deduplication() {
+    let room_id = room_id!("!foo:bar.baz");
+    let event_factory = EventFactory::new().room(room_id).sender(&ALICE);
+
+    let mock_server = MatrixMockServer::new().await;
+    let client = mock_server.client_builder().build().await;
+
+    // Set up the event cache store.
+    {
+        let event_cache_store = client.event_cache_store().lock().await.unwrap();
+
+        // The event cache contains 2 chunks as such (from newest to oldest):
+        // 2. a chunk of 3 items,
+        // 1. a chunk of 4 items.
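+        //
+        // In the linked chunk, chunk #0 is the oldest and points forward to
+        // chunk #1 (they are built below in that order):
+        //
+        //     chunk #0 [$ev0_0, $ev0_1, $ev0_2, $ev0_3] -> chunk #1 [$ev1_0, $ev1_1, $ev1_2]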
+        event_cache_store
+            .handle_linked_chunk_updates(
+                room_id,
+                vec![
+                    // chunk #0
+                    Update::NewItemsChunk {
+                        previous: None,
+                        new: ChunkIdentifier::new(0),
+                        next: None,
+                    },
+                    // … and its 4 items
+                    Update::PushItems {
+                        at: Position::new(ChunkIdentifier::new(0), 0),
+                        items: (0..4)
+                            .map(|nth| {
+                                event_factory
+                                    .text_msg("foo")
+                                    .event_id(&EventId::parse(format!("$ev0_{nth}")).unwrap())
+                                    .into_event()
+                            })
+                            .collect::<Vec<_>>(),
+                    },
+                    // chunk #1
+                    Update::NewItemsChunk {
+                        previous: Some(ChunkIdentifier::new(0)),
+                        new: ChunkIdentifier::new(1),
+                        next: None,
+                    },
+                    // … and its 3 items
+                    Update::PushItems {
+                        at: Position::new(ChunkIdentifier::new(1), 0),
+                        items: (0..3)
+                            .map(|nth| {
+                                event_factory
+                                    .text_msg("foo")
+                                    .event_id(&EventId::parse(format!("$ev1_{nth}")).unwrap())
+                                    .into_event()
+                            })
+                            .collect::<Vec<_>>(),
+                    },
+                ],
+            )
+            .await
+            .unwrap();
+    }
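+
+    // At this point, the store persists 7 events across 2 chunks, but nothing
+    // has been loaded in memory yet.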
+
+    // Set up the event cache.
+    let event_cache = client.event_cache();
+    event_cache.subscribe().unwrap();
+    event_cache.enable_storage().unwrap();
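+
+    // With storage enabled, chunks are loaded lazily: only the most recent
+    // chunk is loaded in memory when the room's event cache is set up.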
+
+    let room = mock_server.sync_joined_room(&client, room_id).await;
+    let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap();
+
+    let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await;
+
+    // One chunk has been loaded, so there are 3 events in memory.
+    {
+        assert_eq!(initial_updates.len(), 3);
+
+        assert_event_id!(initial_updates[0], "$ev1_0");
+        assert_event_id!(initial_updates[1], "$ev1_1");
+        assert_event_id!(initial_updates[2], "$ev1_2");
+
+        assert!(updates_stream.is_empty());
+    }
+
+    // Now let's imagine a sync arrives. This sync is a bit weird, totally
+    // messy even, but our system is robust, so nothing will fail.
+    //
+    // The sync contains 6 events:
+    // - 2 of them duplicate events in the loaded chunk #1,
+    // - 2 of them duplicate events in the store (so not loaded yet),
+    // - 2 of them are unique.
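+    //
+    // Deduplication is keyed on event IDs, which is why every event can share
+    // the same "foo" body.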
+    mock_server
+        .mock_sync()
+        .ok_and_run(&client, |sync_builder| {
+            sync_builder.add_joined_room({
+                JoinedRoomBuilder::new(room_id)
+                    // The 2 events duplicated with the ones from the loaded chunk.
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev1_0")))
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev1_2")))
+                    // The 2 events duplicated with the ones from the not-loaded chunk.
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev0_1")))
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev0_2")))
+                    // The 2 unique events.
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev3_0")))
+                    .add_timeline_event(event_factory.text_msg("foo").event_id(event_id!("$ev3_1")))
+            });
+        })
+        .await;
+
+    // What should we see?
+    // - On `updates_stream`: the 2 duplicated events from the loaded chunk #1
+    //   must be removed, and all 6 sync events must be appended; indeed, the 4
+    //   duplicates are removed and re-inserted at the back, and the 2 unique
+    //   events are newly inserted at the back,
+    // - On the store: the 2 duplicated events must be removed from chunk #0.
+    //
+    // First off, let's check `updates_stream`.
+    {
+        let update = updates_stream.recv().await.unwrap();
+
+        assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
+            // 3 diffs, of course.
+            assert_eq!(diffs.len(), 3);
+
+            // Note that index 2 is removed before index 0!
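+            // Removing the higher index first means the second removal's index
+            // isn't shifted by the first removal.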
+            assert_matches!(&diffs[0], VectorDiff::Remove { index } => {
+                assert_eq!(*index, 2);
+            });
+            assert_matches!(&diffs[1], VectorDiff::Remove { index } => {
+                assert_eq!(*index, 0);
+            });
+            assert_matches!(&diffs[2], VectorDiff::Append { values: events } => {
+                assert_eq!(events.len(), 6);
+
+                assert_event_id!(&events[0], "$ev1_0");
+                assert_event_id!(&events[1], "$ev1_2");
+                assert_event_id!(&events[2], "$ev0_1");
+                assert_event_id!(&events[3], "$ev0_2");
+                assert_event_id!(&events[4], "$ev3_0");
+                assert_event_id!(&events[5], "$ev3_1");
+            });
+        });
+    }
+
+    // Hands in the air, don't touch your keyboard, let's see the state of the
+    // store by paginating backwards. `$ev0_1` and `$ev0_2` **MUST be absent**.
+    {
+        let outcome = room_event_cache.pagination().run_backwards_until(1).await.unwrap();
+
+        // Alrighty, we should get 2 events, since 2 of the 4 have been removed.
+        assert_eq!(outcome.events.len(), 2);
+
+        // In reverse order, because it's a backwards pagination.
+        assert_event_id!(outcome.events[0], "$ev0_3");
+        assert_event_id!(outcome.events[1], "$ev0_0");
+
+        // Let's check what the stream has to say.
+        let update = updates_stream.recv().await.unwrap();
+
+        assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
+            // 2 diffs, but who's counting?
+            assert_eq!(diffs.len(), 2);
+
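+            // The paginated (older) events are prepended at the front of the
+            // in-memory timeline, oldest first.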
+            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
+                assert_event_id!(event, "$ev0_0");
+            });
+            assert_matches!(&diffs[1], VectorDiff::Insert { index: 1, value: event } => {
+                assert_event_id!(event, "$ev0_3");
+            });
+        });
+    }
+}