Skip to content

Commit dda89a5

Browse files
Track resolved index expressions and perform reconciliation
1 parent 3dd69d6 commit dda89a5

File tree

3 files changed

+89
-4
lines changed

3 files changed

+89
-4
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.IndicesRequest;
1616
import org.elasticsearch.action.LegacyActionRequest;
17+
import org.elasticsearch.action.ResolvedIndexExpressions;
1718
import org.elasticsearch.action.ValidateActions;
1819
import org.elasticsearch.action.support.IndicesOptions;
1920
import org.elasticsearch.common.Strings;
@@ -70,6 +71,7 @@ public final class FieldCapabilitiesRequest extends LegacyActionRequest implemen
7071
private QueryBuilder indexFilter;
7172
private Map<String, Object> runtimeFields = Collections.emptyMap();
7273
private Long nowInMillis;
74+
private ResolvedIndexExpressions resolvedIndexExpressions;
7375

7476
public FieldCapabilitiesRequest(StreamInput in) throws IOException {
7577
super(in);
@@ -248,6 +250,16 @@ public boolean allowsCrossProject() {
248250
return true;
249251
}
250252

253+
@Override
254+
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
255+
this.resolvedIndexExpressions = expressions;
256+
}
257+
258+
@Override
259+
public ResolvedIndexExpressions getResolvedIndexExpressions() {
260+
return resolvedIndexExpressions;
261+
}
262+
251263
@Override
252264
public boolean includeDataStreams() {
253265
return true;

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.action.ActionResponse;
14+
import org.elasticsearch.action.ResolvedIndexExpressions;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.common.collect.Iterators;
1617
import org.elasticsearch.common.io.stream.StreamInput;
@@ -45,35 +46,48 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
4546
private final List<FieldCapabilitiesFailure> failures;
4647
private final List<FieldCapabilitiesIndexResponse> indexResponses;
4748
private final TransportVersion minTransportVersion;
49+
private final ResolvedIndexExpressions resolvedIndexExpressions;
4850

4951
public FieldCapabilitiesResponse(
5052
String[] indices,
5153
Map<String, Map<String, FieldCapabilities>> fields,
5254
List<FieldCapabilitiesFailure> failures
5355
) {
54-
this(indices, fields, Collections.emptyList(), failures, null);
56+
this(indices, fields, Collections.emptyList(), failures, null, null);
5557
}
5658

5759
public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> fields) {
58-
this(indices, fields, Collections.emptyList(), Collections.emptyList(), null);
60+
this(indices, fields, Collections.emptyList(), Collections.emptyList(), null, null);
5961
}
6062

6163
public FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
62-
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures, null);
64+
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures, null, null);
6365
}
6466

6567
private FieldCapabilitiesResponse(
6668
String[] indices,
6769
Map<String, Map<String, FieldCapabilities>> fields,
6870
List<FieldCapabilitiesIndexResponse> indexResponses,
6971
List<FieldCapabilitiesFailure> failures,
70-
TransportVersion minTransportVersion
72+
TransportVersion minTransportVersion,
73+
@Nullable ResolvedIndexExpressions resolvedIndexExpressions
7174
) {
7275
this.fields = Objects.requireNonNull(fields);
7376
this.indexResponses = Objects.requireNonNull(indexResponses);
7477
this.indices = indices;
7578
this.failures = failures;
7679
this.minTransportVersion = minTransportVersion;
80+
this.resolvedIndexExpressions = resolvedIndexExpressions;
81+
}
82+
83+
private FieldCapabilitiesResponse(
84+
String[] indices,
85+
Map<String, Map<String, FieldCapabilities>> fields,
86+
List<FieldCapabilitiesIndexResponse> indexResponses,
87+
List<FieldCapabilitiesFailure> failures,
88+
TransportVersion minTransportVersion
89+
) {
90+
this(indices, fields, indexResponses, failures, minTransportVersion, null);
7791
}
7892

7993
public FieldCapabilitiesResponse(StreamInput in) throws IOException {
@@ -84,6 +98,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
8498
this.minTransportVersion = in.getTransportVersion().supports(MIN_TRANSPORT_VERSION)
8599
? in.readOptional(TransportVersion::readVersion)
86100
: null;
101+
this.resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new);
87102
}
88103

89104
/**
@@ -172,6 +187,7 @@ public void writeTo(StreamOutput out) throws IOException {
172187
if (out.getTransportVersion().supports(MIN_TRANSPORT_VERSION)) {
173188
out.writeOptional((Writer<TransportVersion>) (o, v) -> TransportVersion.writeVersion(v, o), minTransportVersion);
174189
}
190+
out.writeOptionalWriteable(this.resolvedIndexExpressions);
175191
}
176192

177193
private static void writeField(StreamOutput out, Map<String, FieldCapabilities> map) throws IOException {
@@ -268,4 +284,9 @@ public FieldCapabilitiesResponse build() {
268284
return new FieldCapabilitiesResponse(indices, fields, indexResponses, failures, minTransportVersion);
269285
}
270286
}
287+
288+
@Nullable
289+
public ResolvedIndexExpressions getResolvedIndexExpressions() {
290+
return resolvedIndexExpressions;
291+
}
271292
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.ActionType;
2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.action.RemoteClusterActionType;
24+
import org.elasticsearch.action.ResolvedIndexExpressions;
2425
import org.elasticsearch.action.support.AbstractThreadedActionListener;
2526
import org.elasticsearch.action.support.ActionFilters;
2627
import org.elasticsearch.action.support.ChannelActionListener;
@@ -38,6 +39,8 @@
3839
import org.elasticsearch.common.regex.Regex;
3940
import org.elasticsearch.common.util.Maps;
4041
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
42+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
43+
import org.elasticsearch.common.util.concurrent.CountDown;
4144
import org.elasticsearch.common.util.concurrent.EsExecutors;
4245
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
4346
import org.elasticsearch.core.Nullable;
@@ -49,7 +52,9 @@
4952
import org.elasticsearch.injection.guice.Inject;
5053
import org.elasticsearch.logging.LogManager;
5154
import org.elasticsearch.logging.Logger;
55+
import org.elasticsearch.node.internal.TerminationHandler;
5256
import org.elasticsearch.search.SearchService;
57+
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
5358
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
5459
import org.elasticsearch.tasks.CancellableTask;
5560
import org.elasticsearch.tasks.Task;
@@ -64,6 +69,7 @@
6469
import java.io.IOException;
6570
import java.util.ArrayList;
6671
import java.util.Arrays;
72+
import java.util.Collection;
6773
import java.util.Collections;
6874
import java.util.Comparator;
6975
import java.util.HashMap;
@@ -72,6 +78,7 @@
7278
import java.util.List;
7379
import java.util.Map;
7480
import java.util.Set;
81+
import java.util.concurrent.ConcurrentHashMap;
7582
import java.util.concurrent.Executor;
7683
import java.util.concurrent.atomic.AtomicBoolean;
7784
import java.util.concurrent.atomic.AtomicReference;
@@ -182,6 +189,7 @@ private <R extends ActionResponse> void doExecuteForked(
182189
LinkedRequestExecutor<R> linkedRequestExecutor,
183190
ActionListener<R> listener
184191
) {
192+
final boolean crossProjectEnabled = crossProjectModeDecider.resolvesCrossProject(request);
185193
if (ccsCheckCompatibility) {
186194
checkCCSVersionCompatibility(request);
187195
}
@@ -309,6 +317,44 @@ private <R extends ActionResponse> void doExecuteForked(
309317
);
310318
requestDispatcher.execute();
311319

320+
/*
321+
* We need to run the Cross Project Search reconciliation but only after we're heard back from all the linked projects.
322+
* It is also possible that some linked projects may respond back with an error instead of a valid response. To facilitate
323+
* this, we track each response, irrespective of whether it's valid or not, and then perform the reconciliation.
324+
*/
325+
CountDown countDownResponses = new CountDown(remoteClusterIndices.size());
326+
Map<String, ResolvedIndexExpressions> linkedProjectsResponses = ConcurrentCollections.newConcurrentMap();
327+
Runnable crossProjectReconciler = () -> {
328+
if (countDownResponses.countDown() && crossProjectEnabled) {
329+
/*
330+
* This happens when one or more linked projects respond with an error instead of a valid response -- say, networking
331+
* error.
332+
*/
333+
if (linkedProjectsResponses.size() != remoteClusterIndices.size()) {
334+
listener.onFailure(
335+
new IllegalArgumentException(
336+
"Invalid number of responses received: "
337+
+ linkedProjectsResponses.size()
338+
+ " vs expected "
339+
+ remoteClusterIndices.size()
340+
)
341+
);
342+
return;
343+
}
344+
345+
Exception validationEx = CrossProjectIndexResolutionValidator.validate(
346+
request.indicesOptions(),
347+
null,
348+
request.getResolvedIndexExpressions(),
349+
linkedProjectsResponses
350+
);
351+
352+
if (validationEx != null) {
353+
listener.onFailure(validationEx);
354+
}
355+
}
356+
};
357+
312358
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
313359
// send us back all individual index results.
314360
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
@@ -322,6 +368,10 @@ private <R extends ActionResponse> void doExecuteForked(
322368
crossProjectModeDecider
323369
);
324370
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
371+
assert response.getResolvedIndexExpressions() != null
372+
: "Resolved index expressions from [" + clusterAlias + "] are null";
373+
linkedProjectsResponses.put(clusterAlias, response.getResolvedIndexExpressions());
374+
325375
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
326376
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
327377
handleIndexResponse.accept(
@@ -346,10 +396,12 @@ private <R extends ActionResponse> void doExecuteForked(
346396
}
347397
return TransportVersion.min(lhs, rhs);
348398
});
399+
crossProjectReconciler.run();
349400
}, ex -> {
350401
for (String index : originalIndices.indices()) {
351402
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
352403
}
404+
crossProjectReconciler.run();
353405
});
354406

355407
SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();

0 commit comments

Comments
 (0)