Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
55556b7
Restrict EnrichPolicyResolver to supported types
alex-spies Oct 31, 2025
3b2aa0d
Update docs/changelog/137431.yaml
alex-spies Oct 31, 2025
3ca4ed9
Plumb minimum version into ENRICH resolution
alex-spies Oct 31, 2025
117bccf
[CI] Update transport version definitions
Oct 31, 2025
c9fc534
Merge remote-tracking branch 'upstream/main' into fix-enrich-lj-resol…
alex-spies Oct 31, 2025
6387ac6
[CI] Update transport version definitions
Oct 31, 2025
04329c5
Plumb minimumVersion into the profile
alex-spies Nov 4, 2025
6a30c94
Update tests
alex-spies Nov 4, 2025
0ab7c07
Assert minimum version in AllSupportedFields tests
alex-spies Nov 4, 2025
fff61c3
Fix tests some more
alex-spies Nov 4, 2025
c923ad3
Fix test
alex-spies Nov 4, 2025
a2c5123
Merge remote-tracking branch 'upstream/main' into fix-enrich-lj-resol…
alex-spies Nov 4, 2025
e7c628b
Fix test expectations
alex-spies Nov 4, 2025
ea39c52
[CI] Update transport version definitions
Nov 4, 2025
11de8ec
Add test for ROW
alex-spies Nov 5, 2025
27d56f8
Remove obsolete expectations
alex-spies Nov 5, 2025
22b7aa5
WIP: LOOKUP JOIN tests
alex-spies Nov 5, 2025
6cf55ee
Consider min version from LOOKUP JOIN resolution
alex-spies Nov 5, 2025
f7939c0
Fix serialization exception
alex-spies Nov 6, 2025
bb1d7a1
Finish test for cluster-local LOOKUP JOIN
alex-spies Nov 6, 2025
c78dbd9
Rename minimumVersion->minimumTransportVersion in profile
alex-spies Nov 6, 2025
73b05d9
Add test with ROW | ENRICH
alex-spies Nov 6, 2025
3afd809
Fix + test ROW | ENRICH
alex-spies Nov 6, 2025
d08f481
Delete docs/changelog/137431.yaml
alex-spies Nov 6, 2025
7402ff0
Update docs/changelog/137431.yaml
alex-spies Nov 6, 2025
482404e
Delete docs/changelog/137431.yaml
alex-spies Nov 6, 2025
855fd38
Update docs/changelog/137431.yaml
alex-spies Nov 6, 2025
2b84f68
Fix bug
alex-spies Nov 6, 2025
cd8cb65
Add comment
alex-spies Nov 6, 2025
022c9c2
Merge remote-tracking branch 'upstream/main' into fix-enrich-lj-resol…
alex-spies Nov 6, 2025
15b607b
[CI] Update transport version definitions
Nov 6, 2025
8db7bd9
Remove leftovers
alex-spies Nov 6, 2025
edf69b3
Clarify comment
alex-spies Nov 6, 2025
1a3ddc8
Improve log message
alex-spies Nov 7, 2025
1a8cbe4
Rename resolveAsMergedMapping->resolveIndexPattern
alex-spies Nov 7, 2025
ced04e1
Obtain local cluster version from ClusterState
alex-spies Nov 27, 2025
c68bc07
Update comment
alex-spies Nov 27, 2025
31f8a1a
Fix comment
alex-spies Nov 27, 2025
b3b1fe6
Fix some more comments
alex-spies Nov 27, 2025
5b41cce
Merge remote-tracking branch 'upstream/main' into fix-enrich-lj-resol…
alex-spies Nov 28, 2025
8a6fb3c
Minor simplification
alex-spies Nov 28, 2025
853d867
Fix tests
alex-spies Nov 28, 2025
8e4852b
Fix comment
alex-spies Nov 28, 2025
8d62430
[CI] Update transport version definitions
Nov 28, 2025
e48a1fa
Simplify leftovers
alex-spies Nov 28, 2025
3dd0261
Fix checkstyle violations
alex-spies Nov 28, 2025
ae90dd8
Handle null values in profile's minimum version
alex-spies Nov 28, 2025
aa2fb92
Fix another test
alex-spies Nov 28, 2025
7e2bf04
Add test for FROM *:* (only remote indices)
alex-spies Nov 28, 2025
aa0693c
Add test for FROM | ENRICH
alex-spies Nov 28, 2025
4dfbb65
Add test for FROM | LOOKUP JOIN
alex-spies Nov 28, 2025
59e0ef7
Force lookup joins onto remotes for good measure
alex-spies Nov 28, 2025
87f4fa0
Make assertion stricter in test
alex-spies Nov 28, 2025
2c5e937
Merge branch 'main' into fix-enrich-lj-resolution-min-transport-version
craigtaverner Dec 1, 2025
14b2e4d
Merge branch 'main' into fix-enrich-lj-resolution-min-transport-version
alex-spies Dec 2, 2025
08eae94
Merge remote-tracking branch 'upstream/main' into fix-enrich-lj-resol…
alex-spies Dec 2, 2025
f3559db
[CI] Update transport version definitions
Dec 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137431.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137431
summary: Fix enrich and lookup join resolution based on min transport version
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9211000,9185007
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ilm_searchable_snapshot_opt_out_clone,9185006
esql_use_minimum_version_for_enrich_resolution,9185007
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
add_sample_method_downsample_ilm,9210000
esql_use_minimum_version_for_enrich_resolution,9211000
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,6 @@ private Matcher<String> expectedType(DataType type) throws IOException {
yield equalTo("aggregate_metric_double");
}
case DENSE_VECTOR -> {
logger.error("ADFDAFAF " + minVersion());
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_USED_TV) == false
|| minVersion().supports(ESQL_DENSE_VECTOR_CREATED_VERSION) == false) {
yield equalTo("unsupported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -75,6 +76,10 @@
public class EnrichPolicyResolver {
private static final String RESOLVE_ACTION_NAME = "cluster:monitor/xpack/enrich/esql/resolve_policy";

private static final TransportVersion ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION = TransportVersion.fromName(
"esql_use_minimum_version_for_enrich_resolution"
);

private final ClusterService clusterService;
private final IndexResolver indexResolver;
private final TransportService transportService;
Expand Down Expand Up @@ -113,9 +118,16 @@ public static UnresolvedPolicy from(Enrich e) {
*
* @param enriches the unresolved policies
* @param executionInfo the execution info
* @param minimumVersion the minimum transport version of all clusters involved in the query, used for making the resolved mapping
* compatible with all possible nodes
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
public void resolvePolicies(
List<Enrich> enriches,
EsqlExecutionInfo executionInfo,
TransportVersion minimumVersion,
ActionListener<EnrichResolution> listener
) {
if (enriches.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
Expand All @@ -125,6 +137,7 @@ public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionIn
executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()),
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
executionInfo,
minimumVersion,
listener
);
}
Expand All @@ -133,6 +146,7 @@ protected void doResolvePolicies(
Set<String> remoteClusters,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
TransportVersion minimumVersion,
ActionListener<EnrichResolution> listener
) {
if (unresolvedPolicies.isEmpty()) {
Expand All @@ -141,7 +155,7 @@ protected void doResolvePolicies(
}

final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> {
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, minimumVersion, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
Expand Down Expand Up @@ -305,6 +319,7 @@ private void lookupPolicies(
boolean includeLocal,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
TransportVersion minimumVersion,
ActionListener<Map<String, LookupResponse>> listener
) {
final Map<String, LookupResponse> lookupResponses = ConcurrentCollections.newConcurrentMap();
Expand All @@ -324,7 +339,7 @@ public void onResponse(Transport.Connection connection) {
transportService.sendRequest(
connection,
RESOLVE_ACTION_NAME,
new LookupRequest(cluster, remotePolicies),
new LookupRequest(cluster, remotePolicies, minimumVersion),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)),
Expand All @@ -350,7 +365,7 @@ public void onFailure(Exception e) {
transportService.sendRequest(
transportService.getLocalNode(),
RESOLVE_ACTION_NAME,
new LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies),
new LookupRequest(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, localPolicies, minimumVersion),
new ActionListenerResponseHandler<>(
refs.acquire(resp -> lookupResponses.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, resp)),
LookupResponse::new,
Expand All @@ -372,21 +387,32 @@ private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, Acti
private static class LookupRequest extends AbstractTransportRequest {
private final String clusterAlias;
private final Collection<String> policyNames;
// The minimum version of all clusters involved in executing the ESQL query.
final TransportVersion minimumVersion;

LookupRequest(String clusterAlias, Collection<String> policyNames) {
LookupRequest(String clusterAlias, Collection<String> policyNames, TransportVersion minimumVersion) {
this.clusterAlias = clusterAlias;
this.policyNames = policyNames;
this.minimumVersion = minimumVersion;
}

LookupRequest(StreamInput in) throws IOException {
this.clusterAlias = in.readString();
this.policyNames = in.readStringCollectionAsList();
if (in.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) {
this.minimumVersion = TransportVersion.readVersion(in);
} else {
this.minimumVersion = TransportVersion.minimumCompatible();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterAlias);
out.writeStringCollection(policyNames);
if (out.getTransportVersion().supports(ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION)) {
TransportVersion.writeVersion(minimumVersion, out);
}
}
}

Expand Down Expand Up @@ -447,14 +473,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
}
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
String indexName = EnrichPolicy.getBaseName(policyName);
indexResolver.resolveAsMergedMapping(
indexResolver.resolveAsMergedMappingForVersion(
indexName,
IndexResolver.ALL_FIELDS,
null,
false,
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
false,
false,
request.minimumVersion,
refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
EsIndex esIndex = indexResult.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,12 @@ private void resolveIndicesAndAnalyze(
})
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
.<PreAnalysisResult>andThen((l, r) -> {
enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
enrichPolicyResolver.resolvePolicies(
preAnalysis.enriches(),
executionInfo,
r.minimumTransportVersion(),
l.map(r::withEnrichResolution)
);
})
.<PreAnalysisResult>andThen((l, r) -> {
inferenceService.inferenceResolver(functionRegistry).resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
Expand Down Expand Up @@ -610,14 +615,15 @@ private void preAnalyzeLookupIndex(
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
indexResolver.resolveAsMergedMapping(
indexResolver.resolveAsMergedMappingForVersion(
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
null,
false,
// Disable aggregate_metric_double and dense_vector until we get version checks in planning
false,
false,
result.minimumTransportVersion(),
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
);
}
Expand Down Expand Up @@ -851,7 +857,7 @@ private void preAnalyzeMainIndices(
result.withIndices(indexPattern, IndexResolution.valid(new EsIndex(indexPattern.indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion(
indexResolver.resolveAsMergedMapping(
indexPattern.indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,58 +84,88 @@ public IndexResolver(Client client) {
}

/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
* Like {@link #resolveAsMergedMapping(String, Set, QueryBuilder, boolean, boolean, boolean, ActionListener)},
* but uses a pre-determined minimum transport version.
*/
public void resolveAsMergedMapping(
String indexWildcard,
public void resolveAsMergedMappingForVersion(
String indexPattern,
Set<String> fieldNames,
QueryBuilder requestFilter,
boolean includeAllDimensions,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
TransportVersion minimumVersion,
ActionListener<IndexResolution> listener
) {
ActionListener<Versioned<IndexResolution>> ignoreVersion = listener.delegateFailureAndWrap(
ActionListener<Versioned<IndexResolution>> versionIgnoringListener = listener.delegateFailureAndWrap(
(l, versionedResolution) -> l.onResponse(versionedResolution.inner())
);

resolveAsMergedMappingAndRetrieveMinimumVersion(
indexWildcard,
resolveAsMergedMapping(
indexPattern,
fieldNames,
requestFilter,
includeAllDimensions,
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported,
ignoreVersion
minimumVersion,
versionIgnoringListener
);
}

/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. Also retrieves the minimum transport
* version available in the cluster (and remotes).
* Perform a field caps request to resolve a pattern to one mapping (potentially compound, meaning it spans multiple indices).
* The field caps response contains the minimum transport version of all clusters that apply to the pattern,
* and it is used to deal with previously unsupported data types during resolution.
* <p>
* The retrieved minimum version is passed on to the listener.
* <p>
* If a field's type is not supported on the minimum version, it will be {@link DataType#UNSUPPORTED}.
*/
public void resolveAsMergedMappingAndRetrieveMinimumVersion(
String indexWildcard,
public void resolveAsMergedMapping(
String indexPattern,
Set<String> fieldNames,
QueryBuilder requestFilter,
boolean includeAllDimensions,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
ActionListener<Versioned<IndexResolution>> listener
) {
resolveAsMergedMapping(
indexPattern,
fieldNames,
requestFilter,
includeAllDimensions,
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported,
null,
listener
);
}

private void resolveAsMergedMapping(
String indexPattern,
Set<String> fieldNames,
QueryBuilder requestFilter,
boolean includeAllDimensions,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
@Nullable TransportVersion minimumVersion,
ActionListener<Versioned<IndexResolution>> listener
) {
client.execute(
EsqlResolveFieldsAction.TYPE,
createFieldCapsRequest(indexWildcard, fieldNames, requestFilter, includeAllDimensions),
createFieldCapsRequest(indexPattern, fieldNames, requestFilter, includeAllDimensions),
listener.delegateFailureAndWrap((l, response) -> {
FieldsInfo info = new FieldsInfo(
response.caps(),
response.caps().minTransportVersion(),
minimumVersion == null ? response.caps().minTransportVersion() : minimumVersion,
Build.current().isSnapshot(),
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported
);
LOGGER.debug("minimum transport version {} {}", response.caps().minTransportVersion(), info.effectiveMinTransportVersion());
l.onResponse(new Versioned<>(mergedMappings(indexWildcard, info), info.effectiveMinTransportVersion()));
l.onResponse(new Versioned<>(mergedMappings(indexPattern, info), info.effectiveMinTransportVersion()));
})
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
}
}
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future);
// NOCOMMIT: We should pass in a sensible transport version in general and also test this with older ones.
super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, TransportVersion.current(), future);
return future.actionGet(30, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.telemetry;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
Expand Down Expand Up @@ -84,7 +85,7 @@ EnrichPolicyResolver mockEnrichResolver() {
ActionListener<EnrichResolution> listener = (ActionListener<EnrichResolution>) arguments[arguments.length - 1];
listener.onResponse(new EnrichResolution());
return null;
}).when(enrichResolver).resolvePolicies(any(), any(), any());
}).when(enrichResolver).resolvePolicies(any(), any(), TransportVersion.current(), any());
return enrichResolver;
}

Expand Down