Skip to content

Commit f48051c

Browse files
committed
ReadinessPollingUtil
1 parent e196040 commit f48051c

File tree

2 files changed

+255
-0
lines changed

2 files changed

+255
-0
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.readiness;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.common.util.concurrent.EsExecutors;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.logging.LogManager;
19+
import org.elasticsearch.logging.Logger;
20+
import org.elasticsearch.threadpool.ThreadPool;
21+
import org.elasticsearch.transport.TransportException;
22+
import org.elasticsearch.transport.TransportResponseHandler;
23+
import org.elasticsearch.transport.TransportService;
24+
25+
import java.io.IOException;
26+
import java.util.Set;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.function.Predicate;
30+
import java.util.stream.Collectors;
31+
32+
class ReadinessPollingUtil {
33+
34+
private final ClusterService clusterService;
35+
private final TransportService transportService;
36+
private final ThreadPool threadPool;
37+
38+
public ReadinessPollingUtil(ClusterService clusterService, TransportService transportService, ThreadPool threadPool) {
39+
this.clusterService = clusterService;
40+
this.transportService = transportService;
41+
this.threadPool = threadPool;
42+
}
43+
44+
/**
45+
* Polls the indicated nodes until at least one reports readiness or the timeout expires.
46+
* Uses exponential backoff.
47+
* Failures are considered equivalent to "not ready", since this is used when a cluster is in flux.
48+
*
49+
* @param timeoutMillis total timeout for polling
50+
* @param initialDelayMillis initial retry delay; exponential backoff will double this on each attempt
51+
* @param nodeFilter predicate to select which nodes to poll
52+
* @param listener callback invoked with true if any node reports ready, false if timeout
53+
*/
54+
public void waitForAnyNodeReady(
55+
long timeoutMillis,
56+
long initialDelayMillis,
57+
Predicate<DiscoveryNode> nodeFilter,
58+
ActionListener<Boolean> listener
59+
) {
60+
61+
final long deadline = System.currentTimeMillis() + timeoutMillis;
62+
63+
Runnable attempt = new Runnable() {
64+
int attemptCount = 0;
65+
66+
@Override
67+
public void run() {
68+
Set<DiscoveryNode> nodes = clusterService.state().nodes().stream().filter(nodeFilter).collect(Collectors.toSet());
69+
70+
if (nodes.isEmpty()) {
71+
listener.onResponse(false);
72+
return;
73+
}
74+
75+
// We want to respond at most once
76+
AtomicBoolean responded = new AtomicBoolean(false);
77+
78+
for (DiscoveryNode node : nodes) {
79+
transportService.sendRequest(
80+
node,
81+
TransportReadinessAction.TYPE.name(),
82+
new ReadinessRequest(),
83+
new TransportResponseHandler<ReadinessResponse>() {
84+
@Override
85+
public ReadinessResponse read(StreamInput in) throws IOException {
86+
return ReadinessResponse.readFrom(in);
87+
}
88+
89+
@Override
90+
public void handleResponse(ReadinessResponse response) {
91+
// If !response.isReady() then we ignore this one hoping for another node to be ready.
92+
logger.debug("node [{}] reports readiness [{}]", node, response.isReady());
93+
if (response.isReady() && responded.compareAndSet(false, true)) {
94+
logger.debug("readiness poll responding with true");
95+
listener.onResponse(true);
96+
}
97+
}
98+
99+
@Override
100+
public void handleException(TransportException exp) {
101+
// ignore failures; retry/backoff handles it
102+
logger.debug("failed to determine readiness of node [{}]", node, exp);
103+
}
104+
105+
@Override
106+
public Executor executor() {
107+
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
108+
}
109+
}
110+
);
111+
}
112+
113+
attemptCount++;
114+
long delay = initialDelayMillis * (1L << (attemptCount - 1));
115+
delay = Math.min(delay, deadline - System.currentTimeMillis());
116+
if (delay > 0) {
117+
threadPool.schedule(() -> {
118+
if (!responded.get()) {
119+
// Retry
120+
this.run();
121+
}
122+
}, TimeValue.timeValueMillis(delay), EsExecutors.DIRECT_EXECUTOR_SERVICE);
123+
} else if (responded.compareAndSet(false, true)) {
124+
logger.debug("readiness poll responding with false");
125+
listener.onResponse(false);
126+
}
127+
}
128+
};
129+
130+
threadPool.generic().execute(attempt);
131+
}
132+
133+
private static final Logger logger = LogManager.getLogger(ReadinessPollingUtil.class);
134+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.readiness;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
18+
import org.elasticsearch.cluster.node.DiscoveryNodes;
19+
import org.elasticsearch.cluster.node.VersionInformation;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.util.concurrent.EsExecutors;
23+
import org.elasticsearch.test.ClusterServiceUtils;
24+
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.test.transport.MockTransportService;
26+
import org.elasticsearch.threadpool.TestThreadPool;
27+
import org.elasticsearch.threadpool.ThreadPool;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
31+
import java.util.Set;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.function.BooleanSupplier;
35+
36+
public class ReadinessPollingUtilTests extends ESTestCase {
37+
/**
38+
* This is for tests that are not expecting a timeout to occur.
39+
* We use a large value to support single-step debugging.
40+
*/
41+
private static final int LONG_TIMEOUT_MILLIS = 20 * 60 * 1000;
42+
43+
/**
44+
* We're not actually waiting for nodes to boot, so use a short value to avoid wasting time.
45+
*/
46+
private static final int QUICK_RETRY_MILLIS = 1;
47+
48+
private ThreadPool threadPool;
49+
private MockTransportService transport;
50+
private ClusterService clusterService;
51+
52+
@Before
53+
public void setup() {
54+
threadPool = new TestThreadPool(getClass().getName());
55+
transport = MockTransportService.createNewService(
56+
Settings.EMPTY,
57+
VersionInformation.CURRENT,
58+
TransportVersion.current(),
59+
threadPool
60+
);
61+
transport.start();
62+
transport.acceptIncomingRequests();
63+
DiscoveryNode node = transport.getLocalNode();
64+
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(node);
65+
ClusterState clusterState = ClusterState.builder(new ClusterName("test-cluster")).nodes(nodesBuilder).build();
66+
clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
67+
}
68+
69+
@After
70+
public void teardown() {
71+
clusterService.close();
72+
transport.close();
73+
terminate(threadPool);
74+
}
75+
76+
public void testReadinessSucceedsImmediately() throws Exception {
77+
registerReadinessAction(() -> true);
78+
79+
ReadinessPollingUtil poll = new ReadinessPollingUtil(clusterService, transport, threadPool);
80+
AtomicBoolean readiness = new AtomicBoolean(false);
81+
82+
poll.waitForAnyNodeReady(
83+
LONG_TIMEOUT_MILLIS,
84+
QUICK_RETRY_MILLIS,
85+
n -> true,
86+
ActionListener.wrap(readiness::set, e -> fail(e.toString()))
87+
);
88+
89+
assertBusy(() -> assertTrue("Node was ready", readiness.get()));
90+
}
91+
92+
public void testReadinessRetriesThenSucceeds() throws Exception {
93+
AtomicInteger remainingFailures = new AtomicInteger(2); // fails twice before success
94+
registerReadinessAction(() -> remainingFailures.getAndDecrement() <= 0);
95+
96+
ReadinessPollingUtil poll = new ReadinessPollingUtil(clusterService, transport, threadPool);
97+
AtomicBoolean readiness = new AtomicBoolean(false);
98+
99+
poll.waitForAnyNodeReady(
100+
LONG_TIMEOUT_MILLIS,
101+
QUICK_RETRY_MILLIS,
102+
n -> true,
103+
ActionListener.wrap(readiness::set, e -> fail(e.toString()))
104+
);
105+
106+
assertBusy(() -> assertTrue("Node was ready", readiness.get()));
107+
}
108+
109+
private void registerReadinessAction(BooleanSupplier readinessSupplier) {
110+
TransportReadinessAction action = new TransportReadinessAction(new ActionFilters(Set.of()), null, Runnable::run, readinessSupplier);
111+
transport.registerRequestHandler(
112+
TransportReadinessAction.TYPE.name(),
113+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
114+
false,
115+
false,
116+
ReadinessRequest::new,
117+
(request, channel, task) -> action.execute(task, request, ActionListener.wrap(channel::sendResponse, channel::sendResponse))
118+
);
119+
}
120+
121+
}

0 commit comments

Comments
 (0)