Skip to content

Commit 72fa51f

Browse files
committed
--wip--
1 parent c518ef9 commit 72fa51f

File tree

8 files changed

+106
-28
lines changed

8 files changed

+106
-28
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ watch = "0.2.3"
265265
webots = { version = "0.8.0" }
266266
wgpu = "24.0.5"
267267
xdg = "2.5.2"
268+
yuv = "0.8.9"
268269
zbus = "5.5.0"
269270
zed = { path = "crates/zed" }
270271
zenoh = "1.7.2"

crates/hulkz/src/buffer.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@ use std::{
66

77
use serde::Deserialize;
88

9-
use crate::{Cache, Timestamped, TopicStream};
9+
use crate::{stream::StreamError, Cache, Timestamped, TopicStream};
10+
11+
#[derive(Debug, thiserror::Error)]
12+
pub enum BufferError {
13+
#[error("Stream error: {0}")]
14+
StreamError(StreamError),
15+
}
16+
17+
pub type Result<T, E = BufferError> = std::result::Result<T, E>;
1018

1119
#[derive(Clone)]
1220
pub struct TopicBuffer<T> {
@@ -17,15 +25,25 @@ impl<T> TopicBuffer<T>
1725
where
1826
for<'de> T: Deserialize<'de> + Timestamped + Clone + Send + 'static,
1927
{
20-
pub fn new(mut stream: TopicStream<T>, capacity: usize) -> (Self, impl Future<Output = ()>) {
28+
pub fn new(
29+
mut stream: TopicStream<T>,
30+
capacity: usize,
31+
) -> (Self, impl Future<Output = Result<()>>) {
2132
let cache = Arc::new(RwLock::new(Cache::new(capacity)));
2233

2334
let driver = {
2435
let cache = cache.clone();
2536
async move {
26-
while let Ok(msg) = stream.recv_async().await {
27-
let mut cache = cache.write().expect("lock is poisoned");
28-
cache.add(msg);
37+
loop {
38+
match stream.recv_async().await {
39+
Ok(msg) => {
40+
let mut cache = cache.write().expect("lock is poisoned");
41+
cache.add(msg);
42+
}
43+
Err(err) => {
44+
return Err(BufferError::StreamError(err));
45+
}
46+
}
2947
}
3048
}
3149
};

crates/hulkz/src/publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub enum PublisherError {
1111
Zenoh(#[from] zenoh::Error),
1212
}
1313

14-
pub type Result<T> = std::result::Result<T, PublisherError>;
14+
pub type Result<T, E = PublisherError> = std::result::Result<T, E>;
1515

1616
pub struct Publisher<'a, T>
1717
where

crates/hulkz/src/session.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ use std::{future::Future, sync::Arc};
33
use serde::{Deserialize, Serialize};
44
use zenoh::handlers::RingChannel;
55

6-
use crate::{Publisher, Timestamped, TopicBuffer, TopicStream};
6+
use crate::{buffer::BufferError, Publisher, Timestamped, TopicBuffer, TopicStream};
77

88
#[derive(Debug, thiserror::Error)]
99
pub enum SessionError {
1010
#[error("Zenoh session error: {0}")]
1111
Zenoh(#[from] zenoh::Error),
1212
}
1313

14-
pub type Result<T> = std::result::Result<T, SessionError>;
14+
pub type Result<T, E = SessionError> = std::result::Result<T, E>;
1515

1616
pub struct Session {
1717
session: Arc<zenoh::Session>,
@@ -54,7 +54,10 @@ impl Session {
5454
&self,
5555
key_exp: &str,
5656
capacity: usize,
57-
) -> Result<(TopicBuffer<T>, impl Future<Output = ()>)>
57+
) -> Result<(
58+
TopicBuffer<T>,
59+
impl Future<Output = Result<(), BufferError>>,
60+
)>
5861
where
5962
for<'de> T: Deserialize<'de> + Timestamped + Clone + Send + 'static,
6063
{

crates/hulkz/src/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub enum StreamError {
1010
Zenoh(#[from] zenoh::Error),
1111
}
1212

13-
pub type Result<T> = std::result::Result<T, StreamError>;
13+
pub type Result<T, E = StreamError> = std::result::Result<T, E>;
1414

1515
pub struct TopicStream<T> {
1616
inner: ZenohSubscriber<RingChannelHandler<Sample>>,

crates/simple-viewer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ homepage.workspace = true
99
booster = { workspace = true }
1010
color-eyre = { workspace = true }
1111
hulkz = { workspace = true }
12+
image = { workspace = true }
1213
rerun = { workspace = true }
1314
ros2 = { workspace = true }
1415
tokio = { workspace = true }
1516
tracing-subscriber = { workspace = true }
17+
yuv = { workspace = true }

crates/simple-viewer/src/main.rs

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use booster::LowState;
2-
use color_eyre::Result;
1+
use color_eyre::{eyre::bail, Result};
32
use hulkz::Session;
3+
use image::{error::DecodingError, ImageError, RgbImage};
4+
use ros2::sensor_msgs::image::Image;
5+
use yuv::{yuv_nv12_to_rgb, YuvBiPlanarImage, YuvConversionMode, YuvRange, YuvStandardMatrix};
46

57
#[tokio::main]
68
async fn main() -> Result<()> {
@@ -15,24 +17,65 @@ async fn main() -> Result<()> {
1517

1618
let session = Session::new().await?;
1719

18-
let mut low_state = session
19-
.stream::<LowState>("HULK10/booster/low_state")
20-
.await?;
20+
let mut stream = session.stream::<Image>("booster/rectified_image").await?;
21+
22+
while let Ok(image) = stream.recv_async().await {
23+
let y_plane_size = (image.step * image.height) as usize;
24+
// UV plane is half height, but same stride as Y in NV12 (usually)
25+
let uv_plane_size = (image.step * image.height / 2) as usize;
26+
27+
if image.data.len() < y_plane_size + uv_plane_size {
28+
bail!("NV12: Source buffer is too small for the given dimensions");
29+
}
30+
31+
// 2. Prepare Output Buffer
32+
// RgbImage is a flattened Vec<u8> (R, G, B, R, G, B...)
33+
let mut rgb_image = RgbImage::new(image.width, image.height);
34+
35+
// 3. Define Strides
36+
// ROS 'step' is the stride for the Y plane.
37+
let y_stride = image.step;
38+
// NV12 UV plane usually has the same stride as Y
39+
let uv_stride = image.step;
40+
// RGB output stride (3 bytes per pixel * width)
41+
let rgb_stride = image.width * 3;
42+
43+
// 4. Split Input Data into Planes
44+
let (y_plane, remaining) = image.data.split_at(y_plane_size);
45+
let uv_plane = &remaining[..uv_plane_size];
46+
47+
let yuv_bi_planar_image = YuvBiPlanarImage {
48+
y_plane,
49+
y_stride,
50+
uv_plane,
51+
uv_stride,
52+
width: image.width,
53+
height: image.height,
54+
};
55+
56+
yuv_nv12_to_rgb(
57+
&yuv_bi_planar_image,
58+
rgb_image.as_flat_samples_mut().as_mut_slice(),
59+
rgb_stride,
60+
YuvRange::Limited, // Standard for video (16-235). Use 'Full' for JPEGs.
61+
YuvStandardMatrix::Bt709, // Standard for HD Video. Use Bt601 for SD/Webcams.
62+
YuvConversionMode::Balanced,
63+
)
64+
.map_err(|e| {
65+
ImageError::Decoding(DecodingError::from_format_hint(
66+
image::error::ImageFormatHint::Name(format!("NV12: {e}")),
67+
))
68+
})?;
2169

22-
while let Ok(low_state) = low_state.recv_async().await {
23-
let accelerometer = low_state.imu_state.linear_acceleration;
24-
rec.log(
25-
"booster/imu/accelerometer",
26-
&rerun::archetypes::Scalars::new([
27-
accelerometer.x(),
28-
accelerometer.y(),
29-
accelerometer.z(),
30-
]),
31-
)?;
32-
let rpy = low_state.imu_state.roll_pitch_yaw;
3370
rec.log(
34-
"booster/imu/roll_pitch_yaw",
35-
&rerun::archetypes::Scalars::new([rpy.x(), rpy.y(), rpy.z()]),
71+
"booster/rectified_image",
72+
&rerun::archetypes::Image::from_image(rgb_image)?,
73+
// &rerun::archetypes::Image::from_color_model_and_bytes(
74+
// rgb_image.as_raw(),
75+
// [image.width, image.height],
76+
// rerun::ColorModel::RGB,
77+
// rerun::ChannelDatatype::U8,
78+
// ),
3679
)?;
3780
}
3881

0 commit comments

Comments
 (0)