Skip to content

Commit 6785104

Browse files
authored
chore: send notification for file upload state (#6738)
* chore: send notification for file upload state * chore: fix test
1 parent 193c824 commit 6785104

File tree

6 files changed

+105
-91
lines changed

6 files changed

+105
-91
lines changed

frontend/appflowy_flutter/lib/startup/tasks/file_storage_task.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class FileStorageService {
3737
final fileProgress = FileProgress.fromJsonString(event);
3838
if (fileProgress != null) {
3939
Log.debug(
40-
"Upload progress: file: ${fileProgress.fileUrl} ${fileProgress.progress}",
40+
"FileStorageService upload file: ${fileProgress.fileUrl} ${fileProgress.progress}",
4141
);
4242
final notifier = _notifierList[fileProgress.fileUrl];
4343
if (notifier != null) {

frontend/rust-lib/flowy-ai/src/local_ai/local_llm_resource.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ impl LocalAIResourceController {
411411
Ok(())
412412
}
413413

414-
#[instrument(level = "info", skip_all, err)]
414+
#[instrument(level = "info", skip_all)]
415415
pub fn get_chat_config(&self, rag_enabled: bool) -> FlowyResult<AIPluginConfig> {
416416
if !self.is_resource_ready() {
417417
return Err(FlowyError::local_ai().with_context("Local AI resources are not ready"));

frontend/rust-lib/flowy-storage-pub/src/storage.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use serde::Serialize;
77
use std::fmt::Display;
88
use std::ops::{Deref, DerefMut};
99
use tokio::sync::broadcast;
10-
use tracing::warn;
1110

1211
#[async_trait]
1312
pub trait StorageService: Send + Sync {
@@ -68,22 +67,25 @@ pub enum FileUploadState {
6867
#[derive(Clone, Debug, Serialize)]
6968
pub struct FileProgress {
7069
pub file_url: String,
70+
pub file_id: String,
7171
pub progress: f64,
7272
pub error: Option<String>,
7373
}
7474

7575
impl FileProgress {
76-
pub fn new_progress(file_url: String, progress: f64) -> Self {
76+
pub fn new_progress(file_url: String, file_id: String, progress: f64) -> Self {
7777
FileProgress {
7878
file_url,
79-
progress,
79+
file_id,
80+
progress: (progress * 10.0).round() / 10.0,
8081
error: None,
8182
}
8283
}
8384

84-
pub fn new_error(file_url: String, error: String) -> Self {
85+
pub fn new_error(file_url: String, file_id: String, error: String) -> Self {
8586
FileProgress {
8687
file_url,
88+
file_id,
8789
progress: 0.0,
8890
error: Some(error),
8991
}
@@ -92,7 +94,7 @@ impl FileProgress {
9294

9395
impl Display for FileProgress {
9496
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95-
write!(f, "FileProgress: {} - {}", self.file_url, self.progress)
97+
write!(f, "FileProgress: {} - {}", self.file_id, self.progress)
9698
}
9799
}
98100

@@ -122,9 +124,7 @@ impl ProgressNotifier {
122124

123125
pub async fn notify(&mut self, progress: FileUploadState) {
124126
self.current_value = Some(progress.clone());
125-
if let Err(err) = self.tx.send(progress) {
126-
warn!("Failed to send progress notification: {:?}", err);
127-
}
127+
let _ = self.tx.send(progress);
128128
}
129129
}
130130

frontend/rust-lib/flowy-storage/src/manager.rs

Lines changed: 89 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,31 @@ impl StorageManager {
101101
}
102102
});
103103

104+
let mut rx = global_notifier.subscribe();
105+
let weak_notifier = Arc::downgrade(&progress_notifiers);
106+
tokio::spawn(async move {
107+
while let Ok(progress) = rx.recv().await {
108+
if let Some(notifiers) = weak_notifier.upgrade() {
109+
if let Some(mut notifier) = notifiers.get_mut(&progress.file_id) {
110+
if progress.progress >= 1.0 {
111+
let finish = FileUploadState::Finished {
112+
file_id: progress.file_id,
113+
};
114+
notifier.notify(finish).await;
115+
} else {
116+
let progress = FileUploadState::Uploading {
117+
progress: progress.progress,
118+
};
119+
notifier.notify(progress).await;
120+
}
121+
}
122+
} else {
123+
info!("progress notifiers is dropped");
124+
break;
125+
}
126+
}
127+
});
128+
104129
Self {
105130
storage_service,
106131
cloud_service,
@@ -139,6 +164,7 @@ impl StorageManager {
139164

140165
if let Err(err) = self.global_notifier.send(FileProgress::new_progress(
141166
url.to_string(),
167+
file_id.clone(),
142168
if is_finish { 1.0 } else { 0.0 },
143169
)) {
144170
error!("[File] send global notifier failed: {}", err);
@@ -194,7 +220,7 @@ async fn prepare_upload_task(
194220
) -> FlowyResult<()> {
195221
let uid = user_service.user_id()?;
196222
let conn = user_service.sqlite_connection(uid)?;
197-
let upload_files = batch_select_upload_file(conn, 100)?;
223+
let upload_files = batch_select_upload_file(conn, 100, false)?;
198224
let tasks = upload_files
199225
.into_iter()
200226
.map(|upload_file| UploadTask::BackgroundTask {
@@ -317,7 +343,6 @@ impl StorageService for StorageServiceImpl {
317343

318344
// 1. create a file record and chunk the file
319345
let (chunks, record) = create_upload_record(workspace_id, parent_dir, local_file_path).await?;
320-
321346
// 2. save the record to sqlite
322347
let conn = self
323348
.user_service
@@ -379,7 +404,6 @@ impl StorageService for StorageServiceImpl {
379404
&self.temp_storage,
380405
chunks,
381406
file_record,
382-
self.progress_notifiers.clone(),
383407
self.global_notifier.clone(),
384408
)
385409
.await?;
@@ -404,7 +428,6 @@ impl StorageService for StorageServiceImpl {
404428
&self.user_service,
405429
&self.temp_storage,
406430
upload_file,
407-
self.progress_notifiers.clone(),
408431
self.global_notifier.clone(),
409432
)
410433
.await?;
@@ -460,6 +483,7 @@ async fn create_upload_record(
460483
let record = UploadFileTable {
461484
workspace_id,
462485
file_id,
486+
// When the upload_id is empty string, we will create a new upload using [Self::start_upload] method
463487
upload_id: "".to_string(),
464488
parent_dir,
465489
local_file_path,
@@ -479,7 +503,6 @@ async fn start_upload(
479503
temp_storage: &Arc<FileTempStorage>,
480504
mut chunked_bytes: ChunkedBytes,
481505
upload_file: &UploadFileTable,
482-
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
483506
global_notifier: GlobalNotifier,
484507
) -> FlowyResult<()> {
485508
// 4. gather existing completed parts
@@ -503,50 +526,47 @@ async fn start_upload(
503526
);
504527

505528
let mut upload_file = upload_file.clone();
506-
if upload_file.upload_id.is_empty() {
507-
// 1. create upload
508-
trace!(
509-
"[File] create upload for workspace: {}, parent_dir: {}, file_id: {}",
510-
upload_file.workspace_id,
511-
upload_file.parent_dir,
512-
upload_file.file_id
513-
);
514-
515-
let create_upload_resp_result = cloud_service
516-
.create_upload(
517-
&upload_file.workspace_id,
518-
&upload_file.parent_dir,
519-
&upload_file.file_id,
520-
&upload_file.content_type,
521-
)
522-
.await;
523-
if let Err(err) = create_upload_resp_result.as_ref() {
524-
if err.is_file_limit_exceeded() {
525-
make_notification(StorageNotification::FileStorageLimitExceeded)
526-
.payload(err.clone())
527-
.send();
528-
}
529-
}
530-
let create_upload_resp = create_upload_resp_result?;
529+
// 1. create upload
530+
trace!(
531+
"[File] create upload for workspace: {}, parent_dir: {}, file_id: {}",
532+
upload_file.workspace_id,
533+
upload_file.parent_dir,
534+
upload_file.file_id
535+
);
531536

532-
// 2. update upload_id
533-
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
534-
update_upload_file_upload_id(
535-
conn,
537+
let create_upload_resp_result = cloud_service
538+
.create_upload(
536539
&upload_file.workspace_id,
537540
&upload_file.parent_dir,
538541
&upload_file.file_id,
539-
&create_upload_resp.upload_id,
540-
)?;
541-
542-
trace!(
543-
"[File] {} update upload_id: {}",
544-
upload_file.file_id,
545-
create_upload_resp.upload_id
546-
);
547-
// temporary store the upload_id
548-
upload_file.upload_id = create_upload_resp.upload_id;
542+
&upload_file.content_type,
543+
)
544+
.await;
545+
if let Err(err) = create_upload_resp_result.as_ref() {
546+
if err.is_file_limit_exceeded() {
547+
make_notification(StorageNotification::FileStorageLimitExceeded)
548+
.payload(err.clone())
549+
.send();
550+
}
549551
}
552+
let create_upload_resp = create_upload_resp_result?;
553+
554+
// 2. update upload_id
555+
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
556+
update_upload_file_upload_id(
557+
conn,
558+
&upload_file.workspace_id,
559+
&upload_file.parent_dir,
560+
&upload_file.file_id,
561+
&create_upload_resp.upload_id,
562+
)?;
563+
564+
trace!(
565+
"[File] {} update upload_id: {}",
566+
upload_file.file_id,
567+
create_upload_resp.upload_id
568+
);
569+
upload_file.upload_id = create_upload_resp.upload_id;
550570

551571
// 3. start uploading parts
552572
info!(
@@ -586,22 +606,17 @@ async fn start_upload(
586606
.await
587607
{
588608
Ok(resp) => {
589-
let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
590-
trace!(
591-
"[File] {} upload progress:{}, etag: {}",
592-
upload_file.file_id,
593-
progress,
594-
resp.e_tag
595-
);
596-
597-
if let Err(err) = global_notifier.send(FileProgress::new_progress(file_url, progress)) {
598-
error!("[File] send global notifier failed: {}", err);
609+
let mut progress_value = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
610+
// The 0.1 is reserved for the complete_upload progress
611+
if progress_value >= 0.9 {
612+
progress_value = 0.9;
599613
}
614+
let progress =
615+
FileProgress::new_progress(file_url, upload_file.file_id.clone(), progress_value);
616+
trace!("[File] upload progress: {}", progress);
600617

601-
if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
602-
notifier
603-
.notify(FileUploadState::Uploading { progress })
604-
.await;
618+
if let Err(err) = global_notifier.send(progress) {
619+
error!("[File] send global notifier failed: {}", err);
605620
}
606621

607622
// gather completed part
@@ -617,7 +632,11 @@ async fn start_upload(
617632
.send();
618633
}
619634

620-
if let Err(err) = global_notifier.send(FileProgress::new_error(file_url, err.msg.clone())) {
635+
if let Err(err) = global_notifier.send(FileProgress::new_error(
636+
file_url,
637+
upload_file.file_id.clone(),
638+
err.msg.clone(),
639+
)) {
621640
error!("[File] send global notifier failed: {}", err);
622641
}
623642
return Err(err);
@@ -632,7 +651,6 @@ async fn start_upload(
632651
temp_storage,
633652
&upload_file,
634653
completed_parts,
635-
&progress_notifiers,
636654
&global_notifier,
637655
)
638656
.await;
@@ -655,7 +673,6 @@ async fn resume_upload(
655673
user_service: &Arc<dyn StorageUserService>,
656674
temp_storage: &Arc<FileTempStorage>,
657675
upload_file: UploadFileTable,
658-
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
659676
global_notifier: GlobalNotifier,
660677
) -> FlowyResult<()> {
661678
trace!(
@@ -675,7 +692,6 @@ async fn resume_upload(
675692
temp_storage,
676693
chunked_bytes,
677694
&upload_file,
678-
progress_notifiers,
679695
global_notifier,
680696
)
681697
.await?;
@@ -740,14 +756,8 @@ async fn complete_upload(
740756
temp_storage: &Arc<FileTempStorage>,
741757
upload_file: &UploadFileTable,
742758
parts: Vec<CompletedPartRequest>,
743-
progress_notifiers: &Arc<DashMap<String, ProgressNotifier>>,
744759
global_notifier: &GlobalNotifier,
745760
) -> Result<(), FlowyError> {
746-
trace!(
747-
"[File]: completing file upload: {}, num parts: {}",
748-
upload_file.file_id,
749-
parts.len()
750-
);
751761
let file_url = cloud_service
752762
.get_object_url_v1(
753763
&upload_file.workspace_id,
@@ -756,6 +766,12 @@ async fn complete_upload(
756766
)
757767
.await?;
758768

769+
info!(
770+
"[File]: completing file upload: {}, num parts: {}, url:{}",
771+
upload_file.file_id,
772+
parts.len(),
773+
file_url
774+
);
759775
match cloud_service
760776
.complete_upload(
761777
&upload_file.workspace_id,
@@ -768,15 +784,7 @@ async fn complete_upload(
768784
{
769785
Ok(_) => {
770786
info!("[File] completed upload file: {}", upload_file.file_id);
771-
if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
772-
notifier
773-
.notify(FileUploadState::Finished {
774-
file_id: upload_file.file_id.clone(),
775-
})
776-
.await;
777-
}
778-
779-
let progress = FileProgress::new_progress(file_url, 1.0);
787+
let progress = FileProgress::new_progress(file_url, upload_file.file_id.clone(), 1.0);
780788
info!(
781789
"[File]: notify upload progress:{}, {}",
782790
upload_file.file_id, progress
@@ -796,7 +804,10 @@ async fn complete_upload(
796804
}
797805
},
798806
Err(err) => {
799-
let progress = FileProgress::new_error(file_url, err.msg.clone());
807+
error!("[File] complete upload failed: {}", err);
808+
809+
let progress =
810+
FileProgress::new_error(file_url, upload_file.file_id.clone(), err.msg.clone());
800811
if let Err(send_err) = global_notifier.send(progress) {
801812
error!("[File] send global notifier failed: {}", send_err);
802813
}

frontend/rust-lib/flowy-storage/src/sqlite_sql.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,14 @@ pub fn select_upload_parts(
180180
pub fn batch_select_upload_file(
181181
mut conn: DBConnection,
182182
limit: i32,
183+
is_finish: bool,
183184
) -> FlowyResult<Vec<UploadFileTable>> {
184185
let results = upload_file_table::dsl::upload_file_table
186+
.filter(upload_file_table::is_finish.eq(is_finish))
185187
.order(upload_file_table::created_at.desc())
186188
.limit(limit.into())
187-
.load::<UploadFileTable>(&mut conn)?;
189+
.load::<UploadFileTable>(&mut *conn)?;
190+
188191
Ok(results)
189192
}
190193

0 commit comments

Comments
 (0)