Skip to content

Commit 23dad9e

Browse files
committed
Add more utility functions to MessageIterator
1 parent 95b6ffc commit 23dad9e

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

src/reader.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,5 +648,97 @@ impl MCAPMessageIterator {
648648
None => Variant::nil(),
649649
}
650650
}
651+
652+
/// Reset iterator to the start, clearing any peeked value and state.
653+
#[func]
654+
pub fn rewind(&mut self) {
655+
self.reset_iteration_state();
656+
}
657+
658+
/// Remove any channel filter and reset iteration.
659+
#[func]
660+
pub fn clear_filter(&mut self) {
661+
self.filter_channel = None;
662+
self.reset_iteration_state();
663+
}
664+
665+
/// Return the number of messages yielded so far.
666+
#[func]
667+
pub fn current_index(&self) -> i64 {
668+
self.index
669+
}
670+
671+
/// Seek iterator to the first message with log_time >= given timestamp (microseconds).
672+
/// Returns true if positioned on or before a valid next message.
673+
#[func]
674+
pub fn seek_to_time(&mut self, log_time_usec: i64) -> bool {
675+
if !self.ensure_summary() { return false; }
676+
self.reset_iteration_state();
677+
let t: u64 = if log_time_usec < 0 { 0 } else { log_time_usec as u64 };
678+
let summary = match &self.summary { Some(s) => s, None => return false };
679+
while self.chunk_i < summary.chunk_indexes.len() {
680+
let chunk_idx = &summary.chunk_indexes[self.chunk_i];
681+
if chunk_idx.message_end_time < t {
682+
self.chunk_i += 1;
683+
continue;
684+
}
685+
self.per_channel.clear();
686+
self.heap.clear();
687+
match summary.read_message_indexes(self.buf.as_slice(), chunk_idx) {
688+
Ok(map) => {
689+
for (ch, entries) in map.into_iter() {
690+
let ch_id = ch.id;
691+
if let Some(filter) = self.filter_channel { if ch_id != filter { continue; } }
692+
if entries.is_empty() { continue; }
693+
let start_idx = match entries.binary_search_by(|e| e.log_time.cmp(&t)) { Ok(i) => i, Err(i) => i };
694+
if start_idx < entries.len() {
695+
self.per_channel.insert(ch_id, entries);
696+
let first = &self.per_channel.get(&ch_id).unwrap()[start_idx];
697+
self.heap.push(std::cmp::Reverse((first.log_time, ch_id, start_idx)));
698+
}
699+
}
700+
if !self.heap.is_empty() { return true; }
701+
}
702+
Err(e) => {
703+
godot_error!("MCAPMessageIterator: read_message_indexes(seek_to_time) failed: {}", e);
704+
}
705+
}
706+
self.chunk_i += 1;
707+
}
708+
false
709+
}
710+
711+
/// Check if another message is available without consuming it.
712+
#[func]
713+
pub fn has_next_message(&mut self) -> bool {
714+
if self.peek.is_none() {
715+
self.peek = self.next_message_internal();
716+
}
717+
self.peek.is_some()
718+
}
719+
720+
/// Fetch and advance to the next message; returns null if none.
721+
#[func]
722+
pub fn get_next_message(&mut self) -> Option<Gd<MCAPMessage>> {
723+
if self.peek.is_none() {
724+
self.peek = self.next_message_internal();
725+
}
726+
match self.peek.take() {
727+
Some(gd) => {
728+
self.index += 1;
729+
Some(gd)
730+
}
731+
None => None,
732+
}
733+
}
734+
735+
/// Return, without consuming, the next message if available.
736+
#[func]
737+
pub fn peek_message(&mut self) -> Option<Gd<MCAPMessage>> {
738+
if self.peek.is_none() {
739+
self.peek = self.next_message_internal();
740+
}
741+
self.peek.clone()
742+
}
651743
}
652744

0 commit comments

Comments
 (0)