Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions collab/src/database/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,33 +270,59 @@ impl Database {
// Fetch row orders
let row_orders = self.get_all_row_orders().await;
let mut encoded_row_collabs = Vec::new();
let mut encoded_row_document_collabs = Vec::new();
let persistence = self.collab_service.persistence();
// Process row orders in chunks
for chunk in row_orders.chunks(20) {
let persistence = persistence.clone();
// Create async tasks for each row in the chunk
let tasks: Vec<_> = chunk
.iter()
.map(|chunk_row| async move {
let database_row = self
.body
.block
.get_or_init_database_row(&chunk_row.id)
.await
.ok()?;
let read_guard = database_row.read().await;
let row_collab = &read_guard.collab;
let object_id = *row_collab.object_id();
let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?;
Some(EncodedCollabInfo {
object_id,
collab_type: CollabType::DatabaseRow,
encoded_collab,
})
.map(|chunk_row| {
let persistence = persistence.clone();
async move {
let database_row = self
.body
.block
.get_or_init_database_row(&chunk_row.id)
.await
.ok()?;
let read_guard = database_row.read().await;
let row_collab = &read_guard.collab;
let object_id = *row_collab.object_id();

let encode_row_document_collab = persistence.as_ref().and_then(|persistence| {
let row_document_id = meta_id_from_row_id(&object_id, RowMetaKey::DocumentId);
Uuid::parse_str(&row_document_id)
.ok()
.and_then(|row_document_uuid| {
persistence
.get_encoded_collab(&row_document_uuid, CollabType::Document)
.map(|document_encoded_collab| EncodedCollabInfo {
object_id: row_document_uuid,
collab_type: CollabType::Document,
encoded_collab: document_encoded_collab,
})
})
});

let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?;
let encode_row_collab = EncodedCollabInfo {
object_id,
collab_type: CollabType::DatabaseRow,
encoded_collab,
};
Some((encode_row_collab, encode_row_document_collab))
}
})
.collect();

let chunk_results = join_all(tasks).await;
for collab_info in chunk_results.into_iter().flatten() {
encoded_row_collabs.push(collab_info);
for (encode_row_collab, encode_row_document_collab) in chunk_results.into_iter().flatten() {
encoded_row_collabs.push(encode_row_collab);
if let Some(encode_row_document_collab) = encode_row_document_collab {
encoded_row_document_collabs.push(encode_row_document_collab);
}
}

// Yield to the runtime after processing each chunk
Expand All @@ -306,6 +332,7 @@ impl Database {
Ok(EncodedDatabase {
encoded_database_collab,
encoded_row_collabs,
encoded_row_document_collabs,
})
}

Expand Down
2 changes: 2 additions & 0 deletions collab/src/database/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ use yrs::{Any, Out};
pub struct EncodedDatabase {
pub encoded_database_collab: EncodedCollabInfo,
pub encoded_row_collabs: Vec<EncodedCollabInfo>,
pub encoded_row_document_collabs: Vec<EncodedCollabInfo>,
}

impl EncodedDatabase {
pub fn into_collabs(self) -> Vec<EncodedCollabInfo> {
let mut collabs = vec![self.encoded_database_collab];
collabs.extend(self.encoded_row_collabs);
collabs.extend(self.encoded_row_document_collabs);
collabs
}
}
Expand Down
125 changes: 122 additions & 3 deletions collab/tests/database/database_test/encode_collab_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,123 @@ use crate::database_test::helper::create_database_with_default_data;
use assert_json_diff::assert_json_eq;
use collab::core::collab::CollabOptions;
use collab::core::origin::CollabOrigin;
use collab::document::blocks::{Block, DocumentData, DocumentMeta};
use collab::document::document::Document;
use collab::entity::{CollabType, EncodedCollab};
use collab::plugins::CollabKVDB;
use collab::plugins::local_storage::rocksdb::rocksdb_plugin::RocksdbDiskPlugin;
use collab::preclude::Collab;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use yrs::block::ClientID;

fn persist_row_document(
uid: i64,
workspace_id: &str,
document_id: &str,
client_id: ClientID,
collab_db: Arc<CollabKVDB>,
) -> EncodedCollab {
fn default_document_data() -> DocumentData {
let page_id = uuid::Uuid::new_v4().to_string();
let text_block_id = uuid::Uuid::new_v4().to_string();
let page_children_id = uuid::Uuid::new_v4().to_string();
let text_children_id = uuid::Uuid::new_v4().to_string();
let text_external_id = uuid::Uuid::new_v4().to_string();

let mut data = HashMap::new();
data.insert("delta".to_string(), json!([]));

let mut blocks = HashMap::new();
blocks.insert(
page_id.clone(),
Block {
id: page_id.clone(),
ty: "page".to_string(),
parent: "".to_string(),
children: page_children_id.clone(),
external_id: None,
external_type: None,
data: data.clone(),
},
);
blocks.insert(
text_block_id.clone(),
Block {
id: text_block_id.clone(),
ty: "text".to_string(),
parent: page_id.clone(),
children: text_children_id.clone(),
external_id: Some(text_external_id.clone()),
external_type: Some("text".to_string()),
data,
},
);

let mut children_map = HashMap::new();
children_map.insert(page_children_id, vec![text_block_id.clone()]);
children_map.insert(text_children_id, vec![]);

let mut text_map = HashMap::new();
text_map.insert(text_external_id, "[]".to_string());

DocumentData {
page_id,
blocks,
meta: DocumentMeta {
children_map,
text_map: Some(text_map),
},
}
}

let mut document = Document::create(document_id, default_document_data(), client_id).unwrap();
document.add_plugin(Box::new(RocksdbDiskPlugin::new(
uid,
workspace_id.to_string(),
document_id.to_string(),
CollabType::Document,
Arc::downgrade(&collab_db),
)));
document.initialize();
document.encode_collab().unwrap()
}

#[tokio::test]
async fn encode_database_collab_test() {
let database_id = uuid::Uuid::new_v4().to_string();
let database_test = create_database_with_default_data(1, &database_id).await;
let mut database_test = create_database_with_default_data(1, &database_id).await;

// Prepare a persisted row document for the first row so we can validate document encoding.
let row_id = database_test.pre_define_row_ids[0];
let document_id = database_test
.get_row_document_id(&row_id)
.expect("row document id");
let expected_document_collab = persist_row_document(
1,
&database_test.workspace_id,
&document_id,
database_test.client_id,
database_test.collab_db.clone(),
);
database_test
.update_row_meta(&row_id, |meta| {
meta.update_is_document_empty(false);
})
.await;

let database_collab = database_test.encode_database_collabs().await.unwrap();
assert_eq!(database_collab.encoded_row_collabs.len(), 3);
let collab::database::entity::EncodedDatabase {
encoded_database_collab: _,
encoded_row_collabs,
encoded_row_document_collabs,
} = database_collab;

assert_eq!(encoded_row_collabs.len(), 3);
assert_eq!(encoded_row_document_collabs.len(), 1);

for (index, encoded_info) in database_collab.encoded_row_collabs.into_iter().enumerate() {
for (index, encoded_info) in encoded_row_collabs.into_iter().enumerate() {
let object_id = database_test.pre_define_row_ids[index];
let options = CollabOptions::new(object_id, database_test.client_id)
.with_data_source(encoded_info.encoded_collab.into());
Expand All @@ -27,4 +133,17 @@ async fn encode_database_collab_test() {
.to_json_value();
assert_json_eq!(json, expected_json);
}

let mut encoded_row_document_collabs = encoded_row_document_collabs;
let encoded_document = encoded_row_document_collabs.pop().unwrap();
assert_eq!(encoded_document.collab_type, CollabType::Document);
assert_eq!(encoded_document.object_id.to_string(), document_id);
assert_eq!(
encoded_document.encoded_collab.state_vector,
expected_document_collab.state_vector
);
assert_eq!(
encoded_document.encoded_collab.doc_state,
expected_document_collab.doc_state
);
}
Loading