Skip to content

Commit 7e06534

Browse files
authored
Allow QueryRewriteContext to perform async actions on remote clusters (#138124)
1 parent e33f903 commit 7e06534

File tree

6 files changed

+756
-29
lines changed

6 files changed

+756
-29
lines changed

server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ResolvedIndices;
1414
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.client.internal.RemoteClusterClient;
1516
import org.elasticsearch.cluster.metadata.DataStream;
1617
import org.elasticsearch.cluster.metadata.IndexMetadata;
1718
import org.elasticsearch.cluster.routing.allocation.DataTier;
1819
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.TriConsumer;
1921
import org.elasticsearch.common.collect.Iterators;
2022
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2123
import org.elasticsearch.common.regex.Regex;
2224
import org.elasticsearch.common.settings.Settings;
2325
import org.elasticsearch.common.util.concurrent.CountDown;
26+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2427
import org.elasticsearch.core.Nullable;
2528
import org.elasticsearch.index.Index;
2629
import org.elasticsearch.index.IndexSettings;
@@ -36,11 +39,13 @@
3639
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
3740
import org.elasticsearch.search.builder.PointInTimeBuilder;
3841
import org.elasticsearch.transport.RemoteClusterAware;
42+
import org.elasticsearch.transport.RemoteClusterService;
3943
import org.elasticsearch.xcontent.XContentParser;
4044
import org.elasticsearch.xcontent.XContentParserConfiguration;
4145

4246
import java.util.ArrayList;
4347
import java.util.Collections;
48+
import java.util.HashMap;
4449
import java.util.HashSet;
4550
import java.util.List;
4651
import java.util.Map;
@@ -52,6 +57,8 @@
5257
import java.util.function.Predicate;
5358
import java.util.stream.Collectors;
5459

60+
import static org.elasticsearch.threadpool.ThreadPool.Names.SEARCH_COORDINATION;
61+
5562
/**
5663
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
5764
*/
@@ -72,6 +79,8 @@ public class QueryRewriteContext {
7279
protected final Client client;
7380
protected final LongSupplier nowInMillis;
7481
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
82+
private final Map<String, List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>>> remoteAsyncActions =
83+
new HashMap<>();
7584
protected boolean allowUnmappedFields;
7685
protected boolean mapUnmappedFieldAsString;
7786
protected Predicate<String> allowedFields;
@@ -357,22 +366,38 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
357366
asyncActions.add(asyncAction);
358367
}
359368

369+
public void registerRemoteAsyncAction(
370+
String clusterAlias,
371+
TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>> asyncAction
372+
) {
373+
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> asyncActions = remoteAsyncActions.computeIfAbsent(
374+
clusterAlias,
375+
k -> new ArrayList<>()
376+
);
377+
asyncActions.add(asyncAction);
378+
}
379+
360380
/**
361381
* Returns <code>true</code> if there are any registered async actions.
362382
*/
363383
public boolean hasAsyncActions() {
364-
return asyncActions.isEmpty() == false;
384+
return asyncActions.isEmpty() == false || remoteAsyncActions.isEmpty() == false;
365385
}
366386

367387
/**
368388
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
369389
* <code>null</code>. The list of registered actions is cleared once this method returns.
370390
*/
371391
public void executeAsyncActions(ActionListener<Void> listener) {
372-
if (asyncActions.isEmpty()) {
392+
if (asyncActions.isEmpty() && remoteAsyncActions.isEmpty()) {
373393
listener.onResponse(null);
374394
} else {
375-
CountDown countDown = new CountDown(asyncActions.size());
395+
int actionCount = asyncActions.size();
396+
for (var actionList : remoteAsyncActions.values()) {
397+
actionCount += actionList.size();
398+
}
399+
400+
CountDown countDown = new CountDown(actionCount);
376401
ActionListener<?> internalListener = new ActionListener<>() {
377402
@Override
378403
public void onResponse(Object o) {
@@ -388,12 +413,30 @@ public void onFailure(Exception e) {
388413
}
389414
}
390415
};
416+
391417
// make a copy to prevent concurrent modification exception
392418
List<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
393419
asyncActions.clear();
394420
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
395421
action.accept(client, internalListener);
396422
}
423+
424+
var remoteAsyncActionsCopy = new HashMap<>(remoteAsyncActions);
425+
remoteAsyncActions.clear();
426+
for (var entry : remoteAsyncActionsCopy.entrySet()) {
427+
String clusterAlias = entry.getKey();
428+
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> remoteTriConsumers = entry.getValue();
429+
430+
RemoteClusterClient remoteClient = client.getRemoteClusterClient(
431+
clusterAlias,
432+
client.threadPool().executor(SEARCH_COORDINATION),
433+
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
434+
);
435+
ThreadContext threadContext = client.threadPool().getThreadContext();
436+
for (var action : remoteTriConsumers) {
437+
action.apply(remoteClient, threadContext, internalListener);
438+
}
439+
}
397440
}
398441
}
399442

test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
1616
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1717
import org.elasticsearch.client.internal.Client;
18+
import org.elasticsearch.client.internal.OriginSettingClient;
1819
import org.elasticsearch.common.network.NetworkModule;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.transport.TransportAddress;
@@ -94,6 +95,24 @@ protected boolean reuseClusters() {
9495
return true;
9596
}
9697

98+
protected NodeConfigurationSource nodeConfigurationSource() {
99+
return null;
100+
}
101+
102+
protected String internalClientOrigin() {
103+
return null;
104+
}
105+
106+
private Client internalClient() {
107+
return internalClient(LOCAL_CLUSTER);
108+
}
109+
110+
private Client internalClient(String clusterAlias) {
111+
String internalClientOrigin = internalClientOrigin();
112+
Client client = client(clusterAlias);
113+
return internalClientOrigin != null ? new OriginSettingClient(client, internalClientOrigin) : client;
114+
}
115+
97116
@Before
98117
public final void startClusters() throws Exception {
99118
if (clusterGroup != null && reuseClusters()) {
@@ -129,7 +148,7 @@ public final void startClusters() throws Exception {
129148
mockPlugins,
130149
Function.identity(),
131150
TEST_ENTITLEMENTS::addEntitledNodePaths
132-
);
151+
).internalClientOrigin(internalClientOrigin());
133152
try {
134153
cluster.beforeTest(random());
135154
} catch (Exception e) {
@@ -170,7 +189,11 @@ protected void removeRemoteCluster(String clusterAlias) throws Exception {
170189
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
171190
settings.putNull("cluster.remote." + clusterAlias + ".mode");
172191
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
173-
client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
192+
internalClient().admin()
193+
.cluster()
194+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
195+
.setPersistentSettings(settings)
196+
.get();
174197
assertBusy(() -> {
175198
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
176199
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
@@ -222,7 +245,7 @@ protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Coll
222245
}
223246
builder.build();
224247

225-
ClusterUpdateSettingsResponse resp = client().admin()
248+
ClusterUpdateSettingsResponse resp = internalClient().admin()
226249
.cluster()
227250
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
228251
.setPersistentSettings(settings)
@@ -233,7 +256,10 @@ protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Coll
233256
}
234257

235258
assertBusy(() -> {
236-
List<RemoteConnectionInfo> remoteConnectionInfos = client().execute(TransportRemoteInfoAction.TYPE, new RemoteInfoRequest())
259+
List<RemoteConnectionInfo> remoteConnectionInfos = internalClient().execute(
260+
TransportRemoteInfoAction.TYPE,
261+
new RemoteInfoRequest()
262+
)
237263
.actionGet()
238264
.getInfos()
239265
.stream()
@@ -265,27 +291,40 @@ public void close() throws IOException {
265291
}
266292
}
267293

268-
static NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection<Class<? extends Plugin>> nodePlugins) {
294+
private NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection<Class<? extends Plugin>> nodePlugins) {
269295
final Settings.Builder builder = Settings.builder();
270296
builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
271297
builder.putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file");
272298
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
273-
builder.put(nodeSettings);
274299

300+
NodeConfigurationSource nodeConfigurationSource = nodeConfigurationSource();
275301
return new NodeConfigurationSource() {
276302
@Override
277303
public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
304+
if (nodeConfigurationSource != null) {
305+
builder.put(nodeConfigurationSource.nodeSettings(nodeOrdinal, otherSettings));
306+
}
307+
builder.put(nodeSettings);
308+
278309
return builder.build();
279310
}
280311

281312
@Override
282313
public Path nodeConfigPath(int nodeOrdinal) {
283-
return null;
314+
return nodeConfigurationSource != null ? nodeConfigurationSource.nodeConfigPath(nodeOrdinal) : null;
284315
}
285316

286317
@Override
287318
public Collection<Class<? extends Plugin>> nodePlugins() {
288-
return nodePlugins;
319+
Collection<Class<? extends Plugin>> plugins;
320+
if (nodeConfigurationSource != null) {
321+
plugins = new ArrayList<>(nodeConfigurationSource.nodePlugins());
322+
plugins.addAll(nodePlugins);
323+
} else {
324+
plugins = nodePlugins;
325+
}
326+
327+
return plugins;
289328
}
290329
};
291330
}

test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,11 @@ public Client client() {
161161
return client;
162162
}
163163

164+
@Override
165+
protected Client internalClient() {
166+
return client;
167+
}
168+
164169
@Override
165170
public int size() {
166171
return httpAddresses.length;
@@ -189,7 +194,13 @@ public void close() throws IOException {
189194
@Override
190195
public void ensureEstimatedStats() {
191196
if (size() > 0) {
192-
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().clear().setBreaker(true).setIndices(true).get();
197+
NodesStatsResponse nodeStats = internalClient().admin()
198+
.cluster()
199+
.prepareNodesStats()
200+
.clear()
201+
.setBreaker(true)
202+
.setIndices(true)
203+
.get();
193204
for (NodeStats stats : nodeStats.getNodes()) {
194205
assertThat(
195206
"Fielddata breaker not reset to 0 on node: " + stats.getNode(),

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.action.support.RefCountingRunnable;
3232
import org.elasticsearch.action.support.replication.TransportReplicationAction;
3333
import org.elasticsearch.client.internal.Client;
34+
import org.elasticsearch.client.internal.OriginSettingClient;
3435
import org.elasticsearch.cluster.ClusterName;
3536
import org.elasticsearch.cluster.ClusterState;
3637
import org.elasticsearch.cluster.NodeConnectionsService;
@@ -278,6 +279,8 @@ public String toString() {
278279

279280
private final int numDataPaths;
280281

282+
private String internalClientOrigin = null;
283+
281284
/**
282285
* All nodes started by the cluster will have their name set to nodePrefix followed by a positive number
283286
*/
@@ -754,6 +757,11 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
754757
}
755758
}
756759

760+
public InternalTestCluster internalClientOrigin(String origin) {
761+
this.internalClientOrigin = origin;
762+
return this;
763+
}
764+
757765
private Settings getNodeSettings(final int nodeId, final long seed, final Settings extraSettings) {
758766
final Settings settings = getSettings(nodeId, seed, extraSettings);
759767

@@ -874,6 +882,20 @@ public Client client() {
874882
return c.client();
875883
}
876884

885+
@Override
886+
protected Client internalClient() {
887+
return internalClient(null);
888+
}
889+
890+
private Client internalClient(@Nullable String nodeName) {
891+
Client client = nodeName != null ? client(nodeName) : client();
892+
return makeInternal(client);
893+
}
894+
895+
private Client makeInternal(Client client) {
896+
return internalClientOrigin != null ? new OriginSettingClient(client, internalClientOrigin) : client;
897+
}
898+
877899
/**
878900
* Returns a node client to a data node in the cluster.
879901
* Note: use this with care tests should not rely on a certain nodes client.
@@ -1282,7 +1304,7 @@ public synchronized void validateClusterFormed() {
12821304
try {
12831305
assertBusy(() -> {
12841306
try {
1285-
final boolean timeout = client().admin()
1307+
final boolean timeout = internalClient().admin()
12861308
.cluster()
12871309
.prepareHealth(TEST_REQUEST_TIMEOUT)
12881310
.setWaitForEvents(Priority.LANGUID)
@@ -1552,7 +1574,7 @@ public void assertSeqNos() throws Exception {
15521574
*/
15531575
public void assertSameDocIdsOnShards() throws Exception {
15541576
assertBusy(() -> {
1555-
ClusterState state = client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
1577+
ClusterState state = internalClient().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
15561578
for (var indexRoutingTable : state.routingTable().indicesRouting().values()) {
15571579
for (int i = 0; i < indexRoutingTable.size(); i++) {
15581580
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(i);
@@ -1999,7 +2021,7 @@ private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
19992021

20002022
logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeNames);
20012023
try {
2002-
client().execute(
2024+
internalClient().execute(
20032025
TransportAddVotingConfigExclusionsAction.TYPE,
20042026
new AddVotingConfigExclusionsRequest(TEST_REQUEST_TIMEOUT, excludedNodeNames.toArray(Strings.EMPTY_ARRAY))
20052027
).get();
@@ -2016,7 +2038,7 @@ private void removeExclusions(Set<String> excludedNodeIds) {
20162038
if (autoManageVotingExclusions && excludedNodeIds.isEmpty() == false) {
20172039
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
20182040
try {
2019-
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client();
2041+
Client client = makeInternal(getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client());
20202042
client.execute(
20212043
TransportClearVotingConfigExclusionsAction.TYPE,
20222044
new ClearVotingConfigExclusionsRequest(TEST_REQUEST_TIMEOUT)
@@ -2080,7 +2102,7 @@ public String getMasterName(@Nullable String viaNode) {
20802102
}
20812103
try {
20822104
ClusterServiceUtils.awaitClusterState(state -> state.nodes().getMasterNode() != null, clusterService(viaNode));
2083-
final ClusterState state = client(viaNode).admin()
2105+
final ClusterState state = internalClient(viaNode).admin()
20842106
.cluster()
20852107
.prepareState(TEST_REQUEST_TIMEOUT)
20862108
.clear()

0 commit comments

Comments
 (0)