@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicI64, Ordering};
1717use std:: sync:: { Arc , Weak } ;
1818use tokio:: sync:: { Mutex , RwLock } ;
1919use tokio_stream:: StreamExt ;
20- use tracing:: { debug, error, event, info, instrument} ;
20+ use tracing:: { debug, error, event, info, instrument, warn } ;
2121
2222use lib_dispatch:: prelude:: af_spawn;
2323use lib_infra:: box_any:: BoxAny ;
@@ -152,56 +152,88 @@ impl UserManager {
152152 user. email
153153 ) ;
154154
155+ self . prepare_user ( & session) . await ;
156+ self . prepare_backup ( & session) . await ;
157+
155158 // Set the token if the current cloud service using token to authenticate
156159 // Currently, only the AppFlowy cloud using token to init the client api.
157- if let Err ( err) = self . cloud_services . set_token ( & user. token ) {
158- error ! ( "Set token failed: {}" , err) ;
159- }
160+ // TODO(nathan): using trait to separate the init process for different cloud service
161+ if user. authenticator . is_appflowy_cloud ( ) {
162+ if let Err ( err) = self . cloud_services . set_token ( & user. token ) {
163+ error ! ( "Set token failed: {}" , err) ;
164+ }
160165
161- // Subscribe the token state
162- let weak_cloud_services = Arc :: downgrade ( & self . cloud_services ) ;
163- let weak_authenticate_user = Arc :: downgrade ( & self . authenticate_user ) ;
164- let weak_pool = Arc :: downgrade ( & self . db_pool ( user. uid ) ?) ;
165- let cloned_session = session. clone ( ) ;
166- if let Some ( mut token_state_rx) = self . cloud_services . subscribe_token_state ( ) {
167- event ! ( tracing:: Level :: DEBUG , "Listen token state change" ) ;
168- let user_uid = user. uid ;
169- let local_token = user. token . clone ( ) ;
170- af_spawn ( async move {
171- while let Some ( token_state) = token_state_rx. next ( ) . await {
172- debug ! ( "Token state changed: {:?}" , token_state) ;
173- match token_state {
174- UserTokenState :: Refresh { token : new_token } => {
175- // Only save the token if the token is different from the current token
176- if new_token != local_token {
177- if let Some ( conn) = weak_pool. upgrade ( ) . and_then ( |pool| pool. get ( ) . ok ( ) ) {
178- // Save the new token
179- if let Err ( err) = save_user_token ( user_uid, conn, new_token) {
180- error ! ( "Save user token failed: {}" , err) ;
166+ // Subscribe the token state
167+ let weak_cloud_services = Arc :: downgrade ( & self . cloud_services ) ;
168+ let weak_authenticate_user = Arc :: downgrade ( & self . authenticate_user ) ;
169+ let weak_pool = Arc :: downgrade ( & self . db_pool ( user. uid ) ?) ;
170+ let cloned_session = session. clone ( ) ;
171+ if let Some ( mut token_state_rx) = self . cloud_services . subscribe_token_state ( ) {
172+ event ! ( tracing:: Level :: DEBUG , "Listen token state change" ) ;
173+ let user_uid = user. uid ;
174+ let local_token = user. token . clone ( ) ;
175+ af_spawn ( async move {
176+ while let Some ( token_state) = token_state_rx. next ( ) . await {
177+ debug ! ( "Token state changed: {:?}" , token_state) ;
178+ match token_state {
179+ UserTokenState :: Refresh { token : new_token } => {
180+ // Only save the token if the token is different from the current token
181+ if new_token != local_token {
182+ if let Some ( conn) = weak_pool. upgrade ( ) . and_then ( |pool| pool. get ( ) . ok ( ) ) {
183+ // Save the new token
184+ if let Err ( err) = save_user_token ( user_uid, conn, new_token) {
185+ error ! ( "Save user token failed: {}" , err) ;
186+ }
181187 }
182188 }
183- }
184- } ,
185- UserTokenState :: Invalid => {
186- // Force user to sign out when the token is invalid
187- if let ( Some ( cloud_services) , Some ( authenticate_user) , Some ( conn) ) = (
188- weak_cloud_services. upgrade ( ) ,
189- weak_authenticate_user. upgrade ( ) ,
190- weak_pool. upgrade ( ) . and_then ( |pool| pool. get ( ) . ok ( ) ) ,
191- ) {
189+ } ,
190+ UserTokenState :: Invalid => {
191+ // Attempt to upgrade the weak reference for cloud_services
192+ let cloud_services = match weak_cloud_services. upgrade ( ) {
193+ Some ( cloud_services) => cloud_services,
194+ None => {
195+ error ! ( "Failed to upgrade weak reference for cloud_services" ) ;
196+ return ; // Exit early if the upgrade fails
197+ } ,
198+ } ;
199+
200+ // Attempt to upgrade the weak reference for authenticate_user
201+ let authenticate_user = match weak_authenticate_user. upgrade ( ) {
202+ Some ( authenticate_user) => authenticate_user,
203+ None => {
204+ warn ! ( "Failed to upgrade weak reference for authenticate_user" ) ;
205+ return ; // Exit early if the upgrade fails
206+ } ,
207+ } ;
208+
209+ // Attempt to upgrade the weak reference for pool and then get a connection
210+ let conn = match weak_pool. upgrade ( ) {
211+ Some ( pool) => match pool. get ( ) {
212+ Ok ( conn) => conn,
213+ Err ( _) => {
214+ warn ! ( "Failed to get connection from pool" ) ;
215+ return ; // Exit early if getting connection fails
216+ } ,
217+ } ,
218+ None => {
219+ warn ! ( "Failed to upgrade weak reference for pool" ) ;
220+ return ; // Exit early if the upgrade fails
221+ } ,
222+ } ;
223+
224+ // If all upgrades succeed, proceed with the sign_out operation
192225 if let Err ( err) =
193226 sign_out ( & cloud_services, & cloned_session, & authenticate_user, conn) . await
194227 {
195228 error ! ( "Sign out when token invalid failed: {:?}" , err) ;
196229 }
197- }
198- } ,
230+ // Force user to sign out when the token is invalid
231+ } ,
232+ }
199233 }
200- }
201- } ) ;
234+ } ) ;
235+ }
202236 }
203- self . prepare_user ( & session) . await ;
204- self . prepare_backup ( & session) . await ;
205237
206238 // Do the user data migration if needed
207239 event ! ( tracing:: Level :: INFO , "Prepare user data migration" ) ;
@@ -270,7 +302,7 @@ impl UserManager {
270302 ///
271303 /// A sign-in notification is also sent after a successful sign-in.
272304 ///
273- #[ tracing:: instrument( level = "debug " , skip( self , params) ) ]
305+ #[ tracing:: instrument( level = "info " , skip( self , params) ) ]
274306 pub async fn sign_in (
275307 & self ,
276308 params : SignInParams ,
0 commit comments