Skip to content

Commit 71022ed

Browse files
authored
fix: potential async errors (#1772)
1 parent aa15a45 commit 71022ed

File tree

23 files changed

+206
-223
lines changed

23 files changed

+206
-223
lines changed

frontend/appflowy_tauri/src-tauri/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
export * from './classes/flowy-user';
2-
export * from './classes/flowy-document';
3-
export * from './classes/flowy-grid';
4-
export * from './classes/flowy-folder';
5-
export * from './classes/flowy-net';
6-
export * from './classes/flowy-error';
1+
export * from "./classes/flowy-user";
2+
export * from "./classes/flowy-document";
3+
export * from "./classes/flowy-database";
4+
export * from "./classes/flowy-folder";
5+
export * from "./classes/flowy-net";
6+
export * from "./classes/flowy-error";

frontend/rust-lib/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/rust-lib/flowy-core/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ lib-dispatch = { path = "../lib-dispatch" }
1010
lib-log = { path = "../lib-log" }
1111
flowy-user = { path = "../flowy-user" }
1212
flowy-net = { path = "../flowy-net" }
13-
flowy-folder = { path = "../flowy-folder", default-features = false }
14-
flowy-database = { path = "../flowy-database", default-features = false }
13+
flowy-folder = { path = "../flowy-folder" }
14+
flowy-database = { path = "../flowy-database" }
1515
grid-model = { path = "../../../shared-lib/grid-model" }
16+
user-model = { path = "../../../shared-lib/user-model" }
1617
flowy-client-ws = { path = "../../../shared-lib/flowy-client-ws" }
1718
flowy-sqlite = { path = "../flowy-sqlite", optional = true }
18-
flowy-document = { path = "../flowy-document", default-features = false }
19+
flowy-document = { path = "../flowy-document" }
1920
flowy-revision = { path = "../flowy-revision" }
2021
flowy-error = { path = "../flowy-error", features = ["adaptor_ws"] }
2122
flowy-task = { path = "../flowy-task" }
@@ -32,6 +33,7 @@ lib-ws = { path = "../../../shared-lib/lib-ws" }
3233
lib-infra = { path = "../../../shared-lib/lib-infra" }
3334

3435
[features]
36+
default = ["rev-sqlite"]
3537
http_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
3638
native_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
3739
use_bunyan = ["lib-log/use_bunyan"]

frontend/rust-lib/flowy-core/src/lib.rs

Lines changed: 100 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
mod deps_resolve;
22
pub mod module;
3-
pub use flowy_net::get_client_server_configuration;
4-
53
use crate::deps_resolve::*;
6-
74
use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
85
use flowy_database::manager::DatabaseManager;
96
use flowy_document::entities::DocumentVersionPB;
107
use flowy_document::{DocumentConfig, DocumentManager};
8+
use flowy_error::FlowyResult;
119
use flowy_folder::entities::ViewDataFormatPB;
1210
use flowy_folder::{errors::FlowyError, manager::FolderManager};
11+
pub use flowy_net::get_client_server_configuration;
1312
use flowy_net::local_server::LocalServer;
1413
use flowy_net::ClientServerConfiguration;
1514
use flowy_task::{TaskDispatcher, TaskRunner};
16-
use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
15+
use flowy_user::event_map::UserStatusCallback;
16+
use flowy_user::services::{UserSession, UserSessionConfig};
1717
use lib_dispatch::prelude::*;
1818
use lib_dispatch::runtime::tokio_default_runtime;
19+
20+
use lib_infra::future::{to_fut, Fut};
1921
use module::make_plugins;
2022
pub use module::*;
2123
use std::time::Duration;
@@ -27,6 +29,7 @@ use std::{
2729
},
2830
};
2931
use tokio::sync::{broadcast, RwLock};
32+
use user_model::UserProfile;
3033

3134
static INIT_LOG: AtomicBool = AtomicBool::new(false);
3235

@@ -83,9 +86,10 @@ fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
8386
filters.push(format!("flowy_folder={}", level));
8487
filters.push(format!("flowy_user={}", level));
8588
filters.push(format!("flowy_document={}", level));
86-
filters.push(format!("flowy_grid={}", level));
87-
filters.push(format!("flowy_collaboration={}", "info"));
88-
filters.push(format!("flowy_notification={}", level));
89+
filters.push(format!("flowy_database={}", level));
90+
filters.push(format!("flowy_sync={}", "info"));
91+
filters.push(format!("flowy_client_sync={}", "info"));
92+
filters.push(format!("flowy_notification={}", "info"));
8993
filters.push(format!("lib_ot={}", level));
9094
filters.push(format!("lib_ws={}", level));
9195
filters.push(format!("lib_infra={}", level));
@@ -162,6 +166,21 @@ impl FlowySDK {
162166
)
163167
});
164168

169+
let user_status_listener = UserStatusListener {
170+
document_manager: document_manager.clone(),
171+
folder_manager: folder_manager.clone(),
172+
grid_manager: grid_manager.clone(),
173+
ws_conn: ws_conn.clone(),
174+
config: config.clone(),
175+
};
176+
let user_status_callback = UserStatusCallbackImpl {
177+
listener: Arc::new(user_status_listener),
178+
};
179+
let cloned_user_session = user_session.clone();
180+
runtime.block_on(async move {
181+
cloned_user_session.clone().init(user_status_callback).await;
182+
});
183+
165184
let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
166185
make_plugins(
167186
&ws_conn,
@@ -171,16 +190,7 @@ impl FlowySDK {
171190
&document_manager,
172191
)
173192
}));
174-
175-
_start_listening(
176-
&config,
177-
&event_dispatcher,
178-
&ws_conn,
179-
&user_session,
180-
&document_manager,
181-
&folder_manager,
182-
&grid_manager,
183-
);
193+
_start_listening(&event_dispatcher, &ws_conn, &folder_manager);
184194

185195
Self {
186196
config,
@@ -201,36 +211,17 @@ impl FlowySDK {
201211
}
202212

203213
fn _start_listening(
204-
config: &FlowySDKConfig,
205214
event_dispatcher: &AFPluginDispatcher,
206215
ws_conn: &Arc<FlowyWebSocketConnect>,
207-
user_session: &Arc<UserSession>,
208-
document_manager: &Arc<DocumentManager>,
209216
folder_manager: &Arc<FolderManager>,
210-
grid_manager: &Arc<DatabaseManager>,
211217
) {
212-
let subscribe_user_status = user_session.notifier.subscribe_user_status();
213218
let subscribe_network_type = ws_conn.subscribe_network_ty();
214219
let folder_manager = folder_manager.clone();
215-
let grid_manager = grid_manager.clone();
216-
let cloned_folder_manager = folder_manager.clone();
220+
let cloned_folder_manager = folder_manager;
217221
let ws_conn = ws_conn.clone();
218-
let user_session = user_session.clone();
219-
let document_manager = document_manager.clone();
220-
let config = config.clone();
221222

222223
event_dispatcher.spawn(async move {
223-
user_session.init();
224224
listen_on_websocket(ws_conn.clone());
225-
_listen_user_status(
226-
config,
227-
ws_conn.clone(),
228-
subscribe_user_status,
229-
document_manager,
230-
folder_manager,
231-
grid_manager,
232-
)
233-
.await;
234225
});
235226

236227
event_dispatcher.spawn(async move {
@@ -253,66 +244,6 @@ fn mk_local_server(
253244
}
254245
}
255246

256-
async fn _listen_user_status(
257-
config: FlowySDKConfig,
258-
ws_conn: Arc<FlowyWebSocketConnect>,
259-
mut subscribe: broadcast::Receiver<UserStatus>,
260-
document_manager: Arc<DocumentManager>,
261-
folder_manager: Arc<FolderManager>,
262-
grid_manager: Arc<DatabaseManager>,
263-
) {
264-
while let Ok(status) = subscribe.recv().await {
265-
let result = || async {
266-
match status {
267-
UserStatus::Login { token, user_id } => {
268-
tracing::trace!("User did login");
269-
folder_manager.initialize(&user_id, &token).await?;
270-
document_manager.initialize(&user_id).await?;
271-
grid_manager.initialize(&user_id, &token).await?;
272-
ws_conn.start(token, user_id).await?;
273-
}
274-
UserStatus::Logout { token: _, user_id } => {
275-
tracing::trace!("User did logout");
276-
folder_manager.clear(&user_id).await;
277-
ws_conn.stop().await;
278-
}
279-
UserStatus::Expired { token: _, user_id } => {
280-
tracing::trace!("User session has been expired");
281-
folder_manager.clear(&user_id).await;
282-
ws_conn.stop().await;
283-
}
284-
UserStatus::SignUp { profile, ret } => {
285-
tracing::trace!("User did sign up");
286-
287-
let view_data_type = match config.document.version {
288-
DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
289-
DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
290-
};
291-
folder_manager
292-
.initialize_with_new_user(&profile.id, &profile.token, view_data_type)
293-
.await?;
294-
document_manager
295-
.initialize_with_new_user(&profile.id, &profile.token)
296-
.await?;
297-
298-
grid_manager
299-
.initialize_with_new_user(&profile.id, &profile.token)
300-
.await?;
301-
302-
ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
303-
let _ = ret.send(());
304-
}
305-
}
306-
Ok::<(), FlowyError>(())
307-
};
308-
309-
match result().await {
310-
Ok(_) => {}
311-
Err(e) => tracing::error!("{}", e),
312-
}
313-
}
314-
}
315-
316247
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<FolderManager>) {
317248
while let Ok(_new_type) = subscribe.recv().await {
318249
// core.network_state_changed(new_type);
@@ -345,3 +276,75 @@ fn mk_user_session(
345276
let cloud_service = UserDepsResolver::resolve(local_server, server_config);
346277
Arc::new(UserSession::new(user_config, cloud_service))
347278
}
279+
280+
struct UserStatusListener {
281+
document_manager: Arc<DocumentManager>,
282+
folder_manager: Arc<FolderManager>,
283+
grid_manager: Arc<DatabaseManager>,
284+
ws_conn: Arc<FlowyWebSocketConnect>,
285+
config: FlowySDKConfig,
286+
}
287+
288+
impl UserStatusListener {
289+
async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
290+
self.folder_manager.initialize(user_id, token).await?;
291+
self.document_manager.initialize(user_id).await?;
292+
self.grid_manager.initialize(user_id, token).await?;
293+
self.ws_conn.start(token.to_owned(), user_id.to_owned()).await?;
294+
Ok(())
295+
}
296+
297+
async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
298+
let view_data_type = match self.config.document.version {
299+
DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
300+
DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
301+
};
302+
self.folder_manager
303+
.initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type)
304+
.await?;
305+
self.document_manager
306+
.initialize_with_new_user(&user_profile.id, &user_profile.token)
307+
.await?;
308+
309+
self.grid_manager
310+
.initialize_with_new_user(&user_profile.id, &user_profile.token)
311+
.await?;
312+
313+
self.ws_conn
314+
.start(user_profile.token.clone(), user_profile.id.clone())
315+
.await?;
316+
Ok(())
317+
}
318+
319+
async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> {
320+
self.folder_manager.clear(user_id).await;
321+
self.ws_conn.stop().await;
322+
Ok(())
323+
}
324+
}
325+
326+
struct UserStatusCallbackImpl {
327+
listener: Arc<UserStatusListener>,
328+
}
329+
330+
impl UserStatusCallback for UserStatusCallbackImpl {
331+
fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
332+
let listener = self.listener.clone();
333+
let token = token.to_owned();
334+
let user_id = user_id.to_owned();
335+
to_fut(async move { listener.did_sign_in(&token, &user_id).await })
336+
}
337+
338+
fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
339+
let listener = self.listener.clone();
340+
let user_profile = user_profile.clone();
341+
to_fut(async move { listener.did_sign_up(&user_profile).await })
342+
}
343+
344+
fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
345+
let listener = self.listener.clone();
346+
let token = token.to_owned();
347+
let user_id = user_id.to_owned();
348+
to_fut(async move { listener.did_expired(&token, &user_id).await })
349+
}
350+
}

frontend/rust-lib/flowy-database/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ flowy-database = { path = "", features = ["flowy_unit_test"]}
5656
flowy-codegen = { path = "../flowy-codegen"}
5757

5858
[features]
59-
default = []
59+
default = ["rev-sqlite"]
6060
rev-sqlite = ["flowy-sqlite"]
6161
dart = ["flowy-codegen/dart", "flowy-notification/dart"]
6262
ts = ["flowy-codegen/ts", "flowy-notification/ts"]

frontend/rust-lib/flowy-document/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ flowy-codegen = { path = "../flowy-codegen"}
5858

5959

6060
[features]
61+
default = ["rev-sqlite"]
6162
sync = []
6263
cloud_sync = ["sync"]
6364
rev-sqlite = ["flowy-sqlite"]

frontend/rust-lib/flowy-document/src/services/migration.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ impl DocumentMigration {
6565
//
6666

6767
KV::set_bool(&key, true);
68-
tracing::debug!("Run document v1 migration");
6968
Ok(())
7069
}
7170
}

frontend/rust-lib/flowy-folder/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ flowy-codegen = { path = "../flowy-codegen"}
4848

4949

5050
[features]
51-
default = []
51+
default = ["rev-sqlite"]
5252
sync = []
5353
cloud_sync = ["sync"]
5454
rev-sqlite = ["flowy-sqlite", "flowy-folder/rev-sqlite"]

frontend/rust-lib/flowy-folder/src/services/view/controller.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ impl ViewController {
209209
let deleted_view = self
210210
.persistence
211211
.begin_transaction(|transaction| {
212-
let view = transaction.read_view(&view_id)?;
212+
let view = transaction.read_view(view_id)?;
213213
let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?;
214214

215215
let index = views
@@ -223,12 +223,12 @@ impl ViewController {
223223
})
224224
.await?;
225225

226-
send_notification(&view_id, FolderNotification::ViewMoveToTrash)
226+
send_notification(view_id, FolderNotification::ViewMoveToTrash)
227227
.payload(deleted_view)
228228
.send();
229229

230-
let processor = self.get_data_processor_from_view_id(&view_id).await?;
231-
processor.close_view(&view_id).await?;
230+
let processor = self.get_data_processor_from_view_id(view_id).await?;
231+
processor.close_view(view_id).await?;
232232
Ok(())
233233
}
234234

0 commit comments

Comments
 (0)