diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index b641d1bad05..1a6202070b6 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -5,7 +5,9 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.internal.DaemonThreadFactory; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,7 +37,15 @@ public static Dispatcher newDispatcher() { 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new DaemonThreadFactory("okhttp-dispatch", propagateContextForTestingInDispatcher))); + createThreadFactory("okhttp-dispatch"))); + } + + private static DaemonThreadFactory createThreadFactory(String namePrefix) { + if (propagateContextForTestingInDispatcher) { + return new DaemonThreadFactory( + namePrefix, r -> Executors.defaultThreadFactory().newThread(Context.current().wrap(r))); + } + return new DaemonThreadFactory(namePrefix); } private OkHttpUtil() {} diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java index e8f75abe40f..a4e0fad06a8 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java @@ -5,10 +5,10 @@ package io.opentelemetry.sdk.internal; -import io.opentelemetry.context.Context; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; /** * A {@link ThreadFactory} that delegates to {@code Executors.defaultThreadFactory()} and marks all @@ -20,31 +20,28 @@ public final class DaemonThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger counter = new AtomicInteger(); - private final ThreadFactory delegate = Executors.defaultThreadFactory(); - private final boolean propagateContextForTesting; + private final ThreadFactory delegate; public DaemonThreadFactory(String namePrefix) { - this(namePrefix, /* propagateContextForTesting= */ false); + this(namePrefix, Executors.defaultThreadFactory()); } /** * {@link DaemonThreadFactory}'s constructor. * * @param namePrefix Used when setting the new thread's name. - * @param propagateContextForTesting For tests only. When enabled, the current thread's {@link - * Context} will be passed over to the new threads, this is useful for validating scenarios - * where context propagation is available through bytecode instrumentation. + * @param delegate Delegate to create threads. */ - public DaemonThreadFactory(String namePrefix, boolean propagateContextForTesting) { + public DaemonThreadFactory(String namePrefix, ThreadFactory delegate) { this.namePrefix = namePrefix; - this.propagateContextForTesting = propagateContextForTesting; + this.delegate = delegate; } @Override public Thread newThread(Runnable runnable) { - Thread t = - delegate.newThread( - propagateContextForTesting ? Context.current().wrap(runnable) : runnable); + Thread t = delegate.newThread(runnable); + t.setUncaughtExceptionHandler( + new ManagedUncaughtExceptionHandler(t.getUncaughtExceptionHandler())); try { t.setDaemon(true); t.setName(namePrefix + "-" + counter.incrementAndGet()); @@ -53,4 +50,22 @@ public Thread newThread(Runnable runnable) { } return t; } + + static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + @Nullable private final Thread.UncaughtExceptionHandler delegate; + + private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandler delegate) { + this.delegate = delegate; + } + + @SuppressWarnings("Interruption") + @Override + public void uncaughtException(Thread t, Throwable e) { + if (e instanceof InterruptedException) { + t.interrupt(); + } else if (delegate != null) { + delegate.uncaughtException(t, e); + } + } + } } diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java new file mode 100644 index 00000000000..d73c8211d99 --- /dev/null +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/internal/DaemonThreadFactoryTest.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +class DaemonThreadFactoryTest { + + @Test + void verifyUncaughtExceptions() + throws ExecutionException, InterruptedException, TimeoutException { + Thread.UncaughtExceptionHandler defaultHandler = mock(); + ThreadFactory delegateFactory = + r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setUncaughtExceptionHandler(defaultHandler); + return thread; + }; + ExecutorService service = + Executors.newSingleThreadExecutor(new DaemonThreadFactory("test", delegateFactory)); + Callable callable = + () -> { + Thread.UncaughtExceptionHandler uncaughtExceptionHandler = + Thread.currentThread().getUncaughtExceptionHandler(); + + assertThat(uncaughtExceptionHandler) + .isInstanceOf(DaemonThreadFactory.ManagedUncaughtExceptionHandler.class); + + Thread threadMock = mock(); + + // Verify interrupted exception + uncaughtExceptionHandler.uncaughtException(threadMock, new InterruptedException()); + verify(threadMock).interrupt(); + verifyNoInteractions(defaultHandler); + + // Verify delegate exception + clearInvocations(threadMock, defaultHandler); + IllegalStateException e = new IllegalStateException(); + uncaughtExceptionHandler.uncaughtException(threadMock, e); + verify(defaultHandler).uncaughtException(threadMock, e); + return true; + }; + + assertThat(service.submit(callable).get(5, TimeUnit.SECONDS)).isTrue(); + } +}