From 7aae7eea2e79ba2e8f623463b63051f0d2b81b72 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 13 Aug 2025 10:53:07 +0200 Subject: [PATCH 1/5] Suppressing interrupted exceptions from managed okhttp threads --- .../sdk/internal/DaemonThreadFactory.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java index e8f75abe40f..7418c06ff9c 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java @@ -9,6 +9,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; /** * A {@link ThreadFactory} that delegates to {@code Executors.defaultThreadFactory()} and marks all @@ -45,6 +46,8 @@ public Thread newThread(Runnable runnable) { Thread t = delegate.newThread( propagateContextForTesting ? Context.current().wrap(runnable) : runnable); + t.setUncaughtExceptionHandler( + new ManagedUncaughtExceptionHandler(t.getUncaughtExceptionHandler())); try { t.setDaemon(true); t.setName(namePrefix + "-" + counter.incrementAndGet()); @@ -53,4 +56,24 @@ public Thread newThread(Runnable runnable) { } return t; } + + private static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + @Nullable private final Thread.UncaughtExceptionHandler delegate; + + private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandler delegate) { + this.delegate = delegate; + } + + @SuppressWarnings({"Interruption", "ThrowSpecificExceptions"}) + @Override + public void uncaughtException(Thread t, Throwable e) { + if (e instanceof InterruptedException) { + t.interrupt(); + } else if (delegate != null) { + delegate.uncaughtException(t, e); + } else { + throw new RuntimeException(e); + } + } + } } From 5510d7e40b5c274e8601e8b3fe73b5ae3acf6da0 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 13 Aug 2025 14:24:36 +0200 Subject: [PATCH 2/5] Verifying managed thread uncaught exception handler --- .../sender/okhttp/internal/OkHttpUtil.java | 12 +++- .../sdk/internal/DaemonThreadFactory.java | 22 +++----- .../sdk/internal/DaemonThreadFactoryTest.java | 55 +++++++++++++++++++ 3 files changed, 73 insertions(+), 16 deletions(-) create mode 100644 sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index b641d1bad05..1a6202070b6 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -5,7 +5,9 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.internal.DaemonThreadFactory; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,7 +37,15 @@ public static Dispatcher newDispatcher() { 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new DaemonThreadFactory("okhttp-dispatch", propagateContextForTestingInDispatcher))); + createThreadFactory("okhttp-dispatch"))); + } + + private static DaemonThreadFactory createThreadFactory(String namePrefix) { + if (propagateContextForTestingInDispatcher) { + return new DaemonThreadFactory( + namePrefix, r -> Executors.defaultThreadFactory().newThread(Context.current().wrap(r))); + } + return new DaemonThreadFactory(namePrefix); } private OkHttpUtil() {} diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java index 7418c06ff9c..61dbc03d74c 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java @@ -5,7 +5,6 @@ package io.opentelemetry.sdk.internal; -import io.opentelemetry.context.Context; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -21,31 +20,26 @@ public final class DaemonThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger counter = new AtomicInteger(); - private final ThreadFactory delegate = Executors.defaultThreadFactory(); - private final boolean propagateContextForTesting; + private final ThreadFactory delegate; public DaemonThreadFactory(String namePrefix) { - this(namePrefix, /* propagateContextForTesting= */ false); + this(namePrefix, Executors.defaultThreadFactory()); } /** * {@link DaemonThreadFactory}'s constructor. * * @param namePrefix Used when setting the new thread's name. - * @param propagateContextForTesting For tests only. When enabled, the current thread's {@link - * Context} will be passed over to the new threads, this is useful for validating scenarios - * where context propagation is available through bytecode instrumentation. + * @param delegate Delegate to create threads. */ - public DaemonThreadFactory(String namePrefix, boolean propagateContextForTesting) { + public DaemonThreadFactory(String namePrefix, ThreadFactory delegate) { this.namePrefix = namePrefix; - this.propagateContextForTesting = propagateContextForTesting; + this.delegate = delegate; } @Override public Thread newThread(Runnable runnable) { - Thread t = - delegate.newThread( - propagateContextForTesting ? Context.current().wrap(runnable) : runnable); + Thread t = delegate.newThread(runnable); t.setUncaughtExceptionHandler( new ManagedUncaughtExceptionHandler(t.getUncaughtExceptionHandler())); try { @@ -57,7 +51,7 @@ public Thread newThread(Runnable runnable) { return t; } - private static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Nullable private final Thread.UncaughtExceptionHandler delegate; private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandler delegate) { @@ -71,8 +65,6 @@ public void uncaughtException(Thread t, Throwable e) { t.interrupt(); } else if (delegate != null) { delegate.uncaughtException(t, e); - } else { - throw new RuntimeException(e); } } } diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java new file mode 100644 index 00000000000..b5ae5b9deda --- /dev/null +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java @@ -0,0 +1,55 @@ +package io.opentelemetry.sdk.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +class DaemonThreadFactoryTest { + + @Test + void verifyUncaughtExceptions() + throws ExecutionException, InterruptedException, TimeoutException { + Thread.UncaughtExceptionHandler defaultHandler = mock(); + ThreadFactory delegateFactory = + r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setUncaughtExceptionHandler(defaultHandler); + return thread; + }; + ExecutorService service = + Executors.newSingleThreadExecutor(new DaemonThreadFactory("test", delegateFactory)); + Runnable runnable = + () -> { + Thread.UncaughtExceptionHandler uncaughtExceptionHandler = + Thread.currentThread().getUncaughtExceptionHandler(); + + assertThat(uncaughtExceptionHandler) + .isInstanceOf(DaemonThreadFactory.ManagedUncaughtExceptionHandler.class); + + Thread threadMock = mock(); + + // Verify interrupted exception + uncaughtExceptionHandler.uncaughtException(threadMock, new InterruptedException()); + verify(threadMock).interrupt(); + verifyNoInteractions(defaultHandler); + + // Verify delegate exception + clearInvocations(threadMock, defaultHandler); + IllegalStateException e = new IllegalStateException(); + uncaughtExceptionHandler.uncaughtException(threadMock, e); + verify(defaultHandler).uncaughtException(threadMock, e); + }; + + service.submit(runnable).get(5, TimeUnit.SECONDS); + } +} From e3e6d64621aa533968c114e1ba548ae88a3c30d3 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 13 Aug 2025 14:26:13 +0200 Subject: [PATCH 3/5] Clean up --- .../java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java index 61dbc03d74c..a4e0fad06a8 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java @@ -58,7 +58,7 @@ private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandle this.delegate = delegate; } - @SuppressWarnings({"Interruption", "ThrowSpecificExceptions"}) + @SuppressWarnings("Interruption") @Override public void uncaughtException(Thread t, Throwable e) { if (e instanceof InterruptedException) { From 3619d4a8997323d7ad750b1fc89dd4c29b0d53b8 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 13 Aug 2025 14:29:54 +0200 Subject: [PATCH 4/5] Spotless --- .../opentelemetry/sdk/internal/DaemonThreadFactoryTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java index b5ae5b9deda..adf8acec8f1 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.sdk.internal; import static org.assertj.core.api.Assertions.assertThat; From a27bf65b75c2a5f7acb6017a4160de62a6ccc0d9 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Mon, 18 Aug 2025 09:47:09 +0200 Subject: [PATCH 5/5] Validating that submitted work was executed in daemon thread factory tests --- .../opentelemetry/sdk/internal/DaemonThreadFactoryTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java index adf8acec8f1..d73c8211d99 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -33,7 +34,7 @@ void verifyUncaughtExceptions() }; ExecutorService service = Executors.newSingleThreadExecutor(new DaemonThreadFactory("test", delegateFactory)); - Runnable runnable = + Callable callable = () -> { Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.currentThread().getUncaughtExceptionHandler(); @@ -53,8 +54,9 @@ void verifyUncaughtExceptions() IllegalStateException e = new IllegalStateException(); uncaughtExceptionHandler.uncaughtException(threadMock, e); verify(defaultHandler).uncaughtException(threadMock, e); + return true; }; - service.submit(runnable).get(5, TimeUnit.SECONDS); + assertThat(service.submit(callable).get(5, TimeUnit.SECONDS)).isTrue(); } }