Skip to content

Commit a5379da

Browse files
committed
Run cargo fmt
1 parent cb82027 commit a5379da

File tree

8 files changed

+57
-53
lines changed

8 files changed

+57
-53
lines changed

binaries/cli/src/command/build/distributed.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ pub async fn build_distributed_dataflow(
3232
local_working_dir,
3333
uv,
3434
},
35-
)).await?;
35+
))
36+
.await?;
3637
eprintln!("dataflow build triggered: {build_id}");
3738
Ok(build_id)
3839
}

binaries/cli/src/command/build/local.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,7 @@ pub async fn build_dataflow_locally(
1717
working_dir: PathBuf,
1818
uv: bool,
1919
) -> eyre::Result<BuildInfo> {
20-
build_dataflow(
21-
dataflow,
22-
git_sources,
23-
dataflow_session,
24-
working_dir,
25-
uv,
26-
).await
20+
build_dataflow(dataflow, git_sources, dataflow_session, working_dir, uv).await
2721
}
2822

2923
async fn build_dataflow(

binaries/cli/src/command/destroy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ impl Executable for Destroy {
2323
up::destroy(
2424
self.config.as_deref(),
2525
(self.coordinator_addr, self.coordinator_port).into(),
26-
).await
26+
)
27+
.await
2728
}
2829
}

binaries/cli/src/command/inspect/top.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::{
55

66
use clap::Args;
77
use crossterm::{
8-
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyEventKind},
8+
event::{
9+
self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEvent, KeyEventKind,
10+
},
911
execute,
1012
terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
1113
};
@@ -67,7 +69,8 @@ impl Executable for Top {
6769
self.coordinator_addr,
6870
self.coordinator_port,
6971
refresh_duration,
70-
).await;
72+
)
73+
.await;
7174

7275
// Restore terminal
7376
disable_raw_mode()?;

binaries/cli/src/command/logs.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,17 @@ impl Executable for LogsArgs {
4646
let client = connect_to_coordinator_rpc(self.coordinator_addr, self.coordinator_port)
4747
.await
4848
.wrap_err("failed to connect to dora coordinator")?;
49-
let uuid = resolve_dataflow_identifier_interactive(&client, self.dataflow.as_deref()).await?;
49+
let uuid =
50+
resolve_dataflow_identifier_interactive(&client, self.dataflow.as_deref()).await?;
5051
logs(
5152
&client,
5253
uuid,
5354
self.node,
5455
self.tail,
5556
self.follow,
5657
(self.coordinator_addr, self.coordinator_port).into(),
57-
).await
58+
)
59+
.await
5860
}
5961
}
6062

@@ -72,7 +74,8 @@ pub async fn logs(
7274
None,
7375
node.to_string(),
7476
tail,
75-
)).await?;
77+
))
78+
.await?;
7679

7780
std::io::stdout()
7881
.write_all(&logs)

binaries/cli/src/command/stop.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ impl Executable for Stop {
5858
(None, Some(name)) => {
5959
stop_dataflow_by_name(name, self.grace_duration, self.force, &client).await
6060
}
61-
(None, None) => stop_dataflow_interactive(self.grace_duration, self.force, &client).await,
61+
(None, None) => {
62+
stop_dataflow_interactive(self.grace_duration, self.force, &client).await
63+
}
6264
}
6365
}
6466
}
@@ -68,7 +70,9 @@ async fn stop_dataflow_interactive(
6870
force: bool,
6971
client: &CliControlClient,
7072
) -> eyre::Result<()> {
71-
let list = query_running_dataflows(client).await.wrap_err("failed to query running dataflows")?;
73+
let list = query_running_dataflows(client)
74+
.await
75+
.wrap_err("failed to query running dataflows")?;
7276
let active = list.get_active();
7377
if active.is_empty() {
7478
eprintln!("No dataflows are running");

binaries/cli/src/command/system/status.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,30 @@ pub async fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()>
2727
let mut stdout = termcolor::StandardStream::stdout(color_choice);
2828

2929
// Coordinator status
30-
let client = match connect_to_coordinator_rpc(coordinator_addr.ip(), coordinator_addr.port()).await {
31-
Ok(client) => {
32-
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
33-
write!(stdout, "✓ ")?;
34-
let _ = stdout.reset();
35-
writeln!(stdout, "Coordinator: Running")?;
36-
writeln!(
37-
stdout,
38-
" Address: {}:{}",
39-
coordinator_addr.ip(),
40-
coordinator_addr.port()
41-
)?;
42-
Some(client)
43-
}
44-
Err(_) => {
45-
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
46-
write!(stdout, "✗ ")?;
47-
let _ = stdout.reset();
48-
writeln!(stdout, "Coordinator: Not running")?;
49-
error_occurred = true;
50-
None
51-
}
52-
};
30+
let client =
31+
match connect_to_coordinator_rpc(coordinator_addr.ip(), coordinator_addr.port()).await {
32+
Ok(client) => {
33+
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
34+
write!(stdout, "✓ ")?;
35+
let _ = stdout.reset();
36+
writeln!(stdout, "Coordinator: Running")?;
37+
writeln!(
38+
stdout,
39+
" Address: {}:{}",
40+
coordinator_addr.ip(),
41+
coordinator_addr.port()
42+
)?;
43+
Some(client)
44+
}
45+
Err(_) => {
46+
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Red)));
47+
write!(stdout, "✗ ")?;
48+
let _ = stdout.reset();
49+
writeln!(stdout, "Coordinator: Not running")?;
50+
error_occurred = true;
51+
None
52+
}
53+
};
5354

5455
// Daemon status
5556
let daemon_running_result = match client.as_ref() {
@@ -131,7 +132,9 @@ impl Executable for Status {
131132
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
132133
check_environment((self.coordinator_addr, self.coordinator_port).into()).await?
133134
}
134-
None => check_environment((self.coordinator_addr, self.coordinator_port).into()).await?,
135+
None => {
136+
check_environment((self.coordinator_addr, self.coordinator_port).into()).await?
137+
}
135138
}
136139

137140
Ok(())

binaries/cli/src/command/topic/info.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,7 @@ async fn info(
150150
.await
151151
.context("failed to open zenoh session")?;
152152

153-
let subscribe_topic = zenoh_output_publish_topic(
154-
dataflow_id,
155-
&topic.node_id,
156-
&topic.data_id,
157-
);
153+
let subscribe_topic = zenoh_output_publish_topic(dataflow_id, &topic.node_id, &topic.data_id);
158154
let subscriber = zenoh_session
159155
.declare_subscriber(subscribe_topic)
160156
.await
@@ -168,19 +164,18 @@ async fn info(
168164

169165
// Collect messages for the specified duration
170166
while Instant::now() < end_time {
171-
let Ok(sample) = tokio::time::timeout_at(deadline, subscriber.recv_async()).await
172-
else {
167+
let Ok(sample) = tokio::time::timeout_at(deadline, subscriber.recv_async()).await else {
173168
break;
174169
};
175170

176171
match sample {
177172
Ok(sample) => {
178-
let event = match Timestamped::deserialize_inter_daemon_event(
179-
&sample.payload().to_bytes(),
180-
) {
181-
Ok(event) => event,
182-
Err(_) => continue,
183-
};
173+
let event =
174+
match Timestamped::deserialize_inter_daemon_event(&sample.payload().to_bytes())
175+
{
176+
Ok(event) => event,
177+
Err(_) => continue,
178+
};
184179

185180
match event.inner {
186181
InterDaemonEvent::Output { metadata, data, .. } => {

0 commit comments

Comments
 (0)