Skip to content

Commit 4badae4

Browse files
committed
Decouple output, move TUI off Tokio, and centralize orchestration
Route CLI output through a writer task, run TUI in a dedicated thread, move run control and result processing into the orchestrator, and update summary phase timing to avoid UI stalls
1 parent b299d6b commit 4badae4

File tree

13 files changed

+856
-549
lines changed

13 files changed

+856
-549
lines changed

src/cli.rs

Lines changed: 82 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,43 @@ use crate::model::{RunConfig, TestEvent};
33
use anyhow::{Context, Result};
44
use clap::Parser;
55
use rand::RngCore;
6+
use std::io::Write;
67
use std::time::Duration;
78
use tokio::sync::mpsc;
89

10+
enum OutputLine {
11+
Stdout(String),
12+
Stderr(String),
13+
}
14+
15+
fn spawn_output_writer() -> (
16+
mpsc::UnboundedSender<OutputLine>,
17+
tokio::task::JoinHandle<()>,
18+
) {
19+
let (tx, mut rx) = mpsc::unbounded_channel::<OutputLine>();
20+
let handle = tokio::task::spawn_blocking(move || {
21+
let stdout = std::io::stdout();
22+
let stderr = std::io::stderr();
23+
let mut out = std::io::LineWriter::new(stdout.lock());
24+
let mut err = std::io::LineWriter::new(stderr.lock());
25+
26+
while let Some(line) = rx.blocking_recv() {
27+
match line {
28+
OutputLine::Stdout(msg) => {
29+
let _ = writeln!(out, "{}", msg);
30+
}
31+
OutputLine::Stderr(msg) => {
32+
let _ = writeln!(err, "{}", msg);
33+
}
34+
}
35+
}
36+
37+
let _ = out.flush();
38+
let _ = err.flush();
39+
});
40+
(tx, handle)
41+
}
42+
943
#[derive(Debug, Parser, Clone)]
1044
#[command(
1145
name = "cloudflare-speed-cli",
@@ -164,10 +198,16 @@ pub fn build_config(args: &Cli) -> RunConfig {
164198
async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
165199
let cfg = build_config(&args);
166200
let network_info = crate::network::gather_network_info(&args);
201+
let (out_tx, out_handle) = if silent {
202+
(None, None)
203+
} else {
204+
let (tx, handle) = spawn_output_writer();
205+
(Some(tx), Some(handle))
206+
};
167207
let enriched = if silent {
168208
// In silent mode, spawn task and consume events
169-
let (evt_tx, mut evt_rx) = mpsc::channel::<TestEvent>(2048);
170-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
209+
let (evt_tx, mut evt_rx) = mpsc::unbounded_channel::<TestEvent>();
210+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
171211

172212
let engine = TestEngine::new(cfg);
173213
let handle = tokio::spawn(async move { engine.run(evt_tx, ctrl_rx).await });
@@ -185,8 +225,8 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
185225
crate::network::enrich_result(&result, &network_info)
186226
} else {
187227
// In JSON mode, directly await the engine (no need to consume events)
188-
let (evt_tx, _) = mpsc::channel::<TestEvent>(1024);
189-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
228+
let (evt_tx, _) = mpsc::unbounded_channel::<TestEvent>();
229+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
190230

191231
let engine = TestEngine::new(cfg);
192232
let result = engine
@@ -200,29 +240,38 @@ async fn run_test_engine(args: Cli, silent: bool) -> Result<()> {
200240
// Handle exports (errors will propagate)
201241
handle_exports(&args, &enriched)?;
202242

203-
if !silent {
243+
if let Some(tx) = out_tx.as_ref() {
204244
// Print JSON output in non-silent mode
205-
println!("{}", serde_json::to_string_pretty(&enriched)?);
245+
let out = serde_json::to_string_pretty(&enriched)?;
246+
let _ = tx.send(OutputLine::Stdout(out));
206247
}
207248

208249
// Save results if auto_save is enabled
209250
if args.auto_save {
210251
if silent {
211252
crate::storage::save_run(&enriched).context("failed to save run results")?;
212-
} else {
253+
} else if let Some(tx) = out_tx.as_ref() {
213254
if let Ok(p) = crate::storage::save_run(&enriched) {
214-
eprintln!("Saved: {}", p.display());
255+
let _ = tx.send(OutputLine::Stderr(format!("Saved: {}", p.display())));
215256
}
216257
}
217258
}
218259

260+
if let Some(tx) = out_tx {
261+
drop(tx);
262+
}
263+
if let Some(handle) = out_handle {
264+
let _ = handle.await;
265+
}
266+
219267
Ok(())
220268
}
221269

222270
async fn run_text(args: Cli) -> Result<()> {
223271
let cfg = build_config(&args);
224-
let (evt_tx, mut evt_rx) = mpsc::channel::<TestEvent>(2048);
225-
let (_, ctrl_rx) = mpsc::channel::<EngineControl>(16);
272+
let (out_tx, out_handle) = spawn_output_writer();
273+
let (evt_tx, mut evt_rx) = mpsc::unbounded_channel::<TestEvent>();
274+
let (_, ctrl_rx) = mpsc::unbounded_channel::<EngineControl>();
226275

227276
let engine = TestEngine::new(cfg);
228277
let handle = tokio::spawn(async move { engine.run(evt_tx, ctrl_rx).await });
@@ -238,7 +287,7 @@ async fn run_text(args: Cli) -> Result<()> {
238287
while let Some(ev) = evt_rx.recv().await {
239288
match ev {
240289
TestEvent::PhaseStarted { phase } => {
241-
eprintln!("== {phase:?} ==");
290+
let _ = out_tx.send(OutputLine::Stderr(format!("== {phase:?} ==")));
242291
}
243292
TestEvent::ThroughputTick {
244293
phase,
@@ -251,7 +300,7 @@ async fn run_text(args: Cli) -> Result<()> {
251300
) {
252301
let elapsed = run_start.elapsed().as_secs_f64();
253302
let mbps = (bps_instant * 8.0) / 1_000_000.0;
254-
eprintln!("{phase:?}: {:.2} Mbps", mbps);
303+
let _ = out_tx.send(OutputLine::Stderr(format!("{phase:?}: {:.2} Mbps", mbps)));
255304

256305
// Collect throughput points for metrics
257306
match phase {
@@ -275,7 +324,10 @@ async fn run_text(args: Cli) -> Result<()> {
275324
if let Some(ms) = rtt_ms {
276325
match (phase, during) {
277326
(crate::model::Phase::IdleLatency, None) => {
278-
eprintln!("Idle latency: {:.1} ms", ms);
327+
let _ = out_tx.send(OutputLine::Stderr(format!(
328+
"Idle latency: {:.1} ms",
329+
ms
330+
)));
279331
idle_latency_samples.push(ms);
280332
}
281333
(
@@ -292,10 +344,13 @@ async fn run_text(args: Cli) -> Result<()> {
292344
}
293345
}
294346
}
295-
TestEvent::Info { message } => eprintln!("{message}"),
347+
TestEvent::Info(info) => {
348+
let _ = out_tx.send(OutputLine::Stderr(info.to_message()));
349+
}
296350
TestEvent::MetaInfo { .. } => {
297351
// Meta info is handled in TUI, ignore in text mode
298352
}
353+
TestEvent::RunCompleted { .. } => {}
299354
}
300355
}
301356

@@ -306,92 +361,24 @@ async fn run_text(args: Cli) -> Result<()> {
306361
let enriched = crate::network::enrich_result(&result, &network_info);
307362

308363
handle_exports(&args, &enriched)?;
309-
if let Some(meta) = enriched.meta.as_ref() {
310-
let extracted = crate::network::extract_metadata(meta);
311-
let ip = extracted.ip.as_deref().unwrap_or("-");
312-
let colo = extracted.colo.as_deref().unwrap_or("-");
313-
let asn = extracted.asn.as_deref().unwrap_or("-");
314-
let org = extracted.as_org.as_deref().unwrap_or("-");
315-
println!("IP/Colo/ASN: {ip} / {colo} / {asn} ({org})");
316-
}
317-
if let Some(server) = enriched.server.as_deref() {
318-
println!("Server: {server}");
319-
}
320-
if let Some(comments) = enriched.comments.as_deref() {
321-
if !comments.trim().is_empty() {
322-
println!("Comments: {}", comments);
323-
}
324-
}
325-
326-
// Compute and display throughput metrics (mean, median, p25, p75)
327-
let dl_values: Vec<f64> = dl_points.iter().map(|(_, y)| *y).collect();
328-
let (dl_mean, dl_median, dl_p25, dl_p75) = crate::metrics::compute_metrics(&dl_values)
329-
.context("insufficient download throughput data to compute metrics")?;
330-
println!(
331-
"Download: avg {:.2} med {:.2} p25 {:.2} p75 {:.2}",
332-
dl_mean, dl_median, dl_p25, dl_p75
333-
);
334-
335-
let ul_values: Vec<f64> = ul_points.iter().map(|(_, y)| *y).collect();
336-
let (ul_mean, ul_median, ul_p25, ul_p75) = crate::metrics::compute_metrics(&ul_values)
337-
.context("insufficient upload throughput data to compute metrics")?;
338-
println!(
339-
"Upload: avg {:.2} med {:.2} p25 {:.2} p75 {:.2}",
340-
ul_mean, ul_median, ul_p25, ul_p75
341-
);
342-
343-
// Compute and display latency metrics (mean, median, p25, p75)
344-
let (idle_mean, idle_median, idle_p25, idle_p75) =
345-
crate::metrics::compute_metrics(&idle_latency_samples)
346-
.context("insufficient idle latency data to compute metrics")?;
347-
println!(
348-
"Idle latency: avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
349-
idle_mean,
350-
idle_median,
351-
idle_p25,
352-
idle_p75,
353-
enriched.idle_latency.loss * 100.0,
354-
enriched.idle_latency.jitter_ms.unwrap_or(f64::NAN)
355-
);
356-
357-
let (dl_lat_mean, dl_lat_median, dl_lat_p25, dl_lat_p75) =
358-
crate::metrics::compute_metrics(&loaded_dl_latency_samples)
359-
.context("insufficient loaded download latency data to compute metrics")?;
360-
println!(
361-
"Loaded latency (download): avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
362-
dl_lat_mean,
363-
dl_lat_median,
364-
dl_lat_p25,
365-
dl_lat_p75,
366-
enriched.loaded_latency_download.loss * 100.0,
367-
enriched.loaded_latency_download.jitter_ms.unwrap_or(f64::NAN)
368-
);
369-
370-
let (ul_lat_mean, ul_lat_median, ul_lat_p25, ul_lat_p75) =
371-
crate::metrics::compute_metrics(&loaded_ul_latency_samples)
372-
.context("insufficient loaded upload latency data to compute metrics")?;
373-
println!(
374-
"Loaded latency (upload): avg {:.1} med {:.1} p25 {:.1} p75 {:.1} ms (loss {:.1}%, jitter {:.1} ms)",
375-
ul_lat_mean,
376-
ul_lat_median,
377-
ul_lat_p25,
378-
ul_lat_p75,
379-
enriched.loaded_latency_upload.loss * 100.0,
380-
enriched.loaded_latency_upload.jitter_ms.unwrap_or(f64::NAN)
381-
);
382-
if let Some(ref exp) = enriched.experimental_udp {
383-
println!(
384-
"Experimental UDP-like loss probe: loss {:.1}% med {} ms (target {:?})",
385-
exp.latency.loss * 100.0,
386-
exp.latency.median_ms.unwrap_or(f64::NAN),
387-
exp.target
388-
);
364+
let summary = crate::text_summary::build_text_summary(
365+
&enriched,
366+
&dl_points,
367+
&ul_points,
368+
&idle_latency_samples,
369+
&loaded_dl_latency_samples,
370+
&loaded_ul_latency_samples,
371+
)?;
372+
for line in summary.lines {
373+
let _ = out_tx.send(OutputLine::Stdout(line));
389374
}
390375
if args.auto_save {
391376
if let Ok(p) = crate::storage::save_run(&enriched) {
392-
eprintln!("Saved: {}", p.display());
377+
let _ = out_tx.send(OutputLine::Stderr(format!("Saved: {}", p.display())));
393378
}
394379
}
380+
drop(out_tx);
381+
let _ = out_handle.await;
395382
Ok(())
396383
}
397384

src/engine/cloudflare.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::model::TurnInfo;
1+
use crate::model::{InfoEvent, TestEvent, TurnInfo};
22
use anyhow::{Context, Result};
33
use reqwest::Url;
44
use serde::Deserialize;
@@ -14,7 +14,10 @@ pub struct CloudflareClient {
1414
}
1515

1616
impl CloudflareClient {
17-
pub fn new(cfg: &RunConfig) -> Result<Self> {
17+
pub fn new(
18+
cfg: &RunConfig,
19+
event_tx: Option<tokio::sync::mpsc::UnboundedSender<TestEvent>>,
20+
) -> Result<Self> {
1821
let base_url = Url::parse(&cfg.base_url).context("invalid base_url")?;
1922

2023
let mut builder = reqwest::Client::builder()
@@ -28,10 +31,12 @@ impl CloudflareClient {
2831
match network_bind::get_interface_ip(iface) {
2932
Ok(ip) => {
3033
builder = builder.local_address(ip);
31-
eprintln!(
32-
"Binding HTTP connections to interface {} (IP: {})",
33-
iface, ip
34-
);
34+
if let Some(tx) = event_tx.as_ref() {
35+
let _ = tx.send(TestEvent::Info(InfoEvent::BindInterface {
36+
iface: iface.clone(),
37+
ip,
38+
}));
39+
}
3540
}
3641
Err(e) => {
3742
return Err(anyhow::anyhow!(
@@ -46,7 +51,9 @@ impl CloudflareClient {
4651
match source_ip.parse::<std::net::IpAddr>() {
4752
Ok(ip) => {
4853
builder = builder.local_address(ip);
49-
eprintln!("Binding HTTP connections to source IP: {}", ip);
54+
if let Some(tx) = event_tx.as_ref() {
55+
let _ = tx.send(TestEvent::Info(InfoEvent::BindSourceIp { ip }));
56+
}
5057
}
5158
Err(e) => {
5259
return Err(anyhow::anyhow!(

src/engine/latency.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ use std::sync::{
99
use std::time::{Duration, Instant};
1010
use tokio::sync::mpsc;
1111

12+
#[allow(clippy::too_many_arguments)]
1213
pub async fn run_latency_probes(
1314
client: &CloudflareClient,
1415
phase: Phase,
1516
during: Option<Phase>,
1617
total_duration: Duration,
1718
interval_ms: u64,
1819
timeout_ms: u64,
19-
event_tx: &mpsc::Sender<TestEvent>,
20+
event_tx: &mpsc::UnboundedSender<TestEvent>,
2021
paused: Arc<AtomicBool>,
2122
cancel: Arc<AtomicBool>,
2223
) -> Result<LatencySummary> {
@@ -48,31 +49,25 @@ pub async fn run_latency_probes(
4849
// Extract meta from first successful response
4950
if !meta_sent && phase == Phase::IdleLatency {
5051
if let Some(meta) = meta_opt {
51-
event_tx.send(TestEvent::MetaInfo { meta }).await.ok();
52+
let _ = event_tx.send(TestEvent::MetaInfo { meta });
5253
meta_sent = true;
5354
}
5455
}
5556

56-
event_tx
57-
.send(TestEvent::LatencySample {
58-
phase,
59-
during,
60-
rtt_ms: Some(ms),
61-
ok: true,
62-
})
63-
.await
64-
.ok();
57+
let _ = event_tx.send(TestEvent::LatencySample {
58+
phase,
59+
during,
60+
rtt_ms: Some(ms),
61+
ok: true,
62+
});
6563
}
6664
Err(_) => {
67-
event_tx
68-
.send(TestEvent::LatencySample {
69-
phase,
70-
during,
71-
rtt_ms: None,
72-
ok: false,
73-
})
74-
.await
75-
.ok();
65+
let _ = event_tx.send(TestEvent::LatencySample {
66+
phase,
67+
during,
68+
rtt_ms: None,
69+
ok: false,
70+
});
7671
}
7772
}
7873

0 commit comments

Comments
 (0)