Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions java/org/apache/catalina/tribes/ChannelReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ public interface ChannelReceiver extends Heartbeat {
*/
void start() throws IOException;

/**
* Waits until the receiver has completed the asynchronous portion of start-up. Receivers that complete start-up
* synchronously do not need to override this method.
*
* @throws IOException Start-up failed
*/
default void awaitStart() throws IOException {
// NO-OP
}

/**
* Stop listening for messages
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected synchronized void internalStart(int svc) throws ChannelException {
clusterReceiver.setMessageListener(this);
clusterReceiver.setChannel(getChannel());
clusterReceiver.start();
// synchronize, big time FIXME
clusterReceiver.awaitStart();
Member localMember = getChannel().getLocalMember(false);
if (localMember instanceof StaticMember staticMember) {
// static member
Expand Down
54 changes: 43 additions & 11 deletions java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.catalina.tribes.io.ObjectReader;
Expand All @@ -55,6 +57,8 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB

private volatile boolean running = false;

private volatile CompletableFuture<Void> startFuture = CompletableFuture.completedFuture(null);

private final AtomicReference<Selector> selector = new AtomicReference<>();
private ServerSocketChannel serverChannel = null;
private DatagramChannel datagramChannel = null;
Expand All @@ -78,10 +82,13 @@ public void stop() {

@Override
public void start() throws IOException {
CompletableFuture<Void> startFuture = new CompletableFuture<>();
this.startFuture = startFuture;
super.start();
try {
setPool(new RxTaskPool(getMaxThreads(), getMinThreads(), this));
} catch (Exception e) {
startFuture.completeExceptionally(e);
log.fatal(sm.getString("nioReceiver.threadpool.fail"), e);
if (e instanceof IOException) {
throw (IOException) e;
Expand All @@ -100,6 +107,7 @@ public void start() throws IOException {
t.setDaemon(true);
t.start();
} catch (Exception e) {
startFuture.completeExceptionally(e);
log.fatal(sm.getString("nioReceiver.start.fail"), e);
if (e instanceof IOException) {
throw (IOException) e;
Expand All @@ -109,6 +117,22 @@ public void start() throws IOException {
}
}

@Override
public void awaitStart() throws IOException {
try {
startFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
}
throw new IOException(cause);
}
}

@Override
public AbstractRxTask createRxTask() {
NioReplicationTask thread = new NioReplicationTask(this, this);
Expand Down Expand Up @@ -298,19 +322,28 @@ protected void socketTimeouts() {
* @throws IOException IO error
*/
protected void listen() throws Exception {
if (doListen()) {
log.warn(sm.getString("nioReceiver.alreadyStarted"));
return;
}
Selector selector;
try {
if (doListen()) {
log.warn(sm.getString("nioReceiver.alreadyStarted"));
startFuture.complete(null);
return;
}

setListen(true);
setListen(true);

// Avoid NPEs if selector is set to null on stop.
Selector selector = this.selector.get();
// Avoid NPEs if selector is set to null on stop.
selector = this.selector.get();

if (selector != null && datagramChannel != null) {
ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size for a datagram packet
registerChannel(selector, datagramChannel, SelectionKey.OP_READ, oreader);
if (selector != null && datagramChannel != null) {
ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size for a datagram packet
registerChannel(selector, datagramChannel, SelectionKey.OP_READ, oreader);
}

startFuture.complete(null);
} catch (Exception e) {
startFuture.completeExceptionally(e);
throw e;
}

while (doListen() && selector != null) {
Expand Down Expand Up @@ -371,7 +404,6 @@ protected void listen() throws Exception {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("nioReceiver.requestError"), t);
}

}
serverChannel.close();
if (datagramChannel != null) {
Expand Down
174 changes: 174 additions & 0 deletions test/org/apache/catalina/tribes/group/TestGroupChannelStartStop.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,26 @@
*/
package org.apache.catalina.tribes.group;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipProvider;
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.MembershipServiceBase;
import org.apache.catalina.tribes.transport.ReceiverBase;

public class TestGroupChannelStartStop {
Expand Down Expand Up @@ -152,4 +166,164 @@ public void testUdpReceiverStart() throws Exception {
Thread.sleep(1000);
channel.stop(Channel.DEFAULT);
}

@Test
public void testLocalMemberPropertiesAreSetAfterReceiverStartCompletes() throws Exception {
BlockingChannelReceiver receiver = new BlockingChannelReceiver();
TestMembershipService membershipService = new TestMembershipService(receiver);
channel.setChannelReceiver(receiver);
channel.setMembershipService(membershipService);

ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Future<?> startFuture = executor.submit(() -> {
try {
channel.start(Channel.SND_RX_SEQ);
} catch (Exception e) {
throw new IllegalStateException(e);
}
});

Assert.assertTrue(receiver.awaitStartEntered.await(5, TimeUnit.SECONDS));
Assert.assertFalse(membershipService.localMemberRequested.await(250, TimeUnit.MILLISECONDS));

receiver.allowStart.countDown();
startFuture.get(5, TimeUnit.SECONDS);

Assert.assertTrue(membershipService.localMemberRequested.await(0, TimeUnit.MILLISECONDS));
Assert.assertTrue(membershipService.localMemberPropertiesSet);
} finally {
receiver.allowStart.countDown();
executor.shutdownNow();
}
}

private static class BlockingChannelReceiver implements ChannelReceiver {

private final CountDownLatch awaitStartEntered = new CountDownLatch(1);
private final CountDownLatch allowStart = new CountDownLatch(1);
private final AtomicBoolean awaitStartCompleted = new AtomicBoolean(false);
private MessageListener listener;
private Channel channel;

@Override
public void start() {
// NO-OP
}

@Override
public void awaitStart() throws IOException {
awaitStartEntered.countDown();
try {
if (!allowStart.await(5, TimeUnit.SECONDS)) {
throw new IOException("Timed out waiting to complete receiver start");
}
awaitStartCompleted.set(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}

@Override
public void stop() {
// NO-OP
}

@Override
public String getHost() {
return "localhost";
}

@Override
public int getPort() {
return 4000;
}

@Override
public int getSecurePort() {
return -1;
}

@Override
public int getUdpPort() {
return -1;
}

@Override
public void setMessageListener(MessageListener listener) {
this.listener = listener;
}

@Override
public MessageListener getMessageListener() {
return listener;
}

@Override
public Channel getChannel() {
return channel;
}

@Override
public void setChannel(Channel channel) {
this.channel = channel;
}

@Override
public void heartbeat() {
// NO-OP
}
}

private static class TestMembershipService extends MembershipServiceBase {

private final BlockingChannelReceiver receiver;
private final CountDownLatch localMemberRequested = new CountDownLatch(1);
private final Member localMember;
private volatile boolean localMemberPropertiesSet = false;

TestMembershipService(BlockingChannelReceiver receiver) throws IOException {
this.receiver = receiver;
localMember = new MemberImpl("localhost", 4000, 0);
}

@Override
public void start(int level) {
// NO-OP
}

@Override
public void stop(int level) {
// NO-OP
}

@Override
public Member getLocalMember(boolean incAliveTime) {
Assert.assertTrue(receiver.awaitStartCompleted.get());
localMemberRequested.countDown();
return localMember;
}

@Override
public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) {
Assert.assertTrue(receiver.awaitStartCompleted.get());
localMemberPropertiesSet = true;
}

@Override
public void setPayload(byte[] payload) {
// NO-OP
}

@Override
public void setDomain(byte[] domain) {
// NO-OP
}

@Override
public MembershipProvider getMembershipProvider() {
return null;
}
}
}