Skip to content

Commit 6cf55ee

Browse files
committed
Consider min version from LOOKUP JOIN resolution
Important for ROW queries.
1 parent 22b7aa5 commit 6cf55ee

File tree

3 files changed

+30
-59
lines changed

3 files changed

+30
-59
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.xpack.esql.core.type.EsField;
4545
import org.elasticsearch.xpack.esql.core.util.StringUtils;
4646
import org.elasticsearch.xpack.esql.index.EsIndex;
47+
import org.elasticsearch.xpack.esql.index.IndexResolution;
4748
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
4849
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
4950
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -474,7 +475,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
474475
}
475476
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
476477
String indexName = EnrichPolicy.getBaseName(policyName);
477-
indexResolver.resolveAsMergedMappingForVersion(
478+
indexResolver.resolveAsMergedMapping(
478479
indexName,
479480
IndexResolver.ALL_FIELDS,
480481
null,
@@ -483,7 +484,8 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
483484
false,
484485
false,
485486
request.minimumVersion,
486-
refs.acquire(indexResult -> {
487+
refs.acquire(versionedIndexResult -> {
488+
IndexResolution indexResult = versionedIndexResult.inner();
487489
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
488490
EsIndex esIndex = indexResult.get();
489491
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ private void preAnalyzeLookupIndex(
608608
ThreadPool.Names.SEARCH_COORDINATION,
609609
ThreadPool.Names.SYSTEM_READ
610610
);
611-
indexResolver.resolveAsMergedMappingForVersion(
611+
indexResolver.resolveAsMergedMapping(
612612
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
613613
result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
614614
null,
@@ -642,8 +642,9 @@ private PreAnalysisResult receiveLookupIndexResolution(
642642
PreAnalysisResult result,
643643
String index,
644644
EsqlExecutionInfo executionInfo,
645-
IndexResolution lookupIndexResolution
645+
Versioned<IndexResolution> versionedLookupIndexResolution
646646
) {
647+
IndexResolution lookupIndexResolution = versionedLookupIndexResolution.inner();
647648
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, lookupIndexResolution.failures());
648649
if (lookupIndexResolution.isValid() == false) {
649650
// If the index resolution is invalid, don't bother with the rest of the analysis
@@ -673,7 +674,9 @@ private PreAnalysisResult receiveLookupIndexResolution(
673674
+ "] mode"
674675
);
675676
}
676-
return result.addLookupIndexResolution(index, lookupIndexResolution);
677+
678+
return result.addLookupIndexResolution(index, lookupIndexResolution)
679+
.withMinimumTransportVersion(versionedLookupIndexResolution.minimumVersion());
677680
}
678681

679682
if (lookupIndexResolution.get().indexNameWithModes().isEmpty() && lookupIndexResolution.resolvedIndices().isEmpty() == false) {
@@ -860,6 +863,7 @@ private void preAnalyzeMainIndices(
860863
indexMode == IndexMode.TIME_SERIES,
861864
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
862865
preAnalysis.useDenseVectorWhenNotSupported(),
866+
null,
863867
listener.delegateFailureAndWrap((l, indexResolution) -> {
864868
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
865869
l.onResponse(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 19 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -83,67 +83,26 @@ public IndexResolver(Client client) {
8383
this.client = client;
8484
}
8585

86-
/**
87-
* Like {@link #resolveAsMergedMapping(String, Set, QueryBuilder, boolean, boolean, boolean, ActionListener)},
88-
* but uses a pre-determined minimum transport version.
89-
*/
90-
public void resolveAsMergedMappingForVersion(
91-
String indexPattern,
92-
Set<String> fieldNames,
93-
QueryBuilder requestFilter,
94-
boolean includeAllDimensions,
95-
boolean useAggregateMetricDoubleWhenNotSupported,
96-
boolean useDenseVectorWhenNotSupported,
97-
TransportVersion minimumVersion,
98-
ActionListener<IndexResolution> listener
99-
) {
100-
ActionListener<Versioned<IndexResolution>> versionIgnoringListener = listener.delegateFailureAndWrap(
101-
(l, versionedResolution) -> l.onResponse(versionedResolution.inner())
102-
);
103-
104-
resolveAsMergedMapping(
105-
indexPattern,
106-
fieldNames,
107-
requestFilter,
108-
includeAllDimensions,
109-
useAggregateMetricDoubleWhenNotSupported,
110-
useDenseVectorWhenNotSupported,
111-
minimumVersion,
112-
versionIgnoringListener
113-
);
114-
}
115-
11686
/**
11787
* Perform a field caps request to resolve a pattern to one mapping (potentially compound, meaning it spans multiple indices).
11888
* The field caps response contains the minimum transport version of all clusters that apply to the pattern,
11989
* and it is used to deal with previously unsupported data types during resolution.
12090
* <p>
121-
* The retrieved minimum version is passed on to the listener.
122-
* <p>
12391
* If a field's type is not supported on the minimum version, it will be {@link DataType#UNSUPPORTED}.
92+
* <p>
93+
* If the nodes are too old to include their minimum transport version in the field caps response, we'll assume
94+
* {@link TransportVersion#minimumCompatible()}.
95+
* <p>
96+
* Optionally, a {@code minimumVersion} can be passed in that will be used instead if it is lower than the transport
97+
* version from the field caps response. It's meant to be the minimum version determined when resolving {@code FROM}
98+
* and is required to correctly resolve {@code ENRICH} queries in CCS (enrich policies are resolved locally and thus
99+
* might have a higher transport version in their field caps response than when resolving the main indices in {@code FROM}).
100+
* In case of {@code ROW}, it's also okay to pass in the version from the main index resolution; that will be the coordinator
101+
* version, which cannot be higher than the minimum version from the field caps response.
102+
* <p>
103+
* The overall minimum version which is used to determine data type support is passed on to the listener.
124104
*/
125105
public void resolveAsMergedMapping(
126-
String indexPattern,
127-
Set<String> fieldNames,
128-
QueryBuilder requestFilter,
129-
boolean includeAllDimensions,
130-
boolean useAggregateMetricDoubleWhenNotSupported,
131-
boolean useDenseVectorWhenNotSupported,
132-
ActionListener<Versioned<IndexResolution>> listener
133-
) {
134-
resolveAsMergedMapping(
135-
indexPattern,
136-
fieldNames,
137-
requestFilter,
138-
includeAllDimensions,
139-
useAggregateMetricDoubleWhenNotSupported,
140-
useDenseVectorWhenNotSupported,
141-
null,
142-
listener
143-
);
144-
}
145-
146-
private void resolveAsMergedMapping(
147106
String indexPattern,
148107
Set<String> fieldNames,
149108
QueryBuilder requestFilter,
@@ -157,9 +116,15 @@ private void resolveAsMergedMapping(
157116
EsqlResolveFieldsAction.TYPE,
158117
createFieldCapsRequest(indexPattern, fieldNames, requestFilter, includeAllDimensions),
159118
listener.delegateFailureAndWrap((l, response) -> {
119+
TransportVersion responseMinimumVersion = response.caps().minTransportVersion();
120+
// If we don't know the overall minimum version, it stays null to avoid faking knowledge we don't have.
121+
TransportVersion overallMinimumVersion = responseMinimumVersion == null ? null
122+
: minimumVersion == null ? responseMinimumVersion
123+
: TransportVersion.min(minimumVersion, responseMinimumVersion);
124+
160125
FieldsInfo info = new FieldsInfo(
161126
response.caps(),
162-
minimumVersion == null ? response.caps().minTransportVersion() : minimumVersion,
127+
overallMinimumVersion,
163128
Build.current().isSnapshot(),
164129
useAggregateMetricDoubleWhenNotSupported,
165130
useDenseVectorWhenNotSupported

0 commit comments

Comments
 (0)