Skip to content

Commit daa4529

Browse files
committed
test: BatchProcessor stress testing
1 parent 52cd0e9 commit daa4529

File tree

4 files changed

+137
-3
lines changed

4 files changed

+137
-3
lines changed

opentelemetry-sdk/src/logs/export.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ impl LogBatch<'_> {
7777
index: 0,
7878
}
7979
}
80+
81+
/// Returns the number of log records in the batch.
82+
pub fn len(&self) -> usize {
83+
match &self.data {
84+
LogBatchData::SliceOfOwnedData(data) => data.len(),
85+
LogBatchData::SliceOfBorrowedData(data) => data.len(),
86+
}
87+
}
8088
}
8189

8290
struct LogBatchDataIter<'a> {

opentelemetry-sdk/src/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ mod tests {
3737
use crate::Resource;
3838
use opentelemetry::baggage::BaggageExt;
3939
use opentelemetry::logs::LogRecord;
40-
use opentelemetry::logs::{Logger, LoggerProvider, Severity};
40+
use opentelemetry::logs::{Logger, LoggerProvider, Severity, };
4141
use opentelemetry::{logs::AnyValue, Key, KeyValue};
4242
use opentelemetry::{Context, InstrumentationScope};
4343
use std::borrow::Borrow;

stress/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ name = "logs"
2929
path = "src/logs.rs"
3030
doc = false
3131

32+
[[bin]] # Bin to run the logs stress tests with BatchLogProcessor. This is different from existing stress tests.
33+
name = "logs_batch"
34+
path = "src/logs_batch.rs"
35+
doc = false
36+
3237
[[bin]] # Bin to run the traces stress tests
3338
name = "traces"
3439
path = "src/traces.rs"
@@ -44,11 +49,11 @@ ctrlc = { workspace = true }
4449
lazy_static = { workspace = true }
4550
num_cpus = { workspace = true }
4651
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
47-
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] }
52+
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "testing"] }
4853
opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] }
4954
rand = { workspace = true, features = ["small_rng", "os_rng"] }
5055
tracing = { workspace = true, features = ["std"]}
51-
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
56+
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
5257
num-format = { workspace = true }
5358
sysinfo = { workspace = true, optional = true }
5459
futures-executor = { workspace = true }

stress/src/logs_batch.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
2+
use opentelemetry::Key;
3+
use opentelemetry_appender_tracing::layer;
4+
use opentelemetry_sdk::error::OTelSdkResult;
5+
use opentelemetry_sdk::logs::{InMemoryLogExporter, LogBatch, LogExporter, SdkLoggerProvider};
6+
use std::sync::{Arc, Mutex};
7+
use tracing_subscriber::{prelude::*, EnvFilter};
8+
9+
#[derive(Debug)]
10+
struct NoOpExporter {
11+
count: Arc<Mutex<usize>>,
12+
}
13+
14+
impl NoOpExporter {
15+
fn new() -> Self {
16+
NoOpExporter {
17+
count: Arc::new(Mutex::new(0)),
18+
}
19+
}
20+
21+
fn get_count(&self) -> Arc<Mutex<usize>> {
22+
Arc::clone(&self.count)
23+
}
24+
}
25+
26+
impl LogExporter for NoOpExporter {
27+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
28+
let mut count = self.count.lock().unwrap();
29+
*count += batch.len();
30+
// Simulate some processing time
31+
// Varying this will affect the number of logs dropped
32+
// and can be used to test the batch log processor
33+
// in a various scenarios.
34+
std::thread::sleep(std::time::Duration::from_millis(10));
35+
Ok(())
36+
}
37+
}
38+
39+
fn main() {
40+
let total_logs_to_emit = 400_000_000;
41+
let num_threads = 4;
42+
// Setup ability to collect internal logs
43+
let inmemory_exporter = InMemoryLogExporter::default();
44+
let logger_provider_for_internal_logs: SdkLoggerProvider = SdkLoggerProvider::builder()
45+
.with_simple_exporter(inmemory_exporter.clone())
46+
.build();
47+
let filter_otel = EnvFilter::new("warn");
48+
let otel_layer = layer::OpenTelemetryTracingBridge::new(&logger_provider_for_internal_logs)
49+
.with_filter(filter_otel);
50+
51+
tracing_subscriber::registry().with(otel_layer).init();
52+
53+
let exporter = NoOpExporter::new();
54+
let count = exporter.get_count();
55+
{
56+
let logger_provider: SdkLoggerProvider = SdkLoggerProvider::builder()
57+
.with_batch_exporter(exporter)
58+
.build();
59+
let mut handles = vec![];
60+
61+
let logs_per_thread = total_logs_to_emit / num_threads;
62+
63+
for _ in 0..num_threads {
64+
let logger = logger_provider.logger("test_logger");
65+
let handle = std::thread::spawn(move || {
66+
for _ in 0..logs_per_thread {
67+
let mut log_record = logger.create_log_record();
68+
log_record.set_body("test log".into());
69+
logger.emit(log_record);
70+
}
71+
});
72+
handles.push(handle);
73+
}
74+
75+
for handle in handles {
76+
handle.join().unwrap();
77+
}
78+
79+
logger_provider.shutdown().unwrap();
80+
}
81+
82+
let internal_logs = inmemory_exporter
83+
.get_emitted_logs()
84+
.expect("Logs are expected to be exported.");
85+
86+
let dropped_logs_record = internal_logs
87+
.iter()
88+
.find(|log| log.record.event_name() == Some("BatchLogProcessor.LogsDropped"))
89+
.expect("Logs are expected to be exported.");
90+
91+
let dropped_logs_count = dropped_logs_record
92+
.record
93+
.attributes_iter()
94+
.find(|attr| attr.0 == Key::new("dropped_logs_count"))
95+
.expect("Logs are expected to be exported.");
96+
let logs_dropped_by_sdk =
97+
if let opentelemetry::logs::AnyValue::String(value) = &dropped_logs_count.1 {
98+
// parse the string to an integer
99+
value.as_str().parse::<u64>().unwrap()
100+
} else {
101+
0
102+
};
103+
104+
let count_logs_received_by_exporter = *count.lock().unwrap() as u64;
105+
106+
println!("Total logs emitted: {}", total_logs_to_emit);
107+
println!(
108+
"Logs received by exporter: {}",
109+
count_logs_received_by_exporter
110+
);
111+
println!("Dropped logs count: {}", logs_dropped_by_sdk);
112+
113+
if logs_dropped_by_sdk + count_logs_received_by_exporter == total_logs_to_emit {
114+
println!("Success! All logs are accounted for!");
115+
} else {
116+
println!(
117+
"Fail! {} logs are unaccounted for!",
118+
total_logs_to_emit - (logs_dropped_by_sdk + count_logs_received_by_exporter)
119+
);
120+
}
121+
}

0 commit comments

Comments
 (0)