diff --git a/Cargo.lock b/Cargo.lock index 9dcc899ec..bcc4f8038 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + [[package]] name = "arrayvec" version = "0.7.6" @@ -257,6 +263,19 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake3" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq 0.3.1", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -356,13 +375,14 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.105" +version = "1.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5208975e568d83b6b05cc0a063c8e7e9acc2b43bee6da15616a5b73e109d7437" +checksum = "65193589c6404eb80b450d618eaf9a2cafaaafd57ecce47370519ef674a7bd44" dependencies = [ + "find-msvc-tools", "jobserver", "libc", - "once_cell", + "shlex", ] [[package]] @@ -455,6 +475,7 @@ dependencies = [ "async_zip", "base64", "bincode", + "blake3", "bytes", "chrono", "chrono-tz", @@ -518,6 +539,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -714,6 +741,12 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" + [[package]] name = "fixedbitset" version = "0.4.2" @@ -2517,7 +2550,7 @@ dependencies = [ "aes", "byteorder", "bzip2", - "constant_time_eq", + "constant_time_eq 0.1.5", "crc32fast", "crossbeam-utils", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 6d21a52af..f2388f9f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ async-trait = "0.1" arc-swap = { version = "1.7" } uuid = { version = "1.17.0", features = ["v4", "v5", "v7", "serde"] } futures-lite = { version = "2.6.0", features = ["futures-io"] } +blake3 = "1.8" [patch.crates-io] # We're using a specific commit here because rust-rocksdb doesn't publish the latest version that includes the memory alignment fix. diff --git a/collab/Cargo.toml b/collab/Cargo.toml index 60db1668b..da4e52e58 100644 --- a/collab/Cargo.toml +++ b/collab/Cargo.toml @@ -54,6 +54,7 @@ sanitize-filename = "0.5.0" indexmap = { version = "2.3", features = ["serde"] } zip = "0.6.6" rocksdb = { version = "0.22.0", default-features = false, features = ["zstd"], optional = true } +blake3.workspace = true [dev-dependencies] tokio = { workspace = true, features = ["macros", "sync", "rt"] } diff --git a/collab/src/core/collab.rs b/collab/src/core/collab.rs index 98f1d5dea..efb9902a6 100644 --- a/collab/src/core/collab.rs +++ b/collab/src/core/collab.rs @@ -5,11 +5,10 @@ use std::panic; use std::panic::AssertUnwindSafe; use arc_swap::ArcSwapOption; +use blake3::Hasher; +use serde_json::json; use std::sync::Arc; use std::vec::IntoIter; - -use serde_json::json; - use tokio_stream::wrappers::WatchStream; use tracing::trace; use yrs::block::{ClientID, Prelim}; @@ -21,9 +20,7 @@ use crate::core::awareness::Awareness; use crate::core::collab_plugin::{CollabPersistence, CollabPlugin, CollabPluginType, Plugins}; use crate::core::collab_state::{InitState, SnapshotState, State, SyncState}; use crate::core::origin::{CollabClient, CollabOrigin}; -use crate::core::revisions::{Revision, RevisionId, Revisions}; use crate::core::transaction::DocTransactionExtension; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1, EncoderV2}; use yrs::{ Any, Doc, Map, MapRef, Observable, OffsetKind, Options, Out, ReadTxn, StateVector, Subscription, Transact, Transaction, TransactionMut, UndoManager, Update, @@ -36,7 +33,6 @@ use uuid::Uuid; pub const DATA_SECTION: &str = "data"; pub const META_SECTION: &str = "meta"; -pub const REVISIONS_SECTION: &str = "revisions"; type AfterTransactionSubscription = Subscription; @@ -65,7 +61,6 @@ pub struct Collab { pub data: MapRef, #[allow(dead_code)] meta: MapRef, - revisions: Revisions, /// This is an inner collab state that requires mut access in order to modify it. pub context: CollabContext, } @@ -90,21 +85,31 @@ pub struct CollabContext { /// The current transaction that is being executed. current_txn: Option>, + version: Option, } unsafe impl Send for CollabContext {} unsafe impl Sync for CollabContext {} impl CollabContext { - fn new(origin: CollabOrigin, awareness: Awareness) -> Self { + fn new(origin: CollabOrigin, awareness: Awareness, version: Option) -> Self { CollabContext { origin, awareness, + version, undo_manager: None, current_txn: None, } } + pub fn version(&self) -> Option<&CollabVersion> { + self.version.as_ref() + } + + pub fn version_mut(&mut self) -> &mut Option { + &mut self.version + } + pub fn with_txn(&mut self, f: F) -> Result where F: FnOnce(&mut TransactionMut) -> T, @@ -242,6 +247,57 @@ pub fn make_yrs_doc(object_id: &str, skip_gc: bool, client_id: ClientID) -> Doc Doc::with_options(options) } +pub type CollabVersion = Uuid; + +pub trait ConsistentHash { + fn hash(&self, h: &mut blake3::Hasher); + fn consistent_hash(&self) -> u128 { + use blake3::Hasher; + let mut h = Hasher::new(); + + self.hash(&mut h); + + let mut hash = [0; 16]; + h.finalize_xof().fill(&mut hash); + u128::from_be_bytes(hash) + } +} + +impl ConsistentHash for yrs::StateVector { + fn hash(&self, h: &mut Hasher) { + let mut clients = self.iter().map(|(k, _)| k).collect::>(); + clients.sort(); + for client in clients { + let clock = self.get(client); + h.update(&client.to_be_bytes()); + h.update(&clock.to_be_bytes()); + } + } +} + +impl ConsistentHash for yrs::DeleteSet { + fn hash(&self, h: &mut Hasher) { + let mut clients = self.iter().map(|(c, _)| c).collect::>(); + clients.sort(); + for client in clients { + if let Some(range) = self.range(client) { + h.update(&client.to_be_bytes()); + for r in range.iter() { + h.update(&r.start.to_be_bytes()); + h.update(&r.end.to_be_bytes()); + } + } + } + } +} + +impl ConsistentHash for yrs::Snapshot { + fn hash(&self, h: &mut Hasher) { + ConsistentHash::hash(&self.state_map, h); + ConsistentHash::hash(&self.delete_set, h); + } +} + pub struct CollabOptions { pub object_id: Uuid, pub data_source: Option, @@ -255,6 +311,7 @@ impl Display for CollabOptions { .field("object_id", &self.object_id) .field("client_id", &self.client_id) .field("data_source", &self.data_source) + .field("skip_gc", &self.skip_gc) .finish() } } @@ -295,17 +352,19 @@ impl Collab { let doc = make_yrs_doc(&object_id.to_string(), options.skip_gc, options.client_id); let data = doc.get_or_insert_map(DATA_SECTION); let meta = doc.get_or_insert_map(META_SECTION); - let revisions = Revisions::new(doc.get_or_insert_array(REVISIONS_SECTION)); let plugins = Plugins::new(vec![]); let state = Arc::new(State::new(&object_id.to_string())); let awareness = Awareness::new(doc); let mut this = Self { object_id, - context: CollabContext::new(origin, awareness), + context: CollabContext::new( + origin, + awareness, + options.data_source.as_ref().and_then(DataSource::version), + ), state, data, meta, - revisions, plugins, update_subscription: Default::default(), after_txn_subscription: Default::default(), @@ -337,95 +396,8 @@ impl Collab { Ok(this) } - pub fn revisions(&self) -> &Revisions { - &self.revisions - } - - pub fn prune_revisions(&mut self, predicate: F) -> Result - where - F: FnMut(&Revision) -> bool, - { - let mut txn = self.context.transact_mut(); - let removed = self.revisions.remove_where(&mut txn, predicate)?; - Ok(removed) - } - - pub fn gc(&mut self) -> Result<(), CollabError> { - let mut txn = self.context.transact_mut(); - self.revisions.gc(&mut txn)?; - Ok(()) - } - - pub fn revision(&self, revision_id: &RevisionId) -> Result { - let txn = self.context.transact(); - self.revisions.get(&txn, revision_id) - } - - /// Create a new revision for the current collab state and return its identifier. - pub fn create_revision(&mut self) -> Result { - let mut txn = self.context.transact_mut(); - self.revisions.create_revision(&mut txn, None) - } - - /// Create a new revision for the current collab state and return its identifier. - pub fn create_named_revision>( - &mut self, - name: S, - ) -> Result { - let mut txn = self.context.transact_mut(); - self.revisions.create_revision(&mut txn, Some(name.into())) - } - - /// Remove a revision by its identifier. - pub fn remove_revision(&mut self, revision_id: &RevisionId) -> Result { - let mut txn = self.context.transact_mut(); - let removed = self - .revisions - .remove_where(&mut txn, |rev| rev.id() == revision_id)?; - Ok(removed == 1) - } - - /// Remove all revisions that were created before the specified timestamp. - pub fn remove_revisions_before( - &mut self, - timestamp: chrono::DateTime, - ) -> Result { - let mut txn = self.context.transact_mut(); - self - .revisions - .remove_where(&mut txn, |rev| rev.created_at().unwrap() < timestamp) - } - - /// Restore document state up to a given revision. This method **WON'T** change the state of the - /// current collab. - /// - /// Instead, it returns [EncodedCollab] that contains the current collab state at given revision. - pub fn restore_revision( - &self, - revision_id: &RevisionId, - version: EncoderVersion, - ) -> Result { - let txn = self.context.transact(); - let revision = self.revisions.get(&txn, revision_id)?; - let snapshot = revision.snapshot()?; - match version { - EncoderVersion::V1 => { - let mut encoder = EncoderV1::new(); - txn - .encode_state_from_snapshot(&snapshot, &mut encoder) - .map_err(|e| CollabError::Internal(e.into()))?; - let data = encoder.to_vec(); - Ok(EncodedCollab::new_v1(snapshot.state_map.encode_v1(), data)) - }, - EncoderVersion::V2 => { - let mut encoder = EncoderV2::new(); - txn - .encode_state_from_snapshot(&snapshot, &mut encoder) - .map_err(|e| CollabError::Internal(e.into()))?; - let data = encoder.to_vec(); - Ok(EncodedCollab::new_v2(snapshot.state_map.encode_v2(), data)) - }, - } + pub fn version(&self) -> Option<&Uuid> { + self.version.as_ref() } /// Each collab can have only one cloud plugin @@ -452,18 +424,16 @@ impl Collab { let object_id = Uuid::parse_str(&object_id_str).unwrap_or_else(|_| Uuid::new_v4()); let data = doc.get_or_insert_map(DATA_SECTION); let meta = doc.get_or_insert_map(META_SECTION); - let revisions = Revisions::new(doc.get_or_insert_array(REVISIONS_SECTION)); let state = Arc::new(State::new(&object_id_str)); let awareness = Awareness::new(doc); Self { object_id, // if not the fact that we need origin here, it would be // not necessary either - context: CollabContext::new(origin, awareness), + context: CollabContext::new(origin, awareness, None), state, data, meta, - revisions, plugins: Plugins::default(), update_subscription: Default::default(), after_txn_subscription: Default::default(), @@ -486,12 +456,12 @@ impl Collab { /// /// This method must be called after all plugins have been added. pub fn initialize(&mut self) { - let doc = self.context.doc(); { - let origin = self.origin(); + let origin = self.origin.clone(); + let context = &mut self.context; self .plugins - .each(|plugin| plugin.init(&self.object_id.to_string(), origin, doc)); + .each(|plugin| plugin.init(&self.object_id.to_string(), &origin, context)); } self.observe_update(); { @@ -512,6 +482,7 @@ impl Collab { self.object_id.to_string(), self.plugins.clone(), self.origin().clone(), + self.version().copied(), ); let awareness_subscription = observe_awareness( @@ -685,6 +656,7 @@ fn observe_doc( oid: String, plugins: Plugins, local_origin: CollabOrigin, + collab_version: Option, ) -> (Subscription, Option) { let cloned_oid = oid.clone(); let cloned_plugins = plugins.clone(); @@ -701,7 +673,7 @@ fn observe_doc( } } - plugin.receive_update(&cloned_oid, txn, &event.update); + plugin.receive_update(&cloned_oid, txn, &event.update, collab_version.as_ref()); let remote_origin = CollabOrigin::from(txn); if remote_origin == local_origin { plugin.receive_local_update(&local_origin, &cloned_oid, &event.update); @@ -722,14 +694,37 @@ fn observe_doc( (update_sub, after_txn_sub) } +#[derive(Clone, Debug)] +pub struct VersionedData { + pub version: Option, + pub data: Vec, +} + +impl Deref for VersionedData { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +impl VersionedData { + pub fn new>>(data: B, version: Option) -> Self { + Self { + version, + data: data.into(), + } + } +} + /// The raw data of a collab document. It is a list of updates. Each of them can be parsed by /// [Update::decode_v1]. pub enum DataSource { /// when CollabPersistence is not provided, which means the data is not persisted to disk yet /// otherwise, it is already persisted to disk. Disk(Option>), - DocStateV1(Vec), - DocStateV2(Vec), + DocStateV1(VersionedData), + DocStateV2(VersionedData), } impl Debug for DataSource { @@ -744,9 +739,13 @@ impl Debug for DataSource { impl From for DataSource { fn from(encoded: EncodedCollab) -> Self { + let versioned = VersionedData { + version: encoded.collab_version, + data: encoded.doc_state.into(), + }; match encoded.version { - EncoderVersion::V1 => DataSource::DocStateV1(encoded.doc_state.into()), - EncoderVersion::V2 => DataSource::DocStateV2(encoded.doc_state.into()), + EncoderVersion::V1 => DataSource::DocStateV1(versioned), + EncoderVersion::V2 => DataSource::DocStateV2(versioned), } } } @@ -759,6 +758,15 @@ impl DataSource { DataSource::DocStateV2(d) => d.is_empty(), } } + + pub fn version(&self) -> Option { + match self { + DataSource::Disk(_) => None, + DataSource::DocStateV1(d) => d.version, + DataSource::DocStateV2(d) => d.version, + } + } + pub fn as_update(&self) -> Result, CollabError> { match self { DataSource::DocStateV1(doc_state) if !doc_state.is_empty() => { diff --git a/collab/src/core/collab_plugin.rs b/collab/src/core/collab_plugin.rs index 7c4e87286..4db4be279 100644 --- a/collab/src/core/collab_plugin.rs +++ b/collab/src/core/collab_plugin.rs @@ -3,13 +3,13 @@ use crate::core::awareness::{AwarenessUpdate, Event}; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use std::sync::Arc; -use tracing::trace; -use yrs::{Doc, TransactionMut}; - +use crate::core::collab::{CollabContext, CollabVersion}; use crate::core::origin::CollabOrigin; use crate::error::CollabError; use crate::preclude::Collab; +use std::sync::Arc; +use tracing::trace; +use yrs::TransactionMut; #[derive(Debug, Eq, PartialEq, Clone)] pub enum CollabPluginType { @@ -36,14 +36,21 @@ pub trait CollabPlugin: Send + Sync + 'static { /// Called when the plugin is initialized. /// The will apply the updates to the current [TransactionMut] which will restore the state of /// the document. - fn init(&self, _object_id: &str, _origin: &CollabOrigin, _doc: &Doc) {} + fn init(&self, _object_id: &str, _origin: &CollabOrigin, _doc: &mut CollabContext) {} /// Called when the plugin is initialized. fn did_init(&self, _collab: &Collab, _object_id: &str) {} /// Called when the plugin receives an update. It happens after the [TransactionMut] commit to /// the Yrs document. - fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, _update: &[u8]) {} + fn receive_update( + &self, + _object_id: &str, + _txn: &TransactionMut, + _update: &[u8], + _collab_version: Option<&CollabVersion>, + ) { + } /// Called when the plugin receives a local update. /// We use the [CollabOrigin] to know if the update comes from the local user or from a remote @@ -82,16 +89,22 @@ impl CollabPlugin for Box where T: CollabPlugin, { - fn init(&self, object_id: &str, origin: &CollabOrigin, doc: &Doc) { - (**self).init(object_id, origin, doc); + fn init(&self, object_id: &str, origin: &CollabOrigin, ctx: &mut CollabContext) { + (**self).init(object_id, origin, ctx); } fn did_init(&self, collab: &Collab, _object_id: &str) { (**self).did_init(collab, _object_id) } - fn receive_update(&self, object_id: &str, txn: &TransactionMut, update: &[u8]) { - (**self).receive_update(object_id, txn, update) + fn receive_update( + &self, + object_id: &str, + txn: &TransactionMut, + update: &[u8], + collab_version: Option<&CollabVersion>, + ) { + (**self).receive_update(object_id, txn, update, collab_version) } fn receive_local_update(&self, origin: &CollabOrigin, object_id: &str, update: &[u8]) { diff --git a/collab/src/core/mod.rs b/collab/src/core/mod.rs index 0f9c1b09c..6fb54cc86 100644 --- a/collab/src/core/mod.rs +++ b/collab/src/core/mod.rs @@ -5,7 +5,6 @@ mod collab_search; pub mod collab_state; pub mod fill; pub mod origin; -mod revisions; pub mod transaction; pub mod user_data; pub mod value; diff --git a/collab/src/core/revisions.rs b/collab/src/core/revisions.rs deleted file mode 100644 index 8a19c55ed..000000000 --- a/collab/src/core/revisions.rs +++ /dev/null @@ -1,228 +0,0 @@ -use crate::error::CollabError; -use anyhow::anyhow; -use serde::{Deserialize, Serialize}; -use uuid::{NoContext, Timestamp, Uuid}; -use yrs::encoding::serde::{from_any, to_any}; -use yrs::types::array::ArrayIter; -use yrs::updates::decoder::Decode; -use yrs::updates::encoder::Encode; -use yrs::{Array, ArrayRef, Out, ReadTxn, Snapshot, TransactionMut}; - -pub type RevisionId = Uuid; - -/// Revision is a record of a collab state at a specific point in time. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Revision { - id: RevisionId, - name: Option, - snapshot_state: Vec, -} - -impl Revision { - /// Creates a new [Revision] with the given name and snapshot state. - pub fn new(name: Option, snapshot_state: &Snapshot) -> Self { - Self { - id: RevisionId::new_v7(Timestamp::now(NoContext)), - name, - snapshot_state: snapshot_state.encode_v1(), - } - } - - /// Globally unique identifier for the revision. - pub fn id(&self) -> &RevisionId { - &self.id - } - - /// Returns the timestamp when the revision was created. - pub fn created_at(&self) -> Option> { - let timestamp = self.id.get_timestamp()?; - let (seconds, nanos) = timestamp.to_unix(); - chrono::DateTime::::from_timestamp(seconds as i64, nanos) - } - - /// Returns the name of the revision, if it exists. - pub fn name(&self) -> Option<&str> { - self.name.as_deref() - } - - /// Deserializes revision's snapshot state into a yrs [Snapshot]. - pub fn snapshot(&self) -> Result { - Ok(Snapshot::decode_v1(&self.snapshot_state)?) - } -} - -/// It's a collaborative collection used to store document history revisions. -/// These revisions can be used to restore the document state at a specific version. -/// -/// Revisions are not compatible with garbage collection, so they must be created with -/// garbage collection disabled. -pub struct Revisions { - revisions: ArrayRef, -} - -impl Revisions { - pub fn new(revisions: ArrayRef) -> Self { - Self { revisions } - } - - /// Creates a new revision with the given name, if provided. - /// Returns the unique identifier of the created revision. - pub fn create_revision( - &self, - txn: &mut TransactionMut, - name: Option, - ) -> Result { - ensure_gc_disabled(txn)?; - - let snapshot = txn.snapshot(); - let revision = Revision::new(name, &snapshot); - let encoded_revision = to_any(&revision).map_err(|err| CollabError::Internal(err.into()))?; - - self.revisions.push_back(txn, encoded_revision); - - Ok(revision.id) - } - - /// Returns a revision by its unique identifier. - /// If the revision is not found, it returns an [CollabError::NoRequiredData] error. - pub fn get( - &self, - txn: &T, - revision_id: &RevisionId, - ) -> Result { - let i = self.iter(txn); - for revision in i { - let revision = revision?; - if &revision.id == revision_id { - return Ok(revision); - } - } - - Err(CollabError::NoRequiredData(format!( - "Revision '{revision_id}' not found" - ))) - } - - /// Removes all revisions matching the specified predicate. - /// - /// This method will also garbage collect the data stored inside the collab, that is no longer - /// accessible but was required by the removed revisions for the sake of restoring past document - /// state. - pub fn remove_where( - &self, - txn: &mut TransactionMut, - mut predicate: F, - ) -> Result - where - F: FnMut(&Revision) -> bool, - { - ensure_gc_disabled(txn)?; - - let mut oldest: Option = None; - let mut revisions_to_remove = Vec::new(); - for (index, revision) in self.iter(txn).enumerate() { - if let Ok(revision) = revision { - if predicate(&revision) { - // if the predicate matches, we mark this revision for removal. - revisions_to_remove.push(index as u32); - } else { - // if the predicate does not match, we keep track of the oldest revision for future gc - match &oldest { - None => oldest = Some(revision), - Some(oldest_revision) => { - if revision.created_at() < oldest_revision.created_at() { - oldest = Some(revision); - } - }, - } - } - } - } - - // remove revisions that matched the predicate - // use reverse order (last-to-first) to avoid shifting indices - let result = revisions_to_remove.len(); - for index in revisions_to_remove.into_iter().rev() { - self.revisions.remove(txn, index); - } - - // garbage collect revisions - match oldest { - Some(oldest_revision) => { - // if we have the oldest revision, we can safely gc everything up to that revision - let snapshot = oldest_revision.snapshot()?; - txn.gc(Some(&snapshot.delete_set)); - }, - None => txn.gc(None), // there are no revisions left, we can safely gc everything - } - - Ok(result) - } - - /// Iterates over all active revisions in the collection. - pub fn iter<'a, T: ReadTxn>(&self, txn: &'a T) -> RevisionsIter<'a, T> { - let iter = self.revisions.iter(txn); - RevisionsIter { iter } - } - - /// Performs garbage collection on the document, removing all data that is no longer - /// accessible but might have been required by revisions in the past. - pub fn gc(&self, txn: &mut TransactionMut) -> Result<(), CollabError> { - ensure_gc_disabled(txn)?; - - // find the oldest revision to determine the cutoff point for garbage collection - let mut oldest: Option = None; - for revision in self.iter(txn).flatten() { - match &oldest { - None => oldest = Some(revision), - Some(oldest_revision) => { - if revision.created_at() < oldest_revision.created_at() { - oldest = Some(revision); - } - }, - } - } - - let snapshot = match oldest { - Some(oldest_revision) => Some(oldest_revision.snapshot()?), - None => None, - }; - - txn.gc(snapshot.as_ref().map(|s| &s.delete_set)); - - Ok(()) - } -} - -/// Make sure that garbage collection on the document is disabled. -/// This is necessary because revisions are not compatible with garbage collection. -/// Garbage collection can still be performed manually with the respect to data required by revisions. -fn ensure_gc_disabled(txn: &TransactionMut) -> Result<(), CollabError> { - if !txn.doc().skip_gc() { - return Err(CollabError::Internal(anyhow!( - "revisions cannot be created when garbage collection is enabled" - ))); - } - Ok(()) -} - -pub struct RevisionsIter<'a, T: ReadTxn> { - iter: ArrayIter<&'a T, T>, -} - -impl Iterator for RevisionsIter<'_, T> { - type Item = Result; - - fn next(&mut self) -> Option { - match self.iter.next()? { - Out::Any(revision) => { - let revision = - from_any::(&revision).map_err(|err| CollabError::Internal(err.into())); - Some(revision) - }, - _ => Some(Err(CollabError::NoRequiredData( - "Cannot decode revision".to_string(), - ))), - } - } -} diff --git a/collab/src/database/database_remapper.rs b/collab/src/database/database_remapper.rs index 27603c2e1..13f3582e9 100644 --- a/collab/src/database/database_remapper.rs +++ b/collab/src/database/database_remapper.rs @@ -1,4 +1,4 @@ -use crate::core::collab::{CollabOptions, DataSource}; +use crate::core::collab::{CollabOptions, CollabVersion, DataSource, VersionedData}; use crate::core::origin::CollabOrigin; use crate::preclude::*; @@ -49,9 +49,10 @@ impl DatabaseCollabRemapper { database_id: &str, user_id: &str, db_state: &[u8], + version: Option<&CollabVersion>, ) -> Result, CollabError> { let database_data = self - .collab_bytes_to_database_data(database_id, user_id, db_state) + .collab_bytes_to_database_data(database_id, user_id, db_state, version) .await?; let remapped_data = self.remap_database_data(database_data)?; let remapped_bytes = self @@ -66,9 +67,10 @@ impl DatabaseCollabRemapper { database_id: &str, user_id: &str, db_state: &[u8], + version: Option<&CollabVersion>, ) -> Result { let client_id = user_id.parse::().unwrap_or(0); - let data_source = DataSource::DocStateV1(db_state.to_owned()); + let data_source = DataSource::DocStateV1(VersionedData::new(db_state, version.copied())); let database_uuid = Uuid::parse_str(database_id).map_err(|err| CollabError::Internal(err.into()))?; diff --git a/collab/src/document/document_remapper.rs b/collab/src/document/document_remapper.rs index 3e47d996d..4a313e8cb 100644 --- a/collab/src/document/document_remapper.rs +++ b/collab/src/document/document_remapper.rs @@ -1,5 +1,5 @@ -use crate::core::collab::CollabOptions; use crate::core::collab::DataSource; +use crate::core::collab::{CollabOptions, VersionedData}; use crate::core::origin::CollabOrigin; use crate::preclude::*; use std::collections::HashMap; @@ -60,7 +60,7 @@ impl DocumentCollabRemapper { let client_id = user_id.parse::().unwrap_or(0); let doc_uuid = Uuid::parse_str(doc_id).unwrap_or_else(|_| Uuid::new_v4()); let options = CollabOptions::new(doc_uuid, client_id) - .with_data_source(DataSource::DocStateV1(doc_state.to_owned())); + .with_data_source(DataSource::DocStateV1(VersionedData::new(doc_state, None))); let collab = Collab::new_with_options(CollabOrigin::Empty, options) .map_err(|e| CollabError::Internal(anyhow::Error::new(e)))?; let document = Document::open(collab)?; diff --git a/collab/src/entity/mod.rs b/collab/src/entity/mod.rs index 0e025598f..80277f1ac 100644 --- a/collab/src/entity/mod.rs +++ b/collab/src/entity/mod.rs @@ -6,6 +6,7 @@ pub mod uuid_validation; pub use collab_object::*; +use crate::core::collab::{CollabVersion, VersionedData}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; @@ -19,6 +20,8 @@ pub struct EncodedCollab { pub doc_state: Bytes, #[serde(default)] pub version: EncoderVersion, + #[serde(default)] + pub collab_version: Option, } impl Debug for EncodedCollab { @@ -51,6 +54,7 @@ impl EncodedCollab { state_vector: state_vector.into(), doc_state: doc_state.into(), version: EncoderVersion::V1, + collab_version: None, } } @@ -59,9 +63,14 @@ impl EncodedCollab { state_vector: state_vector.into(), doc_state: doc_state.into(), version: EncoderVersion::V2, + collab_version: None, } } + pub fn versioned_data(self) -> VersionedData { + VersionedData::new(self.doc_state, self.collab_version) + } + pub fn encode_to_bytes(&self) -> Result, bincode::Error> { bincode::serialize(self) } @@ -79,6 +88,7 @@ impl EncodedCollab { state_vector: old_collab.state_vector, doc_state: old_collab.doc_state, version: EncoderVersion::V1, + collab_version: None, }) }, } @@ -110,6 +120,7 @@ mod tests { state_vector: Bytes::from(vec![1, 2, 3]), doc_state: Bytes::from(vec![4, 5, 6]), version: EncoderVersion::V1, + collab_version: None, } ); } @@ -120,6 +131,7 @@ mod tests { state_vector: Bytes::from(vec![1, 2, 3]), doc_state: Bytes::from(vec![4, 5, 6]), version: EncoderVersion::V1, + collab_version: None, }; let new_encoded_collab_bytes = new_encoded_collab.encode_to_bytes().unwrap(); diff --git a/collab/src/error.rs b/collab/src/error.rs index 580fc15d7..0610158d2 100644 --- a/collab/src/error.rs +++ b/collab/src/error.rs @@ -183,6 +183,9 @@ pub enum CollabError { #[error("Database: Import data failed: {0}")] DatabaseImportData(String), + #[error("Collab version could not be determined")] + InvalidVersion, + #[error(transparent)] Uuid(#[from] uuid::Error), diff --git a/collab/src/folder/folder.rs b/collab/src/folder/folder.rs index fe8e41447..b8414a644 100644 --- a/collab/src/folder/folder.rs +++ b/collab/src/folder/folder.rs @@ -1079,6 +1079,7 @@ mod tests { let space_view = View { id: Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), parent_view_id: Some(workspace_id), + version: None, name: "Space 1".to_string(), children: RepeatedViewIdentifier::new(vec![ ViewIdentifier::new(view_1_id), @@ -1148,6 +1149,7 @@ mod tests { let space_view = View { id: space_view_id, parent_view_id: None, + version: None, name: "Space".to_string(), children: RepeatedViewIdentifier::new( views diff --git a/collab/src/folder/hierarchy_builder.rs b/collab/src/folder/hierarchy_builder.rs index 92f3d7181..6bae2bca8 100644 --- a/collab/src/folder/hierarchy_builder.rs +++ b/collab/src/folder/hierarchy_builder.rs @@ -5,6 +5,7 @@ use super::{ ViewLayout, timestamp, }; +use crate::core::collab::CollabVersion; use serde_json::json; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -109,6 +110,7 @@ impl DerefMut for NestedViews { pub struct NestedChildViewBuilder { uid: i64, parent_view_id: ViewId, + version: Option, view_id: ViewId, name: String, desc: String, @@ -127,6 +129,7 @@ impl NestedChildViewBuilder { Self { uid, parent_view_id, + version: None, view_id: uuid::Uuid::new_v4(), name: Default::default(), desc: Default::default(), @@ -148,6 +151,11 @@ impl NestedChildViewBuilder { self } + pub fn with_collab_version(mut self, version: CollabVersion) -> Self { + self.version = Some(version); + self + } + pub fn with_children(mut self, mut views: Vec) -> Self { self.children.append(&mut views); self @@ -204,6 +212,7 @@ impl NestedChildViewBuilder { let view = View { id: self.view_id, parent_view_id: Some(self.parent_view_id), + version: self.version, name: self.name, created_at: timestamp(), is_favorite: self.is_favorite, diff --git a/collab/src/folder/view.rs b/collab/src/folder/view.rs index ae47bf57f..aa90ebbd0 100644 --- a/collab/src/folder/view.rs +++ b/collab/src/folder/view.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::sync::Arc; +use crate::core::collab::CollabVersion; use crate::preclude::{Any, Map, MapExt, MapPrelim, MapRef, ReadTxn, Subscription, TransactionMut}; use anyhow::bail; use dashmap::DashMap; @@ -31,6 +33,7 @@ const VIEW_LAST_EDITED_TIME: &str = "last_edited_time"; const VIEW_LAST_EDITED_BY: &str = "last_edited_by"; const VIEW_IS_LOCKED: &str = "is_locked"; const VIEW_EXTRA: &str = "extra"; +const COLLAB_VERSION: &str = "version"; // const VIEW_LAST_VIEWED_TIME: &str = "last_viewed_time"; pub fn timestamp() -> i64 { @@ -566,8 +569,12 @@ pub(crate) fn view_from_map_ref( let is_locked = map_ref.get_with_txn(txn, VIEW_IS_LOCKED); let extra = map_ref.get_with_txn(txn, VIEW_EXTRA); + let version: Option = map_ref.get_with_txn(txn, COLLAB_VERSION); + let version = version.and_then(|v| Uuid::from_str(&v).ok()); + Some(View { id, + version, parent_view_id, name, children, @@ -900,6 +907,8 @@ pub struct View { /// The id for given parent view #[serde(with = "crate::preclude::serde_option_uuid")] pub parent_view_id: Option, + /// The version of the view, used when corresponding page has been reverted to past state. + pub version: Option, /// The name that display on the left sidebar pub name: String, /// A list of ids, each of them is the id of other view @@ -938,6 +947,7 @@ impl View { Self { id: view_id, parent_view_id: Some(parent_view_id), + version: None, name, children: Default::default(), created_at: timestamp(), diff --git a/collab/src/folder/workspace.rs b/collab/src/folder/workspace.rs index 511c76aa8..9feff724e 100644 --- a/collab/src/folder/workspace.rs +++ b/collab/src/folder/workspace.rs @@ -47,6 +47,7 @@ impl From for View { Self { id: value.id, parent_view_id: None, + version: None, name: value.name, children: value.child_views, created_at: value.created_at, diff --git a/collab/src/importer/space_view.rs b/collab/src/importer/space_view.rs index 6dfa3ef1b..918ad608b 100644 --- a/collab/src/importer/space_view.rs +++ b/collab/src/importer/space_view.rs @@ -1,4 +1,4 @@ -use crate::core::collab::{CollabOptions, DataSource, default_client_id}; +use crate::core::collab::{CollabOptions, DataSource, VersionedData, default_client_id}; use crate::core::origin::CollabOrigin; use crate::document::document_data::default_document_collab_data; use crate::error::CollabError; @@ -17,13 +17,12 @@ pub fn create_space_view( space_info: SpaceInfo, ) -> Result<(ParentChildViews, Collab), CollabError> { let client_id = default_client_id(); - let import_container_doc_state = default_document_collab_data(&view_id.to_string(), client_id) - .map_err(|err| CollabError::Internal(err.into()))? - .doc_state - .to_vec(); + let encoded_collab = default_document_collab_data(&view_id.to_string(), client_id) + .map_err(|err| CollabError::Internal(err.into()))?; - let options = CollabOptions::new(*view_id, client_id) - .with_data_source(DataSource::DocStateV1(import_container_doc_state)); + let data = VersionedData::new(encoded_collab.doc_state, encoded_collab.collab_version); + let options = + CollabOptions::new(*view_id, client_id).with_data_source(DataSource::DocStateV1(data)); let collab = Collab::new_with_options(CollabOrigin::Empty, options) .map_err(|err| CollabError::Internal(err.into()))?; diff --git a/collab/src/plugins/local_storage/kv/db.rs b/collab/src/plugins/local_storage/kv/db.rs index 31371b42b..f63e69f86 100644 --- a/collab/src/plugins/local_storage/kv/db.rs +++ b/collab/src/plugins/local_storage/kv/db.rs @@ -5,6 +5,7 @@ use std::io::Write; use std::ops::RangeBounds; use std::sync::Arc; +use crate::core::collab::CollabVersion; use crate::error::CollabError; use crate::plugins::local_storage::kv::keys::*; use crate::plugins::local_storage::kv::oid::{DocIDGen, OID}; @@ -114,6 +115,7 @@ pub fn insert_doc_update<'a, K, S>( db: &S, doc_id: DocID, object_id: &K, + _version: Option<&CollabVersion>, value: Vec, ) -> Result, CollabError> where diff --git a/collab/src/plugins/local_storage/kv/doc.rs b/collab/src/plugins/local_storage/kv/doc.rs index 5a2c9b132..f4cfc6316 100644 --- a/collab/src/plugins/local_storage/kv/doc.rs +++ b/collab/src/plugins/local_storage/kv/doc.rs @@ -1,5 +1,6 @@ #![cfg(feature = "plugins")] +use crate::core::collab::{CollabVersion, VersionedData}; use crate::error::CollabError; use crate::plugins::local_storage::kv::keys::*; use crate::plugins::local_storage::kv::snapshot::SnapshotAction; @@ -22,6 +23,7 @@ where uid: i64, workspace_id: &str, object_id: &str, + collab_version: Option<&CollabVersion>, txn: &T, ) -> Result<(), CollabError> { if self.is_exist(uid, workspace_id, object_id) { @@ -31,7 +33,7 @@ where let doc_id = get_or_create_did(uid, self, workspace_id, object_id)?; let doc_state = txn.encode_diff_v1(&StateVector::default()); let sv = txn.state_vector().encode_v1(); - let doc_state_key = make_doc_state_key(doc_id); + let doc_state_key = make_doc_state_key(doc_id, collab_version); let sv_key = make_state_vector_key(doc_id); self.insert(doc_state_key, doc_state)?; @@ -45,6 +47,7 @@ where uid: i64, workspace_id: &str, object_id: &str, + collab_version: Option<&CollabVersion>, state_vector: Vec, doc_state: Vec, ) -> Result<(), CollabError> { @@ -53,7 +56,7 @@ where let end = make_doc_end_key(doc_id); self.remove_range(start.as_ref(), end.as_ref())?; - let doc_state_key = make_doc_state_key(doc_id); + let doc_state_key = make_doc_state_key(doc_id, collab_version); let sv_key = make_state_vector_key(doc_id); self.insert(doc_state_key, doc_state)?; @@ -74,6 +77,7 @@ where uid: i64, workspace_id: &str, object_id: &str, + collab_version: Option<&CollabVersion>, state_vector: Vec, doc_state: Vec, ) -> Result<(), CollabError> { @@ -84,7 +88,7 @@ where let end = make_doc_end_key(doc_id); self.remove_range(start.as_ref(), end.as_ref())?; - let doc_state_key = make_doc_state_key(doc_id); + let doc_state_key = make_doc_state_key(doc_id, collab_version); let sv_key = make_state_vector_key(doc_id); // Insert new doc state and state vector self.insert(doc_state_key, doc_state)?; @@ -96,6 +100,32 @@ where get_doc_id(uid, self, workspace_id, object_id).is_some() } + fn get_doc_state(&self, doc_id: DocID) -> Result, CollabError> { + let doc_state_start = make_doc_start_key(doc_id); + let doc_state_end = make_doc_end_key(doc_id); + let mut cursor = self.range(doc_state_start.clone()..doc_state_end)?; + if let Some(entry) = cursor.next() { + let key = entry.key(); + let value = entry.value(); + if key.starts_with(doc_state_start.as_ref()) { + let version = if key.len() > doc_state_start.len() { + let collab_version: [u8; 16] = + match key[doc_state_start.len()..doc_state_start.len() + 16].try_into() { + Ok(v) => v, + Err(_) => { + return Err(CollabError::InvalidVersion); + }, + }; + Some(CollabVersion::from_bytes(collab_version)) + } else { + None + }; + return Ok(Some(VersionedData::new(value, version))); + } + } + Ok(None) + } + /// Load the document from the database and apply the updates to the transaction. /// It will try to load the document in these two ways: /// 1. D = document state + updates @@ -108,15 +138,13 @@ where workspace_id: &str, object_id: &str, txn: &mut TransactionMut, - ) -> Result { - let mut update_count = 0; + ) -> Result, CollabError> { + let mut collab_version = None; if let Some(doc_id) = get_doc_id(uid, self, workspace_id, object_id) { - let doc_state_key = make_doc_state_key(doc_id); - if let Some(doc_state) = self.get(doc_state_key.as_ref())? { + if let Some(versioned) = self.get_doc_state(doc_id)? { // Load the doc state - - match Update::decode_v1(doc_state.as_ref()) { + match Update::decode_v1(&versioned.data) { Ok(update) => { txn.try_apply_update(update)?; }, @@ -126,6 +154,8 @@ where }, } + collab_version = versioned.version; + // If the enable_snapshot is true, we will try to load the snapshot. let update_start = make_doc_update_key(doc_id, 0).to_vec(); let update_end = make_doc_update_key(doc_id, Clock::MAX); @@ -146,7 +176,6 @@ where self.remove_range(encoded_update.key().as_ref(), update_end.as_ref())?; break; } - update_count += 1; } } else { tracing::error!( @@ -156,7 +185,7 @@ where ); } - Ok(update_count) + Ok(collab_version) } else { tracing::trace!("[Client] => {:?} not exist", object_id); Err(CollabError::PersistenceRecordNotFound(format!( @@ -172,7 +201,7 @@ where workspace_id: &str, object_id: &str, doc: &Doc, - ) -> Result { + ) -> Result, CollabError> { let mut txn = doc.transact_mut(); self.load_doc_with_txn(uid, workspace_id, object_id, &mut txn) } @@ -183,6 +212,7 @@ where uid: i64, workspace_id: &str, object_id: &str, + version: Option<&CollabVersion>, update: &[u8], ) -> Result, CollabError> { match get_doc_id(uid, self, workspace_id, object_id) { @@ -197,7 +227,7 @@ where object_id ))) }, - Some(doc_id) => insert_doc_update(self, doc_id, object_id, update.to_vec()), + Some(doc_id) => insert_doc_update(self, doc_id, object_id, version, update.to_vec()), } } @@ -235,6 +265,7 @@ where uid: i64, workspace_id: &str, object_id: &str, + version: Option<&CollabVersion>, doc_state: &[u8], sv: &[u8], ) -> Result<(), CollabError> { @@ -243,7 +274,7 @@ where let end = make_doc_end_key(doc_id); self.remove_range(start.as_ref(), end.as_ref())?; - let doc_state_key = make_doc_state_key(doc_id); + let doc_state_key = make_doc_state_key(doc_id, version); let sv_key = make_state_vector_key(doc_id); // Insert new doc state and state vector @@ -290,10 +321,9 @@ where self.remove_range(start.as_ref(), end.as_ref())?; // Delete the document state and the state vector - let doc_state_key = make_doc_state_key(did); - let sv_key = make_state_vector_key(did); - let _ = self.remove(doc_state_key.as_ref()); - let _ = self.remove(sv_key.as_ref()); + let doc_state_key = make_doc_start_key(did); + let sv_key = make_doc_end_key(did); + let _ = self.remove_range(doc_state_key.as_ref(), sv_key.as_ref()); // Delete the snapshot self.delete_all_snapshots(uid, object_id)?; diff --git a/collab/src/plugins/local_storage/kv/keys.rs b/collab/src/plugins/local_storage/kv/keys.rs index 71e4e27ea..55ac8eef4 100644 --- a/collab/src/plugins/local_storage/kv/keys.rs +++ b/collab/src/plugins/local_storage/kv/keys.rs @@ -3,8 +3,8 @@ use std::io::Write; use std::ops::Deref; +use crate::core::collab::CollabVersion; use smallvec::{SmallVec, smallvec}; - // https://github.com/spacejam/sled // sled performs prefix encoding on long keys with similar prefixes that are grouped together in a // range, as well as suffix truncation to further reduce the indexing costs of long keys. Nodes @@ -101,17 +101,24 @@ pub fn oid_from_key(key: &[u8]) -> &[u8] { &key[10..(key.len() - 1)] } -// [1,1, 0,0,0,0,0,0,0,0, 0] -pub fn make_doc_state_key(doc_id: DocID) -> Key { +// [1,1, 0,0,0,0,0,0,0,0, 0] if version is None +// [1,1, 0,0,0,0,0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] if version is Some +pub fn make_doc_state_key( + doc_id: DocID, + version: Option<&CollabVersion>, +) -> Key { let mut v: SmallVec<[u8; DOC_STATE_KEY_LEN]> = smallvec![DOC_SPACE, DOC_SPACE_OBJECT_KEY]; v.write_all(&doc_id.to_be_bytes()).unwrap(); v.push(DOC_STATE); + if let Some(ver) = version { + v.write_all(ver.as_bytes()).unwrap(); + } Key(v) } // document related elements are stored within bounds [0,1,..did,0]..[0,1,..did,255] pub fn make_doc_start_key(doc_id: DocID) -> Key { - make_doc_state_key(doc_id) + make_doc_state_key(doc_id, None) } // [1,1, 0,0,0,0,0,0,0,0, 255] pub fn make_doc_end_key(doc_id: DocID) -> Key { diff --git a/collab/src/plugins/local_storage/rocksdb/rocksdb_plugin.rs b/collab/src/plugins/local_storage/rocksdb/rocksdb_plugin.rs index 5c17e20b9..158bf744a 100644 --- a/collab/src/plugins/local_storage/rocksdb/rocksdb_plugin.rs +++ b/collab/src/plugins/local_storage/rocksdb/rocksdb_plugin.rs @@ -3,6 +3,8 @@ use crate::plugins::local_storage::CollabPersistenceConfig; use crate::plugins::local_storage::kv::KVTransactionDB; use crate::plugins::local_storage::kv::doc::CollabKVAction; +use crate::core::collab::CollabVersion; + use std::ops::Deref; use std::sync::atomic::Ordering::SeqCst; use std::sync::atomic::{AtomicBool, AtomicU32}; @@ -92,9 +94,16 @@ impl RocksdbDiskPlugin { if !rocksdb_read.is_exist(self.uid, &self.workspace_id, &self.object_id) { match self.collab_type.validate_require_data(collab) { Ok(_) => { + let version = collab.version(); let txn = collab.transact(); if let Err(err) = collab_db.with_write_txn(|w_db_txn| { - w_db_txn.create_new_doc(self.uid, &self.workspace_id, &self.object_id, &txn)?; + w_db_txn.create_new_doc( + self.uid, + &self.workspace_id, + &self.object_id, + version, + &txn, + )?; info!( "[Rocksdb Plugin]: created new doc {}, collab_type:{}", self.object_id, self.collab_type @@ -125,7 +134,13 @@ impl CollabPlugin for RocksdbDiskPlugin { self.write_to_disk(collab); } - fn receive_update(&self, object_id: &str, _txn: &TransactionMut, update: &[u8]) { + fn receive_update( + &self, + object_id: &str, + _txn: &TransactionMut, + update: &[u8], + collab_version: Option<&CollabVersion>, + ) { // Only push update if the doc is loaded if !self.did_init.load(SeqCst) { return; @@ -134,7 +149,13 @@ impl CollabPlugin for RocksdbDiskPlugin { self.increase_count(); //Acquire a write transaction to ensure consistency let result = db.with_write_txn(|w_db_txn| { - let _ = w_db_txn.push_update(self.uid, self.workspace_id.as_str(), object_id, update)?; + let _ = w_db_txn.push_update( + self.uid, + self.workspace_id.as_str(), + object_id, + collab_version, + update, + )?; use yrs::updates::decoder::Decode; tracing::trace!( "[Rocksdb Plugin]: Collab {} {} persisting update: {:#?}", diff --git a/collab/tests/database/remapper/database_remapper_test.rs b/collab/tests/database/remapper/database_remapper_test.rs index 5de637b7f..de0759664 100644 --- a/collab/tests/database/remapper/database_remapper_test.rs +++ b/collab/tests/database/remapper/database_remapper_test.rs @@ -52,7 +52,7 @@ async fn test_remap_database_with_database_id() { let user_id = "123456"; let remapped_state = remapper - .remap_database_collab_state(database_id, user_id, &db_state) + .remap_database_collab_state(database_id, user_id, &db_state, None) .await .map_err(|e| { eprintln!("Failed to remap database collab state: {:?}", e); @@ -66,7 +66,7 @@ async fn test_remap_database_with_database_id() { ); let remapped_data = remapper - .collab_bytes_to_database_data(database_id, user_id, &remapped_state) + .collab_bytes_to_database_data(database_id, user_id, &remapped_state, None) .await .unwrap(); diff --git a/collab/tests/database/user_test/helper.rs b/collab/tests/database/user_test/helper.rs index 1f133aadd..2ea3745b0 100644 --- a/collab/tests/database/user_test/helper.rs +++ b/collab/tests/database/user_test/helper.rs @@ -76,7 +76,7 @@ impl DatabaseCollabPersistenceService for TestUserDatabasePersistenceImpl { fn upsert_collab( &self, - object_id: &collab::entity::uuid_validation::ObjectId, + object_id: &ObjectId, encoded_collab: EncodedCollab, ) -> Result<(), CollabError> { let db_write = self.db.write_txn(); @@ -84,6 +84,7 @@ impl DatabaseCollabPersistenceService for TestUserDatabasePersistenceImpl { self.uid, &self.workspace_id, &object_id.to_string(), + encoded_collab.collab_version.as_ref(), encoded_collab.state_vector.to_vec(), encoded_collab.doc_state.to_vec(), ); diff --git a/collab/tests/document/remapper/document_remapper_test.rs b/collab/tests/document/remapper/document_remapper_test.rs index 65be9fcd4..d40433058 100644 --- a/collab/tests/document/remapper/document_remapper_test.rs +++ b/collab/tests/document/remapper/document_remapper_test.rs @@ -1,4 +1,4 @@ -use collab::core::collab::{CollabOptions, DataSource}; +use collab::core::collab::{CollabOptions, CollabVersion, DataSource, VersionedData}; use collab::core::origin::CollabOrigin; use collab::document::document::Document; use collab::document::document_remapper::DocumentCollabRemapper; @@ -7,13 +7,21 @@ use std::collections::HashMap; use std::fs; use uuid::Uuid; -fn doc_state_to_document(doc_state: &[u8], doc_id: &str, user_id: &str) -> Document { +fn doc_state_to_document( + doc_state: &[u8], + doc_id: &str, + user_id: &str, + version: Option<&CollabVersion>, +) -> Document { let client_id = user_id.parse::().unwrap_or(0); let options = CollabOptions::new( Uuid::parse_str(doc_id).unwrap_or_else(|_| Uuid::new_v4()), client_id, ) - .with_data_source(DataSource::DocStateV1(doc_state.to_owned())); + .with_data_source(DataSource::DocStateV1(VersionedData::new( + doc_state, + version.copied(), + ))); let collab = Collab::new_with_options(CollabOrigin::Empty, options).expect("Failed to create collab"); Document::open(collab).expect("Failed to open document") @@ -60,7 +68,7 @@ fn test_remap_collab_with_mentioned_page_ids() { .remap_collab_doc_state(doc_id, user_id, &doc_state) .unwrap(); - let remapped_doc = doc_state_to_document(&remapped_state, doc_id, user_id); + let remapped_doc = doc_state_to_document(&remapped_state, doc_id, user_id, None); let remapped_data = remapped_doc.get_document_data().unwrap(); let mut found_remapped_mentions = 0; @@ -120,7 +128,7 @@ fn test_remap_collab_with_inline_database() { let remapped_state = remapper .remap_collab_doc_state(doc_id, user_id, &doc_state) .unwrap(); - let remapped_doc = doc_state_to_document(&remapped_state, doc_id, user_id); + let remapped_doc = doc_state_to_document(&remapped_state, doc_id, user_id, None); let remapped_data = remapped_doc.get_document_data().unwrap(); let mut found_remapped_database_parent_id = false; diff --git a/collab/tests/edit_test/mod.rs b/collab/tests/edit_test/mod.rs index 2b331a9b7..39842f166 100644 --- a/collab/tests/edit_test/mod.rs +++ b/collab/tests/edit_test/mod.rs @@ -2,5 +2,4 @@ mod awareness_test; mod insert_test; mod observer_test; mod restore_test; -mod revision_test; mod state_vec_test; diff --git a/collab/tests/edit_test/restore_test.rs b/collab/tests/edit_test/restore_test.rs index 9304fb1ec..771d29e6e 100644 --- a/collab/tests/edit_test/restore_test.rs +++ b/collab/tests/edit_test/restore_test.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use assert_json_diff::assert_json_eq; -use collab::core::collab::{CollabOptions, DataSource, default_client_id}; +use collab::core::collab::{CollabOptions, CollabVersion, DataSource, default_client_id}; use collab::core::origin::CollabOrigin; use collab::preclude::{Collab, CollabPlugin, MapExt}; @@ -488,7 +488,13 @@ impl ReceiveUpdatesPlugin { } impl CollabPlugin for ReceiveUpdatesPlugin { - fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, update: &[u8]) { + fn receive_update( + &self, + _object_id: &str, + _txn: &TransactionMut, + update: &[u8], + _collab_version: Option<&CollabVersion>, + ) { self.updates.lock().unwrap().push(update.to_vec()); } diff --git a/collab/tests/edit_test/revision_test.rs b/collab/tests/edit_test/revision_test.rs deleted file mode 100644 index b7adf9ca2..000000000 --- a/collab/tests/edit_test/revision_test.rs +++ /dev/null @@ -1,190 +0,0 @@ -use collab::core::collab::{CollabOptions, DataSource, default_client_id}; -use collab::core::origin::{CollabClient, CollabOrigin}; -use collab::entity::EncoderVersion; -use collab::preclude::Collab; -use serde_json::json; -use uuid::Uuid; -use yrs::Update; -use yrs::updates::decoder::Decode; - -#[tokio::test] -async fn create_restore_revision() { - let options = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut collab = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "1")), options).unwrap(); - collab.insert("key", "value1"); - let state1 = collab - .encode_collab_v1(|_| Ok::<_, anyhow::Error>(())) - .unwrap(); - let state2 = collab.encode_collab_v2(); - let r1 = collab.create_revision().unwrap(); - collab.insert("key", "value2"); - - // revision is equal to the state before the second insert - let restored = collab.restore_revision(&r1, EncoderVersion::V1).unwrap(); - assert_eq!(restored, state1); - - let restored = collab.restore_revision(&r1, EncoderVersion::V2).unwrap(); - assert_eq!(restored, state2); - - let restored = Collab::new_with_options( - CollabOrigin::Empty, - CollabOptions::new(Uuid::new_v4(), default_client_id()) - .with_data_source(DataSource::DocStateV2(restored.doc_state.into())), - ) - .unwrap(); - - // we restored the state before the second insert - assert_eq!(restored.to_json_value(), json!({"key": "value1"})); -} - -#[tokio::test] -async fn remove_revision_cleanups_deleted_data() { - let options = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut collab = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "1")), options).unwrap(); - collab.insert("key", "value1"); - let r1 = collab.create_named_revision("r1").unwrap(); - collab.insert("key", "value2"); - let r2 = collab.create_named_revision("r2").unwrap(); - collab.insert("key", "value3"); - let r3 = collab.create_named_revision("r3").unwrap(); - - let full_state = collab - .encode_collab_v1(|_| Ok::<_, anyhow::Error>(())) - .unwrap(); - - // removing a middle revision does not clean up the state - assert!(collab.remove_revision(&r2).unwrap()); - let state = collab - .encode_collab_v1(|_| Ok::<_, anyhow::Error>(())) - .unwrap(); - assert!(state.doc_state.len() >= full_state.doc_state.len()); // no data was removed - assert!(collab.restore_revision(&r2, EncoderVersion::V1).is_err()); // revision no longer exists - - // removing the oldest revision cleans up the state - assert!(collab.remove_revision(&r1).unwrap()); - let state = collab - .encode_collab_v1(|_| Ok::<_, anyhow::Error>(())) - .unwrap(); - assert!(state.doc_state.len() < full_state.doc_state.len()); - assert!(collab.restore_revision(&r1, EncoderVersion::V1).is_err()); // revision no longer exists - - collab.remove_revision(&r3).unwrap(); -} - -#[tokio::test] -async fn remove_revision_should_eventually_remove_revision_data() { - let options = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut collab = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "1")), options).unwrap(); - collab.insert("key", "value1"); - let r1 = collab.create_named_revision("revision1").unwrap(); - collab.insert("key", "value2"); - let r2 = collab.create_named_revision("revision2").unwrap(); - - let state = collab.encode_collab_v2(); - // initially we should be able to find r1 and r2 keys in the payload - assert!(contains(&state.doc_state, "revision1")); - assert!(contains(&state.doc_state, "revision2")); - - // removing the first revision should not remove the data immediately, - // since r2 was created while r1 was still present - collab.remove_revision(&r1).unwrap(); - let state = collab.encode_collab_v2(); - assert!(contains(&state.doc_state, "revision1")); - assert!(!contains(&state.doc_state, "value1")); // revision is present, but its data is not - - collab.insert("key", "value3"); - let _r3 = collab.create_named_revision("revision3").unwrap(); - - // r3 was created after r1 was removed, so it should not contain r1 data - // removing r2 should remove the traces of r1 as well - collab.remove_revision(&r2).unwrap(); - let state = collab.encode_collab_v2(); - assert!(!contains(&state.doc_state, "revision1")); // r1 is finally removed - assert!(!contains(&state.doc_state, "value2")); // r2 was removed, so its data is gone -} - -#[tokio::test] -async fn cleaning_all_revisions_doesnt_leave_garbage() { - let options = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut collab = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "1")), options).unwrap(); - collab.insert("key", "value1"); - let r1 = collab.create_named_revision("revision1").unwrap(); - collab.insert("key", "value2"); - let r2 = collab.create_named_revision("revision2").unwrap(); - collab.insert("key", "value3"); - - let state = collab.encode_collab_v2(); - // initially we should be able to find all revisions and values in the payload - assert!(contains(&state.doc_state, "revision1")); - assert!(contains(&state.doc_state, "value1")); - assert!(contains(&state.doc_state, "revision2")); - assert!(contains(&state.doc_state, "value2")); - - collab.remove_revision(&r1).unwrap(); - collab.remove_revision(&r2).unwrap(); - - // removing all revisions should not leave any garbage - let state = collab.encode_collab_v2(); - assert!(!contains(&state.doc_state, "revision1")); - assert!(!contains(&state.doc_state, "value1")); - assert!(!contains(&state.doc_state, "revision2")); - assert!(!contains(&state.doc_state, "value2")); -} - -#[tokio::test] -async fn remote_updates_can_be_cleaned_up() { - let options = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut c1 = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "1")), options).unwrap(); - c1.insert("key", "value1"); - let r1 = c1.create_named_revision("revision1").unwrap(); - c1.insert("key", "value2"); - let _r2 = c1.create_named_revision("revision2").unwrap(); - c1.insert("key", "value3"); - - let options2 = CollabOptions::new(Uuid::new_v4(), default_client_id()).with_gc(false); - let mut c2 = - Collab::new_with_options(CollabOrigin::Client(CollabClient::new(1, "2")), options2).unwrap(); - let state = c1.encode_collab_v2(); - c2.apply_update(Update::decode_v2(&state.doc_state).unwrap()) - .unwrap(); - - // we should be able to find all revisions and values in the payload - let state = c2.encode_collab_v2(); - assert!(contains(&state.doc_state, "revision1")); - assert!(contains(&state.doc_state, "value1")); - assert!(contains(&state.doc_state, "revision2")); - assert!(contains(&state.doc_state, "value2")); - - // remove revision locally and apply the update to the remote state - c1.remove_revision(&r1).unwrap(); - let state = c1.encode_collab_v2(); - c2.apply_update(Update::decode_v2(&state.doc_state).unwrap()) - .unwrap(); - - // locally we removed r1, so it's data is not present - assert!(!contains(&state.doc_state, "value1")); - assert!(contains(&state.doc_state, "value2")); - - // removing revisions locally doesn't remove them from the remote state after applying updates - let state = c2.encode_collab_v2(); - assert!(contains(&state.doc_state, "value1")); - assert!(contains(&state.doc_state, "value2")); - - // perform manual garbage collection - c2.gc().unwrap(); - - // after garbage collection, the data of r1 should be removed from the remote state - let state = c2.encode_collab_v2(); - assert!(!contains(&state.doc_state, "value1")); - assert!(contains(&state.doc_state, "value2")); -} - -fn contains>(container: &[u8], key: P) -> bool { - let key = key.as_ref(); - container.windows(key.len()).any(|window| window == key) -} diff --git a/collab/tests/edit_test/state_vec_test.rs b/collab/tests/edit_test/state_vec_test.rs index 420a07c82..bbad5c586 100644 --- a/collab/tests/edit_test/state_vec_test.rs +++ b/collab/tests/edit_test/state_vec_test.rs @@ -1,7 +1,8 @@ +use collab::core::collab::ConsistentHash; use serde_json::json; use yrs::types::ToJson; use yrs::updates::decoder::Decode; -use yrs::{Doc, Map, MapPrelim, MapRef, ReadTxn, Transact, Update}; +use yrs::{Doc, Map, MapPrelim, MapRef, ReadTxn, StateVector, Text, Transact, Update}; #[tokio::test] async fn state_vec_apply_test() { @@ -120,6 +121,35 @@ async fn two_way_sync_result_undetermined() { assert_eq!(a, b); } +#[test] +fn snapshot_produces_consistent_hash() { + let d1 = Doc::with_client_id(0xdeadbeef); + let txt1 = d1.get_or_insert_text("text"); + let mut t1 = d1.transact_mut(); + txt1.insert(&mut t1, 0, "Hello world!"); + txt1.remove_range(&mut t1, 4, 5); + + // we need at least 2 client IDs to produce a non-trivial state vector where order could be + // possibly different between runs + let d2 = Doc::with_client_id(123); + let txt2 = d2.get_or_insert_text("text"); + let mut t2 = d2.transact_mut(); + t2.apply_update( + Update::decode_v1(&t1.encode_state_as_update_v1(&StateVector::default())).unwrap(), + ) + .unwrap(); + txt2.insert(&mut t2, 0, "Acronym!"); + txt2.remove_range(&mut t2, 1, 2); + + let snapshot = t2.snapshot(); + + // We're going to use consistent hash to uniquely identify the snapshot based on its internal + // state. This way we can use it as ID and quickly compare if two snapshots are identical. + let hash = snapshot.consistent_hash(); + let expected: u128 = 42481876838278106308370919647884892234; // produced in previous run + assert_eq!(hash, expected); +} + #[tokio::test] async fn two_way_sync_test() { let doc_1 = Doc::new(); diff --git a/collab/tests/folder/util.rs b/collab/tests/folder/util.rs index 25d83ed5b..11d0e9df4 100644 --- a/collab/tests/folder/util.rs +++ b/collab/tests/folder/util.rs @@ -92,6 +92,7 @@ pub fn make_test_view(view_id: &str, parent_view_id: Uuid, belongings: Vec>>); +pub struct CollabStateCachePlugin(Arc>); + +#[derive(Debug, Default, Clone)] +struct State { + updates: Vec, + version: Option, +} impl CollabStateCachePlugin { pub fn new() -> Self { @@ -43,20 +49,24 @@ impl CollabStateCachePlugin { pub fn get_doc_state(&self) -> Result { let mut updates = vec![]; let lock = self.0.read().unwrap(); - for encoded_data in lock.iter() { + for encoded_data in lock.updates.iter() { updates.push(encoded_data.as_ref()); } let doc_state = merge_updates_v1(&updates) .map_err(|err| anyhow::anyhow!("merge updates failed: {:?}", err)) .unwrap(); - Ok(DataSource::DocStateV1(doc_state)) + Ok(DataSource::DocStateV1(VersionedData::new( + doc_state, + lock.version, + ))) } #[allow(dead_code)] pub fn get_update(&self) -> Result { let read_guard = self.0.read().unwrap(); let updates = read_guard + .updates .iter() .map(|update| update.as_ref()) .collect::>(); @@ -67,13 +77,20 @@ impl CollabStateCachePlugin { } impl CollabPlugin for CollabStateCachePlugin { - fn receive_update(&self, _object_id: &str, txn: &TransactionMut, update: &[u8]) { + fn receive_update( + &self, + _object_id: &str, + txn: &TransactionMut, + update: &[u8], + collab_version: Option<&CollabVersion>, + ) { let mut write_guard = self.0.write().unwrap(); - if write_guard.is_empty() { + if write_guard.updates.is_empty() { let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); - write_guard.push(Bytes::from(doc_state)); + write_guard.updates.push(Bytes::from(doc_state)); } - write_guard.push(Bytes::from(update.to_vec())); + write_guard.updates.push(Bytes::from(update.to_vec())); + write_guard.version = collab_version.cloned(); } fn plugin_type(&self) -> CollabPluginType {