-
Notifications
You must be signed in to change notification settings - Fork 602
feat: Add experimental concurrent processor for logs #2780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
0369dbe
5871538
a5ef65d
3fb7d49
967ea29
fe5f455
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| /* | ||
| The benchmark results: | ||
| criterion = "0.5.1" | ||
| Hardware: Apple M4 Pro | ||
| Total Number of Cores: 14 (10 performance and 4 efficiency) | ||
| | Test | Average time| | ||
| |---------------------------------------------|-------------| | ||
| | exporter_disabled_concurrent_processor | 1.9 ns | | ||
| | exporter_disabled_simple_processor | 5.0 ns | | ||
| */ | ||
|
|
||
| use criterion::{criterion_group, criterion_main, Criterion}; | ||
| use opentelemetry::logs::{Logger, LoggerProvider}; | ||
| use opentelemetry_sdk::error::OTelSdkResult; | ||
| use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentProcessor; | ||
| use opentelemetry_sdk::logs::{ | ||
| LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, | ||
| }; | ||
| use opentelemetry_sdk::Resource; | ||
| #[cfg(not(target_os = "windows"))] | ||
| use pprof::criterion::{Output, PProfProfiler}; | ||
|
|
||
| #[derive(Debug)] | ||
| struct NoopExporter; | ||
| impl LogExporter for NoopExporter { | ||
| async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn shutdown(&self) -> OTelSdkResult { | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn event_enabled( | ||
| &self, | ||
| _level: opentelemetry::logs::Severity, | ||
| _target: &str, | ||
| _name: Option<&str>, | ||
| ) -> bool { | ||
| false | ||
| } | ||
|
|
||
| fn set_resource(&mut self, _: &Resource) {} | ||
| } | ||
|
|
||
| fn benchmark_exporter_enabled_false<T>(c: &mut Criterion, name: &str, processor: T) | ||
| where | ||
| T: LogProcessor + Send + Sync + 'static, | ||
| { | ||
| let provider = SdkLoggerProvider::builder() | ||
| .with_log_processor(processor) | ||
| .build(); | ||
| let logger = provider.logger("test_logger"); | ||
|
|
||
| c.bench_function(name, |b| { | ||
| b.iter(|| { | ||
| logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name")); | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| fn criterion_benchmark(c: &mut Criterion) { | ||
| let processor = SimpleConcurrentProcessor::new(NoopExporter); | ||
| benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); | ||
| let simple = SimpleLogProcessor::new(NoopExporter); | ||
| benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); | ||
| } | ||
|
|
||
| #[cfg(not(target_os = "windows"))] | ||
| criterion_group! { | ||
| name = benches; | ||
| config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); | ||
| targets = criterion_benchmark | ||
| } | ||
| #[cfg(target_os = "windows")] | ||
| criterion_group! { | ||
| name = benches; | ||
| config = Criterion::default(); | ||
| targets = criterion_benchmark | ||
| } | ||
| criterion_main!(benches); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| use opentelemetry::InstrumentationScope; | ||
|
|
||
| use crate::error::OTelSdkResult; | ||
|
|
||
| use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; | ||
|
|
||
| /// A concurrent log processor calls exporter's export method on each emit. This | ||
| /// processor does not buffer logs. Note: This invokes exporter's export method | ||
| /// on the current thread without synchronization. i.e multiple export() calls | ||
| /// can happen simultaneously from different threads. This is not a problem if | ||
| /// the exporter is designed to handle that. As of now, exporters in the | ||
| /// opentelemetry-rust project (stdout/otlp) are not thread-safe. | ||
| /// This is intended to be used when exporting to operating system | ||
| /// tracing facilities like Windows ETW, Linux TracePoints etc. | ||
| #[derive(Debug)] | ||
| pub struct SimpleConcurrentProcessor<T: LogExporter> { | ||
| exporter: T, | ||
| } | ||
|
|
||
| impl<T: LogExporter> SimpleConcurrentProcessor<T> { | ||
| /// Creates a new `ConcurrentExportProcessor` with the given exporter. | ||
| pub fn new(exporter: T) -> Self { | ||
| Self { exporter } | ||
| } | ||
| } | ||
|
|
||
| impl<T: LogExporter> LogProcessor for SimpleConcurrentProcessor<T> { | ||
| fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { | ||
| let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; | ||
| let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); | ||
cijothomas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| fn force_flush(&self) -> OTelSdkResult { | ||
| // TODO: invoke flush on exporter | ||
| // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261 | ||
| // is resolved | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn shutdown(&self) -> OTelSdkResult { | ||
| self.exporter.shutdown() | ||
| } | ||
|
|
||
| #[cfg(feature = "spec_unstable_logs_enabled")] | ||
| fn event_enabled( | ||
| &self, | ||
| level: opentelemetry::logs::Severity, | ||
| target: &str, | ||
| name: Option<&str>, | ||
| ) -> bool { | ||
| self.exporter.event_enabled(level, target, name) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,7 +65,8 @@ | |
| } | ||
|
|
||
| impl<T: LogExporter> SimpleLogProcessor<T> { | ||
| pub(crate) fn new(exporter: T) -> Self { | ||
| /// Creates a new instance of `SimpleLogProcessor`. | ||
| pub fn new(exporter: T) -> Self { | ||
| SimpleLogProcessor { | ||
| exporter: Mutex::new(exporter), | ||
| is_shutdown: AtomicBool::new(false), | ||
|
|
@@ -131,6 +132,20 @@ | |
| exporter.set_resource(resource); | ||
| } | ||
| } | ||
|
|
||
| #[cfg(feature = "spec_unstable_logs_enabled")] | ||
| fn event_enabled( | ||
| &self, | ||
| level: opentelemetry::logs::Severity, | ||
| target: &str, | ||
| name: Option<&str>, | ||
| ) -> bool { | ||
| if let Ok(exporter) = self.exporter.lock() { | ||
| exporter.event_enabled(level, target, name) | ||
| } else { | ||
| true | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering why
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Not sure what is the right behavior here. Good to revisit this. I'll probably revisit the need of Mutex lock in SimpleLogProcessor, and use atomics to achieve the goal is ensuring one export() is active at a time. With that, we won't face the lock poison issue. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(all(test, feature = "testing", feature = "logs"))] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.