Skip to content

Commit c9f32d2

Browse files
committed
Adopt the new interruptible leaveAndEnter()
1 parent f4ec62e commit c9f32d2

File tree

2 files changed

+44
-72
lines changed

2 files changed

+44
-72
lines changed

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.util.concurrent.CountDownLatch;
1313

14+
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
1415
import org.truffleruby.core.fiber.RubyFiber.FiberStatus;
1516

1617
import com.oracle.truffle.api.TruffleContext;
@@ -64,27 +65,38 @@ public void initialize(RubyFiber fiber, boolean blocking, RubyProc block, Node c
6465

6566
final TruffleContext truffleContext = context.getEnv().getContext();
6667

67-
context.getThreadManager().leaveAndEnter(truffleContext, currentNode, () -> {
68+
truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT, (unused) -> {
6869
ThreadManager threadManager = context.getThreadManager();
6970
Thread thread = threadManager.createFiberJavaThread(fiber, sourceSection,
7071
() -> beforeEnter(fiber, currentNode),
7172
() -> fiberMain(context, fiber, block, currentNode),
7273
() -> afterLeave(fiber), currentNode);
7374
thread.start();
74-
waitForInitialization(context, fiber, currentNode);
75+
waitForInitializationUnentered(context, fiber, currentNode);
7576
return BlockingAction.SUCCESS;
76-
});
77+
}, null);
7778
}
7879

7980
/** Wait for full initialization of the new fiber */
80-
public static void waitForInitialization(RubyContext context, RubyFiber fiber, Node currentNode) {
81+
public static void waitForInitializationEntered(RubyContext context, RubyFiber fiber, Node currentNode) {
82+
assert context.getEnv().getContext().isEntered();
8183
final CountDownLatch initializedLatch = fiber.initializedLatch;
8284

83-
if (context.getEnv().getContext().isEntered()) {
84-
context.getThreadManager().runUntilResultKeepStatus(currentNode, CountDownLatch::await, initializedLatch);
85-
} else {
86-
context.getThreadManager().retryWhileInterrupted(currentNode, CountDownLatch::await, initializedLatch);
85+
context.getThreadManager().runUntilResultKeepStatus(currentNode, CountDownLatch::await, initializedLatch);
86+
87+
final Throwable uncaughtException = fiber.uncaughtException;
88+
if (uncaughtException != null) {
89+
ExceptionOperations.rethrow(uncaughtException);
8790
}
91+
}
92+
93+
/** Wait for full initialization of the new fiber */
94+
public static void waitForInitializationUnentered(RubyContext context, RubyFiber fiber, Node currentNode)
95+
throws InterruptedException {
96+
assert !context.getEnv().getContext().isEntered();
97+
final CountDownLatch initializedLatch = fiber.initializedLatch;
98+
99+
initializedLatch.await();
88100

89101
final Throwable uncaughtException = fiber.uncaughtException;
90102
if (uncaughtException != null) {
@@ -104,7 +116,12 @@ private void beforeEnter(RubyFiber fiber, Node currentNode) {
104116
// fully initialized
105117
fiber.initializedLatch.countDown();
106118

107-
fiber.firstMessage = waitMessage(fiber, currentNode);
119+
try {
120+
fiber.firstMessage = waitMessage(fiber, currentNode);
121+
} catch (InterruptedException e) { // TODO ideally we would let this pop out of beforeEnter()
122+
Thread.currentThread().interrupt();
123+
return;
124+
}
108125

109126
// enter() polls so we need the current Fiber to be set before enter()
110127
fiber.rubyThread.setCurrentFiber(fiber);
@@ -113,6 +130,10 @@ private void beforeEnter(RubyFiber fiber, Node currentNode) {
113130
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
114131
final FiberMessage message = fiber.firstMessage;
115132
fiber.firstMessage = null;
133+
if (message == null) {
134+
TruffleSafepoint.poll(currentNode);
135+
throw CompilerDirectives.shouldNotReachHere("null Fiber message and no cancellation");
136+
}
116137

117138
FiberMessage lastMessage = null;
118139
try {
@@ -196,24 +217,9 @@ private void addToMessageQueue(RubyFiber fiber, FiberMessage message) {
196217

197218
/** Send the Java thread that represents this fiber to sleep until it receives a message. */
198219
@TruffleBoundary
199-
private FiberMessage waitMessage(RubyFiber fiber, Node currentNode) {
220+
private FiberMessage waitMessage(RubyFiber fiber, Node currentNode) throws InterruptedException {
200221
assertNotEntered("should have left context while waiting fiber message");
201-
202-
class State {
203-
final RubyFiber fiber;
204-
FiberMessage message;
205-
206-
State(RubyFiber fiber) {
207-
this.fiber = fiber;
208-
}
209-
}
210-
211-
final State state = new State(fiber);
212-
context.getThreadManager().retryWhileInterrupted(
213-
currentNode,
214-
s -> s.message = s.fiber.messageQueue.take(),
215-
state);
216-
return state.message;
222+
return fiber.messageQueue.take();
217223
}
218224

219225
private void assertNotEntered(String reason) {
@@ -300,25 +306,23 @@ public DescriptorAndArgs transferControlTo(RubyFiber fromFiber, RubyFiber toFibe
300306
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber toFiber, FiberOperation operation,
301307
ArgumentsDescriptor descriptor, Object[] args, Node currentNode) {
302308
final TruffleContext truffleContext = context.getEnv().getContext();
303-
final FiberMessage message = context
304-
.getThreadManager()
305-
.leaveAndEnter(truffleContext, currentNode, () -> {
309+
final FiberMessage message = truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT,
310+
(unused) -> {
306311
resume(fromFiber, toFiber, operation, descriptor, args);
307312
return waitMessage(fromFiber, currentNode);
308-
});
313+
}, null);
309314
fromFiber.rubyThread.setCurrentFiber(fromFiber);
310315
return message;
311316
}
312317

313318
@TruffleBoundary
314319
public void safepoint(RubyFiber fromFiber, RubyFiber fiber, SafepointAction action, Node currentNode) {
315320
final TruffleContext truffleContext = context.getEnv().getContext();
316-
final FiberResumeMessage returnMessage = (FiberResumeMessage) context
317-
.getThreadManager()
318-
.leaveAndEnter(truffleContext, currentNode, () -> {
321+
final FiberResumeMessage returnMessage = (FiberResumeMessage) truffleContext.leaveAndEnter(currentNode,
322+
Interrupter.THREAD_INTERRUPT, (unused) -> {
319323
addToMessageQueue(fiber, new FiberSafepointMessage(fromFiber, action));
320324
return waitMessage(fromFiber, currentNode);
321-
});
325+
}, null);
322326
fromFiber.rubyThread.setCurrentFiber(fromFiber);
323327

324328
if (returnMessage.getArgs() != SAFEPOINT_ARGS) {
@@ -366,26 +370,23 @@ public void killOtherFibers(RubyThread thread) {
366370
boolean allowSideEffects = safepoint.setAllowSideEffects(false);
367371
try {
368372
final TruffleContext truffleContext = context.getEnv().getContext();
369-
context.getThreadManager().leaveAndEnter(truffleContext, DummyNode.INSTANCE, () -> {
373+
truffleContext.leaveAndEnter(DummyNode.INSTANCE, Interrupter.THREAD_INTERRUPT, (unused) -> {
370374
doKillOtherFibers(thread);
371375
return BlockingAction.SUCCESS;
372-
});
376+
}, null);
373377
} finally {
374378
safepoint.setAllowSideEffects(allowSideEffects);
375379
}
376380
}
377381

378-
private void doKillOtherFibers(RubyThread thread) {
382+
private void doKillOtherFibers(RubyThread thread) throws InterruptedException {
379383
for (RubyFiber fiber : thread.runningFibers) {
380384
if (!fiber.isRootFiber()) {
381385
addToMessageQueue(fiber, new FiberShutdownMessage());
382386

383387
// Wait for the Fiber to finish so we only run one Fiber at a time
384388
final CountDownLatch finishedLatch = fiber.finishedLatch;
385-
context.getThreadManager().retryWhileInterrupted(
386-
DummyNode.INSTANCE,
387-
CountDownLatch::await,
388-
finishedLatch);
389+
finishedLatch.await();
389390

390391
final Throwable uncaughtException = fiber.uncaughtException;
391392
if (uncaughtException != null) {

src/main/java/org/truffleruby/core/thread/ThreadManager.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.oracle.truffle.api.CompilerAsserts;
2424
import com.oracle.truffle.api.CompilerDirectives;
2525
import com.oracle.truffle.api.CompilerDirectives.ValueType;
26-
import com.oracle.truffle.api.TruffleContext;
2726
import com.oracle.truffle.api.TruffleOptions;
2827
import com.oracle.truffle.api.TruffleSafepoint;
2928
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
@@ -321,7 +320,7 @@ public void initialize(RubyThread rubyThread, Node currentNode, String info, Str
321320
thread.start();
322321

323322
// Must not leave the context here, to perform safepoint actions, if e.g. the new thread Thread#raise this one
324-
FiberManager.waitForInitialization(context, rootFiber, currentNode);
323+
FiberManager.waitForInitializationEntered(context, rootFiber, currentNode);
325324
}
326325

327326
/** {@link RubyLanguage#initializeThread(RubyContext, Thread)} runs before this, and
@@ -504,36 +503,8 @@ public interface BlockingAction<T> {
504503
T block() throws InterruptedException;
505504
}
506505

507-
public <T> T leaveAndEnter(TruffleContext truffleContext, Node currentNode, Supplier<T> runWhileOutsideContext) {
508-
assert truffleContext.isEntered();
509-
return truffleContext.leaveAndEnter(currentNode, runWhileOutsideContext);
510-
}
511-
512-
/** Only use when the context is not entered. */
513-
@TruffleBoundary
514-
public <T> void retryWhileInterrupted(Node currentNode, TruffleSafepoint.Interruptible<T> interruptible, T object) {
515-
assert !context.getEnv().getContext().isEntered() : "Use runUntilResult*() when entered";
516-
boolean interrupted = false;
517-
try {
518-
while (true) {
519-
try {
520-
interruptible.apply(object);
521-
break;
522-
} catch (InterruptedException e) {
523-
interrupted = true;
524-
// retry
525-
}
526-
}
527-
} finally {
528-
if (interrupted) {
529-
Thread.currentThread().interrupt();
530-
}
531-
}
532-
}
533-
534506
@TruffleBoundary
535-
public <T> void runUntilResultKeepStatus(Node currentNode, TruffleSafepoint.Interruptible<T> action,
536-
T object) {
507+
public <T> void runUntilResultKeepStatus(Node currentNode, TruffleSafepoint.Interruptible<T> action, T object) {
537508
assert context.getEnv().getContext().isEntered() : "Use retryWhileInterrupted() when not entered";
538509
TruffleSafepoint.setBlockedThreadInterruptible(currentNode, action, object);
539510
}

0 commit comments

Comments
 (0)