Skip to content

Commit 7e5e31d

Browse files
committed
chore: use cache before build arc row (#444)
1 parent 0a141fb commit 7e5e31d

File tree

10 files changed

+412
-45
lines changed

10 files changed

+412
-45
lines changed

Cargo.lock

Lines changed: 34 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

collab/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ futures-lite.workspace = true
3232
rand = { version = "0.8", optional = true }
3333
smallvec = { version = "1.10", features = ["write", "union", "const_generics", "const_new"], optional = true }
3434
nanoid = "0.4.0"
35-
markdown = "1.0.0-alpha.21"
36-
dashmap = "5"
35+
markdown = "1.0.0"
36+
dashmap = "7.0.0-rc2"
3737
strum = "0.25"
3838
strum_macros = "0.25"
3939
rayon = "1.10.0"
40-
csv = "1.3.0"
40+
csv = "1.4.0"
4141
tokio-util = "0.7"
4242
rusty-money = { version = "0.4.1", features = ["iso"] }
4343
fancy-regex = "0.13.0"

collab/src/core/collab_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl State {
150150
SyncState::try_from(self.sync_state.swap(new_state as u32, Ordering::AcqRel)).unwrap();
151151

152152
if old_state != new_state {
153-
tracing::debug!(
153+
tracing::trace!(
154154
"{} sync state {:?} => {:?}",
155155
self.object_id,
156156
old_state,

collab/src/database/blocks/block.rs

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::sync::broadcast;
1212
use tokio::sync::broadcast::Sender;
1313

1414
use crate::database::database_trait::{DatabaseRowCollabService, DatabaseRowDataVariant};
15+
use dashmap::DashMap;
1516
use tracing::{instrument, trace};
1617
use yrs::block::ClientID;
1718

@@ -32,6 +33,7 @@ pub struct Block {
3233
collab_service: Arc<dyn DatabaseRowCollabService>,
3334
pub notifier: Arc<Sender<BlockEvent>>,
3435
row_change_tx: Option<RowChangeSender>,
36+
inflight_row_init: Arc<DashMap<RowId, Arc<tokio::sync::Mutex<()>>>>,
3537
}
3638

3739
impl Block {
@@ -46,6 +48,7 @@ impl Block {
4648
collab_service,
4749
notifier: Arc::new(notifier),
4850
row_change_tx,
51+
inflight_row_init: Arc::new(DashMap::new()),
4952
}
5053
}
5154

@@ -127,6 +130,15 @@ impl Block {
127130
Ok(row_order)
128131
}
129132

133+
#[instrument(level = "debug", skip_all)]
134+
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
135+
let cache = self.collab_service.database_row_cache()?;
136+
cache.get(row_id).map(|row| row.clone())
137+
}
138+
139+
/// Return the [DatabaseRow], initializing it on demand if needed.
140+
/// Use [Self::get_cached_database_row] for cache-only access.
141+
#[instrument(level = "debug", skip_all)]
130142
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
131143
self.get_or_init_database_row(row_id).await.ok()
132144
}
@@ -173,6 +185,7 @@ impl Block {
173185
rows
174186
}
175187

188+
#[instrument(level = "debug", skip_all)]
176189
pub async fn update_row<F>(&mut self, row_id: RowId, f: F)
177190
where
178191
F: FnOnce(RowUpdate),
@@ -183,6 +196,7 @@ impl Block {
183196
}
184197
}
185198

199+
#[instrument(level = "debug", skip_all)]
186200
pub async fn update_row_meta<F>(&mut self, row_id: &RowId, f: F)
187201
where
188202
F: FnOnce(RowMetaUpdate),
@@ -192,31 +206,53 @@ impl Block {
192206
}
193207
}
194208

195-
/// Get the [DatabaseRow] from the cache. If the row is not in the cache, initialize it.
209+
/// Initialize the [DatabaseRow] in the background and optionally return it via channel.
196210
#[instrument(level = "debug", skip_all)]
197211
pub fn init_database_row(&self, row_id: &RowId, ret: Option<InitRowChan>) {
212+
let block = self.clone();
198213
let row_id = *row_id;
199-
let row_change_tx = self.row_change_tx.clone();
200-
let collab_service = self.collab_service.clone();
201214
tokio::task::spawn(async move {
202-
let row = collab_service
203-
.build_arc_database_row(&row_id, None, row_change_tx)
204-
.await;
205-
215+
let row = block.get_or_init_database_row(&row_id).await;
206216
if let Some(ret) = ret {
207217
let _ = ret.send(row);
208218
}
209219
});
210220
}
211221

222+
#[instrument(level = "debug", skip_all)]
212223
pub async fn get_or_init_database_row(
213224
&self,
214225
row_id: &RowId,
215226
) -> Result<Arc<RwLock<DatabaseRow>>, CollabError> {
216-
let (tx, rx) = tokio::sync::oneshot::channel();
217-
self.init_database_row(row_id, Some(tx));
218-
rx.await
219-
.map_err(|e| CollabError::Internal(anyhow::anyhow!(e)))?
227+
if let Some(row) = self.get_cached_database_row(row_id) {
228+
return Ok(row);
229+
}
230+
231+
let init_lock = {
232+
// Drop DashMap guard before awaiting the per-row mutex.
233+
let entry = self
234+
.inflight_row_init
235+
.entry(*row_id)
236+
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
237+
entry.clone()
238+
};
239+
240+
let _guard = init_lock.lock().await;
241+
if let Some(row) = self.get_cached_database_row(row_id) {
242+
drop(_guard);
243+
self.inflight_row_init.remove(row_id);
244+
return Ok(row);
245+
}
246+
247+
let result = self
248+
.collab_service
249+
.build_arc_database_row(row_id, None, self.row_change_tx.clone())
250+
.await;
251+
252+
drop(_guard);
253+
self.inflight_row_init.remove(row_id);
254+
255+
result
220256
}
221257

222258
#[instrument(level = "debug", skip_all)]

collab/src/database/database.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ impl Database {
259259
Ok(database)
260260
}
261261

262+
#[instrument(level = "debug", skip_all)]
262263
pub async fn encode_database_collabs(&self) -> Result<EncodedDatabase, CollabError> {
263264
let database_id = *self.collab.object_id();
264265
let encoded_database_collab = EncodedCollabInfo {
@@ -620,8 +621,12 @@ impl Database {
620621
.boxed()
621622
}
622623

623-
/// Return None if the row is not initialized.
624-
/// Use [Self::get_or_init_database_row] to initialize the row.
624+
/// Return the cached [DatabaseRow] if it is already initialized.
625+
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
626+
self.body.block.get_cached_database_row(row_id)
627+
}
628+
629+
/// Return the [DatabaseRow], initializing it on demand if needed.
625630
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
626631
self.body.block.get_database_row(row_id).await
627632
}

0 commit comments

Comments
 (0)