@@ -10,54 +10,146 @@ use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionS
1010use flowy_database:: ConnectionPool ;
1111use flowy_error:: { internal_error, FlowyError , FlowyResult } ;
1212
13- use std:: {
14- borrow:: Cow ,
15- sync:: {
16- atomic:: { AtomicI64 , Ordering :: SeqCst } ,
17- Arc ,
18- } ,
19- } ;
13+ use crate :: RevisionCompact ;
14+ use std:: collections:: VecDeque ;
15+ use std:: { borrow:: Cow , sync:: Arc } ;
16+ use tokio:: sync:: RwLock ;
2017use tokio:: task:: spawn_blocking;
2118
2219pub const REVISION_WRITE_INTERVAL_IN_MILLIS : u64 = 600 ;
2320
2421pub struct RevisionCache {
22+ user_id : String ,
2523 object_id : String ,
2624 disk_cache : Arc < dyn RevisionDiskCache < Error = FlowyError > > ,
2725 memory_cache : Arc < RevisionMemoryCache > ,
28- latest_rev_id : AtomicI64 ,
26+ sync_seq : RwLock < SyncSequence > ,
2927}
3028impl RevisionCache {
3129 pub fn new ( user_id : & str , object_id : & str , pool : Arc < ConnectionPool > ) -> RevisionCache {
3230 let disk_cache = Arc :: new ( SQLitePersistence :: new ( user_id, pool) ) ;
3331 let memory_cache = Arc :: new ( RevisionMemoryCache :: new ( object_id, Arc :: new ( disk_cache. clone ( ) ) ) ) ;
3432 let object_id = object_id. to_owned ( ) ;
33+ let user_id = user_id. to_owned ( ) ;
34+ let sync_seq = RwLock :: new ( SyncSequence :: new ( ) ) ;
3535 Self {
36+ user_id,
3637 object_id,
3738 disk_cache,
3839 memory_cache,
39- latest_rev_id : AtomicI64 :: new ( 0 ) ,
40+ sync_seq,
41+ }
42+ }
43+
44+ /// Save the revision that comes from remote to disk.
45+ #[ tracing:: instrument( level = "trace" , skip( self , revision) , fields( rev_id, object_id=%self . object_id) , err) ]
46+ pub ( crate ) async fn add_ack_revision ( & self , revision : & Revision ) -> FlowyResult < ( ) > {
47+ tracing:: Span :: current ( ) . record ( "rev_id" , & revision. rev_id ) ;
48+ self . add ( revision. clone ( ) , RevisionState :: Ack , true ) . await
49+ }
50+
51+ /// Append the revision that already existed in the local DB state to sync sequence
52+ #[ tracing:: instrument( level = "trace" , skip( self ) , fields( rev_id, object_id=%self . object_id) , err) ]
53+ pub ( crate ) async fn sync_revision ( & self , revision : & Revision ) -> FlowyResult < ( ) > {
54+ tracing:: Span :: current ( ) . record ( "rev_id" , & revision. rev_id ) ;
55+ self . add ( revision. clone ( ) , RevisionState :: Sync , false ) . await ?;
56+ self . sync_seq . write ( ) . await . add ( revision. rev_id ) ?;
57+ Ok ( ( ) )
58+ }
59+
60+ /// Save the revision to disk and append it to the end of the sync sequence.
61+ #[ tracing:: instrument( level = "trace" , skip( self , revision) , fields( rev_id, compact_range, object_id=%self . object_id) , err) ]
62+ pub ( crate ) async fn add_sync_revision < C > ( & self , revision : & Revision ) -> FlowyResult < i64 >
63+ where
64+ C : RevisionCompact ,
65+ {
66+ let result = self . sync_seq . read ( ) . await . compact ( ) ;
67+ match result {
68+ None => {
69+ tracing:: Span :: current ( ) . record ( "rev_id" , & revision. rev_id ) ;
70+ self . add ( revision. clone ( ) , RevisionState :: Sync , true ) . await ?;
71+ self . sync_seq . write ( ) . await . add ( revision. rev_id ) ?;
72+ Ok ( revision. rev_id )
73+ }
74+ Some ( ( range, mut compact_seq) ) => {
75+ tracing:: Span :: current ( ) . record ( "compact_range" , & format ! ( "{}" , range) . as_str ( ) ) ;
76+ let mut revisions = self . revisions_in_range ( & range) . await ?;
77+ if range. to_rev_ids ( ) . len ( ) != revisions. len ( ) {
78+ debug_assert_eq ! ( range. to_rev_ids( ) . len( ) , revisions. len( ) ) ;
79+ }
80+
81+ // append the new revision
82+ revisions. push ( revision. clone ( ) ) ;
83+
84+ // compact multiple revisions into one
85+ let compact_revision = C :: compact_revisions ( & self . user_id , & self . object_id , revisions) ?;
86+ let rev_id = compact_revision. rev_id ;
87+ tracing:: Span :: current ( ) . record ( "rev_id" , & rev_id) ;
88+
89+ // insert new revision
90+ compact_seq. push_back ( rev_id) ;
91+
92+ // replace the revisions in range with compact revision
93+ self . compact ( & range, compact_revision) . await ?;
94+ debug_assert_eq ! ( self . sync_seq. read( ) . await . len( ) , compact_seq. len( ) ) ;
95+ self . sync_seq . write ( ) . await . reset ( compact_seq) ;
96+ Ok ( rev_id)
97+ }
4098 }
4199 }
42100
43- pub async fn add ( & self , revision : Revision , state : RevisionState , write_to_disk : bool ) -> FlowyResult < ( ) > {
101+ /// Remove the revision with rev_id from the sync sequence.
102+ pub ( crate ) async fn ack_revision ( & self , rev_id : i64 ) -> FlowyResult < ( ) > {
103+ if self . sync_seq . write ( ) . await . ack ( & rev_id) . is_ok ( ) {
104+ self . memory_cache . ack ( & rev_id) . await ;
105+ }
106+ Ok ( ( ) )
107+ }
108+
109+ pub ( crate ) async fn next_sync_revision ( & self ) -> FlowyResult < Option < Revision > > {
110+ match self . sync_seq . read ( ) . await . next_rev_id ( ) {
111+ None => Ok ( None ) ,
112+ Some ( rev_id) => Ok ( self . get ( rev_id) . await . map ( |record| record. revision ) ) ,
113+ }
114+ }
115+
116+ /// The cache gets reset while it conflicts with the remote revisions.
117+ #[ tracing:: instrument( level = "trace" , skip( self , revisions) , err) ]
118+ pub ( crate ) async fn reset ( & self , revisions : Vec < Revision > ) -> FlowyResult < ( ) > {
119+ let records = revisions
120+ . to_vec ( )
121+ . into_iter ( )
122+ . map ( |revision| RevisionRecord {
123+ revision,
124+ state : RevisionState :: Sync ,
125+ write_to_disk : false ,
126+ } )
127+ . collect :: < Vec < _ > > ( ) ;
128+
129+ let _ = self
130+ . disk_cache
131+ . delete_and_insert_records ( & self . object_id , None , records. clone ( ) ) ?;
132+ let _ = self . memory_cache . reset_with_revisions ( records) . await ;
133+ self . sync_seq . write ( ) . await . clear ( ) ;
134+ Ok ( ( ) )
135+ }
136+
137+ async fn add ( & self , revision : Revision , state : RevisionState , write_to_disk : bool ) -> FlowyResult < ( ) > {
44138 if self . memory_cache . contains ( & revision. rev_id ) {
45139 tracing:: warn!( "Duplicate revision: {}:{}-{:?}" , self . object_id, revision. rev_id, state) ;
46140 return Ok ( ( ) ) ;
47141 }
48- let rev_id = revision. rev_id ;
49142 let record = RevisionRecord {
50143 revision,
51144 state,
52145 write_to_disk,
53146 } ;
54147
55148 self . memory_cache . add ( Cow :: Owned ( record) ) . await ;
56- self . set_latest_rev_id ( rev_id) ;
57149 Ok ( ( ) )
58150 }
59151
60- pub async fn compact ( & self , range : & RevisionRange , new_revision : Revision ) -> FlowyResult < ( ) > {
152+ async fn compact ( & self , range : & RevisionRange , new_revision : Revision ) -> FlowyResult < ( ) > {
61153 self . memory_cache . remove_with_range ( range) ;
62154 let rev_ids = range. to_rev_ids ( ) ;
63155 let _ = self
@@ -68,10 +160,6 @@ impl RevisionCache {
68160 Ok ( ( ) )
69161 }
70162
71- pub async fn ack ( & self , rev_id : i64 ) {
72- self . memory_cache . ack ( & rev_id) . await ;
73- }
74-
75163 pub async fn get ( & self , rev_id : i64 ) -> Option < RevisionRecord > {
76164 match self . memory_cache . get ( & rev_id) . await {
77165 None => match self
@@ -122,31 +210,6 @@ impl RevisionCache {
122210 . map ( |record| record. revision )
123211 . collect :: < Vec < Revision > > ( ) )
124212 }
125-
126- #[ tracing:: instrument( level = "debug" , skip( self , revisions) , err) ]
127- pub async fn reset_with_revisions ( & self , object_id : & str , revisions : Vec < Revision > ) -> FlowyResult < ( ) > {
128- let records = revisions
129- . to_vec ( )
130- . into_iter ( )
131- . map ( |revision| RevisionRecord {
132- revision,
133- state : RevisionState :: Sync ,
134- write_to_disk : false ,
135- } )
136- . collect :: < Vec < _ > > ( ) ;
137-
138- let _ = self
139- . disk_cache
140- . delete_and_insert_records ( object_id, None , records. clone ( ) ) ?;
141- let _ = self . memory_cache . reset_with_revisions ( records) . await ;
142-
143- Ok ( ( ) )
144- }
145-
146- #[ inline]
147- fn set_latest_rev_id ( & self , rev_id : i64 ) {
148- let _ = self . latest_rev_id . fetch_update ( SeqCst , SeqCst , |_e| Some ( rev_id) ) ;
149- }
150213}
151214
152215pub fn mk_revision_disk_cache (
@@ -196,3 +259,67 @@ impl RevisionRecord {
196259 self . state = RevisionState :: Ack ;
197260 }
198261}
262+
263+ #[ derive( Default ) ]
264+ struct SyncSequence ( VecDeque < i64 > ) ;
265+ impl SyncSequence {
266+ fn new ( ) -> Self {
267+ SyncSequence :: default ( )
268+ }
269+
270+ fn add ( & mut self , new_rev_id : i64 ) -> FlowyResult < ( ) > {
271+ // The last revision's rev_id must be greater than the new one.
272+ if let Some ( rev_id) = self . 0 . back ( ) {
273+ if * rev_id >= new_rev_id {
274+ return Err (
275+ FlowyError :: internal ( ) . context ( format ! ( "The new revision's id must be greater than {}" , rev_id) )
276+ ) ;
277+ }
278+ }
279+ self . 0 . push_back ( new_rev_id) ;
280+ Ok ( ( ) )
281+ }
282+
283+ fn ack ( & mut self , rev_id : & i64 ) -> FlowyResult < ( ) > {
284+ let cur_rev_id = self . 0 . front ( ) . cloned ( ) ;
285+ if let Some ( pop_rev_id) = cur_rev_id {
286+ if & pop_rev_id != rev_id {
287+ let desc = format ! (
288+ "The ack rev_id:{} is not equal to the current rev_id:{}" ,
289+ rev_id, pop_rev_id
290+ ) ;
291+ return Err ( FlowyError :: internal ( ) . context ( desc) ) ;
292+ }
293+ let _ = self . 0 . pop_front ( ) ;
294+ }
295+ Ok ( ( ) )
296+ }
297+
298+ fn next_rev_id ( & self ) -> Option < i64 > {
299+ self . 0 . front ( ) . cloned ( )
300+ }
301+
302+ fn reset ( & mut self , new_seq : VecDeque < i64 > ) {
303+ self . 0 = new_seq;
304+ }
305+
306+ fn clear ( & mut self ) {
307+ self . 0 . clear ( ) ;
308+ }
309+
310+ fn len ( & self ) -> usize {
311+ self . 0 . len ( )
312+ }
313+
314+ // Compact the rev_ids into one except the current synchronizing rev_id.
315+ fn compact ( & self ) -> Option < ( RevisionRange , VecDeque < i64 > ) > {
316+ self . next_rev_id ( ) ?;
317+
318+ let mut new_seq = self . 0 . clone ( ) ;
319+ let mut drained = new_seq. drain ( 1 ..) . collect :: < VecDeque < _ > > ( ) ;
320+
321+ let start = drained. pop_front ( ) ?;
322+ let end = drained. pop_back ( ) . unwrap_or ( start) ;
323+ Some ( ( RevisionRange { start, end } , new_seq) )
324+ }
325+ }
0 commit comments