@@ -101,6 +101,31 @@ impl StorageManager {
101101 }
102102 } ) ;
103103
104+ let mut rx = global_notifier. subscribe ( ) ;
105+ let weak_notifier = Arc :: downgrade ( & progress_notifiers) ;
106+ tokio:: spawn ( async move {
107+ while let Ok ( progress) = rx. recv ( ) . await {
108+ if let Some ( notifiers) = weak_notifier. upgrade ( ) {
109+ if let Some ( mut notifier) = notifiers. get_mut ( & progress. file_id ) {
110+ if progress. progress >= 1.0 {
111+ let finish = FileUploadState :: Finished {
112+ file_id : progress. file_id ,
113+ } ;
114+ notifier. notify ( finish) . await ;
115+ } else {
116+ let progress = FileUploadState :: Uploading {
117+ progress : progress. progress ,
118+ } ;
119+ notifier. notify ( progress) . await ;
120+ }
121+ }
122+ } else {
123+ info ! ( "progress notifiers is dropped" ) ;
124+ break ;
125+ }
126+ }
127+ } ) ;
128+
104129 Self {
105130 storage_service,
106131 cloud_service,
@@ -139,6 +164,7 @@ impl StorageManager {
139164
140165 if let Err ( err) = self . global_notifier . send ( FileProgress :: new_progress (
141166 url. to_string ( ) ,
167+ file_id. clone ( ) ,
142168 if is_finish { 1.0 } else { 0.0 } ,
143169 ) ) {
144170 error ! ( "[File] send global notifier failed: {}" , err) ;
@@ -194,7 +220,7 @@ async fn prepare_upload_task(
194220) -> FlowyResult < ( ) > {
195221 let uid = user_service. user_id ( ) ?;
196222 let conn = user_service. sqlite_connection ( uid) ?;
197- let upload_files = batch_select_upload_file ( conn, 100 ) ?;
223+ let upload_files = batch_select_upload_file ( conn, 100 , false ) ?;
198224 let tasks = upload_files
199225 . into_iter ( )
200226 . map ( |upload_file| UploadTask :: BackgroundTask {
@@ -317,7 +343,6 @@ impl StorageService for StorageServiceImpl {
317343
318344 // 1. create a file record and chunk the file
319345 let ( chunks, record) = create_upload_record ( workspace_id, parent_dir, local_file_path) . await ?;
320-
321346 // 2. save the record to sqlite
322347 let conn = self
323348 . user_service
@@ -379,7 +404,6 @@ impl StorageService for StorageServiceImpl {
379404 & self . temp_storage ,
380405 chunks,
381406 file_record,
382- self . progress_notifiers . clone ( ) ,
383407 self . global_notifier . clone ( ) ,
384408 )
385409 . await ?;
@@ -404,7 +428,6 @@ impl StorageService for StorageServiceImpl {
404428 & self . user_service ,
405429 & self . temp_storage ,
406430 upload_file,
407- self . progress_notifiers . clone ( ) ,
408431 self . global_notifier . clone ( ) ,
409432 )
410433 . await ?;
@@ -460,6 +483,7 @@ async fn create_upload_record(
460483 let record = UploadFileTable {
461484 workspace_id,
462485 file_id,
486+ // When the upload_id is empty string, we will create a new upload using [Self::start_upload] method
463487 upload_id : "" . to_string ( ) ,
464488 parent_dir,
465489 local_file_path,
@@ -479,7 +503,6 @@ async fn start_upload(
479503 temp_storage : & Arc < FileTempStorage > ,
480504 mut chunked_bytes : ChunkedBytes ,
481505 upload_file : & UploadFileTable ,
482- progress_notifiers : Arc < DashMap < String , ProgressNotifier > > ,
483506 global_notifier : GlobalNotifier ,
484507) -> FlowyResult < ( ) > {
485508 // 4. gather existing completed parts
@@ -503,50 +526,47 @@ async fn start_upload(
503526 ) ;
504527
505528 let mut upload_file = upload_file. clone ( ) ;
506- if upload_file. upload_id . is_empty ( ) {
507- // 1. create upload
508- trace ! (
509- "[File] create upload for workspace: {}, parent_dir: {}, file_id: {}" ,
510- upload_file. workspace_id,
511- upload_file. parent_dir,
512- upload_file. file_id
513- ) ;
514-
515- let create_upload_resp_result = cloud_service
516- . create_upload (
517- & upload_file. workspace_id ,
518- & upload_file. parent_dir ,
519- & upload_file. file_id ,
520- & upload_file. content_type ,
521- )
522- . await ;
523- if let Err ( err) = create_upload_resp_result. as_ref ( ) {
524- if err. is_file_limit_exceeded ( ) {
525- make_notification ( StorageNotification :: FileStorageLimitExceeded )
526- . payload ( err. clone ( ) )
527- . send ( ) ;
528- }
529- }
530- let create_upload_resp = create_upload_resp_result?;
529+ // 1. create upload
530+ trace ! (
531+ "[File] create upload for workspace: {}, parent_dir: {}, file_id: {}" ,
532+ upload_file. workspace_id,
533+ upload_file. parent_dir,
534+ upload_file. file_id
535+ ) ;
531536
532- // 2. update upload_id
533- let conn = user_service. sqlite_connection ( user_service. user_id ( ) ?) ?;
534- update_upload_file_upload_id (
535- conn,
537+ let create_upload_resp_result = cloud_service
538+ . create_upload (
536539 & upload_file. workspace_id ,
537540 & upload_file. parent_dir ,
538541 & upload_file. file_id ,
539- & create_upload_resp. upload_id ,
540- ) ?;
541-
542- trace ! (
543- "[File] {} update upload_id: {}" ,
544- upload_file. file_id,
545- create_upload_resp. upload_id
546- ) ;
547- // temporary store the upload_id
548- upload_file. upload_id = create_upload_resp. upload_id ;
542+ & upload_file. content_type ,
543+ )
544+ . await ;
545+ if let Err ( err) = create_upload_resp_result. as_ref ( ) {
546+ if err. is_file_limit_exceeded ( ) {
547+ make_notification ( StorageNotification :: FileStorageLimitExceeded )
548+ . payload ( err. clone ( ) )
549+ . send ( ) ;
550+ }
549551 }
552+ let create_upload_resp = create_upload_resp_result?;
553+
554+ // 2. update upload_id
555+ let conn = user_service. sqlite_connection ( user_service. user_id ( ) ?) ?;
556+ update_upload_file_upload_id (
557+ conn,
558+ & upload_file. workspace_id ,
559+ & upload_file. parent_dir ,
560+ & upload_file. file_id ,
561+ & create_upload_resp. upload_id ,
562+ ) ?;
563+
564+ trace ! (
565+ "[File] {} update upload_id: {}" ,
566+ upload_file. file_id,
567+ create_upload_resp. upload_id
568+ ) ;
569+ upload_file. upload_id = create_upload_resp. upload_id ;
550570
551571 // 3. start uploading parts
552572 info ! (
@@ -586,22 +606,17 @@ async fn start_upload(
586606 . await
587607 {
588608 Ok ( resp) => {
589- let progress = ( part_number as f64 / total_parts as f64 ) . clamp ( 0.0 , 1.0 ) ;
590- trace ! (
591- "[File] {} upload progress:{}, etag: {}" ,
592- upload_file. file_id,
593- progress,
594- resp. e_tag
595- ) ;
596-
597- if let Err ( err) = global_notifier. send ( FileProgress :: new_progress ( file_url, progress) ) {
598- error ! ( "[File] send global notifier failed: {}" , err) ;
609+ let mut progress_value = ( part_number as f64 / total_parts as f64 ) . clamp ( 0.0 , 1.0 ) ;
610+ // The 0.1 is reserved for the complete_upload progress
611+ if progress_value >= 0.9 {
612+ progress_value = 0.9 ;
599613 }
614+ let progress =
615+ FileProgress :: new_progress ( file_url, upload_file. file_id . clone ( ) , progress_value) ;
616+ trace ! ( "[File] upload progress: {}" , progress) ;
600617
601- if let Some ( mut notifier) = progress_notifiers. get_mut ( & upload_file. file_id ) {
602- notifier
603- . notify ( FileUploadState :: Uploading { progress } )
604- . await ;
618+ if let Err ( err) = global_notifier. send ( progress) {
619+ error ! ( "[File] send global notifier failed: {}" , err) ;
605620 }
606621
607622 // gather completed part
@@ -617,7 +632,11 @@ async fn start_upload(
617632 . send ( ) ;
618633 }
619634
620- if let Err ( err) = global_notifier. send ( FileProgress :: new_error ( file_url, err. msg . clone ( ) ) ) {
635+ if let Err ( err) = global_notifier. send ( FileProgress :: new_error (
636+ file_url,
637+ upload_file. file_id . clone ( ) ,
638+ err. msg . clone ( ) ,
639+ ) ) {
621640 error ! ( "[File] send global notifier failed: {}" , err) ;
622641 }
623642 return Err ( err) ;
@@ -632,7 +651,6 @@ async fn start_upload(
632651 temp_storage,
633652 & upload_file,
634653 completed_parts,
635- & progress_notifiers,
636654 & global_notifier,
637655 )
638656 . await ;
@@ -655,7 +673,6 @@ async fn resume_upload(
655673 user_service : & Arc < dyn StorageUserService > ,
656674 temp_storage : & Arc < FileTempStorage > ,
657675 upload_file : UploadFileTable ,
658- progress_notifiers : Arc < DashMap < String , ProgressNotifier > > ,
659676 global_notifier : GlobalNotifier ,
660677) -> FlowyResult < ( ) > {
661678 trace ! (
@@ -675,7 +692,6 @@ async fn resume_upload(
675692 temp_storage,
676693 chunked_bytes,
677694 & upload_file,
678- progress_notifiers,
679695 global_notifier,
680696 )
681697 . await ?;
@@ -740,14 +756,8 @@ async fn complete_upload(
740756 temp_storage : & Arc < FileTempStorage > ,
741757 upload_file : & UploadFileTable ,
742758 parts : Vec < CompletedPartRequest > ,
743- progress_notifiers : & Arc < DashMap < String , ProgressNotifier > > ,
744759 global_notifier : & GlobalNotifier ,
745760) -> Result < ( ) , FlowyError > {
746- trace ! (
747- "[File]: completing file upload: {}, num parts: {}" ,
748- upload_file. file_id,
749- parts. len( )
750- ) ;
751761 let file_url = cloud_service
752762 . get_object_url_v1 (
753763 & upload_file. workspace_id ,
@@ -756,6 +766,12 @@ async fn complete_upload(
756766 )
757767 . await ?;
758768
769+ info ! (
770+ "[File]: completing file upload: {}, num parts: {}, url:{}" ,
771+ upload_file. file_id,
772+ parts. len( ) ,
773+ file_url
774+ ) ;
759775 match cloud_service
760776 . complete_upload (
761777 & upload_file. workspace_id ,
@@ -768,15 +784,7 @@ async fn complete_upload(
768784 {
769785 Ok ( _) => {
770786 info ! ( "[File] completed upload file: {}" , upload_file. file_id) ;
771- if let Some ( mut notifier) = progress_notifiers. get_mut ( & upload_file. file_id ) {
772- notifier
773- . notify ( FileUploadState :: Finished {
774- file_id : upload_file. file_id . clone ( ) ,
775- } )
776- . await ;
777- }
778-
779- let progress = FileProgress :: new_progress ( file_url, 1.0 ) ;
787+ let progress = FileProgress :: new_progress ( file_url, upload_file. file_id . clone ( ) , 1.0 ) ;
780788 info ! (
781789 "[File]: notify upload progress:{}, {}" ,
782790 upload_file. file_id, progress
@@ -796,7 +804,10 @@ async fn complete_upload(
796804 }
797805 } ,
798806 Err ( err) => {
799- let progress = FileProgress :: new_error ( file_url, err. msg . clone ( ) ) ;
807+ error ! ( "[File] complete upload failed: {}" , err) ;
808+
809+ let progress =
810+ FileProgress :: new_error ( file_url, upload_file. file_id . clone ( ) , err. msg . clone ( ) ) ;
800811 if let Err ( send_err) = global_notifier. send ( progress) {
801812 error ! ( "[File] send global notifier failed: {}" , send_err) ;
802813 }
0 commit comments