Skip to content

Commit d34fc34

Browse files
authored
Suppressing interrupted exceptions from managed okhttp threads (#7565)
1 parent 35cc474 commit d34fc34

File tree

3 files changed

+100
-13
lines changed

3 files changed

+100
-13
lines changed

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package io.opentelemetry.exporter.sender.okhttp.internal;
77

8+
import io.opentelemetry.context.Context;
89
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
10+
import java.util.concurrent.Executors;
911
import java.util.concurrent.SynchronousQueue;
1012
import java.util.concurrent.ThreadPoolExecutor;
1113
import java.util.concurrent.TimeUnit;
@@ -35,7 +37,15 @@ public static Dispatcher newDispatcher() {
3537
60,
3638
TimeUnit.SECONDS,
3739
new SynchronousQueue<>(),
38-
new DaemonThreadFactory("okhttp-dispatch", propagateContextForTestingInDispatcher)));
40+
createThreadFactory("okhttp-dispatch")));
41+
}
42+
43+
private static DaemonThreadFactory createThreadFactory(String namePrefix) {
44+
if (propagateContextForTestingInDispatcher) {
45+
return new DaemonThreadFactory(
46+
namePrefix, r -> Executors.defaultThreadFactory().newThread(Context.current().wrap(r)));
47+
}
48+
return new DaemonThreadFactory(namePrefix);
3949
}
4050

4151
private OkHttpUtil() {}

sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
package io.opentelemetry.sdk.internal;
77

8-
import io.opentelemetry.context.Context;
98
import java.util.concurrent.Executors;
109
import java.util.concurrent.ThreadFactory;
1110
import java.util.concurrent.atomic.AtomicInteger;
11+
import javax.annotation.Nullable;
1212

1313
/**
1414
* A {@link ThreadFactory} that delegates to {@code Executors.defaultThreadFactory()} and marks all
@@ -20,31 +20,28 @@
2020
public final class DaemonThreadFactory implements ThreadFactory {
2121
private final String namePrefix;
2222
private final AtomicInteger counter = new AtomicInteger();
23-
private final ThreadFactory delegate = Executors.defaultThreadFactory();
24-
private final boolean propagateContextForTesting;
23+
private final ThreadFactory delegate;
2524

2625
public DaemonThreadFactory(String namePrefix) {
27-
this(namePrefix, /* propagateContextForTesting= */ false);
26+
this(namePrefix, Executors.defaultThreadFactory());
2827
}
2928

3029
/**
3130
* {@link DaemonThreadFactory}'s constructor.
3231
*
3332
* @param namePrefix Used when setting the new thread's name.
34-
* @param propagateContextForTesting For tests only. When enabled, the current thread's {@link
35-
* Context} will be passed over to the new threads, this is useful for validating scenarios
36-
* where context propagation is available through bytecode instrumentation.
33+
* @param delegate Delegate to create threads.
3734
*/
38-
public DaemonThreadFactory(String namePrefix, boolean propagateContextForTesting) {
35+
public DaemonThreadFactory(String namePrefix, ThreadFactory delegate) {
3936
this.namePrefix = namePrefix;
40-
this.propagateContextForTesting = propagateContextForTesting;
37+
this.delegate = delegate;
4138
}
4239

4340
@Override
4441
public Thread newThread(Runnable runnable) {
45-
Thread t =
46-
delegate.newThread(
47-
propagateContextForTesting ? Context.current().wrap(runnable) : runnable);
42+
Thread t = delegate.newThread(runnable);
43+
t.setUncaughtExceptionHandler(
44+
new ManagedUncaughtExceptionHandler(t.getUncaughtExceptionHandler()));
4845
try {
4946
t.setDaemon(true);
5047
t.setName(namePrefix + "-" + counter.incrementAndGet());
@@ -53,4 +50,22 @@ public Thread newThread(Runnable runnable) {
5350
}
5451
return t;
5552
}
53+
54+
static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
55+
@Nullable private final Thread.UncaughtExceptionHandler delegate;
56+
57+
private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandler delegate) {
58+
this.delegate = delegate;
59+
}
60+
61+
@SuppressWarnings("Interruption")
62+
@Override
63+
public void uncaughtException(Thread t, Throwable e) {
64+
if (e instanceof InterruptedException) {
65+
t.interrupt();
66+
} else if (delegate != null) {
67+
delegate.uncaughtException(t, e);
68+
}
69+
}
70+
}
5671
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.internal;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.mockito.Mockito.clearInvocations;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.verifyNoInteractions;
13+
14+
import java.util.concurrent.Callable;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ThreadFactory;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.TimeoutException;
21+
import org.junit.jupiter.api.Test;
22+
23+
class DaemonThreadFactoryTest {
24+
25+
@Test
26+
void verifyUncaughtExceptions()
27+
throws ExecutionException, InterruptedException, TimeoutException {
28+
Thread.UncaughtExceptionHandler defaultHandler = mock();
29+
ThreadFactory delegateFactory =
30+
r -> {
31+
Thread thread = Executors.defaultThreadFactory().newThread(r);
32+
thread.setUncaughtExceptionHandler(defaultHandler);
33+
return thread;
34+
};
35+
ExecutorService service =
36+
Executors.newSingleThreadExecutor(new DaemonThreadFactory("test", delegateFactory));
37+
Callable<Boolean> callable =
38+
() -> {
39+
Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
40+
Thread.currentThread().getUncaughtExceptionHandler();
41+
42+
assertThat(uncaughtExceptionHandler)
43+
.isInstanceOf(DaemonThreadFactory.ManagedUncaughtExceptionHandler.class);
44+
45+
Thread threadMock = mock();
46+
47+
// Verify interrupted exception
48+
uncaughtExceptionHandler.uncaughtException(threadMock, new InterruptedException());
49+
verify(threadMock).interrupt();
50+
verifyNoInteractions(defaultHandler);
51+
52+
// Verify delegate exception
53+
clearInvocations(threadMock, defaultHandler);
54+
IllegalStateException e = new IllegalStateException();
55+
uncaughtExceptionHandler.uncaughtException(threadMock, e);
56+
verify(defaultHandler).uncaughtException(threadMock, e);
57+
return true;
58+
};
59+
60+
assertThat(service.submit(callable).get(5, TimeUnit.SECONDS)).isTrue();
61+
}
62+
}

0 commit comments

Comments
 (0)