diff --git a/collab/src/database/database.rs b/collab/src/database/database.rs index fc76043bb..df7a0f07d 100644 --- a/collab/src/database/database.rs +++ b/collab/src/database/database.rs @@ -270,33 +270,59 @@ impl Database { // Fetch row orders let row_orders = self.get_all_row_orders().await; let mut encoded_row_collabs = Vec::new(); + let mut encoded_row_document_collabs = Vec::new(); + let persistence = self.collab_service.persistence(); // Process row orders in chunks for chunk in row_orders.chunks(20) { + let persistence = persistence.clone(); // Create async tasks for each row in the chunk let tasks: Vec<_> = chunk .iter() - .map(|chunk_row| async move { - let database_row = self - .body - .block - .get_or_init_database_row(&chunk_row.id) - .await - .ok()?; - let read_guard = database_row.read().await; - let row_collab = &read_guard.collab; - let object_id = *row_collab.object_id(); - let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?; - Some(EncodedCollabInfo { - object_id, - collab_type: CollabType::DatabaseRow, - encoded_collab, - }) + .map(|chunk_row| { + let persistence = persistence.clone(); + async move { + let database_row = self + .body + .block + .get_or_init_database_row(&chunk_row.id) + .await + .ok()?; + let read_guard = database_row.read().await; + let row_collab = &read_guard.collab; + let object_id = *row_collab.object_id(); + + let encode_row_document_collab = persistence.as_ref().and_then(|persistence| { + let row_document_id = meta_id_from_row_id(&object_id, RowMetaKey::DocumentId); + Uuid::parse_str(&row_document_id) + .ok() + .and_then(|row_document_uuid| { + persistence + .get_encoded_collab(&row_document_uuid, CollabType::Document) + .map(|document_encoded_collab| EncodedCollabInfo { + object_id: row_document_uuid, + collab_type: CollabType::Document, + encoded_collab: document_encoded_collab, + }) + }) + }); + + let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?; + let encode_row_collab = EncodedCollabInfo { + object_id, + collab_type: CollabType::DatabaseRow, + encoded_collab, + }; + Some((encode_row_collab, encode_row_document_collab)) + } }) .collect(); let chunk_results = join_all(tasks).await; - for collab_info in chunk_results.into_iter().flatten() { - encoded_row_collabs.push(collab_info); + for (encode_row_collab, encode_row_document_collab) in chunk_results.into_iter().flatten() { + encoded_row_collabs.push(encode_row_collab); + if let Some(encode_row_document_collab) = encode_row_document_collab { + encoded_row_document_collabs.push(encode_row_document_collab); + } } // Yield to the runtime after processing each chunk @@ -306,6 +332,7 @@ impl Database { Ok(EncodedDatabase { encoded_database_collab, encoded_row_collabs, + encoded_row_document_collabs, }) } diff --git a/collab/src/database/entity.rs b/collab/src/database/entity.rs index d31e34816..ba0f9378d 100644 --- a/collab/src/database/entity.rs +++ b/collab/src/database/entity.rs @@ -37,12 +37,14 @@ use yrs::{Any, Out}; pub struct EncodedDatabase { pub encoded_database_collab: EncodedCollabInfo, pub encoded_row_collabs: Vec, + pub encoded_row_document_collabs: Vec, } impl EncodedDatabase { pub fn into_collabs(self) -> Vec { let mut collabs = vec![self.encoded_database_collab]; collabs.extend(self.encoded_row_collabs); + collabs.extend(self.encoded_row_document_collabs); collabs } } diff --git a/collab/tests/database/database_test/encode_collab_test.rs b/collab/tests/database/database_test/encode_collab_test.rs index 861c4334d..c7372be47 100644 --- a/collab/tests/database/database_test/encode_collab_test.rs +++ b/collab/tests/database/database_test/encode_collab_test.rs @@ -2,17 +2,123 @@ use crate::database_test::helper::create_database_with_default_data; use assert_json_diff::assert_json_eq; use collab::core::collab::CollabOptions; use collab::core::origin::CollabOrigin; +use collab::document::blocks::{Block, DocumentData, DocumentMeta}; +use collab::document::document::Document; +use collab::entity::{CollabType, EncodedCollab}; +use collab::plugins::CollabKVDB; +use collab::plugins::local_storage::rocksdb::rocksdb_plugin::RocksdbDiskPlugin; use collab::preclude::Collab; +use serde_json::json; +use std::collections::HashMap; +use std::sync::Arc; +use yrs::block::ClientID; + +fn persist_row_document( + uid: i64, + workspace_id: &str, + document_id: &str, + client_id: ClientID, + collab_db: Arc, +) -> EncodedCollab { + fn default_document_data() -> DocumentData { + let page_id = uuid::Uuid::new_v4().to_string(); + let text_block_id = uuid::Uuid::new_v4().to_string(); + let page_children_id = uuid::Uuid::new_v4().to_string(); + let text_children_id = uuid::Uuid::new_v4().to_string(); + let text_external_id = uuid::Uuid::new_v4().to_string(); + + let mut data = HashMap::new(); + data.insert("delta".to_string(), json!([])); + + let mut blocks = HashMap::new(); + blocks.insert( + page_id.clone(), + Block { + id: page_id.clone(), + ty: "page".to_string(), + parent: "".to_string(), + children: page_children_id.clone(), + external_id: None, + external_type: None, + data: data.clone(), + }, + ); + blocks.insert( + text_block_id.clone(), + Block { + id: text_block_id.clone(), + ty: "text".to_string(), + parent: page_id.clone(), + children: text_children_id.clone(), + external_id: Some(text_external_id.clone()), + external_type: Some("text".to_string()), + data, + }, + ); + + let mut children_map = HashMap::new(); + children_map.insert(page_children_id, vec![text_block_id.clone()]); + children_map.insert(text_children_id, vec![]); + + let mut text_map = HashMap::new(); + text_map.insert(text_external_id, "[]".to_string()); + + DocumentData { + page_id, + blocks, + meta: DocumentMeta { + children_map, + text_map: Some(text_map), + }, + } + } + + let mut document = Document::create(document_id, default_document_data(), client_id).unwrap(); + document.add_plugin(Box::new(RocksdbDiskPlugin::new( + uid, + workspace_id.to_string(), + document_id.to_string(), + CollabType::Document, + Arc::downgrade(&collab_db), + ))); + document.initialize(); + document.encode_collab().unwrap() +} #[tokio::test] async fn encode_database_collab_test() { let database_id = uuid::Uuid::new_v4().to_string(); - let database_test = create_database_with_default_data(1, &database_id).await; + let mut database_test = create_database_with_default_data(1, &database_id).await; + + // Prepare a persisted row document for the first row so we can validate document encoding. + let row_id = database_test.pre_define_row_ids[0]; + let document_id = database_test + .get_row_document_id(&row_id) + .expect("row document id"); + let expected_document_collab = persist_row_document( + 1, + &database_test.workspace_id, + &document_id, + database_test.client_id, + database_test.collab_db.clone(), + ); + database_test + .update_row_meta(&row_id, |meta| { + meta.update_is_document_empty(false); + }) + .await; let database_collab = database_test.encode_database_collabs().await.unwrap(); - assert_eq!(database_collab.encoded_row_collabs.len(), 3); + let collab::database::entity::EncodedDatabase { + encoded_database_collab: _, + encoded_row_collabs, + encoded_row_document_collabs, + } = database_collab; + + assert_eq!(encoded_row_collabs.len(), 3); + assert_eq!(encoded_row_document_collabs.len(), 1); - for (index, encoded_info) in database_collab.encoded_row_collabs.into_iter().enumerate() { + for (index, encoded_info) in encoded_row_collabs.into_iter().enumerate() { let object_id = database_test.pre_define_row_ids[index]; let options = CollabOptions::new(object_id, database_test.client_id) .with_data_source(encoded_info.encoded_collab.into()); @@ -27,4 +133,17 @@ async fn encode_database_collab_test() { .to_json_value(); assert_json_eq!(json, expected_json); } + + let mut encoded_row_document_collabs = encoded_row_document_collabs; + let encoded_document = encoded_row_document_collabs.pop().unwrap(); + assert_eq!(encoded_document.collab_type, CollabType::Document); + assert_eq!(encoded_document.object_id.to_string(), document_id); + assert_eq!( + encoded_document.encoded_collab.state_vector, + expected_document_collab.state_vector + ); + assert_eq!( + encoded_document.encoded_collab.doc_state, + expected_document_collab.doc_state + ); }