@@ -5,7 +5,7 @@ use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLite
55use crate :: services:: DocumentPersistence ;
66use crate :: { errors:: FlowyError , DocumentCloudService } ;
77use bytes:: Bytes ;
8- use dashmap :: DashMap ;
8+
99use flowy_database:: ConnectionPool ;
1010use flowy_error:: FlowyResult ;
1111use flowy_revision:: {
@@ -16,9 +16,11 @@ use flowy_sync::client_document::initial_delta_document_content;
1616use flowy_sync:: entities:: { document:: DocumentIdPB , revision:: Revision , ws_data:: ServerRevisionWSData } ;
1717use flowy_sync:: util:: md5;
1818use lib_infra:: future:: FutureResult ;
19+ use lib_infra:: ref_map:: { RefCountHashMap , RefCountValue } ;
1920use lib_ws:: WSConnectState ;
2021use std:: any:: Any ;
2122use std:: { convert:: TryInto , sync:: Arc } ;
23+ use tokio:: sync:: RwLock ;
2224
2325pub trait DocumentUser : Send + Sync {
2426 fn user_dir ( & self ) -> Result < String , FlowyError > ;
@@ -76,7 +78,7 @@ impl std::default::Default for DocumentConfig {
7678pub struct DocumentManager {
7779 cloud_service : Arc < dyn DocumentCloudService > ,
7880 rev_web_socket : Arc < dyn RevisionWebSocket > ,
79- editor_map : Arc < DocumentEditorMap > ,
81+ editor_map : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
8082 user : Arc < dyn DocumentUser > ,
8183 persistence : Arc < DocumentPersistence > ,
8284 #[ allow( dead_code) ]
@@ -94,7 +96,7 @@ impl DocumentManager {
9496 Self {
9597 cloud_service,
9698 rev_web_socket,
97- editor_map : Arc :: new ( DocumentEditorMap :: new ( ) ) ,
99+ editor_map : Arc :: new ( RwLock :: new ( RefCountHashMap :: new ( ) ) ) ,
98100 user : document_user,
99101 persistence : Arc :: new ( DocumentPersistence :: new ( database) ) ,
100102 config,
@@ -124,10 +126,10 @@ impl DocumentManager {
124126 }
125127
126128 #[ tracing:: instrument( level = "trace" , skip( self , editor_id) , fields( editor_id) , err) ]
127- pub fn close_document_editor < T : AsRef < str > > ( & self , editor_id : T ) -> Result < ( ) , FlowyError > {
129+ pub async fn close_document_editor < T : AsRef < str > > ( & self , editor_id : T ) -> Result < ( ) , FlowyError > {
128130 let editor_id = editor_id. as_ref ( ) ;
129131 tracing:: Span :: current ( ) . record ( "editor_id" , & editor_id) ;
130- self . editor_map . remove ( editor_id) ;
132+ self . editor_map . write ( ) . await . remove ( editor_id) ;
131133 Ok ( ( ) )
132134 }
133135
@@ -149,9 +151,9 @@ impl DocumentManager {
149151 pub async fn receive_ws_data ( & self , data : Bytes ) {
150152 let result: Result < ServerRevisionWSData , protobuf:: ProtobufError > = data. try_into ( ) ;
151153 match result {
152- Ok ( data) => match self . editor_map . get ( & data. object_id ) {
154+ Ok ( data) => match self . editor_map . read ( ) . await . get ( & data. object_id ) {
153155 None => tracing:: error!( "Can't find any source handler for {:?}-{:?}" , data. object_id, data. ty) ,
154- Some ( editor ) => match editor . receive_ws_data ( data) . await {
156+ Some ( handler ) => match handler . 0 . receive_ws_data ( data) . await {
155157 Ok ( _) => { }
156158 Err ( e) => tracing:: error!( "{}" , e) ,
157159 } ,
@@ -180,13 +182,13 @@ impl DocumentManager {
180182 /// returns: Result<Arc<DocumentEditor>, FlowyError>
181183 ///
182184 async fn get_document_editor ( & self , doc_id : & str ) -> FlowyResult < Arc < dyn DocumentEditor > > {
183- match self . editor_map . get ( doc_id) {
185+ match self . editor_map . read ( ) . await . get ( doc_id) {
184186 None => {
185187 //
186188 tracing:: warn!( "Should call init_document_editor first" ) ;
187189 self . init_document_editor ( doc_id) . await
188190 }
189- Some ( editor ) => Ok ( editor ) ,
191+ Some ( handler ) => Ok ( handler . 0 . clone ( ) ) ,
190192 }
191193 }
192194
@@ -216,14 +218,20 @@ impl DocumentManager {
216218 DeltaDocumentEditor :: new ( doc_id, user, rev_manager, self . rev_web_socket . clone ( ) , cloud_service)
217219 . await ?,
218220 ) ;
219- self . editor_map . insert ( doc_id, editor. clone ( ) ) ;
221+ self . editor_map
222+ . write ( )
223+ . await
224+ . insert ( doc_id. to_string ( ) , RefCountDocumentHandler ( editor. clone ( ) ) ) ;
220225 Ok ( editor)
221226 }
222227 DocumentVersionPB :: V1 => {
223228 let rev_manager = self . make_document_rev_manager ( doc_id, pool. clone ( ) ) ?;
224229 let editor: Arc < dyn DocumentEditor > =
225230 Arc :: new ( AppFlowyDocumentEditor :: new ( doc_id, user, rev_manager, cloud_service) . await ?) ;
226- self . editor_map . insert ( doc_id, editor. clone ( ) ) ;
231+ self . editor_map
232+ . write ( )
233+ . await
234+ . insert ( doc_id. to_string ( ) , RefCountDocumentHandler ( editor. clone ( ) ) ) ;
227235 Ok ( editor)
228236 }
229237 }
@@ -247,7 +255,7 @@ impl DocumentManager {
247255 ) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
248256 let user_id = self . user . user_id ( ) ?;
249257 let disk_cache = SQLiteDocumentRevisionPersistence :: new ( & user_id, pool. clone ( ) ) ;
250- let configuration = RevisionPersistenceConfiguration :: new ( 100 ) ;
258+ let configuration = RevisionPersistenceConfiguration :: new ( 100 , true ) ;
251259 let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache, configuration) ;
252260 // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
253261 let snapshot_persistence = SQLiteRevisionSnapshotPersistence :: new ( doc_id, pool) ;
@@ -268,7 +276,7 @@ impl DocumentManager {
268276 ) -> Result < RevisionManager < Arc < ConnectionPool > > , FlowyError > {
269277 let user_id = self . user . user_id ( ) ?;
270278 let disk_cache = SQLiteDeltaDocumentRevisionPersistence :: new ( & user_id, pool. clone ( ) ) ;
271- let configuration = RevisionPersistenceConfiguration :: new ( 100 ) ;
279+ let configuration = RevisionPersistenceConfiguration :: new ( 100 , true ) ;
272280 let rev_persistence = RevisionPersistence :: new ( & user_id, doc_id, disk_cache, configuration) ;
273281 // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
274282 let snapshot_persistence = SQLiteRevisionSnapshotPersistence :: new ( doc_id, pool) ;
@@ -309,40 +317,32 @@ impl RevisionCloudService for DocumentRevisionCloudService {
309317 }
310318}
311319
312- pub struct DocumentEditorMap {
313- inner : DashMap < String , Arc < dyn DocumentEditor > > ,
314- }
315-
316- impl DocumentEditorMap {
317- fn new ( ) -> Self {
318- Self { inner : DashMap :: new ( ) }
319- }
320+ #[ derive( Clone ) ]
321+ struct RefCountDocumentHandler ( Arc < dyn DocumentEditor > ) ;
320322
321- pub ( crate ) fn insert ( & self , editor_id : & str , editor : Arc < dyn DocumentEditor > ) {
322- if self . inner . contains_key ( editor_id) {
323- log:: warn!( "Editor:{} already open" , editor_id) ;
324- }
325- self . inner . insert ( editor_id. to_string ( ) , editor) ;
323+ impl RefCountValue for RefCountDocumentHandler {
324+ fn did_remove ( & self ) {
325+ self . 0 . close ( ) ;
326326 }
327+ }
327328
328- pub ( crate ) fn get ( & self , editor_id : & str ) -> Option < Arc < dyn DocumentEditor > > {
329- Some ( self . inner . get ( editor_id) ?. clone ( ) )
330- }
329+ impl std:: ops:: Deref for RefCountDocumentHandler {
330+ type Target = Arc < dyn DocumentEditor > ;
331331
332- pub ( crate ) fn remove ( & self , editor_id : & str ) {
333- if let Some ( editor) = self . get ( editor_id) {
334- editor. close ( )
335- }
336- self . inner . remove ( editor_id) ;
332+ fn deref ( & self ) -> & Self :: Target {
333+ & self . 0
337334 }
338335}
339336
340337#[ tracing:: instrument( level = "trace" , skip( web_socket, handlers) ) ]
341- fn listen_ws_state_changed ( web_socket : Arc < dyn RevisionWebSocket > , handlers : Arc < DocumentEditorMap > ) {
338+ fn listen_ws_state_changed (
339+ web_socket : Arc < dyn RevisionWebSocket > ,
340+ handlers : Arc < RwLock < RefCountHashMap < RefCountDocumentHandler > > > ,
341+ ) {
342342 tokio:: spawn ( async move {
343343 let mut notify = web_socket. subscribe_state_changed ( ) . await ;
344344 while let Ok ( state) = notify. recv ( ) . await {
345- handlers. inner . iter ( ) . for_each ( |handler| {
345+ handlers. read ( ) . await . values ( ) . iter ( ) . for_each ( |handler| {
346346 handler. receive_ws_state ( & state) ;
347347 } )
348348 }
0 commit comments