Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9ee1813
Updated QueryRewriteContext to execute remote async actions
Mikep86 Nov 12, 2025
b6e329c
Resolve TODO
Mikep86 Nov 12, 2025
034d1f5
Added skeleton for QueryRewriteContextRemoteAsyncActionIT
Mikep86 Nov 12, 2025
a9534c1
Simplified test setup
Mikep86 Nov 12, 2025
59ef48a
Added test stubs
Mikep86 Nov 12, 2025
c6b1e64
Implemented test query rewrite logic
Mikep86 Nov 13, 2025
1851f49
Renamed variables
Mikep86 Nov 13, 2025
77210cb
Fix warnings
Mikep86 Nov 13, 2025
ffc99bb
Implemented testCallRemoteAsyncAction
Mikep86 Nov 13, 2025
84e2e51
Use search coordination thread pool
Mikep86 Nov 13, 2025
8d08360
Implemented testInvalidClusterAlias
Mikep86 Nov 13, 2025
9308455
Rename integration tests
Mikep86 Nov 13, 2025
41c00af
Merge branch 'main' into query-rewrite_remote-async-actions
Mikep86 Nov 14, 2025
7590440
Removed testRemoteClusterUnavailable
Mikep86 Nov 14, 2025
bb9c21d
Moved QueryRewriteContextMultiClustersIT to security plugin
Mikep86 Nov 18, 2025
167c019
Updated `InternalTestCluster` to use clients with origins set for int…
Mikep86 Nov 19, 2025
ef12db4
Updated AbstractMultiClustersTestCase to optionally take a custom nod…
Mikep86 Nov 19, 2025
375e609
Updated AbstractMultiClustersTestCase to use clients with origins set…
Mikep86 Nov 19, 2025
6c57f3b
Updated QueryRewriteContextMultiClustersIT to run with and without se…
Mikep86 Nov 19, 2025
5116eea
Updated test cluster classes to use internal clients for cluster mana…
Mikep86 Nov 19, 2025
8c88ffb
Fix license header
Mikep86 Nov 19, 2025
e6cc49d
Temporarily resolve jar hell
Mikep86 Nov 19, 2025
d859290
Merge branch 'main' into query-rewrite_remote-async-actions
elasticmachine Nov 19, 2025
ecda1fe
Make internal client origin configurable
Mikep86 Nov 20, 2025
ab45505
Cleanup security index
Mikep86 Nov 20, 2025
19abecc
Updated remote async actions to use thread context
Mikep86 Nov 20, 2025
a8c7661
Make a copy of remote async actions map
Mikep86 Nov 20, 2025
1b9c788
Use a custom security settings source
Mikep86 Nov 20, 2025
9596ecd
Configure the client to use the test user when security is enabled
Mikep86 Nov 20, 2025
6afaec7
Updated test query to set origin
Mikep86 Nov 20, 2025
a543197
Added tests where origin is and is not set
Mikep86 Nov 20, 2025
ef28a9d
Merge branch 'main' into query-rewrite_remote-async-actions
elasticmachine Nov 20, 2025
560475f
Merge branch 'main' into query-rewrite_remote-async-actions
Mikep86 Nov 24, 2025
dad30f5
Temporarily disable testInvalidClusterAlias when security is enabled
Mikep86 Nov 24, 2025
4fa6794
Resolve TODO
Mikep86 Nov 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -36,11 +39,13 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -52,6 +57,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.threadpool.ThreadPool.Names.SEARCH_COORDINATION;

/**
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
*/
Expand All @@ -72,6 +79,8 @@ public class QueryRewriteContext {
protected final Client client;
protected final LongSupplier nowInMillis;
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
private final Map<String, List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>>> remoteAsyncActions =
new HashMap<>();
protected boolean allowUnmappedFields;
protected boolean mapUnmappedFieldAsString;
protected Predicate<String> allowedFields;
Expand Down Expand Up @@ -357,22 +366,38 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
asyncActions.add(asyncAction);
}

public void registerRemoteAsyncAction(
String clusterAlias,
TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>> asyncAction
) {
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> asyncActions = remoteAsyncActions.computeIfAbsent(
clusterAlias,
k -> new ArrayList<>()
);
asyncActions.add(asyncAction);
}

/**
* Returns <code>true</code> if there are any registered async actions.
*/
public boolean hasAsyncActions() {
return asyncActions.isEmpty() == false;
return asyncActions.isEmpty() == false || remoteAsyncActions.isEmpty() == false;
}

/**
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
* <code>null</code>. The list of registered actions is cleared once this method returns.
*/
public void executeAsyncActions(ActionListener<Void> listener) {
if (asyncActions.isEmpty()) {
if (asyncActions.isEmpty() && remoteAsyncActions.isEmpty()) {
listener.onResponse(null);
} else {
CountDown countDown = new CountDown(asyncActions.size());
int actionCount = asyncActions.size();
for (var actionList : remoteAsyncActions.values()) {
actionCount += actionList.size();
}

CountDown countDown = new CountDown(actionCount);
ActionListener<?> internalListener = new ActionListener<>() {
@Override
public void onResponse(Object o) {
Expand All @@ -388,12 +413,30 @@ public void onFailure(Exception e) {
}
}
};

// make a copy to prevent concurrent modification exception
List<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
asyncActions.clear();
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
action.accept(client, internalListener);
}

var remoteAsyncActionsCopy = new HashMap<>(remoteAsyncActions);
remoteAsyncActions.clear();
for (var entry : remoteAsyncActionsCopy.entrySet()) {
String clusterAlias = entry.getKey();
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> remoteTriConsumers = entry.getValue();

RemoteClusterClient remoteClient = client.getRemoteClusterClient(
clusterAlias,
client.threadPool().executor(SEARCH_COORDINATION),
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
ThreadContext threadContext = client.threadPool().getThreadContext();
for (var action : remoteTriConsumers) {
action.apply(remoteClient, threadContext, internalListener);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -94,6 +95,24 @@ protected boolean reuseClusters() {
return true;
}

protected NodeConfigurationSource nodeConfigurationSource() {
return null;
}

protected String internalClientOrigin() {
return null;
}

private Client internalClient() {
return internalClient(LOCAL_CLUSTER);
}

private Client internalClient(String clusterAlias) {
String internalClientOrigin = internalClientOrigin();
Client client = client(clusterAlias);
return internalClientOrigin != null ? new OriginSettingClient(client, internalClientOrigin) : client;
}

@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
Expand Down Expand Up @@ -129,7 +148,7 @@ public final void startClusters() throws Exception {
mockPlugins,
Function.identity(),
TEST_ENTITLEMENTS::addEntitledNodePaths
);
).internalClientOrigin(internalClientOrigin());
try {
cluster.beforeTest(random());
} catch (Exception e) {
Expand Down Expand Up @@ -170,7 +189,11 @@ protected void removeRemoteCluster(String clusterAlias) throws Exception {
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
settings.putNull("cluster.remote." + clusterAlias + ".mode");
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
internalClient().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(settings)
.get();
assertBusy(() -> {
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
Expand Down Expand Up @@ -222,7 +245,7 @@ protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Coll
}
builder.build();

ClusterUpdateSettingsResponse resp = client().admin()
ClusterUpdateSettingsResponse resp = internalClient().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(settings)
Expand All @@ -233,7 +256,10 @@ protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Coll
}

assertBusy(() -> {
List<RemoteConnectionInfo> remoteConnectionInfos = client().execute(TransportRemoteInfoAction.TYPE, new RemoteInfoRequest())
List<RemoteConnectionInfo> remoteConnectionInfos = internalClient().execute(
TransportRemoteInfoAction.TYPE,
new RemoteInfoRequest()
)
.actionGet()
.getInfos()
.stream()
Expand Down Expand Up @@ -265,27 +291,40 @@ public void close() throws IOException {
}
}

static NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection<Class<? extends Plugin>> nodePlugins) {
private NodeConfigurationSource nodeConfigurationSource(Settings nodeSettings, Collection<Class<? extends Plugin>> nodePlugins) {
final Settings.Builder builder = Settings.builder();
builder.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
builder.putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file");
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
builder.put(nodeSettings);

NodeConfigurationSource nodeConfigurationSource = nodeConfigurationSource();
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
if (nodeConfigurationSource != null) {
builder.put(nodeConfigurationSource.nodeSettings(nodeOrdinal, otherSettings));
}
builder.put(nodeSettings);

return builder.build();
}

@Override
public Path nodeConfigPath(int nodeOrdinal) {
return null;
return nodeConfigurationSource != null ? nodeConfigurationSource.nodeConfigPath(nodeOrdinal) : null;
}

@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return nodePlugins;
Collection<Class<? extends Plugin>> plugins;
if (nodeConfigurationSource != null) {
plugins = new ArrayList<>(nodeConfigurationSource.nodePlugins());
plugins.addAll(nodePlugins);
} else {
plugins = nodePlugins;
}

return plugins;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public Client client() {
return client;
}

@Override
protected Client internalClient() {
return client;
}

@Override
public int size() {
return httpAddresses.length;
Expand Down Expand Up @@ -189,7 +194,13 @@ public void close() throws IOException {
@Override
public void ensureEstimatedStats() {
if (size() > 0) {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().clear().setBreaker(true).setIndices(true).get();
NodesStatsResponse nodeStats = internalClient().admin()
.cluster()
.prepareNodesStats()
.clear()
.setBreaker(true)
.setIndices(true)
.get();
for (NodeStats stats : nodeStats.getNodes()) {
assertThat(
"Fielddata breaker not reset to 0 on node: " + stats.getNode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeConnectionsService;
Expand Down Expand Up @@ -278,6 +279,8 @@ public String toString() {

private final int numDataPaths;

private String internalClientOrigin = null;

/**
* All nodes started by the cluster will have their name set to nodePrefix followed by a positive number
*/
Expand Down Expand Up @@ -754,6 +757,11 @@ public synchronized void ensureAtMostNumDataNodes(int n) throws IOException {
}
}

public InternalTestCluster internalClientOrigin(String origin) {
this.internalClientOrigin = origin;
return this;
}

private Settings getNodeSettings(final int nodeId, final long seed, final Settings extraSettings) {
final Settings settings = getSettings(nodeId, seed, extraSettings);

Expand Down Expand Up @@ -874,6 +882,20 @@ public Client client() {
return c.client();
}

@Override
protected Client internalClient() {
return internalClient(null);
}

private Client internalClient(@Nullable String nodeName) {
Client client = nodeName != null ? client(nodeName) : client();
return makeInternal(client);
}

private Client makeInternal(Client client) {
return internalClientOrigin != null ? new OriginSettingClient(client, internalClientOrigin) : client;
}

/**
* Returns a node client to a data node in the cluster.
* Note: use this with care tests should not rely on a certain nodes client.
Expand Down Expand Up @@ -1282,7 +1304,7 @@ public synchronized void validateClusterFormed() {
try {
assertBusy(() -> {
try {
final boolean timeout = client().admin()
final boolean timeout = internalClient().admin()
.cluster()
.prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
Expand Down Expand Up @@ -1552,7 +1574,7 @@ public void assertSeqNos() throws Exception {
*/
public void assertSameDocIdsOnShards() throws Exception {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
ClusterState state = internalClient().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
for (var indexRoutingTable : state.routingTable().indicesRouting().values()) {
for (int i = 0; i < indexRoutingTable.size(); i++) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(i);
Expand Down Expand Up @@ -1999,7 +2021,7 @@ private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {

logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeNames);
try {
client().execute(
internalClient().execute(
TransportAddVotingConfigExclusionsAction.TYPE,
new AddVotingConfigExclusionsRequest(TEST_REQUEST_TIMEOUT, excludedNodeNames.toArray(Strings.EMPTY_ARRAY))
).get();
Expand All @@ -2016,7 +2038,7 @@ private void removeExclusions(Set<String> excludedNodeIds) {
if (autoManageVotingExclusions && excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
try {
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client();
Client client = makeInternal(getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client());
client.execute(
TransportClearVotingConfigExclusionsAction.TYPE,
new ClearVotingConfigExclusionsRequest(TEST_REQUEST_TIMEOUT)
Expand Down Expand Up @@ -2080,7 +2102,7 @@ public String getMasterName(@Nullable String viaNode) {
}
try {
ClusterServiceUtils.awaitClusterState(state -> state.nodes().getMasterNode() != null, clusterService(viaNode));
final ClusterState state = client(viaNode).admin()
final ClusterState state = internalClient(viaNode).admin()
.cluster()
.prepareState(TEST_REQUEST_TIMEOUT)
.clear()
Expand Down
Loading