@@ -5,6 +5,7 @@ use crossbeam::channel;
55use itertools:: Itertools as _;
66
77use re_chunk:: external:: crossbeam;
8+ use re_log_encoding:: RrdManifest ;
89
910// ---
1011
@@ -30,17 +31,19 @@ impl std::fmt::Display for InputSource {
3031/// * The first channel contains both the successfully decoded data, if any, as well as any
3132/// errors faced during processing.
3233/// * The second channel, which will fire only once, after all processing is done, indicates the
33- /// total number of bytes processed.
34+ /// total number of bytes processed, and returns all RRD manifests that were parsed from footers
35+ /// in the underlying stream.
3436///
3537/// This function is best-effort: it will try to make progress even in the face of errors.
3638/// It is up to the user to decide whether and when to stop.
3739///
3840/// This function is capable of decoding multiple independent recordings from a single stream.
41+ #[ expect( clippy:: type_complexity) ] // internal private API for the CLI impl
3942pub fn read_rrd_streams_from_file_or_stdin (
4043 paths : & [ String ] ,
4144) -> (
4245 channel:: Receiver < ( InputSource , anyhow:: Result < re_log_types:: LogMsg > ) > ,
43- channel:: Receiver < u64 > ,
46+ channel:: Receiver < ( u64 , anyhow :: Result < Vec < ( InputSource , RrdManifest ) > > ) > ,
4447) {
4548 read_any_rrd_streams_from_file_or_stdin :: < re_log_types:: LogMsg > ( paths)
4649}
@@ -55,7 +58,8 @@ pub fn read_rrd_streams_from_file_or_stdin(
5558/// * The first channel contains both the successfully decoded data, if any, as well as any
5659/// errors faced during processing.
5760/// * The second channel, which will fire only once, after all processing is done, indicates the
58- /// total number of bytes processed.
61+ /// total number of bytes processed, and returns all RRD manifests that were parsed from footers
62+ /// in the underlying stream.
5963///
6064/// This function is best-effort: it will try to make progress even in the face of errors.
6165/// It is up to the user to decide whether and when to stop.
@@ -68,25 +72,27 @@ pub fn read_rrd_streams_from_file_or_stdin(
6872// TODO(ab): For pre-0.25 legacy data with `StoreId` missing their application id, the migration
6973// in `Decoder` requires `SetStoreInfo` to arrive before the corresponding `ArrowMsg`. Ideally
7074// this tool would cache orphan `ArrowMsg` until a matching `SetStoreInfo` arrives.
75+ #[ expect( clippy:: type_complexity) ] // internal private API for the CLI impl
7176pub fn read_raw_rrd_streams_from_file_or_stdin (
7277 paths : & [ String ] ,
7378) -> (
7479 channel:: Receiver < (
7580 InputSource ,
7681 anyhow:: Result < re_protos:: log_msg:: v1alpha1:: log_msg:: Msg > ,
7782 ) > ,
78- channel:: Receiver < u64 > ,
83+ channel:: Receiver < ( u64 , anyhow :: Result < Vec < ( InputSource , RrdManifest ) > > ) > ,
7984) {
8085 read_any_rrd_streams_from_file_or_stdin :: < re_protos:: log_msg:: v1alpha1:: log_msg:: Msg > ( paths)
8186}
8287
88+ #[ expect( clippy:: type_complexity) ] // internal private API for the CLI impl
8389fn read_any_rrd_streams_from_file_or_stdin <
8490 T : re_log_encoding:: DecoderEntrypoint + Send + ' static ,
8591> (
8692 paths : & [ String ] ,
8793) -> (
8894 channel:: Receiver < ( InputSource , anyhow:: Result < T > ) > ,
89- channel:: Receiver < u64 > ,
95+ channel:: Receiver < ( u64 , anyhow :: Result < Vec < ( InputSource , RrdManifest ) > > ) > ,
9096) {
9197 let path_to_input_rrds = paths
9298 . iter ( )
@@ -95,26 +101,32 @@ fn read_any_rrd_streams_from_file_or_stdin<
95101 . collect_vec ( ) ;
96102
97103 // TODO(cmc): might want to make this configurable at some point.
98- let ( tx , rx ) = crossbeam:: channel:: bounded ( 100 ) ;
99- let ( tx_size_bytes , rx_size_bytes ) = crossbeam:: channel:: bounded ( 1 ) ;
104+ let ( tx_msgs , rx_msgs ) = crossbeam:: channel:: bounded ( 100 ) ;
105+ let ( tx_metadata , rx_metadata ) = crossbeam:: channel:: bounded ( 1 ) ;
100106
101107 _ = std:: thread:: Builder :: new ( )
102108 . name ( "rerun-rrd-in" . to_owned ( ) )
103109 . spawn ( move || {
110+ let mut rrd_manifests = Ok ( Vec :: new ( ) ) ;
104111 let mut size_bytes = 0 ;
105112
106113 if path_to_input_rrds. is_empty ( ) {
107114 // stdin
108115
116+ let source = InputSource :: Stdin ;
109117 let stdin = std:: io:: BufReader :: new ( std:: io:: stdin ( ) . lock ( ) ) ;
110118 let mut decoder = re_log_encoding:: Decoder :: decode_lazy ( stdin) ;
111119
112120 for res in & mut decoder {
113121 let res = res. context ( "couldn't decode message from stdin -- skipping" ) ;
114- tx . send ( ( InputSource :: Stdin , res) ) . ok ( ) ;
122+ tx_msgs . send ( ( source . clone ( ) , res) ) . ok ( ) ;
115123 }
116124
117125 size_bytes += decoder. num_bytes_processed ( ) ;
126+ rrd_manifests = decoder
127+ . rrd_manifests ( )
128+ . context ( "couldn't decode footers" )
129+ . map ( |manifests| manifests. into_iter ( ) . map ( |m| ( source. clone ( ) , m) ) . collect ( ) ) ;
118130 } else {
119131 // file(s)
120132
@@ -124,28 +136,35 @@ fn read_any_rrd_streams_from_file_or_stdin<
124136 {
125137 Ok ( file) => file,
126138 Err ( err) => {
127- tx. send ( ( InputSource :: File ( rrd_path. clone ( ) ) , Err ( err) ) )
139+ tx_msgs
140+ . send ( ( InputSource :: File ( rrd_path. clone ( ) ) , Err ( err) ) )
128141 . ok ( ) ;
129142 continue ;
130143 }
131144 } ;
132145
146+ let source = InputSource :: File ( rrd_path. clone ( ) ) ;
133147 let rrd_file = std:: io:: BufReader :: new ( rrd_file) ;
134- let mut messages = re_log_encoding:: Decoder :: decode_lazy ( rrd_file) ;
135-
136- for res in & mut messages {
148+ let mut decoder = re_log_encoding:: Decoder :: decode_lazy ( rrd_file) ;
149+ for res in & mut decoder {
137150 let res = res. context ( "decode rrd message" ) . with_context ( || {
138151 format ! ( "couldn't decode message {rrd_path:?} -- skipping" )
139152 } ) ;
140- tx . send ( ( InputSource :: File ( rrd_path . clone ( ) ) , res) ) . ok ( ) ;
153+ tx_msgs . send ( ( source . clone ( ) , res) ) . ok ( ) ;
141154 }
142155
143- size_bytes += messages. num_bytes_processed ( ) ;
156+ size_bytes += decoder. num_bytes_processed ( ) ;
157+ rrd_manifests = decoder
158+ . rrd_manifests ( )
159+ . context ( "couldn't decode footers" )
160+ . map ( |manifests| {
161+ manifests. into_iter ( ) . map ( |m| ( source. clone ( ) , m) ) . collect ( )
162+ } ) ;
144163 }
145164 }
146165
147- tx_size_bytes . send ( size_bytes) . ok ( ) ;
166+ tx_metadata . send ( ( size_bytes, rrd_manifests ) ) . ok ( ) ;
148167 } ) ;
149168
150- ( rx , rx_size_bytes )
169+ ( rx_msgs , rx_metadata )
151170}
0 commit comments