Skip to content

Commit 3c53cad

Browse files
Use optimized append_..._n methods in syslog receiver when constructing OTAP record (open-telemetry#1196)
Closes open-telemetry#879

Use the `append_..._n` methods in the syslog receiver when constructing the Arrow records. These methods have some nice performance optimizations:

- When appending a non-null value, if the underlying builder is a dictionary array, the builder is able to append the value multiple times without doing multiple key lookups. For example: https://github.com/apache/arrow-rs/blob/0737c61e76057b127312dd8058887649ece702b8/arrow-array/src/builder/generic_bytes_dictionary_builder.rs#L304-L316
- When the value is null, we can use the optimized code path to append multiple nulls to the null buffer: https://github.com/apache/arrow-rs/blob/0737c61e76057b127312dd8058887649ece702b8/arrow-buffer/src/builder/boolean.rs#L193

Benchmark results:

```
arrow_batch_creation/rfc3164_arrow_batch_100_msgs
    time:   [82.608 µs 82.774 µs 82.950 µs]
    change: [−3.9462% −3.4305% −2.9199%] (p = 0.00 < 0.05)
    Performance has improved.

arrow_batch_creation/rfc5424_arrow_batch_100_msgs
    time:   [46.139 µs 46.238 µs 46.338 µs]
    change: [−4.2893% −3.9924% −3.7179%] (p = 0.00 < 0.05)
    Performance has improved.

arrow_batch_creation/cef_arrow_batch_100_msgs
    time:   [40.973 µs 41.141 µs 41.308 µs]
    change: [−3.8351% −3.3433% −2.8615%] (p = 0.00 < 0.05)
    Performance has improved.
```
1 parent dfec562 commit 3c53cad

File tree

6 files changed

+156
-35
lines changed

6 files changed

+156
-35
lines changed

rust/otap-dataflow/crates/otap/src/encoder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ where
304304
.as_ref()
305305
.expect("LogRecord should not be None")
306306
.time_unix_nano()
307-
.map(|v| v as i64),
307+
.map(|v| v as i64)
308+
.unwrap_or(0),
308309
);
309310
}
310311
for log_record in log_records_slice {
@@ -313,7 +314,8 @@ where
313314
.as_ref()
314315
.expect("LogRecord should not be None")
315316
.observed_time_unix_nano()
316-
.map(|v| v as i64),
317+
.map(|v| v as i64)
318+
.unwrap_or(0),
317319
);
318320
}
319321
logs.append_schema_url_n(scope_schema_url, logs_count);

rust/otap-dataflow/crates/otap/src/syslog_cef_receiver/arrow_records_encoder.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl ArrowRecordsBuilder {
4747
/// Appends a parsed syslog message to the builder.
4848
pub fn append_syslog(&mut self, syslog_message: ParsedSyslogMessage<'_>) {
4949
self.logs
50-
.append_time_unix_nano(syslog_message.timestamp().map(|v| v as i64));
50+
.append_time_unix_nano(syslog_message.timestamp().map(|v| v as i64).unwrap_or(0));
5151

5252
let (severity_number, severity_text) =
5353
syslog_message.severity().unwrap_or((0, "UNSPECIFIED"));
@@ -89,32 +89,15 @@ impl ArrowRecordsBuilder {
8989
.append_dropped_attributes_count_n(0, log_record_count);
9090

9191
let observed_time = Utc::now().timestamp_nanos_opt().unwrap_or(0);
92-
for _ in 0..log_record_count {
93-
self.logs
94-
.append_observed_time_unix_nano(Some(observed_time));
95-
}
96-
92+
self.logs
93+
.append_observed_time_unix_nano_n(observed_time, log_record_count);
9794
self.logs.append_schema_url_n(None, log_record_count);
95+
self.logs
96+
.append_dropped_attributes_count_n(0, log_record_count);
9897

99-
for _ in 0..log_record_count {
100-
self.logs.append_dropped_attributes_count(0);
101-
}
102-
103-
for _ in 0..log_record_count {
104-
self.logs.append_flags(None);
105-
}
106-
107-
for _ in 0..log_record_count {
108-
_ = self.logs.append_trace_id(None);
109-
}
110-
111-
for _logs in 0..log_record_count {
112-
_ = self.logs.append_trace_id(None);
113-
}
114-
115-
for _logs in 0..log_record_count {
116-
_ = self.logs.append_span_id(None);
117-
}
98+
self.logs.append_flags_n(None, log_record_count);
99+
_ = self.logs.append_trace_id_n(None, log_record_count);
100+
_ = self.logs.append_span_id_n(None, log_record_count);
118101

119102
let mut otap_batch = OtapArrowRecords::Logs(Logs::default());
120103

rust/otel-arrow-rust/src/encode/record/array.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ pub trait CheckedArrayAppendSlice {
144144
/// append a slice of T to the builder. Note that this does not append an individual
145145
/// element for each value in the slice, it appends the slice as a single row
146146
fn append_slice(&mut self, val: &[Self::Native]) -> Result<(), ArrowError>;
147+
148+
/// append the slice to the builder `n` times
149+
fn append_slice_n(&mut self, val: &[Self::Native], n: usize) -> Result<(), ArrowError>;
147150
}
148151

149152
/// Used by the builder to identify the default value of the array that is being built. By default
@@ -614,6 +617,15 @@ where
614617
retry = { self.append_slice(value) }
615618
)
616619
}
620+
621+
fn append_slice_n(&mut self, value: &[Self::Native], n: usize) -> Result<(), ArrowError> {
622+
handle_append_checked!(
623+
self,
624+
append_slice_n(value, n),
625+
default_check = Self::is_default_value(&self.default_value, &value),
626+
retry = { self.append_slice_n(value, n) }
627+
)
628+
}
617629
}
618630

619631
impl<T, TArgs, TN, TD8, TD16> AdaptiveArrayBuilder<T, TArgs, TN, TD8, TD16>
@@ -1278,6 +1290,7 @@ pub mod test {
12781290
assert!(builder.append_slice(&valid_values[0]).is_ok());
12791291
assert!(builder.append_slice(&valid_values[1]).is_ok());
12801292
builder.append_nulls(2);
1293+
assert!(builder.append_slice_n(&valid_values[1], 2).is_ok());
12811294

12821295
let result = builder.finish().unwrap();
12831296
assert_eq!(
@@ -1287,7 +1300,7 @@ pub mod test {
12871300
Box::new(DataType::FixedSizeBinary(1))
12881301
)
12891302
);
1290-
assert_eq!(result.len(), 7);
1303+
assert_eq!(result.len(), 9);
12911304

12921305
let dict_array = result
12931306
.as_any()
@@ -1296,7 +1309,17 @@ pub mod test {
12961309
let dict_keys = dict_array.keys();
12971310
assert_eq!(
12981311
dict_keys,
1299-
&UInt8Array::from_iter(vec![Some(0), Some(1), None, Some(0), Some(1), None, None])
1312+
&UInt8Array::from_iter(vec![
1313+
Some(0),
1314+
Some(1),
1315+
None,
1316+
Some(0),
1317+
Some(1),
1318+
None,
1319+
None,
1320+
Some(1),
1321+
Some(1)
1322+
])
13001323
);
13011324
let dict_values = dict_array
13021325
.values()

rust/otel-arrow-rust/src/encode/record/array/dictionary.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub trait CheckedDictionaryAppendSlice {
9696
/// append a slice of T to the builder. Note that this does not append an individual
9797
/// element for each value in the slice, it appends the slice as a single row
9898
fn append_slice(&mut self, val: &[Self::Native]) -> checked::Result<usize>;
99+
100+
/// append the slice to the builder `n` times.
101+
fn append_slice_n(&mut self, val: &[Self::Native], n: usize) -> checked::Result<usize>;
99102
}
100103

101104
// This is the error type for the result that is returned by CheckedDictionaryArrayAppend trait.
@@ -593,6 +596,34 @@ where
593596
}
594597
}
595598
}
599+
600+
pub fn append_slice_n_checked(&mut self, value: &[T], n: usize) -> checked::Result<usize> {
601+
loop {
602+
let append_result = match &mut self.variant {
603+
DictIndexVariant::UInt8(dict_builder) => dict_builder.append_slice_n(value, n),
604+
DictIndexVariant::UInt16(dict_builder) => dict_builder.append_slice_n(value, n),
605+
};
606+
607+
match append_result {
608+
Ok(index) => {
609+
if index + 1 > self.max_cardinality as usize {
610+
self.overflow_index = Some(index);
611+
return Err(checked::DictionaryBuilderError::DictOverflow {});
612+
}
613+
return Ok(index);
614+
}
615+
Err(checked::DictionaryBuilderError::DictOverflow {}) => {
616+
self.upgrade_key().map_err(|err| match err {
617+
DictionaryBuilderError::DictOverflow {} => {
618+
checked::DictionaryBuilderError::DictOverflow {}
619+
}
620+
})?;
621+
// continue the loop and retry
622+
}
623+
other => return other,
624+
}
625+
}
626+
}
596627
}
597628

598629
impl<T8, T16> AdaptiveDictionaryBuilder<T8, T16>

rust/otel-arrow-rust/src/encode/record/array/fixed_size_binary.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ impl CheckedArrayAppendSlice for FixedSizeBinaryBuilder {
5050
fn append_slice(&mut self, val: &[Self::Native]) -> Result<(), ArrowError> {
5151
self.append_value(val)
5252
}
53+
54+
fn append_slice_n(&mut self, val: &[Self::Native], n: usize) -> Result<(), ArrowError> {
55+
for _ in 0..n {
56+
self.append_value(val)?;
57+
}
58+
59+
Ok(())
60+
}
5361
}
5462

5563
impl ArrayAppendNulls for FixedSizeBinaryBuilder {
@@ -114,6 +122,21 @@ where
114122
),
115123
}
116124
}
125+
126+
fn append_slice_n(
127+
&mut self,
128+
val: &[Self::Native],
129+
n: usize,
130+
) -> super::dictionary::checked::Result<usize> {
131+
// TODO use optimized method once we upgrade to latest arrow:
132+
// https://github.com/apache/arrow-rs/pull/8498
133+
let mut index = 0;
134+
for _ in 0..n {
135+
index = self.append_slice(val)?;
136+
}
137+
138+
Ok(index)
139+
}
117140
}
118141

119142
impl<K> ArrayAppendNulls for FixedSizeBinaryDictionaryBuilder<K>
@@ -172,11 +195,22 @@ mod test {
172195
CheckedArrayAppend::append_value(&mut fsb_builder, &b"1234".to_vec()).unwrap();
173196
CheckedArrayAppend::append_value(&mut fsb_builder, &b"5678".to_vec()).unwrap();
174197
CheckedArrayAppend::append_value(&mut fsb_builder, &b"9012".to_vec()).unwrap();
198+
CheckedArrayAppendSlice::append_slice(&mut fsb_builder, b"4180").unwrap();
199+
CheckedArrayAppendSlice::append_slice_n(&mut fsb_builder, b"5140", 2).unwrap();
200+
175201
let result = ArrayBuilder::finish(&mut fsb_builder);
176202
assert_eq!(result.data_type(), &DataType::FixedSizeBinary(4));
177203

178204
let expected = FixedSizeBinaryArray::try_from_iter(
179-
[b"1234".to_vec(), b"5678".to_vec(), b"9012".to_vec()].iter(),
205+
[
206+
b"1234".to_vec(),
207+
b"5678".to_vec(),
208+
b"9012".to_vec(),
209+
b"4180".to_vec(),
210+
b"5140".to_vec(),
211+
b"5140".to_vec(),
212+
]
213+
.iter(),
180214
)
181215
.unwrap();
182216

@@ -201,6 +235,11 @@ mod test {
201235
let index =
202236
CheckedDictionaryArrayAppend::append_value(&mut dict_builder, &b"b".to_vec()).unwrap();
203237
assert_eq!(index, 1);
238+
let index = CheckedDictionaryAppendSlice::append_slice(&mut dict_builder, b"c").unwrap();
239+
assert_eq!(index, 2);
240+
let index =
241+
CheckedDictionaryAppendSlice::append_slice_n(&mut dict_builder, b"d", 2).unwrap();
242+
assert_eq!(index, 3);
204243

205244
let result = DictionaryBuilder::finish(&mut dict_builder);
206245

@@ -215,7 +254,10 @@ mod test {
215254
let mut expected_dict_values = FixedSizeBinaryBuilder::new(1);
216255
assert!(expected_dict_values.append_value(b"a").is_ok());
217256
assert!(expected_dict_values.append_value(b"b").is_ok());
218-
let expected_dict_keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
257+
assert!(expected_dict_values.append_value(b"c").is_ok());
258+
assert!(expected_dict_values.append_value(b"d").is_ok());
259+
assert!(expected_dict_values.append_value(b"d").is_ok());
260+
let expected_dict_keys = UInt8Array::from_iter_values(vec![0, 0, 1, 2, 3, 3]);
219261
let expected =
220262
UInt8DictionaryArray::new(expected_dict_keys, Arc::new(expected_dict_values.finish()));
221263

rust/otel-arrow-rust/src/encode/record/logs.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,23 @@ impl LogsRecordBatchBuilder {
131131
}
132132

133133
/// append a value to the `time_unix_nano` array
134-
pub fn append_time_unix_nano(&mut self, val: Option<i64>) {
135-
let val = val.unwrap_or(0);
134+
#[inline]
135+
pub fn append_time_unix_nano(&mut self, val: i64) {
136136
self.time_unix_nano.append_value(&val);
137137
}
138138

139139
/// append a value to the `observed_time_unix_nano` array
140-
pub fn append_observed_time_unix_nano(&mut self, val: Option<i64>) {
141-
let val = val.unwrap_or(0);
140+
#[inline]
141+
pub fn append_observed_time_unix_nano(&mut self, val: i64) {
142142
self.observed_time_unix_nano.append_value(&val);
143143
}
144144

145+
/// append a value to the `observed_time_unix_nano` array `n` times
146+
#[inline]
147+
pub fn append_observed_time_unix_nano_n(&mut self, val: i64, n: usize) {
148+
self.observed_time_unix_nano.append_value_n(&val, n);
149+
}
150+
145151
/// append a value to the `severity_number` array
146152
pub fn append_severity_number(&mut self, val: Option<i32>) {
147153
if let Some(val) = val {
@@ -183,6 +189,11 @@ impl LogsRecordBatchBuilder {
183189
self.dropped_attributes_count.append_value(&val);
184190
}
185191

192+
/// append a value to the `dropped_attributes_count` array n times
193+
pub fn append_dropped_attributes_count_n(&mut self, val: u32, n: usize) {
194+
self.dropped_attributes_count.append_value_n(&val, n);
195+
}
196+
186197
/// append a value to the `flags` array
187198
pub fn append_flags(&mut self, val: Option<u32>) {
188199
if let Some(val) = val {
@@ -192,6 +203,15 @@ impl LogsRecordBatchBuilder {
192203
}
193204
}
194205

206+
/// append a value to the `flags` array n times
207+
pub fn append_flags_n(&mut self, val: Option<u32>, n: usize) {
208+
if let Some(val) = val {
209+
self.flags.append_value_n(&val, n);
210+
} else {
211+
self.flags.append_nulls(n);
212+
}
213+
}
214+
195215
/// append a value to the `trace_id` array
196216
pub fn append_trace_id(&mut self, val: Option<&TraceId>) -> Result<(), ArrowError> {
197217
if let Some(val) = val {
@@ -202,6 +222,16 @@ impl LogsRecordBatchBuilder {
202222
}
203223
}
204224

225+
/// append a value to the `trace_id` array `n` times
226+
pub fn append_trace_id_n(&mut self, val: Option<&TraceId>, n: usize) -> Result<(), ArrowError> {
227+
if let Some(val) = val {
228+
self.trace_id.append_slice_n(val, n)
229+
} else {
230+
self.trace_id.append_nulls(n);
231+
Ok(())
232+
}
233+
}
234+
205235
/// append a value to the `span_id` array
206236
pub fn append_span_id(&mut self, val: Option<&SpanId>) -> Result<(), ArrowError> {
207237
if let Some(val) = val {
@@ -212,6 +242,16 @@ impl LogsRecordBatchBuilder {
212242
}
213243
}
214244

245+
/// append a value to the `span_id` array `n` times
246+
pub fn append_span_id_n(&mut self, val: Option<&SpanId>, n: usize) -> Result<(), ArrowError> {
247+
if let Some(val) = val {
248+
self.span_id.append_slice_n(val, n)
249+
} else {
250+
self.span_id.append_nulls(n);
251+
Ok(())
252+
}
253+
}
254+
215255
/// append a value to the `event_name` array
216256
pub fn append_event_name(&mut self, val: Option<&[u8]>) {
217257
if let Some(val) = val {

0 commit comments

Comments
 (0)