Enrich after main field caps #134290
Changes from 17 commits
```diff
@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.esql.expression.Foldables.stringLiteralValueOf;
 import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
```
```diff
@@ -121,7 +122,7 @@ public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo
         }

         doResolvePolicies(
-            new HashSet<>(executionInfo.getClusters().keySet()),
+            executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()),
```
> **Review comment:** Why do you need this check? Wouldn't the stream take care of it anyway?

> **Reply:** See lines 317 to 320 in 04ff798. I assume that forces us to perform an "in CCS" check. Please let me know if that could be done more simply.
```diff
             enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
             executionInfo,
             listener
```
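To make the thread above concrete: a minimal, self-contained sketch of why the ternary guard is needed. The names mirror `EsqlExecutionInfo`, but the shapes are assumptions; the real precondition lives in the lines linked in the reply and is not reproduced here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;

// Stand-in for EsqlExecutionInfo; names mirror the PR, shapes are assumptions.
class ExecutionInfoSketch {
    enum Status { RUNNING, SKIPPED }

    // Empty for local-only queries: cross-cluster state is never initialized for them.
    final Map<String, Status> clusterInfo = new HashMap<>();

    Stream<String> getRunningClusterAliases() {
        // Stands in for the precondition the reply points at: the accessor is
        // only meaningful once cross-cluster state has been initialized.
        assert clusterInfo.isEmpty() == false : "not a CCS request";
        return clusterInfo.entrySet().stream().filter(e -> e.getValue() == Status.RUNNING).map(Map.Entry::getKey);
    }

    Set<String> targetClusters() {
        // The ternary from the diff: local-only queries bypass the stream entirely,
        // so the stream alone cannot "take care of it".
        return clusterInfo.isEmpty() ? new HashSet<>() : getRunningClusterAliases().collect(toSet());
    }
}
```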
```diff
@@ -310,7 +311,7 @@ private void lookupPolicies(
         Set<String> remotePolicies = unresolvedPolicies.stream()
             .filter(u -> u.mode != Enrich.Mode.COORDINATOR)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         // remote clusters
         if (remotePolicies.isEmpty() == false) {
             for (String cluster : remoteClusters) {
```
```diff
@@ -342,7 +343,7 @@ public void onFailure(Exception e) {
         Set<String> localPolicies = unresolvedPolicies.stream()
             .filter(u -> includeLocal || u.mode != Enrich.Mode.REMOTE)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         if (localPolicies.isEmpty() == false) {
             transportService.sendRequest(
                 transportService.getLocalNode(),
```
```diff
@@ -394,14 +394,27 @@ public void analyzedPlan(
         }

         var preAnalysis = preAnalyzer.preAnalyze(parsed);
+        var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);

         EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);

-        SubscribableListener. //
-            <EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
-            .<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
-            .<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
-            .<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
+        SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
+            .andThenApply(r -> {
+                if (r.indices.isValid()
+                    && executionInfo.isCrossClusterSearch()
+                    && executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
+                    LOGGER.debug("No more clusters to search, ending analysis stage");
+                    throw new NoClustersToSearchException();
+                }
+                return r;
```
> **Review comment:** This is pulled to an earlier stage from `analyzeWithRetry`.
```diff
+            })
+            .<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
+            .<PreAnalysisResult>andThen((l, r) -> {
```
> **Review comment:** nit: before this change, the resolution order was enrich → field names → inference → main → lookup. Are there any reasons why we didn't keep the original order (apart from main moving first, of course), i.e. main → enrich → inference → lookup?

> **Reply:** No particular reason, actually. It could be any order as long as main is first.

> **Reply:** I'd maybe put inference last, because enrich & lookup deal with remote clusters (and inference currently doesn't), where there's a high chance something may go wrong; and if it does, there's no need to even spend time on inference.
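The cost argument in that last reply can be sketched with plain `CompletableFuture` stages standing in for the `SubscribableListener` chain. The stage names come from the PR; everything else is illustrative.

```java
import java.util.concurrent.CompletableFuture;

// Stand-ins for the async resolution steps; each future represents one stage.
class ResolutionOrderSketch {
    static CompletableFuture<String> mainFieldCaps()          { return CompletableFuture.completedFuture("main"); }
    static CompletableFuture<String> lookupIndices(String r)  { return CompletableFuture.completedFuture(r + " -> lookup"); }
    static CompletableFuture<String> enrichPolicies(String r) { return CompletableFuture.completedFuture(r + " -> enrich"); }
    static CompletableFuture<String> inferenceIds(String r)   { return CompletableFuture.completedFuture(r + " -> inference"); }

    public static void main(String[] args) {
        // Main must run first since its result feeds everything else. Lookup and
        // enrich talk to remote clusters, so putting inference last means a remote
        // failure short-circuits the chain before any inference IDs are resolved.
        mainFieldCaps()
            .thenCompose(ResolutionOrderSketch::lookupIndices)
            .thenCompose(ResolutionOrderSketch::enrichPolicies)
            .thenCompose(ResolutionOrderSketch::inferenceIds)
            .thenAccept(System.out::println); // prints: main -> lookup -> enrich -> inference
    }
}
```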
```diff
+                enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
+            })
+            .<PreAnalysisResult>andThen((l, r) -> {
+                inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
+            })
             .<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
```
> **Review comment:** Here I am a bit worried about this situation: […]

> **Reply:** That is a valid concern. I believe in such a case we need to retry the entire analysis, including index resolution.
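A minimal sketch of the retry idea in that reply, assuming the trigger is a remote cluster becoming unavailable partway through the pipeline. `RemoteUnavailableException` and the supplier shape are hypothetical; the PR does not implement this.

```java
import java.util.function.Supplier;

// Illustrative only: the exception type and the supplier shape are assumptions.
class RetryAnalysisSketch {
    static class RemoteUnavailableException extends RuntimeException {}

    static <T> T analyzeWithFullRetry(Supplier<T> fullAnalysis) {
        try {
            return fullAnalysis.get();
        } catch (RemoteUnavailableException e) {
            // Re-running the whole pipeline repeats index resolution, so a remote
            // that vanished between the field caps call and a later stage is marked
            // unavailable up front on the second pass instead of failing mid-chain.
            return fullAnalysis.get();
        }
    }
}
```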
```diff
             .addListener(logicalPlanListener);
     }
```
```diff
@@ -638,9 +651,7 @@ private void preAnalyzeMainIndices(
         if (indexExpressionToResolve.isEmpty()) {
             // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
             listener.onResponse(
-                result.withIndexResolution(
-                    IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
-                )
+                result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
             );
         } else {
             indexResolver.resolveAsMergedMapping(
```
```diff
@@ -657,14 +668,15 @@
                     default -> requestFilter;
                 },
                 preAnalysis.indexMode() == IndexMode.TIME_SERIES,
-                listener.delegateFailure((l, indexResolution) -> {
-                    l.onResponse(result.withIndexResolution(indexResolution));
+                listener.delegateFailureAndWrap((l, indexResolution) -> {
+                    EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
```
> **Review comment:** This is pulled to an earlier stage from `analyzeWithRetry`.
```diff
+                    l.onResponse(result.withIndices(indexResolution));
                 })
             );
         }
     } else {
         // occurs when dealing with local relations (row a = 1)
-        listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
+        listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]")));
     }
 }
```
```diff
@@ -676,21 +688,8 @@ private void analyzeWithRetry(
         PreAnalysisResult result,
         ActionListener<LogicalPlan> listener
     ) {
-        if (result.indices.isValid()) {
-            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
-            if (executionInfo.isCrossClusterSearch()
-                && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
-                // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
-                // to let the LogicalPlanActionListener decide how to proceed
-                LOGGER.debug("No more clusters to search, ending analysis stage");
-                listener.onFailure(new NoClustersToSearchException());
-                return;
-            }
-        }
-
         var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
         LOGGER.debug("Analyzing the plan ({})", description);

         try {
             if (result.indices.isValid() || requestFilter != null) {
                 // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
```
```diff
@@ -715,7 +714,6 @@
             try {
                 // the order here is tricky - if the cluster has been filtered and later became unavailable,
                 // do we want to declare it successful or skipped? For now, unavailability takes precedence.
-                EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
                 EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
                 LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
                 LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
```
```diff
@@ -730,10 +728,6 @@
         }
     }

-    private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
-        inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
-    }
-
     private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
         PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
         physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
```
```diff
@@ -793,43 +787,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
     }

     public record PreAnalysisResult(
+        Set<String> fieldNames,
+        Set<String> wildcardJoinIndices,
         IndexResolution indices,
         Map<String, IndexResolution> lookupIndices,
         EnrichResolution enrichResolution,
-        Set<String> fieldNames,
-        Set<String> wildcardJoinIndices,
         InferenceResolution inferenceResolution
     ) {
-        public PreAnalysisResult(EnrichResolution enrichResolution, Set<String> fieldNames, Set<String> wildcardJoinIndices) {
-            this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
+        public PreAnalysisResult(Set<String> fieldNames, Set<String> wildcardJoinIndices) {
+            this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY);
         }

-        PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {
-            return new PreAnalysisResult(
-                indices(),
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                newInferenceResolution
-            );
+        PreAnalysisResult withIndices(IndexResolution indices) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
         }

-        PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) {
-            return new PreAnalysisResult(
-                newIndexResolution,
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                inferenceResolution()
-            );
+        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) {
+            lookupIndices.put(index, indexResolution);
+            return this;
         }

-        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
-            lookupIndices.put(index, newIndexResolution);
-            return this;
+        PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
         }
+
+        PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
+        }
     }
 }
```
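The reshaped record leans on a copy-with style: each async stage layers its resolution onto an otherwise immutable result. A usage sketch using only the methods defined in the diff above; all values are placeholders:

```java
// Each stage of the listener chain contributes one piece, in pipeline order.
PreAnalysisResult result = new PreAnalysisResult(fieldNames, wildcardJoinIndices) // from FieldNameUtils
    .withIndices(indexResolution)                  // main field caps
    .withEnrichResolution(enrichResolution)        // enrich policy resolution
    .withInferenceResolution(inferenceResolution); // inference id resolution
```

Note that `addLookupIndexResolution` is the odd one out: it mutates the shared `lookupIndices` map in place rather than copying, which is why it can return `this`.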
> **Review comment:** This, and the addition of `required_capability: dense_vector_field_type` below. Previously, enrich resolution happened before the main field caps call in order to resolve the `matchField` that is later added to the list of fields in that call. To make the main field caps call before enrich, we have to request all fields whenever there is any enrich in the query (as we do not yet know what their `matchField` might be). This list is kept and serialized within the plan. The two affected queries happen to query `from *`, which includes a `dense_vector` field with the corresponding `dense_vector` field type. That type is not supported prior to 9.2, so I am adding this capability in order to be able to deserialize a plan with this field on data nodes.
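A minimal sketch of the trade-off that comment describes, with a hypothetical `ALL_FIELDS` sentinel; the real logic lives in `FieldNameUtils.resolveFieldNames`, whose new boolean parameter is visible in the first hunk of the second file above.

```java
import java.util.Set;

// Hypothetical sketch of the gating described above; ALL_FIELDS is an assumed
// sentinel, not the actual FieldNameUtils API.
class FieldNamesSketch {
    static final Set<String> ALL_FIELDS = Set.of("*");

    // hasEnriches mirrors `preAnalysis.enriches().isEmpty() == false` in the diff.
    static Set<String> resolveFieldNames(Set<String> fieldsUsedByQuery, boolean hasEnriches) {
        // An enrich policy's matchField is only known after policy resolution, which
        // now runs after the main field caps call, so every field must be requested.
        return hasEnriches ? ALL_FIELDS : fieldsUsedByQuery;
    }
}
```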