Skip to content

Commit 657bc61

Browse files
committed
review comment
1 parent 95ff800 commit 657bc61

File tree

1 file changed

+47
-53
lines changed

1 file changed

+47
-53
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 47 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ impl SimpleLogProcessor {
9696

9797
impl LogProcessor for SimpleLogProcessor {
9898
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
99+
println!("SimpleLogProcessor::Emit");
99100
// noop after shutdown
100101
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
101102
// this is a warning, as the user is trying to log after the processor has been shutdown
@@ -553,7 +554,10 @@ mod tests {
553554
use opentelemetry::InstrumentationLibrary;
554555
use opentelemetry::Key;
555556
use opentelemetry::{logs::LogResult, KeyValue};
556-
use std::sync::{Arc, Mutex};
557+
use std::sync::{
558+
atomic::{AtomicU32, Ordering},
559+
Arc, Mutex,
560+
};
557561
use std::time::Duration;
558562

559563
#[derive(Debug, Clone)]
@@ -1066,44 +1070,37 @@ mod tests {
10661070
let instrumentation: InstrumentationLibrary = Default::default();
10671071

10681072
processor.emit(&mut record, &instrumentation);
1069-
processor.force_flush().unwrap();
1070-
processor.shutdown().unwrap();
10711073

10721074
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
10731075
}
10741076

10751077
#[derive(Debug, Clone)]
1076-
struct LogExporterThatRequiresTokioSpawn {
1077-
logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
1078+
struct LogExporterThatRequiresTokio {
1079+
event_count: Arc<AtomicU32>,
10781080
}
10791081

1080-
impl LogExporterThatRequiresTokioSpawn {
1081-
/// Creates a new instance of `LogExporterThatRequiresTokioSpawn`.
1082-
pub(crate) fn new() -> Self {
1083-
LogExporterThatRequiresTokioSpawn {
1084-
logs: Arc::new(Mutex::new(Vec::new())),
1082+
impl LogExporterThatRequiresTokio {
1083+
/// Creates a new instance of `LogExporterThatRequiresTokio`.
1084+
fn new() -> Self {
1085+
LogExporterThatRequiresTokio {
1086+
event_count: Arc::new(AtomicU32::new(0)),
10851087
}
10861088
}
10871089

10881090
/// Returns the number of logs stored in the exporter.
1089-
pub(crate) async fn len(&self) -> usize {
1090-
let logs = self.logs.lock().unwrap();
1091-
logs.len()
1091+
fn len(&self) -> usize {
1092+
self.event_count.load(Ordering::Acquire) as usize
10921093
}
10931094
}
10941095

1095-
use tokio::time::sleep;
1096-
10971096
#[async_trait::async_trait]
1098-
impl LogExporter for LogExporterThatRequiresTokioSpawn {
1097+
impl LogExporter for LogExporterThatRequiresTokio {
10991098
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1100-
// Simulate minimal dependency on tokio by sleeping for a short duration
1101-
sleep(Duration::from_millis(50)).await;
1099+
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
1100+
tokio::time::sleep(Duration::from_millis(50)).await;
11021101

1103-
let logs = Arc::clone(&self.logs);
1104-
let mut logs_lock = logs.lock().unwrap();
1105-
for (log_record, instrumentation) in batch.iter() {
1106-
logs_lock.push((log_record.clone(), instrumentation.clone()));
1102+
for _ in batch.iter() {
1103+
self.event_count.fetch_add(1, Ordering::Acquire);
11071104
}
11081105
Ok(())
11091106
}
@@ -1113,13 +1110,13 @@ mod tests {
11131110
fn test_simple_processor_async_exporter_without_runtime() {
11141111
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
11151112
let result = std::panic::catch_unwind(|| {
1116-
let exporter = LogExporterThatRequiresTokioSpawn::new();
1113+
let exporter = LogExporterThatRequiresTokio::new();
11171114
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
11181115

11191116
let mut record: LogRecord = Default::default();
11201117
let instrumentation: InstrumentationLibrary = Default::default();
11211118

1122-
// This will panic because `tokio::spawn` or an async operation is called without a runtime.
1119+
// This will panic because an async operation is called without a runtime.
11231120
processor.emit(&mut record, &instrumentation);
11241121
});
11251122

@@ -1128,51 +1125,48 @@ mod tests {
11281125
result.is_err(),
11291126
"The test should fail due to missing Tokio runtime, but it did not."
11301127
);
1128+
let panic_payload = result.unwrap_err();
1129+
let panic_message = panic_payload
1130+
.downcast_ref::<String>()
1131+
.map(|s| s.as_str())
1132+
.or_else(|| panic_payload.downcast_ref::<&str>().copied())
1133+
.unwrap_or("No panic message");
11311134

1132-
if let Err(panic) = result {
1133-
let panic_message = panic
1134-
.downcast_ref::<String>()
1135-
.map(|s| s.as_str())
1136-
.or_else(|| panic.downcast_ref::<&str>().copied())
1137-
.unwrap_or("No panic message");
1138-
1139-
assert!(
1140-
panic_message.contains("no reactor running")
1141-
|| panic_message
1142-
.contains("must be called from the context of a Tokio 1.x runtime"),
1143-
"Expected panic message about missing Tokio runtime, but got: {}",
1144-
panic_message
1145-
);
1146-
}
1135+
assert!(
1136+
panic_message.contains("no reactor running")
1137+
|| panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
1138+
"Expected panic message about missing Tokio runtime, but got: {}",
1139+
panic_message
1140+
);
11471141
}
11481142

11491143
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
11501144
async fn test_simple_processor_async_exporter_with_runtime() {
1151-
let exporter = LogExporterThatRequiresTokioSpawn::new();
1145+
let exporter = LogExporterThatRequiresTokio::new();
11521146
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
11531147

11541148
let mut record: LogRecord = Default::default();
11551149
let instrumentation: InstrumentationLibrary = Default::default();
11561150

11571151
processor.emit(&mut record, &instrumentation);
11581152

1159-
assert_eq!(exporter.len().await, 1);
1153+
assert_eq!(exporter.len(), 1);
11601154
}
11611155

11621156
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1163-
#[ignore]
1164-
// all threads are blocked running blocked::export(), and the exporter further needs tokio
1165-
// runtime to progress on this blocked thread, resulting in deadlock.
1166-
async fn test_simple_processor_async_exporter_with_multi_thread_runtime_all_cores_blocked() {
1167-
let exporter = LogExporterThatRequiresTokioSpawn::new();
1157+
//#[ignore]
1158+
// All worker threads except one are blocked, waiting for the export operation to complete.
1159+
// The exporter, which isn't blocked, requires the runtime to proceed, but no free worker threads are available, resulting in a deadlock.
1160+
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
1161+
let exporter = LogExporterThatRequiresTokio::new();
11681162
let processor = Arc::new(Mutex::new(SimpleLogProcessor::new(Box::new(
11691163
exporter.clone(),
11701164
))));
11711165

1172-
let concurrent_emit = 5; // number of worker sthreads + 1
1166+
let concurrent_emit = 5; // number of worker threads + 1
11731167

11741168
let mut handles = vec![];
1175-
// send 2 events concurrently
1169+
// try send `concurrent_emit` events concurrently
11761170
for _ in 0..concurrent_emit {
11771171
let processor_clone = Arc::clone(&processor);
11781172
let handle = tokio::spawn(async move {
@@ -1189,12 +1183,12 @@ mod tests {
11891183
for handle in handles {
11901184
handle.await.unwrap();
11911185
}
1192-
assert_eq!(exporter.len().await, 2);
1186+
assert_eq!(exporter.len(), 2);
11931187
}
11941188

11951189
#[tokio::test(flavor = "multi_thread")]
11961190
async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
1197-
let exporter = LogExporterThatRequiresTokioSpawn::new();
1191+
let exporter = LogExporterThatRequiresTokio::new();
11981192

11991193
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
12001194

@@ -1203,15 +1197,15 @@ mod tests {
12031197

12041198
processor.emit(&mut record, &instrumentation);
12051199

1206-
assert_eq!(exporter.len().await, 1);
1200+
assert_eq!(exporter.len(), 1);
12071201
}
12081202

12091203
#[tokio::test(flavor = "current_thread")]
12101204
#[ignore] // the current thread is blocked with futures::block_on to
12111205
// complete the export, and the exporter further needs tokio runtime to progress
12121206
// on this blocked thread, resulting in deadlock.
12131207
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
1214-
let exporter = LogExporterThatRequiresTokioSpawn::new();
1208+
let exporter = LogExporterThatRequiresTokio::new();
12151209

12161210
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
12171211

@@ -1220,6 +1214,6 @@ mod tests {
12201214

12211215
processor.emit(&mut record, &instrumentation);
12221216

1223-
assert_eq!(exporter.len().await, 1);
1217+
assert_eq!(exporter.len(), 1);
12241218
}
12251219
}

0 commit comments

Comments
 (0)