Skip to content

Commit 6b4f869

Browse files
committed
use concurrent data structures and atomic operations in SharedMultiprocessingData
1 parent 9a944d3 commit 6b4f869

File tree

1 file changed

+104
-66
lines changed

1 file changed

+104
-66
lines changed

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java

Lines changed: 104 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,14 @@
4747
import java.util.List;
4848
import java.util.Map;
4949
import java.util.SortedMap;
50-
import java.util.TreeMap;
5150
import java.util.WeakHashMap;
5251
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.ConcurrentSkipListMap;
5353
import java.util.concurrent.CountDownLatch;
5454
import java.util.concurrent.LinkedBlockingQueue;
5555
import java.util.concurrent.Semaphore;
5656
import java.util.concurrent.atomic.AtomicBoolean;
57+
import java.util.concurrent.atomic.AtomicInteger;
5758
import java.util.concurrent.atomic.AtomicLong;
5859
import java.util.concurrent.locks.ReentrantLock;
5960
import java.util.logging.Level;
@@ -564,13 +565,20 @@ public boolean compareAndSetExiting(boolean expect, boolean update) {
564565

565566
public static final class SharedMultiprocessingData {
566567

567-
private int fdCounter = 0;
568+
/**
569+
* A sentinel object that remains in the {@link LinkedBlockingQueue} in the
570+
* {@link #pipeData}. It is pushed there in #closePipe so that any blocking #take calls can wake
571+
* up and react to the end of the stream.
572+
*/
573+
private static final Object SENTINEL = new Object();
574+
575+
private final AtomicInteger fdCounter = new AtomicInteger(0);
568576

569577
/**
570578
* Maps the two fake file descriptors created in {@link #pipe()} to one
571579
* {@link LinkedBlockingQueue}
572580
*/
573-
private final SortedMap<Integer, LinkedBlockingQueue<Object>> pipeData = new TreeMap<>();
581+
private final SortedMap<Integer, LinkedBlockingQueue<Object>> pipeData = new ConcurrentSkipListMap<>();
574582

575583
/**
576584
* Holds ref count of file descriptors which were passed over to a spawned child context.
@@ -580,7 +588,7 @@ public static final class SharedMultiprocessingData {
580588
* <li>real file descriptors coming from the posix implementation</li>
581589
* </ul>
582590
*/
583-
private final Map<Integer, Integer> fdRefCount = new HashMap<>();
591+
private final Map<Integer, Integer> fdRefCount = new ConcurrentHashMap<>();
584592

585593
public SharedMultiprocessingData(ConcurrentHashMap<String, Semaphore> namedSemaphores) {
586594
this.namedSemaphores = namedSemaphores;
@@ -591,9 +599,7 @@ public SharedMultiprocessingData(ConcurrentHashMap<String, Semaphore> namedSemap
591599
*/
592600
@TruffleBoundary
593601
private void incrementFDRefCount(int fd) {
594-
synchronized (fdRefCount) {
595-
fdRefCount.compute(fd, (f, count) -> (count == null) ? 1 : count + 1);
596-
}
602+
fdRefCount.compute(fd, (f, count) -> (count == null) ? 1 : count + 1);
597603
}
598604

599605
/**
@@ -604,19 +610,15 @@ private void incrementFDRefCount(int fd) {
604610
*/
605611
@TruffleBoundary
606612
public boolean decrementFDRefCount(int fd) {
607-
synchronized (fdRefCount) {
608-
Integer c = fdRefCount.get(fd);
609-
if (c == null) {
610-
return false;
611-
}
612-
if (c == 0) {
613-
fdRefCount.remove(fd);
614-
return false;
613+
Integer cnt = fdRefCount.computeIfPresent(fd, (f, count) -> {
614+
if (count == 0 || count == Integer.MIN_VALUE) {
615+
return Integer.MIN_VALUE;
615616
} else {
616-
fdRefCount.put(fd, c - 1);
617-
return true;
617+
assert count > 0;
618+
return count - 1;
618619
}
619-
}
620+
});
621+
return cnt != null && !fdRefCount.remove(fd, Integer.MIN_VALUE);
620622
}
621623

622624
/**
@@ -625,86 +627,122 @@ public boolean decrementFDRefCount(int fd) {
625627
*/
626628
@TruffleBoundary
627629
public int[] pipe() {
628-
synchronized (pipeData) {
629-
LinkedBlockingQueue<Object> q = new LinkedBlockingQueue<>();
630-
int readFD = --fdCounter;
631-
pipeData.put(readFD, q);
632-
int writeFD = --fdCounter;
633-
pipeData.put(writeFD, q);
634-
return new int[]{readFD, writeFD};
635-
}
630+
LinkedBlockingQueue<Object> q = new LinkedBlockingQueue<>();
631+
int writeFD = fdCounter.addAndGet(-2);
632+
assert isWriteFD(writeFD);
633+
int readFD = getPairFd(writeFD);
634+
pipeData.put(readFD, q);
635+
pipeData.put(writeFD, q);
636+
return new int[]{readFD, writeFD};
636637
}
637638

639+
/**
640+
* Adding pipe data needs no special synchronization, since we guarantee there is only ever
641+
* one or no queue registered for a given fd.
642+
*/
638643
@TruffleBoundary
639644
public void addPipeData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) {
640-
LinkedBlockingQueue<Object> q = null;
641-
synchronized (pipeData) {
642-
q = pipeData.get(fd);
643-
if (q == null) {
644-
noFDHandler.run();
645-
throw CompilerDirectives.shouldNotReachHere();
646-
}
647-
int fd2 = getPairFd(fd);
648-
if (isClosed(fd2)) {
649-
brokenPipeHandler.run();
650-
throw CompilerDirectives.shouldNotReachHere();
651-
}
645+
assert isWriteFD(fd);
646+
LinkedBlockingQueue<Object> q = pipeData.get(fd);
647+
if (q == null) {
648+
// the write end is already closed
649+
noFDHandler.run();
650+
throw CompilerDirectives.shouldNotReachHere();
651+
}
652+
int fd2 = getPairFd(fd);
653+
if (isClosed(fd2)) {
654+
// the read end is already closed
655+
brokenPipeHandler.run();
656+
throw CompilerDirectives.shouldNotReachHere();
652657
}
653658
q.add(bytes);
654659
}
655660

661+
/**
662+
* Closing the read end of a pipe just removes the mapping from that fd to the queue.
663+
* Closing the write end adds the {@link #SENTINEL} value as the last value. There is a
664+
* potential race here for incorrect code that concurrently writes to the write end via
665+
* {@link #addPipeData}, in that the sentinel may prevent writes from being visible.
666+
*/
656667
@TruffleBoundary
657668
public void closePipe(int fd) {
658-
synchronized (pipeData) {
659-
pipeData.remove(fd);
669+
LinkedBlockingQueue<Object> q = pipeData.remove(fd);
670+
if (q != null && isWriteFD(fd)) {
671+
q.offer(SENTINEL);
660672
}
661673
}
662674

675+
/**
676+
* This needs no additional synchronization, since if the write-end of the pipe is already
677+
* closed, the {@link #take} call will return appropriately.
678+
*/
663679
@TruffleBoundary
664680
public Object takePipeData(Node node, int fd, Runnable noFDHandler) {
665-
LinkedBlockingQueue<Object> q;
666-
synchronized (pipeData) {
667-
q = pipeData.get(fd);
668-
if (q == null) {
669-
noFDHandler.run();
670-
throw CompilerDirectives.shouldNotReachHere();
671-
}
672-
int fd2 = getPairFd(fd);
673-
if (isClosed(fd2)) {
674-
if (q.isEmpty()) {
675-
return PythonUtils.EMPTY_BYTE_ARRAY;
676-
}
677-
}
681+
LinkedBlockingQueue<Object> q = pipeData.get(fd);
682+
if (q == null) {
683+
noFDHandler.run();
684+
throw CompilerDirectives.shouldNotReachHere();
678685
}
679686
Object[] o = new Object[]{PNone.NONE};
680687
TruffleSafepoint.setBlockedThreadInterruptible(node, (lbq) -> {
681-
o[0] = lbq.take();
688+
o[0] = take(lbq);
682689
}, q);
683690
return o[0];
684691
}
685692

693+
/**
694+
* This uses Map#compute on {@link #pipeData} to determine the blocking state. The function may
695+
* be run multiple times, so we need to check and write all possible results to the result
696+
* array. This ensures that if there is concurrent modification of the {@link #pipeData}, we
697+
* will get a valid result.
698+
*/
686699
@TruffleBoundary
687700
public boolean isBlocking(int fd) {
688-
LinkedBlockingQueue<Object> q;
689-
synchronized (pipeData) {
690-
q = pipeData.get(fd);
701+
boolean[] result = new boolean[]{false};
702+
pipeData.compute(fd, (f, q) -> {
691703
if (q == null) {
692-
return false;
693-
}
694-
int fd2 = getPairFd(fd);
695-
if (isClosed(fd2)) {
696-
return false;
704+
result[0] = false;
705+
} else {
706+
int fd2 = getPairFd(fd);
707+
if (isClosed(fd2)) {
708+
result[0] = false;
709+
} else {
710+
// this uses q.isEmpty() instead of our isEmpty(q), because we are not
711+
// interested in the race between closing fd2 and this runnable. If the
712+
// SENTINEL is pushed in the meantime, we should return false, just as if
713+
// we had observed fd2 to be closed already.
714+
result[0] = q.isEmpty();
715+
}
697716
}
698-
}
699-
return q.isEmpty();
717+
return q;
718+
});
719+
return result[0];
700720
}
701721

702722
private static int getPairFd(int fd) {
703-
return fd % 2 == 0 ? fd + 1 : fd - 1;
723+
return isWriteFD(fd) ? fd + 1 : fd - 1;
724+
}
725+
726+
private static boolean isWriteFD(int fd) {
727+
return fd % 2 == 0;
728+
}
729+
730+
private static Object take(LinkedBlockingQueue<Object> q) throws InterruptedException {
731+
Object v = q.take();
732+
if (v == SENTINEL) {
733+
q.offer(SENTINEL);
734+
return PythonUtils.EMPTY_BYTE_ARRAY;
735+
} else {
736+
return v;
737+
}
704738
}
705739

706740
private boolean isClosed(int fd) {
707-
return pipeData.get(fd) == null && fd >= fdCounter;
741+
// since there is no way that any thread can be trying to read/write to this pipe FD
742+
// legally before it was added to pipeData in #pipe above, we don't need to
743+
// synchronize. If the FD is taken, and it's not in pipe data, this is a race in the
744+
// program, because some thread is just arbitrarily probing FDs.
745+
return fd >= fdCounter.get() && pipeData.get(fd) == null;
708746
}
709747

710748
/**

0 commit comments

Comments
 (0)