Skip to content

Commit 2a76b6a

Browse files
committed
chore: add collab version into persistence
1 parent 558e36b commit 2a76b6a

File tree

11 files changed

+134
-55
lines changed

11 files changed

+134
-55
lines changed

collab-database/tests/user_test/helper.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,15 @@ impl DatabaseCollabPersistenceService for TestUserDatabasePersistenceImpl {
7676

7777
fn upsert_collab(
7878
&self,
79-
object_id: &collab_entity::uuid_validation::ObjectId,
79+
object_id: &ObjectId,
8080
encoded_collab: EncodedCollab,
8181
) -> Result<(), DatabaseError> {
8282
let db_write = self.db.write_txn();
8383
let _ = db_write.upsert_doc_with_doc_state(
8484
self.uid,
8585
&self.workspace_id,
8686
&object_id.to_string(),
87+
encoded_collab.collab_version.as_ref(),
8788
encoded_collab.state_vector.to_vec(),
8889
encoded_collab.doc_state.to_vec(),
8990
);

collab-plugins/src/cloud_storage/remote_collab.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::time::{Duration, SystemTime};
66

77
use anyhow::{Error, anyhow};
88
use async_trait::async_trait;
9-
use collab::core::collab::{CollabOptions, DataSource, TransactionMutExt, default_client_id};
9+
use collab::core::collab::{
10+
CollabOptions, DataSource, TransactionMutExt, VersionedData, default_client_id,
11+
};
1012
use collab::core::collab_state::SyncState;
1113
use collab::core::origin::CollabOrigin;
1214
use collab::lock::RwLock;
@@ -210,8 +212,8 @@ impl RemoteCollab {
210212
/// If the remote collab contains any updates, it will return None.
211213
/// Otherwise, it will merge the updates into one and return the merged update.
212214
#[allow(dead_code)]
213-
pub async fn sync(&self, local_collab: Weak<RwLock<Collab>>) -> Result<Vec<u8>, Error> {
214-
let mut remote_update = vec![];
215+
pub async fn sync(&self, local_collab: Weak<RwLock<Collab>>) -> Result<VersionedData, Error> {
216+
let mut remote_update = None;
215217
// It would be better if creating a edge function that calculate the diff between the local and remote.
216218
// The local only need to send its state vector to the remote. In this way, the local does not need to
217219
// get all the updates from remote.
@@ -232,7 +234,7 @@ impl RemoteCollab {
232234
} else {
233235
tracing::error!("🔴decode update failed");
234236
}
235-
remote_update = doc_state;
237+
remote_update = Some(doc_state);
236238
},
237239
DataSource::DocStateV2(doc_state) => {
238240
if let Ok(update) = Update::decode_v2(&doc_state) {
@@ -242,7 +244,7 @@ impl RemoteCollab {
242244
} else {
243245
tracing::error!("🔴decode update failed");
244246
}
245-
remote_update = doc_state;
247+
remote_update = Some(doc_state);
246248
},
247249
}
248250

@@ -311,7 +313,7 @@ impl RemoteCollab {
311313
object_id_string: self.object.object_id.to_string(),
312314
});
313315
}
314-
Ok(remote_update)
316+
Ok(remote_update.unwrap_or(VersionedData::new(vec![], None)))
315317
}
316318

317319
pub fn push_update(&self, update: &[u8]) -> Result<(), Error> {

collab-plugins/src/local_storage/indexeddb/indexeddb_plugin.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use collab_entity::CollabType;
88
use futures::stream::StreamExt;
99

1010
use crate::local_storage::kv::PersistenceError;
11-
use collab::core::collab::make_yrs_doc;
11+
use collab::core::collab::{CollabContext, make_yrs_doc};
1212
use collab::core::transaction::DocTransactionExtension;
1313
use std::sync::atomic::AtomicBool;
1414
use std::sync::atomic::Ordering::SeqCst;
@@ -64,15 +64,17 @@ impl IndexeddbDiskPlugin {
6464
}
6565

6666
impl CollabPlugin for IndexeddbDiskPlugin {
67-
fn init(&self, object_id: &str, _origin: &CollabOrigin, doc: &Doc) {
67+
fn init(&self, object_id: &str, _origin: &CollabOrigin, ctx: &mut CollabContext) {
6868
if let Some(db) = self.collab_db.upgrade() {
6969
let object_id = object_id.to_string();
70-
let doc = doc.clone();
70+
let doc = ctx.doc().clone();
7171
let uid = self.uid;
7272

7373
tokio::task::spawn_local(async move {
7474
match db.load_doc(uid, &object_id, doc.clone()).await {
75-
Ok(_) => {},
75+
Ok(version) => {
76+
ctx.version = version;
77+
},
7678
Err(err) => {
7779
if err.is_record_not_found() {
7880
let encoded_collab = doc.get_encoded_collab_v1();

collab-plugins/src/local_storage/kv/db.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::local_storage::kv::PersistenceError;
77
use crate::local_storage::kv::keys::*;
88
use crate::local_storage::kv::oid::{DocIDGen, OID};
99
use crate::local_storage::kv::snapshot::CollabSnapshot;
10+
use collab::core::collab::CollabVersion;
1011
use smallvec::SmallVec;
1112
use yrs::{TransactionMut, Update};
1213

@@ -112,6 +113,7 @@ pub fn insert_doc_update<'a, K, S>(
112113
db: &S,
113114
doc_id: DocID,
114115
object_id: &K,
116+
version: Option<&CollabVersion>,
115117
value: Vec<u8>,
116118
) -> Result<Vec<u8>, PersistenceError>
117119
where

collab-plugins/src/local_storage/kv/doc.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::local_storage::kv::keys::*;
22
use crate::local_storage::kv::snapshot::SnapshotAction;
33
use crate::local_storage::kv::*;
4+
use collab::core::collab::{CollabVersion, VersionedData};
45
use smallvec::{SmallVec, smallvec};
56
use std::collections::HashSet;
67
use tracing::{error, info};
@@ -19,6 +20,7 @@ where
1920
uid: i64,
2021
workspace_id: &str,
2122
object_id: &str,
23+
collab_version: Option<&CollabVersion>,
2224
txn: &T,
2325
) -> Result<(), PersistenceError> {
2426
if self.is_exist(uid, workspace_id, object_id) {
@@ -28,7 +30,7 @@ where
2830
let doc_id = get_or_create_did(uid, self, workspace_id, object_id)?;
2931
let doc_state = txn.encode_diff_v1(&StateVector::default());
3032
let sv = txn.state_vector().encode_v1();
31-
let doc_state_key = make_doc_state_key(doc_id);
33+
let doc_state_key = make_doc_state_key(doc_id, collab_version);
3234
let sv_key = make_state_vector_key(doc_id);
3335

3436
info!("new doc:{:?}, doc state len:{}", object_id, doc_state.len());
@@ -43,6 +45,7 @@ where
4345
uid: i64,
4446
workspace_id: &str,
4547
object_id: &str,
48+
collab_version: Option<&CollabVersion>,
4649
state_vector: Vec<u8>,
4750
doc_state: Vec<u8>,
4851
) -> Result<(), PersistenceError> {
@@ -51,7 +54,7 @@ where
5154
let end = make_doc_end_key(doc_id);
5255
self.remove_range(start.as_ref(), end.as_ref())?;
5356

54-
let doc_state_key = make_doc_state_key(doc_id);
57+
let doc_state_key = make_doc_state_key(doc_id, collab_version);
5558
let sv_key = make_state_vector_key(doc_id);
5659

5760
info!("new doc:{:?}, doc state len:{}", object_id, doc_state.len());
@@ -73,6 +76,7 @@ where
7376
uid: i64,
7477
workspace_id: &str,
7578
object_id: &str,
79+
collab_version: Option<&CollabVersion>,
7680
state_vector: Vec<u8>,
7781
doc_state: Vec<u8>,
7882
) -> Result<(), PersistenceError> {
@@ -83,7 +87,7 @@ where
8387
let end = make_doc_end_key(doc_id);
8488
self.remove_range(start.as_ref(), end.as_ref())?;
8589

86-
let doc_state_key = make_doc_state_key(doc_id);
90+
let doc_state_key = make_doc_state_key(doc_id, collab_version);
8791
let sv_key = make_state_vector_key(doc_id);
8892
// Insert new doc state and state vector
8993
self.insert(doc_state_key, doc_state)?;
@@ -95,6 +99,35 @@ where
9599
get_doc_id(uid, self, workspace_id, object_id).is_some()
96100
}
97101

102+
fn get_doc_state(&self, doc_id: DocID) -> Result<Option<VersionedData>, PersistenceError> {
103+
let doc_state_start = make_doc_start_key(doc_id);
104+
let doc_state_end = make_doc_end_key(doc_id);
105+
let mut cursor = self.range(doc_state_start.clone()..doc_state_end)?;
106+
if let Some(entry) = cursor.next() {
107+
let key = entry.key();
108+
let value = entry.value();
109+
if key.starts_with(doc_state_start.as_ref()) {
110+
let version = if key.len() > doc_state_start.len() {
111+
let collab_version: [u8; 16] =
112+
match key[doc_state_start.len()..doc_state_start.len() + 16].try_into() {
113+
Ok(v) => v,
114+
Err(_) => {
115+
return Err(PersistenceError::InvalidData(format!(
116+
"invalid collab version in doc state key: {:?}",
117+
key
118+
)));
119+
},
120+
};
121+
Some(CollabVersion::from_bytes(collab_version))
122+
} else {
123+
None
124+
};
125+
return Ok(Some(VersionedData::new(value, version)));
126+
}
127+
}
128+
Ok(None)
129+
}
130+
98131
/// Load the document from the database and apply the updates to the transaction.
99132
/// It will try to load the document in these two ways:
100133
/// 1. D = document state + updates
@@ -107,15 +140,14 @@ where
107140
workspace_id: &str,
108141
object_id: &str,
109142
txn: &mut TransactionMut,
110-
) -> Result<u32, PersistenceError> {
143+
) -> Result<Option<CollabVersion>, PersistenceError> {
111144
let mut update_count = 0;
145+
let mut collab_version = None;
112146

113147
if let Some(doc_id) = get_doc_id(uid, self, workspace_id, object_id) {
114-
let doc_state_key = make_doc_state_key(doc_id);
115-
if let Some(doc_state) = self.get(doc_state_key.as_ref())? {
148+
if let Some(versioned) = self.get_doc_state(doc_id)? {
116149
// Load the doc state
117-
118-
match Update::decode_v1(doc_state.as_ref()) {
150+
match Update::decode_v1(&versioned.data) {
119151
Ok(update) => {
120152
txn.try_apply_update(update)?;
121153
},
@@ -125,6 +157,8 @@ where
125157
},
126158
}
127159

160+
collab_version = versioned.version;
161+
128162
// If the enable_snapshot is true, we will try to load the snapshot.
129163
let update_start = make_doc_update_key(doc_id, 0).to_vec();
130164
let update_end = make_doc_update_key(doc_id, Clock::MAX);
@@ -155,7 +189,7 @@ where
155189
);
156190
}
157191

158-
Ok(update_count)
192+
Ok(collab_version)
159193
} else {
160194
tracing::trace!("[Client] => {:?} not exist", object_id);
161195
Err(PersistenceError::RecordNotFound(format!(
@@ -171,7 +205,7 @@ where
171205
workspace_id: &str,
172206
object_id: &str,
173207
doc: &Doc,
174-
) -> Result<u32, PersistenceError> {
208+
) -> Result<Option<CollabVersion>, PersistenceError> {
175209
let mut txn = doc.transact_mut();
176210
self.load_doc_with_txn(uid, workspace_id, object_id, &mut txn)
177211
}
@@ -182,6 +216,7 @@ where
182216
uid: i64,
183217
workspace_id: &str,
184218
object_id: &str,
219+
version: Option<&CollabVersion>,
185220
update: &[u8],
186221
) -> Result<Vec<u8>, PersistenceError> {
187222
match get_doc_id(uid, self, workspace_id, object_id) {
@@ -196,7 +231,7 @@ where
196231
object_id
197232
)))
198233
},
199-
Some(doc_id) => insert_doc_update(self, doc_id, object_id, update.to_vec()),
234+
Some(doc_id) => insert_doc_update(self, doc_id, object_id, version, update.to_vec()),
200235
}
201236
}
202237

@@ -234,6 +269,7 @@ where
234269
uid: i64,
235270
workspace_id: &str,
236271
object_id: &str,
272+
version: Option<&CollabVersion>,
237273
doc_state: &[u8],
238274
sv: &[u8],
239275
) -> Result<(), PersistenceError> {
@@ -242,7 +278,7 @@ where
242278
let end = make_doc_end_key(doc_id);
243279
self.remove_range(start.as_ref(), end.as_ref())?;
244280

245-
let doc_state_key = make_doc_state_key(doc_id);
281+
let doc_state_key = make_doc_state_key(doc_id, version);
246282
let sv_key = make_state_vector_key(doc_id);
247283

248284
// Insert new doc state and state vector
@@ -294,10 +330,9 @@ where
294330
self.remove_range(start.as_ref(), end.as_ref())?;
295331

296332
// Delete the document state and the state vector
297-
let doc_state_key = make_doc_state_key(did);
298-
let sv_key = make_state_vector_key(did);
299-
let _ = self.remove(doc_state_key.as_ref());
300-
let _ = self.remove(sv_key.as_ref());
333+
let doc_state_key = make_doc_start_key(did);
334+
let sv_key = make_doc_end_key(did);
335+
let _ = self.remove_range(doc_state_key.as_ref(), sv_key.as_ref());
301336

302337
// Delete the snapshot
303338
self.delete_all_snapshots(uid, object_id)?;

collab-plugins/src/local_storage/kv/keys.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::io::Write;
22
use std::ops::Deref;
33

4+
use collab::core::collab::CollabVersion;
45
use smallvec::{SmallVec, smallvec};
5-
66
// https://github.com/spacejam/sled
77
// sled performs prefix encoding on long keys with similar prefixes that are grouped together in a
88
// range, as well as suffix truncation to further reduce the indexing costs of long keys. Nodes
@@ -99,17 +99,24 @@ pub fn oid_from_key(key: &[u8]) -> &[u8] {
9999
&key[10..(key.len() - 1)]
100100
}
101101

102-
// [1,1, 0,0,0,0,0,0,0,0, 0]
103-
pub fn make_doc_state_key(doc_id: DocID) -> Key<DOC_STATE_KEY_LEN> {
102+
// [1,1, 0,0,0,0,0,0,0,0, 0] if version is None
103+
// [1,1, 0,0,0,0,0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] if version is Some
104+
pub fn make_doc_state_key(
105+
doc_id: DocID,
106+
version: Option<&CollabVersion>,
107+
) -> Key<DOC_STATE_KEY_LEN> {
104108
let mut v: SmallVec<[u8; DOC_STATE_KEY_LEN]> = smallvec![DOC_SPACE, DOC_SPACE_OBJECT_KEY];
105109
v.write_all(&doc_id.to_be_bytes()).unwrap();
106110
v.push(DOC_STATE);
111+
if let Some(ver) = version {
112+
v.write_all(ver.as_bytes()).unwrap();
113+
}
107114
Key(v)
108115
}
109116

110117
// document related elements are stored within bounds [0,1,..did,0]..[0,1,..did,255]
111118
pub fn make_doc_start_key(doc_id: DocID) -> Key<DOC_STATE_KEY_LEN> {
112-
make_doc_state_key(doc_id)
119+
make_doc_state_key(doc_id, None)
113120
}
114121
// [1,1, 0,0,0,0,0,0,0,0, 255]
115122
pub fn make_doc_end_key(doc_id: DocID) -> Key<DOC_STATE_KEY_LEN> {

collab-plugins/src/local_storage/rocksdb/rocksdb_plugin.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,16 @@ impl RocksdbDiskPlugin {
9494
if !rocksdb_read.is_exist(self.uid, &self.workspace_id, &self.object_id) {
9595
match self.collab_type.validate_require_data(collab) {
9696
Ok(_) => {
97+
let version = collab.version();
9798
let txn = collab.transact();
9899
if let Err(err) = collab_db.with_write_txn(|w_db_txn| {
99-
w_db_txn.create_new_doc(self.uid, &self.workspace_id, &self.object_id, &txn)?;
100+
w_db_txn.create_new_doc(
101+
self.uid,
102+
&self.workspace_id,
103+
&self.object_id,
104+
version,
105+
&txn,
106+
)?;
100107
info!(
101108
"[Rocksdb Plugin]: created new doc {}, collab_type:{}",
102109
self.object_id, self.collab_type
@@ -132,7 +139,7 @@ impl CollabPlugin for RocksdbDiskPlugin {
132139
object_id: &str,
133140
_txn: &TransactionMut,
134141
update: &[u8],
135-
_collab_version: Option<&CollabVersion>,
142+
collab_version: Option<&CollabVersion>,
136143
) {
137144
// Only push update if the doc is loaded
138145
if !self.did_init.load(SeqCst) {
@@ -142,7 +149,13 @@ impl CollabPlugin for RocksdbDiskPlugin {
142149
self.increase_count();
143150
//Acquire a write transaction to ensure consistency
144151
let result = db.with_write_txn(|w_db_txn| {
145-
let _ = w_db_txn.push_update(self.uid, self.workspace_id.as_str(), object_id, update)?;
152+
let _ = w_db_txn.push_update(
153+
self.uid,
154+
self.workspace_id.as_str(),
155+
object_id,
156+
collab_version,
157+
update,
158+
)?;
146159
use yrs::updates::decoder::Decode;
147160
tracing::trace!(
148161
"[Rocksdb Plugin]: Collab {} {} persisting update: {:#?}",

collab-plugins/tests/disk/insert_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async fn flush_test() {
7575
test.uid,
7676
&test.workspace_id,
7777
&object_id_str,
78+
encode_collab.collab_version.as_ref(),
7879
encode_collab.state_vector.to_vec(),
7980
encode_collab.doc_state.to_vec(),
8081
)

0 commit comments

Comments
 (0)