Skip to content

Commit fd29e0d

Browse files
edgarribaclaude
andauthored
fix: add input validation and CLI consistency across all nodes (#6)
Audit all nodes against CLAUDE.md checklist and fix compliance gaps: - system-telemetry: add topic name validation and rate_hz bounds checking - network-monitor: add topic/numeric validation, endpoint default, build task - openmeteo: add config bounds validation, fix Header scope field, scope topics with bubbaloop/{scope}/{machine}/ prefix, rename -z to -e flag, add HTTP client timeout - inference: add -c/-e CLI flags, config.yaml, scoped subscribe topic Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e58da69 commit fd29e0d

File tree

13 files changed

+197
-33
lines changed

13 files changed

+197
-33
lines changed

inference/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ env_logger = "0.11"
1515
log = "0.4"
1616
ctrlc = "3.4"
1717
hostname = "0.4"
18+
argh = "0.1"
19+
serde = { version = "1.0", features = ["derive"] }
20+
serde_yaml = "0.9"
21+
regex-lite = "0.1"
1822
bubbaloop-schemas = { git = "https://github.com/kornia/bubbaloop.git", branch = "main" }
1923

2024
[[bin]]

inference/config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Instance parameters for inference.
2+
# Run: ./target/release/inference_node -c config.yaml
3+
4+
# Topic pattern to subscribe to (wildcard for all cameras)
5+
subscribe_topic: camera/*/raw_shm

inference/node.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ type: rust
44
description: Inference node for camera processing
55
author: Bubbaloop Team
66
build: pixi run build
7-
command: ./target/release/inference_node
7+
command: ./target/release/inference_node -c config.yaml

inference/src/bin/inference_node.rs

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,63 @@
55
//!
66
//! Based on: https://github.com/eclipse-zenoh/zenoh/blob/main/examples/examples/z_sub_shm.rs
77
8+
use argh::FromArgs;
89
use bubbaloop_schemas::RawImage;
910
use prost::Message;
11+
use serde::Deserialize;
12+
use std::path::{Path, PathBuf};
1013
use zenoh::Wait;
1114

15+
/// Inference node configuration
16+
#[derive(Debug, Clone, Deserialize)]
17+
struct Config {
18+
/// Topic pattern to subscribe to
19+
subscribe_topic: String,
20+
}
21+
22+
impl Default for Config {
23+
fn default() -> Self {
24+
Self {
25+
subscribe_topic: "camera/*/raw_shm".to_string(),
26+
}
27+
}
28+
}
29+
30+
/// Inference node for camera stream processing (SHM subscriber)
31+
#[derive(FromArgs)]
32+
struct Args {
33+
/// path to configuration file
34+
#[argh(option, short = 'c', default = "default_config_path()")]
35+
config: PathBuf,
36+
37+
/// zenoh endpoint to connect to
38+
#[argh(option, short = 'e', default = "default_endpoint()")]
39+
endpoint: String,
40+
}
41+
42+
fn default_config_path() -> PathBuf {
43+
PathBuf::from("config.yaml")
44+
}
45+
46+
fn default_endpoint() -> String {
47+
String::from("tcp/127.0.0.1:7447")
48+
}
49+
50+
fn load_config(path: &Path) -> Config {
51+
if path.exists() {
52+
match std::fs::read_to_string(path) {
53+
Ok(content) => match serde_yaml::from_str(&content) {
54+
Ok(config) => return config,
55+
Err(e) => log::warn!("Failed to parse config file: {}, using defaults", e),
56+
},
57+
Err(e) => log::warn!("Failed to read config file: {}, using defaults", e),
58+
}
59+
} else {
60+
log::warn!("Config file not found: {:?}, using defaults", path);
61+
}
62+
Config::default()
63+
}
64+
1265
/// Compute mean and standard deviation of pixel values
1366
fn compute_image_stats(data: &[u8]) -> (f64, f64) {
1467
if data.is_empty() {
@@ -30,8 +83,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3083
zenoh::init_log_from_env_or("error");
3184
env_logger::init();
3285

86+
let args: Args = argh::from_env();
87+
3388
log::info!("Starting inference node (SHM subscriber)...");
3489

90+
// Load and validate config
91+
let config = load_config(&args.config);
92+
93+
let topic_re = regex_lite::Regex::new(r"^[a-zA-Z0-9/_\-\.\*]+$").unwrap();
94+
if !topic_re.is_match(&config.subscribe_topic) {
95+
log::error!(
96+
"subscribe_topic '{}' contains invalid characters",
97+
config.subscribe_topic
98+
);
99+
std::process::exit(1);
100+
}
101+
35102
// Read scope/machine env vars for health heartbeat
36103
let scope = std::env::var("BUBBALOOP_SCOPE").unwrap_or_else(|_| "local".to_string());
37104
let machine_id = std::env::var("BUBBALOOP_MACHINE_ID").unwrap_or_else(|_| {
@@ -54,26 +121,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
54121
}
55122

56123
// Create Zenoh session with SHM enabled
57-
let mut config = zenoh::Config::default();
58-
config.insert_json5("transport/shared_memory/enabled", "true")?;
59-
if let Ok(endpoint) = std::env::var("ZENOH_ENDPOINT") {
60-
config.insert_json5("connect/endpoints", &format!(r#"["{}"]"#, endpoint))?;
61-
}
124+
let endpoint = std::env::var("ZENOH_ENDPOINT").unwrap_or(args.endpoint);
125+
log::info!("Connecting to Zenoh at: {}", endpoint);
126+
127+
let mut zenoh_config = zenoh::Config::default();
128+
zenoh_config.insert_json5("transport/shared_memory/enabled", "true")?;
129+
zenoh_config.insert_json5("connect/endpoints", &format!(r#"["{}"]"#, endpoint))?;
62130

63-
let session = zenoh::open(config).wait()?;
131+
let session = zenoh::open(zenoh_config).wait()?;
64132

65133
// Create health heartbeat publisher
66134
let health_topic = format!("bubbaloop/{}/{}/health/inference", scope, machine_id);
67135
let health_publisher = session.declare_publisher(health_topic.clone()).await?;
68136
log::info!("Health heartbeat topic: {}", health_topic);
69137

70-
// Subscribe to all camera raw_shm topics using wildcard
71-
let topic = "camera/*/raw_shm";
72-
let subscriber = session.declare_subscriber(topic).wait()?;
138+
// Subscribe using scoped topic
139+
let full_topic = format!(
140+
"bubbaloop/{}/{}/{}",
141+
scope, machine_id, config.subscribe_topic
142+
);
143+
let subscriber = session.declare_subscriber(&full_topic).wait()?;
73144

74145
log::info!(
75146
"Inference node subscribed to '{}', waiting for SHM images...",
76-
topic
147+
full_topic
77148
);
78149

79150
let mut frame_count = 0u64;

network-monitor/main.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@
1212
from datetime import datetime, timezone
1313
from pathlib import Path
1414

15+
import re
16+
1517
import requests
1618
import yaml
1719
import zenoh
1820

21+
# Topic name validation pattern
22+
TOPIC_RE = re.compile(r"^[a-zA-Z0-9/_\-\.]+$")
23+
1924
# Configure logging
2025
logging.basicConfig(
2126
level=logging.INFO,
@@ -53,6 +58,20 @@ def __init__(self, config_path: Path, endpoint: str | None = None):
5358
"checks": [],
5459
}
5560

61+
# Validate config
62+
topic = self.config.get("publish_topic", "")
63+
if not TOPIC_RE.match(topic):
64+
raise ValueError(
65+
f"publish_topic '{topic}' contains invalid characters "
66+
f"(must match [a-zA-Z0-9/_\\-\\.]+)"
67+
)
68+
rate_hz = self.config.get("rate_hz", 0.1)
69+
if not (0.01 <= rate_hz <= 1000.0):
70+
raise ValueError(f"rate_hz {rate_hz} out of range (0.01-1000.0)")
71+
timeout_secs = self.config.get("timeout_secs", 5)
72+
if not (1 <= timeout_secs <= 300):
73+
raise ValueError(f"timeout_secs {timeout_secs} out of range (1-300)")
74+
5675
# Resolve scope and machine_id from env vars
5776
import os
5877
self.scope = os.environ.get("BUBBALOOP_SCOPE", "local")
@@ -277,8 +296,8 @@ def main():
277296
"-e",
278297
"--endpoint",
279298
type=str,
280-
default=None,
281-
help="Zenoh endpoint to connect to",
299+
default="tcp/127.0.0.1:7447",
300+
help="Zenoh endpoint to connect to (default: tcp/127.0.0.1:7447)",
282301
)
283302
args = parser.parse_args()
284303

network-monitor/pixi.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ channels = ["conda-forge"]
44
platforms = ["linux-64", "linux-aarch64"]
55

66
[tasks]
7+
build = "python build_proto.py"
78
build-proto = "python build_proto.py"
89
run = "python main.py -c config.yaml"
910
dev = "python main.py -c config.yaml"

openmeteo/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde_json = "1.0"
2222
serde_yaml = "0.9"
2323
thiserror = "2.0"
2424
hostname = "0.4"
25+
regex-lite = "0.1"
2526
bubbaloop-schemas = { git = "https://github.com/kornia/bubbaloop.git", branch = "main", features = ["ros-z"] }
2627

2728
[[bin]]

openmeteo/src/api.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ pub struct OpenMeteoClient {
120120
impl OpenMeteoClient {
121121
pub fn new() -> Self {
122122
Self {
123-
client: reqwest::Client::new(),
123+
client: reqwest::Client::builder()
124+
.timeout(std::time::Duration::from_secs(30))
125+
.build()
126+
.expect("Failed to build HTTP client"),
124127
}
125128
}
126129

openmeteo/src/bin/openmeteo_node.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ struct Args {
1313

1414
/// zenoh router endpoint to connect to
1515
/// Default: tcp/127.0.0.1:7447 (local zenohd router)
16-
#[argh(option, short = 'z', default = "String::from(\"tcp/127.0.0.1:7447\")")]
17-
zenoh_endpoint: String,
16+
#[argh(option, short = 'e', default = "String::from(\"tcp/127.0.0.1:7447\")")]
17+
endpoint: String,
1818
}
1919

2020
#[tokio::main]
@@ -82,7 +82,7 @@ async fn main() -> ZResult<()> {
8282
log::info!("Scope: {}, Machine ID: {}", scope, machine_id);
8383

8484
// Initialize ROS-Z context
85-
let endpoint = std::env::var("ZENOH_ENDPOINT").unwrap_or(args.zenoh_endpoint);
85+
let endpoint = std::env::var("ZENOH_ENDPOINT").unwrap_or(args.endpoint);
8686
log::info!("Connecting to Zenoh at: {}", endpoint);
8787
let ctx = Arc::new(
8888
ZContextBuilder::default()

openmeteo/src/config.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,43 @@ pub struct Config {
9191
pub fetch: FetchConfig,
9292
}
9393

94+
impl FetchConfig {
95+
/// Validate fetch config bounds
96+
pub fn validate(&self) -> Result<(), ConfigError> {
97+
if self.current_interval_secs == 0 || self.current_interval_secs > 86400 {
98+
return Err(ConfigError::ValidationError(format!(
99+
"current_interval_secs {} out of range (1-86400)",
100+
self.current_interval_secs
101+
)));
102+
}
103+
if self.hourly_interval_secs == 0 || self.hourly_interval_secs > 86400 {
104+
return Err(ConfigError::ValidationError(format!(
105+
"hourly_interval_secs {} out of range (1-86400)",
106+
self.hourly_interval_secs
107+
)));
108+
}
109+
if self.daily_interval_secs == 0 || self.daily_interval_secs > 86400 {
110+
return Err(ConfigError::ValidationError(format!(
111+
"daily_interval_secs {} out of range (1-86400)",
112+
self.daily_interval_secs
113+
)));
114+
}
115+
if self.hourly_forecast_hours == 0 || self.hourly_forecast_hours > 384 {
116+
return Err(ConfigError::ValidationError(format!(
117+
"hourly_forecast_hours {} out of range (1-384)",
118+
self.hourly_forecast_hours
119+
)));
120+
}
121+
if self.daily_forecast_days == 0 || self.daily_forecast_days > 16 {
122+
return Err(ConfigError::ValidationError(format!(
123+
"daily_forecast_days {} out of range (1-16)",
124+
self.daily_forecast_days
125+
)));
126+
}
127+
Ok(())
128+
}
129+
}
130+
94131
impl Config {
95132
/// Load configuration from a YAML file
96133
pub fn from_file(path: impl AsRef<Path>) -> Result<Self, ConfigError> {
@@ -101,7 +138,10 @@ impl Config {
101138

102139
/// Parse configuration from a YAML string
103140
pub fn parse(yaml: &str) -> Result<Self, ConfigError> {
104-
serde_yaml::from_str(yaml).map_err(|e| ConfigError::ParseError(e.to_string()))
141+
let config: Config =
142+
serde_yaml::from_str(yaml).map_err(|e| ConfigError::ParseError(e.to_string()))?;
143+
config.fetch.validate()?;
144+
Ok(config)
105145
}
106146
}
107147

@@ -112,4 +152,6 @@ pub enum ConfigError {
112152
IoError(String),
113153
#[error("Parse error: {0}")]
114154
ParseError(String),
155+
#[error("Validation error: {0}")]
156+
ValidationError(String),
115157
}

0 commit comments

Comments
 (0)