1- use crate :: { core :: ClientDocumentEditor , errors:: FlowyError , DocumentCloudService } ;
1+ use crate :: { editor :: ClientDocumentEditor , errors:: FlowyError , DocumentCloudService } ;
22use async_trait:: async_trait;
33use bytes:: Bytes ;
44use dashmap:: DashMap ;
@@ -29,31 +29,31 @@ pub(crate) trait DocumentWSReceiver: Send + Sync {
2929type WebSocketDataReceivers = Arc < DashMap < String , Arc < dyn DocumentWSReceiver > > > ;
3030pub struct FlowyDocumentManager {
3131 cloud_service : Arc < dyn DocumentCloudService > ,
32- ws_receivers : WebSocketDataReceivers ,
33- web_socket : Arc < dyn RevisionWebSocket > ,
34- open_cache : Arc < OpenDocCache > ,
35- user : Arc < dyn DocumentUser > ,
32+ ws_data_receivers : WebSocketDataReceivers ,
33+ rev_web_socket : Arc < dyn RevisionWebSocket > ,
34+ document_handlers : Arc < DocumentEditorHandlers > ,
35+ document_user : Arc < dyn DocumentUser > ,
3636}
3737
3838impl FlowyDocumentManager {
3939 pub fn new (
4040 cloud_service : Arc < dyn DocumentCloudService > ,
41- user : Arc < dyn DocumentUser > ,
42- web_socket : Arc < dyn RevisionWebSocket > ,
41+ document_user : Arc < dyn DocumentUser > ,
42+ rev_web_socket : Arc < dyn RevisionWebSocket > ,
4343 ) -> Self {
44- let ws_receivers = Arc :: new ( DashMap :: new ( ) ) ;
45- let open_cache = Arc :: new ( OpenDocCache :: new ( ) ) ;
44+ let ws_data_receivers = Arc :: new ( DashMap :: new ( ) ) ;
45+ let document_handlers = Arc :: new ( DocumentEditorHandlers :: new ( ) ) ;
4646 Self {
4747 cloud_service,
48- ws_receivers ,
49- web_socket ,
50- open_cache ,
51- user ,
48+ ws_data_receivers ,
49+ rev_web_socket ,
50+ document_handlers ,
51+ document_user ,
5252 }
5353 }
5454
5555 pub fn init ( & self ) -> FlowyResult < ( ) > {
56- listen_ws_state_changed ( self . web_socket . clone ( ) , self . ws_receivers . clone ( ) ) ;
56+ listen_ws_state_changed ( self . rev_web_socket . clone ( ) , self . ws_data_receivers . clone ( ) ) ;
5757
5858 Ok ( ( ) )
5959 }
@@ -69,17 +69,17 @@ impl FlowyDocumentManager {
6969 pub fn close_document < T : AsRef < str > > ( & self , doc_id : T ) -> Result < ( ) , FlowyError > {
7070 let doc_id = doc_id. as_ref ( ) ;
7171 tracing:: Span :: current ( ) . record ( "doc_id" , & doc_id) ;
72- self . open_cache . remove ( doc_id) ;
73- self . remove_ws_receiver ( doc_id) ;
72+ self . document_handlers . remove ( doc_id) ;
73+ self . ws_data_receivers . remove ( doc_id) ;
7474 Ok ( ( ) )
7575 }
7676
7777 #[ tracing:: instrument( level = "debug" , skip( self , doc_id) , fields( doc_id) , err) ]
7878 pub fn delete < T : AsRef < str > > ( & self , doc_id : T ) -> Result < ( ) , FlowyError > {
7979 let doc_id = doc_id. as_ref ( ) ;
8080 tracing:: Span :: current ( ) . record ( "doc_id" , & doc_id) ;
81- self . open_cache . remove ( doc_id) ;
82- self . remove_ws_receiver ( doc_id) ;
81+ self . document_handlers . remove ( doc_id) ;
82+ self . ws_data_receivers . remove ( doc_id) ;
8383 Ok ( ( ) )
8484 }
8585
@@ -94,18 +94,18 @@ impl FlowyDocumentManager {
9494 } )
9595 }
9696
97- pub async fn save_document < T : AsRef < str > > ( & self , doc_id : T , revisions : RepeatedRevision ) -> FlowyResult < ( ) > {
97+ pub async fn receive_revisions < T : AsRef < str > > ( & self , doc_id : T , revisions : RepeatedRevision ) -> FlowyResult < ( ) > {
9898 let doc_id = doc_id. as_ref ( ) . to_owned ( ) ;
99- let db_pool = self . user . db_pool ( ) ?;
99+ let db_pool = self . document_user . db_pool ( ) ?;
100100 let rev_manager = self . make_rev_manager ( & doc_id, db_pool) ?;
101101 let _ = rev_manager. reset_object ( revisions) . await ?;
102102 Ok ( ( ) )
103103 }
104104
105- pub async fn did_receive_ws_data ( & self , data : Bytes ) {
105+ pub async fn receive_ws_data ( & self , data : Bytes ) {
106106 let result: Result < ServerRevisionWSData , protobuf:: ProtobufError > = data. try_into ( ) ;
107107 match result {
108- Ok ( data) => match self . ws_receivers . get ( & data. object_id ) {
108+ Ok ( data) => match self . ws_data_receivers . get ( & data. object_id ) {
109109 None => tracing:: error!( "Can't find any source handler for {:?}-{:?}" , data. object_id, data. ty) ,
110110 Some ( handler) => match handler. receive_ws_data ( data) . await {
111111 Ok ( _) => { }
@@ -117,19 +117,13 @@ impl FlowyDocumentManager {
117117 }
118118 }
119119 }
120-
121- pub async fn ws_connect_state_changed ( & self , state : & WSConnectState ) {
122- for receiver in self . ws_receivers . iter ( ) {
123- receiver. value ( ) . connect_state_changed ( state. clone ( ) ) ;
124- }
125- }
126120}
127121
128122impl FlowyDocumentManager {
129123 async fn get_editor ( & self , doc_id : & str ) -> FlowyResult < Arc < ClientDocumentEditor > > {
130- match self . open_cache . get ( doc_id) {
124+ match self . document_handlers . get ( doc_id) {
131125 None => {
132- let db_pool = self . user . db_pool ( ) ?;
126+ let db_pool = self . document_user . db_pool ( ) ?;
133127 self . make_editor ( doc_id, db_pool) . await
134128 }
135129 Some ( editor) => Ok ( editor) ,
@@ -141,35 +135,26 @@ impl FlowyDocumentManager {
141135 doc_id : & str ,
142136 pool : Arc < ConnectionPool > ,
143137 ) -> Result < Arc < ClientDocumentEditor > , FlowyError > {
144- let user = self . user . clone ( ) ;
145- let token = self . user . token ( ) ?;
138+ let user = self . document_user . clone ( ) ;
139+ let token = self . document_user . token ( ) ?;
146140 let rev_manager = self . make_rev_manager ( doc_id, pool. clone ( ) ) ?;
147- let server = Arc :: new ( DocumentRevisionCloudServiceImpl {
141+ let cloud_service = Arc :: new ( DocumentRevisionCloudServiceImpl {
148142 token,
149143 server : self . cloud_service . clone ( ) ,
150144 } ) ;
151- let doc_editor = ClientDocumentEditor :: new ( doc_id, user, rev_manager, self . web_socket . clone ( ) , server) . await ?;
152- self . add_ws_receiver ( doc_id, doc_editor. ws_handler ( ) ) ;
153- self . open_cache . insert ( doc_id, & doc_editor) ;
145+ let doc_editor =
146+ ClientDocumentEditor :: new ( doc_id, user, rev_manager, self . rev_web_socket . clone ( ) , cloud_service) . await ?;
147+ self . ws_data_receivers
148+ . insert ( doc_id. to_string ( ) , doc_editor. ws_handler ( ) ) ;
149+ self . document_handlers . insert ( doc_id, & doc_editor) ;
154150 Ok ( doc_editor)
155151 }
156152
157153 fn make_rev_manager ( & self , doc_id : & str , pool : Arc < ConnectionPool > ) -> Result < RevisionManager , FlowyError > {
158- let user_id = self . user . user_id ( ) ?;
154+ let user_id = self . document_user . user_id ( ) ?;
159155 let cache = Arc :: new ( RevisionCache :: new ( & user_id, doc_id, pool) ) ;
160156 Ok ( RevisionManager :: new ( & user_id, doc_id, cache) )
161157 }
162-
163- fn add_ws_receiver ( & self , object_id : & str , receiver : Arc < dyn DocumentWSReceiver > ) {
164- if self . ws_receivers . contains_key ( object_id) {
165- log:: error!( "Duplicate handler registered for {:?}" , object_id) ;
166- }
167- self . ws_receivers . insert ( object_id. to_string ( ) , receiver) ;
168- }
169-
170- fn remove_ws_receiver ( & self , id : & str ) {
171- self . ws_receivers . remove ( id) ;
172- }
173158}
174159
175160struct DocumentRevisionCloudServiceImpl {
@@ -202,11 +187,11 @@ impl RevisionCloudService for DocumentRevisionCloudServiceImpl {
202187 }
203188}
204189
205- pub struct OpenDocCache {
190+ pub struct DocumentEditorHandlers {
206191 inner : DashMap < String , Arc < ClientDocumentEditor > > ,
207192}
208193
209- impl OpenDocCache {
194+ impl DocumentEditorHandlers {
210195 fn new ( ) -> Self {
211196 Self { inner : DashMap :: new ( ) }
212197 }
0 commit comments