Skip to content

Commit b999335

Browse files
committed
Virtual threads to isolate ThreadLocals in hooks.
There's something kind of funny about going to great lengths to use our Reactive Breadth First Search algorithm to achieve the hook ordering we want without using a background thread, and then right at the last moment, spawning a thread to run each hook anyway. I'm leaving it this way for the time being because it has certain desirable properties: - The hook queue maintains the desired order. In contrast, if we simply submitted the hooks to an executor, all bets are off. - Only one hook runs at a time. There's a happens-before relationship between all the hooks. In effect, since we're running completely synchronously with the virtual thread, using it as a kind of structured concurrency sans parallelism, we're actually just using the virtual thread mechansim as a utility to isolate ThreadLocals and nothing else. Anyway, this is a step closer to the semantics we want, and we can refine the implementation later if we can devise a simpler way to achieve all the desired properties.
1 parent 9379c22 commit b999335

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

bosk-core/src/main/java/works/bosk/Bosk.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import java.util.Queue;
1212
import java.util.concurrent.ConcurrentLinkedDeque;
1313
import java.util.concurrent.ConcurrentLinkedQueue;
14+
import java.util.concurrent.ExecutionException;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
1417
import java.util.concurrent.Semaphore;
1518
import java.util.concurrent.atomic.AtomicReference;
1619
import java.util.function.BiConsumer;
@@ -89,6 +92,7 @@ public class Bosk<R extends StateTreeNode> implements BoskInfo<R> {
8992
private final RootRef rootRef;
9093
private final ThreadLocal<R> rootSnapshot = new ThreadLocal<>();
9194
private final Queue<HookRegistration<?>> hooks = new ConcurrentLinkedQueue<>();
95+
private final ExecutorService hookExecutor = Executors.newVirtualThreadPerTaskExecutor();
9296
private final PathCompiler pathCompiler;
9397

9498
// Mutable state
@@ -542,8 +546,23 @@ private void drainQueueIfAllowed() {
542546
if (hookExecutionPermit.tryAcquire()) {
543547
try {
544548
for (Runnable ex = hookExecutionQueue.pollFirst(); ex != null; ex = hookExecutionQueue.pollFirst()) {
545-
ex.run();
549+
// Run the task in a separate virtual thread to prevent ThreadLocals from propagating.
550+
// This is slightly tragic, because usually ThreadLocal propagation works just the
551+
// way we'd want, but not always. Given the choices "always, sometimes, never", if
552+
// we can't achieve "always", then the bosk philosophy prefers "never" over "sometimes".
553+
hookExecutor.submit(ex).get();
546554
}
555+
} catch (ExecutionException e) {
556+
if (e.getCause() instanceof RuntimeException r) {
557+
throw r;
558+
} else if (e.getCause() instanceof Error error) {
559+
throw error;
560+
} else {
561+
throw new AssertionError("Hook runnable should catch and wrap checked exceptions", e);
562+
}
563+
} catch (InterruptedException e) {
564+
LOGGER.warn("Interrupted while running hooks", e);
565+
return;
547566
} finally {
548567
hookExecutionPermit.release();
549568
}

bosk-core/src/test/java/works/bosk/HooksTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import java.lang.reflect.InvocationTargetException;
55
import java.util.ArrayList;
66
import java.util.List;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.LinkedBlockingQueue;
79
import java.util.concurrent.atomic.AtomicBoolean;
810
import org.junit.jupiter.api.BeforeEach;
911
import org.junit.jupiter.api.Test;
@@ -78,6 +80,22 @@ void basic_hooksRunWhenRegistered() {
7880
"Hook should fire when it's registered");
7981
}
8082

83+
@Test
84+
void basic_hooksDoNotPropagateThreadLocals() throws InterruptedException {
85+
ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "initial");
86+
threadLocal.set("updated");
87+
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
88+
bosk.registerHook("foo", bosk.rootReference(), ref -> {
89+
try {
90+
queue.put(threadLocal.get());
91+
} catch (InterruptedException e) {
92+
throw new AssertionError("Huh?", e);
93+
}
94+
});
95+
String observed = queue.take();
96+
assertEquals("initial", observed, "Thread locals should not propagate into hooks");
97+
}
98+
8199
@ParameterizedTest
82100
@EnumSource(Variant.class)
83101
void basic_noIrrelevantHooks(Variant variant) {

0 commit comments

Comments
 (0)