Skip to content

Commit 42ad5f1

Browse files
committed
Title: Avoid blocking I/O in async paths
- Move storage to async tokio::fs APIs - Await storage calls in CLI/TUI flows - Offload current_dir and client init to blocking thread
1 parent b299d6b commit 42ad5f1

File tree

5 files changed

+120
-68
lines changed

5 files changed

+120
-68
lines changed

src/cli.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,12 @@ pub fn build_config(args: &Cli) -> RunConfig {
163163
/// `silent` controls whether to consume events and suppress output.
164164
async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
165165
let cfg = build_config(&args);
166-
let network_info = crate::network::gather_network_info(&args);
166+
let network_info = tokio::task::spawn_blocking({
167+
let args = args.clone();
168+
move || crate::network::gather_network_info(&args)
169+
})
170+
.await
171+
.context("gather_network_info task panicked")?;
167172
let enriched = if silent {
168173
// In silent mode, spawn task and consume events
169174
let (evt_tx, mut evt_rx) = mpsc::channel::<TestEvent>(2048);
@@ -198,7 +203,7 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
198203
};
199204

200205
// Handle exports (errors will propagate)
201-
handle_exports(&args, &enriched)?;
206+
handle_exports(&args, &enriched).await?;
202207

203208
if !silent {
204209
// Print JSON output in non-silent mode
@@ -208,9 +213,11 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
208213
// Save results if auto_save is enabled
209214
if args.auto_save {
210215
if silent {
211-
crate::storage::save_run(&enriched).context("failed to save run results")?;
216+
crate::storage::save_run(&enriched)
217+
.await
218+
.context("failed to save run results")?;
212219
} else {
213-
if let Ok(p) = crate::storage::save_run(&enriched) {
220+
if let Ok(p) = crate::storage::save_run(&enriched).await {
214221
eprintln!("Saved: {}", p.display());
215222
}
216223
}
@@ -302,10 +309,15 @@ async fn run_text(args: Cli) -> Result<()> {
302309
let result = handle.await??;
303310

304311
// Gather network information and enrich result
305-
let network_info = crate::network::gather_network_info(&args);
312+
let network_info = tokio::task::spawn_blocking({
313+
let args = args.clone();
314+
move || crate::network::gather_network_info(&args)
315+
})
316+
.await
317+
.context("gather_network_info task panicked")?;
306318
let enriched = crate::network::enrich_result(&result, &network_info);
307319

308-
handle_exports(&args, &enriched)?;
320+
handle_exports(&args, &enriched).await?;
309321
if let Some(meta) = enriched.meta.as_ref() {
310322
let extracted = crate::network::extract_metadata(meta);
311323
let ip = extracted.ip.as_deref().unwrap_or("-");
@@ -388,20 +400,20 @@ async fn run_text(args: Cli) -> Result<()> {
388400
);
389401
}
390402
if args.auto_save {
391-
if let Ok(p) = crate::storage::save_run(&enriched) {
403+
if let Ok(p) = crate::storage::save_run(&enriched).await {
392404
eprintln!("Saved: {}", p.display());
393405
}
394406
}
395407
Ok(())
396408
}
397409

398410
/// Handle export operations (JSON and CSV) for both text and JSON modes.
399-
fn handle_exports(args: &Cli, result: &crate::model::RunResult) -> Result<()> {
411+
async fn handle_exports(args: &Cli, result: &crate::model::RunResult) -> Result<()> {
400412
if let Some(p) = args.export_json.as_deref() {
401-
crate::storage::export_json(p, result)?;
413+
crate::storage::export_json(p, result).await?;
402414
}
403415
if let Some(p) = args.export_csv.as_deref() {
404-
crate::storage::export_csv(p, result)?;
416+
crate::storage::export_csv(p, result).await?;
405417
}
406418
Ok(())
407419
}

src/engine/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ mod throughput;
55
mod turn_udp;
66

77
use crate::model::{Phase, RunConfig, RunResult, TestEvent};
8-
use anyhow::Result;
8+
use anyhow::{Context, Result};
99
use std::sync::{
1010
atomic::{AtomicBool, Ordering},
1111
Arc,
@@ -34,7 +34,12 @@ impl TestEngine {
3434
event_tx: mpsc::Sender<TestEvent>,
3535
mut control_rx: mpsc::Receiver<EngineControl>,
3636
) -> Result<RunResult> {
37-
let client = cloudflare::CloudflareClient::new(&self.cfg)?;
37+
let client = tokio::task::spawn_blocking({
38+
let cfg = self.cfg.clone();
39+
move || cloudflare::CloudflareClient::new(&cfg)
40+
})
41+
.await
42+
.context("CloudflareClient::new task panicked")??;
3843

3944
let paused = Arc::new(AtomicBool::new(false));
4045
let cancel = Arc::new(AtomicBool::new(false));

src/network.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,24 @@ pub fn extract_metadata(meta: &Value) -> ExtractedMetadata {
2626
.and_then(|v| v.as_str())
2727
.map(|s| s.to_string());
2828

29-
let asn = meta
30-
.get("asn")
31-
.and_then(|v| {
32-
v.as_i64()
33-
.map(|n| n.to_string())
34-
.or_else(|| v.as_str().map(|s| s.to_string()))
35-
});
29+
let asn = meta.get("asn").and_then(|v| {
30+
v.as_i64()
31+
.map(|n| n.to_string())
32+
.or_else(|| v.as_str().map(|s| s.to_string()))
33+
});
3634

3735
let as_org = ["asOrganization", "asnOrg"]
3836
.iter()
3937
.find_map(|key| meta.get(*key))
4038
.and_then(|v| v.as_str())
4139
.map(|s| s.to_string());
4240

43-
ExtractedMetadata { ip, colo, asn, as_org }
41+
ExtractedMetadata {
42+
ip,
43+
colo,
44+
asn,
45+
as_org,
46+
}
4447
}
4548

4649
/// Network information gathered from the system

src/storage.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@ fn runs_dir() -> PathBuf {
1515
}
1616

1717
/// Ensure the necessary directories exist for storing data.
18-
pub fn ensure_dirs() -> Result<()> {
19-
std::fs::create_dir_all(runs_dir()).context("create runs dir")?;
18+
pub async fn ensure_dirs() -> Result<()> {
19+
tokio::fs::create_dir_all(runs_dir())
20+
.await
21+
.context("create runs dir")?;
2022
Ok(())
2123
}
2224

23-
pub fn save_run(result: &RunResult) -> Result<PathBuf> {
24-
ensure_dirs()?;
25+
pub async fn save_run(result: &RunResult) -> Result<PathBuf> {
26+
ensure_dirs().await?;
2527
let path = get_run_path(result)?;
2628
let data = serde_json::to_vec_pretty(result)?;
27-
std::fs::write(&path, data).context("write run json")?;
29+
tokio::fs::write(&path, data)
30+
.await
31+
.context("write run json")?;
2832
Ok(path)
2933
}
3034

@@ -34,28 +38,36 @@ pub fn get_run_path(result: &RunResult) -> Result<PathBuf> {
3438
Ok(runs_dir().join(format!("run-{safe_ts}-{}.json", result.meas_id)))
3539
}
3640

37-
pub fn delete_run(result: &RunResult) -> Result<()> {
41+
pub async fn delete_run(result: &RunResult) -> Result<()> {
3842
let path = get_run_path(result)?;
39-
if path.exists() {
40-
std::fs::remove_file(&path).context("delete run file")?;
43+
if tokio::fs::try_exists(&path).await.unwrap_or(false) {
44+
tokio::fs::remove_file(&path)
45+
.await
46+
.context("delete run file")?;
4147
}
4248
Ok(())
4349
}
4450

45-
pub fn export_json(path: &Path, result: &RunResult) -> Result<()> {
51+
pub async fn export_json(path: &Path, result: &RunResult) -> Result<()> {
4652
// Create parent directories if they don't exist
4753
if let Some(parent) = path.parent() {
48-
std::fs::create_dir_all(parent).context("create export directory")?;
54+
tokio::fs::create_dir_all(parent)
55+
.await
56+
.context("create export directory")?;
4957
}
5058
let data = serde_json::to_vec_pretty(result)?;
51-
std::fs::write(path, data).context("write export json")?;
59+
tokio::fs::write(path, data)
60+
.await
61+
.context("write export json")?;
5262
Ok(())
5363
}
5464

55-
pub fn export_csv(path: &Path, result: &RunResult) -> Result<()> {
65+
pub async fn export_csv(path: &Path, result: &RunResult) -> Result<()> {
5666
// Create parent directories if they don't exist
5767
if let Some(parent) = path.parent() {
58-
std::fs::create_dir_all(parent).context("create export directory")?;
68+
tokio::fs::create_dir_all(parent)
69+
.await
70+
.context("create export directory")?;
5971
}
6072
let mut out = String::new();
6173
out.push_str("timestamp_utc,base_url,meas_id,comments,server,download_mbps,upload_mbps,idle_mean_ms,idle_median_ms,idle_p25_ms,idle_p75_ms,idle_loss,dl_loaded_mean_ms,dl_loaded_median_ms,dl_loaded_p25_ms,dl_loaded_p75_ms,dl_loaded_loss,ul_loaded_mean_ms,ul_loaded_median_ms,ul_loaded_p25_ms,ul_loaded_p75_ms,ul_loaded_loss,ip,colo,asn,as_org,interface_name,network_name,is_wireless,interface_mac,link_speed_mbps\n");
@@ -93,7 +105,9 @@ pub fn export_csv(path: &Path, result: &RunResult) -> Result<()> {
93105
csv_escape(result.interface_mac.as_deref().unwrap_or("")),
94106
result.link_speed_mbps.map(|s| s.to_string()).unwrap_or_else(|| "".to_string()),
95107
));
96-
std::fs::write(path, out).context("write export csv")?;
108+
tokio::fs::write(path, out)
109+
.await
110+
.context("write export csv")?;
97111
Ok(())
98112
}
99113

@@ -106,17 +120,17 @@ fn csv_escape(s: &str) -> String {
106120
}
107121
}
108122

109-
pub fn load_recent(limit: usize) -> Result<Vec<RunResult>> {
110-
ensure_dirs()?;
123+
pub async fn load_recent(limit: usize) -> Result<Vec<RunResult>> {
124+
ensure_dirs().await?;
111125
let dir = runs_dir();
112126
let mut entries: Vec<(std::time::SystemTime, PathBuf)> = Vec::new();
113-
for e in std::fs::read_dir(&dir).context("read runs dir")? {
114-
let e = e?;
127+
let mut dir_iter = tokio::fs::read_dir(&dir).await.context("read runs dir")?;
128+
while let Some(e) = dir_iter.next_entry().await? {
115129
let p = e.path();
116130
if p.extension().and_then(|e| e.to_str()) != Some("json") {
117131
continue;
118132
}
119-
let m = e.metadata()?;
133+
let m = e.metadata().await?;
120134
let mt = m.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH);
121135
entries.push((mt, p));
122136
}
@@ -125,7 +139,9 @@ pub fn load_recent(limit: usize) -> Result<Vec<RunResult>> {
125139

126140
let mut out = Vec::new();
127141
for (_, p) in entries.into_iter().take(limit) {
128-
let data = std::fs::read(&p).with_context(|| format!("read {}", p.display()))?;
142+
let data = tokio::fs::read(&p)
143+
.await
144+
.with_context(|| format!("read {}", p.display()))?;
129145
let r: RunResult =
130146
serde_json::from_slice(&data).with_context(|| format!("parse {}", p.display()))?;
131147
out.push(r);

0 commit comments

Comments
 (0)