diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs
index dc5bacc813..c2fda41b52 100644
--- a/opentelemetry-appender-log/examples/logs-basic.rs
+++ b/opentelemetry-appender-log/examples/logs-basic.rs
@@ -16,7 +16,7 @@ async fn main() {
     let exporter = LogExporter::default();
     //Create a LoggerProvider and register the exporter
     let logger_provider = LoggerProvider::builder()
-        .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
+        .with_log_processor(BatchLogProcessor::builder(exporter).build())
         .build();
 
     // Setup Log Appender for the log crate.
diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs
index 0287516c34..ffe127a1b8 100644
--- a/opentelemetry-appender-tracing/src/layer.rs
+++ b/opentelemetry-appender-tracing/src/layer.rs
@@ -296,7 +296,7 @@ mod tests {
     async fn batch_processor_no_deadlock() {
         let exporter: ReentrantLogExporter = ReentrantLogExporter;
         let logger_provider = LoggerProvider::builder()
-            .with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio)
+            .with_batch_exporter(exporter.clone())
             .build();
 
         let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs
index 6cecbbd3d5..9d171552f7 100644
--- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs
+++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs
@@ -50,7 +50,7 @@ fn init_logs() -> Result<LoggerProvider, LogError> {
 
     Ok(LoggerProvider::builder()
         .with_resource(RESOURCE.clone())
-        .with_batch_exporter(exporter, runtime::Tokio)
+        .with_batch_exporter(exporter)
         .build())
 }
 
diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs
index bb643cb095..9be62f643d 100644
--- a/opentelemetry-otlp/src/logs.rs
+++ b/opentelemetry-otlp/src/logs.rs
@@ -6,7 +6,9 @@ use async_trait::async_trait;
 use opentelemetry::otel_debug;
 use std::fmt::Debug;
 
+use opentelemetry::logs::LogError;
 use opentelemetry_sdk::logs::LogResult;
+use opentelemetry_sdk::Resource;
 
 use opentelemetry_sdk::export::logs::LogBatch;
 
@@ -134,4 +136,4 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
     fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
         self.client.set_resource(resource);
     }
-}
+}
\ No newline at end of file
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
index 86448e5c51..62cab7ad74 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
@@ -6,7 +6,7 @@ use opentelemetry::KeyValue;
 use opentelemetry_appender_log::OpenTelemetryLogBridge;
 use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::logs::{LogError, LoggerProvider};
-use opentelemetry_sdk::{logs as sdklogs, runtime, Resource};
+use opentelemetry_sdk::{logs as sdklogs, Resource};
 use std::error::Error;
 use std::fs::File;
 use std::os::unix::fs::MetadataExt;
diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs
index ae66c49bfe..45b83a9503 100644
--- a/opentelemetry-sdk/src/logs/log_emitter.rs
+++ b/opentelemetry-sdk/src/logs/log_emitter.rs
@@ -1,5 +1,5 @@
 use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
-use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
+use crate::{export::logs::LogExporter, Resource};
 use crate::{logs::LogError, logs::LogResult};
 use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope};
@@ -194,12 +194,11 @@ impl Builder {
     }
 
     /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
-    pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
+    pub fn with_batch_exporter<T: LogExporter + 'static>(
         self,
         exporter: T,
-        runtime: R,
     ) -> Self {
-        let batch = BatchLogProcessor::builder(exporter, runtime).build();
+        let batch = BatchLogProcessor::builder(exporter).build();
         self.with_log_processor(batch)
     }
 
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index 6d0bca8040..f03b10194b 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -1,13 +1,13 @@
 use crate::{
     export::logs::{ExportResult, LogBatch, LogExporter},
     logs::{LogError, LogRecord, LogResult},
-    runtime::{RuntimeChannel, TrySend},
     Resource,
 };
+use std::sync::mpsc::{self, SyncSender, RecvTimeoutError};
-use futures_channel::oneshot;
-use futures_util::{
-    future::{self, Either},
-    {pin_mut, stream, StreamExt as _},
-};
 #[cfg(feature = "spec_unstable_logs_enabled")]
 use opentelemetry::logs::Severity;
@@ -20,7 +20,9 @@ use std::{
     str::FromStr,
     sync::Arc,
     time::Duration,
+    time::Instant,
 };
+use std::sync::atomic::AtomicBool;
+use std::sync::Mutex;
+use std::thread;
 
 /// Delay interval between two consecutive exports.
 const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
@@ -39,6 +41,14 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
 /// Default maximum batch size.
 const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
 
+/// Default timeout for forceflush and shutdown.
+const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1);
+const OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
+
+/// Environment variable names for the forceflush and shutdown timeouts.
+const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_LOGS_FORCEFLUSH_TIMEOUT";
+const OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME: &str = "OTEL_LOGS_SHUTDOWN_TIMEOUT";
+
 /// The interface for plugging into a [`Logger`].
 ///
 /// [`Logger`]: crate::logs::Logger
@@ -150,33 +160,49 @@ impl LogProcessor for SimpleLogProcessor {
     }
 }
 
-/// A [`LogProcessor`] that asynchronously buffers log records and reports
-/// them at a pre-configured interval.
-pub struct BatchLogProcessor<R: RuntimeChannel> {
-    message_sender: R::Sender<BatchMessage>,
-
+/// A [`LogProcessor`] that buffers log records and reports them at a pre-configured interval.
+pub struct BatchLogProcessor {
+    sender: SyncSender<BatchMessage>,
+    handle: Mutex<Option<thread::JoinHandle<()>>>,
+    forceflush_timeout: Duration,
+    shutdown_timeout: Duration,
+    is_shutdown: AtomicBool,
     // Track dropped logs - we'll log this at shutdown
     dropped_logs_count: AtomicUsize,
-
     // Track the maximum queue size that was configured for this processor
     max_queue_size: usize,
 }
 
-impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
+impl Debug for BatchLogProcessor {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.debug_struct("BatchLogProcessor")
-            .field("message_sender", &self.message_sender)
+            .field("sender", &self.sender)
             .finish()
     }
 }
 
-impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
+impl LogProcessor for BatchLogProcessor {
     fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
-        let result = self.message_sender.try_send(BatchMessage::ExportLog((
+        // noop after shutdown
+        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
+            otel_warn!(
+                name: "batch_log_processor_emit_after_shutdown"
+            );
+            return;
+        }
+
+        let result = self.sender.send(BatchMessage::ExportLog((
             record.clone(),
             instrumentation.clone(),
         )));
 
+        if let Err(err) = &result {
+            otel_error!(
+                name: "batch_log_processor_emit_error",
+                error = format!("{:?}", err)
+            );
+        }
+
         // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
         if result.is_err() {
             // Increment dropped logs count. The first time we have to drop a log,
@@ -189,17 +215,32 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
     }
 
     fn force_flush(&self) -> LogResult<()> {
-        let (res_sender, res_receiver) = oneshot::channel();
-        self.message_sender
-            .try_send(BatchMessage::Flush(Some(res_sender)))
-            .map_err(|err| LogError::Other(err.into()))?;
+        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
+            otel_warn!(
+                name: "batch_log_processor_force_flush_after_shutdown"
+            );
+            return LogResult::Err(LogError::Other(
+                "batch log processor is already shutdown".into(),
+            ));
+        }
+
+        let (sender, receiver) = mpsc::sync_channel(1);
+        self.sender
+            .try_send(BatchMessage::ForceFlush(sender))
+            .map_err(|err| LogError::Other(err.into()))?;
 
-        futures_executor::block_on(res_receiver)
-            .map_err(|err| LogError::Other(err.into()))
-            .and_then(std::convert::identity)
+        receiver
+            .recv_timeout(self.forceflush_timeout)
+            .map_err(|err| {
+                if err == RecvTimeoutError::Timeout {
+                    LogError::ExportTimedOut(self.forceflush_timeout)
+                } else {
+                    LogError::Other(err.into())
+                }
+            })?
     }
 
     fn shutdown(&self) -> LogResult<()> {
+        // Test and set the is_shutdown flag if it is not already set.
+        if self
+            .is_shutdown
+            .swap(true, std::sync::atomic::Ordering::Relaxed)
+        {
+            return Ok(());
+        }
+
         let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
         let max_queue_size = self.max_queue_size;
         if dropped_logs > 0 {
@@ -216,136 +257,143 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
-            .try_send(BatchMessage::Shutdown(res_sender))
-            .map_err(|err| LogError::Other(err.into()))?;
-
-        futures_executor::block_on(res_receiver)
-            .map_err(|err| LogError::Other(err.into()))
-            .and_then(std::convert::identity)
+        let (sender, receiver) = mpsc::sync_channel(1);
+        self.sender
+            .try_send(BatchMessage::Shutdown(sender))
+            .map_err(|err| LogError::Other(err.into()))?;
+
+        receiver
+            .recv_timeout(self.shutdown_timeout)
+            .map_err(|err| {
+                if err == RecvTimeoutError::Timeout {
+                    LogError::ExportTimedOut(self.shutdown_timeout)
+                } else {
+                    LogError::Other(err.into())
+                }
+            })??;
+
+        if let Some(handle) = self.handle.lock().unwrap().take() {
+            handle.join().unwrap();
+        }
+        LogResult::Ok(())
     }
 
     fn set_resource(&self, resource: &Resource) {
-        let resource = Arc::new(resource.clone());
-        let _ = self
-            .message_sender
-            .try_send(BatchMessage::SetResource(resource));
+        let _result = self
+            .sender
+            .send(BatchMessage::SetResource(Arc::new(resource.clone())));
     }
 }
 
-impl<R: RuntimeChannel> BatchLogProcessor<R> {
-    pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
-        let (message_sender, message_receiver) =
-            runtime.batch_message_channel(config.max_queue_size);
-        let inner_runtime = runtime.clone();
-        let max_queue_size = config.max_queue_size;
-
-        // Spawn worker process via user-defined spawn function.
-        runtime.spawn(Box::pin(async move {
-            // Timer will take a reference to the current runtime, so its important we do this within the
-            // runtime.spawn()
-            let ticker = inner_runtime
-                .interval(config.scheduled_delay)
-                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
-                .map(|_| BatchMessage::Flush(None));
-            let timeout_runtime = inner_runtime.clone();
+impl BatchLogProcessor {
+    pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig) -> Self {
+        let (sender, receiver) = mpsc::sync_channel(config.max_queue_size);
+        let max_queue_size = config.max_queue_size;
+
+        let handle = thread::spawn(move || {
+            let mut last_export_time = Instant::now();
             let mut logs = Vec::new();
-            let mut messages = Box::pin(stream::select(message_receiver, ticker));
-
-            while let Some(message) = messages.next().await {
-                match message {
-                    // Log has finished, add to buffer of pending logs.
-                    BatchMessage::ExportLog(log) => {
-                        logs.push(log);
-                        if logs.len() == config.max_export_batch_size {
-                            let result = export_with_timeout(
-                                config.max_export_timeout,
-                                exporter.as_mut(),
-                                &timeout_runtime,
-                                logs.split_off(0),
-                            )
-                            .await;
-
-                            if let Err(err) = result {
-                                otel_error!(
-                                    name: "BatchLogProcessor.Export.Error",
-                                    error = format!("{}", err)
-                                );
-                            }
-                        }
-                    }
+            logs.reserve(config.max_export_batch_size);
+
+            loop {
+                let remaining_time_option = config
+                    .scheduled_delay
+                    .checked_sub(last_export_time.elapsed());
+                let remaining_time = match remaining_time_option {
+                    Some(remaining_time) => remaining_time,
+                    None => config.scheduled_delay,
+                };
+
+                match receiver.recv_timeout(remaining_time) {
+                    Ok(BatchMessage::ExportLog(data)) => {
+                        logs.push(data);
+
+                        if logs.len() == config.max_export_batch_size
+                            || last_export_time.elapsed() >= config.scheduled_delay
+                        {
+                            export_with_timeout_sync(
+                                remaining_time,
+                                exporter.as_mut(),
+                                logs.split_off(0),
+                                &mut last_export_time,
+                            );
+                        }
+                    }
-                    // Log batch interval time reached or a force flush has been invoked, export current spans.
-                    BatchMessage::Flush(res_channel) => {
-                        let result = export_with_timeout(
-                            config.max_export_timeout,
-                            exporter.as_mut(),
-                            &timeout_runtime,
-                            logs.split_off(0),
-                        )
-                        .await;
-
-                        if let Some(channel) = res_channel {
-                            if let Err(send_error) = channel.send(result) {
-                                otel_debug!(
-                                    name: "BatchLogProcessor.Flush.SendResultError",
-                                    error = format!("{:?}", send_error),
-                                );
-                            }
-                        }
-                    }
+                    Ok(BatchMessage::ForceFlush(sender)) => {
+                        export_with_timeout_sync(
+                            remaining_time,
+                            exporter.as_mut(),
+                            logs.split_off(0),
+                            &mut last_export_time,
+                        );
+
+                        if let Err(err) = sender.send(Ok(())) {
+                            otel_debug!(
+                                name: "BatchLogProcessor.ForceFlush.SendResultError",
+                                error = format!("{:?}", err)
+                            );
+                        }
+                    }
-                    // Stream has terminated or processor is shutdown, return to finish execution.
-                    BatchMessage::Shutdown(ch) => {
-                        let result = export_with_timeout(
-                            config.max_export_timeout,
-                            exporter.as_mut(),
-                            &timeout_runtime,
-                            logs.split_off(0),
-                        )
-                        .await;
-
+                    Ok(BatchMessage::Shutdown(sender)) => {
+                        export_with_timeout_sync(
+                            remaining_time,
+                            exporter.as_mut(),
+                            logs.split_off(0),
+                            &mut last_export_time,
+                        );
                         exporter.shutdown();
 
-                        if let Err(send_error) = ch.send(result) {
-                            otel_debug!(
-                                name: "BatchLogProcessor.Shutdown.SendResultError",
-                                error = format!("{:?}", send_error),
-                            );
+                        if let Err(err) = sender.send(Ok(())) {
+                            otel_debug!(
+                                name: "BatchLogProcessor.Shutdown.SendResultError",
+                                error = format!("{:?}", err)
+                            );
                         }
                         break;
                     }
-                    // propagate the resource
-                    BatchMessage::SetResource(resource) => {
+                    Ok(BatchMessage::SetResource(resource)) => {
                         exporter.set_resource(&resource);
                     }
+                    Err(RecvTimeoutError::Timeout) => {
+                        export_with_timeout_sync(
+                            config.max_export_timeout,
+                            exporter.as_mut(),
+                            logs.split_off(0),
+                            &mut last_export_time,
+                        );
+                    }
+                    Err(err) => {
+                        otel_error!(
+                            name: "BatchLogProcessor.ReceiveError",
+                            error = format!("{:?}", err)
+                        );
+                    }
                 }
             }
-        }));
+        });
+
+        let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME)
+            .ok()
+            .and_then(|v| v.parse().map(Duration::from_millis).ok())
+            .unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT);
+        let shutdown_timeout = env::var(OTEL_LOGS_SHUTDOWN_TIMEOUT_NAME)
+            .ok()
+            .and_then(|v| v.parse().map(Duration::from_millis).ok())
+            .unwrap_or(OTEL_LOGS_DEFAULT_SHUTDOWN_TIMEOUT);
 
         // Return batch processor with link to worker
-        BatchLogProcessor {
-            message_sender,
-            dropped_logs_count: AtomicUsize::new(0),
-            max_queue_size,
-        }
+        BatchLogProcessor {
+            sender,
+            handle: Mutex::new(Some(handle)),
+            forceflush_timeout,
+            shutdown_timeout,
+            is_shutdown: AtomicBool::new(false),
+            dropped_logs_count: AtomicUsize::new(0),
+            max_queue_size,
+        }
     }
 
     /// Create a new batch processor builder
-    pub fn builder<E, R>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
+    pub fn builder<E>(exporter: E) -> BatchLogProcessorBuilder<E>
     where
         E: LogExporter,
     {
         BatchLogProcessorBuilder {
             exporter,
             config: Default::default(),
-            runtime,
         }
     }
 }
 
-async fn export_with_timeout<R, E>(
-    time_out: Duration,
+fn export_with_timeout_sync<E>(
+    timeout: Duration,
+    exporter: &mut E,
+    batch: Vec<(LogRecord, InstrumentationScope)>,
+    last_export_time: &mut Instant,
+) where
+    E: LogExporter + ?Sized,
+{
+    *last_export_time = Instant::now();
+
+    if batch.is_empty() {
+        return;
+    }
+
+    let export = export_with_timeout(timeout, exporter, batch);
+    let result = futures_executor::block_on(export);
+    if let Err(err) = result {
+        otel_error!(
+            name: "BatchLogProcessor.Export.Error",
+            error = format!("{}", err)
+        );
+    }
+}
+
+async fn export_with_timeout<E>(
+    _time_out: Duration,
     exporter: &mut E,
-    runtime: &R,
     batch: Vec<(LogRecord, InstrumentationScope)>,
 ) -> ExportResult
 where
-    R: RuntimeChannel,
     E: LogExporter + ?Sized,
 {
     if batch.is_empty() {
@@ -357,14 +405,15 @@ where
         .iter()
         .map(|log_data| (&log_data.0, &log_data.1))
         .collect();
-    let export = exporter.export(LogBatch::new(log_vec.as_slice()));
-    let timeout = runtime.delay(time_out);
-    pin_mut!(export);
-    pin_mut!(timeout);
-    match future::select(export, timeout).await {
-        Either::Left((export_res, _)) => export_res,
-        Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
-    }
+    // TODO: enforce `_time_out` now that there is no runtime-provided timer to race against.
+    exporter.export(LogBatch::new(log_vec.as_slice())).await
 }
 
 /// Batch log processor configuration.
@@ -510,16 +559,14 @@ impl BatchConfigBuilder {
 
 /// A builder for creating [`BatchLogProcessor`] instances.
 ///
 #[derive(Debug)]
-pub struct BatchLogProcessorBuilder<E, R> {
+pub struct BatchLogProcessorBuilder<E> {
     exporter: E,
     config: BatchConfig,
-    runtime: R,
 }
 
-impl<E, R> BatchLogProcessorBuilder<E, R>
+impl<E> BatchLogProcessorBuilder<E>
 where
     E: LogExporter + 'static,
-    R: RuntimeChannel,
 {
     /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
     pub fn with_batch_config(self, config: BatchConfig) -> Self {
@@ -527,8 +574,8 @@ where
     }
 
     /// Build a batch processor
-    pub fn build(self) -> BatchLogProcessor<R> {
-        BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
+    pub fn build(self) -> BatchLogProcessor {
+        BatchLogProcessor::new(Box::new(self.exporter), self.config)
     }
 }
 
@@ -538,11 +585,13 @@ where
 enum BatchMessage {
     /// Export logs, usually called when the log is emitted.
     ExportLog((LogRecord, InstrumentationScope)),
-    /// Flush the current buffer to the backend, it can be triggered by
-    /// pre configured interval or a call to `force_push` function.
-    Flush(Option<oneshot::Sender<ExportResult>>),
+    /// Force flush the current buffer to the backend.
+    ForceFlush(mpsc::SyncSender<ExportResult>),
     /// Shut down the worker thread, push all logs in buffer to the backend.
-    Shutdown(oneshot::Sender<ExportResult>),
+    Shutdown(mpsc::SyncSender<ExportResult>),
     /// Set the resource for the exporter.
     SetResource(Arc<Resource>),
 }
@@ -565,7 +614,6 @@ mod tests {
         },
         BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
     },
-    runtime,
     testing::logs::InMemoryLogExporter,
     Resource,
 };
@@ -713,7 +761,7 @@ mod tests {
         ];
         temp_env::with_vars(env_vars.clone(), || {
             let builder =
-                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
+                BatchLogProcessor::builder(InMemoryLogExporter::default());
 
             assert_eq!(builder.config.max_export_batch_size, 500);
             assert_eq!(
@@ -734,7 +782,7 @@
 
         temp_env::with_vars(env_vars, || {
             let builder =
-                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
+                BatchLogProcessor::builder(InMemoryLogExporter::default());
             assert_eq!(builder.config.max_export_batch_size, 120);
             assert_eq!(builder.config.max_queue_size, 120);
         });
@@ -749,7 +797,7 @@
             .with_max_queue_size(4)
             .build();
 
-        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
+        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default())
             .with_batch_config(expected);
 
         let actual = &builder.config;
@@ -786,7 +834,6 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::Tokio,
         );
         let provider = LoggerProvider::builder()
             .with_log_processor(processor)
             .build();
@@ -813,7 +860,6 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::Tokio,
         );
 
         let mut record = LogRecord::default();
@@ -858,7 +904,7 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::Tokio,
         );
 
         //
@@ -873,7 +919,7 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::TokioCurrentThread,
         );
 
         processor.shutdown().unwrap();
@@ -885,7 +931,7 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::Tokio,
        );
 
         processor.shutdown().unwrap();
@@ -897,7 +943,7 @@
         let processor = BatchLogProcessor::new(
             Box::new(exporter.clone()),
             BatchConfig::default(),
-            runtime::TokioCurrentThread,
         );
 
         processor.shutdown().unwrap();
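
The call sites above all drop the runtime argument. As a rough usage sketch (not part of the patch itself), wiring the reworked batch processor into a `LoggerProvider` is assumed to look roughly like this; `InMemoryLogExporter` is only a stand-in for any `LogExporter` implementation and requires the SDK's `testing` feature:

    use opentelemetry_sdk::logs::LoggerProvider;
    use opentelemetry_sdk::testing::logs::InMemoryLogExporter;

    fn main() {
        // Placeholder exporter; any type implementing `LogExporter` is used the same way.
        let exporter = InMemoryLogExporter::default();

        // No runtime handle: the batch processor spawns its own worker thread internally.
        let provider = LoggerProvider::builder()
            .with_batch_exporter(exporter)
            .build();

        // ... emit logs through an appender bridge ...

        // Shutdown blocks on the worker's mpsc reply channel, bounded by the shutdown timeout.
        provider.shutdown().expect("logger provider shutdown failed");
    }

The sketch only relies on builder methods that appear in the diff; the forceflush and shutdown timeouts are picked up from the OTEL_LOGS_* environment variables when the processor is constructed.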