|
13 | 13 | import org.apache.lucene.util.automaton.TooComplexToDeterminizeException; |
14 | 14 | import org.elasticsearch.ElasticsearchTimeoutException; |
15 | 15 | import org.elasticsearch.ExceptionsHelper; |
| 16 | +import org.elasticsearch.TransportVersion; |
16 | 17 | import org.elasticsearch.action.ActionListener; |
17 | 18 | import org.elasticsearch.action.ActionListenerResponseHandler; |
18 | 19 | import org.elasticsearch.action.ActionRunnable; |
|
69 | 70 | import java.util.Set; |
70 | 71 | import java.util.concurrent.Executor; |
71 | 72 | import java.util.concurrent.atomic.AtomicBoolean; |
| 73 | +import java.util.concurrent.atomic.AtomicReference; |
72 | 74 | import java.util.function.BiConsumer; |
73 | 75 | import java.util.function.Consumer; |
74 | 76 | import java.util.function.Function; |
@@ -144,22 +146,20 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action |
144 | 146 | final Executor singleThreadedExecutor = buildSingleThreadedExecutor(searchCoordinationExecutor, LOGGER); |
145 | 147 | assert task instanceof CancellableTask; |
146 | 148 | final CancellableTask fieldCapTask = (CancellableTask) task; |
147 | | - // retrieve the initial timestamp in case the action is a cross cluster search |
| 149 | + // retrieve the initial timestamp in case the action is a cross-cluster search |
148 | 150 | long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis(); |
149 | 151 | final ProjectState projectState = projectResolver.getProjectState(clusterService.state()); |
| 152 | + final var minTransportVersion = new AtomicReference<>(clusterService.state().getMinTransportVersion()); |
150 | 153 | final Map<String, OriginalIndices> remoteClusterIndices = transportService.getRemoteClusterService() |
151 | 154 | .groupIndices(request.indicesOptions(), request.indices(), request.returnLocalAll()); |
152 | 155 | final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); |
153 | | - final String[] concreteIndices; |
154 | | - if (localIndices == null) { |
155 | | - // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices |
156 | | - concreteIndices = Strings.EMPTY_ARRAY; |
157 | | - } else { |
158 | | - concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), localIndices); |
159 | | - } |
| 156 | + // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices |
| 157 | + final String[] concreteIndices = localIndices != null |
| 158 | + ? indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), localIndices) |
| 159 | + : Strings.EMPTY_ARRAY; |
160 | 160 |
|
161 | 161 | if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { |
162 | | - listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap())); |
| 162 | + listener.onResponse(FieldCapabilitiesResponse.builder().withMinTransportVersion(minTransportVersion.get()).build()); |
163 | 163 | return; |
164 | 164 | } |
165 | 165 |
|
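The hunk above collapses the `concreteIndices` if/else into a ternary while keeping the rule that a request naming only remote indices must not expand to all local indices. A small standalone sketch of that rule, with placeholder names rather than the real `IndexNameExpressionResolver` API:

```java
import java.util.Arrays;
import java.util.List;

public class LocalIndexResolution {

    // Placeholder for indexNameExpressionResolver.concreteIndexNames(metadata, localIndices).
    static String[] resolveLocal(List<String> expressions) {
        return expressions.stream().map(e -> e.replace("*", "backing-0001")).toArray(String[]::new);
    }

    // Mirrors the ternary in doExecuteForked: a null local group means the request named only
    // remote indices, so we must not fall back to expanding all local indices.
    static String[] concreteIndices(List<String> localIndices) {
        return localIndices != null ? resolveLocal(localIndices) : new String[0];
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(concreteIndices(List.of("logs-*")))); // [logs-backing-0001]
        System.out.println(Arrays.toString(concreteIndices(null)));              // []
    }
}
```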
@@ -235,7 +235,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action |
235 | 235 | if (fieldCapTask.notifyIfCancelled(listener)) { |
236 | 236 | releaseResourcesOnCancel.run(); |
237 | 237 | } else { |
238 | | - mergeIndexResponses(request, fieldCapTask, indexResponses, indexFailures, listener); |
| 238 | + mergeIndexResponses(request, fieldCapTask, indexResponses, indexFailures, minTransportVersion, listener); |
239 | 239 | } |
240 | 240 | })) { |
241 | 241 | // local cluster |
@@ -281,6 +281,12 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action |
281 | 281 | handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); |
282 | 282 | } |
283 | 283 | } |
| 284 | + minTransportVersion.accumulateAndGet(response.minTransportVersion(), (lhs, rhs) -> { |
| 285 | + if (lhs == null || rhs == null) { |
| 286 | + return null; |
| 287 | + } |
| 288 | + return TransportVersion.min(lhs, rhs); |
| 289 | + }); |
284 | 290 | }, ex -> { |
285 | 291 | for (String index : originalIndices.indices()) { |
286 | 292 | handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); |
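The `accumulateAndGet` combiner added in this hunk keeps a running minimum across the local seed and every remote response, and treats `null` as "unknown": once any cluster reports no minimum transport version, the overall result collapses to `null`. A minimal, self-contained sketch of the same pattern, with a placeholder `Version` record standing in for `org.elasticsearch.TransportVersion`:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class MinVersionAccumulator {

    // Placeholder for org.elasticsearch.TransportVersion; only min/compare semantics matter here.
    record Version(int id) {
        static Version min(Version a, Version b) {
            return a.id <= b.id ? a : b;
        }
    }

    // Same shape as the combiner in the diff: null (unknown) is absorbing, otherwise keep the minimum.
    static Version combine(Version lhs, Version rhs) {
        if (lhs == null || rhs == null) {
            return null;
        }
        return Version.min(lhs, rhs);
    }

    public static void main(String[] args) {
        // Seeded from the local cluster state, as doExecuteForked seeds minTransportVersion.
        AtomicReference<Version> minVersion = new AtomicReference<>(new Version(100));

        // Each remote field-caps response lowers the running minimum...
        for (Version remote : List.of(new Version(95), new Version(120))) {
            minVersion.accumulateAndGet(remote, MinVersionAccumulator::combine);
        }
        System.out.println(minVersion.get()); // Version[id=95]

        // ...and a remote that reports no version collapses it to "unknown".
        minVersion.accumulateAndGet(null, MinVersionAccumulator::combine);
        System.out.println(minVersion.get()); // null
    }
}
```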
@@ -360,35 +366,41 @@ private static void mergeIndexResponses( |
360 | 366 | CancellableTask task, |
361 | 367 | Map<String, FieldCapabilitiesIndexResponse> indexResponses, |
362 | 368 | FailureCollector indexFailures, |
| 369 | + AtomicReference<TransportVersion> minTransportVersion, |
363 | 370 | ActionListener<FieldCapabilitiesResponse> listener |
364 | 371 | ) { |
365 | 372 | List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet()); |
366 | | - if (indexResponses.size() > 0) { |
| 373 | + if (indexResponses.isEmpty() == false) { |
367 | 374 | if (request.isMergeResults()) { |
368 | | - ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures)); |
| 375 | + ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures, minTransportVersion)); |
369 | 376 | } else { |
370 | | - listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), failures)); |
| 377 | + listener.onResponse( |
| 378 | + FieldCapabilitiesResponse.builder() |
| 379 | + .withIndexResponses(new ArrayList<>(indexResponses.values())) |
| 380 | + .withFailures(failures) |
| 381 | + .withMinTransportVersion(minTransportVersion.get()) |
| 382 | + .build() |
| 383 | + ); |
371 | 384 | } |
372 | | - } else { |
373 | | - // we have no responses at all, maybe because of errors |
374 | | - if (indexFailures.isEmpty() == false) { |
375 | | - /* |
376 | | - * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. |
377 | | - * Instead, they should always be passed through the response object, as part of "failures". |
378 | | - */ |
379 | | - if (failures.stream() |
380 | | - .anyMatch( |
381 | | - failure -> failure.getException() instanceof IllegalStateException ise |
382 | | - && ise.getCause() instanceof ElasticsearchTimeoutException |
383 | | - )) { |
384 | | - listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); |
385 | | - } else { |
386 | | - // throw back the first exception |
387 | | - listener.onFailure(failures.get(0).getException()); |
388 | | - } |
| 385 | + } else if (indexFailures.isEmpty() == false) { |
| 386 | + /* |
| 387 | + * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. |
| 388 | + * Instead, they should always be passed through the response object, as part of "failures". |
| 389 | + */ |
| 390 | + if (failures.stream() |
| 391 | + .anyMatch( |
| 392 | + failure -> failure.getException() instanceof IllegalStateException ise |
| 393 | + && ise.getCause() instanceof ElasticsearchTimeoutException |
| 394 | + )) { |
| 395 | + listener.onResponse( |
| 396 | + FieldCapabilitiesResponse.builder().withFailures(failures).withMinTransportVersion(minTransportVersion.get()).build() |
| 397 | + ); |
389 | 398 | } else { |
390 | | - listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList())); |
| 399 | + // throw back the first exception |
| 400 | + listener.onFailure(failures.get(0).getException()); |
391 | 401 | } |
| 402 | + } else { |
| 403 | + listener.onResponse(FieldCapabilitiesResponse.builder().withMinTransportVersion(minTransportVersion.get()).build()); |
392 | 404 | } |
393 | 405 | } |
394 | 406 |
|
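The restructured else-if chain in `mergeIndexResponses` preserves the earlier behaviour: timeouts originating from `SubscribableListener` surface as an `IllegalStateException` wrapping `ElasticsearchTimeoutException` and are returned inside the response's failures rather than as a top-level error. A hedged sketch of that cause check, with `java.util.concurrent.TimeoutException` standing in for `ElasticsearchTimeoutException`:

```java
import java.util.List;
import java.util.concurrent.TimeoutException;

public class TimeoutFailureCheck {

    // SubscribableListener timeouts arrive as IllegalStateException wrapping a timeout cause;
    // TimeoutException is a stand-in for ElasticsearchTimeoutException in this sketch.
    static boolean isWrappedTimeout(Exception e) {
        return e instanceof IllegalStateException ise && ise.getCause() instanceof TimeoutException;
    }

    public static void main(String[] args) {
        List<Exception> failures = List.of(
            new IllegalStateException(new TimeoutException("field caps request timed out")),
            new RuntimeException("shard failure")
        );
        // Mirrors the anyMatch in mergeIndexResponses: a wrapped timeout means the failures are
        // carried inside the response instead of failing the whole request.
        boolean hasTimeout = failures.stream().anyMatch(TimeoutFailureCheck::isWrappedTimeout);
        System.out.println(hasTimeout ? "return response with failures" : "rethrow first failure");
    }
}
```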
@@ -423,7 +435,8 @@ private static FieldCapabilitiesResponse merge( |
423 | 435 | Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap, |
424 | 436 | CancellableTask task, |
425 | 437 | FieldCapabilitiesRequest request, |
426 | | - List<FieldCapabilitiesFailure> failures |
| 438 | + List<FieldCapabilitiesFailure> failures, |
| 439 | + AtomicReference<TransportVersion> minTransportVersion |
427 | 440 | ) { |
428 | 441 | assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker |
429 | 442 | task.ensureNotCancelled(); |
@@ -464,7 +477,12 @@ private static FieldCapabilitiesResponse merge( |
464 | 477 | ); |
465 | 478 | } |
466 | 479 | } |
467 | | - return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(fields), failures); |
| 480 | + return FieldCapabilitiesResponse.builder() |
| 481 | + .withIndices(indices) |
| 482 | + .withFields(Collections.unmodifiableMap(fields)) |
| 483 | + .withFailures(failures) |
| 484 | + .withMinTransportVersion(minTransportVersion.get()) |
| 485 | + .build(); |
468 | 486 | } |
469 | 487 |
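Across the diff, the direct `FieldCapabilitiesResponse` constructors are replaced by a builder so that each call site sets only the parts it has (indices, fields, failures, index responses, min transport version) and leaves the rest defaulted. A simplified, hypothetical sketch of that shape, not the actual `FieldCapabilitiesResponse` builder:

```java
import java.util.List;
import java.util.Map;

public final class ResponseSketch {
    private final List<String> indices;
    private final Map<String, String> fields;
    private final Integer minTransportVersion; // null means "unknown"

    private ResponseSketch(Builder builder) {
        this.indices = builder.indices;
        this.fields = builder.fields;
        this.minTransportVersion = builder.minTransportVersion;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static final class Builder {
        private List<String> indices = List.of();
        private Map<String, String> fields = Map.of();
        private Integer minTransportVersion;

        public Builder withIndices(List<String> indices) {
            this.indices = indices;
            return this;
        }

        public Builder withFields(Map<String, String> fields) {
            this.fields = fields;
            return this;
        }

        public Builder withMinTransportVersion(Integer minTransportVersion) {
            this.minTransportVersion = minTransportVersion;
            return this;
        }

        public ResponseSketch build() {
            return new ResponseSketch(this);
        }
    }

    public static void main(String[] args) {
        // The "empty response" call sites in the diff reduce to a single chained expression:
        ResponseSketch empty = ResponseSketch.builder().withMinTransportVersion(100).build();
        System.out.println(empty.indices + " " + empty.fields + " minVersion=" + empty.minTransportVersion);
    }
}
```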
|
470 | 488 | private static boolean shouldLogException(Exception e) { |
|