-
Notifications
You must be signed in to change notification settings - Fork 602
feat: Add experimental concurrent processor for logs #2780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
cijothomas
merged 6 commits into
open-telemetry:main
from
cijothomas:cijothomas/experimental-concurrentprocessor
Mar 11, 2025
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
0369dbe
feat: Add experimental concurrent processor for logs
cijothomas 5871538
Share stress test result for simple
cijothomas a5ef65d
use name SimpleConcurrentProcessor
cijothomas 3fb7d49
rename and handle error
cijothomas 967ea29
Merge branch 'main' into cijothomas/experimental-concurrentprocessor
cijothomas fe5f455
Merge branch 'main' into cijothomas/experimental-concurrentprocessor
cijothomas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| /* | ||
| The benchmark results: | ||
| criterion = "0.5.1" | ||
| Hardware: Apple M4 Pro | ||
| Total Number of Cores: 14 (10 performance and 4 efficiency) | ||
| | Test | Average time| | ||
| |---------------------------------------------|-------------| | ||
| | exporter_disabled_concurrent_processor | 1.9 ns | | ||
| | exporter_disabled_simple_processor | 5.0 ns | | ||
| */ | ||
|
|
||
| use criterion::{criterion_group, criterion_main, Criterion}; | ||
| use opentelemetry::logs::{Logger, LoggerProvider}; | ||
| use opentelemetry_sdk::error::OTelSdkResult; | ||
| use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor; | ||
| use opentelemetry_sdk::logs::{ | ||
| LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, | ||
| }; | ||
| use opentelemetry_sdk::Resource; | ||
| #[cfg(not(target_os = "windows"))] | ||
| use pprof::criterion::{Output, PProfProfiler}; | ||
|
|
||
| #[derive(Debug)] | ||
| struct NoopExporter; | ||
| impl LogExporter for NoopExporter { | ||
| async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn shutdown(&self) -> OTelSdkResult { | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn event_enabled( | ||
| &self, | ||
| _level: opentelemetry::logs::Severity, | ||
| _target: &str, | ||
| _name: Option<&str>, | ||
| ) -> bool { | ||
| false | ||
| } | ||
|
|
||
| fn set_resource(&mut self, _: &Resource) {} | ||
| } | ||
|
|
||
| fn benchmark_exporter_enabled_false<T>(c: &mut Criterion, name: &str, processor: T) | ||
| where | ||
| T: LogProcessor + Send + Sync + 'static, | ||
| { | ||
| let provider = SdkLoggerProvider::builder() | ||
| .with_log_processor(processor) | ||
| .build(); | ||
| let logger = provider.logger("test_logger"); | ||
|
|
||
| c.bench_function(name, |b| { | ||
| b.iter(|| { | ||
| logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name")); | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| fn criterion_benchmark(c: &mut Criterion) { | ||
| let processor = SimpleConcurrentLogProcessor::new(NoopExporter); | ||
| benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); | ||
| let simple = SimpleLogProcessor::new(NoopExporter); | ||
| benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); | ||
| } | ||
|
|
||
| #[cfg(not(target_os = "windows"))] | ||
| criterion_group! { | ||
| name = benches; | ||
| config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); | ||
| targets = criterion_benchmark | ||
| } | ||
| #[cfg(target_os = "windows")] | ||
| criterion_group! { | ||
| name = benches; | ||
| config = Criterion::default(); | ||
| targets = criterion_benchmark | ||
| } | ||
| criterion_main!(benches); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| use opentelemetry::{otel_info, InstrumentationScope}; | ||
|
|
||
| use crate::error::OTelSdkResult; | ||
|
|
||
| use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; | ||
|
|
||
| /// A concurrent log processor calls exporter's export method on each emit. This | ||
| /// processor does not buffer logs. Note: This invokes exporter's export method | ||
| /// on the current thread without synchronization. i.e multiple export() calls | ||
| /// can happen simultaneously from different threads. This is not a problem if | ||
| /// the exporter is designed to handle that. As of now, exporters in the | ||
| /// opentelemetry-rust project (stdout/otlp) are not thread-safe. | ||
| /// This is intended to be used when exporting to operating system | ||
| /// tracing facilities like Windows ETW, Linux TracePoints etc. | ||
| #[derive(Debug)] | ||
| pub struct SimpleConcurrentLogProcessor<T: LogExporter> { | ||
| exporter: T, | ||
| } | ||
|
|
||
| impl<T: LogExporter> SimpleConcurrentLogProcessor<T> { | ||
| /// Creates a new `ConcurrentExportProcessor` with the given exporter. | ||
| pub fn new(exporter: T) -> Self { | ||
| Self { exporter } | ||
| } | ||
| } | ||
|
|
||
| impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> { | ||
| fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { | ||
| let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; | ||
| let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); | ||
| if let Err(err) = result { | ||
| otel_info!( | ||
| name: "SimpleConcurrentLogProcessor.Emit.ExportError", | ||
| error = format!("{}",err) | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| fn force_flush(&self) -> OTelSdkResult { | ||
| // TODO: invoke flush on exporter | ||
| // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261 | ||
| // is resolved | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn shutdown(&self) -> OTelSdkResult { | ||
| self.exporter.shutdown() | ||
| } | ||
|
|
||
| #[cfg(feature = "spec_unstable_logs_enabled")] | ||
| fn event_enabled( | ||
| &self, | ||
| level: opentelemetry::logs::Severity, | ||
| target: &str, | ||
| name: Option<&str>, | ||
| ) -> bool { | ||
| self.exporter.event_enabled(level, target, name) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why
trueis returned to enable events if the exporter cannot be locked?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Not sure what is the right behavior here.
Since
trueis the default, I opted fortrue. It can be argued that if the lock got poisoned, no logs can be exported anyway, so returning false also is valid.Good to revisit this. I'll probably revisit the need of Mutex lock in SimpleLogProcessor, and use atomics to achieve the goal is ensuring one export() is active at a time. With that, we won't face the lock poison issue.