Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,26 @@ public static UnresolvedPolicy from(Enrich e) {
/**
* Resolves a set of enrich policies
*
* @param unresolvedPolicies the unresolved policies
* @param enriches the unresolved policies
* @param executionInfo the execution info
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
if (enriches.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}

doResolvePolicies(
new HashSet<>(executionInfo.getClusters().keySet()),
Copy link
Contributor Author

@idegtiarenko idegtiarenko Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This separates enrich resolution from resolving list of remotes.
Today this is simply taken from the parsed list of expressions.
In future this is going to be replaced with async index resolution step, remotes taken from its result and supplied to doResolvePolicies via listener

enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
executionInfo,
listener
);
}

protected void doResolvePolicies(
Set<String> remoteClusters,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
ActionListener<EnrichResolution> listener
Expand All @@ -124,13 +139,10 @@ public void resolvePolicies(
return;
}

final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();

Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();

final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
String clusterAlias = entry.getKey();
if (entry.getValue().connectionError != null) {
Expand Down Expand Up @@ -424,17 +436,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
new ChannelActionListener<>(channel),
threadContext
);
try (
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures)))
) {
try (var refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures)))) {
for (String policyName : request.policyNames) {
EnrichPolicy p = availablePolicies.get(policyName);
if (p == null) {
continue;
}
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
String indexName = EnrichPolicy.getBaseName(policyName);
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, refs.acquire(indexResult -> {
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
EsIndex esIndex = indexResult.get();
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0));
Expand All @@ -449,17 +459,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
} else {
failures.put(policyName, indexResult.toString());
}
}), false);
}));
}
}
}
}
}

protected Map<String, EnrichPolicy> availablePolicies() {
final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state())
.custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
return metadata.getPolicies();
return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies();
}

protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,10 @@ public void analyzedPlan(
}

PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to eagerly transform Enrich -> UnresolvedPolicy here.
This is going to be transformed later in enrichPolicyResolver but only if there are actual policies to resolve.

EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);

var listener = SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
// first resolve the lookup indices, then the main indices
Expand Down Expand Up @@ -424,8 +422,8 @@ private void preAnalyzeLookupIndex(
patternWithRemotes,
fieldNames,
null,
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)),
false
false,
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
);
}

Expand Down Expand Up @@ -655,10 +653,10 @@ private void preAnalyzeMainIndices(
indexExpressionToResolve,
result.fieldNames,
requestFilter,
includeAllDimensions,
listener.delegateFailure((l, indexResolution) -> {
l.onResponse(result.withIndexResolution(indexResolution));
}),
includeAllDimensions
})
);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void resolveAsMergedMapping(
String indexWildcard,
Set<String> fieldNames,
QueryBuilder requestFilter,
ActionListener<IndexResolution> listener,
boolean includeAllDimensions
boolean includeAllDimensions,
ActionListener<IndexResolution> listener
) {
client.execute(
EsqlResolveFieldsAction.TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -434,7 +435,6 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
}

EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
for (String cluster : clusters) {
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
Expand All @@ -452,7 +452,8 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
}
}
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future);
return future.actionGet(30, TimeUnit.SECONDS);
}

Expand Down