Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ fn host_log<'py>(record: Bound<'py, PyAny>) -> PyResult<()> {
///
/// Since any call like `logging.warn(...)` sets up logging via `logging.basicConfig`, all log messages are now
/// delivered to `crate::host_log`, which will send them to `tracing::event!`.
pub fn setup_logging(py: Python, node_id: NodeId, dataflow_id: DataflowId) -> PyResult<()> {
pub fn setup_logging(py: Python, _node_id: NodeId, _dataflow_id: DataflowId) -> PyResult<()> {
let logging = py.import("logging")?;
logging.setattr("host_log", wrap_pyfunction!(host_log, &logging)?)?;
py.run(
cr#"
class HostHandler(Handler):
def __init__(self, level=0):
super().__init__(level=level)

def emit(self, record):
host_log(record)

Expand Down Expand Up @@ -218,10 +218,7 @@ impl Node {
#[allow(clippy::should_implement_trait)]
pub fn try_recv(&mut self, py: Python) -> Option<Py<PyDict>> {
match self.events.try_recv() {
Ok(event) => match event.to_py_dict(py) {
Ok(dict) => Some(dict),
Err(_) => None,
},
Ok(event) => event.to_py_dict(py).ok(),
Err(_) => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion apis/python/operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<T> DelayedCleanup<T> {
CleanupHandle(self.0.clone())
}

pub fn get_mut(&self) -> std::sync::MutexGuard<T> {
pub fn get_mut(&self) -> std::sync::MutexGuard<'_, T> {
self.0.try_lock().expect("failed to lock DelayedCleanup")
}
}
Expand Down
1 change: 0 additions & 1 deletion binaries/cli/src/command/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{CommandFactory, ValueEnum};
use clap_complete::Shell;

use crate::command::Executable;
use sysinfo;

#[derive(Debug, clap::Args)]
#[command(after_help = r#"
Expand Down
39 changes: 13 additions & 26 deletions binaries/cli/src/command/inspect/top.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl App {
dataflow_id: node_info.dataflow_id,
dataflow_name: node_info
.dataflow_name
.unwrap_or_else(|| "<unnamed>".to_string()),
.unwrap_or_else(|| "<unnamed>".to_owned()),
node_id: node_info.node_id,
pid,
cpu_usage,
Expand All @@ -256,30 +256,19 @@ fn run_app<B: Backend>(
) -> eyre::Result<()> {
let mut app = App::new();
let mut last_update = Instant::now();
let mut node_infos: Vec<NodeInfo> = Vec::new();

// Reuse coordinator connection
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("Failed to connect to coordinator")?;

// Query node info once initially
let request = ControlRequest::GetNodeInfo;
let reply_raw = session
.request(&serde_json::to_vec(&request).unwrap())
.wrap_err("failed to send initial request to coordinator")?;

let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;

node_infos = match reply {
ControlRequestReply::NodeInfoList(infos) => infos,
ControlRequestReply::Error(err) => {
return Err(eyre!("coordinator error: {err}"));
}
_ => {
return Err(eyre!("unexpected reply from coordinator"));
}
};
// let request = ControlRequest::GetNodeInfo;
// let reply_raw = session
// .request(&serde_json::to_vec(&request).unwrap())
// .wrap_err("failed to send initial request to coordinator")?;

// let reply: ControlRequestReply =
// serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;

loop {
terminal.draw(|f| ui(f, &mut app, refresh_duration))?;
Expand Down Expand Up @@ -333,20 +322,18 @@ fn run_app<B: Backend>(
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;

match reply {
ControlRequestReply::NodeInfoList(infos) => {
node_infos = infos;
}
let node_infos = match reply {
ControlRequestReply::NodeInfoList(infos) => infos,
ControlRequestReply::Error(err) => {
return Err(eyre!("coordinator error: {err}"));
}
_ => {
return Err(eyre!("unexpected reply from coordinator"));
}
}
};

// Update stats with current node info
app.update_stats(node_infos.clone());
app.update_stats(node_infos);
last_update = Instant::now();
}
}
Expand All @@ -365,7 +352,7 @@ fn ui(f: &mut Frame, app: &mut App, refresh_duration: Duration) {
}
};

let header_strings = vec![
let header_strings = [
format!("NODE{}", sort_indicator(SortColumn::Node)),
"DATAFLOW".to_string(),
"PID".to_string(),
Expand Down
3 changes: 1 addition & 2 deletions binaries/cli/src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ mod topic;
mod up;

pub use build::build;
pub use run::{run, run_func};
pub use system::check_environment;
pub use run::run;

use build::Build;
use completion::Completion;
Expand Down
1 change: 1 addition & 0 deletions binaries/cli/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct Run {
}

#[deprecated(note = "use `run` instead")]
#[allow(dead_code)]
pub fn run_func(dataflow: String, uv: bool) -> eyre::Result<()> {
run(dataflow, uv)
}
Expand Down
2 changes: 0 additions & 2 deletions binaries/cli/src/command/system/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pub mod status;

pub use status::check_environment;

use super::Executable;
use status::Status;

Expand Down
16 changes: 8 additions & 8 deletions binaries/cli/src/command/topic/hz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,14 @@ fn ui(
window_dur: Duration,
) {
// Layout: table | charts | footer
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([
Constraint::Percentage(55),
Constraint::Percentage(44),
Constraint::Length(1),
])
.split(f.area());
// let chunks = Layout::default()
// .direction(Direction::Vertical)
// .constraints([
// Constraint::Percentage(55),
// Constraint::Percentage(44),
// Constraint::Length(1),
// ])
// .split(f.area());

// Table header: interval stats in ms + derived avg Hz
let header = Row::new([
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod session;
mod template;

pub use command::build;
pub use command::{run, run_func};
pub use command::run;

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn print_log_message(
let node_id = node_id
.to_string()
.bold()
.color(word_to_color(&node_id.to_string()));
.color(word_to_color(node_id.as_ref()));
let padding = if daemon.is_empty() { "" } else { " " };
format!("{node_id}{padding}{daemon}{colon} ")
}
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ async fn start_inner(

let mut node_infos = Vec::new();
for dataflow in running_dataflows.values() {
for (node_id, _node) in &dataflow.nodes {
for node_id in dataflow.nodes.keys() {
// Get the specific daemon this node is running on
if let Some(daemon_id) = dataflow.node_to_daemon.get(node_id) {
// Get metrics if available
Expand Down
8 changes: 4 additions & 4 deletions binaries/daemon/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct NodeLogger<'a> {
}

impl NodeLogger<'_> {
pub fn inner(&self) -> &DataflowLogger {
pub fn inner(&self) -> &DataflowLogger<'_> {
&self.logger
}

Expand Down Expand Up @@ -68,7 +68,7 @@ impl<'a> DataflowLogger<'a> {
}
}

pub fn reborrow(&mut self) -> DataflowLogger {
pub fn reborrow(&mut self) -> DataflowLogger<'_> {
DataflowLogger {
dataflow_id: self.dataflow_id,
logger: CowMut::Borrowed(&mut self.logger),
Expand Down Expand Up @@ -153,14 +153,14 @@ pub struct DaemonLogger {
}

impl DaemonLogger {
pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger {
pub fn for_dataflow(&mut self, dataflow_id: Uuid) -> DataflowLogger<'_> {
DataflowLogger {
dataflow_id,
logger: CowMut::Borrowed(self),
}
}

pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger {
pub fn for_node_build(&mut self, build_id: BuildId, node_id: NodeId) -> NodeBuildLogger<'_> {
NodeBuildLogger {
build_id,
node_id,
Expand Down
1 change: 0 additions & 1 deletion binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ impl PreparedNode {
// Stderr listener stream
let stderr_tx = tx.clone();
let node_id = self.node.id.clone();
let uhlc = self.clock.clone();
let daemon_tx_log = self.daemon_tx.clone();
tokio::spawn(async move {
let mut buffer = String::new();
Expand Down
1 change: 0 additions & 1 deletion examples/benchmark/node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use dora_node_api::{self, DoraNode, dora_core::config::DataId};
use eyre::Context;
use rand::RngCore;
use std::time::Duration;

Expand Down
1 change: 0 additions & 1 deletion examples/benchmark/sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use dora_node_api::{self, DoraNode, Event};
use eyre::Context;
use std::time::{Duration, Instant};
fn main() -> eyre::Result<()> {
let (_node, mut events) = DoraNode::init_from_env()?;
Expand Down
6 changes: 3 additions & 3 deletions libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
Descriptor::parse(buf)
}

fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut<'_>> {
match node.kind()? {
NodeKind::Standard(_) => {
let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
Expand Down Expand Up @@ -239,11 +239,11 @@ pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
}

pub trait NodeExt {
fn kind(&self) -> eyre::Result<NodeKind>;
fn kind(&self) -> eyre::Result<NodeKind<'_>>;
}

impl NodeExt for Node {
fn kind(&self) -> eyre::Result<NodeKind> {
fn kind(&self) -> eyre::Result<NodeKind<'_>> {
match (&self.path, &self.operators, &self.custom, &self.operator) {
(None, None, None, None) => {
eyre::bail!(
Expand Down
2 changes: 1 addition & 1 deletion libraries/extensions/ros2-bridge/msg-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::path::Path;

use quote::{ToTokens, format_ident, quote};
use quote::{format_ident, quote};

pub mod parser;
pub mod types;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
fn get_ros_msgs_each_package<P: AsRef<Path>>(root_dirs: &[P]) -> Result<Vec<Package>> {
let mut map: HashMap<String, Package> = HashMap::new();

let ros_formats = vec!["msg", "srv", "action"];
let ros_formats = ["msg", "srv", "action"];

// Return empty vector if root_dir is empty
for root_dir in root_dirs {
Expand Down Expand Up @@ -93,7 +93,7 @@

let mut packages = Vec::new();
let mut dependencies = HashMap::<String, BTreeSet<String>>::new();
for (_pkg_name, pkg) in &map {
for pkg in map.values() {
packages.push(pkg.clone());
}
for package in &mut packages {
Expand Down Expand Up @@ -128,7 +128,7 @@
.get(&package.name)
.unwrap()
.iter()
.filter_map(|dep| map.get(dep).map(|pkg| pkg.clone()))
.filter_map(|dep| map.get(dep).cloned())
.collect();
}

Expand All @@ -154,7 +154,7 @@
let direct_deps: BTreeSet<String> = dependencies.get(name).unwrap().iter().cloned().collect();
for dep_name in &direct_deps {
let indirect_deps = flatten_dependencies(
&dep_name,

Check warning on line 157 in libraries/extensions/ros2-bridge/msg-gen/src/parser/package.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
dependencies,
flattened_dependencies,
unfinished_set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Action {
let create_client = format_ident!("new_ActionClient__{package_name}__{}", self.name);
let cxx_create_client = format!("create_action_client_{package_name}_{}", self.name);

let package = format_ident!("{package_name}");
// let package = format_ident!("{package_name}");
let self_name = format_ident!("{}", self.name);
let self_name_str = &self.name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl Message {
let publisher_name = format_ident!("Publisher__{package_name}__{}", self.name);
let cxx_publisher_name = format!("Publisher_{}", self.name);
let create_publisher = format_ident!("new__Publisher__{package_name}__{}", self.name);
let cxx_create_publisher = format!("create_publisher");
let cxx_create_publisher = "create_publisher";

let struct_raw_name = format_ident!("{package_name}__{}", self.name);
let struct_raw_name_str = struct_raw_name.to_string();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeSet, path::PathBuf};
use std::path::PathBuf;

use proc_macro2::Span;
use quote::{ToTokens, format_ident, quote};
Expand All @@ -10,7 +10,7 @@
pub struct Package {
pub name: String,
pub path: PathBuf,
pub dependencies: Vec<Package>,

Check warning on line 13 in libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs

View workflow job for this annotation

GitHub Actions / Clippy

unnecessary structure name repetition
pub messages: Vec<Message>,
pub services: Vec<Service>,
pub actions: Vec<Action>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Service {
let create_client = format_ident!("new_Client__{package_name}__{}", self.name);
let cxx_create_client = format!("create_client_{package_name}_{}", self.name);

let package = format_ident!("{package_name}");
// let package = format_ident!("{package_name}");
let self_name = format_ident!("{}", self.name);
let self_name_str = &self.name;

Expand Down
2 changes: 1 addition & 1 deletion libraries/message/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use aligned_vec::{AVec, ConstAlign};
use chrono::{DateTime, Utc};
use eyre::Context as _;
use serde::{Deserialize, Deserializer};
use serde::Deserialize;
use uuid::Uuid;

use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId};
Expand Down Expand Up @@ -56,7 +56,7 @@
.map(|id| DataflowId::from(Uuid::parse_str(&id).unwrap()))),
node_id: helper.node_id.or(fields
.and_then(|f| f.get("node_id").cloned())
.map(|id| NodeId(id))),

Check warning on line 59 in libraries/message/src/common.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
daemon_id: helper
.daemon_id
.or(fields.and_then(|f| f.get("daemon_id").cloned()).map(|id| {
Expand All @@ -69,7 +69,7 @@
} else {
DaemonId {
machine_id: None,
uuid: Uuid::parse_str(&parts[0]).unwrap(),

Check warning on line 72 in libraries/message/src/common.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
}
}
})),
Expand Down
Loading
Loading