4242import org .elasticsearch .xpack .esql .analysis .EnrichResolution ;
4343import org .elasticsearch .xpack .esql .core .type .DataType ;
4444import org .elasticsearch .xpack .esql .core .type .EsField ;
45+ import org .elasticsearch .xpack .esql .core .util .Holder ;
4546import org .elasticsearch .xpack .esql .core .util .StringUtils ;
4647import org .elasticsearch .xpack .esql .index .EsIndex ;
4748import org .elasticsearch .xpack .esql .index .IndexResolution ;
4849import org .elasticsearch .xpack .esql .io .stream .PlanStreamInput ;
4950import org .elasticsearch .xpack .esql .io .stream .PlanStreamOutput ;
5051import org .elasticsearch .xpack .esql .plan .logical .Enrich ;
5152import org .elasticsearch .xpack .esql .session .IndexResolver ;
53+ import org .elasticsearch .xpack .esql .session .Versioned ;
5254
5355import java .io .IOException ;
5456import java .util .ArrayList ;
7779public class EnrichPolicyResolver {
7880 private static final String RESOLVE_ACTION_NAME = "cluster:monitor/xpack/enrich/esql/resolve_policy" ;
7981
80- // NOCOMMIT: rename to something that represents the overall change
8182 public static final TransportVersion ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION = TransportVersion .fromName (
8283 "esql_use_minimum_version_for_enrich_resolution"
8384 );
@@ -128,10 +129,10 @@ public void resolvePolicies(
128129 List <Enrich > enriches ,
129130 EsqlExecutionInfo executionInfo ,
130131 TransportVersion minimumVersion ,
131- ActionListener <EnrichResolution > listener
132+ ActionListener <Versioned < EnrichResolution > > listener
132133 ) {
133134 if (enriches .isEmpty ()) {
134- listener .onResponse (new EnrichResolution ());
135+ listener .onResponse (new Versioned <>( new EnrichResolution (), minimumVersion ));
135136 return ;
136137 }
137138
@@ -149,10 +150,10 @@ protected void doResolvePolicies(
149150 Collection <UnresolvedPolicy > unresolvedPolicies ,
150151 EsqlExecutionInfo executionInfo ,
151152 TransportVersion minimumVersion ,
152- ActionListener <EnrichResolution > listener
153+ ActionListener <Versioned < EnrichResolution > > listener
153154 ) {
154155 if (unresolvedPolicies .isEmpty ()) {
155- listener .onResponse (new EnrichResolution ());
156+ listener .onResponse (new Versioned <>( new EnrichResolution (), minimumVersion ));
156157 return ;
157158 }
158159
@@ -178,6 +179,14 @@ protected void doResolvePolicies(
178179 }
179180 }
180181
182+ // Propagate the minimum version observed during policy resolution back to the planning pipeline.
183+ // This is only really required for `ROW | ENRICH` queries, where the main index resolution doesn't
184+ // provide the minimum version of the local cluster.
185+ TransportVersion updatedMinimumVersion = minimumVersion ;
186+ for (LookupResponse response : lookupResponsesToProcess .values ()) {
187+ updatedMinimumVersion = TransportVersion .min (updatedMinimumVersion , response .minimumVersion );
188+ }
189+
181190 for (UnresolvedPolicy unresolved : unresolvedPolicies ) {
182191 Tuple <ResolvedEnrichPolicy , String > resolved = mergeLookupResults (
183192 unresolved ,
@@ -192,7 +201,7 @@ protected void doResolvePolicies(
192201 enrichResolution .addError (unresolved .name , unresolved .mode , resolved .v2 ());
193202 }
194203 }
195- return enrichResolution ;
204+ return new Versioned <>( enrichResolution , updatedMinimumVersion ) ;
196205 }));
197206 }
198207
@@ -404,6 +413,10 @@ private static class LookupRequest extends AbstractTransportRequest {
404413 if (in .getTransportVersion ().supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )) {
405414 this .minimumVersion = TransportVersion .readVersion (in );
406415 } else {
416+ // An older coordinator contacted us. Let's assume an old version, otherwise we might send back
417+ // types that it can't deserialize.
418+ // (The only version that knows some new types but doesn't send its transport version here is 9.2.0;
419+ // these types are dense_vector and aggregate_metric_double, and both don't work with ENRICH in 9.2.0, anyway.)
407420 this .minimumVersion = TransportVersion .minimumCompatible ();
408421 }
409422 }
@@ -421,12 +434,15 @@ public void writeTo(StreamOutput out) throws IOException {
421434 private static class LookupResponse extends TransportResponse {
422435 final Map <String , ResolvedEnrichPolicy > policies ;
423436 final Map <String , String > failures ;
437+ // The minimum transport version observed when running field caps requests to resolve the policies
438+ final TransportVersion minimumVersion ;
424439 // does not need to be Writable since this indicates a failure to contact a remote cluster, so only set on querying cluster
425440 final transient Exception connectionError ;
426441
427- LookupResponse (Map <String , ResolvedEnrichPolicy > policies , Map <String , String > failures ) {
442+ LookupResponse (Map <String , ResolvedEnrichPolicy > policies , Map <String , String > failures , TransportVersion minimumVersion ) {
428443 this .policies = policies ;
429444 this .failures = failures ;
445+ this .minimumVersion = minimumVersion ;
430446 this .connectionError = null ;
431447 }
432448
@@ -438,13 +454,27 @@ private static class LookupResponse extends TransportResponse {
438454 LookupResponse (Exception connectionError ) {
439455 this .policies = Collections .emptyMap ();
440456 this .failures = Collections .emptyMap ();
457+ this .minimumVersion = TransportVersion .current ();
441458 this .connectionError = connectionError ;
442459 }
443460
444461 LookupResponse (StreamInput in ) throws IOException {
445462 PlanStreamInput planIn = new PlanStreamInput (in , in .namedWriteableRegistry (), null );
446463 this .policies = planIn .readMap (StreamInput ::readString , ResolvedEnrichPolicy ::new );
447464 this .failures = planIn .readMap (StreamInput ::readString , StreamInput ::readString );
465+ if (in .getTransportVersion ().supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )) {
466+ this .minimumVersion = TransportVersion .readVersion (in );
467+ } else {
468+ // A pre-9.2.1 node resolved the enrich policy for us, but doesn't say which version its cluster is on.
469+ // We can safely assume this node's current version, even though that's technically wrong.
470+ // Assuming a version that's too old can disable aggregate_metric_double and dense_vector
471+ // data types in the query, that'd be very bad.
472+ // But assuming these types are supported is fine because in 9.2.0,
473+ // they're not supported in enrich policies, anyway, due to bugs.
474+ // https://github.com/elastic/elasticsearch/issues/127350
475+ // https://github.com/elastic/elasticsearch/issues/137699
476+ this .minimumVersion = TransportVersion .minimumCompatible ();
477+ }
448478 this .connectionError = null ;
449479 }
450480
@@ -453,6 +483,9 @@ public void writeTo(StreamOutput out) throws IOException {
453483 PlanStreamOutput pso = new PlanStreamOutput (out , null );
454484 pso .writeMap (policies , StreamOutput ::writeWriteable );
455485 pso .writeMap (failures , StreamOutput ::writeString );
486+ if (out .getTransportVersion ().supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )) {
487+ TransportVersion .writeVersion (minimumVersion , out );
488+ }
456489 }
457490 }
458491
@@ -462,12 +495,17 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
462495 final Map <String , EnrichPolicy > availablePolicies = availablePolicies ();
463496 final Map <String , String > failures = ConcurrentCollections .newConcurrentMap ();
464497 final Map <String , ResolvedEnrichPolicy > resolvedPolices = ConcurrentCollections .newConcurrentMap ();
498+ // We use the coordinator's minimum version as base line.
499+ final Holder <TransportVersion > minimumVersion = new Holder <>(request .minimumVersion );
465500 ThreadContext threadContext = threadPool .getThreadContext ();
466501 ActionListener <LookupResponse > listener = ContextPreservingActionListener .wrapPreservingContext (
467502 new ChannelActionListener <>(channel ),
468503 threadContext
469504 );
470- try (var refs = new RefCountingListener (listener .map (unused -> new LookupResponse (resolvedPolices , failures )))) {
505+ try (var refs = new RefCountingListener (listener .map (unused -> {
506+ TransportVersion finalMinimumVersion = minimumVersion .get ();
507+ return new LookupResponse (resolvedPolices , failures , finalMinimumVersion );
508+ }))) {
471509 for (String policyName : request .policyNames ) {
472510 EnrichPolicy p = availablePolicies .get (policyName );
473511 if (p == null ) {
@@ -497,6 +535,11 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
497535 esIndex .mapping ()
498536 );
499537 resolvedPolices .put (policyName , resolved );
538+ synchronized (minimumVersion ) {
539+ minimumVersion .set (
540+ TransportVersion .min (minimumVersion .get (), versionedIndexResult .minimumVersion ())
541+ );
542+ }
500543 } else {
501544 failures .put (policyName , indexResult .toString ());
502545 }
0 commit comments