Skip to content

Commit 9854878

Browse files
committed
feat: stream/write file from request body stream
1 parent 94730c6 commit 9854878

File tree

4 files changed

+70
-10
lines changed

4 files changed

+70
-10
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ version = "1.0.0-pre.cfad518"
2525
[workspace.dependencies]
2626
anyhow = "1.0"
2727
async-trait = "0.1.83"
28+
async-stream = "0.3"
2829
bytes = "1.7.1"
2930
chrono = "0.4.38"
3031
clap = "4.5.20"

src/http-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ path = "src/main.rs"
1818
[dependencies]
1919
anyhow = { workspace = true }
2020
async-trait = { workspace = true }
21+
async-stream = { workspace = true }
2122
bytes = { workspace = true }
2223
chrono = { workspace = true, features = ["serde"] }
2324
clap = { workspace = true, features = ["env", "derive", "std"] }

src/http-server/src/handler/file_explorer/mod.rs

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use proto::{DirectoryEntry, DirectoryIndex, EntryType, Sort};
1919
use rust_embed::Embed;
2020
use tokio::fs::File;
2121
use tokio::io::AsyncWriteExt;
22+
use tokio::sync::mpsc;
2223

2324
use crate::handler::Handler;
2425
use crate::server::{HttpRequest, HttpResponse};
@@ -33,6 +34,12 @@ const X_FILE_NAME_HTTP_HEADER: HeaderName = HeaderName::from_static(X_FILE_NAME)
3334
#[folder = "./ui"]
3435
struct FileExplorerAssets;
3536

37+
#[derive(Debug)]
38+
pub enum UploadFileMessage {
39+
Progress(u64),
40+
Failed(String),
41+
}
42+
3643
pub struct FileExplorer {
3744
file_explorer: core::FileExplorer,
3845
path: PathBuf,
@@ -102,16 +109,44 @@ impl FileExplorer {
102109
.headers
103110
.get(X_FILE_NAME_HTTP_HEADER)
104111
.and_then(|hv| hv.to_str().ok())
105-
.context(format!("Missing '{X_FILE_NAME}' header"))?;
106-
let mut stream = bytes.into_data_stream();
107-
let mut file = File::create(file_name)
108-
.await
109-
.context("Failed to create target file for upload.")?;
110-
111-
while let Some(Ok(bytes)) = stream.next().await {
112-
file.write_all(&bytes)
113-
.await
114-
.context("Failed to write bytes to file")?;
112+
.context(format!("Missing '{X_FILE_NAME}' header"))?
113+
.to_string();
114+
let (tx, mut rx) = mpsc::channel(100);
115+
116+
tokio::spawn(async move {
117+
let mut stream = bytes.into_data_stream();
118+
let mut file = match File::create(file_name).await {
119+
Ok(f) => f,
120+
Err(err) => {
121+
if let Err(err) = tx.send(UploadFileMessage::Failed(err.to_string())).await {
122+
eprintln!("Failed to send message through mpsc channel. {err:?}");
123+
}
124+
125+
return;
126+
}
127+
};
128+
129+
let mut total = 0u64;
130+
131+
while let Some(Ok(bytes)) = stream.next().await {
132+
total += bytes.len() as u64;
133+
134+
if let Err(err) = file.write_all(&bytes).await {
135+
if let Err(err) = tx.send(UploadFileMessage::Failed(err.to_string())).await {
136+
eprintln!("Failed to send message through mpsc channel. {err:?}");
137+
}
138+
139+
break;
140+
}
141+
142+
if let Err(err) = tx.send(UploadFileMessage::Progress(total)).await {
143+
eprintln!("Failed to send message through mpsc channel. {err:?}");
144+
}
145+
}
146+
});
147+
148+
while let Some(message) = rx.recv().await {
149+
println!("{message:?}");
115150
}
116151

117152
Ok(())

0 commit comments

Comments
 (0)