Skip to content

Commit 2008fde

Browse files
committed
Box LogRecord and InstrumentationScope into heap
1 parent 25002df commit 2008fde

File tree

1 file changed

+24
-40
lines changed

1 file changed

+24
-40
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
3737
/// Default maximum batch size.
3838
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
3939

40-
/// Default timeout for forceflush and shutdown.
41-
const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1);
42-
43-
/// environment variable name for forceflush and shutdown timeout.
44-
const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_LOG_EXPORT_INTERVAL";
45-
4640
/// The interface for plugging into a [`Logger`].
4741
///
4842
/// [`Logger`]: crate::logs::Logger
@@ -154,6 +148,23 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
154148
}
155149
}
156150

151+
/// Messages sent between application thread and batch log processor's work thread.
152+
#[allow(clippy::large_enum_variant)]
153+
#[derive(Debug)]
154+
enum BatchMessage {
155+
/// Export logs, usually called when the log is emitted.
156+
ExportLog(Box<(LogRecord, InstrumentationScope)>),
157+
/// Flush the current buffer to the backend, it can be triggered by
158+
/// pre configured interval or a call to `force_push` function.
159+
// Flush(Option<oneshot::Sender<ExportResult>>),
160+
/// ForceFlush flushes the current buffer to the backend.
161+
ForceFlush(mpsc::SyncSender<ExportResult>),
162+
/// Shut down the worker thread, push all logs in buffer to the backend.
163+
Shutdown(mpsc::SyncSender<ExportResult>),
164+
/// Set the resource for the exporter.
165+
SetResource(Arc<Resource>),
166+
}
167+
157168
/// A [`LogProcessor`] that asynchronously buffers log records and reports
158169
/// them at a pre-configured interval.
159170
pub struct BatchLogProcessor {
@@ -189,10 +200,8 @@ impl LogProcessor for BatchLogProcessor {
189200
return;
190201
}
191202

192-
let result = self.message_sender.try_send(BatchMessage::ExportLog((
193-
record.clone(),
194-
instrumentation.clone(),
195-
)));
203+
let result = self.message_sender.
204+
try_send(BatchMessage::ExportLog(Box::new((record.clone(), instrumentation.clone()))));
196205

197206
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
198207
if result.is_err() {
@@ -264,7 +273,7 @@ impl LogProcessor for BatchLogProcessor {
264273
RecvTimeoutError::Timeout => {
265274
otel_error!(
266275
name: "BatchLogProcessor.Shutdown.Timeout",
267-
message = "BatchLogProcessor shutdown timed out."
276+
message = "BatchLogProcessor shutdown timing out."
268277
);
269278
LogError::ExportTimedOut(self.shutdown_timeout)
270279
}
@@ -329,8 +338,7 @@ impl BatchLogProcessor {
329338
exporter.set_resource(&resource);
330339
}
331340
Err(RecvTimeoutError::Timeout) => {
332-
// FIXME handle result
333-
let _result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
341+
let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time);
334342
}
335343
Err(err) => {
336344
otel_error!(
@@ -342,19 +350,12 @@ impl BatchLogProcessor {
342350
}
343351
});
344352

345-
346-
let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME)
347-
.ok()
348-
.and_then(|s| u64::from_str(&s).ok())
349-
.map(Duration::from_secs)
350-
.unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT);
351-
352353
// Return batch processor with link to worker
353354
BatchLogProcessor {
354355
message_sender,
355356
handle: Mutex::new(Some(handle)),
356-
forceflush_timeout,
357-
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
357+
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
358+
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
358359
is_shutdown: AtomicBool::new(false),
359360
dropped_logs_count: AtomicUsize::new(0),
360361
max_queue_size,
@@ -376,7 +377,7 @@ impl BatchLogProcessor {
376377
fn export_with_timeout_sync<E>(
377378
_: Duration, // TODO, enforcing timeout in exporter.
378379
exporter: &mut E,
379-
batch: Vec<(LogRecord, InstrumentationScope)>,
380+
batch: Vec<Box<(LogRecord, InstrumentationScope)>>,
380381
last_export_time: &mut Instant,
381382
) -> ExportResult
382383
where
@@ -601,23 +602,6 @@ where
601602
}
602603
}
603604

604-
/// Messages sent between application thread and batch log processor's work thread.
605-
#[allow(clippy::large_enum_variant)]
606-
#[derive(Debug)]
607-
enum BatchMessage {
608-
/// Export logs, usually called when the log is emitted.
609-
ExportLog((LogRecord, InstrumentationScope)),
610-
/// Flush the current buffer to the backend, it can be triggered by
611-
/// pre configured interval or a call to `force_push` function.
612-
// Flush(Option<oneshot::Sender<ExportResult>>),
613-
/// ForceFlush flushes the current buffer to the backend.
614-
ForceFlush(mpsc::SyncSender<ExportResult>),
615-
/// Shut down the worker thread, push all logs in buffer to the backend.
616-
Shutdown(mpsc::SyncSender<ExportResult>),
617-
/// Set the resource for the exporter.
618-
SetResource(Arc<Resource>),
619-
}
620-
621605
#[cfg(all(test, feature = "testing", feature = "logs"))]
622606
mod tests {
623607
use super::{

0 commit comments

Comments
 (0)