Skip to content

Commit bf361dd

Browse files
committed
Decouple output and move TUI off Tokio
Route CLI output through a writer task, move TUI rendering to a dedicated thread, and simplify event flow with unbounded channels for non-blocking hot paths
1 parent b299d6b commit bf361dd

File tree

11 files changed

+642
-464
lines changed

11 files changed

+642
-464
lines changed

src/cli.rs

Lines changed: 89 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,43 @@ use crate::model::{RunConfig, TestEvent};
33
use anyhow::{Context, Result};
44
use clap::Parser;
55
use rand::RngCore;
6+
use std::io::Write;
67
use std::time::Duration;
78
use tokio::sync::mpsc;
89

10+
enum OutputLine {
11+
Stdout(String),
12+
Stderr(String),
13+
}
14+
15+
fn spawn_output_writer() -> (
16+
mpsc::UnboundedSender<OutputLine>,
17+
tokio::task::JoinHandle<()>,
18+
) {
19+
let (tx, mut rx) = mpsc::unbounded_channel::<OutputLine>();
20+
let handle = tokio::task::spawn_blocking(move || {
21+
let stdout = std::io::stdout();
22+
let stderr = std::io::stderr();
23+
let mut out = std::io::BufWriter::new(stdout.lock());
24+
let mut err = std::io::BufWriter::new(stderr.lock());
25+
26+
while let Some(line) = rx.blocking_recv() {
27+
match line {
28+
OutputLine::Stdout(msg) => {
29+
let _ = writeln!(out, "{}", msg);
30+
}
31+
OutputLine::Stderr(msg) => {
32+
let _ = writeln!(err, "{}", msg);
33+
}
34+
}
35+
}
36+
37+
let _ = out.flush();
38+
let _ = err.flush();
39+
});
40+
(tx, handle)
41+
}
42+
943
#[derive(Debug, Parser, Clone)]
1044
#[command(
1145
name = "cloudflare-speed-cli",
@@ -164,10 +198,16 @@ pub fn build_config(args: &Cli) -> RunConfig {
164198
async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
165199
let cfg = build_config(&args);
166200
let network_info = crate::network::gather_network_info(&args);
201+
let (out_tx, out_handle) = if silent {
202+
(None, None)
203+
} else {
204+
let (tx, handle) = spawn_output_writer();
205+
(Some(tx), Some(handle))
206+
};
167207
let enriched = if silent {
168208
// In silent mode, spawn task and consume events
169-
let (evt_tx, mut evt_rx) = mpsc::channel::<TestEvent>(2048);
170-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
209+
let (evt_tx, mut evt_rx) = mpsc::unbounded_channel::<TestEvent>();
210+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
171211

172212
let engine = TestEngine::new(cfg);
173213
let handle = tokio::spawn(async move { engine.run(evt_tx, ctrl_rx).await });
@@ -185,8 +225,8 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
185225
crate::network::enrich_result(&result, &network_info)
186226
} else {
187227
// In JSON mode, directly await the engine (no need to consume events)
188-
let (evt_tx, _) = mpsc::channel::<TestEvent>(1024);
189-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
228+
let (evt_tx, _) = mpsc::unbounded_channel::<TestEvent>();
229+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
190230

191231
let engine = TestEngine::new(cfg);
192232
let result = engine
@@ -200,29 +240,38 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
200240
// Handle exports (errors will propagate)
201241
handle_exports(&args, &enriched)?;
202242

203-
if !silent {
243+
if let Some(tx) = out_tx.as_ref() {
204244
// Print JSON output in non-silent mode
205-
println!("{}", serde_json::to_string_pretty(&enriched)?);
245+
let out = serde_json::to_string_pretty(&enriched)?;
246+
let _ = tx.send(OutputLine::Stdout(out));
206247
}
207248

208249
// Save results if auto_save is enabled
209250
if args.auto_save {
210251
if silent {
211252
crate::storage::save_run(&enriched).context("failed to save run results")?;
212-
} else {
253+
} else if let Some(tx) = out_tx.as_ref() {
213254
if let Ok(p) = crate::storage::save_run(&enriched) {
214-
eprintln!("Saved: {}", p.display());
255+
let _ = tx.send(OutputLine::Stderr(format!("Saved: {}", p.display())));
215256
}
216257
}
217258
}
218259

260+
if let Some(tx) = out_tx {
261+
drop(tx);
262+
}
263+
if let Some(handle) = out_handle {
264+
let _ = handle.await;
265+
}
266+
219267
Ok(())
220268
}
221269

222270
async fn run_text(args: Cli) -> Result<()> {
223271
let cfg = build_config(&args);
224-
let (evt_tx, mut evt_rx) = mpsc::channel::<TestEvent>(2048);
225-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
272+
let (out_tx, out_handle) = spawn_output_writer();
273+
let (evt_tx, mut evt_rx) = mpsc::unbounded_channel::<TestEvent>();
274+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
226275

227276
let engine = TestEngine::new(cfg);
228277
let handle = tokio::spawn(async move { engine.run(evt_tx, ctrl_rx).await });
@@ -238,7 +287,7 @@ async fn run_text(args: Cli) -> Result<()> {
238287
while let Some(ev) = evt_rx.recv().await {
239288
match ev {
240289
TestEvent::PhaseStarted { phase } => {
241-
eprintln!("== {phase:?} ==");
290+
let _ = out_tx.send(OutputLine::Stderr(format!("== {phase:?} ==")));
242291
}
243292
TestEvent::ThroughputTick {
244293
phase,
@@ -251,7 +300,7 @@ async fn run_text(args: Cli) -> Result<()> {
251300
) {
252301
let elapsed = run_start.elapsed().as_secs_f64();
253302
let mbps = (bps_instant * 8.0) / 1_000_000.0;
254-
eprintln!("{phase:?}: {:.2} Mbps", mbps);
303+
let _ = out_tx.send(OutputLine::Stderr(format!("{phase:?}: {:.2} Mbps", mbps)));
255304

256305
// Collect throughput points for metrics
257306
match phase {
@@ -275,7 +324,10 @@ async fn run_text(args: Cli) -> Result<()> {
275324
if let Some(ms) = rtt_ms {
276325
match (phase, during) {
277326
(crate::model::Phase::IdleLatency, None) => {
278-
eprintln!("Idle latency: {:.1} ms", ms);
327+
let _ = out_tx.send(OutputLine::Stderr(format!(
328+
"Idle latency: {:.1} ms",
329+
ms
330+
)));
279331
idle_latency_samples.push(ms);
280332
}
281333
(
@@ -292,10 +344,13 @@ async fn run_text(args: Cli) -> Result<()> {
292344
}
293345
}
294346
}
295-
TestEvent::Info { message } => eprintln!("{message}"),
347+
TestEvent::Info { message } => {
348+
let _ = out_tx.send(OutputLine::Stderr(message));
349+
}
296350
TestEvent::MetaInfo { .. } => {
297351
// Meta info is handled in TUI, ignore in text mode
298352
}
353+
TestEvent::RunCompleted { .. } => {}
299354
}
300355
}
301356

@@ -312,86 +367,90 @@ async fn run_text(args: Cli) -> Result<()> {
312367
let colo = extracted.colo.as_deref().unwrap_or("-");
313368
let asn = extracted.asn.as_deref().unwrap_or("-");
314369
let org = extracted.as_org.as_deref().unwrap_or("-");
315-
println!("IP/Colo/ASN: {ip} / {colo} / {asn} ({org})");
370+
let _ = out_tx.send(OutputLine::Stdout(format!(
371+
"IP/Colo/ASN: {ip} / {colo} / {asn} ({org})"
372+
)));
316373
}
317374
if let Some(server) = enriched.server.as_deref() {
318-
println!("Server: {server}");
375+
let _ = out_tx.send(OutputLine::Stdout(format!("Server: {server}")));
319376
}
320377
if let Some(comments) = enriched.comments.as_deref() {
321378
if !comments.trim().is_empty() {
322-
println!("Comments: {}", comments);
379+
let _ = out_tx.send(OutputLine::Stdout(format!("Comments: {}", comments)));
323380
}
324381
}
325382

326383
// Compute and display throughput metrics (mean, median, p25, p75)
327384
let dl_values: Vec<f64> = dl_points.iter().map(|(_, y)| *y).collect();
328385
let (dl_mean, dl_median, dl_p25, dl_p75) = crate::metrics::compute_metrics(&dl_values)
329386
.context("insufficient download throughput data to compute metrics")?;
330-
println!(
387+
let _ = out_tx.send(OutputLine::Stdout(format!(
331388
"Download: avg {:.2} med {:.2} p25 {:.2} p75 {:.2}",
332389
dl_mean, dl_median, dl_p25, dl_p75
333-
);
390+
)));
334391

335392
let ul_values: Vec<f64> = ul_points.iter().map(|(_, y)| *y).collect();
336393
let (ul_mean, ul_median, ul_p25, ul_p75) = crate::metrics::compute_metrics(&ul_values)
337394
.context("insufficient upload throughput data to compute metrics")?;
338-
println!(
395+
let _ = out_tx.send(OutputLine::Stdout(format!(
339396
"Upload: avg {:.2} med {:.2} p25 {:.2} p75 {:.2}",
340397
ul_mean, ul_median, ul_p25, ul_p75
341-
);
398+
)));
342399

343400
// Compute and display latency metrics (mean, median, p25, p75)
344401
let (idle_mean, idle_median, idle_p25, idle_p75) =
345402
crate::metrics::compute_metrics(&idle_latency_samples)
346403
.context("insufficient idle latency data to compute metrics")?;
347-
println!(
404+
let _ = out_tx.send(OutputLine::Stdout(format!(
348405
"Idle latency: avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
349406
idle_mean,
350407
idle_median,
351408
idle_p25,
352409
idle_p75,
353410
enriched.idle_latency.loss * 100.0,
354411
enriched.idle_latency.jitter_ms.unwrap_or(f64::NAN)
355-
);
412+
)));
356413

357414
let (dl_lat_mean, dl_lat_median, dl_lat_p25, dl_lat_p75) =
358415
crate::metrics::compute_metrics(&loaded_dl_latency_samples)
359416
.context("insufficient loaded download latency data to compute metrics")?;
360-
println!(
417+
let _ = out_tx.send(OutputLine::Stdout(format!(
361418
"Loaded latency (download): avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
362419
dl_lat_mean,
363420
dl_lat_median,
364421
dl_lat_p25,
365422
dl_lat_p75,
366423
enriched.loaded_latency_download.loss * 100.0,
367424
enriched.loaded_latency_download.jitter_ms.unwrap_or(f64::NAN)
368-
);
425+
)));
369426

370427
let (ul_lat_mean, ul_lat_median, ul_lat_p25, ul_lat_p75) =
371428
crate::metrics::compute_metrics(&loaded_ul_latency_samples)
372429
.context("insufficient loaded upload latency data to compute metrics")?;
373-
println!(
430+
let _ = out_tx.send(OutputLine::Stdout(format!(
374431
"Loaded latency (upload): avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
375432
ul_lat_mean,
376433
ul_lat_median,
377434
ul_lat_p25,
378435
ul_lat_p75,
379436
enriched.loaded_latency_upload.loss * 100.0,
380437
enriched.loaded_latency_upload.jitter_ms.unwrap_or(f64::NAN)
381-
);
438+
)));
382439
if let Some(ref exp) = enriched.experimental_udp {
383-
println!(
440+
let _ = out_tx.send(OutputLine::Stdout(format!(
384441
"Experimental UDP-like loss probe: loss {:.1}% med {} ms (target {:?})",
385442
exp.latency.loss * 100.0,
386443
exp.latency.median_ms.unwrap_or(f64::NAN),
387444
exp.target
388-
);
445+
)));
389446
}
390447
if args.auto_save {
391448
if let Ok(p) = crate::storage::save_run(&enriched) {
392-
eprintln!("Saved: {}", p.display());
449+
let _ = out_tx.send(OutputLine::Stderr(format!("Saved: {}", p.display())));
393450
}
394451
}
452+
drop(out_tx);
453+
let _ = out_handle.await;
395454
Ok(())
396455
}
397456

src/engine/cloudflare.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::model::TurnInfo;
1+
use crate::model::{TestEvent, TurnInfo};
22
use anyhow::{Context, Result};
33
use reqwest::Url;
44
use serde::Deserialize;
@@ -14,7 +14,10 @@ pub struct CloudflareClient {
1414
}
1515

1616
impl CloudflareClient {
17-
pub fn new(cfg: &RunConfig) -> Result<Self> {
17+
pub fn new(
18+
cfg: &RunConfig,
19+
event_tx: Option<tokio::sync::mpsc::UnboundedSender<TestEvent>>,
20+
) -> Result<Self> {
1821
let base_url = Url::parse(&cfg.base_url).context("invalid base_url")?;
1922

2023
let mut builder = reqwest::Client::builder()
@@ -28,10 +31,14 @@ impl CloudflareClient {
2831
match network_bind::get_interface_ip(iface) {
2932
Ok(ip) => {
3033
builder = builder.local_address(ip);
31-
eprintln!(
32-
"Binding HTTP connections to interface {} (IP: {})",
33-
iface, ip
34-
);
34+
if let Some(tx) = event_tx.as_ref() {
35+
let _ = tx.send(TestEvent::Info {
36+
message: format!(
37+
"Binding HTTP connections to interface {} (IP: {})",
38+
iface, ip
39+
),
40+
});
41+
}
3542
}
3643
Err(e) => {
3744
return Err(anyhow::anyhow!(
@@ -46,7 +53,11 @@ impl CloudflareClient {
4653
match source_ip.parse::<std::net::IpAddr>() {
4754
Ok(ip) => {
4855
builder = builder.local_address(ip);
49-
eprintln!("Binding HTTP connections to source IP: {}", ip);
56+
if let Some(tx) = event_tx.as_ref() {
57+
let _ = tx.send(TestEvent::Info {
58+
message: format!("Binding HTTP connections to source IP: {}", ip),
59+
});
60+
}
5061
}
5162
Err(e) => {
5263
return Err(anyhow::anyhow!(

src/engine/latency.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ use std::sync::{
99
use std::time::{Duration, Instant};
1010
use tokio::sync::mpsc;
1111

12+
#[allow(clippy::too_many_arguments)]
1213
pub async fn run_latency_probes(
1314
client: &CloudflareClient,
1415
phase: Phase,
1516
during: Option<Phase>,
1617
total_duration: Duration,
1718
interval_ms: u64,
1819
timeout_ms: u64,
19-
event_tx: &mpsc::Sender<TestEvent>,
20+
event_tx: &mpsc::UnboundedSender<TestEvent>,
2021
paused: Arc<AtomicBool>,
2122
cancel: Arc<AtomicBool>,
2223
) -> Result<LatencySummary> {
@@ -48,31 +49,25 @@ pub async fn run_latency_probes(
4849
// Extract meta from first successful response
4950
if !meta_sent && phase == Phase::IdleLatency {
5051
if let Some(meta) = meta_opt {
51-
event_tx.send(TestEvent::MetaInfo { meta }).await.ok();
52+
let _ = event_tx.send(TestEvent::MetaInfo { meta });
5253
meta_sent = true;
5354
}
5455
}
5556

56-
event_tx
57-
.send(TestEvent::LatencySample {
58-
phase,
59-
during,
60-
rtt_ms: Some(ms),
61-
ok: true,
62-
})
63-
.await
64-
.ok();
57+
let _ = event_tx.send(TestEvent::LatencySample {
58+
phase,
59+
during,
60+
rtt_ms: Some(ms),
61+
ok: true,
62+
});
6563
}
6664
Err(_) => {
67-
event_tx
68-
.send(TestEvent::LatencySample {
69-
phase,
70-
during,
71-
rtt_ms: None,
72-
ok: false,
73-
})
74-
.await
75-
.ok();
65+
let _ = event_tx.send(TestEvent::LatencySample {
66+
phase,
67+
during,
68+
rtt_ms: None,
69+
ok: false,
70+
});
7671
}
7772
}
7873

0 commit comments

Comments
 (0)