Skip to content

Commit c5c9edb

Browse files
committed
Refactor reader into multiple smaller files
1 parent 7210c66 commit c5c9edb

File tree

7 files changed

+1010
-983
lines changed

7 files changed

+1010
-983
lines changed

src/api.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ impl IResource for MCAPWriteOptions {
4343
impl MCAPWriteOptions {
4444
#[constant]
4545
/// No compression.
46-
const MCAP_COMPRESSION_NONE: i32 = MCAPCompression::None as i32;
46+
const MCAP_COMPRESSION_NONE: i64 = MCAPCompression::None as i64;
4747
#[cfg(feature = "zstd")]
4848
#[constant]
4949
/// Zstandard compression.
50-
const MCAP_COMPRESSION_ZSTD: i32 = MCAPCompression::Zstd as i32;
50+
const MCAP_COMPRESSION_ZSTD: i64 = MCAPCompression::Zstd as i64;
5151
#[cfg(feature = "lz4")]
5252
#[constant]
5353
/// LZ4 frame compression.
54-
const MCAP_COMPRESSION_LZ4: i32 = MCAPCompression::Lz4 as i32;
54+
const MCAP_COMPRESSION_LZ4: i64 = MCAPCompression::Lz4 as i64;
5555
}
5656

5757
#[godot_api]

src/reader/buf.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use godot::prelude::*;
2+
use std::sync::Arc;
3+
4+
// Prefer on-demand access via memory mapping to avoid copying the whole file.
5+
// Fall back to an in-memory PackedByteArray when constructed from bytes or if mmap fails.
6+
pub(super) enum BufBackend {
7+
Memory(PackedByteArray),
8+
Mmap(memmap2::Mmap),
9+
}
10+
11+
impl BufBackend {
12+
#[inline]
13+
pub fn as_slice(&self) -> &[u8] {
14+
match self {
15+
BufBackend::Memory(p) => p.as_slice(),
16+
BufBackend::Mmap(m) => &m[..],
17+
}
18+
}
19+
}
20+
21+
pub type SharedBuf = Arc<BufBackend>;

src/reader/filter.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::types::*;
2+
use godot::prelude::*;
3+
use mcap::read::Summary;
4+
use std::collections::HashSet;
5+
6+
// Reusable message filter for time range and channel sets
7+
pub(super) struct MsgFilter {
8+
pub time_start: Option<u64>,
9+
pub time_end: Option<u64>,
10+
pub channels: Option<HashSet<u16>>, // if None: accept all
11+
}
12+
13+
impl MsgFilter {
14+
#[inline]
15+
pub fn matches_time(&self, t: u64) -> bool {
16+
if let Some(s) = self.time_start {
17+
if t < s {
18+
return false;
19+
}
20+
}
21+
if let Some(e) = self.time_end {
22+
if t > e {
23+
return false;
24+
}
25+
}
26+
true
27+
}
28+
29+
#[inline]
30+
pub fn matches_ch(&self, id: u16) -> bool {
31+
match &self.channels {
32+
Some(set) => set.contains(&id),
33+
None => true,
34+
}
35+
}
36+
37+
#[inline]
38+
pub fn chunk_might_match(&self, idx: &mcap::records::ChunkIndex) -> bool {
39+
if let Some(s) = self.time_start {
40+
if idx.message_end_time < s {
41+
return false;
42+
}
43+
}
44+
if let Some(e) = self.time_end {
45+
if idx.message_start_time > e {
46+
return false;
47+
}
48+
}
49+
true
50+
}
51+
}
52+
53+
// Shared helper: stream a chunk, apply filter, build MCAPMessage, and call a closure with (log_time, message)
54+
pub(super) fn stream_chunk_apply<F>(
55+
bytes: &[u8],
56+
summary: &Summary,
57+
chunk_idx: &mcap::records::ChunkIndex,
58+
filter: &MsgFilter,
59+
mut f: F,
60+
) -> Result<(), String>
61+
where
62+
F: FnMut(u64, Gd<MCAPMessage>),
63+
{
64+
let iter = summary
65+
.stream_chunk(bytes, chunk_idx)
66+
.map_err(|e| format!("stream_chunk open failed: {}", e))?;
67+
for item in iter {
68+
match item {
69+
Ok(msg) => {
70+
if !filter.matches_time(msg.log_time) {
71+
continue;
72+
}
73+
if !filter.matches_ch(msg.channel.id) {
74+
continue;
75+
}
76+
let gd = MCAPMessage::from_mcap(&msg);
77+
f(msg.log_time, gd);
78+
}
79+
Err(e) => return Err(format!("stream_chunk failed: {}", e)),
80+
}
81+
}
82+
Ok(())
83+
}

0 commit comments

Comments
 (0)