
Commit 06e69dc
committed: review comments, move test to sdk itself
1 parent 3b8929f · commit 06e69dc

File tree: 5 files changed, +122 −62 lines changed


opentelemetry-appender-tracing/src/layer.rs
Lines changed: 0 additions & 50 deletions

@@ -327,56 +327,6 @@ mod tests {
         }
     }
 
-    #[test]
-    #[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests and hence ignored in CI."]
-    fn simple_processor_deadlock() {
-        // TODO: This test maybe better suited in the opentelemetry-sdk crate tests
-        let exporter: ReentrantLogExporter = ReentrantLogExporter;
-        let logger_provider = SdkLoggerProvider::builder()
-            .with_simple_exporter(exporter)
-            .build();
-
-        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
-        tracing_subscriber::registry().with(layer).init();
-
-        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
-    }
-
-    #[test]
-    #[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
-    fn simple_processor_no_deadlock() {
-        let exporter: ReentrantLogExporter = ReentrantLogExporter;
-        let logger_provider = SdkLoggerProvider::builder()
-            .with_simple_exporter(exporter.clone())
-            .build();
-
-        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
-
-        // This filter will prevent the deadlock as the reentrant log will be
-        // ignored.
-        let filter = EnvFilter::new("debug").add_directive("reentrant=error".parse().unwrap());
-        // Setting subscriber as global as that is the only way to test this scenario.
-        tracing_subscriber::registry()
-            .with(filter)
-            .with(layer)
-            .init();
-        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
-    }
-
-    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
-    #[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
-    async fn batch_processor_no_deadlock() {
-        let exporter: ReentrantLogExporter = ReentrantLogExporter;
-        let logger_provider = SdkLoggerProvider::builder()
-            .with_batch_exporter(exporter.clone())
-            .build();
-
-        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
-
-        tracing_subscriber::registry().with(layer).init();
-        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
-    }
-
     #[test]
     fn tracing_appender_standalone() {
         // Arrange

opentelemetry-sdk/CHANGELOG.md
Lines changed: 10 additions & 9 deletions

@@ -2,15 +2,16 @@
 
 ## vNext
 
-`SdkLogger` modified to respect telemetry suppression based on `Context`. In
-other words, if the current context has telemetry suppression enabled, then logs
-will be ignored. The flag is typically set by OTel components to prevent
-telemetry from itself being fed back into OTel. `BatchLogProcessor`,
-`BatchSpanProcessor`, and `PeriodicReader` modified to set the suppression flag
-in their dedicated thread, so that telemetry generated from those threads will
-not be fed back into OTel. Similarly, `SimpleSpanProcessor` and
-`SimpleLogProcessor` also modified to suppress telemetry before invoking
-exporters.
+`SdkLogger` modified to respect telemetry suppression based on `Context`. A
+similar change is made for the traces signal, by returning
+`SamplingDecision::Drop` when telemetry is suppressed. In other words, if the
+current context has telemetry suppression enabled, then logs/spans will be
+ignored. The flag is typically set by OTel components to prevent telemetry from
+itself being fed back into OTel. `BatchLogProcessor`, `BatchSpanProcessor`, and
+`PeriodicReader` modified to set the suppression flag in their dedicated
+threads, so that telemetry generated from those threads will not be fed back
+into OTel. Similarly, `SimpleLogProcessor` also modified to suppress telemetry
+before invoking exporters.
 
 ## 0.29.0
 
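As context for this changelog entry, here is a minimal sketch of the suppression pattern it describes, using the `Context::enter_telemetry_suppressed_scope()` API exercised elsewhere in this commit; the worker function below is hypothetical, not SDK code:

```rust
use opentelemetry::Context;

// Hypothetical dedicated-thread body, mirroring what BatchLogProcessor,
// BatchSpanProcessor, and PeriodicReader do per this change: enter the
// suppressed scope once, so any logs/spans emitted during export are
// dropped by the SDK instead of being fed back into the pipeline.
fn export_thread_body(run_export: impl Fn()) {
    let _guard = Context::enter_telemetry_suppressed_scope();
    run_export(); // telemetry emitted here is suppressed
}
```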

opentelemetry-sdk/src/logs/mod.rs
Lines changed: 57 additions & 0 deletions

@@ -36,6 +36,7 @@ pub mod log_processor_with_async_runtime;
 #[cfg(all(test, feature = "testing"))]
 mod tests {
     use super::*;
+    use crate::error::OTelSdkResult;
     use crate::Resource;
     use opentelemetry::baggage::BaggageExt;
     use opentelemetry::logs::LogRecord;
@@ -44,6 +45,7 @@ mod tests {
     use opentelemetry::{Context, InstrumentationScope};
     use std::borrow::Borrow;
     use std::collections::HashMap;
+    use std::sync::{Arc, Mutex};
 
     #[test]
     fn logging_sdk_test() {
@@ -239,4 +241,59 @@ mod tests {
             "There should be no logs as log emission is done inside a suppressed context"
         );
     }
+
+    #[derive(Debug, Clone)]
+    struct ReentrantLogProcessor {
+        logger: Arc<Mutex<Option<SdkLogger>>>,
+    }
+
+    impl ReentrantLogProcessor {
+        fn new() -> Self {
+            Self {
+                logger: Arc::new(Mutex::new(None)),
+            }
+        }
+
+        fn set_logger(&self, logger: SdkLogger) {
+            let mut guard = self.logger.lock().unwrap();
+            *guard = Some(logger);
+        }
+    }
+
+    impl LogProcessor for ReentrantLogProcessor {
+        fn emit(&self, _data: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
+            let _suppress = Context::enter_telemetry_suppressed_scope();
+            // Without the suppression above, the logger.emit(log_record) below would
+            // deadlock: it emits another log, which would attempt to acquire the
+            // same lock that is already held by this method!
+            let logger = self.logger.lock().unwrap();
+            if let Some(logger) = logger.as_ref() {
+                let mut log_record = logger.create_log_record();
+                log_record.set_severity_number(Severity::Error);
+                logger.emit(log_record);
+            }
+        }
+
+        fn force_flush(&self) -> OTelSdkResult {
+            Ok(())
+        }
+
+        fn shutdown(&self) -> OTelSdkResult {
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn log_does_not_deadlock_when_suppression_enabled() {
+        let processor: ReentrantLogProcessor = ReentrantLogProcessor::new();
+        let logger_provider = SdkLoggerProvider::builder()
+            .with_log_processor(processor.clone())
+            .build();
+        processor.set_logger(logger_provider.logger("processor-logger"));
+
+        let logger = logger_provider.logger("test-logger");
+        let mut log_record = logger.create_log_record();
+        log_record.set_severity_number(Severity::Error);
+        logger.emit(log_record);
+    }
 }
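To make the deadlock this test guards against concrete, here is a self-contained model of the cycle. `Context::enter_telemetry_suppressed_scope()` is the API used above, while `Context::is_current_telemetry_suppressed()` is the assumed name of the matching check that `SdkLogger` performs; treat the surrounding code as illustrative:

```rust
use opentelemetry::Context;
use std::sync::Mutex;

// A processor that re-enters the logging pipeline while holding its own lock.
struct Processor {
    state: Mutex<u32>,
}

impl Processor {
    fn on_emit(&self) {
        // Without this guard, the nested emit_record() below would try to
        // re-acquire `state` while we still hold it: a classic deadlock.
        let _guard = Context::enter_telemetry_suppressed_scope();
        let _state = self.state.lock().unwrap();
        emit_record(self);
    }
}

// Stand-in for SdkLogger::emit: drop the record if suppression is active.
fn emit_record(p: &Processor) {
    if Context::is_current_telemetry_suppressed() {
        return; // reentrant record dropped; the Mutex is never re-entered
    }
    p.on_emit();
}

fn main() {
    emit_record(&Processor { state: Mutex::new(0) }); // completes without deadlock
}
```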

opentelemetry-sdk/src/logs/simple_log_processor.rs
Lines changed: 54 additions & 1 deletion

@@ -153,12 +153,13 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
 #[cfg(all(test, feature = "testing", feature = "logs"))]
 mod tests {
     use crate::logs::log_processor::tests::MockLogExporter;
-    use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
+    use crate::logs::{LogBatch, LogExporter, SdkLogRecord, SdkLogger};
     use crate::{
         error::OTelSdkResult,
         logs::{InMemoryLogExporterBuilder, LogProcessor, SdkLoggerProvider, SimpleLogProcessor},
         Resource,
     };
+    use opentelemetry::logs::{LogRecord, Logger, LoggerProvider};
     use opentelemetry::InstrumentationScope;
     use opentelemetry::KeyValue;
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -439,4 +440,56 @@ mod tests {
 
         assert_eq!(exporter.len(), 1);
     }
+
+    #[derive(Debug, Clone)]
+    struct ReentrantLogExporter {
+        logger: Arc<Mutex<Option<SdkLogger>>>,
+    }
+
+    impl ReentrantLogExporter {
+        fn new() -> Self {
+            Self {
+                logger: Arc::new(Mutex::new(None)),
+            }
+        }
+
+        fn set_logger(&self, logger: SdkLogger) {
+            let mut guard = self.logger.lock().unwrap();
+            *guard = Some(logger);
+        }
+    }
+
+    impl LogExporter for ReentrantLogExporter {
+        fn shutdown(&self) -> OTelSdkResult {
+            Ok(())
+        }
+
+        async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
+            let logger = self.logger.lock().unwrap();
+            if let Some(logger) = logger.as_ref() {
+                let mut log_record = logger.create_log_record();
+                log_record.set_severity_number(opentelemetry::logs::Severity::Error);
+                logger.emit(log_record);
+            }
+
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn does_not_deadlock_when_exporter_is_reentrant_simple_log_processor() {
+        // This tests that even when the exporter produces logs while exporting,
+        // it does not deadlock, as SimpleLogProcessor activates SuppressGuard
+        // before calling the exporter.
+        let exporter: ReentrantLogExporter = ReentrantLogExporter::new();
+        let logger_provider = SdkLoggerProvider::builder()
+            .with_simple_exporter(exporter.clone())
+            .build();
+        exporter.set_logger(logger_provider.logger("processor-logger"));
+
+        let logger = logger_provider.logger("test-logger");
+        let mut log_record = logger.create_log_record();
+        log_record.set_severity_number(opentelemetry::logs::Severity::Error);
+        logger.emit(log_record);
+    }
 }
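The test above relies on the behavior this file now implements; a condensed sketch of that behavior follows (illustrative, not the processor's exact code, which also serializes exporter access behind a lock):

```rust
use opentelemetry::Context;

// Simplified shape of SimpleLogProcessor's emit path per this change:
// suppress telemetry for the duration of the export call, so any log the
// exporter itself emits is ignored by SdkLogger rather than re-entering
// the processor and deadlocking on its exporter lock.
fn emit_via_exporter(export: impl FnOnce()) {
    let _suppress = Context::enter_telemetry_suppressed_scope();
    export();
}
```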

opentelemetry-sdk/src/trace/mod.rs
Lines changed: 1 addition & 2 deletions

@@ -547,8 +547,7 @@ mod tests {
         let tracer = tracer_provider.tracer("test");
         {
            let _suppressed_context = Context::enter_telemetry_suppressed_scope();
-            let span = tracer.span_builder("span_name").start(&tracer);
-            drop(span);
+            let _span = tracer.span_builder("span_name").start(&tracer);
         }
 
         // Assert
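This simplification works because, with suppression active, the started span should be non-recording, so binding it to `_span` and letting it fall out of scope is equivalent to the explicit `drop`. A sketch of that property (`is_recording` is part of the `opentelemetry` `Span` trait; the helper function is illustrative):

```rust
use opentelemetry::trace::{Span, Tracer};
use opentelemetry::Context;

// Illustrative check: under a suppressed scope, the sampler drops the span,
// so the returned span is non-recording and vanishes harmlessly on drop.
fn assert_span_suppressed<T: Tracer>(tracer: &T) {
    let _suppressed = Context::enter_telemetry_suppressed_scope();
    let span = tracer.span_builder("span_name").start(tracer);
    assert!(!span.is_recording());
}
```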
