Skip to content

Commit 7100ca5

Browse files
authored
Unique Remote Async Actions (elastic#139652)
1 parent 7e84a3c commit 7100ca5

File tree

7 files changed

+348
-161
lines changed

7 files changed

+348
-161
lines changed

server/src/main/java/org/elasticsearch/index/query/QueryRewriteAsyncAction.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.client.internal.Client;
1414

1515
import java.util.List;
16+
import java.util.Objects;
1617
import java.util.function.Consumer;
1718

1819
/**
@@ -24,7 +25,7 @@
2425
* Since we need to determine whether an action has already been registered, we require implementors to provide implementations for
2526
* {@link #hashCode()} and {@link #equals(Object)},
2627
*/
27-
public abstract class QueryRewriteAsyncAction<T> {
28+
public abstract class QueryRewriteAsyncAction<T, U extends QueryRewriteAsyncAction<T, U>> {
2829
/**
2930
* The execute method will:
3031
* - Execute the action using {@link #execute(Client, ActionListener)}
@@ -36,7 +37,7 @@ public abstract class QueryRewriteAsyncAction<T> {
3637
* @param consumers A list of consumer that expect the result of the action.
3738
*/
3839
@SuppressWarnings("unchecked")
39-
public void execute(Client client, ActionListener<?> listener, List<Consumer<?>> consumers) {
40+
public final void execute(Client client, ActionListener<?> listener, List<Consumer<?>> consumers) {
4041
execute(client, listener.delegateFailureAndWrap((l, result) -> {
4142
consumers.forEach(consumer -> ((Consumer<T>) consumer).accept(result));
4243
l.onResponse(null);
@@ -52,8 +53,25 @@ public void execute(Client client, ActionListener<?> listener, List<Consumer<?>>
5253
protected abstract void execute(Client client, ActionListener<T> listener);
5354

5455
@Override
55-
public abstract int hashCode();
56+
public int hashCode() {
57+
return Objects.hash(getClass(), doHashCode());
58+
};
59+
60+
public abstract int doHashCode();
5661

5762
@Override
58-
public abstract boolean equals(Object obj);
63+
public boolean equals(Object obj) {
64+
if (this == obj) {
65+
return true;
66+
}
67+
if (obj == null || getClass() != obj.getClass()) {
68+
return false;
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
U other = (U) obj;
73+
return doEquals(other);
74+
}
75+
76+
public abstract boolean doEquals(U other);
5977
}

server/src/main/java/org/elasticsearch/index/query/QueryRewriteContext.java

Lines changed: 12 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,15 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.ResolvedIndices;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.client.internal.RemoteClusterClient;
1615
import org.elasticsearch.cluster.metadata.DataStream;
1716
import org.elasticsearch.cluster.metadata.IndexMetadata;
1817
import org.elasticsearch.cluster.routing.allocation.DataTier;
1918
import org.elasticsearch.common.Strings;
20-
import org.elasticsearch.common.TriConsumer;
2119
import org.elasticsearch.common.collect.Iterators;
2220
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2321
import org.elasticsearch.common.regex.Regex;
2422
import org.elasticsearch.common.settings.Settings;
2523
import org.elasticsearch.common.util.concurrent.CountDown;
26-
import org.elasticsearch.common.util.concurrent.ThreadContext;
2724
import org.elasticsearch.core.Nullable;
2825
import org.elasticsearch.index.Index;
2926
import org.elasticsearch.index.IndexSettings;
@@ -39,7 +36,6 @@
3936
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
4037
import org.elasticsearch.search.builder.PointInTimeBuilder;
4138
import org.elasticsearch.transport.RemoteClusterAware;
42-
import org.elasticsearch.transport.RemoteClusterService;
4339
import org.elasticsearch.xcontent.XContentParser;
4440
import org.elasticsearch.xcontent.XContentParserConfiguration;
4541

@@ -59,7 +55,6 @@
5955
import java.util.stream.Collectors;
6056

6157
import static org.elasticsearch.search.SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS;
62-
import static org.elasticsearch.threadpool.ThreadPool.Names.SEARCH_COORDINATION;
6358

6459
/**
6560
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
@@ -81,9 +76,7 @@ public class QueryRewriteContext {
8176
protected final Client client;
8277
protected final LongSupplier nowInMillis;
8378
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
84-
private final Map<String, List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>>> remoteAsyncActions =
85-
new HashMap<>();
86-
private final Map<QueryRewriteAsyncAction<?>, List<Consumer<?>>> uniqueRewriteActions = new HashMap<>();
79+
private final Map<QueryRewriteAsyncAction<?, ?>, List<Consumer<?>>> uniqueAsyncActions = new HashMap<>();
8780
protected boolean allowUnmappedFields;
8881
protected boolean mapUnmappedFieldAsString;
8982
protected Predicate<String> allowedFields;
@@ -467,22 +460,11 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
467460
asyncActions.add(asyncAction);
468461
}
469462

470-
public void registerRemoteAsyncAction(
471-
String clusterAlias,
472-
TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>> asyncAction
473-
) {
474-
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> asyncActions = remoteAsyncActions.computeIfAbsent(
475-
clusterAlias,
476-
k -> new ArrayList<>()
477-
);
478-
asyncActions.add(asyncAction);
479-
}
480-
481463
/**
482464
* Returns <code>true</code> if there are any registered async actions.
483465
*/
484466
public boolean hasAsyncActions() {
485-
return asyncActions.isEmpty() == false || uniqueRewriteActions.isEmpty() == false || remoteAsyncActions.isEmpty() == false;
467+
return asyncActions.isEmpty() == false || uniqueAsyncActions.isEmpty() == false;
486468
}
487469

488470
/**
@@ -493,10 +475,7 @@ public void executeAsyncActions(ActionListener<Void> listener) {
493475
if (hasAsyncActions() == false) {
494476
listener.onResponse(null);
495477
} else {
496-
int actionCount = asyncActions.size() + uniqueRewriteActions.size();
497-
for (var actionList : remoteAsyncActions.values()) {
498-
actionCount += actionList.size();
499-
}
478+
final int actionCount = asyncActions.size() + uniqueAsyncActions.size();
500479

501480
CountDown countDown = new CountDown(actionCount);
502481
ActionListener<?> internalListener = new ActionListener<>() {
@@ -522,27 +501,10 @@ public void onFailure(Exception e) {
522501
action.accept(client, internalListener);
523502
}
524503

525-
var copyUniqueRewriteActions = new HashMap<>(uniqueRewriteActions);
526-
uniqueRewriteActions.clear();
527-
for (var entry : copyUniqueRewriteActions.keySet()) {
528-
entry.execute(client, internalListener, copyUniqueRewriteActions.get(entry));
529-
}
530-
531-
var remoteAsyncActionsCopy = new HashMap<>(remoteAsyncActions);
532-
remoteAsyncActions.clear();
533-
for (var entry : remoteAsyncActionsCopy.entrySet()) {
534-
String clusterAlias = entry.getKey();
535-
List<TriConsumer<RemoteClusterClient, ThreadContext, ActionListener<?>>> remoteTriConsumers = entry.getValue();
536-
537-
RemoteClusterClient remoteClient = client.getRemoteClusterClient(
538-
clusterAlias,
539-
client.threadPool().executor(SEARCH_COORDINATION),
540-
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
541-
);
542-
ThreadContext threadContext = client.threadPool().getThreadContext();
543-
for (var action : remoteTriConsumers) {
544-
action.apply(remoteClient, threadContext, internalListener);
545-
}
504+
var copyUniqueAsyncActions = new HashMap<>(uniqueAsyncActions);
505+
uniqueAsyncActions.clear();
506+
for (var entry : copyUniqueAsyncActions.keySet()) {
507+
entry.execute(client, internalListener, copyUniqueAsyncActions.get(entry));
546508
}
547509
}
548510
}
@@ -733,7 +695,10 @@ public void setTrackTimeRangeFilterFrom(boolean trackTimeRangeFilterFrom) {
733695
* After the async action is executed, all consumers associated with it will be executed and receive as argument
734696
* the result of the async action.
735697
*/
736-
public <T> void registerUniqueAsyncAction(QueryRewriteAsyncAction<T> action, Consumer<T> consumer) {
737-
uniqueRewriteActions.computeIfAbsent(action, k -> new ArrayList<>()).add(consumer);
698+
public <T, U extends QueryRewriteAsyncAction<T, U>> void registerUniqueAsyncAction(
699+
QueryRewriteAsyncAction<T, U> action,
700+
Consumer<T> consumer
701+
) {
702+
uniqueAsyncActions.computeIfAbsent(action, k -> new ArrayList<>()).add(consumer);
738703
}
739704
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.query;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.client.internal.RemoteClusterClient;
15+
import org.elasticsearch.common.util.concurrent.ThreadContext;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
import org.elasticsearch.transport.RemoteClusterService;
18+
19+
import java.util.Objects;
20+
21+
import static org.elasticsearch.threadpool.ThreadPool.Names.SEARCH_COORDINATION;
22+
23+
public abstract class QueryRewriteRemoteAsyncAction<T, U extends QueryRewriteRemoteAsyncAction<T, U>> extends QueryRewriteAsyncAction<
24+
T,
25+
U> {
26+
27+
private final String clusterAlias;
28+
29+
public QueryRewriteRemoteAsyncAction(String clusterAlias) {
30+
this.clusterAlias = Objects.requireNonNull(clusterAlias);
31+
}
32+
33+
public String getClusterAlias() {
34+
return clusterAlias;
35+
}
36+
37+
@Override
38+
protected final void execute(Client client, ActionListener<T> listener) {
39+
ThreadPool threadPool = client.threadPool();
40+
RemoteClusterClient remoteClient = client.getRemoteClusterClient(
41+
clusterAlias,
42+
threadPool.executor(SEARCH_COORDINATION),
43+
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
44+
);
45+
ThreadContext threadContext = threadPool.getThreadContext();
46+
execute(remoteClient, threadContext, listener);
47+
}
48+
49+
protected abstract void execute(RemoteClusterClient client, ThreadContext threadContext, ActionListener<T> listener);
50+
51+
@Override
52+
public int hashCode() {
53+
return Objects.hash(super.hashCode(), clusterAlias);
54+
}
55+
56+
@Override
57+
public boolean equals(Object obj) {
58+
boolean equals = super.equals(obj);
59+
if (equals) {
60+
QueryRewriteRemoteAsyncAction<?, ?> other = (QueryRewriteRemoteAsyncAction<?, ?>) obj;
61+
equals = Objects.equals(clusterAlias, other.clusterAlias);
62+
}
63+
64+
return equals;
65+
}
66+
}

0 commit comments

Comments
 (0)