Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@
use opentelemetry::InstrumentationLibrary;
use opentelemetry::Key;
use opentelemetry::{logs::LogResult, KeyValue};
use std::panic;
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand Down Expand Up @@ -994,4 +995,148 @@
== AnyValue::String("Updated by FirstProcessor".into())
);
}

#[test]
fn test_simple_processor_sync_exporter_without_runtime() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can write a helper and call them for all the cominations:

  1. Normal test
  2. Tokio test - current thread
  3. Tokio test - multi-thread
  4. Tokio test - multi-thread with thread=1

we can eventually prepare a test matrix.md file for quick visualization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good to have a test matrix. Will do it after adding tests for batch.

let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_sync_exporter_with_runtime() {
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}

#[tokio::test(flavor = "current_thread")]
async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}

#[derive(Debug, Clone)]
struct AsyncLogExporter {
logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
}

#[async_trait::async_trait]
impl LogExporter for AsyncLogExporter {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
let logs = Arc::clone(&self.logs);

// Collect batch into an owned Vec to ensure the data can be moved into the tokio::spawn task
let batch_owned: Vec<(LogRecord, InstrumentationLibrary)> = batch
.iter()
.map(|(log_record, instrumentation)| (log_record.clone(), instrumentation.clone()))
.collect();

// Spawn a Tokio task to process the log batch asynchronously
tokio::spawn(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if this is really a scalable way of testing. Should we restrict to testing the features offered by otlp specificially?
otlp + tonic from tokio main, normal main
otlp + request ...
otlp + request blocking

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it would be difficult to test that in unit-test with collector. I think we can extend functional test for that. The current test is more generic to cover any exporter which internally uses tokio runtime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree its best to cover it via the integration test. I did some of them manually and it is not scaleable.

Copy link
Member Author

@lalitb lalitb Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. And we still keep current generic test here with runtime - to cover the hang scenario, and panic due to missing runtime.

let mut logs_lock = logs.lock().unwrap();
for (log_record, instrumentation) in batch_owned {
logs_lock.push((log_record, instrumentation));
}
})
.await
.expect("Task failed"); // Await the task completion.

Ok(())
}
}

#[test]
fn test_simple_processor_async_exporter_without_runtime() {
// Use `catch_unwind` to catch the panic caused by missing Tokio runtime
let result = panic::catch_unwind(|| {
let exporter = AsyncLogExporter {
logs: Arc::new(Mutex::new(Vec::new())),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

// This will panic because `tokio::spawn` is called without a runtime
processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

// Assert to make sure the failure occurs
assert_eq!(exporter.logs.lock().unwrap().len(), 1);
});

// Verify that the panic occurred, indicating the absence of a Tokio runtime
assert!(
result.is_err(),
"The test should fail due to missing Tokio runtime, but it did not."

Check warning on line 1104 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L1104

Added line #L1104 was not covered by tests
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_async_exporter_with_runtime() {
let exporter = AsyncLogExporter {
logs: Arc::new(Mutex::new(Vec::new())),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

assert_eq!(exporter.logs.lock().unwrap().len(), 1);
}

#[tokio::test(flavor = "current_thread")]
#[ignore] // This test hangs as of now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like it hangs with the same reason as batch processor, probably link them together?

Copy link
Member Author

@lalitb lalitb Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have issue link for the batch processor ?

async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
let exporter = AsyncLogExporter {
logs: Arc::new(Mutex::new(Vec::new())),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();

assert_eq!(exporter.logs.lock().unwrap().len(), 1);
}

Check warning on line 1141 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L1127-L1141

Added lines #L1127 - L1141 were not covered by tests
}
Loading