Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 87 additions & 6 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,36 @@
//! # OpenTelemetry Log Processor Interface
//!
//! The `LogProcessor` interface provides hooks for log record processing and
//! exporting. Log processors receive `LogRecord`s emitted by the SDK's
//! `Logger` and determine how these records are handled.
//!
//! Built-in log processors are responsible for converting logs to exportable
//! representations and passing them to configured exporters. They can be
//! registered directly with a `LoggerProvider`.
//!
//! ## Types of Log Processors
//!
//! - **SimpleLogProcessor**: Forwards log records to the exporter immediately
//! after they are emitted. This processor is **synchronous** and is designed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure of the wording "synchronous" as it may not be understood.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'll want to mention something closely related to this -
Exporter's Export() method is invoked in the same thread as the log is emitted. If exporter is OTLP/tonic, then this can work only if the provider was constructed in a tokio context?

//! for debugging or testing purposes. It is **not suitable for production**
//! environments due to its lack of batching, performance optimizations, or support
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last part(support for high throughput) is redundant and maybe omitted.

//! for high-throughput scenarios.
//!
//! - **BatchLogProcessor**: Buffers log records and sends them to the exporter
//! in batches. This processor is designed for **production use** in high-throughput
//! applications and reduces the overhead of frequent exports by using a background
//! thread for batch processing.
//!
//! ## Diagram
//!
//! ```ascii
//! +-----+---------------+ +-----------------------+ +-------------------+
//! | | | | | | |
//! | SDK | Logger.emit() +---> (Simple)LogProcessor +---> LogExporter |
//! | | | | (Batch)LogProcessor +---> (OTLPExporter) |
//! +-----+---------------+ +-----------------------+ +-------------------+
//! ```

use crate::{
export::logs::{ExportResult, LogBatch, LogExporter},
logs::{LogError, LogRecord, LogResult},
Expand Down Expand Up @@ -70,10 +103,24 @@ pub trait LogProcessor: Send + Sync + Debug {
fn set_resource(&self, _resource: &Resource) {}
}

/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
/// as they are emitted, without any batching. This is typically useful for
/// debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchLogProcessor].
/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
/// exports log records as they are emitted.
/// ## Example
///
/// ### Using a SimpleLogProcessor
///
/// ```rust
/// use opentelemetry_sdk::logs::{SimpleLogProcessor, LoggerProvider};
/// use opentelemetry::global;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

global is not required?

/// use opentelemetry_sdk::export::logs::LogExporter;
/// use opentelemetry_sdk::testing::logs::InMemoryLogExporter;
///
/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
/// let provider = LoggerProvider::builder()
/// .with_simple_exporter(exporter)
/// .build();
///
/// ```
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
exporter: Mutex<T>,
Expand Down Expand Up @@ -162,8 +209,42 @@ enum BatchMessage {
SetResource(Arc<Resource>),
}

/// A [`LogProcessor`] that buffers log records and reports
/// them at a pre-configured interval from a dedicated background thread.
/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
/// in batches to the configured `LogExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting logs
/// individually. It uses a **dedicated background thread** to manage and export logs
/// asynchronously, ensuring that the application's main execution flow is not blocked.
///
/// - This processor supports the following configurations:
/// - **Queue size**: Maximum number of log records that can be buffered.
/// - **Batch size**: Maximum number of log records to include in a single export.
/// - **Export timeout**: Maximum duration allowed for an export operation.
/// - **Scheduled delay**: Frequency at which the batch is exported.
///
/// ### Using a BatchLogProcessor:
///
/// ```rust
/// use opentelemetry_sdk::logs::{BatchLogProcessor, BatchConfigBuilder, LoggerProvider};
/// use opentelemetry::global;
/// use std::time::Duration;
/// use opentelemetry_sdk::testing::logs::InMemoryLogExporter;
///
/// let exporter = InMemoryLogExporter::default(); // Replace with an actual exporter
/// let processor = BatchLogProcessor::builder(exporter)
/// .with_batch_config(
/// BatchConfigBuilder::default()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if we really need BatchConfigBuilder ? Can we make them part of the BatchLogProcessorBuilder itself?
Not for this PR!

/// .with_max_queue_size(2048)
/// .with_max_export_batch_size(512)
/// .with_scheduled_delay(Duration::from_secs(5))
/// .with_max_export_timeout(Duration::from_secs(30))
/// .build(),
/// )
/// .build();
///
/// let provider = LoggerProvider::builder()
/// .with_log_processor(processor)
/// .build();
///
pub struct BatchLogProcessor {
message_sender: SyncSender<BatchMessage>,
handle: Mutex<Option<thread::JoinHandle<()>>>,
Expand Down
Loading