@@ -104,9 +104,14 @@ where
104104 new_revision : & ' a Revision ,
105105 rev_compress : & Arc < dyn RevisionMergeable + ' a > ,
106106 ) -> FlowyResult < i64 > {
107- let mut sync_seq_write_guard = self . sync_seq . write ( ) . await ;
108- if sync_seq_write_guard. step > self . configuration . merge_threshold {
109- let compact_seq = sync_seq_write_guard. compact ( ) ;
107+ let mut sync_seq = self . sync_seq . write ( ) . await ;
108+ let step = sync_seq. step ;
109+
110+ // Before the new_revision pushed into the sync_seq, we check if the current `step` of the
111+ // sync_seq is less equal or greater than the merge threshold. If yes, it's need to merged
112+ // with the new_revision into one revision.
113+ if step >= self . configuration . merge_threshold - 1 {
114+ let compact_seq = sync_seq. compact ( ) ;
110115 let range = RevisionRange {
111116 start : * compact_seq. front ( ) . unwrap ( ) ,
112117 end : * compact_seq. back ( ) . unwrap ( ) ,
@@ -119,20 +124,18 @@ where
119124 revisions. push ( new_revision. clone ( ) ) ;
120125
121126 // compact multiple revisions into one
122- let compact_revision = rev_compress. merge_revisions ( & self . user_id , & self . object_id , revisions) ?;
123- let rev_id = compact_revision. rev_id ;
124- tracing:: Span :: current ( ) . record ( "rev_id" , & rev_id) ;
125-
126- // insert new revision
127- let _ = sync_seq_write_guard. dry_push ( rev_id) ?;
127+ let merged_revision = rev_compress. merge_revisions ( & self . user_id , & self . object_id , revisions) ?;
128+ let rev_id = merged_revision. rev_id ;
129+ tracing:: Span :: current ( ) . record ( "rev_id" , & merged_revision. rev_id ) ;
130+ let _ = sync_seq. dry_push ( merged_revision. rev_id ) ?;
128131
129132 // replace the revisions in range with compact revision
130- self . compact ( & range, compact_revision ) . await ?;
133+ self . compact ( & range, merged_revision ) . await ?;
131134 Ok ( rev_id)
132135 } else {
133136 tracing:: Span :: current ( ) . record ( "rev_id" , & new_revision. rev_id ) ;
134137 self . add ( new_revision. clone ( ) , RevisionState :: Sync , true ) . await ?;
135- sync_seq_write_guard . push ( new_revision. rev_id ) ?;
138+ sync_seq . push ( new_revision. rev_id ) ?;
136139 Ok ( new_revision. rev_id )
137140 }
138141 }
@@ -201,7 +204,6 @@ where
201204 let _ = self
202205 . disk_cache
203206 . delete_revision_records ( & self . object_id , Some ( rev_ids) ) ?;
204-
205207 self . add ( new_revision, RevisionState :: Sync , true ) . await ?;
206208 Ok ( ( ) )
207209 }
0 commit comments