Skip to content

Commit c679cec

Browse files
authored
feat(dfget): enhance download process with dfdaemon client (#1498)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent b3c1733 commit c679cec

File tree

11 files changed

+135
-122
lines changed

11 files changed

+135
-122
lines changed

Cargo.lock

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ members = [
1313
]
1414

1515
[workspace.package]
16-
version = "1.1.2"
16+
version = "1.1.3"
1717
authors = ["The Dragonfly Developers"]
1818
homepage = "https://d7y.io/"
1919
repository = "https://github.com/dragonflyoss/client.git"
@@ -23,14 +23,14 @@ readme = "README.md"
2323
edition = "2021"
2424

2525
[workspace.dependencies]
26-
dragonfly-client = { path = "dragonfly-client", version = "1.1.2" }
27-
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.1.2" }
28-
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.1.2" }
29-
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.1.2" }
30-
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.1.2" }
31-
dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.1.2" }
32-
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.1.2" }
33-
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.1.2" }
26+
dragonfly-client = { path = "dragonfly-client", version = "1.1.3" }
27+
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.1.3" }
28+
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.1.3" }
29+
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.1.3" }
30+
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.1.3" }
31+
dragonfly-client-metric = { path = "dragonfly-client-metric", version = "1.1.3" }
32+
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.1.3" }
33+
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.1.3" }
3434
dragonfly-api = "=2.1.81"
3535
thiserror = "2.0"
3636
futures = "0.3.31"

dragonfly-client/src/bin/dfcache/export.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ impl ExportCommand {
547547

548548
// Dfcache needs to write the piece content to the output file.
549549
if let Some(f) = &mut f {
550+
debug!("copy piece {} to {:?} started", piece.number, self.output);
550551
if let Err(err) =f.seek(SeekFrom::Start(piece.offset)).await {
551552
error!("seek {:?} failed: {}", self.output, err);
552553
fs::remove_file(&self.output).await.inspect_err(|err| {
@@ -577,15 +578,6 @@ impl ExportCommand {
577578
return Err(Error::IO(err));
578579
}
579580

580-
if let Err(err) = f.flush().await {
581-
error!("flush {:?} failed: {}", self.output, err);
582-
fs::remove_file(&self.output).await.inspect_err(|err| {
583-
error!("remove file {:?} failed: {}", self.output, err);
584-
})?;
585-
586-
return Err(Error::IO(err));
587-
}
588-
589581
debug!("copy piece {} to {:?} success", piece.number, self.output);
590582
};
591583

@@ -615,6 +607,18 @@ impl ExportCommand {
615607
}
616608
}
617609

610+
if let Some(f) = &mut f {
611+
if let Err(err) = f.flush().await {
612+
error!("flush {:?} failed: {}", self.output, err);
613+
fs::remove_file(&self.output).await.inspect_err(|err| {
614+
error!("remove file {:?} failed: {}", self.output, err);
615+
})?;
616+
617+
return Err(Error::IO(err));
618+
}
619+
};
620+
info!("flush {:?} success", self.output);
621+
618622
progress_bar.finish_with_message("downloaded");
619623
Ok(())
620624
}

dragonfly-client/src/bin/dfget/main.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -959,7 +959,6 @@ async fn download(
959959
// Download file.
960960
let mut downloaded = 0;
961961
let mut out_stream = response.into_inner();
962-
963962
loop {
964963
match out_stream.message().await {
965964
Ok(Some(message)) => {
@@ -997,6 +996,7 @@ async fn download(
997996

998997
// Dfget needs to write the piece content to the output file.
999998
if let Some(f) = &mut f {
999+
debug!("copy piece {} to {:?} started", piece.number, args.output);
10001000
if let Err(err) = f.seek(SeekFrom::Start(piece.offset)).await {
10011001
error!("seek {:?} failed: {}", args.output, err);
10021002
fs::remove_file(&args.output).await.inspect_err(|err| {
@@ -1030,15 +1030,6 @@ async fn download(
10301030
return Err(Error::IO(err));
10311031
}
10321032

1033-
if let Err(err) = f.flush().await {
1034-
error!("flush {:?} failed: {}", args.output, err);
1035-
fs::remove_file(&args.output).await.inspect_err(|err| {
1036-
error!("remove file {:?} failed: {}", args.output, err);
1037-
})?;
1038-
1039-
return Err(Error::IO(err));
1040-
}
1041-
10421033
debug!("copy piece {} to {:?} success", piece.number, args.output);
10431034
}
10441035

@@ -1071,6 +1062,18 @@ async fn download(
10711062
}
10721063
}
10731064

1065+
if let Some(f) = &mut f {
1066+
if let Err(err) = f.flush().await {
1067+
error!("flush {:?} failed: {}", args.output, err);
1068+
fs::remove_file(&args.output).await.inspect_err(|err| {
1069+
error!("remove file {:?} failed: {}", args.output, err);
1070+
})?;
1071+
1072+
return Err(Error::IO(err));
1073+
}
1074+
};
1075+
info!("flush {:?} success", args.output);
1076+
10741077
progress_bar.finish();
10751078
Ok(())
10761079
}

dragonfly-client/src/grpc/dfdaemon_download.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ impl DfdaemonDownloadServer {
166166
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
167167
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
168168
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
169+
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
170+
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
169171
.layer(rate_limit_layer)
170172
.add_service(reflection)
171173
.add_service(health_service)
@@ -404,7 +406,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
404406
let download_clone = download.clone();
405407
let task_manager_clone = self.task.clone();
406408
let task_clone = task.clone();
407-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
409+
let (out_stream_tx, out_stream_rx) = mpsc::channel(16);
408410

409411
// Define the error handler to send the error to the stream.
410412
async fn handle_error(
@@ -993,7 +995,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
993995
let request_clone = request.clone();
994996
let task_manager_clone = self.persistent_cache_task.clone();
995997
let task_clone = task.clone();
996-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
998+
let (out_stream_tx, out_stream_rx) = mpsc::channel(16);
997999

9981000
// Define the error handler to send the error to the stream.
9991001
async fn handle_error(
@@ -1411,6 +1413,8 @@ impl DfdaemonDownloadClient {
14111413
.unwrap()
14121414
.buffer_size(super::BUFFER_SIZE)
14131415
.connect_timeout(super::CONNECT_TIMEOUT)
1416+
.initial_stream_window_size(super::INITIAL_WINDOW_SIZE)
1417+
.initial_connection_window_size(super::INITIAL_WINDOW_SIZE)
14141418
.tcp_keepalive(Some(super::TCP_KEEPALIVE))
14151419
.http2_keep_alive_interval(super::HTTP2_KEEP_ALIVE_INTERVAL)
14161420
.keep_alive_timeout(super::HTTP2_KEEP_ALIVE_TIMEOUT)

dragonfly-client/src/grpc/dfdaemon_upload.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
401401
let download_clone = download.clone();
402402
let task_manager_clone = self.task.clone();
403403
let task_clone = task.clone();
404-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
404+
let (out_stream_tx, out_stream_rx) = mpsc::channel(16);
405405

406406
// Define the error handler to send the error to the stream.
407407
async fn handle_error(
@@ -875,7 +875,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
875875
let download_quic_port = self.config.storage.server.quic_port;
876876

877877
// Initialize stream channel.
878-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
878+
let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
879879
tokio::spawn(
880880
async move {
881881
match task_manager.get(task_id.as_str()) {
@@ -1161,7 +1161,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
11611161
let interface = self.interface.clone();
11621162

11631163
// Initialize stream channel.
1164-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
1164+
let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
11651165
tokio::spawn(
11661166
async move {
11671167
// Start the host info update loop.
@@ -1305,7 +1305,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
13051305
let request_clone = request.clone();
13061306
let task_manager_clone = self.persistent_cache_task.clone();
13071307
let task_clone = task.clone();
1308-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
1308+
let (out_stream_tx, out_stream_rx) = mpsc::channel(16);
13091309

13101310
// Define the error handler to send the error to the stream.
13111311
async fn handle_error(
@@ -1620,7 +1620,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
16201620
let download_quic_port = self.config.storage.server.quic_port;
16211621

16221622
// Initialize stream channel.
1623-
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
1623+
let (out_stream_tx, out_stream_rx) = mpsc::channel(128);
16241624
tokio::spawn(
16251625
async move {
16261626
match persistent_cache_task_manager.get(task_id.as_str()) {

dragonfly-client/src/grpc/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ pub const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(300);
5050
/// HTTP2_KEEP_ALIVE_TIMEOUT is the timeout for HTTP2 keep alive.
5151
pub const HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
5252

53-
/// MAX_FRAME_SIZE is the max frame size for GRPC, default is 4MB.
53+
/// MAX_FRAME_SIZE is the max frame size for GRPC, default is 4MiB.
5454
pub const MAX_FRAME_SIZE: u32 = 4 * 1024 * 1024;
5555

56-
/// BUFFER_SIZE is the buffer size for GRPC, default is 64KB.
57-
pub const BUFFER_SIZE: usize = 64 * 1024;
56+
/// BUFFER_SIZE is the buffer size for GRPC, default is 512KiB.
57+
pub const BUFFER_SIZE: usize = 512 * 1024;
58+
59+
/// INITIAL_WINDOW_SIZE is the initial window size for GRPC, default is 1MiB.
60+
pub const INITIAL_WINDOW_SIZE: u32 = 1024 * 1024;
5861

5962
/// prefetch_task prefetches the task if prefetch flag is true.
6063
#[instrument(skip_all)]

dragonfly-client/src/proxy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ async fn proxy_via_dfdaemon(
767767
};
768768

769769
// Write the status code to the writer.
770-
let (sender, mut receiver) = mpsc::channel(10 * 1024);
770+
let (sender, mut receiver) = mpsc::channel(8);
771771

772772
// Get the read buffer size from the config.
773773
let read_buffer_size = config.proxy.read_buffer_size;

0 commit comments

Comments
 (0)