Skip to content

Commit ef2c68c

Browse files
committed
simple exporter tests
1 parent 16c0e10 commit ef2c68c

File tree

1 file changed

+171
-0
lines changed

1 file changed

+171
-0
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,4 +994,175 @@ mod tests {
994994
== AnyValue::String("Updated by FirstProcessor".into())
995995
);
996996
}
997+
998+
#[derive(Debug, Clone)]
999+
struct SyncLogExporter {
1000+
logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
1001+
}
1002+
1003+
#[async_trait::async_trait]
1004+
impl LogExporter for SyncLogExporter {
1005+
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1006+
let mut logs = self.logs.lock().unwrap();
1007+
for (log_record, instrumentation) in batch.iter() {
1008+
logs.push((log_record.clone(), instrumentation.clone()));
1009+
}
1010+
Ok(())
1011+
}
1012+
1013+
fn shutdown(&mut self) {}
1014+
1015+
fn set_resource(&mut self, _resource: &Resource) {}
1016+
}
1017+
1018+
#[test]
1019+
fn test_sync_exporter_without_runtime() {
1020+
let exporter = SyncLogExporter {
1021+
logs: Arc::new(Mutex::new(Vec::new())),
1022+
};
1023+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1024+
1025+
let mut record: LogRecord = Default::default();
1026+
let instrumentation: InstrumentationLibrary = Default::default();
1027+
1028+
processor.emit(&mut record, &instrumentation);
1029+
processor.force_flush().unwrap();
1030+
processor.shutdown().unwrap();
1031+
1032+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1033+
}
1034+
1035+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1036+
async fn test_sync_exporter_with_runtime() {
1037+
let exporter = SyncLogExporter {
1038+
logs: Arc::new(Mutex::new(Vec::new())),
1039+
};
1040+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1041+
1042+
let mut record: LogRecord = Default::default();
1043+
let instrumentation: InstrumentationLibrary = Default::default();
1044+
1045+
processor.emit(&mut record, &instrumentation);
1046+
processor.force_flush().unwrap();
1047+
processor.shutdown().unwrap();
1048+
1049+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1050+
}
1051+
1052+
#[tokio::test(flavor = "current_thread")]
1053+
async fn test_sync_exporter_with_current_thread_runtime() {
1054+
let exporter = SyncLogExporter {
1055+
logs: Arc::new(Mutex::new(Vec::new())),
1056+
};
1057+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1058+
1059+
let mut record: LogRecord = Default::default();
1060+
let instrumentation: InstrumentationLibrary = Default::default();
1061+
1062+
processor.emit(&mut record, &instrumentation);
1063+
processor.force_flush().unwrap();
1064+
processor.shutdown().unwrap();
1065+
1066+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1067+
}
1068+
1069+
#[derive(Debug, Clone)]
1070+
struct AsyncLogExporter {
1071+
logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
1072+
}
1073+
1074+
#[async_trait::async_trait]
1075+
impl LogExporter for AsyncLogExporter {
1076+
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1077+
println!("received..");
1078+
let logs = Arc::clone(&self.logs);
1079+
1080+
// Collect batch into an owned Vec to ensure the data can be moved into the tokio::spawn task
1081+
let batch_owned: Vec<(LogRecord, InstrumentationLibrary)> = batch
1082+
.iter()
1083+
.map(|(log_record, instrumentation)| (log_record.clone(), instrumentation.clone()))
1084+
.collect();
1085+
1086+
// Spawn a Tokio task to process the log batch asynchronously
1087+
tokio::spawn(async move {
1088+
let mut logs_lock = logs.lock().unwrap();
1089+
for (log_record, instrumentation) in batch_owned {
1090+
logs_lock.push((log_record, instrumentation));
1091+
}
1092+
})
1093+
.await
1094+
.expect("Task failed"); // Await the task completion.
1095+
1096+
Ok(())
1097+
}
1098+
1099+
fn shutdown(&mut self) {}
1100+
1101+
fn set_resource(&mut self, _resource: &Resource) {}
1102+
}
1103+
1104+
use std::panic;
1105+
1106+
#[test]
1107+
fn test_async_exporter_without_runtime() {
1108+
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
1109+
let result = panic::catch_unwind(|| {
1110+
let exporter = AsyncLogExporter {
1111+
logs: Arc::new(Mutex::new(Vec::new())),
1112+
};
1113+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1114+
1115+
let mut record: LogRecord = Default::default();
1116+
let instrumentation: InstrumentationLibrary = Default::default();
1117+
1118+
// This will panic because `tokio::spawn` is called without a runtime
1119+
processor.emit(&mut record, &instrumentation);
1120+
processor.force_flush().unwrap();
1121+
processor.shutdown().unwrap();
1122+
1123+
// Assert to make sure the failure occurs
1124+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1125+
});
1126+
1127+
// Verify that the panic occurred, indicating the absence of a Tokio runtime
1128+
assert!(
1129+
result.is_err(),
1130+
"The test should fail due to missing Tokio runtime, but it did not."
1131+
);
1132+
}
1133+
1134+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1135+
async fn test_async_exporter_with_runtime() {
1136+
let exporter = AsyncLogExporter {
1137+
logs: Arc::new(Mutex::new(Vec::new())),
1138+
};
1139+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1140+
1141+
let mut record: LogRecord = Default::default();
1142+
let instrumentation: InstrumentationLibrary = Default::default();
1143+
1144+
processor.emit(&mut record, &instrumentation);
1145+
processor.force_flush().unwrap();
1146+
processor.shutdown().unwrap();
1147+
1148+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1149+
}
1150+
1151+
#[tokio::test(flavor = "current_thread")]
1152+
#[ignore] // This test hangs as of now.
1153+
async fn test_async_exporter_with_current_thread_runtime() {
1154+
let exporter = AsyncLogExporter {
1155+
logs: Arc::new(Mutex::new(Vec::new())),
1156+
};
1157+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
1158+
1159+
let mut record: LogRecord = Default::default();
1160+
let instrumentation: InstrumentationLibrary = Default::default();
1161+
1162+
processor.emit(&mut record, &instrumentation);
1163+
processor.force_flush().unwrap();
1164+
processor.shutdown().unwrap();
1165+
1166+
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
1167+
}
9971168
}

0 commit comments

Comments
 (0)