Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,15 @@
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyList;

/**
* This class is part of the planner. Acts somewhat like a linker, to find the indices and enrich policies referenced by the query.
*/
public class PreAnalyzer {

public static class PreAnalysis {
public static final PreAnalysis EMPTY = new PreAnalysis(null, emptyList(), emptyList(), emptyList());

public final IndexMode indexMode;
public final List<IndexPattern> indices;
public final List<Enrich> enriches;
public final List<IndexPattern> lookupIndices;

public PreAnalysis(IndexMode indexMode, List<IndexPattern> indices, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
this.indexMode = indexMode;
this.indices = indices;
this.enriches = enriches;
this.lookupIndices = lookupIndices;
}
public record PreAnalysis(IndexMode indexMode, IndexPattern index, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
public static final PreAnalysis EMPTY = new PreAnalysis(null, null, List.of(), List.of());
}

public PreAnalysis preAnalyze(LogicalPlan plan) {
Expand All @@ -51,18 +35,19 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {
}

protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
Set<IndexPattern> indices = new HashSet<>();

Holder<IndexMode> indexMode = new Holder<>();
Holder<IndexPattern> index = new Holder<>();

List<Enrich> unresolvedEnriches = new ArrayList<>();
List<IndexPattern> lookupIndices = new ArrayList<>();

Holder<IndexMode> indexMode = new Holder<>();
plan.forEachUp(UnresolvedRelation.class, p -> {
if (p.indexMode() == IndexMode.LOOKUP) {
lookupIndices.add(p.indexPattern());
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftover: In case that we somehow find 2 indices in the same mode, we're going to silently throw away the previous one.

There shouldn't be 2 indices. This is enforced in EsqlSession#preAnalyzeMainIndices.

indexMode.set(p.indexMode());
indices.add(p.indexPattern());
index.set(p.indexPattern());
} else {
throw new IllegalStateException("index mode is already set");
}
Expand All @@ -73,7 +58,6 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
// mark plan as preAnalyzed (if it were marked, there would be no analysis)
plan.forEachUp(LogicalPlan::setPreAnalyzed);

return new PreAnalysis(indexMode.get(), indices.stream().toList(), unresolvedEnriches, lookupIndices);
return new PreAnalysis(indexMode.get(), index.get(), unresolvedEnriches, lookupIndices);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like indices were copied to immutable list here. I wonder if it is worth doing the same for unresolvedEnriches and lookupIndices?

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -322,20 +322,19 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
public static void initCrossClusterState(
IndicesExpressionGrouper indicesGrouper,
XPackLicenseState licenseState,
List<IndexPattern> patterns,
IndexPattern pattern,
EsqlExecutionInfo executionInfo
) throws ElasticsearchStatusException {
if (patterns.isEmpty()) {
if (pattern == null) {
return;
}
assert patterns.size() == 1 : "Only single index pattern is supported";
try {
var groupedIndices = indicesGrouper.groupIndices(
// indicesGrouper.getConfiguredClusters() might return mutable set that changes as clusters connect or disconnect.
// it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with *
Set.copyOf(indicesGrouper.getConfiguredClusters()),
IndicesOptions.DEFAULT,
patterns.getFirst().indexPattern()
pattern.indexPattern()
);

executionInfo.clusterInfoInitializing(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.index.MappingException;
import org.elasticsearch.xpack.esql.inference.InferenceResolution;
import org.elasticsearch.xpack.esql.inference.InferenceService;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
Expand Down Expand Up @@ -374,14 +373,14 @@ public void analyzedPlan(
}

var preAnalysis = preAnalyzer.preAnalyze(parsed);
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo);

SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices.iterator(), r, executionInfo, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
.addListener(logicalPlanListener);
}
Expand Down Expand Up @@ -633,26 +632,17 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
// TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already support lookup joins and they require a separate data structure.
Once we support nested queries they would likely need their own structure (to distinguish top level vs nested query).
Today we are guarded by grammar and parser to never have more than a single item in the list so indices.size() > 1 is effectively a dead code.

List<IndexPattern> indices = preAnalysis.indices;
if (indices.size() > 1) {
// Note: JOINs are not supported but we detect them when
listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
} else if (indices.size() == 1) {
IndexPattern table = indices.getFirst();

// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
// based only on available clusters (which could now be an empty list)
if (preAnalysis.index() != null) {
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(
result.withIndexResolution(IndexResolution.valid(new EsIndex(table.indexPattern(), Map.of(), Map.of())))
result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of())))
);
} else {
boolean includeAllDimensions = false;
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
if (preAnalysis.indexMode == IndexMode.TIME_SERIES) {
if (preAnalysis.indexMode() == IndexMode.TIME_SERIES) {
includeAllDimensions = true;
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
Expand All @@ -673,12 +663,8 @@ private void preAnalyzeMainIndices(
);
}
} else {
try {
// occurs when dealing with local relations (row a = 1)
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
} catch (Exception ex) {
listener.onFailure(ex);
}
// occurs when dealing with local relations (row a = 1)
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing could be thrown here.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,18 +530,12 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiInd

private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) {
var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
var indices = preAnalysis.indices;
if (indices.isEmpty()) {
/*
* If the data set doesn't matter we'll just grab one we know works.
* Employees is fine.
*/
if (preAnalysis.index() == null) {
// If the data set doesn't matter we'll just grab one we know works. Employees is fine.
return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees"));
} else if (preAnalysis.indices.size() > 1) {
throw new IllegalArgumentException("unexpected index resolution to multiple entries [" + preAnalysis.indices.size() + "]");
}

String indexName = indices.getFirst().indexPattern();
String indexName = preAnalysis.index().indexPattern();
List<CsvTestsDataLoader.TestDataset> datasets = new ArrayList<>();
if (indexName.endsWith("*")) {
String indexPrefix = indexName.substring(0, indexName.length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public void testInitCrossClusterState() {

// local only search works with any license state
{
var localOnly = List.of(new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*")));
var localOnly = new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*"));

assertLicenseCheckPasses(indicesGrouper, null, localOnly, "");
for (var mode : License.OperationMode.values()) {
Expand All @@ -710,7 +710,7 @@ public void testInitCrossClusterState() {

// cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license
{
var remote = List.of(new IndexPattern(EMPTY, randomFrom("idx,remote:idx", "idx1,remote:idx2*,remote:logs")));
var remote = new IndexPattern(EMPTY, randomFrom("idx,remote:idx", "idx1,remote:idx2*,remote:logs"));

var supportedLicenses = EnumSet.of(License.OperationMode.TRIAL, License.OperationMode.ENTERPRISE);
var unsupportedLicenses = EnumSet.complementOf(supportedLicenses);
Expand Down Expand Up @@ -738,26 +738,26 @@ private static XPackLicenseState createLicenseState(XPackLicenseStatus status) {
private void assertLicenseCheckPasses(
TestIndicesExpressionGrouper indicesGrouper,
XPackLicenseStatus status,
List<IndexPattern> patterns,
IndexPattern pattern,
String... expectedRemotes
) {
var executionInfo = new EsqlExecutionInfo(true);
initCrossClusterState(indicesGrouper, createLicenseState(status), patterns, executionInfo);
initCrossClusterState(indicesGrouper, createLicenseState(status), pattern, executionInfo);
assertThat(executionInfo.clusterAliases(), containsInAnyOrder(expectedRemotes));
}

private void assertLicenseCheckFails(
TestIndicesExpressionGrouper indicesGrouper,
XPackLicenseStatus licenseStatus,
List<IndexPattern> patterns,
IndexPattern pattern,
String expectedErrorMessageSuffix
) {
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
equalTo(
"A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: " + expectedErrorMessageSuffix
),
() -> initCrossClusterState(indicesGrouper, createLicenseState(licenseStatus), patterns, new EsqlExecutionInfo(true))
() -> initCrossClusterState(indicesGrouper, createLicenseState(licenseStatus), pattern, new EsqlExecutionInfo(true))
);
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}
Expand Down