Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -35,7 +37,15 @@ public static Dispatcher newDispatcher() {
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DaemonThreadFactory("okhttp-dispatch", propagateContextForTestingInDispatcher)));
createThreadFactory("okhttp-dispatch")));
}

private static DaemonThreadFactory createThreadFactory(String namePrefix) {
if (propagateContextForTestingInDispatcher) {
return new DaemonThreadFactory(
namePrefix, r -> Executors.defaultThreadFactory().newThread(Context.current().wrap(r)));
}
return new DaemonThreadFactory(namePrefix);
}

private OkHttpUtil() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package io.opentelemetry.sdk.internal;

import io.opentelemetry.context.Context;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
* A {@link ThreadFactory} that delegates to {@code Executors.defaultThreadFactory()} and marks all
Expand All @@ -20,31 +20,28 @@
public final class DaemonThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger counter = new AtomicInteger();
private final ThreadFactory delegate = Executors.defaultThreadFactory();
private final boolean propagateContextForTesting;
private final ThreadFactory delegate;

public DaemonThreadFactory(String namePrefix) {
this(namePrefix, /* propagateContextForTesting= */ false);
this(namePrefix, Executors.defaultThreadFactory());
}

/**
* {@link DaemonThreadFactory}'s constructor.
*
* @param namePrefix Used when setting the new thread's name.
* @param propagateContextForTesting For tests only. When enabled, the current thread's {@link
* Context} will be passed over to the new threads, this is useful for validating scenarios
* where context propagation is available through bytecode instrumentation.
* @param delegate Delegate to create threads.
*/
public DaemonThreadFactory(String namePrefix, boolean propagateContextForTesting) {
public DaemonThreadFactory(String namePrefix, ThreadFactory delegate) {
this.namePrefix = namePrefix;
this.propagateContextForTesting = propagateContextForTesting;
this.delegate = delegate;
}

@Override
public Thread newThread(Runnable runnable) {
Thread t =
delegate.newThread(
propagateContextForTesting ? Context.current().wrap(runnable) : runnable);
Thread t = delegate.newThread(runnable);
t.setUncaughtExceptionHandler(
new ManagedUncaughtExceptionHandler(t.getUncaughtExceptionHandler()));
try {
t.setDaemon(true);
t.setName(namePrefix + "-" + counter.incrementAndGet());
Expand All @@ -53,4 +50,22 @@ public Thread newThread(Runnable runnable) {
}
return t;
}

static class ManagedUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Nullable private final Thread.UncaughtExceptionHandler delegate;

private ManagedUncaughtExceptionHandler(@Nullable Thread.UncaughtExceptionHandler delegate) {
this.delegate = delegate;
}

@SuppressWarnings("Interruption")
@Override
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof InterruptedException) {
t.interrupt();
} else if (delegate != null) {
delegate.uncaughtException(t, e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;

class DaemonThreadFactoryTest {

@Test
void verifyUncaughtExceptions()
throws ExecutionException, InterruptedException, TimeoutException {
Thread.UncaughtExceptionHandler defaultHandler = mock();
ThreadFactory delegateFactory =
r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setUncaughtExceptionHandler(defaultHandler);
return thread;
};
ExecutorService service =
Executors.newSingleThreadExecutor(new DaemonThreadFactory("test", delegateFactory));
Runnable runnable =
() -> {
Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
Thread.currentThread().getUncaughtExceptionHandler();

assertThat(uncaughtExceptionHandler)
.isInstanceOf(DaemonThreadFactory.ManagedUncaughtExceptionHandler.class);

Thread threadMock = mock();

// Verify interrupted exception
uncaughtExceptionHandler.uncaughtException(threadMock, new InterruptedException());
verify(threadMock).interrupt();
verifyNoInteractions(defaultHandler);

// Verify delegate exception
clearInvocations(threadMock, defaultHandler);
IllegalStateException e = new IllegalStateException();
uncaughtExceptionHandler.uncaughtException(threadMock, e);
verify(defaultHandler).uncaughtException(threadMock, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that doing all assertions/verifications in an async block when the main test body doesn't also verify the execution is a code smell.

I think there could be something at the end of the Runnable (AtomicBoolean, CountdownLatch, etc) that you can then verify in the main thread. That way the reader can easily determine that the runnable completed which means the assertions all ran and passed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I've added some changes to address this, cheers!

};

service.submit(runnable).get(5, TimeUnit.SECONDS);
}
}
Loading