@@ -5,22 +5,22 @@ use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLite
55use crate :: services:: DocumentPersistence ;
66use crate :: { errors:: FlowyError , DocumentCloudService } ;
77use bytes:: Bytes ;
8- use dashmap :: DashMap ;
8+
99use flowy_database:: ConnectionPool ;
1010use flowy_error:: FlowyResult ;
1111use flowy_revision:: {
12- RevisionCloudService , RevisionManager , RevisionPersistence , RevisionWebSocket , SQLiteRevisionSnapshotPersistence ,
12+ RevisionCloudService , RevisionManager , RevisionPersistence , RevisionPersistenceConfiguration , RevisionWebSocket ,
13+ SQLiteRevisionSnapshotPersistence ,
1314} ;
1415use flowy_sync:: client_document:: initial_delta_document_content;
15- use flowy_sync:: entities:: {
16- document:: DocumentIdPB ,
17- revision:: { md5, RepeatedRevision , Revision } ,
18- ws_data:: ServerRevisionWSData ,
19- } ;
16+ use flowy_sync:: entities:: { document:: DocumentIdPB , revision:: Revision , ws_data:: ServerRevisionWSData } ;
17+ use flowy_sync:: util:: md5;
2018use lib_infra:: future:: FutureResult ;
19+ use lib_infra:: ref_map:: { RefCountHashMap , RefCountValue } ;
2120use lib_ws:: WSConnectState ;
2221use std:: any:: Any ;
2322use std:: { convert:: TryInto , sync:: Arc } ;
23+ use tokio:: sync:: RwLock ;
2424
2525pub trait DocumentUser : Send + Sync {
2626 fn user_dir ( & self ) -> Result < String , FlowyError > ;
@@ -78,7 +78,7 @@ impl std::default::Default for DocumentConfig {
7878pub struct DocumentManager {
7979 cloud_service : Arc < dyn DocumentCloudService > ,
8080 rev_web_socket : Arc < dyn RevisionWebSocket > ,
81- editor_map : Arc < DocumentEditorMap > ,
81+ editor_map : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
8282 user : Arc < dyn DocumentUser > ,
8383 persistence : Arc < DocumentPersistence > ,
8484 #[ allow( dead_code) ]
@@ -96,7 +96,7 @@ impl DocumentManager {
9696 Self {
9797 cloud_service,
9898 rev_web_socket,
99- editor_map : Arc :: new ( DocumentEditorMap :: new ( ) ) ,
99+ editor_map : Arc :: new ( RwLock :: new ( RefCountHashMap :: new ( ) ) ) ,
100100 user : document_user,
101101 persistence : Arc :: new ( DocumentPersistence :: new ( database) ) ,
102102 config,
@@ -126,10 +126,10 @@ impl DocumentManager {
126126 }
127127
128128 #[ tracing:: instrument( level = "trace" , skip( self , editor_id) , fields( editor_id) , err) ]
129- pub fn close_document_editor < T : AsRef < str > > ( & self , editor_id : T ) -> Result < ( ) , FlowyError > {
129+ pub async fn close_document_editor < T : AsRef < str > > ( & self , editor_id : T ) -> Result < ( ) , FlowyError > {
130130 let editor_id = editor_id. as_ref ( ) ;
131131 tracing:: Span :: current ( ) . record ( "editor_id" , & editor_id) ;
132- self . editor_map . remove ( editor_id) ;
132+ self . editor_map . write ( ) . await . remove ( editor_id) ;
133133 Ok ( ( ) )
134134 }
135135
@@ -139,7 +139,7 @@ impl DocumentManager {
139139 Ok ( ( ) )
140140 }
141141
142- pub async fn create_document < T : AsRef < str > > ( & self , doc_id : T , revisions : RepeatedRevision ) -> FlowyResult < ( ) > {
142+ pub async fn create_document < T : AsRef < str > > ( & self , doc_id : T , revisions : Vec < Revision > ) -> FlowyResult < ( ) > {
143143 let doc_id = doc_id. as_ref ( ) . to_owned ( ) ;
144144 let db_pool = self . persistence . database . db_pool ( ) ?;
145145 // Maybe we could save the document to disk without creating the RevisionManager
@@ -151,9 +151,9 @@ impl DocumentManager {
151151 pub async fn receive_ws_data ( & self , data : Bytes ) {
152152 let result: Result < ServerRevisionWSData , protobuf:: ProtobufError > = data. try_into ( ) ;
153153 match result {
154- Ok ( data) => match self . editor_map . get ( & data. object_id ) {
154+ Ok ( data) => match self . editor_map . read ( ) . await . get ( & data. object_id ) {
155155 None => tracing:: error!( "Can't find any source handler for {:?}-{:?}" , data. object_id, data. ty) ,
156- Some ( editor ) => match editor . receive_ws_data ( data) . await {
156+ Some ( handler ) => match handler . 0 . receive_ws_data ( data) . await {
157157 Ok ( _) => { }
158158 Err ( e) => tracing:: error!( "{}" , e) ,
159159 } ,
@@ -182,13 +182,13 @@ impl DocumentManager {
182182 /// returns: Result<Arc<DocumentEditor>, FlowyError>
183183 ///
184184 async fn get_document_editor ( & self , doc_id : & str ) -> FlowyResult < Arc < dyn DocumentEditor > > {
185- match self . editor_map . get ( doc_id) {
185+ match self . editor_map . read ( ) . await . get ( doc_id) {
186186 None => {
187187 //
188188 tracing:: warn!( "Should call init_document_editor first" ) ;
189189 self . init_document_editor ( doc_id) . await
190190 }
191- Some ( editor ) => Ok ( editor ) ,
191+ Some ( handler ) => Ok ( handler . 0 . clone ( ) ) ,
192192 }
193193 }
194194
@@ -218,14 +218,20 @@ impl DocumentManager {
218218 DeltaDocumentEditor :: new ( doc_id, user, rev_manager, self . rev_web_socket . clone ( ) , cloud_service)
219219 . await ?,
220220 ) ;
221- self . editor_map . insert ( doc_id, editor. clone ( ) ) ;
221+ self . editor_map
222+ . write ( )
223+ . await
224+ . insert ( doc_id. to_string ( ) , RefCountDocumentHandler ( editor. clone ( ) ) ) ;
222225 Ok ( editor)
223226 }
224227 DocumentVersionPB :: V1 => {
225228 let rev_manager = self . make_document_rev_manager ( doc_id, pool. clone ( ) ) ?;
226229 let editor: Arc < dyn DocumentEditor > =
227230 Arc :: new ( AppFlowyDocumentEditor :: new ( doc_id, user, rev_manager, cloud_service) . await ?) ;
228- self . editor_map . insert ( doc_id, editor. clone ( ) ) ;
231+ self . editor_map
232+ . write ( )
233+ . await
234+ . insert ( doc_id. to_string ( ) , RefCountDocumentHandler ( editor. clone ( ) ) ) ;
229235 Ok ( editor)
230236 }
231237 }
@@ -249,7 +255,8 @@ impl DocumentManager {
249255 ) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
250256 let user_id = self . user . user_id ( ) ?;
251257 let disk_cache = SQLiteDocumentRevisionPersistence :: new ( & user_id, pool. clone ( ) ) ;
252- let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache) ;
258+ let configuration = RevisionPersistenceConfiguration :: new ( 100 , true ) ;
259+ let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache, configuration) ;
253260 // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
254261 let snapshot_persistence = SQLiteRevisionSnapshotPersistence :: new ( doc_id, pool) ;
255262 Ok ( RevisionManager :: new (
@@ -269,7 +276,8 @@ impl DocumentManager {
269276 ) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
270277 let user_id = self . user . user_id ( ) ?;
271278 let disk_cache = SQLiteDeltaDocumentRevisionPersistence :: new ( & user_id, pool. clone ( ) ) ;
272- let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache) ;
279+ let configuration = RevisionPersistenceConfiguration :: new ( 100 , true ) ;
280+ let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache, configuration) ;
273281 // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
274282 let snapshot_persistence = SQLiteRevisionSnapshotPersistence :: new ( doc_id, pool) ;
275283 Ok ( RevisionManager :: new (
@@ -294,63 +302,47 @@ impl RevisionCloudService for DocumentRevisionCloudService {
294302 let params: DocumentIdPB = object_id. to_string ( ) . into ( ) ;
295303 let server = self . server . clone ( ) ;
296304 let token = self . token . clone ( ) ;
297- let user_id = user_id. to_string ( ) ;
298305
299306 FutureResult :: new ( async move {
300307 match server. fetch_document ( & token, params) . await ? {
301308 None => Err ( FlowyError :: record_not_found ( ) . context ( "Remote doesn't have this document" ) ) ,
302309 Some ( payload) => {
303310 let bytes = Bytes :: from ( payload. content . clone ( ) ) ;
304311 let doc_md5 = md5 ( & bytes) ;
305- let revision = Revision :: new (
306- & payload. doc_id ,
307- payload. base_rev_id ,
308- payload. rev_id ,
309- bytes,
310- & user_id,
311- doc_md5,
312- ) ;
312+ let revision = Revision :: new ( & payload. doc_id , payload. base_rev_id , payload. rev_id , bytes, doc_md5) ;
313313 Ok ( vec ! [ revision] )
314314 }
315315 }
316316 } )
317317 }
318318}
319319
320- pub struct DocumentEditorMap {
321- inner : DashMap < String , Arc < dyn DocumentEditor > > ,
322- }
320+ #[ derive( Clone ) ]
321+ struct RefCountDocumentHandler ( Arc < dyn DocumentEditor > ) ;
323322
324- impl DocumentEditorMap {
325- fn new ( ) -> Self {
326- Self { inner : DashMap :: new ( ) }
327- }
328-
329- pub ( crate ) fn insert ( & self , editor_id : & str , editor : Arc < dyn DocumentEditor > ) {
330- if self . inner . contains_key ( editor_id) {
331- log:: warn!( "Editor:{} already open" , editor_id) ;
332- }
333- self . inner . insert ( editor_id. to_string ( ) , editor) ;
323+ impl RefCountValue for RefCountDocumentHandler {
324+ fn did_remove ( & self ) {
325+ self . 0 . close ( ) ;
334326 }
327+ }
335328
336- pub ( crate ) fn get ( & self , editor_id : & str ) -> Option < Arc < dyn DocumentEditor > > {
337- Some ( self . inner . get ( editor_id) ?. clone ( ) )
338- }
329+ impl std:: ops:: Deref for RefCountDocumentHandler {
330+ type Target = Arc < dyn DocumentEditor > ;
339331
340- pub ( crate ) fn remove ( & self , editor_id : & str ) {
341- if let Some ( editor) = self . get ( editor_id) {
342- editor. close ( )
343- }
344- self . inner . remove ( editor_id) ;
332+ fn deref ( & self ) -> & Self :: Target {
333+ & self . 0
345334 }
346335}
347336
348337#[ tracing:: instrument( level = "trace" , skip( web_socket, handlers) ) ]
349- fn listen_ws_state_changed ( web_socket : Arc < dyn RevisionWebSocket > , handlers : Arc < DocumentEditorMap > ) {
338+ fn listen_ws_state_changed (
339+ web_socket : Arc < dyn RevisionWebSocket > ,
340+ handlers : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
341+ ) {
350342 tokio:: spawn ( async move {
351343 let mut notify = web_socket. subscribe_state_changed ( ) . await ;
352344 while let Ok ( state) = notify. recv ( ) . await {
353- handlers. inner . iter ( ) . for_each ( |handler| {
345+ handlers. read ( ) . await . values ( ) . iter ( ) . for_each ( |handler| {
354346 handler. receive_ws_state ( & state) ;
355347 } )
356348 }
0 commit comments