Skip to content

Commit a4ee905

Browse files
committed
fix comments
1 parent 506bfb0 commit a4ee905

File tree

1 file changed

+111
-44
lines changed

1 file changed

+111
-44
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 111 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ mod tests {
553553
use opentelemetry::InstrumentationLibrary;
554554
use opentelemetry::Key;
555555
use opentelemetry::{logs::LogResult, KeyValue};
556-
use std::panic;
556+
// use std::panic;
557557
use std::sync::{Arc, Mutex};
558558
use std::time::Duration;
559559

@@ -1007,8 +1007,6 @@ mod tests {
10071007
let instrumentation: InstrumentationLibrary = Default::default();
10081008

10091009
processor.emit(&mut record, &instrumentation);
1010-
processor.force_flush().unwrap();
1011-
processor.shutdown().unwrap();
10121010

10131011
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
10141012
}
@@ -1024,12 +1022,40 @@ mod tests {
10241022
let instrumentation: InstrumentationLibrary = Default::default();
10251023

10261024
processor.emit(&mut record, &instrumentation);
1027-
processor.force_flush().unwrap();
1028-
processor.shutdown().unwrap();
10291025

10301026
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
10311027
}
10321028

1029+
#[tokio::test(flavor = "multi_thread")]
1030+
async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
1031+
let exporter = InMemoryLogsExporterBuilder::default()
1032+
.keep_records_on_shutdown()
1033+
.build();
1034+
let processor = Arc::new(Mutex::new(SimpleLogProcessor::new(Box::new(
1035+
exporter.clone(),
1036+
))));
1037+
1038+
let mut handles = vec![];
1039+
for _ in 0..10 {
1040+
let processor_clone = Arc::clone(&processor);
1041+
let handle = tokio::spawn(async move {
1042+
let mut record: LogRecord = Default::default();
1043+
let instrumentation: InstrumentationLibrary = Default::default();
1044+
processor_clone
1045+
.lock()
1046+
.unwrap()
1047+
.emit(&mut record, &instrumentation);
1048+
});
1049+
handles.push(handle);
1050+
}
1051+
1052+
for handle in handles {
1053+
handle.await.unwrap();
1054+
}
1055+
1056+
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
1057+
}
1058+
10331059
#[tokio::test(flavor = "current_thread")]
10341060
async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
10351061
let exporter = InMemoryLogsExporterBuilder::default()
@@ -1052,91 +1078,132 @@ mod tests {
10521078
logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
10531079
}
10541080

1081+
impl AsyncLogExporter {
1082+
/// Creates a new instance of `AsyncLogExporter`.
1083+
pub(crate) fn new() -> Self {
1084+
AsyncLogExporter {
1085+
logs: Arc::new(Mutex::new(Vec::new())),
1086+
}
1087+
}
1088+
1089+
/// Returns the number of logs stored in the exporter.
1090+
pub(crate) async fn len(&self) -> usize {
1091+
let logs = self.logs.lock().unwrap();
1092+
logs.len()
1093+
}
1094+
}
1095+
1096+
use tokio::time::sleep;
1097+
10551098
#[async_trait::async_trait]
10561099
impl LogExporter for AsyncLogExporter {
10571100
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1058-
let logs = Arc::clone(&self.logs);
1101+
println!("Exporting...");
10591102

1060-
// Collect batch into an owned Vec to ensure the data can be moved into the tokio::spawn task
1061-
let batch_owned: Vec<(LogRecord, InstrumentationLibrary)> = batch
1062-
.iter()
1063-
.map(|(log_record, instrumentation)| (log_record.clone(), instrumentation.clone()))
1064-
.collect();
1065-
1066-
// Spawn a Tokio task to process the log batch asynchronously
1067-
tokio::spawn(async move {
1068-
let mut logs_lock = logs.lock().unwrap();
1069-
for (log_record, instrumentation) in batch_owned {
1070-
logs_lock.push((log_record, instrumentation));
1071-
}
1072-
})
1073-
.await
1074-
.expect("Task failed"); // Await the task completion.
1103+
// Simulate minimal dependency on tokio by sleeping for a short duration
1104+
sleep(Duration::from_millis(50)).await;
10751105

1106+
let logs = Arc::clone(&self.logs);
1107+
let mut logs_lock = logs.lock().unwrap();
1108+
for (log_record, instrumentation) in batch.iter() {
1109+
logs_lock.push((log_record.clone(), instrumentation.clone()));
1110+
}
1111+
1112+
println!("Export success...");
10761113
Ok(())
10771114
}
10781115
}
10791116

10801117
#[test]
10811118
fn test_simple_processor_async_exporter_without_runtime() {
10821119
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
1083-
let result = panic::catch_unwind(|| {
1084-
let exporter = AsyncLogExporter {
1085-
logs: Arc::new(Mutex::new(Vec::new())),
1086-
};
1120+
let result = std::panic::catch_unwind(|| {
1121+
let exporter = AsyncLogExporter::new();
10871122
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
10881123

10891124
let mut record: LogRecord = Default::default();
10901125
let instrumentation: InstrumentationLibrary = Default::default();
10911126

1092-
// This will panic because `tokio::spawn` is called without a runtime
1127+
// This will panic because `tokio::spawn` or an async operation is called without a runtime.
10931128
processor.emit(&mut record, &instrumentation);
1094-
processor.force_flush().unwrap();
1095-
processor.shutdown().unwrap();
1096-
1097-
// Assert to make sure the failure occurs
1098-
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
10991129
});
11001130

1101-
// Verify that the panic occurred, indicating the absence of a Tokio runtime
1131+
// Verify that the panic occurred and check the panic message for the absence of a Tokio runtime
11021132
assert!(
11031133
result.is_err(),
11041134
"The test should fail due to missing Tokio runtime, but it did not."
11051135
);
1136+
1137+
if let Err(panic) = result {
1138+
let panic_message = panic
1139+
.downcast_ref::<String>()
1140+
.map(|s| s.as_str())
1141+
.or_else(|| panic.downcast_ref::<&str>().map(|&s| s))
1142+
.unwrap_or("No panic message");
1143+
1144+
assert!(
1145+
panic_message.contains("no reactor running")
1146+
|| panic_message
1147+
.contains("must be called from the context of a Tokio 1.x runtime"),
1148+
"Expected panic message about missing Tokio runtime, but got: {}",
1149+
panic_message
1150+
);
1151+
}
11061152
}
11071153

11081154
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
11091155
async fn test_simple_processor_async_exporter_with_runtime() {
1110-
let exporter = AsyncLogExporter {
1111-
logs: Arc::new(Mutex::new(Vec::new())),
1112-
};
1156+
let exporter = AsyncLogExporter::new();
11131157
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
11141158

11151159
let mut record: LogRecord = Default::default();
11161160
let instrumentation: InstrumentationLibrary = Default::default();
11171161

11181162
processor.emit(&mut record, &instrumentation);
1119-
processor.force_flush().unwrap();
1120-
processor.shutdown().unwrap();
11211163

1122-
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1164+
assert_eq!(exporter.len().await, 1);
1165+
}
1166+
1167+
#[tokio::test(flavor = "multi_thread")]
1168+
async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
1169+
let exporter = AsyncLogExporter::new();
1170+
let processor = Arc::new(Mutex::new(SimpleLogProcessor::new(Box::new(
1171+
exporter.clone(),
1172+
))));
1173+
1174+
let mut handles = vec![];
1175+
// send 4 events concurrently
1176+
for _ in 0..4 {
1177+
let processor_clone = Arc::clone(&processor);
1178+
let handle = tokio::spawn(async move {
1179+
let mut record: LogRecord = Default::default();
1180+
let instrumentation: InstrumentationLibrary = Default::default();
1181+
processor_clone
1182+
.lock()
1183+
.unwrap()
1184+
.emit(&mut record, &instrumentation);
1185+
});
1186+
handles.push(handle);
1187+
}
1188+
1189+
for handle in handles {
1190+
handle.await.unwrap();
1191+
}
1192+
assert_eq!(exporter.len().await, 4);
11231193
}
11241194

11251195
#[tokio::test(flavor = "current_thread")]
11261196
#[ignore] // This test hangs as of now.
11271197
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
1128-
let exporter = AsyncLogExporter {
1129-
logs: Arc::new(Mutex::new(Vec::new())),
1130-
};
1198+
let exporter = AsyncLogExporter::new();
1199+
11311200
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
11321201

11331202
let mut record: LogRecord = Default::default();
11341203
let instrumentation: InstrumentationLibrary = Default::default();
11351204

11361205
processor.emit(&mut record, &instrumentation);
1137-
processor.force_flush().unwrap();
1138-
processor.shutdown().unwrap();
11391206

1140-
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1207+
assert_eq!(exporter.len().await, 1);
11411208
}
11421209
}

0 commit comments

Comments
 (0)