Skip to content

Commit 9862b43

Browse files
committed
cleaned up and simplified the fd_to_keep logic in multiprocessing
1 parent 4bb778f commit 9862b43

File tree

3 files changed

+65
-94
lines changed

3 files changed

+65
-94
lines changed

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/MultiprocessingModuleBuiltins.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,6 @@ PTuple pipe(@Cached GilNode gil) {
277277
pipe = sharedData.pipe();
278278
ctx.getChildContextFDs().add(pipe[0]);
279279
ctx.getChildContextFDs().add(pipe[1]);
280-
sharedData.addFdToKeep(pipe[0]);
281-
sharedData.addFdToKeep(pipe[1]);
282280
} finally {
283281
gil.acquire();
284282
}
@@ -297,7 +295,7 @@ Object doWrite(int fd, PBytes data,
297295
gil.release(true);
298296
try {
299297
byte[] bytes = bufferLib.getCopiedByteArray(data);
300-
sharedData.addSharedContextData(fd, bytes,
298+
sharedData.addPipeData(fd, bytes,
301299
() -> {
302300
throw PRaiseNode.raiseUncached(this, OSError, ErrorMessages.BAD_FILE_DESCRIPTOR);
303301
},
@@ -327,7 +325,7 @@ Object doRead(int fd, @SuppressWarnings("unused") Object length,
327325
SharedContextData sharedData = getContext().getSharedContextData();
328326
gil.release(true);
329327
try {
330-
Object data = sharedData.takeSharedContextData(this, fd, () -> {
328+
Object data = sharedData.takePipeData(this, fd, () -> {
331329
throw PRaiseNode.raiseUncached(this, OSError, ErrorMessages.BAD_FILE_DESCRIPTOR);
332330
});
333331
if (data == PNone.NONE) {
@@ -353,12 +351,8 @@ public abstract static class CloseNode extends PythonUnaryBuiltinNode {
353351
PNone close(@SuppressWarnings("unused") int fd) {
354352
assert fd < 0;
355353
SharedContextData sharedData = getContext().getSharedContextData();
356-
if (sharedData.isFdToKeep(fd)) {
357-
if (sharedData.removeFdToKeep(fd)) {
358-
sharedData.closeFd(fd);
359-
}
360-
} else {
361-
getContext().closeLater(fd);
354+
if (!sharedData.decrementFDRefCount(fd)) {
355+
sharedData.closePipe(fd);
362356
}
363357
return PNone.NONE;
364358
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixModuleBuiltins.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,8 +526,7 @@ PNone close(VirtualFrame frame, int fd,
526526
@CachedLibrary("getPosixSupport()") PosixSupportLibrary posixLib) {
527527
try {
528528
PythonContext ctx = getContext();
529-
if (ctx.getSharedContextData().isFdToKeep(fd)) {
530-
ctx.closeLater(fd);
529+
if (ctx.getSharedContextData().decrementFDRefCount(fd)) {
531530
return PNone.NONE;
532531
}
533532
posixLib.close(getPosixSupport(), fd);

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java

Lines changed: 60 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,6 @@
127127
import com.oracle.truffle.api.source.Source;
128128
import com.oracle.truffle.api.utilities.CyclicAssumption;
129129
import com.oracle.truffle.llvm.api.Toolchain;
130-
import java.util.Iterator;
131-
import java.util.Set;
132130
import java.util.SortedMap;
133131
import java.util.TreeMap;
134132
import java.util.concurrent.CountDownLatch;
@@ -472,8 +470,6 @@ public Thread getOwner() {
472470
private final ChildContextData childContextData;
473471
private final SharedContextData sharedContextData;
474472

475-
private final Set<Integer> fdsToClose = new HashSet<>();
476-
477473
public static final class ChildContextData {
478474
private int exitCode = 0;
479475
private boolean signaled;
@@ -525,44 +521,53 @@ public boolean compareAndSetExiting(boolean expect, boolean update) {
525521

526522
public static final class SharedContextData {
527523

528-
private int counter = 0;
524+
private int fdCounter = 0;
529525

530-
private final SortedMap<Integer, LinkedBlockingQueue<Object>> fdData = new TreeMap<>();
531-
private final Map<Integer, Integer> fdsToKeep = new HashMap<>();
526+
/**
527+
* Maps the two fake file descriptors created in {@link #pipe()} to one
528+
* {@link LinkedBlockingQueue}
529+
*/
530+
private final SortedMap<Integer, LinkedBlockingQueue<Object>> pipeData = new TreeMap<>();
532531

533-
@TruffleBoundary
534-
public boolean isFdToKeep(int fd) {
535-
synchronized (fdsToKeep) {
536-
return fdsToKeep.containsKey(fd);
537-
}
538-
}
532+
/**
533+
* Holds ref count of file descriptors which were passed over to a spawned child context.
534+
* This can be either:<br>
535+
* <ul>
536+
* <li>fake file descriptors created via {@link #pipe()}</li>
537+
* <li>real file descriptors coming from the posix implementation</li>
538+
* </ul>
539+
*/
540+
private final Map<Integer, Integer> fdRefCount = new HashMap<>();
539541

542+
/**
543+
* Increases reference count for the given file descriptor.
544+
*/
540545
@TruffleBoundary
541-
public void addFdToKeep(int fd) {
542-
synchronized (fdsToKeep) {
543-
Integer c = fdsToKeep.get(fd);
544-
if (c == null) {
545-
c = 1;
546-
} else {
547-
c = c + 1;
548-
}
549-
fdsToKeep.put(fd, c);
546+
private void incrementFDRefCount(int fd) {
547+
synchronized (fdRefCount) {
548+
fdRefCount.compute(fd, (f, count) -> (count == null) ? 1 : count + 1);
550549
}
551550
}
552551

552+
/**
553+
* Decreases reference count for the given file descriptor.
554+
*
555+
* @return {@code true} if ref count was decreased, {@link false} if ref count isn't tracked
556+
* anymore.
557+
*/
553558
@TruffleBoundary
554-
public boolean removeFdToKeep(int fd) {
555-
synchronized (fdsToKeep) {
556-
Integer c = fdsToKeep.get(fd);
559+
public boolean decrementFDRefCount(int fd) {
560+
synchronized (fdRefCount) {
561+
Integer c = fdRefCount.get(fd);
557562
if (c == null) {
558563
return false;
559564
}
560-
if (c == 1) {
561-
fdsToKeep.remove(fd);
562-
return true;
563-
} else {
564-
fdsToKeep.put(fd, c - 1);
565+
if (c == 0) {
566+
fdRefCount.remove(fd);
565567
return false;
568+
} else {
569+
fdRefCount.put(fd, c - 1);
570+
return true;
566571
}
567572
}
568573
}
@@ -573,21 +578,21 @@ public boolean removeFdToKeep(int fd) {
573578
*/
574579
@TruffleBoundary
575580
public int[] pipe() {
576-
synchronized (fdData) {
581+
synchronized (pipeData) {
577582
LinkedBlockingQueue<Object> q = new LinkedBlockingQueue<>();
578-
int readFD = --counter;
579-
fdData.put(readFD, q);
580-
int writeFD = --counter;
581-
fdData.put(writeFD, q);
583+
int readFD = --fdCounter;
584+
pipeData.put(readFD, q);
585+
int writeFD = --fdCounter;
586+
pipeData.put(writeFD, q);
582587
return new int[]{readFD, writeFD};
583588
}
584589
}
585590

586591
@TruffleBoundary
587-
public boolean addSharedContextData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) {
592+
public boolean addPipeData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) {
588593
LinkedBlockingQueue<Object> q = null;
589-
synchronized (fdData) {
590-
q = fdData.get(fd);
594+
synchronized (pipeData) {
595+
q = pipeData.get(fd);
591596
if (q == null) {
592597
noFDHandler.run();
593598
throw CompilerDirectives.shouldNotReachHere();
@@ -603,17 +608,17 @@ public boolean addSharedContextData(int fd, byte[] bytes, Runnable noFDHandler,
603608
}
604609

605610
@TruffleBoundary
606-
public void closeFd(int fd) {
607-
synchronized (fdData) {
608-
fdData.remove(fd);
611+
public void closePipe(int fd) {
612+
synchronized (pipeData) {
613+
pipeData.remove(fd);
609614
}
610615
}
611616

612617
@TruffleBoundary
613-
public Object takeSharedContextData(Node node, int fd, Runnable noFDHandler) {
618+
public Object takePipeData(Node node, int fd, Runnable noFDHandler) {
614619
LinkedBlockingQueue<Object> q;
615-
synchronized (fdData) {
616-
q = fdData.get(fd);
620+
synchronized (pipeData) {
621+
q = pipeData.get(fd);
617622
if (q == null) {
618623
noFDHandler.run();
619624
throw CompilerDirectives.shouldNotReachHere();
@@ -635,8 +640,8 @@ public Object takeSharedContextData(Node node, int fd, Runnable noFDHandler) {
635640
@TruffleBoundary
636641
public boolean isBlocking(int fd) {
637642
LinkedBlockingQueue<Object> q;
638-
synchronized (fdData) {
639-
q = fdData.get(fd);
643+
synchronized (pipeData) {
644+
q = pipeData.get(fd);
640645
if (q == null) {
641646
return false;
642647
}
@@ -649,7 +654,7 @@ public boolean isBlocking(int fd) {
649654
}
650655

651656
@TruffleBoundary
652-
public void closeFDs(List<Integer> fds) {
657+
public static void closeFDs(List<Integer> fds) {
653658
synchronized (fds) {
654659
for (Integer fd : fds) {
655660
fds.remove(fd);
@@ -662,7 +667,7 @@ private static int getPairFd(int fd) {
662667
}
663668

664669
private boolean isClosed(int fd) {
665-
return fdData.get(fd) == null && fd >= counter;
670+
return pipeData.get(fd) == null && fd >= fdCounter;
666671
}
667672
}
668673

@@ -671,7 +676,7 @@ public PythonContext(PythonLanguage language, TruffleLanguage.Env env, Python3Co
671676
this.core = core;
672677
this.env = env;
673678
this.childContextData = (ChildContextData) env.getConfig().get(CHILD_CONTEXT_DATA);
674-
this.sharedContextData = this.childContextData == null ? new SharedContextData() : null;
679+
this.sharedContextData = this.childContextData == null ? new SharedContextData() : childContextData.parentCtx.sharedContextData;
675680
this.handler = new AsyncHandler(this);
676681
this.sharedFinalizer = new AsyncHandler.SharedFinalizer(this);
677682
this.optionValues = PythonOptions.createOptionValuesStorage(env);
@@ -701,8 +706,8 @@ public ChildContextData getChildContextData() {
701706
return childContextData;
702707
}
703708

704-
public synchronized SharedContextData getSharedContextData() {
705-
return isChildContext() ? childContextData.parentCtx.sharedContextData : this.sharedContextData;
709+
public SharedContextData getSharedContextData() {
710+
return sharedContextData;
706711
}
707712

708713
public long spawnTruffleContext(int fd, int sentinel, int[] fdsToKeep) {
@@ -723,19 +728,12 @@ public long spawnTruffleContext(int fd, int sentinel, int[] fdsToKeep) {
723728
for (int fdToKeep : fdsToKeep) {
724729
// prevent file descriptors from being closed when passed to another "process",
725730
// equivalent to fds_to_keep arg in posix fork_exec
726-
getSharedContextData().addFdToKeep(fdToKeep);
731+
getSharedContextData().incrementFDRefCount(fdToKeep);
727732
}
728733
start(thread);
729734
return tid;
730735
}
731736

732-
@TruffleBoundary
733-
public void closeLater(int fd) {
734-
synchronized (fdsToClose) {
735-
fdsToClose.add(fd);
736-
}
737-
}
738-
739737
@TruffleBoundary
740738
private static void start(Thread thread) {
741739
thread.start();
@@ -788,7 +786,7 @@ public void run() {
788786
LOGGER.log(Level.FINE, t, () -> "exception while closing spawned child context");
789787
}
790788
}
791-
data.parentCtx.sharedContextData.closeFd(sentinel);
789+
data.parentCtx.sharedContextData.closePipe(sentinel);
792790
}
793791
} catch (ThreadDeath td) {
794792
// as a result of of TruffleContext.closeCancelled()
@@ -1404,29 +1402,9 @@ public void finalizeContext() {
14041402
disposeThreadStates();
14051403
}
14061404
cleanupHPyResources();
1407-
if (!cancelling) {
1408-
synchronized (fdsToClose) {
1409-
Iterator<Integer> it = fdsToClose.iterator();
1410-
while (it.hasNext()) {
1411-
int fd = it.next();
1412-
if (getSharedContextData().removeFdToKeep(fd)) {
1413-
it.remove();
1414-
if (fd > 0) {
1415-
try {
1416-
PosixSupportLibrary.getUncached().close(getPosixSupport(), fd);
1417-
} catch (PosixSupportLibrary.PosixException ex) {
1418-
LOGGER.log(Level.FINEST, ex, () -> "got PosixException while closing file discriptor " + fd);
1419-
}
1420-
} else {
1421-
getSharedContextData().closeFd(fd);
1422-
}
1423-
}
1424-
}
1425-
}
1426-
}
14271405
for (int fd : getChildContextFDs()) {
1428-
if (getSharedContextData().removeFdToKeep(fd)) {
1429-
getSharedContextData().closeFd(fd);
1406+
if (!getSharedContextData().decrementFDRefCount(fd)) {
1407+
getSharedContextData().closePipe(fd);
14301408
}
14311409
}
14321410
mainThread = null;

0 commit comments

Comments
 (0)