@@ -12,6 +12,7 @@ use tokio::sync::broadcast;
1212use tokio:: sync:: broadcast:: Sender ;
1313
1414use crate :: database:: database_trait:: { DatabaseRowCollabService , DatabaseRowDataVariant } ;
15+ use dashmap:: DashMap ;
1516use tracing:: { instrument, trace} ;
1617use yrs:: block:: ClientID ;
1718
@@ -32,6 +33,7 @@ pub struct Block {
3233 collab_service : Arc < dyn DatabaseRowCollabService > ,
3334 pub notifier : Arc < Sender < BlockEvent > > ,
3435 row_change_tx : Option < RowChangeSender > ,
36+ inflight_row_init : Arc < DashMap < RowId , Arc < tokio:: sync:: Mutex < ( ) > > > > ,
3537}
3638
3739impl Block {
@@ -46,6 +48,7 @@ impl Block {
4648 collab_service,
4749 notifier : Arc :: new ( notifier) ,
4850 row_change_tx,
51+ inflight_row_init : Arc :: new ( DashMap :: new ( ) ) ,
4952 }
5053 }
5154
@@ -126,6 +129,15 @@ impl Block {
126129 Ok ( row_order)
127130 }
128131
132+ #[ instrument( level = "debug" , skip_all) ]
133+ pub fn get_cached_database_row ( & self , row_id : & RowId ) -> Option < Arc < RwLock < DatabaseRow > > > {
134+ let cache = self . collab_service . database_row_cache ( ) ?;
135+ cache. get ( row_id) . map ( |row| row. clone ( ) )
136+ }
137+
138+ /// Return the [DatabaseRow], initializing it on demand if needed.
139+ /// Use [Self::get_cached_database_row] for cache-only access.
140+ #[ instrument( level = "debug" , skip_all) ]
129141 pub async fn get_database_row ( & self , row_id : & RowId ) -> Option < Arc < RwLock < DatabaseRow > > > {
130142 self . get_or_init_database_row ( row_id) . await . ok ( )
131143 }
@@ -172,6 +184,7 @@ impl Block {
172184 rows
173185 }
174186
187+ #[ instrument( level = "debug" , skip_all) ]
175188 pub async fn update_row < F > ( & mut self , row_id : RowId , f : F )
176189 where
177190 F : FnOnce ( RowUpdate ) ,
@@ -182,6 +195,7 @@ impl Block {
182195 }
183196 }
184197
198+ #[ instrument( level = "debug" , skip_all) ]
185199 pub async fn update_row_meta < F > ( & mut self , row_id : & RowId , f : F )
186200 where
187201 F : FnOnce ( RowMetaUpdate ) ,
@@ -191,30 +205,53 @@ impl Block {
191205 }
192206 }
193207
194- /// Get the [DatabaseRow] from the cache. If the row is not in the cache, initialize it.
208+ /// Initialize the [DatabaseRow] in the background and optionally return it via channel.
209+ #[ instrument( level = "debug" , skip_all) ]
195210 pub fn init_database_row ( & self , row_id : & RowId , ret : Option < InitRowChan > ) {
211+ let block = self . clone ( ) ;
196212 let row_id = * row_id;
197- let row_change_tx = self . row_change_tx . clone ( ) ;
198- let collab_service = self . collab_service . clone ( ) ;
199213 tokio:: task:: spawn ( async move {
200- let row = collab_service
201- . build_arc_database_row ( & row_id, None , row_change_tx)
202- . await ;
203-
214+ let row = block. get_or_init_database_row ( & row_id) . await ;
204215 if let Some ( ret) = ret {
205216 let _ = ret. send ( row) ;
206217 }
207218 } ) ;
208219 }
209220
221+ #[ instrument( level = "debug" , skip_all) ]
210222 pub async fn get_or_init_database_row (
211223 & self ,
212224 row_id : & RowId ,
213225 ) -> Result < Arc < RwLock < DatabaseRow > > , CollabError > {
214- let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
215- self . init_database_row ( row_id, Some ( tx) ) ;
216- rx. await
217- . map_err ( |e| CollabError :: Internal ( anyhow:: anyhow!( e) ) ) ?
226+ if let Some ( row) = self . get_cached_database_row ( row_id) {
227+ return Ok ( row) ;
228+ }
229+
230+ let init_lock = {
231+ // Drop DashMap guard before awaiting the per-row mutex.
232+ let entry = self
233+ . inflight_row_init
234+ . entry ( * row_id)
235+ . or_insert_with ( || Arc :: new ( tokio:: sync:: Mutex :: new ( ( ) ) ) ) ;
236+ entry. clone ( )
237+ } ;
238+
239+ let _guard = init_lock. lock ( ) . await ;
240+ if let Some ( row) = self . get_cached_database_row ( row_id) {
241+ drop ( _guard) ;
242+ self . inflight_row_init . remove ( row_id) ;
243+ return Ok ( row) ;
244+ }
245+
246+ let result = self
247+ . collab_service
248+ . build_arc_database_row ( row_id, None , self . row_change_tx . clone ( ) )
249+ . await ;
250+
251+ drop ( _guard) ;
252+ self . inflight_row_init . remove ( row_id) ;
253+
254+ result
218255 }
219256
220257 pub async fn init_database_rows (
0 commit comments