Skip to content

Commit 0b78c8d

Browse files
authored
Improve robustness of rrd loader (#11302)
1 parent b1d72ba commit 0b78c8d

File tree

5 files changed

+86
-29
lines changed

5 files changed

+86
-29
lines changed

.vscode/launch.json

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
}
9494
},
9595
{
96-
"name": "Debug 'rerun' colmap.rrd from url",
96+
"name": "Debug 'rerun' bar_chart.rrd from url",
9797
"type": "lldb",
9898
"request": "launch",
9999
"cargo": {
@@ -110,9 +110,12 @@
110110
}
111111
},
112112
"args": [
113-
"https://demo.rerun.io/commit/0f89b62/examples/colmap/data.rrd"
113+
"https://app.rerun.io/version/0.25.0/examples/snippets/bar_chart.rrd"
114114
],
115-
"cwd": "${workspaceFolder}"
115+
"cwd": "${workspaceFolder}",
116+
"env": {
117+
"RUST_LOG": "trace"
118+
}
116119
},
117120
{
118121
"name": "Debug 'minimal' example",

crates/store/re_log_encoding/src/codec/file/decoder.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ pub(crate) fn decode_to_app(
1919
app_id_injector: &mut impl ApplicationIdInjector,
2020
data: &mut impl std::io::Read,
2121
) -> Result<(u64, Option<LogMsg>), DecodeError> {
22-
let mut read_bytes = 0u64;
2322
let header = MessageHeader::decode(data)?;
24-
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;
23+
let read_bytes = std::mem::size_of::<MessageHeader>() as u64 + header.len;
2524

2625
let mut buf = vec![0; header.len as usize];
2726
data.read_exact(&mut buf[..])?;
@@ -39,9 +38,8 @@ pub(crate) fn decode_to_app(
3938
pub(crate) fn decode_to_transport(
4039
data: &mut impl std::io::Read,
4140
) -> Result<(u64, Option<re_protos::log_msg::v1alpha1::log_msg::Msg>), DecodeError> {
42-
let mut read_bytes = 0u64;
4341
let header = MessageHeader::decode(data)?;
44-
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;
42+
let read_bytes = std::mem::size_of::<MessageHeader>() as u64 + header.len;
4543

4644
let mut buf = vec![0; header.len as usize];
4745
data.read_exact(&mut buf[..])?;

crates/store/re_log_encoding/src/decoder/mod.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,7 @@ impl<R: std::io::Read> Decoder<R> {
257257

258258
let (version, options) = options_from_bytes(&data)?;
259259

260-
Ok(Self {
261-
version,
262-
options,
263-
read: Reader::Raw(read),
264-
size_bytes: FileHeader::SIZE as _,
265-
app_id_cache: CachingApplicationIdInjector::default(),
266-
})
260+
Ok(Self::new_with_options(options, version, read))
267261
}
268262

269263
pub fn new_with_options(options: EncodingOptions, version: CrateVersion, read: R) -> Self {
@@ -394,20 +388,20 @@ impl<R: std::io::Read> Decoder<R> {
394388
},
395389
};
396390

397-
let Some(msg) = msg else {
391+
if let Some(msg) = msg {
392+
Some(Ok(msg))
393+
} else {
398394
// we might have a concatenated stream, so we peek beyond end of file marker to see
399395
if self.peek_file_header() {
400396
re_log::debug!(
401397
"Reached end of stream, but it seems we have a concatenated file, continuing"
402398
);
403-
return self.next_impl(decoder);
399+
self.next_impl(decoder)
400+
} else {
401+
re_log::trace!("Reached end of stream, iterator complete");
402+
None
404403
}
405-
406-
re_log::trace!("Reached end of stream, iterator complete");
407-
return None;
408-
};
409-
410-
Some(Ok(msg))
404+
}
411405
}
412406

413407
/// Peeks ahead in search of additional `FileHeader`s in the stream.
@@ -424,11 +418,14 @@ impl<R: std::io::Read> Decoder<R> {
424418
}
425419

426420
let mut read = std::io::Cursor::new(read.buffer());
427-
if FileHeader::decode(&mut read).is_err() {
428-
return false;
421+
match FileHeader::decode(&mut read) {
422+
Ok(_) => true,
423+
Err(DecodeError::Read { .. }) => false,
424+
Err(err) => {
425+
re_log::warn_once!("Error when expecting rrd header: {err}");
426+
false
427+
}
429428
}
430-
431-
true
432429
}
433430
}
434431
}

crates/store/re_log_encoding/src/decoder/stream.rs

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ enum State {
6464
/// Compression is only applied to individual `ArrowMsg`s, instead of
6565
/// the entire stream.
6666
Message(crate::codec::file::MessageHeader),
67+
68+
/// Stop reading
69+
Aborted,
6770
}
6871

6972
impl StreamDecoder {
@@ -109,9 +112,31 @@ impl StreamDecoder {
109112
fn try_read_impl(&mut self) -> Result<Option<LogMsg>, DecodeError> {
110113
match self.state {
111114
State::StreamHeader => {
115+
let is_first_header = self.chunks.num_read() == 0;
112116
if let Some(header) = self.chunks.try_read(FileHeader::SIZE) {
117+
re_log::trace!(?header, "Decoding StreamHeader");
118+
113119
// header contains version and compression options
114-
let (version, options) = options_from_bytes(header)?;
120+
let (version, options) = match options_from_bytes(header) {
121+
Ok(ok) => ok,
122+
Err(err) => {
123+
// We expected a header, but didn't find one!
124+
if is_first_header {
125+
return Err(err);
126+
} else {
127+
re_log::error!("Trailing bytes in rrd stream: {header:?} ({err})");
128+
self.state = State::Aborted;
129+
return Ok(None);
130+
}
131+
}
132+
};
133+
134+
re_log::trace!(
135+
version = version.to_string(),
136+
?options,
137+
"Found Stream Header"
138+
);
139+
115140
self.version = Some(version);
116141
self.options = options;
117142

@@ -131,6 +156,9 @@ impl StreamDecoder {
131156
.try_read(crate::codec::file::MessageHeader::SIZE_BYTES)
132157
{
133158
let header = crate::codec::file::MessageHeader::from_bytes(bytes)?;
159+
160+
re_log::trace!(?header, "MessageHeader");
161+
134162
self.state = State::Message(header);
135163
// we might have data left in the current chunk,
136164
// immediately try to read the message content
@@ -139,23 +167,42 @@ impl StreamDecoder {
139167
}
140168
State::Message(header) => {
141169
if let Some(bytes) = self.chunks.try_read(header.len as usize) {
170+
re_log::trace!(?header, "Read message");
171+
142172
let message = crate::codec::file::decoder::decode_bytes_to_app(
143173
&mut self.app_id_cache,
144174
header.kind,
145175
bytes,
146176
)?;
177+
147178
if let Some(mut message) = message {
179+
re_log::trace!(
180+
"LogMsg::{}",
181+
match message {
182+
LogMsg::SetStoreInfo { .. } => "SetStoreInfo",
183+
LogMsg::ArrowMsg { .. } => "ArrowMsg",
184+
LogMsg::BlueprintActivationCommand { .. } => {
185+
"BlueprintActivationCommand"
186+
}
187+
}
188+
);
189+
148190
propagate_version(&mut message, self.version);
149191
self.state = State::MessageHeader;
150192
return Ok(Some(message));
151193
} else {
194+
re_log::trace!("End of stream - expecting a new Streamheader");
195+
152196
// `None` means end of stream, but there might be concatenated streams,
153197
// so try to read another one.
154198
self.state = State::StreamHeader;
155199
return self.try_read();
156200
}
157201
}
158202
}
203+
State::Aborted => {
204+
return Ok(None);
205+
}
159206
}
160207

161208
Ok(None)
@@ -182,6 +229,9 @@ struct ChunkBuffer {
182229

183230
/// How many bytes of valid data are currently in `self.buffer`.
184231
buffer_fill: usize,
232+
233+
/// How many bytes have been read with [`Self::try_read`] so far?
234+
num_read: usize,
185235
}
186236

187237
impl ChunkBuffer {
@@ -190,6 +240,7 @@ impl ChunkBuffer {
190240
queue: VecDeque::with_capacity(16),
191241
buffer: Vec::with_capacity(1024),
192242
buffer_fill: 0,
243+
num_read: 0,
193244
}
194245
}
195246

@@ -200,6 +251,11 @@ impl ChunkBuffer {
200251
self.queue.push_back(Chunk::new(chunk));
201252
}
202253

254+
/// How many bytes have been read with [`Self::try_read`] so far?
255+
fn num_read(&self) -> usize {
256+
self.num_read
257+
}
258+
203259
/// Attempt to read exactly `n` bytes out of the queued chunks.
204260
///
205261
/// Returns `None` if there is not enough data to return a slice of `n` bytes.
@@ -239,7 +295,7 @@ impl ChunkBuffer {
239295
// followed by another call to `try_read(N)` with the same `N`
240296
// won't erroneously return the same bytes
241297
self.buffer_fill = 0;
242-
298+
self.num_read += n;
243299
Some(&self.buffer[..])
244300
} else {
245301
None

crates/store/re_log_encoding/src/stream_rrd_from_http.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,14 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
7575
move |part| match part {
7676
Ok(part) => match part {
7777
ehttp::streaming::Part::Response(ehttp::PartialResponse {
78+
url,
7879
ok,
7980
status,
8081
status_text,
81-
..
82+
headers,
8283
}) => {
84+
re_log::trace!("{url} status: {status} - {status_text}");
85+
re_log::trace!("{url} headers: {headers:#?}");
8386
if ok {
8487
re_log::debug!("Decoding .rrd file from {url:?}…");
8588
ControlFlow::Continue(())

0 commit comments

Comments
 (0)