Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions collab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ futures-lite.workspace = true
rand = { version = "0.8", optional = true }
smallvec = { version = "1.10", features = ["write", "union", "const_generics", "const_new"], optional = true }
nanoid = "0.4.0"
markdown = "1.0.0-alpha.21"
dashmap = "5"
markdown = "1.0.0"
dashmap = "7.0.0-rc2"
strum = "0.25"
strum_macros = "0.25"
rayon = "1.10.0"
csv = "1.3.0"
csv = "1.4.0"
tokio-util = "0.7"
rusty-money = { version = "0.4.1", features = ["iso"] }
fancy-regex = "0.13.0"
Expand Down
2 changes: 1 addition & 1 deletion collab/src/core/collab_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl State {
SyncState::try_from(self.sync_state.swap(new_state as u32, Ordering::AcqRel)).unwrap();

if old_state != new_state {
tracing::debug!(
tracing::trace!(
"{} sync state {:?} => {:?}",
self.object_id,
old_state,
Expand Down
59 changes: 48 additions & 11 deletions collab/src/database/blocks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;

use crate::database::database_trait::{DatabaseRowCollabService, DatabaseRowDataVariant};
use dashmap::DashMap;
use tracing::{instrument, trace};
use yrs::block::ClientID;

Expand All @@ -32,6 +33,7 @@ pub struct Block {
collab_service: Arc<dyn DatabaseRowCollabService>,
pub notifier: Arc<Sender<BlockEvent>>,
row_change_tx: Option<RowChangeSender>,
inflight_row_init: Arc<DashMap<RowId, Arc<tokio::sync::Mutex<()>>>>,
}

impl Block {
Expand All @@ -46,6 +48,7 @@ impl Block {
collab_service,
notifier: Arc::new(notifier),
row_change_tx,
inflight_row_init: Arc::new(DashMap::new()),
}
}

Expand Down Expand Up @@ -126,6 +129,15 @@ impl Block {
Ok(row_order)
}

#[instrument(level = "debug", skip_all)]
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
let cache = self.collab_service.database_row_cache()?;
cache.get(row_id).map(|row| row.clone())
}

/// Return the [DatabaseRow], initializing it on demand if needed.
/// Use [Self::get_cached_database_row] for cache-only access.
#[instrument(level = "debug", skip_all)]
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
self.get_or_init_database_row(row_id).await.ok()
}
Expand Down Expand Up @@ -172,6 +184,7 @@ impl Block {
rows
}

#[instrument(level = "debug", skip_all)]
pub async fn update_row<F>(&mut self, row_id: RowId, f: F)
where
F: FnOnce(RowUpdate),
Expand All @@ -182,6 +195,7 @@ impl Block {
}
}

#[instrument(level = "debug", skip_all)]
pub async fn update_row_meta<F>(&mut self, row_id: &RowId, f: F)
where
F: FnOnce(RowMetaUpdate),
Expand All @@ -191,30 +205,53 @@ impl Block {
}
}

/// Get the [DatabaseRow] from the cache. If the row is not in the cache, initialize it.
/// Initialize the [DatabaseRow] in the background and optionally return it via channel.
#[instrument(level = "debug", skip_all)]
pub fn init_database_row(&self, row_id: &RowId, ret: Option<InitRowChan>) {
let block = self.clone();
let row_id = *row_id;
let row_change_tx = self.row_change_tx.clone();
let collab_service = self.collab_service.clone();
tokio::task::spawn(async move {
let row = collab_service
.build_arc_database_row(&row_id, None, row_change_tx)
.await;

let row = block.get_or_init_database_row(&row_id).await;
if let Some(ret) = ret {
let _ = ret.send(row);
}
});
}

#[instrument(level = "debug", skip_all)]
pub async fn get_or_init_database_row(
&self,
row_id: &RowId,
) -> Result<Arc<RwLock<DatabaseRow>>, CollabError> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.init_database_row(row_id, Some(tx));
rx.await
.map_err(|e| CollabError::Internal(anyhow::anyhow!(e)))?
if let Some(row) = self.get_cached_database_row(row_id) {
return Ok(row);
}

let init_lock = {
// Drop DashMap guard before awaiting the per-row mutex.
let entry = self
.inflight_row_init
.entry(*row_id)
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
entry.clone()
};

let _guard = init_lock.lock().await;
if let Some(row) = self.get_cached_database_row(row_id) {
drop(_guard);
self.inflight_row_init.remove(row_id);
return Ok(row);
}

let result = self
.collab_service
.build_arc_database_row(row_id, None, self.row_change_tx.clone())
.await;

drop(_guard);
self.inflight_row_init.remove(row_id);

result
}

pub async fn init_database_rows(
Expand Down
9 changes: 7 additions & 2 deletions collab/src/database/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl Database {
Ok(database)
}

#[instrument(level = "debug", skip_all)]
pub async fn encode_database_collabs(&self) -> Result<EncodedDatabase, CollabError> {
let database_id = *self.collab.object_id();
let encoded_database_collab = EncodedCollabInfo {
Expand Down Expand Up @@ -619,8 +620,12 @@ impl Database {
.boxed()
}

/// Return None if the row is not initialized.
/// Use [Self::get_or_init_database_row] to initialize the row.
/// Return the cached [DatabaseRow] if it is already initialized.
pub fn get_cached_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
self.body.block.get_cached_database_row(row_id)
}

/// Return the [DatabaseRow], initializing it on demand if needed.
pub async fn get_database_row(&self, row_id: &RowId) -> Option<Arc<RwLock<DatabaseRow>>> {
self.body.block.get_database_row(row_id).await
}
Expand Down
Loading
Loading