diff --git a/java/org/apache/catalina/tribes/ChannelReceiver.java b/java/org/apache/catalina/tribes/ChannelReceiver.java index e14797f3909f..b4b64159dda7 100644 --- a/java/org/apache/catalina/tribes/ChannelReceiver.java +++ b/java/org/apache/catalina/tribes/ChannelReceiver.java @@ -36,6 +36,16 @@ public interface ChannelReceiver extends Heartbeat { */ void start() throws IOException; + /** + * Waits until the receiver has completed the asynchronous portion of start-up. Receivers that complete start-up + * synchronously do not need to override this method. + * + * @throws IOException Start-up failed + */ + default void awaitStart() throws IOException { + // NO-OP + } + /** * Stop listening for messages */ diff --git a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java index 5ef858feaccf..938bc5ee937b 100644 --- a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java +++ b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java @@ -160,7 +160,7 @@ protected synchronized void internalStart(int svc) throws ChannelException { clusterReceiver.setMessageListener(this); clusterReceiver.setChannel(getChannel()); clusterReceiver.start(); - // synchronize, big time FIXME + clusterReceiver.awaitStart(); Member localMember = getChannel().getLocalMember(false); if (localMember instanceof StaticMember staticMember) { // static member diff --git a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java index 065b7d0ade9d..7560189ca57a 100644 --- a/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java +++ b/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java @@ -30,6 +30,8 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.catalina.tribes.io.ObjectReader; @@ -55,6 +57,8 @@ public class NioReceiver extends ReceiverBase implements Runnable, NioReceiverMB private volatile boolean running = false; + private volatile CompletableFuture startFuture = CompletableFuture.completedFuture(null); + private final AtomicReference selector = new AtomicReference<>(); private ServerSocketChannel serverChannel = null; private DatagramChannel datagramChannel = null; @@ -78,10 +82,13 @@ public void stop() { @Override public void start() throws IOException { + CompletableFuture startFuture = new CompletableFuture<>(); + this.startFuture = startFuture; super.start(); try { setPool(new RxTaskPool(getMaxThreads(), getMinThreads(), this)); } catch (Exception e) { + startFuture.completeExceptionally(e); log.fatal(sm.getString("nioReceiver.threadpool.fail"), e); if (e instanceof IOException) { throw (IOException) e; @@ -100,6 +107,7 @@ public void start() throws IOException { t.setDaemon(true); t.start(); } catch (Exception e) { + startFuture.completeExceptionally(e); log.fatal(sm.getString("nioReceiver.start.fail"), e); if (e instanceof IOException) { throw (IOException) e; @@ -109,6 +117,22 @@ public void start() throws IOException { } } + @Override + public void awaitStart() throws IOException { + try { + startFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException(cause); + } + } + @Override public AbstractRxTask createRxTask() { NioReplicationTask thread = new NioReplicationTask(this, this); @@ -298,19 +322,28 @@ protected void socketTimeouts() { * @throws IOException IO error */ protected void listen() throws Exception { - if (doListen()) { - log.warn(sm.getString("nioReceiver.alreadyStarted")); - return; - } + Selector selector; + try { + if (doListen()) { + log.warn(sm.getString("nioReceiver.alreadyStarted")); + startFuture.complete(null); + return; + } - setListen(true); + setListen(true); - // Avoid NPEs if selector is set to null on stop. - Selector selector = this.selector.get(); + // Avoid NPEs if selector is set to null on stop. + selector = this.selector.get(); - if (selector != null && datagramChannel != null) { - ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size for a datagram packet - registerChannel(selector, datagramChannel, SelectionKey.OP_READ, oreader); + if (selector != null && datagramChannel != null) { + ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); // max size for a datagram packet + registerChannel(selector, datagramChannel, SelectionKey.OP_READ, oreader); + } + + startFuture.complete(null); + } catch (Exception e) { + startFuture.completeExceptionally(e); + throw e; } while (doListen() && selector != null) { @@ -371,7 +404,6 @@ protected void listen() throws Exception { ExceptionUtils.handleThrowable(t); log.error(sm.getString("nioReceiver.requestError"), t); } - } serverChannel.close(); if (datagramChannel != null) { diff --git a/test/org/apache/catalina/tribes/group/TestGroupChannelStartStop.java b/test/org/apache/catalina/tribes/group/TestGroupChannelStartStop.java index 584b86e5e52d..21c9bbcb2f24 100644 --- a/test/org/apache/catalina/tribes/group/TestGroupChannelStartStop.java +++ b/test/org/apache/catalina/tribes/group/TestGroupChannelStartStop.java @@ -16,12 +16,26 @@ */ package org.apache.catalina.tribes.group; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipProvider; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.membership.MembershipServiceBase; import org.apache.catalina.tribes.transport.ReceiverBase; public class TestGroupChannelStartStop { @@ -152,4 +166,164 @@ public void testUdpReceiverStart() throws Exception { Thread.sleep(1000); channel.stop(Channel.DEFAULT); } + + @Test + public void testLocalMemberPropertiesAreSetAfterReceiverStartCompletes() throws Exception { + BlockingChannelReceiver receiver = new BlockingChannelReceiver(); + TestMembershipService membershipService = new TestMembershipService(receiver); + channel.setChannelReceiver(receiver); + channel.setMembershipService(membershipService); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future startFuture = executor.submit(() -> { + try { + channel.start(Channel.SND_RX_SEQ); + } catch (Exception e) { + throw new IllegalStateException(e); + } + }); + + Assert.assertTrue(receiver.awaitStartEntered.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(membershipService.localMemberRequested.await(250, TimeUnit.MILLISECONDS)); + + receiver.allowStart.countDown(); + startFuture.get(5, TimeUnit.SECONDS); + + Assert.assertTrue(membershipService.localMemberRequested.await(0, TimeUnit.MILLISECONDS)); + Assert.assertTrue(membershipService.localMemberPropertiesSet); + } finally { + receiver.allowStart.countDown(); + executor.shutdownNow(); + } + } + + private static class BlockingChannelReceiver implements ChannelReceiver { + + private final CountDownLatch awaitStartEntered = new CountDownLatch(1); + private final CountDownLatch allowStart = new CountDownLatch(1); + private final AtomicBoolean awaitStartCompleted = new AtomicBoolean(false); + private MessageListener listener; + private Channel channel; + + @Override + public void start() { + // NO-OP + } + + @Override + public void awaitStart() throws IOException { + awaitStartEntered.countDown(); + try { + if (!allowStart.await(5, TimeUnit.SECONDS)) { + throw new IOException("Timed out waiting to complete receiver start"); + } + awaitStartCompleted.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + + @Override + public void stop() { + // NO-OP + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public int getPort() { + return 4000; + } + + @Override + public int getSecurePort() { + return -1; + } + + @Override + public int getUdpPort() { + return -1; + } + + @Override + public void setMessageListener(MessageListener listener) { + this.listener = listener; + } + + @Override + public MessageListener getMessageListener() { + return listener; + } + + @Override + public Channel getChannel() { + return channel; + } + + @Override + public void setChannel(Channel channel) { + this.channel = channel; + } + + @Override + public void heartbeat() { + // NO-OP + } + } + + private static class TestMembershipService extends MembershipServiceBase { + + private final BlockingChannelReceiver receiver; + private final CountDownLatch localMemberRequested = new CountDownLatch(1); + private final Member localMember; + private volatile boolean localMemberPropertiesSet = false; + + TestMembershipService(BlockingChannelReceiver receiver) throws IOException { + this.receiver = receiver; + localMember = new MemberImpl("localhost", 4000, 0); + } + + @Override + public void start(int level) { + // NO-OP + } + + @Override + public void stop(int level) { + // NO-OP + } + + @Override + public Member getLocalMember(boolean incAliveTime) { + Assert.assertTrue(receiver.awaitStartCompleted.get()); + localMemberRequested.countDown(); + return localMember; + } + + @Override + public void setLocalMemberProperties(String listenHost, int listenPort, int securePort, int udpPort) { + Assert.assertTrue(receiver.awaitStartCompleted.get()); + localMemberPropertiesSet = true; + } + + @Override + public void setPayload(byte[] payload) { + // NO-OP + } + + @Override + public void setDomain(byte[] domain) { + // NO-OP + } + + @Override + public MembershipProvider getMembershipProvider() { + return null; + } + } }