Skip to content

Commit ea4b5e4

Browse files
authored
Add Unit Tests for Sync and Async Log Exporters - with and without runtime - for SimpleLogProcessor (#2218)
1 parent e911383 commit ea4b5e4

File tree

1 file changed

+233
-12
lines changed

1 file changed

+233
-12
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 233 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ mod tests {
553553
use opentelemetry::InstrumentationLibrary;
554554
use opentelemetry::Key;
555555
use opentelemetry::{logs::LogResult, KeyValue};
556+
use std::sync::atomic::{AtomicUsize, Ordering};
556557
use std::sync::{Arc, Mutex};
557558
use std::time::Duration;
558559

@@ -831,9 +832,7 @@ mod tests {
831832
#[tokio::test(flavor = "current_thread")]
832833
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
833834
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_multi_thread() {
834-
let exporter = InMemoryLogsExporterBuilder::default()
835-
.keep_records_on_shutdown()
836-
.build();
835+
let exporter = InMemoryLogsExporterBuilder::default().build();
837836
let processor = BatchLogProcessor::new(
838837
Box::new(exporter.clone()),
839838
BatchConfig::default(),
@@ -848,9 +847,7 @@ mod tests {
848847

849848
#[tokio::test(flavor = "current_thread")]
850849
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
851-
let exporter = InMemoryLogsExporterBuilder::default()
852-
.keep_records_on_shutdown()
853-
.build();
850+
let exporter = InMemoryLogsExporterBuilder::default().build();
854851
let processor = BatchLogProcessor::new(
855852
Box::new(exporter.clone()),
856853
BatchConfig::default(),
@@ -862,9 +859,7 @@ mod tests {
862859

863860
#[tokio::test(flavor = "multi_thread")]
864861
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
865-
let exporter = InMemoryLogsExporterBuilder::default()
866-
.keep_records_on_shutdown()
867-
.build();
862+
let exporter = InMemoryLogsExporterBuilder::default().build();
868863
let processor = BatchLogProcessor::new(
869864
Box::new(exporter.clone()),
870865
BatchConfig::default(),
@@ -876,9 +871,7 @@ mod tests {
876871

877872
#[tokio::test(flavor = "multi_thread")]
878873
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
879-
let exporter = InMemoryLogsExporterBuilder::default()
880-
.keep_records_on_shutdown()
881-
.build();
874+
let exporter = InMemoryLogsExporterBuilder::default().build();
882875
let processor = BatchLogProcessor::new(
883876
Box::new(exporter.clone()),
884877
BatchConfig::default(),
@@ -994,4 +987,232 @@ mod tests {
994987
== AnyValue::String("Updated by FirstProcessor".into())
995988
);
996989
}
990+
991+
#[test]
992+
fn test_simple_processor_sync_exporter_without_runtime() {
993+
let exporter = InMemoryLogsExporterBuilder::default().build();
994+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
995+
996+
let mut record: LogRecord = Default::default();
997+
let instrumentation: InstrumentationLibrary = Default::default();
998+
999+
processor.emit(&mut record, &instrumentation);
1000+
1001+
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1002+
}
1003+
1004+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1005+
async fn test_simple_processor_sync_exporter_with_runtime() {
1006+
let exporter = InMemoryLogsExporterBuilder::default().build();
1007+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1008+
1009+
let mut record: LogRecord = Default::default();
1010+
let instrumentation: InstrumentationLibrary = Default::default();
1011+
1012+
processor.emit(&mut record, &instrumentation);
1013+
1014+
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1015+
}
1016+
1017+
#[tokio::test(flavor = "multi_thread")]
1018+
async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
1019+
let exporter = InMemoryLogsExporterBuilder::default().build();
1020+
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
1021+
1022+
let mut handles = vec![];
1023+
for _ in 0..10 {
1024+
let processor_clone = Arc::clone(&processor);
1025+
let handle = tokio::spawn(async move {
1026+
let mut record: LogRecord = Default::default();
1027+
let instrumentation: InstrumentationLibrary = Default::default();
1028+
processor_clone.emit(&mut record, &instrumentation);
1029+
});
1030+
handles.push(handle);
1031+
}
1032+
1033+
for handle in handles {
1034+
handle.await.unwrap();
1035+
}
1036+
1037+
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
1038+
}
1039+
1040+
#[tokio::test(flavor = "current_thread")]
1041+
async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
1042+
let exporter = InMemoryLogsExporterBuilder::default().build();
1043+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1044+
1045+
let mut record: LogRecord = Default::default();
1046+
let instrumentation: InstrumentationLibrary = Default::default();
1047+
1048+
processor.emit(&mut record, &instrumentation);
1049+
1050+
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
1051+
}
1052+
1053+
#[derive(Debug, Clone)]
1054+
struct LogExporterThatRequiresTokio {
1055+
export_count: Arc<AtomicUsize>,
1056+
}
1057+
1058+
impl LogExporterThatRequiresTokio {
1059+
/// Creates a new instance of `LogExporterThatRequiresTokio`.
1060+
fn new() -> Self {
1061+
LogExporterThatRequiresTokio {
1062+
export_count: Arc::new(AtomicUsize::new(0)),
1063+
}
1064+
}
1065+
1066+
/// Returns the number of logs stored in the exporter.
1067+
fn len(&self) -> usize {
1068+
self.export_count.load(Ordering::Acquire)
1069+
}
1070+
}
1071+
1072+
#[async_trait::async_trait]
1073+
impl LogExporter for LogExporterThatRequiresTokio {
1074+
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1075+
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
1076+
tokio::time::sleep(Duration::from_millis(50)).await;
1077+
1078+
for _ in batch.iter() {
1079+
self.export_count.fetch_add(1, Ordering::Acquire);
1080+
}
1081+
Ok(())
1082+
}
1083+
}
1084+
1085+
#[test]
1086+
fn test_simple_processor_async_exporter_without_runtime() {
1087+
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
1088+
let result = std::panic::catch_unwind(|| {
1089+
let exporter = LogExporterThatRequiresTokio::new();
1090+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1091+
1092+
let mut record: LogRecord = Default::default();
1093+
let instrumentation: InstrumentationLibrary = Default::default();
1094+
1095+
// This will panic because an tokio async operation within exporter without a runtime.
1096+
processor.emit(&mut record, &instrumentation);
1097+
});
1098+
1099+
// Verify that the panic occurred and check the panic message for the absence of a Tokio runtime
1100+
assert!(
1101+
result.is_err(),
1102+
"The test should fail due to missing Tokio runtime, but it did not."
1103+
);
1104+
let panic_payload = result.unwrap_err();
1105+
let panic_message = panic_payload
1106+
.downcast_ref::<String>()
1107+
.map(|s| s.as_str())
1108+
.or_else(|| panic_payload.downcast_ref::<&str>().copied())
1109+
.unwrap_or("No panic message");
1110+
1111+
assert!(
1112+
panic_message.contains("no reactor running")
1113+
|| panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
1114+
"Expected panic message about missing Tokio runtime, but got: {}",
1115+
panic_message
1116+
);
1117+
}
1118+
1119+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1120+
#[ignore]
1121+
// This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
1122+
// It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
1123+
// Each task attempts to acquire a mutex on the exporter in `SimpleLogProcessor::emit`.
1124+
// Only one task obtains the lock, while the others are blocked, waiting for its release.
1125+
//
1126+
// The task holding the lock invokes the LogExporterThatRequiresTokio, which performs an
1127+
// asynchronous operation (e.g., network I/O simulated by `tokio::sleep`). This operation
1128+
// requires yielding control back to the Tokio runtime to make progress.
1129+
//
1130+
// However, all worker threads are occupied:
1131+
// - One thread is executing the async exporter operation
1132+
// - Three threads are blocked waiting for the mutex
1133+
//
1134+
// This leads to a deadlock as there are no available threads to drive the async operation
1135+
// to completion, preventing the mutex from being released. Consequently, neither the blocked
1136+
// tasks nor the exporter can proceed.
1137+
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
1138+
let exporter = LogExporterThatRequiresTokio::new();
1139+
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
1140+
1141+
let concurrent_emit = 4; // number of worker threads
1142+
1143+
let mut handles = vec![];
1144+
// try send `concurrent_emit` events concurrently
1145+
for _ in 0..concurrent_emit {
1146+
let processor_clone = Arc::clone(&processor);
1147+
let handle = tokio::spawn(async move {
1148+
let mut record: LogRecord = Default::default();
1149+
let instrumentation: InstrumentationLibrary = Default::default();
1150+
processor_clone.emit(&mut record, &instrumentation);
1151+
});
1152+
handles.push(handle);
1153+
}
1154+
1155+
// below code won't get executed
1156+
for handle in handles {
1157+
handle.await.unwrap();
1158+
}
1159+
assert_eq!(exporter.len(), concurrent_emit);
1160+
}
1161+
1162+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1163+
// This test uses a multi-threaded runtime setup with a single worker thread. Note that even
1164+
// though only one worker thread is created, it is distinct from the main thread. The processor
1165+
// emits a log event, and the exporter performs an async operation that requires the runtime.
1166+
// The single worker thread handles this operation without deadlocking, as long as no other
1167+
// tasks occupy the runtime.
1168+
async fn test_simple_processor_async_exporter_with_runtime() {
1169+
let exporter = LogExporterThatRequiresTokio::new();
1170+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1171+
1172+
let mut record: LogRecord = Default::default();
1173+
let instrumentation: InstrumentationLibrary = Default::default();
1174+
1175+
processor.emit(&mut record, &instrumentation);
1176+
1177+
assert_eq!(exporter.len(), 1);
1178+
}
1179+
1180+
#[tokio::test(flavor = "multi_thread")]
1181+
// This test uses a multi-threaded runtime setup with the default number of worker threads.
1182+
// The processor emits a log event, and the exporter, which requires the runtime for its async
1183+
// operations, can access one of the available worker threads to complete its task. As there
1184+
// are multiple threads, the exporter can proceed without blocking other tasks, ensuring the
1185+
// test completes successfully.
1186+
async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
1187+
let exporter = LogExporterThatRequiresTokio::new();
1188+
1189+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1190+
1191+
let mut record: LogRecord = Default::default();
1192+
let instrumentation: InstrumentationLibrary = Default::default();
1193+
1194+
processor.emit(&mut record, &instrumentation);
1195+
1196+
assert_eq!(exporter.len(), 1);
1197+
}
1198+
1199+
#[tokio::test(flavor = "current_thread")]
1200+
#[ignore]
1201+
// This test uses a current-thread runtime, where all operations run on the main thread.
1202+
// The processor emits a log event while the runtime is blocked using `futures::block_on`
1203+
// to complete the export operation. The exporter, which performs an async operation and
1204+
// requires the runtime, cannot progress because the main thread is already blocked.
1205+
// This results in a deadlock, as the runtime cannot move forward.
1206+
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
1207+
let exporter = LogExporterThatRequiresTokio::new();
1208+
1209+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1210+
1211+
let mut record: LogRecord = Default::default();
1212+
let instrumentation: InstrumentationLibrary = Default::default();
1213+
1214+
processor.emit(&mut record, &instrumentation);
1215+
1216+
assert_eq!(exporter.len(), 1);
1217+
}
9971218
}

0 commit comments

Comments
 (0)