Skip to content

Conversation

@lalitb
Copy link
Member

@lalitb lalitb commented Oct 17, 2024

Changes

There aren't many tests for SimpleLogExporter with various combinations. So added these:

SyncLogExporter: - A synchronous log exporter that stores log records in memory without requiring a runtime.

  • test_simple_processor_sync_exporter_without_runtime: Verifies log export without any runtime.
  • test_simple_processor_sync_exporter_with_runtime: Validates export processing in a Tokio multi-threaded runtime.
  • test_simple_processor_sync_exporter_with_current_thread_runtime: Tests processing within a single-threaded runtime.

AsyncLogExporter: - An asynchronous log exporter that processes log records in a separate Tokio task, requiring a Tokio runtime.

  • test_simple_processor_async_exporter_without_runtime: Ensures a panic occurs when attempting async export without a runtime.
  • test_simple_processor_async_exporter_with_runtime: Validates async log export with a multi-threaded (single worker thread) Tokio runtime.
  • test_simple_processor_async_exporter_with_current_thread_runtime: Ignored due to hanging behavior in the single-threaded runtime.
  • test_simple_processor_async_exporter_with_multi_thread_runtime_all_cores_blocked: Ignored due to hanging behaviour with all threads blocked.
  • test_simple_processor_async_exporter_with_multi_thread_runtime : Validates async log export with a multi-threaded (number of worker threads equal to the number of cores) Tokio runtime.

Will add similar tests for batch log exporter too.

Merge requirement checklist

  • CONTRIBUTING guidelines followed
  • Unit tests added/updated (if applicable)
  • Appropriate CHANGELOG.md files updated for non-trivial, user-facing changes
  • Changes in public API reviewed (if applicable)

@lalitb lalitb requested a review from a team as a code owner October 17, 2024 06:33
@lalitb lalitb changed the title Add Unit Tests for Sync and Async Log Exporters - with and without runtime - in SimpleLogProcessor Add Unit Tests for Sync and Async Log Exporters - with and without runtime - for SimpleLogProcessor Oct 17, 2024
@codecov
Copy link

codecov bot commented Oct 17, 2024

Codecov Report

Attention: Patch coverage is 75.15528% with 40 lines in your changes missing coverage. Please review.

Project coverage is 79.1%. Comparing base (e911383) to head (0ff3c03).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
opentelemetry-sdk/src/logs/log_processor.rs 75.1% 40 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##            main   #2218     +/-   ##
=======================================
- Coverage   79.1%   79.1%   -0.1%     
=======================================
  Files        121     121             
  Lines      20933   21082    +149     
=======================================
+ Hits       16564   16680    +116     
- Misses      4369    4402     +33     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

}

#[derive(Debug, Clone)]
struct SyncLogExporter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this same as InMemoryLogExporter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes using InMemoryLogsExporter now.

Ok(())
}

fn shutdown(&mut self) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these empty methods required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, removed them.

.collect();

// Spawn a Tokio task to process the log batch asynchronously
tokio::spawn(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if this is really a scalable way of testing. Should we restrict to testing the features offered by otlp specificially?
otlp + tonic from tokio main, normal main
otlp + request ...
otlp + request blocking

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it would be difficult to test that in unit-test with collector. I think we can extend functional test for that. The current test is more generic to cover any exporter which internally uses tokio runtime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree its best to cover it via the integration test. I did some of them manually and it is not scaleable.

Copy link
Member Author

@lalitb lalitb Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. And we still keep current generic test here with runtime - to cover the hang scenario, and panic due to missing runtime.

}

#[tokio::test(flavor = "current_thread")]
#[ignore] // This test hangs as of now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like it hangs with the same reason as batch processor, probably link them together?

Copy link
Member Author

@lalitb lalitb Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have issue link for the batch processor ?

@lalitb lalitb added the integration tests Run integration tests label Oct 17, 2024
@lalitb
Copy link
Member Author

lalitb commented Oct 18, 2024

@cijothomas @ThomsonTan @utpilla thanks for the thorough review, and valuable comments. I have tried to address all of them. Please have a look once you get time.

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
// All worker threads except one are blocked, waiting for the export operation to complete.
// The exporter, which isn't blocked, requires the runtime to proceed, but no free worker threads are available, resulting in a deadlock.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's a provide a more specific reasoning here? How about something like this?

We spawn enough number of tasks to have each worker thread execute export. While executing export, each of these worker threads park themselves when they hit tokio sleep (since sleep is called within futures_executor::block_on). We need a worker thread to react to the completion of sleep and unpark these threads but there is none available. As a result, the worker threads never get unparked and the spawned tokio tasks don't ever complete.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@utpilla actually only one worker thread is blocked on the tokio sleep, rest are blocked on the mutex for processor. I have added the comment to make it clear as below:

    // This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
    // It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
    // Each task attempts to acquire a mutex on the SimpleLogProcessor. Only one task obtains the lock,
    // while the others are blocked, waiting for its release.
    //
    // The task holding the lock invokes the LogExporterThatRequiresTokio, which performs an
    // asynchronous operation (e.g., network I/O simulated by `tokio::sleep`). This operation
    // requires yielding control back to the Tokio runtime to make progress.
    //
    // However, all worker threads are occupied:
    // - One thread is executing the async exporter operation
    // - Three threads are blocked waiting for the mutex
    //
    // This leads to a deadlock as there are no available threads to drive the async operation
    // to completion, preventing the mutex from being released. Consequently, neither the blocked
    // tasks nor the exporter can proceed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Just a general note, I feel that the Mutex used by SimpleLogProcessor is hiding away the main issue here which is futures_executor::block_on. For this particular test, yes since we use a SimpleLogProcessor other worker threads are waiting on the Mutex. That might lead us into thinking that Mutex is the real issue here but it's not.

Even if we were to remove the Mutex, and use something like a Reentrant/no-lock processor we would still hit this issue as long as we have futures_exectuor::block_on. Try this code which takes no mutex and spawns a task on each worker thread to wait for some time. This would still hang due to the reasoning stated in this comment.

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
        let concurrent_emit = 4; // number of worker threads

        let mut handles = vec![];
        for _ in 0..concurrent_emit {
            let handle = tokio::spawn(async move {
                futures_executor::block_on(async { tokio::time::sleep(Duration::from_millis(50)).await });
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to above.
I left #2218 (comment) to mean same thing, but above is a very good explanation!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@utpilla In case of SimpleLogProcessor, we will never have concurrent execution of block_on because of the mutex here -

So actually it is the combination of mutex and block_on in the processor code that is causing deadlock :)

However, I have removed the mutex from the unit-test, as there is already mutex as above, as also you indicated here - #2218 (comment). Hopefully this will make it more clear now.

@cijothomas - In this test now that we don't lock processor, we invoke SimpleLogProcessor::emit concurrently from multiple tokio worker threads, which would be the similar scenario when the user invokes multiple tracing macros concurrently.

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to add a note here that this works only because the export method executes synchronously here. If this was under a tokio spawn, the export would have hung again as there is only one worker thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @utpilla. The export method is asynchronous here (ie with tokio sleep) just like the current_thread (test_simple_processor_async_exporter_with_current_thread_runtime) scenario. The difference is that we still have worker_thread to execute this async task. I have updated the comments for both as below:

`#[tokio::test(flavor = "multi_thread", worker_threads = 1)]`:

// This test uses a multi-threaded runtime setup with a single worker thread. Note that even
// though only one worker thread is created, it is distinct from the main thread. The processor
// emits a log event, and the exporter performs an async operation that requires the runtime.
// The single worker thread handles this operation without deadlocking, as long as no other
// tasks occupy the runtime.

`#[tokio::test(flavor = "multi_thread")]`

// This test uses a multi-threaded runtime setup with the default number of worker threads.
// The processor emits a log event, and the exporter, which requires the runtime for its async
// operations, can access one of the available worker threads to complete its task. As there
// are multiple threads, the exporter can proceed without blocking other tasks, ensuring the
// test completes successfully.

#[tokio::test(flavor = "current_thread")]
#[ignore]
// This test uses a current-thread runtime, where all operations run on the main thread.
// The processor emits a log event while the runtime is blocked using `futures::block_on`
// to complete the export operation. The exporter, which performs an async operation and
// requires the runtime, cannot progress because the main thread is already blocked.
// This results in a deadlock, as the runtime cannot move forward.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant was this test works only because we execute processor.emit on the same original thread. If we were to do that as a spawned task this would have hung as we would have blocked the only worker thread we have by calling futures_executor::block_on.

This code with just a single worker thread will hang:

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_async_exporter_with_runtime() {
        let exporter = LogExporterThatRequiresTokio::new();
        let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

        let mut record: LogRecord = Default::default();
        let instrumentation: InstrumentationLibrary = Default::default();

        let handle =  tokio::spawn(async move {
            processor.emit(&mut record, &instrumentation);
        });

        handle.await.unwrap();

        // processor.emit(&mut record, &instrumentation);

        assert_eq!(exporter.len(), 1);
    }

Copy link
Member Author

@lalitb lalitb Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I think I got your point now. I believe the scenario you indicated above is similar to our "multi_thread" with multiple workers unit-test., where we do tokio::spawn. Hopefully above document is clear now.

}

#[test]
fn test_simple_processor_sync_exporter_without_runtime() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can write a helper and call them for all the cominations:

  1. Normal test
  2. Tokio test - current thread
  3. Tokio test - multi-thread
  4. Tokio test - multi-thread with thread=1

we can eventually prepare a test matrix.md file for quick visualization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good to have a test matrix. Will do it after adding tests for batch.

#[ignore]
// This test demonstrates a potential deadlock scenario in a multi-threaded Tokio runtime.
// It spawns Tokio tasks equal to the number of runtime worker threads (4) to emit log events.
// Each task attempts to acquire a mutex on the SimpleLogProcessor. Only one task obtains the lock,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use tracing with tracing-appender, then we can more closely mimic user scenario. The test acquiring lock on SimpleProcessor, while simulates things, is not really what end users does.

maybe a follow up is to write this test using tracing macros to simulate user scenarios.

Copy link
Member

@cijothomas cijothomas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding coverage for all the combinations!
There are some comments left, which I think can be addressed via followups, as needed.


#[derive(Debug, Clone)]
struct LogExporterThatRequiresTokio {
export_count: Arc<AtomicU32>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arc needed here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, Atomic is not clonable, and need to keep the clone of exporter to check the count later.

@lalitb lalitb merged commit ea4b5e4 into open-telemetry:main Oct 21, 2024
24 of 25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

integration tests Run integration tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants