@@ -30,11 +30,14 @@ use matrix_sdk_base::{
30
30
media:: MediaRequestParameters ,
31
31
} ;
32
32
use ruma:: { events:: relation:: RelationType , EventId , MxcUri , OwnedEventId , RoomId } ;
33
+ use tracing:: trace;
33
34
use web_sys:: IdbTransactionMode ;
34
35
35
36
use crate :: event_cache_store:: {
37
+ migrations:: current:: keys,
36
38
serializer:: IndexeddbEventCacheStoreSerializer ,
37
39
transaction:: IndexeddbEventCacheStoreTransaction ,
40
+ types:: { ChunkType , InBandEvent } ,
38
41
} ;
39
42
40
43
mod builder;
@@ -72,6 +75,20 @@ impl IndexeddbEventCacheStore {
72
75
pub fn builder ( ) -> IndexeddbEventCacheStoreBuilder {
73
76
IndexeddbEventCacheStoreBuilder :: default ( )
74
77
}
78
+
79
+ /// Initializes a new transaction on the underlying IndexedDB database and
80
+ /// returns a handle which can be used to combine database operations
81
+ /// into an atomic unit.
82
+ pub fn transaction < ' a > (
83
+ & ' a self ,
84
+ stores : & [ & str ] ,
85
+ mode : IdbTransactionMode ,
86
+ ) -> Result < IndexeddbEventCacheStoreTransaction < ' a > , IndexeddbEventCacheStoreError > {
87
+ Ok ( IndexeddbEventCacheStoreTransaction :: new (
88
+ self . inner . transaction_on_multi_with_mode ( stores, mode) ?,
89
+ & self . serializer ,
90
+ ) )
91
+ }
75
92
}
76
93
77
94
// Small hack to have the following macro invocation act as the appropriate
@@ -121,10 +138,122 @@ impl_event_cache_store! {
121
138
linked_chunk_id: LinkedChunkId <' _>,
122
139
updates: Vec <Update <Event , Gap >>,
123
140
) -> Result <( ) , IndexeddbEventCacheStoreError > {
124
- self . memory_store
125
- . handle_linked_chunk_updates( linked_chunk_id, updates)
126
- . await
127
- . map_err( IndexeddbEventCacheStoreError :: MemoryStore )
141
+ let linked_chunk_id = linked_chunk_id. to_owned( ) ;
142
+ let room_id = linked_chunk_id. room_id( ) ;
143
+
144
+ let transaction = self . transaction(
145
+ & [ keys:: LINKED_CHUNKS , keys:: GAPS , keys:: EVENTS ] ,
146
+ IdbTransactionMode :: Readwrite ,
147
+ ) ?;
148
+
149
+ for update in updates {
150
+ match update {
151
+ Update :: NewItemsChunk { previous, new, next } => {
152
+ trace!( %room_id, "Inserting new chunk (prev={previous:?}, new={new:?}, next={next:?})" ) ;
153
+ transaction
154
+ . add_chunk(
155
+ room_id,
156
+ & types:: Chunk {
157
+ identifier: new. index( ) ,
158
+ previous: previous. map( |i| i. index( ) ) ,
159
+ next: next. map( |i| i. index( ) ) ,
160
+ chunk_type: ChunkType :: Event ,
161
+ } ,
162
+ )
163
+ . await ?;
164
+ }
165
+ Update :: NewGapChunk { previous, new, next, gap } => {
166
+ trace!( %room_id, "Inserting new gap (prev={previous:?}, new={new:?}, next={next:?})" ) ;
167
+ transaction
168
+ . add_item(
169
+ room_id,
170
+ & types:: Gap {
171
+ chunk_identifier: new. index( ) ,
172
+ prev_token: gap. prev_token,
173
+ } ,
174
+ )
175
+ . await ?;
176
+ transaction
177
+ . add_chunk(
178
+ room_id,
179
+ & types:: Chunk {
180
+ identifier: new. index( ) ,
181
+ previous: previous. map( |i| i. index( ) ) ,
182
+ next: next. map( |i| i. index( ) ) ,
183
+ chunk_type: ChunkType :: Gap ,
184
+ } ,
185
+ )
186
+ . await ?;
187
+ }
188
+ Update :: RemoveChunk ( chunk_id) => {
189
+ trace!( "Removing chunk {chunk_id:?}" ) ;
190
+ transaction. delete_chunk_by_id( room_id, & chunk_id) . await ?;
191
+ }
192
+ Update :: PushItems { at, items } => {
193
+ let chunk_identifier = at. chunk_identifier( ) . index( ) ;
194
+
195
+ trace!( %room_id, "pushing {} items @ {chunk_identifier}" , items. len( ) ) ;
196
+
197
+ for ( i, item) in items. into_iter( ) . enumerate( ) {
198
+ transaction
199
+ . add_item(
200
+ room_id,
201
+ & types:: Event :: InBand ( InBandEvent {
202
+ content: item,
203
+ position: types:: Position {
204
+ chunk_identifier,
205
+ index: at. index( ) + i,
206
+ } ,
207
+ } ) ,
208
+ )
209
+ . await ?;
210
+ }
211
+ }
212
+ Update :: ReplaceItem { at, item } => {
213
+ let chunk_id = at. chunk_identifier( ) . index( ) ;
214
+ let index = at. index( ) ;
215
+
216
+ trace!( %room_id, "replacing item @ {chunk_id}:{index}" ) ;
217
+
218
+ transaction
219
+ . put_event(
220
+ room_id,
221
+ & types:: Event :: InBand ( InBandEvent {
222
+ content: item,
223
+ position: at. into( ) ,
224
+ } ) ,
225
+ )
226
+ . await ?;
227
+ }
228
+ Update :: RemoveItem { at } => {
229
+ let chunk_id = at. chunk_identifier( ) . index( ) ;
230
+ let index = at. index( ) ;
231
+
232
+ trace!( %room_id, "removing item @ {chunk_id}:{index}" ) ;
233
+
234
+ transaction. delete_event_by_position( room_id, & at. into( ) ) . await ?;
235
+ }
236
+ Update :: DetachLastItems { at } => {
237
+ let chunk_id = at. chunk_identifier( ) . index( ) ;
238
+ let index = at. index( ) ;
239
+
240
+ trace!( %room_id, "detaching last items @ {chunk_id}:{index}" ) ;
241
+
242
+ transaction. delete_events_by_chunk_from_index( room_id, & at. into( ) ) . await ?;
243
+ }
244
+ Update :: StartReattachItems | Update :: EndReattachItems => {
245
+ // Nothing? See sqlite implementation
246
+ }
247
+ Update :: Clear => {
248
+ trace!( %room_id, "clearing room" ) ;
249
+ transaction. delete_chunks_in_room( room_id) . await ?;
250
+ transaction. delete_events_in_room( room_id) . await ?;
251
+ transaction. delete_gaps_in_room( room_id) . await ?;
252
+ }
253
+ }
254
+ }
255
+ transaction. commit( ) . await ?;
256
+ Ok ( ( ) )
128
257
}
129
258
130
259
async fn load_all_chunks(
0 commit comments