Skip to content

Commit 425b9f4

Browse files
committed
feat(trace-utils): add API to serialize trace to exsiting vec
1 parent 13970ff commit 425b9f4

File tree

1 file changed

+143
-6
lines changed
  • libdd-trace-utils/src/msgpack_encoder/v04

1 file changed

+143
-6
lines changed

libdd-trace-utils/src/msgpack_encoder/v04/mod.rs

Lines changed: 143 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use std::io;
5+
46
use crate::span::{Span, SpanText};
57
use rmp::encode::{write_array_len, ByteBuf, RmpWrite, ValueWriteError};
68

@@ -13,15 +15,24 @@ fn to_writer<W: RmpWrite, T: SpanText, S: AsRef<[Span<T>]>>(
1315
) -> Result<(), ValueWriteError<W::Error>> {
1416
write_array_len(writer, traces.len() as u32)?;
1517
for trace in traces {
16-
write_array_len(writer, trace.as_ref().len() as u32)?;
17-
for span in trace.as_ref() {
18-
span::encode_span(writer, span)?;
19-
}
18+
write_trace(writer, trace)?;
2019
}
2120

2221
Ok(())
2322
}
2423

24+
#[inline(always)]
25+
fn write_trace<W: RmpWrite, T: SpanText, S: AsRef<[Span<T>]>>(
26+
writer: &mut W,
27+
trace: &S,
28+
) -> Result<(), ValueWriteError<W::Error>> {
29+
write_array_len(writer, trace.as_ref().len() as u32)?;
30+
for span in trace.as_ref() {
31+
span::encode_span(writer, span)?;
32+
}
33+
Ok(())
34+
}
35+
2536
/// Encodes a collection of traces into a slice of bytes.
2637
///
2738
/// # Arguments
@@ -122,11 +133,137 @@ pub fn to_vec_with_capacity<T: SpanText, S: AsRef<[Span<T>]>>(
122133
capacity: u32,
123134
) -> Vec<u8> {
124135
let mut buf = ByteBuf::with_capacity(capacity as usize);
125-
#[allow(clippy::expect_used)]
126-
to_writer(&mut buf, traces).expect("infallible: the error is std::convert::Infallible");
136+
unwrap_infallible_write(to_writer(&mut buf, traces));
127137
buf.into_vec()
128138
}
129139

140+
const ARRAY_LEN_HEADER_WIDTH: usize = 5;
141+
142+
pub struct TraceBuffer {
143+
buf: Vec<u8>,
144+
trace_count: usize,
145+
max_trace_size: usize,
146+
max_size: usize,
147+
}
148+
149+
fn mp_write_array_len_fixed_width(buf: &mut [u8], len: usize) {
150+
if buf.len() < ARRAY_LEN_HEADER_WIDTH {
151+
return;
152+
}
153+
let Ok(len) = u32::try_from(len) else { return };
154+
buf[0] = 0xdd;
155+
let len_encoded: [u8; 4] = len.to_be_bytes();
156+
buf[1..5].copy_from_slice(&len_encoded);
157+
}
158+
159+
struct LimitedTruncatingWriter<'a> {
160+
w: &'a mut Vec<u8>,
161+
written: usize,
162+
limit: usize,
163+
}
164+
165+
impl LimitedTruncatingWriter<'_> {
166+
fn rollback(&mut self) {
167+
self.w.truncate(self.w.len() - self.written);
168+
self.written = 0;
169+
}
170+
}
171+
172+
impl io::Write for LimitedTruncatingWriter<'_> {
173+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
174+
if self.written + buf.len() > self.limit {
175+
return Err(io::Error::new(
176+
io::ErrorKind::OutOfMemory,
177+
"no space left in the buffer",
178+
));
179+
}
180+
let written = self.w.write(buf)?;
181+
self.written += written;
182+
Ok(written)
183+
}
184+
185+
fn flush(&mut self) -> io::Result<()> {
186+
self.w.flush()
187+
}
188+
}
189+
190+
impl TraceBuffer {
191+
pub fn new(max_trace_size: usize, max_size: usize) -> Self {
192+
Self {
193+
buf: vec![0; ARRAY_LEN_HEADER_WIDTH],
194+
trace_count: 0,
195+
max_trace_size,
196+
max_size,
197+
}
198+
}
199+
200+
fn writer(&mut self) -> LimitedTruncatingWriter<'_> {
201+
let leftover = self.max_size.saturating_sub(self.buf.len());
202+
LimitedTruncatingWriter {
203+
w: &mut self.buf,
204+
written: 0,
205+
limit: self.max_trace_size.min(leftover),
206+
}
207+
}
208+
209+
pub fn write_trace<T: SpanText, S: AsRef<[Span<T>]>>(&mut self, trace: S) -> io::Result<()> {
210+
let mut writer = self.writer();
211+
match write_trace(&mut writer, &trace) {
212+
Ok(()) => {}
213+
Err(ValueWriteError::InvalidDataWrite(e) | ValueWriteError::InvalidMarkerWrite(e)) => {
214+
writer.rollback();
215+
return Err(e);
216+
}
217+
};
218+
self.trace_count += 1;
219+
Ok(())
220+
}
221+
222+
fn reset(&mut self) -> Vec<u8> {
223+
let buf = std::mem::take(&mut self.buf);
224+
*self = Self {
225+
buf: {
226+
let mut v = Vec::with_capacity(buf.len());
227+
v.resize(ARRAY_LEN_HEADER_WIDTH, 0);
228+
v
229+
},
230+
trace_count: 0,
231+
max_trace_size: self.max_trace_size,
232+
max_size: self.max_size,
233+
};
234+
buf
235+
}
236+
237+
pub fn flush(&mut self) -> Vec<u8> {
238+
self.write_traces_len();
239+
let buf = self.reset();
240+
buf
241+
}
242+
243+
fn write_traces_len(&mut self) {
244+
mp_write_array_len_fixed_width(&mut self.buf, self.trace_count);
245+
}
246+
}
247+
248+
/// Serializes traces into a vector of bytes passed mutably
249+
pub fn to_vec_extend<T: SpanText, S: AsRef<[Span<T>]>>(traces: &[S], v: &mut Vec<u8>) {
250+
let mut buf = ByteBuf::from_vec(std::mem::take(v));
251+
#[allow(clippy::expect_used)]
252+
unwrap_infallible_write(to_writer(&mut buf, traces));
253+
*v = buf.into_vec();
254+
}
255+
256+
/// Unwrap an infallible result without panics
257+
fn unwrap_infallible_write<T>(res: Result<T, ValueWriteError<std::convert::Infallible>>) -> T {
258+
match res {
259+
Ok(ok) => ok,
260+
Err(e) => match match e {
261+
ValueWriteError::InvalidMarkerWrite(i) => i,
262+
ValueWriteError::InvalidDataWrite(i) => i,
263+
} {},
264+
}
265+
}
266+
130267
struct CountLength(u32);
131268

132269
impl std::io::Write for CountLength {

0 commit comments

Comments
 (0)