Skip to content

Commit 0369dbe

Browse files
committed
feat: Add experimental concurrent processor for logs
1 parent 68c9133 commit 0369dbe

File tree

7 files changed

+175
-32
lines changed

7 files changed

+175
-32
lines changed

opentelemetry-sdk/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ internal-logs = ["tracing"]
5656
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
5757
spec_unstable_metrics_views = ["metrics"]
5858
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
59+
experimental_logs_concurrent_log_processor = ["logs"]
5960
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]
6061
experimental_metrics_disable_name_validation = ["metrics"]
6162

@@ -88,6 +89,11 @@ harness = false
8889
name = "log_processor"
8990
harness = false
9091

92+
[[bench]]
93+
name = "log_enabled"
94+
harness = false
95+
required-features = ["spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"]
96+
9197
[[bench]]
9298
name = "tracer_creation"
9399
harness = false
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
The benchmark results:
3+
criterion = "0.5.1"
4+
Hardware: Apple M4 Pro
5+
Total Number of Cores:   14 (10 performance and 4 efficiency)
6+
| Test | Average time|
7+
|---------------------------------------------|-------------|
8+
| exporter_disabled_concurrent_processor | 1.9 ns |
9+
| exporter_disabled_simple_processor | 5.0 ns |
10+
*/
11+
12+
use criterion::{criterion_group, criterion_main, Criterion};
13+
use opentelemetry::logs::{Logger, LoggerProvider};
14+
use opentelemetry_sdk::error::OTelSdkResult;
15+
use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor;
16+
use opentelemetry_sdk::logs::{
17+
LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor,
18+
};
19+
use opentelemetry_sdk::Resource;
20+
#[cfg(not(target_os = "windows"))]
21+
use pprof::criterion::{Output, PProfProfiler};
22+
23+
#[derive(Debug)]
24+
struct NoopExporter;
25+
impl LogExporter for NoopExporter {
26+
async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult {
27+
Ok(())
28+
}
29+
30+
fn shutdown(&self) -> OTelSdkResult {
31+
Ok(())
32+
}
33+
34+
fn event_enabled(
35+
&self,
36+
_level: opentelemetry::logs::Severity,
37+
_target: &str,
38+
_name: Option<&str>,
39+
) -> bool {
40+
false
41+
}
42+
43+
fn set_resource(&mut self, _: &Resource) {}
44+
}
45+
46+
fn benchmark_exporter_enabled_false<T>(c: &mut Criterion, name: &str, processor: T)
47+
where
48+
T: LogProcessor + Send + Sync + 'static,
49+
{
50+
let provider = SdkLoggerProvider::builder()
51+
.with_log_processor(processor)
52+
.build();
53+
let logger = provider.logger("test_logger");
54+
55+
c.bench_function(name, |b| {
56+
b.iter(|| {
57+
logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name"));
58+
});
59+
});
60+
}
61+
62+
fn criterion_benchmark(c: &mut Criterion) {
63+
let processor = ConcurrentExportProcessor::new(NoopExporter);
64+
benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor);
65+
let simple = SimpleLogProcessor::new(NoopExporter);
66+
benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple);
67+
}
68+
69+
#[cfg(not(target_os = "windows"))]
70+
criterion_group! {
71+
name = benches;
72+
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
73+
targets = criterion_benchmark
74+
}
75+
#[cfg(target_os = "windows")]
76+
criterion_group! {
77+
name = benches;
78+
config = Criterion::default();
79+
targets = criterion_benchmark
80+
}
81+
criterion_main!(benches);
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use opentelemetry::InstrumentationScope;
2+
3+
use crate::error::OTelSdkResult;
4+
5+
use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord};
6+
7+
/// A concurrent log processor calls exporter's export method on each emit. This
8+
/// processor does not buffer logs. Note: This invokes exporter's export method
9+
/// on the current thread without synchronization. i.e multiple export() calls
10+
/// can happen simultaneously from different threads. This is not a problem if
11+
/// the exporter is designed to handle that. As of now, exporters in the
12+
/// opentelemetry-rust project (stdout/otlp) are not thread-safe.
13+
/// This is intended to be used when exporting to operating system
14+
/// tracing facilities like Windows ETW, Linux TracePoints etc.
15+
#[derive(Debug)]
16+
pub struct ConcurrentExportProcessor<T: LogExporter> {
17+
exporter: T,
18+
}
19+
20+
impl<T: LogExporter> ConcurrentExportProcessor<T> {
21+
/// Creates a new `ConcurrentExportProcessor` with the given exporter.
22+
pub fn new(exporter: T) -> Self {
23+
Self { exporter }
24+
}
25+
}
26+
27+
impl<T: LogExporter> LogProcessor for ConcurrentExportProcessor<T> {
28+
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
29+
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
30+
let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
31+
}
32+
33+
fn force_flush(&self) -> OTelSdkResult {
34+
// TODO: invoke flush on exporter
35+
// once https://github.com/open-telemetry/opentelemetry-rust/issues/2261
36+
// is resolved
37+
Ok(())
38+
}
39+
40+
fn shutdown(&self) -> OTelSdkResult {
41+
self.exporter.shutdown()
42+
}
43+
44+
#[cfg(feature = "spec_unstable_logs_enabled")]
45+
fn event_enabled(
46+
&self,
47+
level: opentelemetry::logs::Severity,
48+
target: &str,
49+
name: Option<&str>,
50+
) -> bool {
51+
self.exporter.event_enabled(level, target, name)
52+
}
53+
}

opentelemetry-sdk/src/logs/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ pub use logger_provider::{LoggerProviderBuilder, SdkLoggerProvider};
2727
pub use record::{SdkLogRecord, TraceContext};
2828
pub use simple_log_processor::SimpleLogProcessor;
2929

30+
#[cfg(feature = "experimental_logs_concurrent_log_processor")]
31+
/// Module for ConcurrentLogProcessor.
32+
pub mod concurrent_log_processor;
33+
3034
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
3135
/// Module for BatchLogProcessor with async runtime.
3236
pub mod log_processor_with_async_runtime;

opentelemetry-sdk/src/logs/simple_log_processor.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ pub struct SimpleLogProcessor<T: LogExporter> {
6565
}
6666

6767
impl<T: LogExporter> SimpleLogProcessor<T> {
68-
pub(crate) fn new(exporter: T) -> Self {
68+
/// Creates a new instance of `SimpleLogProcessor`.
69+
pub fn new(exporter: T) -> Self {
6970
SimpleLogProcessor {
7071
exporter: Mutex::new(exporter),
7172
is_shutdown: AtomicBool::new(false),
@@ -131,6 +132,20 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
131132
exporter.set_resource(resource);
132133
}
133134
}
135+
136+
#[cfg(feature = "spec_unstable_logs_enabled")]
137+
fn event_enabled(
138+
&self,
139+
level: opentelemetry::logs::Severity,
140+
target: &str,
141+
name: Option<&str>,
142+
) -> bool {
143+
if let Ok(exporter) = self.exporter.lock() {
144+
exporter.event_enabled(level, target, name)
145+
} else {
146+
true
147+
}
148+
}
134149
}
135150

136151
#[cfg(all(test, feature = "testing", feature = "logs"))]

stress/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ ctrlc = { workspace = true }
4444
lazy_static = { workspace = true }
4545
num_cpus = { workspace = true }
4646
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
47-
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
47+
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] }
4848
opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] }
4949
rand = { workspace = true, features = ["small_rng", "os_rng"] }
5050
tracing = { workspace = true, features = ["std"]}

stress/src/logs.rs

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,44 +13,29 @@
1313
~50 M/sec
1414
~1.1 B/sec (when disabled)
1515
*/
16-
17-
use opentelemetry::InstrumentationScope;
1816
use opentelemetry_appender_tracing::layer;
1917
use opentelemetry_sdk::error::OTelSdkResult;
18+
use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor;
19+
use opentelemetry_sdk::logs::SdkLoggerProvider;
2020
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
21-
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider};
2221

22+
use opentelemetry_sdk::Resource;
2323
use tracing::error;
2424
use tracing_subscriber::prelude::*;
2525

2626
mod throughput;
2727

28-
#[derive(Debug, Clone)]
29-
struct MockLogExporter;
30-
31-
impl LogExporter for MockLogExporter {
32-
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
33-
Ok(())
34-
}
35-
}
36-
3728
#[derive(Debug)]
38-
pub struct MockLogProcessor {
39-
exporter: MockLogExporter,
29+
struct NoopExporter {
4030
enabled: bool,
4131
}
42-
43-
impl LogProcessor for MockLogProcessor {
44-
fn emit(
45-
&self,
46-
record: &mut opentelemetry_sdk::logs::SdkLogRecord,
47-
scope: &InstrumentationScope,
48-
) {
49-
let log_tuple = &[(record as &SdkLogRecord, scope)];
50-
let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
32+
impl NoopExporter {
33+
fn new(enabled: bool) -> Self {
34+
Self { enabled }
5135
}
52-
53-
fn force_flush(&self) -> OTelSdkResult {
36+
}
37+
impl LogExporter for NoopExporter {
38+
async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult {
5439
Ok(())
5540
}
5641

@@ -66,18 +51,17 @@ impl LogProcessor for MockLogProcessor {
6651
) -> bool {
6752
self.enabled
6853
}
54+
55+
fn set_resource(&mut self, _: &Resource) {}
6956
}
7057

7158
fn main() {
7259
// change this to false to test the throughput when enabled is false.
73-
let enabled = true;
60+
let enabled = false;
7461

7562
// LoggerProvider with a no-op processor.
7663
let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
77-
.with_log_processor(MockLogProcessor {
78-
exporter: MockLogExporter {},
79-
enabled,
80-
})
64+
.with_log_processor(ConcurrentExportProcessor::new(NoopExporter::new(enabled)))
8165
.build();
8266

8367
// Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.

0 commit comments

Comments
 (0)