Skip to content

Commit ee14e75

Browse files
committed
chore: use cache before build arc row
1 parent 981a955 commit ee14e75

File tree

10 files changed

+414
-45
lines changed

10 files changed

+414
-45
lines changed

Cargo.lock

Lines changed: 34 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

collab/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ futures-lite.workspace = true
3232
rand = { version = "0.8", optional = true }
3333
smallvec = { version = "1.10", features = ["write", "union", "const_generics", "const_new"], optional = true }
3434
nanoid = "0.4.0"
35-
markdown = "1.0.0-alpha.21"
36-
dashmap = "5"
35+
markdown = "1.0.0"
36+
dashmap = "7.0.0-rc2"
3737
strum = "0.25"
3838
strum_macros = "0.25"
3939
rayon = "1.10.0"
40-
csv = "1.3.0"
40+
csv = "1.4.0"
4141
tokio-util = "0.7"
4242
rusty-money = { version = "0.4.1", features = ["iso"] }
4343
fancy-regex = "0.13.0"

collab/src/core/collab_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl State {
150150
SyncState::try_from(self.sync_state.swap(new_state as u32, Ordering::AcqRel)).unwrap();
151151

152152
if old_state != new_state {
153-
tracing::debug!(
153+
tracing::trace!(
154154
"{} sync state {:?} => {:?}",
155155
self.object_id,
156156
old_state,

collab/src/database/blocks/block.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::sync::broadcast;
1212
use tokio::sync::broadcast::Sender;
1313

1414
use crate::database::database_trait::{DatabaseRowCollabService, DatabaseRowDataVariant};
15+
use dashmap::DashMap;
1516
use tracing::{instrument, trace};
1617
use yrs::block::ClientID;
1718

@@ -32,6 +33,7 @@ pub struct Block {
3233
collab_service: Arc<dyn DatabaseRowCollabService>,
3334
pub notifier: Arc<Sender<BlockEvent>>,
3435
row_change_tx: Option<RowChangeSender>,
36+
inflight_row_init: Arc<DashMap<RowId, Arc<tokio::sync::Mutex<()>>>>,
3537
}
3638

3739
impl Block {
@@ -46,6 +48,7 @@ impl Block {
4648
collab_service,
4749
notifier: Arc::new(notifier),
4850
row_change_tx,
51+
inflight_row_init: Arc::new(DashMap::new()),
4952
}
5053
}
5154

@@ -126,6 +129,15 @@ impl Block {
126129
Ok(row_order)
127130
}
128131

132+
#[instrument(level = "debug", skip_all)]
133+
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
134+
let cache = self.collab_service.database_row_cache()?;
135+
cache.get(row_id).map(|row| row.clone())
136+
}
137+
138+
/// Return the [DatabaseRow], initializing it on demand if needed.
139+
/// Use [Self::get_cached_database_row] for cache-only access.
140+
#[instrument(level = "debug", skip_all)]
129141
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
130142
self.get_or_init_database_row(row_id).await.ok()
131143
}
@@ -172,6 +184,7 @@ impl Block {
172184
rows
173185
}
174186

187+
#[instrument(level = "debug", skip_all)]
175188
pub async fn update_row<F>(&mut self, row_id: RowId, f: F)
176189
where
177190
F: FnOnce(RowUpdate),
@@ -182,6 +195,7 @@ impl Block {
182195
}
183196
}
184197

198+
#[instrument(level = "debug", skip_all)]
185199
pub async fn update_row_meta<F>(&mut self, row_id: &RowId, f: F)
186200
where
187201
F: FnOnce(RowMetaUpdate),
@@ -191,30 +205,53 @@ impl Block {
191205
}
192206
}
193207

194-
/// Get the [DatabaseRow] from the cache. If the row is not in the cache, initialize it.
208+
/// Initialize the [DatabaseRow] in the background and optionally return it via channel.
209+
#[instrument(level = "debug", skip_all)]
195210
pub fn init_database_row(&self, row_id: &RowId, ret: Option<InitRowChan>) {
211+
let block = self.clone();
196212
let row_id = *row_id;
197-
let row_change_tx = self.row_change_tx.clone();
198-
let collab_service = self.collab_service.clone();
199213
tokio::task::spawn(async move {
200-
let row = collab_service
201-
.build_arc_database_row(&row_id, None, row_change_tx)
202-
.await;
203-
214+
let row = block.get_or_init_database_row(&row_id).await;
204215
if let Some(ret) = ret {
205216
let _ = ret.send(row);
206217
}
207218
});
208219
}
209220

221+
#[instrument(level = "debug", skip_all)]
210222
pub async fn get_or_init_database_row(
211223
&self,
212224
row_id: &RowId,
213225
) -> Result<Arc<RwLock<DatabaseRow>>, CollabError> {
214-
let (tx, rx) = tokio::sync::oneshot::channel();
215-
self.init_database_row(row_id, Some(tx));
216-
rx.await
217-
.map_err(|e| CollabError::Internal(anyhow::anyhow!(e)))?
226+
if let Some(row) = self.get_cached_database_row(row_id) {
227+
return Ok(row);
228+
}
229+
230+
let init_lock = {
231+
// Drop DashMap guard before awaiting the per-row mutex.
232+
let entry = self
233+
.inflight_row_init
234+
.entry(*row_id)
235+
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
236+
entry.clone()
237+
};
238+
239+
let _guard = init_lock.lock().await;
240+
if let Some(row) = self.get_cached_database_row(row_id) {
241+
drop(_guard);
242+
self.inflight_row_init.remove(row_id);
243+
return Ok(row);
244+
}
245+
246+
let result = self
247+
.collab_service
248+
.build_arc_database_row(row_id, None, self.row_change_tx.clone())
249+
.await;
250+
251+
drop(_guard);
252+
self.inflight_row_init.remove(row_id);
253+
254+
result
218255
}
219256

220257
pub async fn init_database_rows(

collab/src/database/database.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ impl Database {
259259
Ok(database)
260260
}
261261

262+
#[instrument(level = "debug", skip_all)]
262263
pub async fn encode_database_collabs(&self) -> Result<EncodedDatabase, CollabError> {
263264
let database_id = *self.collab.object_id();
264265
let encoded_database_collab = EncodedCollabInfo {
@@ -619,8 +620,12 @@ impl Database {
619620
.boxed()
620621
}
621622

622-
/// Return None if the row is not initialized.
623-
/// Use [Self::get_or_init_database_row] to initialize the row.
623+
/// Return the cached [DatabaseRow] if it is already initialized.
624+
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
625+
self.body.block.get_cached_database_row(row_id)
626+
}
627+
628+
/// Return the [DatabaseRow], initializing it on demand if needed.
624629
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
625630
self.body.block.get_database_row(row_id).await
626631
}

0 commit comments

Comments
 (0)