From b8c67cbb064b234b0cde0379a97b08a1105573f9 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 9 Sep 2024 16:01:40 -0700 Subject: [PATCH 1/9] [Logs] Initial draft to handle batch export into background thread --- examples/self-diagnostics/src/main.rs | 2 +- .../examples/logs-basic.rs | 2 +- .../examples/basic-otlp-http/src/main.rs | 2 +- .../examples/basic-otlp/src/main.rs | 2 +- opentelemetry-otlp/src/logs.rs | 11 +- opentelemetry-sdk/src/logs/log_emitter.rs | 3 +- opentelemetry-sdk/src/logs/log_processor.rs | 238 +++++------------- 7 files changed, 73 insertions(+), 187 deletions(-) diff --git a/examples/self-diagnostics/src/main.rs b/examples/self-diagnostics/src/main.rs index 04cd1fcd45..89588d7491 100644 --- a/examples/self-diagnostics/src/main.rs +++ b/examples/self-diagnostics/src/main.rs @@ -58,7 +58,7 @@ fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider { .http() .with_endpoint("http://localhost:4318/v1/logs"), ) - .install_batch(opentelemetry_sdk::runtime::Tokio) + .install_batch() .unwrap(); // Add a tracing filter to filter events from crates used by opentelemetry-otlp. diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs index dc5bacc813..c2fda41b52 100644 --- a/opentelemetry-appender-log/examples/logs-basic.rs +++ b/opentelemetry-appender-log/examples/logs-basic.rs @@ -16,7 +16,7 @@ async fn main() { let exporter = LogExporter::default(); //Create a LoggerProvider and register the exporter let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .with_log_processor(BatchLogProcessor::builder(exporter).build()) .build(); // Setup Log Appender for the log crate. diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 610268294e..c9466a3447 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -44,7 +44,7 @@ fn init_logs() -> Result .with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format .with_endpoint("http://localhost:4318/v1/logs"), ) - .install_batch(opentelemetry_sdk::runtime::Tokio) + .install_batch() } fn init_tracer_provider() -> Result { diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index f18de642f9..7f3bea4a83 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -60,7 +60,7 @@ fn init_logs() -> Result { .tonic() .with_endpoint("http://localhost:4317"), ) - .install_batch(runtime::Tokio) + .install_batch() } #[tokio::main] diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..da9aa08e71 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -14,7 +14,7 @@ use std::fmt::Debug; use opentelemetry::logs::LogError; -use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource}; +use opentelemetry_sdk::{export::logs::LogData, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -167,14 +167,12 @@ impl OtlpLogPipeline { /// Returns a [`LoggerProvider`]. 
/// /// [`LoggerProvider`]: opentelemetry_sdk::logs::LoggerProvider - pub fn install_batch( + pub fn install_batch( self, - runtime: R, ) -> Result { Ok(build_batch_with_exporter( self.exporter_builder.build_log_exporter()?, self.resource, - runtime, self.batch_config, )) } @@ -194,14 +192,13 @@ fn build_simple_with_exporter( provider_builder.build() } -fn build_batch_with_exporter( +fn build_batch_with_exporter( exporter: LogExporter, resource: Option, - runtime: R, batch_config: Option, ) -> opentelemetry_sdk::logs::LoggerProvider { let mut provider_builder = opentelemetry_sdk::logs::LoggerProvider::builder(); - let batch_processor = opentelemetry_sdk::logs::BatchLogProcessor::builder(exporter, runtime) + let batch_processor = opentelemetry_sdk::logs::BatchLogProcessor::builder(exporter) .with_batch_config(batch_config.unwrap_or_default()) .build(); provider_builder = provider_builder.with_log_processor(batch_processor); diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c9d3e5a828..c5a016f763 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -173,9 +173,8 @@ impl Builder { pub fn with_batch_exporter( self, exporter: T, - runtime: R, ) -> Self { - let batch = BatchLogProcessor::builder(exporter, runtime).build(); + let batch = BatchLogProcessor::builder(exporter).build(); self.with_log_processor(batch) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7366f19791..5cb5c7e19b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,13 +1,13 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, - runtime::{RuntimeChannel, TrySend}, Resource, }; +use std::sync::mpsc::{self, SyncSender}; use futures_channel::oneshot; -use futures_util::{ - future::{self, Either}, - {pin_mut, stream, StreamExt as _}, -}; +// use futures_util::{ +// future::{self, Either}, +// {pin_mut, stream, StreamExt as _}, +// }; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; use opentelemetry::{ @@ -23,6 +23,7 @@ use std::{ sync::Arc, time::Duration, }; +use std::thread; /// Delay interval between two consecutive exports. const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY"; @@ -137,189 +138,83 @@ impl LogProcessor for SimpleLogProcessor { /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a pre-configured interval. 
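
The hunks that follow replace the runtime-backed worker (a `RuntimeChannel` sender plus a task spawned with `runtime.spawn`) with a dedicated OS thread fed by a bounded `std::sync::mpsc` channel. A rough, self-contained sketch of that pattern, using illustrative names (`ThreadedProcessor`, `WorkerMessage`) rather than the SDK's own types:

    use std::sync::mpsc::{self, SyncSender};
    use std::thread;

    // Stand-ins for the SDK types; everything here is illustrative only.
    enum WorkerMessage {
        Export(String), // stands in for a cloned log record
        Shutdown,
    }

    struct ThreadedProcessor {
        sender: SyncSender<WorkerMessage>,
        handle: Option<thread::JoinHandle<()>>,
    }

    impl ThreadedProcessor {
        fn new(queue_size: usize) -> Self {
            // Bounded queue: `emit` never awaits, and back-pressure is explicit.
            let (sender, receiver) = mpsc::sync_channel(queue_size);
            // The worker owns the receiver (and, in the real processor, the exporter);
            // async export futures can still be driven with `futures_executor::block_on`.
            let handle = thread::spawn(move || {
                while let Ok(msg) = receiver.recv() {
                    match msg {
                        WorkerMessage::Export(record) => println!("exporting: {record}"),
                        WorkerMessage::Shutdown => break,
                    }
                }
            });
            ThreadedProcessor { sender, handle: Some(handle) }
        }

        fn emit(&self, record: String) {
            // A plain channel send replaces the async channel and spawned task.
            let _ = self.sender.send(WorkerMessage::Export(record));
        }

        fn shutdown(mut self) {
            let _ = self.sender.send(WorkerMessage::Shutdown);
            if let Some(handle) = self.handle.take() {
                let _ = handle.join();
            }
        }
    }

    fn main() {
        let processor = ThreadedProcessor::new(16);
        processor.emit("hello".to_owned());
        processor.shutdown();
    }
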
-pub struct BatchLogProcessor { - message_sender: R::Sender, +pub struct BatchLogProcessor { + sender: SyncSender>, + handle: thread::JoinHandle<()>, } -impl Debug for BatchLogProcessor { +impl Debug for BatchLogProcessor { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("BatchLogProcessor") - .field("message_sender", &self.message_sender) + .field("sender", &self.sender) .finish() } } -impl LogProcessor for BatchLogProcessor { +impl LogProcessor for BatchLogProcessor { fn emit(&self, data: &mut LogData) { - let result = self - .message_sender - .try_send(BatchMessage::ExportLog(data.clone())); - + let data = Box::new(data.clone()); + let result = self.sender.send(data); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); } } fn force_flush(&self) -> LogResult<()> { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) - .map_err(|err| LogError::Other(err.into()))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| LogError::Other(err.into())) - .and_then(std::convert::identity) + // TODO, implement force_flush + LogResult::Ok(()) } fn shutdown(&self) -> LogResult<()> { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) - .map_err(|err| LogError::Other(err.into()))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| LogError::Other(err.into())) - .and_then(std::convert::identity) + // TODO, implement shutdown + // self.handle.join().unwrap(); + // let result = self.handle.join(); + // if let Err(err) = result { + // global::handle_error(err: LogError::Other(err.into())); + // } + // // TODO, implement shutdown + LogResult::Ok(()) } - fn set_resource(&self, resource: &Resource) { - let resource = Arc::new(resource.clone()); - let _ = self - .message_sender - .try_send(BatchMessage::SetResource(resource)); + fn set_resource(&self, _resource: &Resource) { + // TODO, implement set_resource } } -impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { - let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - let inner_runtime = runtime.clone(); - - // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = inner_runtime.clone(); - let mut logs = Vec::new(); - let mut messages = Box::pin(stream::select(message_receiver, ticker)); - - while let Some(message) = messages.next().await { - match message { - // Log has finished, add to buffer of pending logs. - BatchMessage::ExportLog(log) => { - logs.push(Cow::Owned(log)); - - if logs.len() == config.max_export_batch_size { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Err(err) = result { - global::handle_error(err); - } - } - } - // Log batch interval time reached or a force flush has been invoked, export current spans. 
- BatchMessage::Flush(res_channel) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Some(channel) = res_channel { - if let Err(result) = channel.send(result) { - global::handle_error(LogError::from(format!( - "failed to send flush result: {:?}", - result - ))); - } - } else if let Err(err) = result { - global::handle_error(err); - } - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - exporter.shutdown(); - - if let Err(result) = ch.send(result) { - global::handle_error(LogError::from(format!( - "failed to send batch processor shutdown result: {:?}", - result - ))); - } - - break; - } - - // propagate the resource - BatchMessage::SetResource(resource) => { - exporter.set_resource(&resource); - } - } +impl BatchLogProcessor { + pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { + let (sender, receiver) = mpsc::sync_channel(config.max_queue_size); + let handle = thread::spawn(move || { + let mut batch: Vec> = Vec::new(); + match receiver.try_recv() { + Ok(data) => batch.push(data), + // TODO: handle error + Err(_) => {} } - })); + let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); + let result = futures_executor::block_on(export); + match result { + Ok(_) => {} + Err(err) => global::handle_error(err), + } + }); // Return batch processor with link to worker - BatchLogProcessor { message_sender } + BatchLogProcessor { sender, handle } } /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorBuilder + pub fn builder(exporter: E) -> BatchLogProcessorBuilder where E: LogExporter, { BatchLogProcessorBuilder { exporter, config: Default::default(), - runtime, } } } -async fn export_with_timeout<'a, R, E>( - time_out: Duration, - exporter: &mut E, - runtime: &R, - batch: Vec>, -) -> ExportResult -where - R: RuntimeChannel, - E: LogExporter + ?Sized, -{ - if batch.is_empty() { - return Ok(()); - } - - let export = exporter.export(batch); - let timeout = runtime.delay(time_out); - pin_mut!(export); - pin_mut!(timeout); - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), - } -} - /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. #[derive(Debug)] @@ -463,16 +358,14 @@ impl BatchConfigBuilder { /// A builder for creating [`BatchLogProcessor`] instances. /// #[derive(Debug)] -pub struct BatchLogProcessorBuilder { +pub struct BatchLogProcessorBuilder { exporter: E, config: BatchConfig, - runtime: R, } -impl BatchLogProcessorBuilder +impl BatchLogProcessorBuilder where E: LogExporter + 'static, - R: RuntimeChannel, { /// Set the BatchConfig for [`BatchLogProcessorBuilder`] pub fn with_batch_config(self, config: BatchConfig) -> Self { @@ -480,25 +373,25 @@ where } /// Build a batch processor - pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime) + pub fn build(self) -> BatchLogProcessor { + BatchLogProcessor::new(Box::new(self.exporter), self.config) } } /// Messages sent between application thread and batch log processor's work thread. 
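
The `BatchMessage` enum below (removed here and reinstated later in the series without the futures `oneshot` channels) is the protocol between the public processor API and its worker thread: every call becomes one message, and calls that need a result, such as flush and shutdown, carry a reply channel for the acknowledgement. A minimal standalone version of that request/acknowledgement pattern, with illustrative names and std channels:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    // Each public call maps onto one control message; calls that need a result
    // carry a bounded reply channel for the acknowledgement.
    enum ControlMessage {
        Export(&'static str),
        Flush(mpsc::SyncSender<Result<(), String>>),
        Shutdown(mpsc::SyncSender<Result<(), String>>),
    }

    fn main() {
        let (tx, rx) = mpsc::sync_channel::<ControlMessage>(8);

        let worker = thread::spawn(move || {
            while let Ok(msg) = rx.recv() {
                match msg {
                    ControlMessage::Export(record) => println!("buffered {record}"),
                    ControlMessage::Flush(ack) => {
                        // ... export the current buffer here ...
                        let _ = ack.send(Ok(()));
                    }
                    ControlMessage::Shutdown(ack) => {
                        let _ = ack.send(Ok(()));
                        break;
                    }
                }
            }
        });

        tx.send(ControlMessage::Export("hello")).unwrap();

        // Caller side: send the request, then wait (with a timeout) for the ack.
        let (ack_tx, ack_rx) = mpsc::sync_channel(1);
        tx.send(ControlMessage::Flush(ack_tx)).unwrap();
        let flush_result = ack_rx.recv_timeout(Duration::from_secs(1));
        println!("flush acknowledged: {flush_result:?}");

        let (ack_tx, ack_rx) = mpsc::sync_channel(1);
        tx.send(ControlMessage::Shutdown(ack_tx)).unwrap();
        let _ = ack_rx.recv_timeout(Duration::from_secs(1));
        worker.join().unwrap();
    }
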
-#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export logs, usually called when the log is emitted. - ExportLog(LogData), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} +// #[allow(clippy::large_enum_variant)] +// #[derive(Debug)] +// enum BatchMessage { +// /// Export logs, usually called when the log is emitted. +// ExportLog(LogData), +// /// Flush the current buffer to the backend, it can be triggered by +// /// pre configured interval or a call to `force_push` function. +// Flush(Option>), +// /// Shut down the worker thread, push all logs in buffer to the backend. +// Shutdown(oneshot::Sender), +// /// Set the resource for the exporter. +// SetResource(Arc), +// } #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { @@ -516,7 +409,6 @@ mod tests { }, BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor, }, - runtime, testing::logs::InMemoryLogsExporter, Resource, }; @@ -663,7 +555,7 @@ mod tests { ]; temp_env::with_vars(env_vars.clone(), || { let builder = - BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio); + BatchLogProcessor::builder(InMemoryLogsExporter::default()); assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -684,7 +576,7 @@ mod tests { temp_env::with_vars(env_vars, || { let builder = - BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio); + BatchLogProcessor::builder(InMemoryLogsExporter::default()); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); @@ -699,7 +591,7 @@ mod tests { .with_max_queue_size(4) .build(); - let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio) + let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default()) .with_batch_config(expected); let actual = &builder.config; @@ -736,7 +628,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); let provider = LoggerProvider::builder() .with_log_processor(processor) @@ -763,7 +654,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); let mut log_data = LogData { record: Default::default(), From a45310fa366695b20616f7f32a9d9dd82022f6ff Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 1 Oct 2024 14:28:43 -0700 Subject: [PATCH 2/9] Commit last good log processor --- opentelemetry-sdk/src/logs/log_processor.rs | 89 +++++++++++++-------- 1 file changed, 55 insertions(+), 34 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 36b5ade953..477bfe8d6f 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -152,7 +152,7 @@ impl LogProcessor for SimpleLogProcessor { /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a pre-configured interval. 
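
In the hunk below, `emit` hands records to the worker with `send`, which blocks the calling thread when the bounded queue is full; the force-flush and shutdown paths added later in the series use `try_send`, which fails fast instead. A tiny standalone illustration of the difference on a `sync_channel`:

    use std::sync::mpsc::{self, TrySendError};

    fn main() {
        // Capacity-1 bounded channel: the second `try_send` fails fast instead of blocking.
        let (tx, _rx) = mpsc::sync_channel::<u32>(1);
        assert!(tx.try_send(1).is_ok());
        match tx.try_send(2) {
            Err(TrySendError::Full(v)) => println!("queue full, dropped {v}"),
            other => println!("unexpected: {other:?}"),
        }
        // `tx.send(3)` here would block the calling thread until the receiver
        // drains the queue, which is worth keeping in mind for `emit()` on a
        // hot logging path.
    }
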
pub struct BatchLogProcessor { - sender: SyncSender>, + sender: SyncSender, handle: thread::JoinHandle<()>, } @@ -166,14 +166,14 @@ impl Debug for BatchLogProcessor { impl LogProcessor for BatchLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { - // let result = self.sender.send(BatchMessage::ExportLog(( - // record.clone(), - // instrumentation.clone(), - // ))); + let result = self.sender.send(BatchMessage::ExportLog(( + record.clone(), + instrumentation.clone(), + ))); - // if let Err(err) = result { - // global::handle_error(LogError::Other(err.into())); - // } + if let Err(err) = result { + global::handle_error(LogError::Other(err.into())); + } } fn force_flush(&self) -> LogResult<()> { @@ -202,17 +202,51 @@ impl BatchLogProcessor { let (sender, receiver) = mpsc::sync_channel(config.max_queue_size); let handle = thread::spawn(move || { let mut batch: Vec> = Vec::new(); - match receiver.try_recv() { - Ok(data) => batch.push(data), - // TODO: handle error + match receiver.recv() { + Ok(BatchMessage::ExportLog((data, instrumentation))) => { + batch.push(Box::new(LogData { record: data, instrumentation })); + } + Ok(BatchMessage::Flush(sender)) => { + // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); + // let result = futures_executor::block_on(export); + // match sender { + // Some(sender) => { + // let _ = sender.send(result); + // } + // None => { + // match result { + // Ok(_) => {} + // Err(err) => global::handle_error(err), + // } + // } + // } + } + Ok(BatchMessage::Shutdown(sender)) => { + // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); + // let result = futures_executor::block_on(export); + // match sender { + // Some(sender) => { + // let _ = sender.send(result); + // } + // None => { + // match result { + // Ok(_) => {} + // Err(err) => global::handle_error(err), + // } + // } + // } + } + Ok(BatchMessage::SetResource(resource)) => { + // exporter.set_resource(&resource); + } Err(_) => {} } - // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); - // let result = futures_executor::block_on(export); - // match result { - // Ok(_) => {} - // Err(err) => global::handle_error(err), - // } + let export = exporter.export(batch.into_iter().collect()); + let result = futures_executor::block_on(export); + match result { + Ok(_) => {} + Err(err) => global::handle_error(err), + } }); // Return batch processor with link to worker @@ -395,19 +429,6 @@ where } /// Messages sent between application thread and batch log processor's work thread. -// #[allow(clippy::large_enum_variant)] -// #[derive(Debug)] -// enum BatchMessage { -// /// Export logs, usually called when the log is emitted. -// ExportLog(LogData), -// /// Flush the current buffer to the backend, it can be triggered by -// /// pre configured interval or a call to `force_push` function. -// Flush(Option>), -// /// Shut down the worker thread, push all logs in buffer to the backend. -// Shutdown(oneshot::Sender), -// /// Set the resource for the exporter. 
-// SetResource(Arc), -// } #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { @@ -731,7 +752,7 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, + // runtime::Tokio, ); // @@ -748,7 +769,7 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::TokioCurrentThread, + // runtime::TokioCurrentThread, ); processor.shutdown().unwrap(); @@ -762,7 +783,7 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, + // runtime::Tokio, ); processor.shutdown().unwrap(); @@ -776,7 +797,7 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::TokioCurrentThread, + // runtime::TokioCurrentThread, ); processor.shutdown().unwrap(); From 4453aa767a25e32c71e253479852de8fdacea785 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 1 Oct 2024 15:47:47 -0700 Subject: [PATCH 3/9] Add shutdown implementation --- opentelemetry-sdk/src/logs/log_processor.rs | 63 ++++++++++++++++----- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 477bfe8d6f..6ad124a96d 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,12 +1,15 @@ use crate::{ export::logs::{ExportResult, LogBatch, LogExporter}, - logs::LogData, logs::LogRecord, Resource, }; use std::sync::mpsc::{self, SyncSender}; use futures_channel::oneshot; -use std::borrow::Cow; +use futures_util::{ + // future::{self, Either}, + {pin_mut, /*stream, StreamExt as _*/}, +}; +// use std::borrow::Cow; // use futures_util::{ // future::{self, Either}, @@ -153,7 +156,7 @@ impl LogProcessor for SimpleLogProcessor { /// them at a pre-configured interval. 
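
The change below wraps the worker's `JoinHandle` in `Mutex<Option<..>>` so that `shutdown(&self)` can take the handle out and join it exactly once, even though `JoinHandle::join` consumes the handle. A standalone sketch of that ownership pattern (the `Worker` type is illustrative):

    use std::sync::Mutex;
    use std::thread;

    struct Worker {
        handle: Mutex<Option<thread::JoinHandle<()>>>,
    }

    impl Worker {
        fn new() -> Self {
            let handle = thread::spawn(|| println!("worker running"));
            Worker { handle: Mutex::new(Some(handle)) }
        }

        // `join` needs ownership, but `shutdown` only has `&self`, so the handle
        // lives in a `Mutex<Option<..>>` and is taken out exactly once.
        fn shutdown(&self) {
            if let Some(handle) = self.handle.lock().unwrap().take() {
                let _ = handle.join();
            }
            // A second call finds `None` and becomes a no-op.
        }
    }

    fn main() {
        let worker = Worker::new();
        worker.shutdown();
        worker.shutdown(); // safe: already joined
    }
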
pub struct BatchLogProcessor { sender: SyncSender, - handle: thread::JoinHandle<()>, + handle: Mutex>>, } impl Debug for BatchLogProcessor { @@ -177,23 +180,27 @@ impl LogProcessor for BatchLogProcessor { } fn force_flush(&self) -> LogResult<()> { - // TODO, implement force_flush + let _result = self.sender.send(BatchMessage::Flush(None)); LogResult::Ok(()) } fn shutdown(&self) -> LogResult<()> { + let (res_sender, _res_receiver) = oneshot::channel(); + let _result = self.sender.send(BatchMessage::Shutdown(res_sender)); // TODO, implement shutdown // self.handle.join().unwrap(); // let result = self.handle.join(); // if let Err(err) = result { // global::handle_error(err: LogError::Other(err.into())); // } - // // TODO, implement shutdown + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.join().unwrap(); + } LogResult::Ok(()) } fn set_resource(&self, _resource: &Resource) { - // TODO, implement set_resource + let _result = self.sender.send(BatchMessage::SetResource(Arc::new(_resource.clone()))); } } @@ -201,12 +208,12 @@ impl BatchLogProcessor { pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { let (sender, receiver) = mpsc::sync_channel(config.max_queue_size); let handle = thread::spawn(move || { - let mut batch: Vec> = Vec::new(); + let mut logs = Vec::new(); match receiver.recv() { - Ok(BatchMessage::ExportLog((data, instrumentation))) => { - batch.push(Box::new(LogData { record: data, instrumentation })); + Ok(BatchMessage::ExportLog(data)) => { + logs.push(data); } - Ok(BatchMessage::Flush(sender)) => { + Ok(BatchMessage::Flush(_sender)) => { // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); // let result = futures_executor::block_on(export); // match sender { @@ -221,7 +228,7 @@ impl BatchLogProcessor { // } // } } - Ok(BatchMessage::Shutdown(sender)) => { + Ok(BatchMessage::Shutdown(_sender)) => { // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); // let result = futures_executor::block_on(export); // match sender { @@ -236,12 +243,12 @@ impl BatchLogProcessor { // } // } } - Ok(BatchMessage::SetResource(resource)) => { + Ok(BatchMessage::SetResource(_resource)) => { // exporter.set_resource(&resource); } Err(_) => {} } - let export = exporter.export(batch.into_iter().collect()); + let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0)); let result = futures_executor::block_on(export); match result { Ok(_) => {} @@ -250,7 +257,7 @@ impl BatchLogProcessor { }); // Return batch processor with link to worker - BatchLogProcessor { sender, handle } + BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) } } /// Create a new batch processor builder @@ -265,6 +272,34 @@ impl BatchLogProcessor { } } +async fn export_with_timeout( + _time_out: Duration, + exporter: &mut E, + batch: Vec<(LogRecord, InstrumentationLibrary)>, +) -> ExportResult +where + E: LogExporter + ?Sized, +{ + if batch.is_empty() { + return Ok(()); + } + + // TBD - Can we avoid this conversion as it involves heap allocation with new vector? 
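+    // (Possible follow-ups, not part of this patch: reuse a scratch buffer owned by
+    // the worker thread, or have the batch type take ownership of the records.)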
+ let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch + .iter() + .map(|log_data| (&log_data.0, &log_data.1)) + .collect(); + let _export = exporter.export(LogBatch::new(log_vec.as_slice())); + // let timeout = runtime.delay(time_out); + pin_mut!(_export); + // pin_mut!(timeout); + // match future::select(export, export).await { + // Either::Left((export_res, _)) => export_res, + // Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), + // } + ExportResult::Ok(()) +} + /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. #[derive(Debug)] From 475ab15769c523eb5c70cd904eb46299461af223 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 1 Oct 2024 17:13:26 -0700 Subject: [PATCH 4/9] Loop on channel in background thread --- opentelemetry-sdk/src/logs/log_processor.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6ad124a96d..d1051bab5d 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -209,6 +209,9 @@ impl BatchLogProcessor { let (sender, receiver) = mpsc::sync_channel(config.max_queue_size); let handle = thread::spawn(move || { let mut logs = Vec::new(); + logs.reserve(config.max_export_batch_size); + loop { + logs.clear(); match receiver.recv() { Ok(BatchMessage::ExportLog(data)) => { logs.push(data); @@ -229,6 +232,8 @@ impl BatchLogProcessor { // } } Ok(BatchMessage::Shutdown(_sender)) => { + exporter.shutdown(); + break; // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); // let result = futures_executor::block_on(export); // match sender { @@ -248,13 +253,14 @@ impl BatchLogProcessor { } Err(_) => {} } + let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0)); let result = futures_executor::block_on(export); match result { Ok(_) => {} Err(err) => global::handle_error(err), } - }); + }}); // Return batch processor with link to worker BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) } From daa694d4529ec06b338ee2021be58dabc6520395 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Tue, 1 Oct 2024 17:21:15 -0700 Subject: [PATCH 5/9] Batch log data --- opentelemetry-sdk/src/logs/log_processor.rs | 74 ++++++++------------- 1 file changed, 26 insertions(+), 48 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index d1051bab5d..c9d5c6de82 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -211,56 +211,34 @@ impl BatchLogProcessor { let mut logs = Vec::new(); logs.reserve(config.max_export_batch_size); loop { - logs.clear(); - match receiver.recv() { - Ok(BatchMessage::ExportLog(data)) => { - logs.push(data); + match receiver.recv() { + Ok(BatchMessage::ExportLog(data)) => { + logs.push(data); + + if logs.len() == config.max_export_batch_size { + let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0)); + let result = futures_executor::block_on(export); + match result { + Ok(_) => {} + Err(err) => global::handle_error(err), + } + logs.clear(); + } + } + Ok(BatchMessage::Flush(_sender)) => { + // TODO: Implement flush + } + Ok(BatchMessage::Shutdown(_sender)) => { + exporter.shutdown(); + break; + } + Ok(BatchMessage::SetResource(resource)) => { + 
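                        // The exporter is owned by this worker thread, so resource
                        // updates are applied here rather than on the caller's thread.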
exporter.set_resource(&resource); + } + Err(_) => {} } - Ok(BatchMessage::Flush(_sender)) => { - // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); - // let result = futures_executor::block_on(export); - // match sender { - // Some(sender) => { - // let _ = sender.send(result); - // } - // None => { - // match result { - // Ok(_) => {} - // Err(err) => global::handle_error(err), - // } - // } - // } - } - Ok(BatchMessage::Shutdown(_sender)) => { - exporter.shutdown(); - break; - // let export = exporter.export(batch.into_iter().map(|data| Cow::Owned(*data)).collect()); - // let result = futures_executor::block_on(export); - // match sender { - // Some(sender) => { - // let _ = sender.send(result); - // } - // None => { - // match result { - // Ok(_) => {} - // Err(err) => global::handle_error(err), - // } - // } - // } - } - Ok(BatchMessage::SetResource(_resource)) => { - // exporter.set_resource(&resource); - } - Err(_) => {} } - - let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0)); - let result = futures_executor::block_on(export); - match result { - Ok(_) => {} - Err(err) => global::handle_error(err), - } - }}); + }); // Return batch processor with link to worker BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) } From 8e3bce3a82b79228b9e2538e7d9598168cadf717 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Thu, 10 Oct 2024 17:17:01 -0700 Subject: [PATCH 6/9] Implement forceflush and shutdown on batch processor with background thread --- opentelemetry-sdk/src/logs/log_emitter.rs | 4 +- opentelemetry-sdk/src/logs/log_processor.rs | 135 +++++++++++++++----- 2 files changed, 105 insertions(+), 34 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c548c3068a..3f571a9a74 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,5 +1,5 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; +use crate::{export::logs::LogExporter, Resource}; use opentelemetry::{ global, logs::{LogError, LogResult}, @@ -162,7 +162,7 @@ impl Builder { } /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. - pub fn with_batch_exporter( + pub fn with_batch_exporter( self, exporter: T, ) -> Self { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c9d5c6de82..6e2676e686 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,16 +1,14 @@ use crate::{ - export::logs::{ExportResult, LogBatch, LogExporter}, + export::{logs::{ExportResult, LogBatch, LogExporter}}, logs::LogRecord, Resource, }; -use std::sync::mpsc::{self, SyncSender}; -use futures_channel::oneshot; +use std::sync::mpsc::{self, SyncSender, RecvTimeoutError}; +// use futures_channel::oneshot; use futures_util::{ // future::{self, Either}, {pin_mut, /*stream, StreamExt as _*/}, }; -// use std::borrow::Cow; - // use futures_util::{ // future::{self, Either}, // {pin_mut, stream, StreamExt as _}, @@ -29,6 +27,7 @@ use std::{ str::FromStr, sync::Arc, time::Duration, + time::Instant, }; use std::thread; @@ -49,6 +48,14 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. 
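
The constants added just below introduce default force-flush and shutdown timeouts plus environment-variable overrides. Note that the variable names being read (`OTEL_METRIC_EXPORT_INTERVAL`, `OTEL_METRIC_EXPORT_TIMEOUT`) are the metrics ones, presumably placeholders for log-specific names at this stage. The parsing they use follows roughly this pattern (the helper name and variable below are made up for illustration):

    use std::env;
    use std::time::Duration;

    // Illustrative helper, not part of the patch: read a millisecond value from an
    // environment variable, falling back to a default when unset or unparsable.
    fn duration_from_env(name: &str, default: Duration) -> Duration {
        env::var(name)
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(default)
    }

    fn main() {
        let flush_timeout = duration_from_env("EXAMPLE_FLUSH_TIMEOUT_MILLIS", Duration::from_secs(1));
        println!("{flush_timeout:?}");
    }
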
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Default timeout for forceflush and shutdown. +const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1); +const OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); + +/// environment variable name for forceflush and shutdown timeout. +const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; +const OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT"; + /// The interface for plugging into a [`Logger`]. /// /// [`Logger`]: crate::logs::Logger @@ -157,6 +164,8 @@ impl LogProcessor for SimpleLogProcessor { pub struct BatchLogProcessor { sender: SyncSender, handle: Mutex>>, + forceflush_timeout: Duration, + shutdown_timeout: Duration, } impl Debug for BatchLogProcessor { @@ -180,19 +189,32 @@ impl LogProcessor for BatchLogProcessor { } fn force_flush(&self) -> LogResult<()> { - let _result = self.sender.send(BatchMessage::Flush(None)); - LogResult::Ok(()) + let (sender, receiver) = mpsc::sync_channel(1); + self.sender.try_send(BatchMessage::ForceFlush(sender)) + .map_err(|err| LogError::Other(err.into()))?; + + receiver.recv_timeout(self.forceflush_timeout).map_err(|err| { + if err == RecvTimeoutError::Timeout { + LogError::ExportTimedOut(self.forceflush_timeout) + } else { + LogError::Other(err.into()) + } + })? } fn shutdown(&self) -> LogResult<()> { - let (res_sender, _res_receiver) = oneshot::channel(); - let _result = self.sender.send(BatchMessage::Shutdown(res_sender)); - // TODO, implement shutdown - // self.handle.join().unwrap(); - // let result = self.handle.join(); - // if let Err(err) = result { - // global::handle_error(err: LogError::Other(err.into())); - // } + let (sender, receiver) = mpsc::sync_channel(1); + self.sender.try_send(BatchMessage::Shutdown(sender)) + .map_err(|err| LogError::Other(err.into()))?; + + receiver.recv_timeout(self.shutdown_timeout).map_err(|err| { + if err == RecvTimeoutError::Timeout { + LogError::ExportTimedOut(self.shutdown_timeout) + } else { + LogError::Other(err.into()) + } + })??; + if let Some(handle) = self.handle.lock().unwrap().take() { handle.join().unwrap(); } @@ -208,40 +230,66 @@ impl BatchLogProcessor { pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { let (sender, receiver) = mpsc::sync_channel(config.max_queue_size); let handle = thread::spawn(move || { + let mut last_export_time = Instant::now(); + let mut logs = Vec::new(); logs.reserve(config.max_export_batch_size); + loop { - match receiver.recv() { + let remaining_time_option = config.scheduled_delay.checked_sub(last_export_time.elapsed()); + let remaining_time = match remaining_time_option { + Some(remaining_time) => remaining_time, + None => config.scheduled_delay, + }; + + match receiver.recv_timeout(remaining_time) { Ok(BatchMessage::ExportLog(data)) => { logs.push(data); - if logs.len() == config.max_export_batch_size { - let export = export_with_timeout(config.max_export_timeout, exporter.as_mut(), logs.split_off(0)); - let result = futures_executor::block_on(export); - match result { - Ok(_) => {} - Err(err) => global::handle_error(err), - } - logs.clear(); + if logs.len() == config.max_export_batch_size || last_export_time.elapsed() >= config.scheduled_delay { + last_export_time = Instant::now(); + export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); } } - Ok(BatchMessage::Flush(_sender)) => { - // TODO: Implement flush + Ok(BatchMessage::ForceFlush(_sender)) => { + 
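                        // Flush request: export whatever is buffered right away, then
                        // acknowledge the caller over the bounded reply channel.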
export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + + match _sender.send(Ok(())) { + Ok(_) => {} + Err(err) => global::handle_error(LogError::Other(err.into())), + } } Ok(BatchMessage::Shutdown(_sender)) => { + export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); exporter.shutdown(); + match _sender.send(Ok(())) { + Ok(_) => {} + Err(err) => global::handle_error(LogError::Other(err.into())), + } break; } Ok(BatchMessage::SetResource(resource)) => { exporter.set_resource(&resource); } - Err(_) => {} + Err(RecvTimeoutError::Timeout) => { + export_with_timeout_sync(config.max_export_timeout, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + } + Err(err) => global::handle_error(LogError::Other(err.into())), } } }); + let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME) + .ok() + .and_then(|v| v.parse().map(Duration::from_millis).ok()) + .unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT); + let shutdown_timeout = env::var(OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME) + .ok() + .and_then(|v| v.parse().map(Duration::from_millis).ok()) + .unwrap_or(OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT); + // Return batch processor with link to worker - BatchLogProcessor { sender, handle: Mutex::new(Some(handle)) } + BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout } } /// Create a new batch processor builder @@ -256,6 +304,30 @@ impl BatchLogProcessor { } } +fn export_with_timeout_sync( + timeout: Duration, + exporter: &mut E, + batch: Vec<(LogRecord, InstrumentationLibrary)>, + last_export_time: &mut Instant, +) +where + E: LogExporter + ?Sized, +{ + *last_export_time = Instant::now(); + + if batch.is_empty() { + return (); + } + + let export = export_with_timeout(timeout, exporter, batch); + let result = futures_executor::block_on(export); + // batch.clear(); + match result { + Ok(_) => {} + Err(err) => global::handle_error(err), + }; +} + async fn export_with_timeout( _time_out: Duration, exporter: &mut E, @@ -453,11 +525,10 @@ where enum BatchMessage { /// Export logs, usually called when the log is emitted. ExportLog((LogRecord, InstrumentationLibrary)), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), + /// ForceFlush flush the current buffer to the backend + ForceFlush(mpsc::SyncSender), /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(oneshot::Sender), + Shutdown(mpsc::SyncSender), /// Set the resource for the exporter. 
SetResource(Arc), } From cdd2458d002ed871046f972eb3f4724580b3006e Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 14 Oct 2024 11:50:15 -0700 Subject: [PATCH 7/9] Remove runtime parameter on install_batch --- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 0c4fb773e9..36dbc0c74c 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -18,7 +18,7 @@ fn init_logs() -> Result { opentelemetry_semantic_conventions::resource::SERVICE_NAME, "logs-integration-test", )])) - .install_batch(runtime::Tokio) + .install_batch() } pub async fn logs() -> Result<(), Box> { From d5042e53b440fcb8ddda621dbc02d623f269f2a4 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 14 Oct 2024 12:50:45 -0700 Subject: [PATCH 8/9] Remove more runtime parameters --- opentelemetry-appender-tracing/src/layer.rs | 2 +- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index fbbf8e4d97..18ef395f76 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -296,7 +296,7 @@ mod tests { async fn batch_processor_no_deadlock() { let exporter: ReentrantLogExporter = ReentrantLogExporter; let logger_provider = LoggerProvider::builder() - .with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio) + .with_batch_exporter(exporter.clone()) .build(); let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 36dbc0c74c..243e585fa2 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -5,7 +5,7 @@ use log::{info, Level}; use opentelemetry::logs::LogError; use opentelemetry::KeyValue; use opentelemetry_appender_log::OpenTelemetryLogBridge; -use opentelemetry_sdk::{logs as sdklogs, runtime, Resource}; +use opentelemetry_sdk::{logs as sdklogs, Resource}; use std::error::Error; use std::fs::File; use std::os::unix::fs::MetadataExt; From 082972dad7c7b43ee61ab334e24ca36d6ae42021 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 16 Oct 2024 13:37:08 -0700 Subject: [PATCH 9/9] Check shutdown flag --- opentelemetry-sdk/src/logs/log_processor.rs | 27 ++++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c0a52054f3..5c547a4eb5 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -163,13 +163,13 @@ impl LogProcessor for SimpleLogProcessor { } } -/// A [`LogProcessor`] that asynchronously buffers log records and reports -/// them at a pre-configured interval. +/// A [`LogProcessor`] that buffers log records and reports them at a pre-configured interval. 
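
The final patch guards the processor with `is_shutdown: AtomicBool`, so `emit` becomes a no-op after shutdown and only the first `shutdown` call does the actual work (`swap` returns the previous value). A standalone sketch of that guard, with an illustrative `Processor` type and the error handling omitted:

    use std::sync::atomic::{AtomicBool, Ordering};

    struct Processor {
        is_shutdown: AtomicBool,
    }

    impl Processor {
        fn emit(&self) -> bool {
            // After shutdown, the record is dropped instead of being queued onto
            // a channel whose worker has already exited.
            if self.is_shutdown.load(Ordering::Relaxed) {
                return false;
            }
            true
        }

        fn shutdown(&self) -> bool {
            // `swap` returns the previous value: only the first caller observes
            // `false` and performs the shutdown work.
            if self.is_shutdown.swap(true, Ordering::Relaxed) {
                return false; // already shut down
            }
            // ... send the Shutdown message and join the worker here ...
            true
        }
    }

    fn main() {
        let p = Processor { is_shutdown: AtomicBool::new(false) };
        assert!(p.emit());
        assert!(p.shutdown());
        assert!(!p.shutdown()); // second call is a no-op
        assert!(!p.emit());     // emits after shutdown are dropped
    }
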
pub struct BatchLogProcessor { sender: SyncSender, handle: Mutex>>, forceflush_timeout: Duration, shutdown_timeout: Duration, + is_shutdown: AtomicBool, } impl Debug for BatchLogProcessor { @@ -182,6 +182,14 @@ impl Debug for BatchLogProcessor { impl LogProcessor for BatchLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { + // noop after shutdown + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "batch_log_processor_emit_after_shutdown" + ); + return; + } + let result = self.sender.send(BatchMessage::ExportLog(( record.clone(), instrumentation.clone(), @@ -192,11 +200,17 @@ impl LogProcessor for BatchLogProcessor { name: "batch_log_processor_emit_error", error = format!("{:?}", err) ); - global::handle_error(LogError::Other(err.into())); } } fn force_flush(&self) -> LogResult<()> { + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "batch_log_processor_force_flush_after_shutdown" + ); + return LogResult::Err(LogError::Other("batch log processor is already shutdown".into())); + } + let (sender, receiver) = mpsc::sync_channel(1); self.sender.try_send(BatchMessage::ForceFlush(sender)) .map_err(|err| LogError::Other(err.into()))?; @@ -211,6 +225,11 @@ impl LogProcessor for BatchLogProcessor { } fn shutdown(&self) -> LogResult<()> { + // test and set is_shutdown flag is it is not set. + if self.is_shutdown.swap(true, std::sync::atomic::Ordering::Relaxed) { + return Ok(()); + } + let (sender, receiver) = mpsc::sync_channel(1); self.sender.try_send(BatchMessage::Shutdown(sender)) .map_err(|err| LogError::Other(err.into()))?; @@ -297,7 +316,7 @@ impl BatchLogProcessor { .unwrap_or(OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT); // Return batch processor with link to worker - BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout } + BatchLogProcessor { sender, handle: Mutex::new(Some(handle)), forceflush_timeout, shutdown_timeout, is_shutdown: AtomicBool::new(false) } } /// Create a new batch processor builder
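
Taken together, the series lets call sites build the batch logging pipeline without naming an async runtime, as the example and test updates above show (`.install_batch()`, `.with_batch_exporter(exporter)`, `BatchLogProcessor::builder(exporter).build()`). A rough usage sketch based on those call sites; it assumes the SDK's `testing` feature for the in-memory exporter and the provider's `shutdown` method from this snapshot:

    use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
    use opentelemetry_sdk::testing::logs::InMemoryLogsExporter;

    fn main() {
        let exporter = InMemoryLogsExporter::default();
        // No `runtime::Tokio` argument: the processor owns its worker thread.
        let processor = BatchLogProcessor::builder(exporter).build();
        let provider = LoggerProvider::builder()
            .with_log_processor(processor)
            .build();
        // ... attach a log appender and emit records here ...
        let _ = provider.shutdown();
    }
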