Skip to content

Commit 3c8de8a

Browse files
authored
chore: update upload file logs (#6735)
1 parent d38a5f3 commit 3c8de8a

File tree

4 files changed

+87
-61
lines changed

4 files changed

+87
-61
lines changed

frontend/rust-lib/flowy-core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ collab-entity = { workspace = true }
2929
collab-plugins = { workspace = true }
3030
collab-folder = { workspace = true }
3131

32-
#collab = { workspace = true }
33-
collab = { workspace = true, features = ["verbose_log"] }
32+
collab = { workspace = true }
33+
#collab = { workspace = true, features = ["verbose_log"] }
3434

3535
diesel.workspace = true
3636
uuid.workspace = true

frontend/rust-lib/flowy-storage/src/manager.rs

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::entities::FileStatePB;
22
use crate::file_cache::FileTempStorage;
33
use crate::notification::{make_notification, StorageNotification};
44
use crate::sqlite_sql::{
5-
batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
6-
is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed,
7-
update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
5+
batch_select_upload_file, delete_all_upload_parts, delete_upload_file, insert_upload_file,
6+
insert_upload_part, is_upload_completed, select_upload_file, select_upload_parts,
7+
update_upload_file_completed, update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
88
};
99
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
1010
use allo_isolate::Isolate;
@@ -64,7 +64,7 @@ impl StorageManager {
6464
"{}/cache_files",
6565
user_service.get_application_root_dir()
6666
));
67-
let (global_notifier, _) = broadcast::channel(1000);
67+
let (global_notifier, _) = broadcast::channel(2000);
6868
let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path));
6969
let (notifier, notifier_rx) = watch::channel(Signal::Proceed);
7070
let task_queue = Arc::new(UploadTaskQueue::new(notifier));
@@ -205,7 +205,7 @@ async fn prepare_upload_task(
205205
retry_count: 0,
206206
})
207207
.collect::<Vec<_>>();
208-
info!("prepare upload task: {}", tasks.len());
208+
info!("[File] prepare upload task: {}", tasks.len());
209209
uploader.queue_tasks(tasks).await;
210210
Ok(())
211211
}
@@ -359,7 +359,7 @@ impl StorageService for StorageServiceImpl {
359359
},
360360
Err(err) => {
361361
if matches!(err.code, ErrorCode::DuplicateSqliteRecord) {
362-
info!("upload record already exists, skip creating new upload task");
362+
info!("[File] upload record already exists, skip creating new upload task");
363363
Ok::<_, FlowyError>((CreatedUpload { url, file_id }, None))
364364
} else {
365365
Err(err)
@@ -373,7 +373,7 @@ impl StorageService for StorageServiceImpl {
373373
FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
374374
})?;
375375

376-
if let Err(err) = start_upload(
376+
start_upload(
377377
&self.cloud_service,
378378
&self.user_service,
379379
&self.temp_storage,
@@ -382,10 +382,8 @@ impl StorageService for StorageServiceImpl {
382382
self.progress_notifiers.clone(),
383383
self.global_notifier.clone(),
384384
)
385-
.await
386-
{
387-
error!("[File] start upload failed: {}", err);
388-
}
385+
.await?;
386+
389387
Ok(())
390388
}
391389

@@ -496,6 +494,7 @@ async fn start_upload(
496494
.collect::<Vec<_>>();
497495

498496
let upload_offset = completed_parts.len() as i32;
497+
let total_parts = chunked_bytes.iter().count();
499498
chunked_bytes.set_current_offset(upload_offset);
500499

501500
info!(
@@ -550,22 +549,29 @@ async fn start_upload(
550549
}
551550

552551
// 3. start uploading parts
553-
trace!(
554-
"[File] {} start uploading parts: {}",
552+
info!(
553+
"[File] {} start uploading parts:{}, offset:{}",
555554
upload_file.file_id,
556-
chunked_bytes.iter().count()
555+
chunked_bytes.iter().count(),
556+
upload_offset,
557557
);
558-
let total_parts = chunked_bytes.iter().count();
559558
let iter = chunked_bytes.iter().enumerate();
560-
561559
for (index, chunk_bytes) in iter {
562560
let part_number = upload_offset + index as i32 + 1;
563-
trace!(
561+
info!(
564562
"[File] {} uploading {}th part, size:{}KB",
565563
upload_file.file_id,
566564
part_number,
567565
chunk_bytes.len() / 1000,
568566
);
567+
568+
let file_url = cloud_service
569+
.get_object_url_v1(
570+
&upload_file.workspace_id,
571+
&upload_file.parent_dir,
572+
&upload_file.file_id,
573+
)
574+
.await?;
569575
// start uploading parts
570576
match upload_part(
571577
cloud_service,
@@ -582,19 +588,12 @@ async fn start_upload(
582588
Ok(resp) => {
583589
let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
584590
trace!(
585-
"[File] {} upload progress: {}",
591+
"[File] {} upload progress:{}, etag: {}",
586592
upload_file.file_id,
587-
progress
593+
progress,
594+
resp.e_tag
588595
);
589596

590-
let file_url = cloud_service
591-
.get_object_url_v1(
592-
&upload_file.workspace_id,
593-
&upload_file.parent_dir,
594-
&upload_file.file_id,
595-
)
596-
.await?;
597-
598597
if let Err(err) = global_notifier.send(FileProgress::new_progress(file_url, progress)) {
599598
error!("[File] send global notifier failed: {}", err);
600599
}
@@ -618,7 +617,9 @@ async fn start_upload(
618617
.send();
619618
}
620619

621-
error!("[File] {} upload part failed: {}", upload_file.file_id, err);
620+
if let Err(err) = global_notifier.send(FileProgress::new_error(file_url, err.msg.clone())) {
621+
error!("[File] send global notifier failed: {}", err);
622+
}
622623
return Err(err);
623624
},
624625
}
@@ -641,9 +642,10 @@ async fn start_upload(
641642
.payload(err.clone())
642643
.send();
643644
}
645+
646+
return Err(err);
644647
}
645648

646-
trace!("[File] {} upload completed", upload_file.file_id);
647649
Ok(())
648650
}
649651

@@ -678,21 +680,18 @@ async fn resume_upload(
678680
)
679681
.await?;
680682
},
681-
Err(err) => {
682-
//
683-
match err.kind() {
684-
ErrorKind::NotFound => {
685-
error!("[File] file not found: {}", upload_file.local_file_path);
686-
if let Ok(uid) = user_service.user_id() {
687-
if let Ok(conn) = user_service.sqlite_connection(uid) {
688-
delete_upload_file(conn, &upload_file.upload_id)?;
689-
}
683+
Err(err) => match err.kind() {
684+
ErrorKind::NotFound => {
685+
error!("[File] file not found: {}", upload_file.local_file_path);
686+
if let Ok(uid) = user_service.user_id() {
687+
if let Ok(conn) = user_service.sqlite_connection(uid) {
688+
delete_upload_file(conn, &upload_file.upload_id)?;
690689
}
691-
},
692-
_ => {
693-
error!("[File] read file failed: {}", err);
694-
},
695-
}
690+
}
691+
},
692+
_ => {
693+
error!("[File] read file failed: {}", err);
694+
},
696695
},
697696
}
698697
Ok(())
@@ -749,6 +748,14 @@ async fn complete_upload(
749748
upload_file.file_id,
750749
parts.len()
751750
);
751+
let file_url = cloud_service
752+
.get_object_url_v1(
753+
&upload_file.workspace_id,
754+
&upload_file.parent_dir,
755+
&upload_file.file_id,
756+
)
757+
.await?;
758+
752759
match cloud_service
753760
.complete_upload(
754761
&upload_file.workspace_id,
@@ -769,19 +776,12 @@ async fn complete_upload(
769776
.await;
770777
}
771778

772-
let file_url = cloud_service
773-
.get_object_url_v1(
774-
&upload_file.workspace_id,
775-
&upload_file.parent_dir,
776-
&upload_file.file_id,
777-
)
778-
.await?;
779-
780779
let progress = FileProgress::new_progress(file_url, 1.0);
781780
info!(
782781
"[File]: notify upload progress:{}, {}",
783782
upload_file.file_id, progress
784783
);
784+
785785
if let Err(err) = global_notifier.send(progress) {
786786
error!("[File] send global notifier failed: {}", err);
787787
}
@@ -796,7 +796,16 @@ async fn complete_upload(
796796
}
797797
},
798798
Err(err) => {
799-
error!("[File] complete upload failed: {}", err);
799+
let progress = FileProgress::new_error(file_url, err.msg.clone());
800+
if let Err(send_err) = global_notifier.send(progress) {
801+
error!("[File] send global notifier failed: {}", send_err);
802+
}
803+
804+
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
805+
if let Err(err) = delete_all_upload_parts(conn, &upload_file.upload_id) {
806+
error!("[File] delete all upload parts failed: {}", err);
807+
}
808+
return Err(err);
800809
},
801810
}
802811
Ok(())

frontend/rust-lib/flowy-storage/src/sqlite_sql.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,14 @@ pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResul
137137
Ok(())
138138
}
139139

140+
pub fn delete_all_upload_parts(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
141+
diesel::delete(
142+
upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
143+
)
144+
.execute(&mut *conn)?;
145+
Ok(())
146+
}
147+
140148
pub fn insert_upload_part(
141149
mut conn: DBConnection,
142150
upload_part: &UploadFilePartTable,

frontend/rust-lib/flowy-storage/src/uploader.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU8};
1010
use std::sync::{Arc, Weak};
1111
use std::time::Duration;
1212
use tokio::sync::{watch, RwLock};
13-
use tracing::{error, info, trace};
13+
use tracing::{error, info, instrument, trace, warn};
1414

1515
#[derive(Clone)]
1616
pub enum Signal {
@@ -34,7 +34,7 @@ impl UploadTaskQueue {
3434
pub async fn queue_task(&self, task: UploadTask) {
3535
trace!("[File] Queued task: {}", task);
3636
self.tasks.write().await.push(task);
37-
let _ = self.notifier.send(Signal::Proceed);
37+
let _ = self.notifier.send_replace(Signal::Proceed);
3838
}
3939
}
4040

@@ -104,6 +104,7 @@ impl FileUploader {
104104
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(3));
105105
}
106106

107+
#[instrument(name = "[File]: process next", level = "debug", skip(self))]
107108
pub async fn process_next(&self) -> Option<()> {
108109
// Do not proceed if the uploader is paused.
109110
if self.pause_sync.load(std::sync::atomic::Ordering::Relaxed) {
@@ -125,6 +126,7 @@ impl FileUploader {
125126
{
126127
// If the current uploads count is greater than or equal to the max uploads, do not proceed.
127128
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(10));
129+
trace!("[File] max uploads reached, process_next after 10 seconds");
128130
return None;
129131
}
130132

@@ -133,13 +135,15 @@ impl FileUploader {
133135
.load(std::sync::atomic::Ordering::SeqCst)
134136
{
135137
// If the storage limitation is enabled, do not proceed.
138+
error!("[File] storage limit exceeded, uploader is disabled");
136139
return None;
137140
}
138141

139142
let task = self.queue.tasks.write().await.pop()?;
140143
if task.retry_count() > 5 {
141144
// If the task has been retried more than 5 times, we should not retry it anymore.
142145
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
146+
warn!("[File] Task has been retried more than 5 times: {}", task);
143147
return None;
144148
}
145149

@@ -166,12 +170,12 @@ impl FileUploader {
166170
.await
167171
{
168172
if err.is_file_limit_exceeded() {
169-
error!("Failed to upload file: {}", err);
173+
error!("[File] Failed to upload file: {}", err);
170174
self.disable_storage_write();
171175
}
172176

173177
info!(
174-
"Failed to upload file: {}, retry_count:{}",
178+
"[File] Failed to upload file: {}, retry_count:{}",
175179
err, retry_count
176180
);
177181

@@ -197,12 +201,12 @@ impl FileUploader {
197201
.await
198202
{
199203
if err.is_file_limit_exceeded() {
200-
error!("Failed to upload file: {}", err);
204+
error!("[File] failed to upload file: {}", err);
201205
self.disable_storage_write();
202206
}
203207

204208
info!(
205-
"Failed to resume upload file: {}, retry_count:{}",
209+
"[File] failed to resume upload file: {}, retry_count:{}",
206210
err, retry_count
207211
);
208212
retry_count += 1;
@@ -216,10 +220,15 @@ impl FileUploader {
216220
}
217221
},
218222
}
223+
219224
self
220225
.current_uploads
221226
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
222-
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
227+
trace!("[File] process_next after 2 seconds");
228+
self
229+
.queue
230+
.notifier
231+
.send_replace(Signal::ProceedAfterSecs(2));
223232
None
224233
}
225234
}

0 commit comments

Comments
 (0)