Skip to content

Commit 54978bf

Browse files
authored
Encoder: simplified APIs and improved discoverability (#11450)
The Encoder was offloading a lot of complexity onto end-users of the API by forcing them to discover, choose between, and figure out how to use a number of complicated freestanding helpers such as `encode_ref_as_bytes_local`. All of these freestanding functions have now been replaced by a few simpler, more generic, and more discoverable helper methods on `Encoder` itself (`Encoder::local`, `Encoder::encode` and `Encoder::encode_into`).

---

This PR is part of an upcoming series of PRs to pay off organic growth debt in our encoding/decoding stack.

* DNM: requires #11446
1 parent 8f806de commit 54978bf

File tree

22 files changed

+105
-153
lines changed

22 files changed

+105
-153
lines changed

crates/store/re_data_loader/src/loader_rrd.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ impl RetryableFileReader {
329329
#[cfg(test)]
330330
mod tests {
331331
use re_chunk::RowId;
332-
use re_log_encoding::encoder::Encoder;
332+
use re_log_encoding::Encoder;
333333
use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
334334

335335
use super::*;

crates/store/re_entity_db/examples/memory_usage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ fn log_messages() {
7070
fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
7171
let mut bytes = vec![];
7272
let encoding_options = re_log_encoding::EncodingOptions::PROTOBUF_COMPRESSED;
73-
re_log_encoding::encoder::encode_ref(
73+
re_log_encoding::Encoder::encode_into(
7474
re_build_info::CrateVersion::LOCAL,
7575
encoding_options,
7676
std::iter::once(log_msg).map(Ok),

crates/store/re_log_encoding/benches/msg_encode_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fn encode_log_msgs(
3838
encoding_options: re_log_encoding::EncodingOptions,
3939
) -> Vec<u8> {
4040
let mut bytes = vec![];
41-
re_log_encoding::encoder::encode_ref(
41+
re_log_encoding::Encoder::encode_into(
4242
re_build_info::CrateVersion::LOCAL,
4343
encoding_options,
4444
messages.iter().map(Ok),

crates/store/re_log_encoding/src/decoder/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,8 +484,8 @@ mod tests {
484484

485485
use super::*;
486486
use crate::Compression;
487+
use crate::Encoder;
487488
use crate::codec::arrow::encode_arrow;
488-
use crate::encoder::Encoder;
489489

490490
pub fn fake_log_messages() -> Vec<LogMsg> {
491491
let store_id = StoreId::random(StoreKind::Blueprint, "test_app");
@@ -643,7 +643,7 @@ mod tests {
643643

644644
for options in options {
645645
let mut file = vec![];
646-
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut file)
646+
crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut file)
647647
.unwrap();
648648

649649
let decoded_messages = Decoder::new(&mut file.as_slice())
@@ -787,7 +787,7 @@ mod tests {
787787

788788
for options in options {
789789
let mut file = vec![];
790-
crate::encoder::encode_ref(
790+
crate::Encoder::encode_into(
791791
rrd_version,
792792
options,
793793
out_of_order_messages.iter().map(Ok),
@@ -827,7 +827,7 @@ mod tests {
827827
{
828828
let writer = std::io::Cursor::new(&mut data);
829829
let mut encoder1 =
830-
crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
830+
crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
831831
for message in &messages {
832832
encoder1.append(message).unwrap();
833833
}
@@ -840,7 +840,7 @@ mod tests {
840840
let mut writer = std::io::Cursor::new(&mut data);
841841
writer.set_position(written);
842842
let mut encoder2 =
843-
crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
843+
crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
844844
for message in &messages {
845845
encoder2.append(message).unwrap();
846846
}

crates/store/re_log_encoding/src/decoder/stream.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ mod tests {
312312
use re_chunk::RowId;
313313
use re_log_types::{SetStoreInfo, StoreInfo};
314314

315+
use crate::Encoder;
315316
use crate::EncodingOptions;
316-
use crate::encoder::Encoder;
317317

318318
use super::*;
319319

@@ -330,14 +330,16 @@ mod tests {
330330
fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
331331
let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();
332332

333-
let mut encoder = Encoder::new(CrateVersion::LOCAL, options, vec![]).unwrap();
334-
for message in &messages {
335-
encoder.append(message).unwrap();
336-
}
337-
338-
encoder.finish().unwrap();
333+
let mut data = Vec::new();
334+
Encoder::encode_into(
335+
CrateVersion::LOCAL,
336+
options,
337+
messages.clone().into_iter().map(Ok),
338+
&mut data,
339+
)
340+
.unwrap();
339341

340-
(messages, encoder.into_inner().unwrap())
342+
(messages, data)
341343
}
342344

343345
macro_rules! assert_message_ok {

crates/store/re_log_encoding/src/decoder/streaming.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ mod tests {
473473

474474
for options in options {
475475
let mut data = vec![];
476-
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut data)
476+
crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut data)
477477
.unwrap();
478478

479479
// We cut the input file by one byte to simulate a corrupted file and check that we don't end up in an infinite loop
@@ -517,7 +517,7 @@ mod tests {
517517

518518
for options in options {
519519
let mut data = vec![];
520-
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut data)
520+
crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut data)
521521
.unwrap();
522522

523523
let buf_reader = tokio::io::BufReader::new(std::io::Cursor::new(data));
@@ -557,7 +557,7 @@ mod tests {
557557

558558
for options in options {
559559
let mut data = vec![];
560-
crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut data)
560+
crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut data)
561561
.unwrap();
562562

563563
let buf_reader = tokio::io::BufReader::new(std::io::Cursor::new(data.clone()));

crates/store/re_log_encoding/src/encoder.rs

Lines changed: 49 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
22
3+
use std::borrow::Borrow;
4+
35
use re_build_info::CrateVersion;
46
use re_chunk::{ChunkError, ChunkResult};
57
use re_log_types::LogMsg;
@@ -74,6 +76,33 @@ pub struct Encoder<W: std::io::Write> {
7476
is_finished: bool,
7577
}
7678

79+
impl Encoder<Vec<u8>> {
80+
pub fn local() -> Result<Self, EncodeError> {
81+
Self::new(
82+
CrateVersion::LOCAL,
83+
EncodingOptions::PROTOBUF_COMPRESSED,
84+
Vec::new(),
85+
)
86+
}
87+
88+
/// All-in-one helper to encode a stream of [`LogMsg`]s into an actual RRD stream.
89+
///
90+
/// This always uses the local version and its default encoding options.
91+
///
92+
/// Returns the encoded data in a newly allocated vector.
93+
pub fn encode(
94+
messages: impl IntoIterator<Item = ChunkResult<impl Borrow<LogMsg>>>,
95+
) -> Result<Vec<u8>, EncodeError> {
96+
re_tracing::profile_function!();
97+
let mut encoder = Self::local()?;
98+
for message in messages {
99+
encoder.append(message?.borrow())?;
100+
}
101+
encoder.finish()?;
102+
encoder.into_inner()
103+
}
104+
}
105+
77106
impl<W: std::io::Write> Encoder<W> {
78107
pub fn new(
79108
version: CrateVersion,
@@ -191,6 +220,26 @@ impl<W: std::io::Write> Encoder<W> {
191220
}
192221
}
193222

223+
impl<W: std::io::Write> Encoder<W> {
224+
/// All-in-one helper to encode a stream of [`LogMsg`]s into an actual RRD stream.
225+
///
226+
/// Returns the size in bytes of the encoded data.
227+
pub fn encode_into(
228+
version: CrateVersion,
229+
options: EncodingOptions,
230+
messages: impl IntoIterator<Item = ChunkResult<impl Borrow<LogMsg>>>,
231+
write: &mut W,
232+
) -> Result<u64, EncodeError> {
233+
re_tracing::profile_function!();
234+
let mut encoder = Encoder::new(version, options, write)?;
235+
let mut size_bytes = 0;
236+
for message in messages {
237+
size_bytes += encoder.append(message?.borrow())?;
238+
}
239+
Ok(size_bytes)
240+
}
241+
}
242+
194243
// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush.
195244
// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now.
196245
impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
@@ -205,91 +254,3 @@ impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
205254
}
206255
}
207256
}
208-
209-
/// Returns the size in bytes of the encoded data.
210-
pub fn encode(
211-
version: CrateVersion,
212-
options: EncodingOptions,
213-
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
214-
write: &mut impl std::io::Write,
215-
) -> Result<u64, EncodeError> {
216-
re_tracing::profile_function!();
217-
let mut encoder = Encoder::new(version, options, write)?;
218-
let mut size_bytes = 0;
219-
for message in messages {
220-
size_bytes += encoder.append(&message?)?;
221-
}
222-
Ok(size_bytes)
223-
}
224-
225-
/// Returns the size in bytes of the encoded data.
226-
pub fn encode_ref<'a>(
227-
version: CrateVersion,
228-
options: EncodingOptions,
229-
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
230-
write: &mut impl std::io::Write,
231-
) -> Result<u64, EncodeError> {
232-
re_tracing::profile_function!();
233-
let mut encoder = Encoder::new(version, options, write)?;
234-
let mut size_bytes = 0;
235-
for message in messages {
236-
size_bytes += encoder.append(message?)?;
237-
}
238-
Ok(size_bytes)
239-
}
240-
241-
pub fn encode_as_bytes(
242-
version: CrateVersion,
243-
options: EncodingOptions,
244-
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
245-
) -> Result<Vec<u8>, EncodeError> {
246-
re_tracing::profile_function!();
247-
let mut encoder = Encoder::new(version, options, vec![])?;
248-
for message in messages {
249-
encoder.append(&message?)?;
250-
}
251-
encoder.finish()?;
252-
encoder.into_inner()
253-
}
254-
255-
#[inline]
256-
pub fn local_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
257-
Encoder::new(
258-
CrateVersion::LOCAL,
259-
EncodingOptions::PROTOBUF_COMPRESSED,
260-
Vec::new(),
261-
)
262-
}
263-
264-
#[inline]
265-
pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
266-
Encoder::new(
267-
CrateVersion::LOCAL,
268-
EncodingOptions::PROTOBUF_COMPRESSED,
269-
Vec::new(),
270-
)
271-
}
272-
273-
#[inline]
274-
pub fn encode_as_bytes_local(
275-
messages: impl Iterator<Item = ChunkResult<LogMsg>>,
276-
) -> Result<Vec<u8>, EncodeError> {
277-
let mut encoder = local_raw_encoder()?;
278-
for message in messages {
279-
encoder.append(&message?)?;
280-
}
281-
encoder.finish()?;
282-
encoder.into_inner()
283-
}
284-
285-
#[inline]
286-
pub fn encode_ref_as_bytes_local<'a>(
287-
messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
288-
) -> Result<Vec<u8>, EncodeError> {
289-
let mut encoder = local_raw_encoder()?;
290-
for message in messages {
291-
encoder.append(message?)?;
292-
}
293-
encoder.finish()?;
294-
encoder.into_inner()
295-
}

crates/store/re_log_encoding/src/file_sink.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,8 @@ impl FileSink {
9595

9696
let file = std::fs::File::create(&path)
9797
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
98-
let encoder = crate::encoder::Encoder::new(
99-
re_build_info::CrateVersion::LOCAL,
100-
encoding_options,
101-
file,
102-
)?;
98+
let encoder =
99+
crate::Encoder::new(re_build_info::CrateVersion::LOCAL, encoding_options, file)?;
103100
let join_handle = spawn_and_stream(Some(&path), encoder, rx)?;
104101

105102
Ok(Self {
@@ -117,7 +114,7 @@ impl FileSink {
117114

118115
re_log::debug!("Writing to stdout…");
119116

120-
let encoder = crate::encoder::Encoder::new(
117+
let encoder = crate::Encoder::new(
121118
re_build_info::CrateVersion::LOCAL,
122119
encoding_options,
123120
std::io::stdout(),
@@ -158,7 +155,7 @@ impl FileSink {
158155
/// Set `filepath` to `None` to stream to standard output.
159156
fn spawn_and_stream<W: std::io::Write + Send + 'static>(
160157
filepath: Option<&std::path::Path>,
161-
mut encoder: crate::encoder::Encoder<W>,
158+
mut encoder: crate::Encoder<W>,
162159
rx: Receiver<Option<Command>>,
163160
) -> Result<std::thread::JoinHandle<()>, FileSinkError> {
164161
let (name, target) = if let Some(filepath) = filepath {

crates/store/re_log_encoding/src/lib.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
pub mod decoder;
55

66
#[cfg(feature = "encoder")]
7-
pub mod encoder;
7+
mod encoder;
88

99
mod app_id_injector;
1010
pub mod codec;
@@ -24,13 +24,16 @@ pub mod external {
2424

2525
// ---------------------------------------------------------------------
2626

27-
pub use app_id_injector::{
27+
pub use self::app_id_injector::{
2828
ApplicationIdInjector, CachingApplicationIdInjector, DummyApplicationIdInjector,
2929
};
3030

31+
#[cfg(feature = "encoder")]
32+
pub use self::encoder::{EncodeError, Encoder};
33+
3134
#[cfg(feature = "encoder")]
3235
#[cfg(not(target_arch = "wasm32"))]
33-
pub use file_sink::{FileFlushError, FileSink, FileSinkError};
36+
pub use self::file_sink::{FileFlushError, FileSink, FileSinkError};
3437

3538
// ----------------------------------------------------------------------------
3639

crates/store/re_log_encoding/tests/arrow_encode_roundtrip.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use similar_asserts::assert_eq;
22

3-
use re_build_info::CrateVersion;
43
use re_chunk::{Chunk, RowId, TimePoint, Timeline};
5-
use re_log_encoding::{EncodingOptions, decoder::decode_bytes, encoder::encode_as_bytes};
4+
use re_log_encoding::{Encoder, decoder::decode_bytes};
65
use re_log_types::{LogMsg, StoreId};
76
use re_types::archetypes::Points3D;
87

@@ -39,13 +38,7 @@ fn encode_roundtrip() {
3938
let store_id = StoreId::empty_recording();
4039
let messages = [LogMsg::ArrowMsg(store_id, arrow_msg)];
4140

42-
let option = EncodingOptions::PROTOBUF_COMPRESSED;
43-
let crate_version = CrateVersion::LOCAL;
44-
let encoded = encode_as_bytes(crate_version, option, messages.iter().cloned().map(Ok)).unwrap();
41+
let encoded = Encoder::encode(messages.iter().cloned().map(Ok)).unwrap();
4542
let decoded = decode_bytes(&encoded).unwrap();
46-
similar_asserts::assert_eq!(
47-
decoded,
48-
messages,
49-
"Failed to roundtrip chunk with option {option:?}"
50-
);
43+
similar_asserts::assert_eq!(decoded, messages, "Failed to roundtrip chunk");
5144
}

0 commit comments

Comments
 (0)