1 change: 1 addition & 0 deletions opentelemetry-appender-tracing/Cargo.toml
@@ -26,6 +26,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std", "env-fil
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
tokio = { workspace = true, features = ["full"]}

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
73 changes: 70 additions & 3 deletions opentelemetry-appender-tracing/src/layer.rs
@@ -208,17 +208,20 @@ const fn severity_of_level(level: &Level) -> Severity {
#[cfg(test)]
mod tests {
    use crate::layer;
-   use opentelemetry::logs::Severity;
+   use async_trait::async_trait;
+   use opentelemetry::logs::{LogResult, Severity};
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
    use opentelemetry::{logs::AnyValue, Key};
+   use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
    use opentelemetry_sdk::logs::{LogRecord, LoggerProvider};
    use opentelemetry_sdk::testing::logs::InMemoryLogsExporter;
    use opentelemetry_sdk::trace;
    use opentelemetry_sdk::trace::{Sampler, TracerProvider};
-   use tracing::error;
+   use tracing::{error, warn};
    use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
-   use tracing_subscriber::Layer;
+   use tracing_subscriber::util::SubscriberInitExt;
+   use tracing_subscriber::{EnvFilter, Layer};

    pub fn attributes_contains(log_record: &LogRecord, key: &Key, value: &AnyValue) -> bool {
        log_record
@@ -238,6 +241,70 @@ mod tests {
    }

    // cargo test --features=testing

    #[derive(Clone, Debug, Default)]
    struct ReentrantLogExporter;

    #[async_trait]
    impl LogExporter for ReentrantLogExporter {
        async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
            // This will cause a deadlock as the export itself creates a log
            // while still within the lock of the SimpleLogProcessor.
Review thread on the comment above:

@lalitb (Member), Oct 12, 2024:
Wondering if we should bring the message queue back for the simple log processor?

PR author (Member):
That might mitigate the issue temporarily, but we need to solve it via context-based suppression, I think.

@lalitb (Member):
In context-based suppression, there may still be cases where we want to loop back the internal events at least once. This could potentially result in a deadlock situation.

PR author (Member):
Not sure I follow that. Maybe add a unit test to cover the scenario, so it's easy to track.

@lalitb (Member), Oct 14, 2024:
I meant: even with the suppression logic in place, we may want the internal errors to loop back to the pipeline once, hoping that the error is transient and doesn't cause an infinite loop. In the case of non-transient errors, the subsequent error is suppressed. But this can be discussed when we look into the suppression logic.

Not related to this, but moving to a message queue will also fix the hang issue - #2071 (comment)

PR author (Member):
> we may want the internal errors to loop-back to the pipeline once, hoping that the error is transient and doesn't cause infinite loop

Can you share a scenario where this is needed? I am not sure how often it is feasible for a component to determine whether an error is transient or not.

> Not related to this, but moving to message queue will also fix hang issue - #2071 (comment)

Yes, likely. But this needs a full redesign. I don't think adding a queue in the middle helps in all cases.

            warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
            Ok(())
        }
    }
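The review thread above mentions context-based suppression as the longer-term fix. As a rough, illustrative sketch only (not part of this PR or the SDK), one way to express the idea with a plain thread-local flag: the exporter holds an RAII guard for the duration of export(), and the bridge checks the flag before forwarding an event back into the pipeline. The names SuppressionGuard and is_suppressed are hypothetical.

```rust
use std::cell::Cell;

thread_local! {
    // True while the current thread is inside an exporter call.
    static SUPPRESS_LOGGING: Cell<bool> = Cell::new(false);
}

// RAII guard: while it is alive, telemetry emitted on this thread is suppressed.
struct SuppressionGuard;

impl SuppressionGuard {
    fn new() -> Self {
        SUPPRESS_LOGGING.with(|flag| flag.set(true));
        SuppressionGuard
    }
}

impl Drop for SuppressionGuard {
    fn drop(&mut self) {
        SUPPRESS_LOGGING.with(|flag| flag.set(false));
    }
}

// A bridge/processor would call this before forwarding an event, dropping
// telemetry that the telemetry pipeline itself produced.
fn is_suppressed() -> bool {
    SUPPRESS_LOGGING.with(|flag| flag.get())
}

fn main() {
    assert!(!is_suppressed());
    {
        // An exporter would create the guard at the top of export(); any log
        // it emits while exporting is then ignored instead of re-entering the
        // processor and blocking on its lock.
        let _guard = SuppressionGuard::new();
        assert!(is_suppressed());
    }
    assert!(!is_suppressed());
}
```

A real implementation would also have to handle nested guards (restore the previous value on drop) and pipelines that hop threads, such as the batch processor, where a task-local or Context-propagated flag would be needed rather than a plain thread-local.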

    #[test]
    #[ignore = "See issue: https://github.com/open-telemetry/opentelemetry-rust/issues/1745"]
    fn simple_processor_deadlock() {
        let exporter: ReentrantLogExporter = ReentrantLogExporter::default();
        let logger_provider = LoggerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .build();

        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

        // Setting subscriber as global as that is the only way to test this scenario.
        tracing_subscriber::registry().with(layer).init();
        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
    }
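To make the failure mode of simple_processor_deadlock concrete, here is the shape of the hang reduced to a plain std::sync::Mutex. This is not the SDK's actual code, only an analogy for the processor lock being re-acquired by the same thread.

```rust
use std::sync::Mutex;

fn main() {
    // Stand-in for the SimpleLogProcessor's internal lock.
    let processor = Mutex::new(());

    // The processor takes its lock to call the exporter synchronously.
    let _exporting = processor.lock().unwrap();

    // The exporter emits a log on the same thread; that log flows back into
    // the same processor, which tries to take the same lock again. The second
    // lock() never succeeds: it blocks forever (or panics, depending on the
    // platform), which is the hang the ignored tests demonstrate.
    let _reentrant = processor.lock().unwrap();

    println!("never reached");
}
```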

    #[test]
    #[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
    fn simple_processor_no_deadlock() {
        let exporter: ReentrantLogExporter = ReentrantLogExporter::default();
        let logger_provider = LoggerProvider::builder()
            .with_simple_exporter(exporter.clone())
            .build();

        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

        // This filter will prevent the deadlock as the reentrant log will be
        // ignored.
        let filter = EnvFilter::new("debug").add_directive("reentrant=error".parse().unwrap());
        // Setting subscriber as global as that is the only way to test this scenario.
        tracing_subscriber::registry()
            .with(filter)
            .with(layer)
            .init();
        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
    async fn batch_processor_deadlock() {
        let exporter: ReentrantLogExporter = ReentrantLogExporter::default();
        let logger_provider = LoggerProvider::builder()
            .with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio)
            .build();

        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

        tracing_subscriber::registry().with(layer).init();
        warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
    }

    #[test]
    fn tracing_appender_standalone() {
        // Arrange