Skip to content

Commit b3b24d0

Browse files
committed
chore: calculate the compact length after receiving ack
1 parent e729c0a commit b3b24d0

File tree

15 files changed

+446
-94
lines changed

15 files changed

+446
-94
lines changed

frontend/rust-lib/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/rust-lib/flowy-document/src/editor/editor.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ impl AppFlowyDocumentEditor {
2929
mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
3030
cloud_service: Arc<dyn RevisionCloudService>,
3131
) -> FlowyResult<Arc<Self>> {
32-
let document = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
32+
let document = rev_manager
33+
.initialize::<DocumentRevisionSerde>(Some(cloud_service))
34+
.await?;
3335
let rev_manager = Arc::new(rev_manager);
3436
let command_sender = spawn_edit_queue(user, rev_manager.clone(), document);
3537
let doc_id = doc_id.to_string();

frontend/rust-lib/flowy-document/src/old_editor/editor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl DeltaDocumentEditor {
4545
cloud_service: Arc<dyn RevisionCloudService>,
4646
) -> FlowyResult<Arc<Self>> {
4747
let document = rev_manager
48-
.load::<DeltaDocumentRevisionSerde>(Some(cloud_service))
48+
.initialize::<DeltaDocumentRevisionSerde>(Some(cloud_service))
4949
.await?;
5050
let operations = DeltaTextOperations::from_bytes(&document.content)?;
5151
let rev_manager = Arc::new(rev_manager);

frontend/rust-lib/flowy-folder/src/services/folder_editor.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ impl FolderEditor {
3838
let cloud = Arc::new(FolderRevisionCloudService {
3939
token: token.to_string(),
4040
});
41-
let folder = Arc::new(RwLock::new(rev_manager.load::<FolderRevisionSerde>(Some(cloud)).await?));
41+
let folder = Arc::new(RwLock::new(
42+
rev_manager.initialize::<FolderRevisionSerde>(Some(cloud)).await?,
43+
));
4244
let rev_manager = Arc::new(rev_manager);
4345

4446
#[cfg(feature = "sync")]

frontend/rust-lib/flowy-grid/src/services/block_editor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl GridBlockRevisionEditor {
3434
let cloud = Arc::new(GridBlockRevisionCloudService {
3535
token: token.to_owned(),
3636
});
37-
let block_revision_pad = rev_manager.load::<GridBlockRevisionSerde>(Some(cloud)).await?;
37+
let block_revision_pad = rev_manager.initialize::<GridBlockRevisionSerde>(Some(cloud)).await?;
3838
let pad = Arc::new(RwLock::new(block_revision_pad));
3939
let rev_manager = Arc::new(rev_manager);
4040
let user_id = user_id.to_owned();

frontend/rust-lib/flowy-grid/src/services/grid_editor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl GridRevisionEditor {
6060
) -> FlowyResult<Arc<Self>> {
6161
let token = user.token()?;
6262
let cloud = Arc::new(GridRevisionCloudService { token });
63-
let grid_pad = rev_manager.load::<GridRevisionSerde>(Some(cloud)).await?;
63+
let grid_pad = rev_manager.initialize::<GridRevisionSerde>(Some(cloud)).await?;
6464
let rev_manager = Arc::new(rev_manager);
6565
let grid_pad = Arc::new(RwLock::new(grid_pad));
6666

frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl GridViewRevisionEditor {
5555
let cloud = Arc::new(GridViewRevisionCloudService {
5656
token: token.to_owned(),
5757
});
58-
let view_revision_pad = rev_manager.load::<GridViewRevisionSerde>(Some(cloud)).await?;
58+
let view_revision_pad = rev_manager.initialize::<GridViewRevisionSerde>(Some(cloud)).await?;
5959
let pad = Arc::new(RwLock::new(view_revision_pad));
6060
let rev_manager = Arc::new(rev_manager);
6161
let group_controller = new_group_controller(

frontend/rust-lib/flowy-revision/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ serde_json = {version = "1.0"}
2323

2424
[dev-dependencies]
2525
nanoid = "0.4.0"
26+
flowy-revision = {path = ".", features = ["flowy_unit_test"]}
2627
serde = { version = "1.0", features = ["derive"] }
2728
serde_json = { version = "1.0" }
2829
parking_lot = "0.11"

frontend/rust-lib/flowy-revision/src/rev_manager.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
108108
}
109109

110110
#[tracing::instrument(level = "debug", skip_all, fields(object_id) err)]
111-
pub async fn load<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
111+
pub async fn initialize<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
112112
where
113113
B: RevisionObjectDeserializer,
114114
{
@@ -199,6 +199,10 @@ impl<Connection: 'static> RevisionManager<Connection> {
199199
self.rev_persistence.number_of_sync_records()
200200
}
201201

202+
pub fn number_of_revisions_in_disk(&self) -> usize {
203+
self.rev_persistence.number_of_records_in_disk()
204+
}
205+
202206
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
203207
let revisions = self.rev_persistence.revisions_in_range(&range).await?;
204208
Ok(revisions)
@@ -230,13 +234,16 @@ impl<Connection: 'static> WSDataProviderDataSource for Arc<RevisionManager<Conne
230234
}
231235

232236
#[cfg(feature = "flowy_unit_test")]
233-
impl<Connection> RevisionManager<Connection> {
237+
impl<Connection: 'static> RevisionManager<Connection> {
234238
pub async fn revision_cache(&self) -> Arc<RevisionPersistence<Connection>> {
235239
self.rev_persistence.clone()
236240
}
237241
pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
238242
self.rev_ack_notifier.subscribe()
239243
}
244+
pub fn get_all_revision_records(&self) -> FlowyResult<Vec<crate::disk::SyncRecord>> {
245+
self.rev_persistence.load_all_records(&self.object_id)
246+
}
240247
}
241248

242249
pub struct RevisionLoader<Connection> {
@@ -248,7 +255,7 @@ pub struct RevisionLoader<Connection> {
248255

249256
impl<Connection: 'static> RevisionLoader<Connection> {
250257
pub async fn load(&self) -> Result<(Vec<Revision>, i64), FlowyError> {
251-
let records = self.rev_persistence.batch_get(&self.object_id)?;
258+
let records = self.rev_persistence.load_all_records(&self.object_id)?;
252259
let revisions: Vec<Revision>;
253260
let mut rev_id = 0;
254261
if records.is_empty() && self.cloud.is_some() {
@@ -282,7 +289,7 @@ impl<Connection: 'static> RevisionLoader<Connection> {
282289
}
283290

284291
pub async fn load_revisions(&self) -> Result<Vec<Revision>, FlowyError> {
285-
let records = self.rev_persistence.batch_get(&self.object_id)?;
292+
let records = self.rev_persistence.load_all_records(&self.object_id)?;
286293
let revisions = records.into_iter().map(|record| record.revision).collect::<_>();
287294
Ok(revisions)
288295
}

frontend/rust-lib/flowy-revision/src/rev_persistence.rs

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tokio::task::spawn_blocking;
1414

1515
pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
1616

17+
#[derive(Clone)]
1718
pub struct RevisionPersistenceConfiguration {
1819
merge_threshold: usize,
1920
}
@@ -24,14 +25,14 @@ impl RevisionPersistenceConfiguration {
2425
if merge_threshold > 1 {
2526
Self { merge_threshold }
2627
} else {
27-
Self { merge_threshold: 2 }
28+
Self { merge_threshold: 100 }
2829
}
2930
}
3031
}
3132

3233
impl std::default::Default for RevisionPersistenceConfiguration {
3334
fn default() -> Self {
34-
Self { merge_threshold: 2 }
35+
Self { merge_threshold: 100 }
3536
}
3637
}
3738

@@ -93,7 +94,7 @@ where
9394
pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> {
9495
tracing::Span::current().record("rev_id", &revision.rev_id);
9596
self.add(revision.clone(), RevisionState::Sync, false).await?;
96-
self.sync_seq.write().await.dry_push(revision.rev_id)?;
97+
self.sync_seq.write().await.recv(revision.rev_id)?;
9798
Ok(())
9899
}
99100

@@ -105,13 +106,17 @@ where
105106
rev_compress: &Arc<dyn RevisionMergeable + 'a>,
106107
) -> FlowyResult<i64> {
107108
let mut sync_seq = self.sync_seq.write().await;
108-
let step = sync_seq.step;
109+
let compact_length = sync_seq.compact_length;
109110

110-
// Before the new_revision pushed into the sync_seq, we check if the current `step` of the
111-
// sync_seq is less equal or greater than the merge threshold. If yes, it's need to merged
111+
// Before the new_revision is pushed into the sync_seq, we check if the current `step` of the
112+
// sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged
112113
// with the new_revision into one revision.
113-
if step >= self.configuration.merge_threshold - 1 {
114-
let compact_seq = sync_seq.compact();
114+
let mut compact_seq = VecDeque::default();
115+
// tracing::info!("{}", compact_seq)
116+
if compact_length >= self.configuration.merge_threshold - 1 {
117+
compact_seq.extend(sync_seq.compact());
118+
}
119+
if !compact_seq.is_empty() {
115120
let range = RevisionRange {
116121
start: *compact_seq.front().unwrap(),
117122
end: *compact_seq.back().unwrap(),
@@ -127,15 +132,15 @@ where
127132
let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
128133
let rev_id = merged_revision.rev_id;
129134
tracing::Span::current().record("rev_id", &merged_revision.rev_id);
130-
let _ = sync_seq.dry_push(merged_revision.rev_id)?;
135+
let _ = sync_seq.recv(merged_revision.rev_id)?;
131136

132137
// replace the revisions in range with compact revision
133138
self.compact(&range, merged_revision).await?;
134139
Ok(rev_id)
135140
} else {
136141
tracing::Span::current().record("rev_id", &new_revision.rev_id);
137142
self.add(new_revision.clone(), RevisionState::Sync, true).await?;
138-
sync_seq.push(new_revision.rev_id)?;
143+
sync_seq.merge_recv(new_revision.rev_id)?;
139144
Ok(new_revision.rev_id)
140145
}
141146
}
@@ -163,6 +168,16 @@ where
163168
self.memory_cache.number_of_sync_records()
164169
}
165170

171+
pub(crate) fn number_of_records_in_disk(&self) -> usize {
172+
match self.disk_cache.read_revision_records(&self.object_id, None) {
173+
Ok(records) => records.len(),
174+
Err(e) => {
175+
tracing::error!("Read revision records failed: {:?}", e);
176+
0
177+
}
178+
}
179+
}
180+
166181
/// The cache gets reset while it conflicts with the remote revisions.
167182
#[tracing::instrument(level = "trace", skip(self, revisions), err)]
168183
pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
@@ -228,8 +243,8 @@ where
228243
}
229244
}
230245

231-
pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<SyncRecord>> {
232-
self.disk_cache.read_revision_records(doc_id, None)
246+
pub fn load_all_records(&self, object_id: &str) -> FlowyResult<Vec<SyncRecord>> {
247+
self.disk_cache.read_revision_records(object_id, None)
233248
}
234249

235250
// Read the revision which rev_id >= range.start && rev_id <= range.end
@@ -289,26 +304,31 @@ impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = Flo
289304
#[derive(Default)]
290305
struct DeferSyncSequence {
291306
rev_ids: VecDeque<i64>,
292-
start: Option<usize>,
293-
step: usize,
307+
compact_index: Option<usize>,
308+
compact_length: usize,
294309
}
295310

296311
impl DeferSyncSequence {
297312
fn new() -> Self {
298313
DeferSyncSequence::default()
299314
}
300315

301-
fn push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
302-
let _ = self.dry_push(new_rev_id)?;
316+
/// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable.
317+
///
318+
/// When calling `compact` method, it will return a list of revision ids started from
319+
/// the `compact_start_pos`, and ends with the `compact_length`.
320+
fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
321+
let _ = self.recv(new_rev_id)?;
303322

304-
self.step += 1;
305-
if self.start.is_none() && !self.rev_ids.is_empty() {
306-
self.start = Some(self.rev_ids.len() - 1);
323+
self.compact_length += 1;
324+
if self.compact_index.is_none() && !self.rev_ids.is_empty() {
325+
self.compact_index = Some(self.rev_ids.len() - 1);
307326
}
308327
Ok(())
309328
}
310329

311-
fn dry_push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
330+
/// Pushes the new_rev_id to the end of the list.
331+
fn recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
312332
// The last revision's rev_id must be greater than the new one.
313333
if let Some(rev_id) = self.rev_ids.back() {
314334
if *rev_id >= new_rev_id {
@@ -321,6 +341,7 @@ impl DeferSyncSequence {
321341
Ok(())
322342
}
323343

344+
/// Removes the rev_id from the list
324345
fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
325346
let cur_rev_id = self.rev_ids.front().cloned();
326347
if let Some(pop_rev_id) = cur_rev_id {
@@ -331,7 +352,20 @@ impl DeferSyncSequence {
331352
);
332353
return Err(FlowyError::internal().context(desc));
333354
}
334-
let _ = self.rev_ids.pop_front();
355+
356+
let mut compact_rev_id = None;
357+
if let Some(compact_index) = self.compact_index {
358+
compact_rev_id = self.rev_ids.get(compact_index).cloned();
359+
}
360+
361+
let pop_rev_id = self.rev_ids.pop_front();
362+
if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) {
363+
if compact_rev_id <= pop_rev_id {
364+
if self.compact_length > 0 {
365+
self.compact_length -= 1;
366+
}
367+
}
368+
}
335369
}
336370
Ok(())
337371
}
@@ -341,28 +375,22 @@ impl DeferSyncSequence {
341375
}
342376

343377
fn clear(&mut self) {
344-
self.start = None;
345-
self.step = 0;
378+
self.compact_index = None;
379+
self.compact_length = 0;
346380
self.rev_ids.clear();
347381
}
348382

349383
// Compact the rev_ids into one except the current synchronizing rev_id.
350384
fn compact(&mut self) -> VecDeque<i64> {
351-
if self.start.is_none() {
352-
return VecDeque::default();
385+
let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len());
386+
if let Some(start) = self.compact_index {
387+
if start < self.rev_ids.len() {
388+
let seq = self.rev_ids.split_off(start);
389+
compact_seq.extend(seq);
390+
}
353391
}
354-
355-
let start = self.start.unwrap();
356-
let compact_seq = self.rev_ids.split_off(start);
357-
self.start = None;
358-
self.step = 0;
392+
self.compact_index = None;
393+
self.compact_length = 0;
359394
compact_seq
360-
361-
// let mut new_seq = self.rev_ids.clone();
362-
// let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
363-
//
364-
// let start = drained.pop_front()?;
365-
// let end = drained.pop_back().unwrap_or(start);
366-
// Some((RevisionRange { start, end }, new_seq))
367395
}
368396
}

0 commit comments

Comments
 (0)