Skip to content
2 changes: 1 addition & 1 deletion src/java.base/linux/classes/sun/nio/ch/EPollPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public boolean isCheckpoint() {
this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
this.queue.offer(NEED_TO_POLL);

Core.Priority.EPOLLSELECTOR.getContext().register(resource);
Core.Priority.SELECTOR.getContext().register(resource);
}

EPollPort start() {
Expand Down
206 changes: 36 additions & 170 deletions src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,22 @@

package sun.nio.ch;

import jdk.internal.crac.mirror.Context;
import jdk.internal.crac.mirror.Resource;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.crac.Core;
import jdk.internal.crac.JDKResource;
import jdk.internal.crac.mirror.impl.CheckpointOpenResourceException;
import jdk.internal.crac.mirror.impl.CheckpointOpenSocketException;

import java.io.FileDescriptor;
import java.io.IOException;
import java.io.Serial;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -60,38 +58,14 @@
* @crac The file descriptor(s) used internally by this class are automatically
* closed before checkpointing the process and opened after the restore.
*/
class EPollSelectorImpl extends SelectorImpl implements JDKResource {
class EPollSelectorImpl extends SelectorCRaCSupport {

// maximum number of events to poll in one call to epoll_wait
private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);

private static final JavaIOFileDescriptorAccess fdAccess
= SharedSecrets.getJavaIOFileDescriptorAccess();

private enum CheckpointRestoreState {
NORMAL_OPERATION,
CHECKPOINT_TRANSITION,
CHECKPOINTED,
CHECKPOINT_ERROR,
RESTORE_TRANSITION,
}

private static class MoveToCheckpointThread extends Thread {
private Selector selector;

MoveToCheckpointThread(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
try {
selector.select(1);
} catch (IOException | ClosedSelectorException e) {
}
}
}

// epoll file descriptor
private int epfd;

Expand All @@ -108,14 +82,14 @@ public void run() {
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();

// interrupt triggering and clearing
private final Object interruptLock = new Object();
private boolean interruptTriggered;

private volatile CheckpointRestoreState checkpointState = CheckpointRestoreState.NORMAL_OPERATION;;
private Set<SelectableChannel> currentChannels;
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
initFileDescriptors(false);
}

private void initFDs() throws IOException {
@Override
protected void initFileDescriptors(boolean restore) throws IOException {
epfd = EPoll.create();

try {
Expand All @@ -134,64 +108,31 @@ private void initFDs() throws IOException {
EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
}

EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
initFDs();
// trigger FileDispatcherImpl initialization
new FileDispatcherImpl();
Core.Priority.EPOLLSELECTOR.getContext().register(this);
@Override
protected void closeFileDescriptors() throws IOException {
eventfd.close();
eventfd = null;
FileDispatcherImpl.closeIntFD(epfd);
}

private boolean processCheckpointRestore() throws IOException {
@Override
protected Set<SelectableChannel> getRegisteredChannels() {
assert Thread.holdsLock(this);
return fdToKey.values().stream().map(SelectionKey::channel).collect(Collectors.toSet());
}

if (checkpointState != CheckpointRestoreState.CHECKPOINT_TRANSITION) {
return false;
}
// If the channel using this selector was closed, some keys might be cancelled
// and we shall remove them.
processDeregisterQueue();

synchronized (interruptLock) {

CheckpointRestoreState thisState;
if (fdToKey.size() == 0) {
eventfd.close();
eventfd = null;
FileDispatcherImpl.closeIntFD(epfd);
thisState = CheckpointRestoreState.CHECKPOINTED;
} else {
thisState = CheckpointRestoreState.CHECKPOINT_ERROR;
currentChannels = fdToKey.values().stream().map(SelectionKey::channel).collect(Collectors.toSet());
}

checkpointState = thisState;
interruptLock.notifyAll();
while (checkpointState == thisState) {
try {
interruptLock.wait();
} catch (InterruptedException e) {
}
}

assert checkpointState == CheckpointRestoreState.RESTORE_TRANSITION;
if (thisState == CheckpointRestoreState.CHECKPOINTED) {
initFDs();
}
checkpointState = CheckpointRestoreState.NORMAL_OPERATION;
interruptLock.notifyAll();

if (interruptTriggered) {
try {
eventfd.set();
} catch (IOException ioe) {
throw new InternalError(ioe);
}
}
}
@Override
protected void wakeupInternal() throws IOException {
assert Thread.holdsLock(interruptLock);
eventfd.set();
}

return true;
// In case of C/R exceptions, claims file descriptors that would be otherwise automatically handled
@Override
protected Collection<FileDescriptor> claimFileDescriptors() {
return Arrays.asList(
claimFileDescriptor(this.epfd, "EPoll FD "),
claimFileDescriptor(this.eventfd.efd(), "EPoll Event FD "));
}

@Override
Expand Down Expand Up @@ -337,7 +278,7 @@ private int processEvents(int numEntries, Consumer<SelectionKey> action)
}
}

if (interrupted && !(Thread.currentThread() instanceof MoveToCheckpointThread)) {
if (interrupted && shouldClearInterrupt()) {
clearInterrupt();
}

Expand Down Expand Up @@ -403,79 +344,4 @@ private void clearInterrupt() throws IOException {
interruptTriggered = false;
}
}

@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
if (!isOpen()) {
return;
}

synchronized (interruptLock) {
checkpointState = CheckpointRestoreState.CHECKPOINT_TRANSITION;
eventfd.set();
int tries = 5;
while (checkpointState == CheckpointRestoreState.CHECKPOINT_TRANSITION && 0 < tries--) {
try {
interruptLock.wait(5);
} catch (InterruptedException e) {
}
}
if (checkpointState == CheckpointRestoreState.CHECKPOINT_TRANSITION) {
Thread thr = new MoveToCheckpointThread(this);
thr.setDaemon(true);
thr.start();
}
while (checkpointState == CheckpointRestoreState.CHECKPOINT_TRANSITION) {
try {
interruptLock.wait();
} catch (InterruptedException e) {
}
}
if (checkpointState == CheckpointRestoreState.CHECKPOINT_ERROR) {
var ex = new BusySelectorException("Selector " + this + " has registered keys from channels: " + currentChannels, null);
ex.epollFds.add(claimFd(this.epfd, "EPoll FD "));
ex.epollFds.add(claimFd(this.eventfd.efd(), "EPoll Event FD "));
currentChannels = null;
throw ex;
}
}
}

private FileDescriptor claimFd(int fdval, String type) {
FileDescriptor fd = IOUtil.newFD(fdval);
Core.getClaimedFDs().claimFd(fd, this,
() -> new CheckpointOpenSocketException(type + fdval + " left open in " + this + " with registered keys.", null));
return fd;
}

@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
if (!isOpen()) {
return;
}

synchronized (interruptLock) {
checkpointState = CheckpointRestoreState.RESTORE_TRANSITION;
interruptLock.notifyAll();
while (checkpointState == CheckpointRestoreState.RESTORE_TRANSITION) {
try {
interruptLock.wait();
} catch (InterruptedException e) {
}
}
}
}

private static class BusySelectorException extends CheckpointOpenResourceException {
@Serial
private static final long serialVersionUID = 5615481252774343456L;
// We need to keep the FileDescriptors around until the checkpoint completes
// as ClaimedFDs use WeakHashMap. Transient because exception is serializable
// and FileDescriptor is not.
transient List<FileDescriptor> epollFds = new ArrayList<>();

public BusySelectorException(String details, Throwable cause) {
super(details, cause);
}
}
}
Loading