Skip to content

Commit 2d0fb63

Browse files
committed
Create the Fiber thread lazily on the first Fiber#{resume,transfer}
* This creates the thread later which is better for resources as there is no need for it earlier and there might be cases where a Fiber is created but never resumed. * This avoids the problem of blocking in beforeEnter() which would prevent cancellation of a Fiber thread which never received a message.
1 parent 1b287a8 commit 2d0fb63

File tree

2 files changed

+34
-13
lines changed

2 files changed

+34
-13
lines changed

src/main/java/org/truffleruby/core/fiber/FiberManager.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010
package org.truffleruby.core.fiber;
1111

12+
import java.util.Objects;
1213
import java.util.concurrent.CountDownLatch;
1314

1415
import com.oracle.truffle.api.TruffleSafepoint.Interrupter;
@@ -61,16 +62,27 @@ public FiberManager(RubyLanguage language, RubyContext context) {
6162
public void initialize(RubyFiber fiber, boolean blocking, RubyProc block, Node currentNode) {
6263
final SourceSection sourceSection = block.getSharedMethodInfo().getSourceSection();
6364
fiber.sourceLocation = context.fileLine(sourceSection);
65+
fiber.body = block;
66+
fiber.initializeNode = currentNode;
6467
fiber.blocking = blocking;
68+
}
69+
70+
private void createThreadToReceiveFirstMessage(RubyFiber fiber, Node currentNode) {
71+
assert fiber.thread == null;
72+
RubyProc block = Objects.requireNonNull(fiber.body);
73+
fiber.body = null;
74+
Node initializeNode = Objects.requireNonNull(fiber.initializeNode);
75+
fiber.initializeNode = null;
6576

77+
var sourceSection = block.getSharedMethodInfo().getSourceSection();
6678
final TruffleContext truffleContext = context.getEnv().getContext();
6779

6880
truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT, (unused) -> {
69-
ThreadManager threadManager = context.getThreadManager();
70-
Thread thread = threadManager.createFiberJavaThread(fiber, sourceSection,
71-
() -> beforeEnter(fiber, currentNode),
72-
() -> fiberMain(context, fiber, block, currentNode),
81+
Thread thread = context.getThreadManager().createFiberJavaThread(fiber, sourceSection,
82+
() -> beforeEnter(fiber, initializeNode),
83+
() -> fiberMain(context, fiber, block, initializeNode),
7384
() -> afterLeave(fiber), currentNode);
85+
fiber.thread = thread;
7486
thread.start();
7587
waitForInitializationUnentered(context, fiber, currentNode);
7688
return BlockingAction.SUCCESS;
@@ -121,22 +133,17 @@ private void beforeEnter(RubyFiber fiber, Node currentNode) {
121133

122134
try {
123135
fiber.firstMessage = waitMessage(fiber, currentNode);
124-
} catch (InterruptedException e) { // TODO ideally we would let this pop out of beforeEnter()
125-
Thread.currentThread().interrupt();
126-
return;
136+
} catch (InterruptedException e) {
137+
throw CompilerDirectives.shouldNotReachHere("unexpected interrupt in Fiber beforeEnter()");
127138
}
128139

129140
// enter() polls so we need the current Fiber to be set before enter()
130141
fiber.rubyThread.setCurrentFiber(fiber);
131142
}
132143

133144
private void fiberMain(RubyContext context, RubyFiber fiber, RubyProc block, Node currentNode) {
134-
final FiberMessage message = fiber.firstMessage;
145+
final FiberMessage message = Objects.requireNonNull(fiber.firstMessage);
135146
fiber.firstMessage = null;
136-
if (message == null) {
137-
TruffleSafepoint.poll(currentNode);
138-
throw CompilerDirectives.shouldNotReachHere("null Fiber message and no cancellation");
139-
}
140147

141148
FiberMessage lastMessage = null;
142149
try {
@@ -308,6 +315,11 @@ public DescriptorAndArgs transferControlTo(RubyFiber fromFiber, RubyFiber toFibe
308315
@TruffleBoundary
309316
private FiberMessage resumeAndWait(RubyFiber fromFiber, RubyFiber toFiber, FiberOperation operation,
310317
ArgumentsDescriptor descriptor, Object[] args, Node currentNode) {
318+
319+
if (toFiber.body != null) {
320+
context.fiberManager.createThreadToReceiveFirstMessage(toFiber, currentNode);
321+
}
322+
311323
final TruffleContext truffleContext = context.getEnv().getContext();
312324
final FiberMessage message = truffleContext.leaveAndEnter(currentNode, Interrupter.THREAD_INTERRUPT,
313325
(unused) -> {
@@ -334,7 +346,12 @@ public void safepoint(RubyFiber fromFiber, RubyFiber fiber, SafepointAction acti
334346
}
335347

336348
public void start(RubyFiber fiber, Thread javaThread) {
337-
fiber.thread = javaThread;
349+
if (fiber.isRootFiber()) {
350+
fiber.thread = javaThread;
351+
fiber.status = FiberStatus.RESUMED;
352+
} else {
353+
// fiber.thread set by createThreadToReceiveFirstMessage()
354+
}
338355

339356
final RubyThread rubyThread = fiber.rubyThread;
340357

src/main/java/org/truffleruby/core/fiber/RubyFiber.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.LinkedBlockingQueue;
1616

1717
import com.oracle.truffle.api.CompilerAsserts;
18+
import com.oracle.truffle.api.nodes.Node;
1819
import org.truffleruby.RubyContext;
1920
import org.truffleruby.RubyLanguage;
2021
import org.truffleruby.cext.ValueWrapperManager;
@@ -23,6 +24,7 @@
2324
import org.truffleruby.core.array.RubyArray;
2425
import org.truffleruby.core.basicobject.RubyBasicObject;
2526
import org.truffleruby.core.klass.RubyClass;
27+
import org.truffleruby.core.proc.RubyProc;
2628
import org.truffleruby.core.thread.RubyThread;
2729
import org.truffleruby.language.Nil;
2830
import org.truffleruby.language.RubyDynamicObject;
@@ -87,6 +89,8 @@ public enum FiberStatus {
8789
public Thread thread = null;
8890
public volatile Throwable uncaughtException = null;
8991
String sourceLocation;
92+
RubyProc body;
93+
Node initializeNode;
9094
public final MarkingService.ExtensionCallStack extensionCallStack;
9195
public final ValueWrapperManager.HandleBlockHolder handleData;
9296
boolean blocking = true;

0 commit comments

Comments
 (0)