diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java index c4c974ad6b668..4ed4e24110cd9 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java @@ -288,7 +288,7 @@ public Map runtimeFields() { return this.runtimeFields; } - Long nowInMillis() { + public Long nowInMillis() { return nowInMillis; } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index c56fd985c9e2b..3ad45526be217 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -54,7 +54,7 @@ /** * Dispatches child field-caps requests to old/new data nodes in the local cluster that have shards of the requesting indices. */ -final class RequestDispatcher { +public final class RequestDispatcher { static final Logger LOGGER = LogManager.getLogger(RequestDispatcher.class); private final TransportService transportService; @@ -74,7 +74,7 @@ final class RequestDispatcher { private final AtomicInteger executionRound = new AtomicInteger(); private final Map indexSelectors; - RequestDispatcher( + public RequestDispatcher( ClusterService clusterService, TransportService transportService, ProjectResolver projectResolver, @@ -127,7 +127,7 @@ final class RequestDispatcher { } } - void execute() { + public void execute() { executor.execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 4ae5e01569333..1228c2d616b09 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -129,40 +129,19 @@ public TransportFieldCapabilitiesAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - executeRequest( - task, - request, - (transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest( - conn, - REMOTE_TYPE.name(), - fieldCapabilitiesRequest, - TransportRequestOptions.EMPTY, - responseHandler - ), - listener - ); + executeRequest(task, request, listener); } - public void executeRequest( - Task task, - FieldCapabilitiesRequest request, - LinkedRequestExecutor linkedRequestExecutor, - ActionListener listener - ) { + public void executeRequest(Task task, FieldCapabilitiesRequest request, ActionListener listener) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l))); + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l))); } - private void doExecuteForked( - Task task, - FieldCapabilitiesRequest request, - LinkedRequestExecutor linkedRequestExecutor, - ActionListener listener - ) { + private void doExecuteForked(Task task, FieldCapabilitiesRequest request, ActionListener listener) { if (ccsCheckCompatibility) { checkCCSVersionCompatibility(request); } - final Executor singleThreadedExecutor = buildSingleThreadedExecutor(); + final Executor singleThreadedExecutor = buildSingleThreadedExecutor(searchCoordinationExecutor, LOGGER); assert task instanceof CancellableTask; final CancellableTask fieldCapTask = (CancellableTask) task; // retrieve the initial timestamp in case the action is a cross cluster search @@ -322,10 +301,11 @@ private void doExecuteForked( true, ActionListener.releaseAfter(remoteListener, refs.acquire()) ).delegateFailure( - (responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest( - transportService, + (responseListener, conn) -> transportService.sendRequest( conn, + REMOTE_TYPE.name(), remoteRequest, + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor) ) ) @@ -339,7 +319,7 @@ private void doExecuteForked( } } - private Executor buildSingleThreadedExecutor() { + public static Executor buildSingleThreadedExecutor(Executor searchCoordinationExecutor, Logger logger) { final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor); return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { @Override @@ -362,16 +342,7 @@ public void onFailure(Exception e) { }); } - public interface LinkedRequestExecutor { - void executeRemoteRequest( - TransportService transportService, - Transport.Connection conn, - FieldCapabilitiesRequest remoteRequest, - ActionListenerResponseHandler responseHandler - ); - } - - private static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) { + public static void checkIndexBlocks(ProjectState projectState, String[] concreteIndices) { var blocks = projectState.blocks(); var projectId = projectState.projectId(); if (blocks.global(projectId).isEmpty() && blocks.indices(projectId).isEmpty()) { @@ -421,7 +392,7 @@ private static void mergeIndexResponses( } } - private static FieldCapabilitiesRequest prepareRemoteRequest( + public static FieldCapabilitiesRequest prepareRemoteRequest( String clusterAlias, FieldCapabilitiesRequest request, OriginalIndices originalIndices, @@ -621,10 +592,10 @@ private static void innerMerge( * This collector can contain a failure for an index even if one of its shards was successful. When building the final * list, these failures will be skipped because they have no affect on the final response. */ - private static final class FailureCollector { + public static final class FailureCollector { private final Map failuresByIndex = new HashMap<>(); - List build(Set successfulIndices) { + public List build(Set successfulIndices) { Map, FieldCapabilitiesFailure> indexFailures = new HashMap<>(); for (Map.Entry failure : failuresByIndex.entrySet()) { String index = failure.getKey(); @@ -653,15 +624,15 @@ List build(Set successfulIndices) { return new ArrayList<>(indexFailures.values()); } - void collect(String index, Exception e) { + public void collect(String index, Exception e) { failuresByIndex.putIfAbsent(index, e); } - void clear() { + public void clear() { failuresByIndex.clear(); } - boolean isEmpty() { + public boolean isEmpty() { return failuresByIndex.isEmpty(); } } @@ -724,8 +695,8 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann } } - private static class ForkingOnFailureActionListener extends AbstractThreadedActionListener { - ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener delegate) { + public static class ForkingOnFailureActionListener extends AbstractThreadedActionListener { + public ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener delegate) { super(executor, forceExecution, delegate); } diff --git a/server/src/main/resources/transport/definitions/referable/esql_resolve_fields_response_created.csv b/server/src/main/resources/transport/definitions/referable/esql_resolve_fields_response_created.csv new file mode 100644 index 0000000000000..eb3f0c4f01615 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_resolve_fields_response_created.csv @@ -0,0 +1 @@ +9189000,9185001 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 2147eab66c207..341a3050b4a97 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -initial_9.2.0,9185000 +esql_resolve_fields_response_created,9185001 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..b947ec1f1d1ce --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +esql_resolve_fields_response_created,9189000 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java index d8596e6943bbc..1e77a62711990 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java @@ -130,9 +130,6 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException { protected boolean supportsNodeAssignment() throws IOException { if (supportsNodeAssignment == null) { - for (NodeInfo i : allNodeToInfo().values()) { - logger.error("NOCOMMIT {}", i); - } supportsNodeAssignment = allNodeToInfo().values() .stream() .allMatch(i -> (i.roles.contains("index") && i.roles.contains("search")) || (i.roles.contains("data"))); @@ -542,10 +539,8 @@ private boolean syntheticSourceByDefault() { } private Map expectedIndices() throws IOException { - logger.error("ADFADF NOCOMMIT"); Map result = new TreeMap<>(); if (supportsNodeAssignment()) { - logger.error("supports {}", allNodeToInfo()); for (Map.Entry e : allNodeToInfo().entrySet()) { String name = indexMode + "_" + e.getKey(); if (e.getValue().cluster != null) { @@ -554,7 +549,6 @@ private Map expectedIndices() throws IOException { result.put(name, e.getValue()); } } else { - logger.error("one per {}", allNodeToInfo()); for (Map.Entry e : allNodeToInfo().entrySet()) { String name = indexMode.toString(); if (e.getValue().cluster != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index ff137a024b5bf..47574a6530837 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -6,59 +6,356 @@ */ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.action.fieldcaps.IndexFieldCapabilities; +import org.elasticsearch.action.fieldcaps.RequestDispatcher; import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; + /** * A fork of the field-caps API for ES|QL. This fork allows us to gradually introduce features and optimizations to this internal * API without risking breaking the external field-caps API. For now, this API delegates to the field-caps API, but gradually, * we will decouple this API completely from the field-caps. */ -public class EsqlResolveFieldsAction extends HandledTransportAction { +public class EsqlResolveFieldsAction extends HandledTransportAction { public static final String NAME = "indices:data/read/esql/resolve_fields"; - public static final ActionType TYPE = new ActionType<>(NAME); + public static final ActionType TYPE = new ActionType<>(NAME); public static final RemoteClusterActionType RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>( NAME, FieldCapabilitiesResponse::new ); + private static final Logger LOGGER = LogManager.getLogger(EsqlResolveFieldsAction.class); - private final TransportFieldCapabilitiesAction fieldCapsAction; + private final Executor searchCoordinationExecutor; + private final TransportService transportService; + private final ClusterService clusterService; + private final ProjectResolver projectResolver; + private final IndexNameExpressionResolver indexNameExpressionResolver; + + private final IndicesService indicesService; + private final boolean ccsCheckCompatibility; + private final ThreadPool threadPool; + private final TimeValue forceConnectTimeoutSecs; @Inject public EsqlResolveFieldsAction( TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, - TransportFieldCapabilitiesAction fieldCapsAction + IndicesService indicesService, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver ) { // TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super(NAME, transportService, actionFilters, FieldCapabilitiesRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); - this.fieldCapsAction = fieldCapsAction; + this.searchCoordinationExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION); + this.transportService = transportService; + this.clusterService = clusterService; + this.projectResolver = projectResolver; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.indicesService = indicesService; + this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings()); + this.threadPool = threadPool; + this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null); } @Override - protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - fieldCapsAction.executeRequest(task, request, this::executeLinkedRequest, listener); + protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { + executeRequest(task, request, listener); } - void executeLinkedRequest( - TransportService transportService, - Transport.Connection conn, - FieldCapabilitiesRequest request, - ActionListenerResponseHandler responseHandler + public void executeRequest(Task task, FieldCapabilitiesRequest request, ActionListener listener) { + // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l))); + } + + private void doExecuteForked(Task task, FieldCapabilitiesRequest request, ActionListener listener) { + if (request.isMergeResults()) { + throw new IllegalArgumentException("merging results not supported"); + } + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(request); + } + final Executor singleThreadedExecutor = TransportFieldCapabilitiesAction.buildSingleThreadedExecutor( + searchCoordinationExecutor, + LOGGER + ); + assert task instanceof CancellableTask; + final CancellableTask fieldCapTask = (CancellableTask) task; + // retrieve the initial timestamp in case the action is a cross cluster search + long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis(); + ClusterState clusterState = clusterService.state(); + ProjectState projectState = projectResolver.getProjectState(clusterState); + AtomicReference minTransportVersion = new AtomicReference<>(clusterState.getMinTransportVersion()); + final Map remoteClusterIndices = transportService.getRemoteClusterService() + .groupIndices(request.indicesOptions(), request.indices(), request.returnLocalAll()); + final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + final String[] concreteIndices; + if (localIndices == null) { + // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices + concreteIndices = Strings.EMPTY_ARRAY; + } else { + concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), localIndices); + } + + if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { + // No indices at all! + listener.onResponse( + new EsqlResolveFieldsResponse( + new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()), + minTransportVersion.get() + ) + ); + return; + } + + TransportFieldCapabilitiesAction.checkIndexBlocks(projectState, concreteIndices); + final TransportFieldCapabilitiesAction.FailureCollector indexFailures = new TransportFieldCapabilitiesAction.FailureCollector(); + final Map indexResponses = new HashMap<>(); + // This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage. + final Map indexMappingHashToResponses = new HashMap<>(); + final Runnable releaseResourcesOnCancel = () -> { + LOGGER.trace("clear index responses on cancellation"); + indexFailures.clear(); + indexResponses.clear(); + indexMappingHashToResponses.clear(); + }; + final Consumer handleIndexResponse = resp -> { + if (fieldCapTask.isCancelled()) { + releaseResourcesOnCancel.run(); + return; + } + if (resp.canMatch() && resp.getIndexMappingHash() != null) { + FieldCapabilitiesIndexResponse curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp); + if (curr != null) { + resp = new FieldCapabilitiesIndexResponse( + resp.getIndexName(), + curr.getIndexMappingHash(), + curr.get(), + true, + curr.getIndexMode() + ); + } + } + if (request.includeEmptyFields()) { + indexResponses.putIfAbsent(resp.getIndexName(), resp); + } else { + indexResponses.merge(resp.getIndexName(), resp, (a, b) -> { + if (a.get().equals(b.get())) { + return a; + } + Map mergedCaps = new HashMap<>(a.get()); + mergedCaps.putAll(b.get()); + return new FieldCapabilitiesIndexResponse( + a.getIndexName(), + a.getIndexMappingHash(), + mergedCaps, + true, + a.getIndexMode() + ); + }); + } + if (fieldCapTask.isCancelled()) { + releaseResourcesOnCancel.run(); + } + }; + final BiConsumer handleIndexFailure = (index, error) -> { + if (fieldCapTask.isCancelled()) { + releaseResourcesOnCancel.run(); + return; + } + indexFailures.collect(index, error); + if (fieldCapTask.isCancelled()) { + releaseResourcesOnCancel.run(); + } + }; + final var finishedOrCancelled = new AtomicBoolean(); + fieldCapTask.addListener(() -> { + if (finishedOrCancelled.compareAndSet(false, true)) { + singleThreadedExecutor.execute(releaseResourcesOnCancel); + LOGGER.trace("clear index responses on cancellation submitted"); + } + }); + try (RefCountingRunnable refs = new RefCountingRunnable(() -> { + finishedOrCancelled.set(true); + if (fieldCapTask.notifyIfCancelled(listener)) { + releaseResourcesOnCancel.run(); + } else { + finishHim( + indexResponses, + indexFailures, + listener.map(caps -> new EsqlResolveFieldsResponse(caps, minTransportVersion.get())) + ); + } + })) { + // local cluster + final RequestDispatcher requestDispatcher = new RequestDispatcher( + clusterService, + transportService, + projectResolver, + indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis), + task, + request, + localIndices, + nowInMillis, + concreteIndices, + singleThreadedExecutor, + handleIndexResponse, + handleIndexFailure, + refs.acquire()::close + ); + requestDispatcher.execute(); + + // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead + // send us back all individual index results. + for (Map.Entry remoteIndices : remoteClusterIndices.entrySet()) { + String clusterAlias = remoteIndices.getKey(); + OriginalIndices originalIndices = remoteIndices.getValue(); + FieldCapabilitiesRequest remoteRequest = TransportFieldCapabilitiesAction.prepareRemoteRequest( + clusterAlias, + request, + originalIndices, + nowInMillis + ); + ActionListener remoteListener = ActionListener.wrap(response -> { + for (FieldCapabilitiesIndexResponse resp : response.caps().getIndexResponses()) { + String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); + handleIndexResponse.accept( + new FieldCapabilitiesIndexResponse( + indexName, + resp.getIndexMappingHash(), + resp.get(), + resp.canMatch(), + resp.getIndexMode() + ) + ); + } + for (FieldCapabilitiesFailure failure : response.caps().getFailures()) { + Exception ex = failure.getException(); + for (String index : failure.getIndices()) { + handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); + } + } + minTransportVersion.accumulateAndGet(response.minTransportVersion(), (lhs, rhs) -> { + if (lhs == null || rhs == null) { + return null; + } + return TransportVersion.min(lhs, rhs); + }); + }, ex -> { + for (String index : originalIndices.indices()) { + handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); + } + }); + + SubscribableListener connectionListener = new SubscribableListener<>(); + if (forceConnectTimeoutSecs != null) { + connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor); + } + + connectionListener.addListener( + // The underlying transport service may call onFailure with a thread pool other than search_coordinator. + // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. + // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439 + new TransportFieldCapabilitiesAction.ForkingOnFailureActionListener<>( + singleThreadedExecutor, + true, + ActionListener.releaseAfter(remoteListener, refs.acquire()) + ).delegateFailure( + (responseListener, conn) -> transportService.sendRequest( + conn, + RESOLVE_REMOTE_TYPE.name(), + remoteRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, EsqlResolveFieldsResponse::new, singleThreadedExecutor) + ) + ) + ); + + boolean ensureConnected = forceConnectTimeoutSecs != null + || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false; + transportService.getRemoteClusterService() + .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener); + } + } + } + + private static void finishHim( + Map indexResponses, + TransportFieldCapabilitiesAction.FailureCollector indexFailures, + ActionListener listener ) { - transportService.sendRequest(conn, RESOLVE_REMOTE_TYPE.name(), request, TransportRequestOptions.EMPTY, responseHandler); + List failures = indexFailures.build(indexResponses.keySet()); + if (indexResponses.isEmpty() == false) { + listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), failures)); + } else { + // we have no responses at all, maybe because of errors + if (indexFailures.isEmpty() == false) { + /* + * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. + * Instead, they should always be passed through the response object, as part of "failures". + */ + if (failures.stream() + .anyMatch( + failure -> failure.getException() instanceof IllegalStateException ise + && ise.getCause() instanceof ElasticsearchTimeoutException + )) { + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); + } else { + // throw back the first exception + listener.onFailure(failures.get(0).getException()); + } + } else { + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList())); + } + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java new file mode 100644 index 0000000000000..365b2b976e2f1 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsResponse.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; + +import java.io.IOException; + +public class EsqlResolveFieldsResponse extends ActionResponse { + private static final TransportVersion RESOLVE_FIELDS_RESPONSE_CREATED_TV = TransportVersion.fromName( + "esql_resolve_fields_response_created" + ); + + private final FieldCapabilitiesResponse caps; + private final TransportVersion minTransportVersion; + + public EsqlResolveFieldsResponse(FieldCapabilitiesResponse caps, TransportVersion minTransportVersion) { + this.caps = caps; + this.minTransportVersion = minTransportVersion; + } + + public EsqlResolveFieldsResponse(StreamInput in) throws IOException { + caps = new FieldCapabilitiesResponse(in); + if (in.getTransportVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) && in.readBoolean()) { + minTransportVersion = TransportVersion.readVersion(in); + } else { + minTransportVersion = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + caps.writeTo(out); + if (out.getTransportVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV)) { + out.writeBoolean(minTransportVersion != null); + if (minTransportVersion != null) { + TransportVersion.writeVersion(minTransportVersion, out); + } + } + } + + public FieldCapabilitiesResponse caps() { + return caps; + } + + /** + * The minimum {@link TransportVersion} of all clusters against which we resolved + * indices. + *

+ * If this is {@code null} then one of the nodes is before {@link #RESOLVE_FIELDS_RESPONSE_CREATED_TV} but + * we have no idea how early it is. Could be back in {@code 8.19.0}. + *

+ */ + @Nullable + public TransportVersion minTransportVersion() { + return minTransportVersion; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index ff042c5e7d870..5cbc380d6d164 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -18,6 +18,8 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; @@ -51,6 +53,8 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.UNSUPPORTED; public class IndexResolver { + private static Logger LOGGER = LogManager.getLogger(IndexResolver.class); + public static final Set ALL_FIELDS = Set.of("*"); public static final Set INDEX_METADATA_FIELD = Set.of(MetadataAttribute.INDEX); public static final String UNMAPPED = "unmapped"; @@ -91,11 +95,12 @@ public void resolveAsMergedMapping( client.execute( EsqlResolveFieldsAction.TYPE, createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions), - listener.delegateFailureAndWrap( - (l, response) -> l.onResponse( - mergedMappings(indexWildcard, new FieldsInfo(response, supportsAggregateMetricDouble, supportsDenseVector)) - ) - ) + listener.delegateFailureAndWrap((l, response) -> { + LOGGER.debug("minimum transport version {}", response.minTransportVersion()); + l.onResponse( + mergedMappings(indexWildcard, new FieldsInfo(response.caps(), supportsAggregateMetricDouble, supportsDenseVector)) + ); + }) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index a0c5c99f82a60..9cb735b955d09 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichMetadata; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.session.IndexResolver; @@ -509,7 +510,10 @@ protected void } else { response = new FieldCapabilitiesResponse(List.of(), List.of()); } - threadPool().executor(ThreadPool.Names.SEARCH_COORDINATION).execute(ActionRunnable.supply(listener, () -> (Response) response)); + threadPool().executor(ThreadPool.Names.SEARCH_COORDINATION) + .execute( + ActionRunnable.supply(listener, () -> (Response) new EsqlResolveFieldsResponse(response, TransportVersion.current())) + ); } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index a9d91fb0a5a56..bf8434e3c11c5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.telemetry; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilities; @@ -33,6 +34,7 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; +import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver; import org.elasticsearch.xpack.esql.execution.PlanExecutor; @@ -134,9 +136,9 @@ public void testFailedMetric() { when(fieldCapabilitiesResponse.get()).thenReturn(fields(indices)); doAnswer((Answer) invocation -> { @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + ActionListener listener = (ActionListener) invocation.getArguments()[2]; // simulate a valid field_caps response so we can parse and correctly analyze de query - listener.onResponse(fieldCapabilitiesResponse); + listener.onResponse(new EsqlResolveFieldsResponse(fieldCapabilitiesResponse, TransportVersion.current())); return null; }).when(qlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any()); @@ -144,9 +146,14 @@ public void testFailedMetric() { IndexResolver indexResolver = new IndexResolver(esqlClient); doAnswer((Answer) invocation -> { @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + ActionListener listener = (ActionListener) invocation.getArguments()[2]; // simulate a valid field_caps response so we can parse and correctly analyze de query - listener.onResponse(new FieldCapabilitiesResponse(indexFieldCapabilities(indices), List.of())); + listener.onResponse( + new EsqlResolveFieldsResponse( + new FieldCapabilitiesResponse(indexFieldCapabilities(indices), List.of()), + TransportVersion.current() + ) + ); return null; }).when(esqlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any());