Skip to content

Commit d69cf4b

Browse files
committed
fix: improve type description robustness and fix console issues
- Add sequence length safety check (max 100M elements) to prevent OOM from malformed data - Fix type description dependency resolution to handle proper ordering - Preserve explicit type_info when provided (don't overwrite with schema-derived hash) - Fix blocking operation in DynamicTopicSubscriber (use spawn_blocking) - Set default-run for ros-z-console and remove test binary
1 parent 37fdd12 commit d69cf4b

File tree

5 files changed

+91
-53
lines changed

5 files changed

+91
-53
lines changed

ros-z-cdr/src/primitives.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,21 @@ impl<'a, BO: ByteOrder> CdrReader<'a, BO> {
333333
}
334334

335335
/// Read a sequence length prefix.
336+
///
337+
/// # Safety
338+
/// Includes a sanity check to prevent absurd allocations from malformed data.
339+
/// Maximum sequence length is 100 million elements (~400MB for float32 arrays).
336340
#[inline]
337341
pub fn read_sequence_length(&mut self) -> Result<usize> {
338-
Ok(self.read_u32()? as usize)
342+
const MAX_SEQUENCE_LENGTH: u32 = 100_000_000;
343+
let len = self.read_u32()?;
344+
if len > MAX_SEQUENCE_LENGTH {
345+
return Err(Error::Custom(format!(
346+
"Sequence length {} exceeds maximum allowed ({}). Possible schema mismatch or corrupted data.",
347+
len, MAX_SEQUENCE_LENGTH
348+
)));
349+
}
350+
Ok(len as usize)
339351
}
340352

341353
/// Read raw bytes with length prefix.

ros-z-console/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ version = "0.1.0"
44
edition = "2024"
55
description = "ROS 2 Graph Inspector & Dataflow Monitor TUI"
66
homepage = "https://github.com/ZettaScaleLabs/ros-z"
7+
default-run = "ros-z-console"
78

89
[dependencies]
910
ros-z = { path = "../ros-z" }
@@ -21,7 +22,3 @@ tracing = { workspace = true }
2122
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
2223
zenoh = { workspace = true }
2324
chrono = { version = "0.4", features = ["serde"] }
24-
25-
[[bin]]
26-
name = "simple-test-pub"
27-
path = "simple-test-pub.rs"

ros-z-console/src/core/dynamic_subscriber.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl DynamicTopicSubscriber {
6363
// Spawn background task to forward messages to channel
6464
let subscriber = Arc::new(subscriber);
6565
let sub_clone = subscriber.clone();
66-
tokio::spawn(async move {
66+
tokio::task::spawn_blocking(move || {
6767
loop {
6868
match sub_clone.recv() {
6969
Ok(msg) => {

ros-z/src/dynamic/type_description.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,31 @@ pub fn type_description_msg_to_schema(
247247
}
248248

249249
// Second pass: resolve nested types in referenced schemas
250+
// Process in dependency order: iterate until all are resolved
250251
let mut resolved_map: HashMap<String, Arc<MessageSchema>> = HashMap::new();
251-
for ref_td in &msg.referenced_type_descriptions {
252-
let schema = type_description_to_schema_full(ref_td, &type_map)?;
253-
resolved_map.insert(ref_td.type_name.clone(), Arc::new(schema));
252+
let mut remaining: Vec<&TypeDescription> = msg.referenced_type_descriptions.iter().collect();
253+
254+
while !remaining.is_empty() {
255+
let mut made_progress = false;
256+
remaining.retain(|ref_td| {
257+
// Try to build with current resolved_map
258+
match type_description_to_schema_full(ref_td, &resolved_map) {
259+
Ok(schema) => {
260+
resolved_map.insert(ref_td.type_name.clone(), Arc::new(schema));
261+
made_progress = true;
262+
false // Remove from remaining
263+
}
264+
Err(_) => true // Keep in remaining, dependencies not ready yet
265+
}
266+
});
267+
268+
if !made_progress && !remaining.is_empty() {
269+
// Circular dependency or missing type
270+
return Err(DynamicError::SerializationError(format!(
271+
"Cannot resolve dependencies for types: {}",
272+
remaining.iter().map(|t| t.type_name.as_str()).collect::<Vec<_>>().join(", ")
273+
)));
274+
}
254275
}
255276

256277
// Build the main schema with resolved references

ros-z/src/pubsub.rs

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -141,29 +141,33 @@ impl<T, S> ZPubBuilder<T, S> {
141141
pub fn with_dyn_schema(mut self, schema: Arc<crate::dynamic::schema::MessageSchema>) -> Self {
142142
use crate::dynamic::MessageSchemaTypeDescription;
143143

144-
// Compute TypeInfo from schema for proper key expression matching with ROS 2
145-
// Convert ROS 2 canonical name to DDS name
146-
// "std_msgs/msg/String" → "std_msgs::msg::dds_::String_"
147-
let dds_name = schema.type_name
148-
.replace("/msg/", "::msg::dds_::")
149-
.replace("/srv/", "::srv::dds_::")
150-
.replace("/action/", "::action::dds_::")
151-
+ "_";
152-
153-
// Convert schema TypeHash to entity TypeHash via RIHS string
154-
let type_hash = match schema.compute_type_hash() {
155-
Ok(hash) => {
156-
let rihs_string = hash.to_rihs_string();
157-
crate::entity::TypeHash::from_rihs_string(&rihs_string)
158-
.unwrap_or_else(crate::entity::TypeHash::zero)
159-
}
160-
Err(_) => crate::entity::TypeHash::zero(),
161-
};
144+
// Only compute and set type_info if it hasn't been set already
145+
// (e.g., from create_dyn_sub_auto which provides the publisher's hash)
146+
if self.entity.type_info.is_none() {
147+
// Compute TypeInfo from schema for proper key expression matching with ROS 2
148+
// Convert ROS 2 canonical name to DDS name
149+
// "std_msgs/msg/String" → "std_msgs::msg::dds_::String_"
150+
let dds_name = schema.type_name
151+
.replace("/msg/", "::msg::dds_::")
152+
.replace("/srv/", "::srv::dds_::")
153+
.replace("/action/", "::action::dds_::")
154+
+ "_";
155+
156+
// Convert schema TypeHash to entity TypeHash via RIHS string
157+
let type_hash = match schema.compute_type_hash() {
158+
Ok(hash) => {
159+
let rihs_string = hash.to_rihs_string();
160+
crate::entity::TypeHash::from_rihs_string(&rihs_string)
161+
.unwrap_or_else(crate::entity::TypeHash::zero)
162+
}
163+
Err(_) => crate::entity::TypeHash::zero(),
164+
};
162165

163-
self.entity.type_info = Some(crate::entity::TypeInfo {
164-
name: dds_name,
165-
hash: type_hash,
166-
});
166+
self.entity.type_info = Some(crate::entity::TypeInfo {
167+
name: dds_name,
168+
hash: type_hash,
169+
});
170+
}
167171

168172
self.dyn_schema = Some(schema);
169173
self
@@ -449,29 +453,33 @@ where
449453
pub fn with_dyn_schema(mut self, schema: Arc<crate::dynamic::schema::MessageSchema>) -> Self {
450454
use crate::dynamic::MessageSchemaTypeDescription;
451455

452-
// Compute TypeInfo from schema for proper key expression matching with ROS 2
453-
// Convert ROS 2 canonical name to DDS name
454-
// "std_msgs/msg/String" → "std_msgs::msg::dds_::String_"
455-
let dds_name = schema.type_name
456-
.replace("/msg/", "::msg::dds_::")
457-
.replace("/srv/", "::srv::dds_::")
458-
.replace("/action/", "::action::dds_::")
459-
+ "_";
460-
461-
// Convert schema TypeHash to entity TypeHash via RIHS string
462-
let type_hash = match schema.compute_type_hash() {
463-
Ok(hash) => {
464-
let rihs_string = hash.to_rihs_string();
465-
crate::entity::TypeHash::from_rihs_string(&rihs_string)
466-
.unwrap_or_else(crate::entity::TypeHash::zero)
467-
}
468-
Err(_) => crate::entity::TypeHash::zero(),
469-
};
456+
// Only compute and set type_info if it hasn't been set already
457+
// (e.g., from create_dyn_sub_auto which provides the publisher's hash)
458+
if self.entity.type_info.is_none() {
459+
// Compute TypeInfo from schema for proper key expression matching with ROS 2
460+
// Convert ROS 2 canonical name to DDS name
461+
// "std_msgs/msg/String" → "std_msgs::msg::dds_::String_"
462+
let dds_name = schema.type_name
463+
.replace("/msg/", "::msg::dds_::")
464+
.replace("/srv/", "::srv::dds_::")
465+
.replace("/action/", "::action::dds_::")
466+
+ "_";
467+
468+
// Convert schema TypeHash to entity TypeHash via RIHS string
469+
let type_hash = match schema.compute_type_hash() {
470+
Ok(hash) => {
471+
let rihs_string = hash.to_rihs_string();
472+
crate::entity::TypeHash::from_rihs_string(&rihs_string)
473+
.unwrap_or_else(crate::entity::TypeHash::zero)
474+
}
475+
Err(_) => crate::entity::TypeHash::zero(),
476+
};
470477

471-
self.entity.type_info = Some(crate::entity::TypeInfo {
472-
name: dds_name,
473-
hash: type_hash,
474-
});
478+
self.entity.type_info = Some(crate::entity::TypeInfo {
479+
name: dds_name,
480+
hash: type_hash,
481+
});
482+
}
475483

476484
self.dyn_schema = Some(schema);
477485
self

0 commit comments

Comments
 (0)