Skip to content

Commit 6466fec

Browse files
committed
Add retry listener for more robust tests
1 parent ae0c83e commit 6466fec

File tree

2 files changed

+72
-28
lines changed

2 files changed

+72
-28
lines changed

server/src/main/java/org/elasticsearch/readiness/ReadinessPollingService.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,19 @@ class ReadinessPollingService {
7676
* Returns immediately once the polling loop has been initiated.
7777
*
7878
* @param nodeFilter predicate to select which nodes to poll
79-
* @param listener invoked with true if any node reports ready, false if timeout expires
79+
* @param readinessListener invoked with true if any node reports ready, false if timeout expires
8080
*/
81-
public void execute(Predicate<DiscoveryNode> nodeFilter, ActionListener<Boolean> listener) {
81+
public void execute(Predicate<DiscoveryNode> nodeFilter, ActionListener<Boolean> readinessListener) {
82+
execute(nodeFilter, readinessListener, ActionListener.noop());
83+
}
84+
85+
/**
86+
* A version of {@link #execute(Predicate, ActionListener)} with an additional listener
87+
* called before each retry.
88+
*
89+
* @param retryListener invoke on each retry
90+
*/
91+
void execute(Predicate<DiscoveryNode> nodeFilter, ActionListener<Boolean> readinessListener, ActionListener<Void> retryListener) {
8292
final long deadline = System.currentTimeMillis() + timeoutMillis;
8393

8494
Runnable attempt = new Runnable() {
@@ -89,7 +99,7 @@ public void run() {
8999
Set<DiscoveryNode> nodes = clusterService.state().nodes().stream().filter(nodeFilter).collect(Collectors.toSet());
90100

91101
if (nodes.isEmpty()) {
92-
listener.onResponse(false);
102+
readinessListener.onResponse(false);
93103
return;
94104
}
95105

@@ -113,7 +123,7 @@ public void handleResponse(ReadinessResponse response) {
113123
logger.debug("node [{}] reports readiness [{}]", node, response.isReady());
114124
if (response.isReady() && responded.compareAndSet(false, true)) {
115125
logger.debug("readiness poll responding with true");
116-
listener.onResponse(true);
126+
readinessListener.onResponse(true);
117127
}
118128
}
119129

@@ -138,12 +148,13 @@ public Executor executor() {
138148
threadPool.schedule(() -> {
139149
if (responded.get() == false) {
140150
// Retry
151+
retryListener.onResponse(null);
141152
this.run();
142153
}
143154
}, TimeValue.timeValueMillis(delay), EsExecutors.DIRECT_EXECUTOR_SERVICE);
144155
} else if (responded.compareAndSet(false, true)) {
145156
logger.debug("readiness poll responding with false");
146-
listener.onResponse(false);
157+
readinessListener.onResponse(false);
147158
}
148159
}
149160
};

server/src/test/java/org/elasticsearch/readiness/ReadinessPollingServiceTests.java

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import org.elasticsearch.cluster.service.ClusterService;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.concurrent.EsExecutors;
23+
import org.elasticsearch.core.CheckedRunnable;
2324
import org.elasticsearch.test.ClusterServiceUtils;
2425
import org.elasticsearch.test.ESTestCase;
25-
import org.elasticsearch.test.junit.annotations.TestLogging;
2626
import org.elasticsearch.test.transport.MockTransportService;
2727
import org.elasticsearch.threadpool.TestThreadPool;
2828
import org.elasticsearch.threadpool.ThreadPool;
@@ -36,12 +36,11 @@
3636
import java.util.concurrent.BlockingQueue;
3737
import java.util.concurrent.CountDownLatch;
3838
import java.util.concurrent.LinkedBlockingQueue;
39-
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.function.Predicate;
4141
import java.util.stream.IntStream;
4242

4343
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44-
import static java.util.concurrent.TimeUnit.SECONDS;
4544

4645
public class ReadinessPollingServiceTests extends ESTestCase {
4746
/**
@@ -53,14 +52,16 @@ public class ReadinessPollingServiceTests extends ESTestCase {
5352
/**
5453
* We're not actually waiting for nodes to boot, so use a short value to avoid wasting time.
5554
*/
56-
private static final int QUICK_RETRY_MILLIS = 1;
55+
private static final int QUICK_RETRY_MILLIS = 5;
5756

5857
/**
5958
* This is for tests that <em>are</em> expecting a timeout to occur.
6059
* Use a short value so we don't waste a lot of time.
6160
*/
6261
private static final int QUICK_TIMEOUT_MILLIS = 10 * QUICK_RETRY_MILLIS;
6362

63+
private static final int NUM_TARGET_NODES = 3;
64+
6465
private ThreadPool threadPool;
6566
private MockTransportService sourceTransport;
6667
private List<MockTransportService> targetTransports;
@@ -72,21 +73,9 @@ public class ReadinessPollingServiceTests extends ESTestCase {
7273
public void setup() throws InterruptedException {
7374
threadPool = new TestThreadPool(getClass().getName());
7475
sourceTransport = newMockTransportService();
75-
targetTransports = IntStream.rangeClosed(1, 2).mapToObj(n -> newMockTransportService()).toList();
76+
targetTransports = IntStream.rangeClosed(1, NUM_TARGET_NODES).mapToObj(n -> newMockTransportService()).toList();
7677
targetNodes = targetTransports.stream().map(TransportService::getLocalNode).toList();
7778

78-
// Connect all the target nodes to the source transport
79-
for (MockTransportService target : targetTransports) {
80-
CountDownLatch latch = new CountDownLatch(1);
81-
sourceTransport.connectionManager().connectToNode(target.getLocalNode(), null, (x,y,listener)->{listener.onResponse(null);}, ActionListener.wrap(
82-
connection -> latch.countDown(),
83-
e -> fail("Unexpected exception connecting to target node: " + e)
84-
));
85-
if (!latch.await(LONG_TIMEOUT_MILLIS, MILLISECONDS)) {
86-
fail("Timed out waiting for target node to connect");
87-
}
88-
}
89-
9079
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder()
9180
.add(sourceTransport.getLocalNode());
9281
targetNodes.forEach(nodesBuilder::add);
@@ -120,43 +109,63 @@ public void teardown() {
120109
public void testSuccessImmediately() throws Exception {
121110
service = newReadinessPollingService(LONG_TIMEOUT_MILLIS);
122111
registerReadinessActions(node -> true);
112+
connectNodes(targetTransports);
123113

124114
assertReadiness(true);
125115
}
126116

127-
public void testSuccessAfterRetries() throws Exception {
117+
public void testSuccessAfterRetry() throws Exception {
128118
service = newReadinessPollingService(LONG_TIMEOUT_MILLIS);
129-
AtomicInteger remainingFailures = new AtomicInteger(2 * targetNodes.size()); // A couple of failures per node before success
130-
registerReadinessActions(node -> remainingFailures.getAndDecrement() <= 0);
119+
AtomicBoolean isReady = new AtomicBoolean(false);
120+
CheckedRunnable<?> retryAction = () -> isReady.set(true);
121+
registerReadinessActions(node -> isReady.get());
122+
connectNodes(targetTransports);
131123

132-
assertReadiness(true);
124+
assertReadiness(true, retryAction);
133125
}
134126

135127
public void testSuccessOnOneNode() throws Exception {
136128
service = newReadinessPollingService(LONG_TIMEOUT_MILLIS);
137129
var goodNode = randomFrom(targetNodes);
138130
registerReadinessActions(goodNode::equals);
131+
connectNodes(targetTransports);
139132

140133
assertReadiness(true);
141134
}
142135

143-
public void testTimeout() throws Exception {
136+
public void testSuccessWithLateJoiningNodes() throws Exception {
137+
service = newReadinessPollingService(LONG_TIMEOUT_MILLIS);
138+
registerReadinessActions(node -> true);
139+
CheckedRunnable<InterruptedException> retryAction = () -> connectNodes(targetTransports);
140+
141+
assertReadiness(true, retryAction);
142+
}
143+
144+
public void testTimeoutWithNoNodes() throws Exception {
145+
service = newReadinessPollingService(QUICK_TIMEOUT_MILLIS);
146+
assertReadiness(false);
147+
}
148+
149+
public void testTimeoutWithUnreadyNodes() throws Exception {
144150
service = newReadinessPollingService(QUICK_TIMEOUT_MILLIS);
145151
registerReadinessActions(node -> false);
152+
connectNodes(targetTransports);
146153

147154
assertReadiness(false);
148155
}
149156

150157
public void testTransportExceptionSameAsTimeout() throws Exception {
151158
service = newReadinessPollingService(QUICK_TIMEOUT_MILLIS);
152159
registerReadinessActions(node -> { throw new TransportException("test"); });
160+
connectNodes(targetTransports);
153161

154162
assertReadiness(false);
155163
}
156164

157165
public void testIllegalStateExceptionSameAsTimeout() throws Exception {
158166
service = newReadinessPollingService(QUICK_TIMEOUT_MILLIS);
159167
registerReadinessActions(node -> { throw new IllegalStateException("test"); });
168+
connectNodes(targetTransports);
160169

161170
assertReadiness(false);
162171
}
@@ -187,9 +196,33 @@ private static void registerReadinessActionOnOneNode(MockTransportService transp
187196
);
188197
}
189198

199+
private void connectNodes(List<MockTransportService> transports) throws InterruptedException {
200+
for (MockTransportService target : transports) {
201+
CountDownLatch latch = new CountDownLatch(1);
202+
sourceTransport.connectionManager().connectToNode(target.getLocalNode(), null, (x,y,listener)->{listener.onResponse(null);}, ActionListener.wrap(
203+
connection -> latch.countDown(),
204+
e -> fail("Unexpected exception connecting to target node: " + e)
205+
));
206+
if (!latch.await(LONG_TIMEOUT_MILLIS, MILLISECONDS)) {
207+
fail("Timed out waiting for target node to connect");
208+
}
209+
}
210+
}
211+
190212
private void assertReadiness(Boolean expected) throws InterruptedException {
213+
assertReadiness(expected, ActionListener.noop());
214+
}
215+
216+
private void assertReadiness(Boolean expected, CheckedRunnable<?> retryAction) throws InterruptedException {
217+
assertReadiness(
218+
expected,
219+
ActionListener.wrap(r -> retryAction.run(), e->fail("Unexpected error waiting for readiness: " + e))
220+
);
221+
}
222+
223+
private void assertReadiness(Boolean expected, ActionListener<Void> retryListener) throws InterruptedException {
191224
BlockingQueue<Boolean> readiness = new LinkedBlockingQueue<>();
192-
service.execute(targetNodes::contains, ActionListener.wrap(readiness::add, e -> fail(e.toString())));
225+
service.execute(targetNodes::contains, ActionListener.wrap(readiness::add, e -> fail(e.toString())), retryListener);
193226
assertEquals(expected, readiness.poll(LONG_TIMEOUT_MILLIS, MILLISECONDS));
194227
}
195228

0 commit comments

Comments
 (0)