Skip to content

Commit 552b56f

Browse files
authored
Cleanup (#1588)
* Remove unused remote communication field from dataflow config
* Resolve various clippy warnings
2 parents b75fecf + c5fcc05 commit 552b56f

File tree

18 files changed

+49
-142
lines changed

18 files changed

+49
-142
lines changed

apis/rust/node/src/event_stream/thread.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub fn init(
2323
) -> eyre::Result<EventStreamThreadHandle> {
2424
let node_id_cloned = node_id.clone();
2525
let join_handle = std::thread::spawn(|| event_stream_loop(node_id_cloned, tx, channel, clock));
26-
Ok(EventStreamThreadHandle::new(node_id, join_handle))
26+
Ok(EventStreamThreadHandle::new(join_handle))
2727
}
2828

2929
#[derive(Debug)]
@@ -38,20 +38,16 @@ pub enum EventItem {
3838
}
3939

4040
pub struct EventStreamThreadHandle {
41-
node_id: NodeId,
4241
handle: flume::Receiver<std::thread::Result<()>>,
4342
}
4443

4544
impl EventStreamThreadHandle {
46-
fn new(node_id: NodeId, join_handle: std::thread::JoinHandle<()>) -> Self {
45+
fn new(join_handle: std::thread::JoinHandle<()>) -> Self {
4746
let (tx, rx) = flume::bounded(1);
4847
std::thread::spawn(move || {
4948
let _ = tx.send(join_handle.join());
5049
});
51-
Self {
52-
node_id,
53-
handle: rx,
54-
}
50+
Self { handle: rx }
5551
}
5652
}
5753

apis/rust/node/src/node/arrow_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ mod tests {
145145
child_data: vec![],
146146
};
147147

148-
let raw_buffer = arrow::buffer::Buffer::from_slice_ref(&[1, 2, 3]);
148+
let raw_buffer = arrow::buffer::Buffer::from_slice_ref([1, 2, 3]);
149149

150150
let result = buffer_into_arrow_array(&raw_buffer, &type_info);
151151
assert!(result.is_err());

binaries/cli/src/command/inspect/top.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -257,20 +257,12 @@ async fn run_app<B: Backend>(
257257
) -> eyre::Result<()> {
258258
let mut app = App::new();
259259
let mut last_update = Instant::now();
260-
let mut node_infos: Vec<NodeInfo> = Vec::new();
261260

262261
// Reuse coordinator connection
263262
let client = connect_and_check_version(coordinator_addr, coordinator_port)
264263
.await
265264
.wrap_err("Failed to connect to coordinator")?;
266265

267-
// Query node info once initially
268-
node_infos = rpc(
269-
"get node info",
270-
client.get_node_info(tarpc::context::current()),
271-
)
272-
.await?;
273-
274266
loop {
275267
terminal.draw(|f| ui(f, &mut app, refresh_duration))?;
276268

@@ -323,14 +315,14 @@ async fn run_app<B: Backend>(
323315
// Update data if refresh interval has passed
324316
if last_update.elapsed() >= refresh_duration {
325317
// Query node info every refresh interval to get updated metrics
326-
node_infos = rpc(
318+
let node_infos = rpc(
327319
"refresh node info",
328320
client.get_node_info(tarpc::context::current()),
329321
)
330322
.await?;
331323

332324
// Update stats with current node info
333-
app.update_stats(node_infos.clone());
325+
app.update_stats(node_infos);
334326
last_update = Instant::now();
335327
}
336328
}
@@ -349,7 +341,7 @@ fn ui(f: &mut Frame, app: &mut App, refresh_duration: Duration) {
349341
}
350342
};
351343

352-
let header_strings = vec![
344+
let header_strings = [
353345
format!("NODE{}", sort_indicator(SortColumn::Node)),
354346
"DATAFLOW".to_string(),
355347
"PID".to_string(),

binaries/cli/src/command/list.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT;
1111
use dora_message::{
1212
cli_to_coordinator::CoordinatorControlClient, coordinator_to_cli::DataflowStatus, tarpc,
1313
};
14-
use eyre::{Context, eyre};
14+
use eyre::Context;
1515
use serde::Serialize;
1616
use tabwriter::TabWriter;
1717
use uuid::Uuid;
@@ -89,9 +89,7 @@ async fn list(
8989
std::collections::BTreeMap::new();
9090

9191
for node_info in node_infos {
92-
let metrics = dataflow_metrics
93-
.entry(node_info.dataflow_id)
94-
.or_insert_with(DataflowMetrics::default);
92+
let metrics = dataflow_metrics.entry(node_info.dataflow_id).or_default();
9593
metrics.node_count += 1;
9694

9795
if let Some(node_metrics) = node_info.metrics {
@@ -182,7 +180,11 @@ async fn list(
182180
OutputFormat::Table => {
183181
let mut tw = TabWriter::new(std::io::stdout().lock());
184182
// Header
185-
tw.write_all(format!("UUID\tName\tStatus\tNodes\tCPU\tMemory\n").as_bytes())?;
183+
tw.write_all(
184+
"UUID\tName\tStatus\tNodes\tCPU\tMemory\n"
185+
.to_string()
186+
.as_bytes(),
187+
)?;
186188
for entry in entries {
187189
let status = match entry.status {
188190
DataflowStatus::Running => "Running",
@@ -192,13 +194,8 @@ async fn list(
192194

193195
tw.write_all(
194196
format!(
195-
"{}\t{}\t{}\t{}\t{}\t{}\n",
196-
entry.uuid,
197-
entry.name,
198-
status,
199-
entry.nodes,
200-
format!("{:.1}%", entry.cpu),
201-
format!("{:.1} GB", entry.memory)
197+
"{}\t{}\t{}\t{}\t{:.1}%\t{:.1} GB\n",
198+
entry.uuid, entry.name, status, entry.nodes, entry.cpu, entry.memory
202199
)
203200
.as_bytes(),
204201
)?;

binaries/cli/src/command/start/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use dora_message::{
2323
use eyre::Context;
2424
use std::{
2525
net::{IpAddr, SocketAddr},
26-
path::PathBuf,
26+
path::Path,
2727
};
2828
use tokio::task::JoinHandle;
2929
use uuid::{NoContext, Timestamp, Uuid};
@@ -152,7 +152,7 @@ impl Executable for Start {
152152
/// Send the start RPC to the coordinator. The caller should have already
153153
/// subscribed to zenoh log topics before calling this.
154154
async fn send_start_rpc(
155-
dataflow: &PathBuf,
155+
dataflow: &Path,
156156
dataflow_descriptor: &Descriptor,
157157
dataflow_session: &DataflowSession,
158158
client: &CoordinatorControlClient,

binaries/cli/src/command/topic/hz.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -347,16 +347,6 @@ fn ui(
347347
start: Instant,
348348
window_dur: Duration,
349349
) {
350-
// Layout: table | charts | footer
351-
let chunks = Layout::default()
352-
.direction(Direction::Vertical)
353-
.constraints([
354-
Constraint::Percentage(55),
355-
Constraint::Percentage(44),
356-
Constraint::Length(1),
357-
])
358-
.split(f.area());
359-
360350
// Table header: interval stats in ms + derived avg Hz
361351
let header = Row::new([
362352
"Output", "Avg (ms)", "Avg (Hz)", "Min (ms)", "Max (ms)", "Std (ms)",

binaries/cli/src/output.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ pub fn print_log_message(
185185
let node_id = node_id
186186
.to_string()
187187
.bold()
188-
.color(word_to_color(&node_id.to_string()));
188+
.color(word_to_color(node_id.as_ref()));
189189
let padding = if daemon.is_empty() { "" } else { " " };
190190
format!("{node_id}{padding}{daemon}{colon} ")
191191
}

binaries/coordinator/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ impl CoordinatorControl for CoordinatorControlServer {
382382
let mut node_infos = Vec::new();
383383
for r in self.state.running_dataflows.iter() {
384384
let dataflow = r.value();
385-
for (node_id, _node) in &dataflow.nodes {
385+
for node_id in dataflow.nodes.keys() {
386386
// Get the specific daemon this node is running on
387387
if let Some(daemon_id) = dataflow.node_to_daemon.get(node_id) {
388388
// Get metrics if available

binaries/daemon/src/coordinator.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,6 @@ impl DaemonControl for DaemonControlServer {
173173
uv,
174174
} = request;
175175

176-
match dataflow_descriptor.communication.remote {
177-
dora_core::config::RemoteCommunicationConfig::Tcp => {}
178-
}
179-
180176
let base_working_dir =
181177
crate::Daemon::base_working_dir_static(local_working_dir, session_id)
182178
.map_err(|err| format!("{err:?}"))?;
@@ -247,10 +243,6 @@ impl DaemonControl for DaemonControlServer {
247243
write_events_to,
248244
} = request;
249245

250-
match dataflow_descriptor.communication.remote {
251-
dora_core::config::RemoteCommunicationConfig::Tcp => {}
252-
}
253-
254246
// For spawn, we still route through the event loop because spawn_dataflow
255247
// needs mutable access to the logger and complex event loop integration.
256248
// TODO: Move spawn logic here once logger is refactored.

binaries/daemon/src/lib.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use aligned_vec::{AVec, ConstAlign};
22
use crossbeam::queue::ArrayQueue;
33
use dora_core::{
4-
build::{self, BuildInfo, PrevGitSource, TracingBuildLogger},
4+
build::{self, BuildInfo, PrevGitSource},
55
config::{DataId, Input, InputMapping, NodeId, NodeRunConfig},
66
descriptor::{
77
CoreNodeKind, DYNAMIC_SOURCE, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
@@ -56,11 +56,10 @@ use std::{
5656
use tokio::{
5757
fs::File,
5858
io::{AsyncReadExt, AsyncSeekExt},
59-
net::TcpStream,
6059
sync::{
6160
broadcast,
6261
mpsc::{self, UnboundedSender},
63-
oneshot::{self, Sender},
62+
oneshot::{self},
6463
},
6564
};
6665
use tokio_stream::{Stream, StreamExt, wrappers::ReceiverStream};
@@ -222,8 +221,7 @@ impl Daemon {
222221
let (events_tx, events_rx) = flume::bounded(10);
223222
if nodes
224223
.iter()
225-
.find(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
226-
.is_some()
224+
.any(|(_n, resolved_nodes)| resolved_nodes.kind.dynamic())
227225
{
228226
// Spawn local listener for dynamic nodes
229227
let _listen_port = local_listener::spawn_listener_loop(
@@ -709,10 +707,6 @@ impl Daemon {
709707
uv: bool,
710708
write_events_to: Option<PathBuf>,
711709
) -> Result<(), String> {
712-
match dataflow_descriptor.communication.remote {
713-
dora_core::config::RemoteCommunicationConfig::Tcp => {}
714-
}
715-
716710
// Resolve base working dir — for spawn we use the daemon's working dir
717711
let base_working_dir = match local_working_dir {
718712
Some(working_dir) => {
@@ -879,7 +873,7 @@ impl Daemon {
879873
send_output_to_local_receivers(
880874
node_id.clone(),
881875
output_id.clone(),
882-
&mut *dataflow,
876+
&mut dataflow,
883877
&metadata,
884878
data.map(DataMessage::Vec),
885879
&self.state.clock,
@@ -921,7 +915,7 @@ impl Daemon {
921915
if let Some(inputs) = dataflow.mappings.get(&output_id).cloned() {
922916
for (receiver_id, input_id) in &inputs {
923917
close_input(
924-
&mut *dataflow,
918+
&mut dataflow,
925919
receiver_id,
926920
input_id,
927921
&self.state.clock,
@@ -1461,7 +1455,7 @@ impl Daemon {
14611455
}
14621456
Ok(mut dataflow) => {
14631457
Self::subscribe(
1464-
&mut *dataflow,
1458+
&mut dataflow,
14651459
node_id.clone(),
14661460
event_sender,
14671461
&self.state.clock,
@@ -1615,7 +1609,7 @@ impl Daemon {
16151609
let data_bytes = send_output_to_local_receivers(
16161610
node_id.clone(),
16171611
output_id.clone(),
1618-
&mut *dataflow,
1612+
&mut dataflow,
16191613
&metadata,
16201614
data,
16211615
&self.state.clock,
@@ -1846,7 +1840,7 @@ impl Daemon {
18461840
.cloned()
18471841
.collect();
18481842
for (receiver_id, input_id) in &local_node_inputs {
1849-
close_input(&mut *dataflow, receiver_id, input_id, &self.state.clock);
1843+
close_input(&mut dataflow, receiver_id, input_id, &self.state.clock);
18501844
}
18511845

18521846
let mut closed = Vec::new();
@@ -2027,7 +2021,7 @@ impl Daemon {
20272021

20282022
let df = &mut *dataflow;
20292023
df.pending_nodes
2030-
.handle_node_stop_sync(node_id, &mut df.cascading_error_causes)
2024+
.handle_node_stop(node_id, &mut df.cascading_error_causes)
20312025
};
20322026
// DashMap guard is dropped — safe to do async I/O.
20332027
if exited_before_subscribe {
@@ -2573,7 +2567,7 @@ async fn read_last_n_lines(file: &mut File, mut tail: usize) -> io::Result<Vec<u
25732567
file.read_exact(&mut buffer[..read_len]).await?;
25742568
let read_buf = if at_end {
25752569
at_end = false;
2576-
&buffer[..read_len].trim_ascii_end()
2570+
buffer[..read_len].trim_ascii_end()
25772571
} else {
25782572
&buffer[..read_len]
25792573
};
@@ -2986,7 +2980,7 @@ impl RunningDataflow {
29862980
clock: &Arc<HLC>,
29872981
) -> eyre::Result<()> {
29882982
for interval in self.timers.keys().copied() {
2989-
if self._timer_handles.get(&interval).is_some() {
2983+
if self._timer_handles.contains_key(&interval) {
29902984
continue;
29912985
}
29922986
let events_tx = events_tx.clone();

0 commit comments

Comments (0)