Skip to content

Commit 04ff798

Browse files
authored
Cleanup esql session (elastic#134998)
1 parent 71be643 commit 04ff798

File tree

6 files changed

+108
-92
lines changed

6 files changed

+108
-92
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,10 @@ public Stream<Cluster> getClusterStates(Cluster.Status status) {
319319
return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status);
320320
}
321321

322+
public Stream<String> getRunningClusterAliases() {
323+
return getClusterStates(Cluster.Status.RUNNING).map(Cluster::getClusterAlias);
324+
}
325+
322326
@Override
323327
public String toString() {
324328
return "EsqlExecutionInfo{"

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
*/
2323
public class PreAnalyzer {
2424

25-
public record PreAnalysis(IndexMode indexMode, IndexPattern index, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
25+
public record PreAnalysis(IndexMode indexMode, IndexPattern indexPattern, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
2626
public static final PreAnalysis EMPTY = new PreAnalysis(null, null, List.of(), List.of());
2727
}
2828

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import java.util.Map;
4141
import java.util.Objects;
4242
import java.util.Set;
43-
import java.util.stream.Collectors;
43+
44+
import static java.util.stream.Collectors.joining;
45+
import static java.util.stream.Collectors.toSet;
4446

4547
public class EsqlCCSUtils {
4648

@@ -178,6 +180,15 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
178180
}
179181
}
180182

183+
static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo, String localPattern) {
184+
if (executionInfo.getClusters().isEmpty()) {
185+
return localPattern;
186+
}
187+
return executionInfo.getRunningClusterAliases()
188+
.map(clusterAlias -> RemoteClusterAware.buildRemoteIndexName(clusterAlias, localPattern))
189+
.collect(joining(","));
190+
}
191+
181192
static void updateExecutionInfoWithUnavailableClusters(
182193
EsqlExecutionInfo execInfo,
183194
Map<String, List<FieldCapabilitiesFailure>> failures
@@ -205,9 +216,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
205216
) {
206217
// Get the clusters which are still running, and we will check whether they have any matching indices.
207218
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
208-
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
209-
.map(Cluster::getClusterAlias)
210-
.collect(Collectors.toSet());
219+
final Set<String> clustersWithNoMatchingIndices = executionInfo.getRunningClusterAliases().collect(toSet());
211220
for (String indexName : indexResolution.resolvedIndices()) {
212221
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
213222
}
@@ -323,10 +332,10 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
323332
public static void initCrossClusterState(
324333
IndicesExpressionGrouper indicesGrouper,
325334
XPackLicenseState licenseState,
326-
IndexPattern pattern,
335+
IndexPattern indexPattern,
327336
EsqlExecutionInfo executionInfo
328337
) throws ElasticsearchStatusException {
329-
if (pattern == null) {
338+
if (indexPattern == null) {
330339
return;
331340
}
332341
try {
@@ -335,7 +344,7 @@ public static void initCrossClusterState(
335344
// it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with *
336345
Set.copyOf(indicesGrouper.getConfiguredClusters()),
337346
IndicesOptions.DEFAULT,
338-
pattern.indexPattern()
347+
indexPattern.indexPattern()
339348
);
340349

341350
executionInfo.clusterInfoInitializing(true);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@
8484
import java.util.List;
8585
import java.util.Map;
8686
import java.util.Set;
87-
import java.util.stream.Collectors;
88-
import java.util.stream.Stream;
8987

9088
import static java.util.stream.Collectors.joining;
9189
import static java.util.stream.Collectors.toSet;
@@ -396,7 +394,7 @@ public void analyzedPlan(
396394
}
397395

398396
var preAnalysis = preAnalyzer.preAnalyze(parsed);
399-
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo);
397+
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);
400398

401399
SubscribableListener. //
402400
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
@@ -437,25 +435,9 @@ private void preAnalyzeLookupIndex(
437435
ThreadPool.Names.SEARCH_COORDINATION,
438436
ThreadPool.Names.SYSTEM_READ
439437
);
440-
Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;
441-
442-
String patternWithRemotes;
443-
444-
if (executionInfo.getClusters().isEmpty()) {
445-
patternWithRemotes = localPattern;
446-
} else {
447-
// convert index -> cluster1:index,cluster2:index, etc.for each running cluster
448-
patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
449-
.map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
450-
.collect(Collectors.joining(","));
451-
}
452-
if (patternWithRemotes.isEmpty()) {
453-
return;
454-
}
455-
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
456438
indexResolver.resolveAsMergedMapping(
457-
patternWithRemotes,
458-
fieldNames,
439+
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, localPattern),
440+
result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames,
459441
null,
460442
false,
461443
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
@@ -560,10 +542,8 @@ private PreAnalysisResult receiveLookupIndexResolution(
560542
});
561543

562544
// These are clusters that are still in the running, we need to have the index on all of them
563-
Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
564545
// Verify that all active clusters have the lookup index resolved
565-
clusters.forEach(cluster -> {
566-
String clusterAlias = cluster.getClusterAlias();
546+
executionInfo.getRunningClusterAliases().forEach(clusterAlias -> {
567547
if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) {
568548
// Missing cluster resolution
569549
skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias));
@@ -622,9 +602,7 @@ private IndexResolution checkSingleIndex(
622602
* concrete indices aliased to the same index name.
623603
*/
624604
private void validateRemoteVersions(EsqlExecutionInfo executionInfo) {
625-
Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
626-
clusters.forEach(cluster -> {
627-
String clusterAlias = cluster.getClusterAlias();
605+
executionInfo.getRunningClusterAliases().forEach(clusterAlias -> {
628606
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
629607
// No need to check local, obviously
630608
var connection = remoteClusterService.getConnection(clusterAlias);
@@ -655,31 +633,30 @@ private void preAnalyzeMainIndices(
655633
ThreadPool.Names.SEARCH_COORDINATION,
656634
ThreadPool.Names.SYSTEM_READ
657635
);
658-
if (preAnalysis.index() != null) {
636+
if (preAnalysis.indexPattern() != null) {
659637
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
660638
if (indexExpressionToResolve.isEmpty()) {
661639
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
662640
listener.onResponse(
663-
result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of())))
641+
result.withIndexResolution(
642+
IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
643+
)
664644
);
665645
} else {
666-
boolean includeAllDimensions = false;
667-
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
668-
if (preAnalysis.indexMode() == IndexMode.TIME_SERIES) {
669-
includeAllDimensions = true;
670-
// TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message.
671-
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
672-
if (requestFilter != null) {
673-
requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter);
674-
} else {
675-
requestFilter = indexModeFilter;
676-
}
677-
}
678646
indexResolver.resolveAsMergedMapping(
679647
indexExpressionToResolve,
680648
result.fieldNames,
681-
requestFilter,
682-
includeAllDimensions,
649+
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
650+
switch (preAnalysis.indexMode()) {
651+
case IndexMode.TIME_SERIES -> {
652+
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
653+
yield requestFilter != null
654+
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
655+
: indexModeFilter;
656+
}
657+
default -> requestFilter;
658+
},
659+
preAnalysis.indexMode() == IndexMode.TIME_SERIES,
683660
listener.delegateFailure((l, indexResolution) -> {
684661
l.onResponse(result.withIndexResolution(indexResolution));
685662
})

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,12 +530,12 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiInd
530530

531531
private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) {
532532
var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
533-
if (preAnalysis.index() == null) {
533+
if (preAnalysis.indexPattern() == null) {
534534
// If the data set doesn't matter we'll just grab one we know works. Employees is fine.
535535
return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees"));
536536
}
537537

538-
String indexName = preAnalysis.index().indexPattern();
538+
String indexName = preAnalysis.indexPattern().indexPattern();
539539
List<CsvTestsDataLoader.TestDataset> datasets = new ArrayList<>();
540540
if (indexName.endsWith("*")) {
541541
String indexPrefix = indexName.substring(0, indexName.length() - 1);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 65 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@
3232
import org.elasticsearch.xpack.esql.index.IndexResolution;
3333
import org.elasticsearch.xpack.esql.plan.IndexPattern;
3434
import org.elasticsearch.xpack.esql.type.EsFieldTests;
35+
import org.hamcrest.Matcher;
3536

3637
import java.util.ArrayList;
3738
import java.util.Arrays;
3839
import java.util.EnumSet;
3940
import java.util.HashMap;
40-
import java.util.HashSet;
4141
import java.util.List;
4242
import java.util.Locale;
4343
import java.util.Map;
@@ -60,20 +60,16 @@ public class EsqlCCSUtilsTests extends ESTestCase {
6060
private final String REMOTE2_ALIAS = "remote2";
6161

6262
public void testCreateIndexExpressionFromAvailableClusters() {
63-
63+
var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
6464
// no clusters marked as skipped
6565
{
6666
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
6767
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
6868
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
6969
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
70-
71-
String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
72-
List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
73-
assertThat(list.size(), equalTo(5));
74-
assertThat(
75-
new HashSet<>(list),
76-
equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*"))
70+
assertIndexPattern(
71+
EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
72+
containsInAnyOrder("logs*", "remote1:*", "remote2:mylogs1", "remote2:mylogs2", "remote2:logs*")
7773
);
7874
}
7975

@@ -84,38 +80,23 @@ public void testCreateIndexExpressionFromAvailableClusters() {
8480
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true));
8581
executionInfo.swapCluster(
8682
REMOTE2_ALIAS,
87-
(k, v) -> new EsqlExecutionInfo.Cluster(
88-
REMOTE2_ALIAS,
89-
"mylogs1,mylogs2,logs*",
90-
true,
91-
EsqlExecutionInfo.Cluster.Status.SKIPPED
92-
)
83+
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
84+
);
85+
assertIndexPattern(
86+
EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
87+
containsInAnyOrder("logs*", "remote1:*", "remote1:foo")
9388
);
94-
95-
String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
96-
List<String> list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList();
97-
assertThat(list.size(), equalTo(3));
98-
assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo")));
9989
}
10090

10191
// two clusters marked as skipped, so only local cluster present in revised index expression
10292
{
10393
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
10494
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
105-
executionInfo.swapCluster(
106-
REMOTE1_ALIAS,
107-
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
108-
);
95+
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, skipped));
10996
executionInfo.swapCluster(
11097
REMOTE2_ALIAS,
111-
(k, v) -> new EsqlExecutionInfo.Cluster(
112-
REMOTE2_ALIAS,
113-
"mylogs1,mylogs2,logs*",
114-
true,
115-
EsqlExecutionInfo.Cluster.Status.SKIPPED
116-
)
98+
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
11799
);
118-
119100
assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*"));
120101
}
121102

@@ -128,18 +109,64 @@ public void testCreateIndexExpressionFromAvailableClusters() {
128109
);
129110
executionInfo.swapCluster(
130111
REMOTE2_ALIAS,
131-
(k, v) -> new EsqlExecutionInfo.Cluster(
132-
REMOTE2_ALIAS,
133-
"mylogs1,mylogs2,logs*",
134-
true,
135-
EsqlExecutionInfo.Cluster.Status.SKIPPED
136-
)
112+
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
137113
);
138-
139114
assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo(""));
140115
}
141116
}
142117

118+
public void testCreateQualifiedLookupIndexExpressionFromAvailableClusters() {
119+
120+
var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
121+
// no clusters marked as skipped
122+
{
123+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
124+
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
125+
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true));
126+
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true));
127+
assertIndexPattern(
128+
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
129+
containsInAnyOrder("lookup", REMOTE1_ALIAS + ":lookup", REMOTE2_ALIAS + ":lookup")
130+
);
131+
}
132+
// one cluster marked as skipped
133+
{
134+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
135+
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
136+
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true));
137+
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
138+
assertIndexPattern(
139+
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
140+
containsInAnyOrder("lookup", REMOTE1_ALIAS + ":lookup")
141+
);
142+
}
143+
// all remotes marked as skipped
144+
{
145+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
146+
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "", false));
147+
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, skipped));
148+
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
149+
assertIndexPattern(
150+
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
151+
containsInAnyOrder("lookup")
152+
);
153+
}
154+
// all remotes are skipped and no local
155+
{
156+
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
157+
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "", true, skipped));
158+
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "", true, skipped));
159+
assertIndexPattern(
160+
EsqlCCSUtils.createQualifiedLookupIndexExpressionFromAvailableClusters(executionInfo, "lookup"),
161+
containsInAnyOrder()
162+
);
163+
}
164+
}
165+
166+
private static void assertIndexPattern(String indexPattern, Matcher<Iterable<? extends String>> matcher) {
167+
assertThat(Set.of(Strings.splitStringByCommaToArray(indexPattern)), matcher);
168+
}
169+
143170
public void testUpdateExecutionInfoWithUnavailableClusters() {
144171

145172
// skip_unavailable=true clusters are unavailable, both marked as SKIPPED
@@ -806,5 +833,4 @@ public Map<String, OriginalIndices> groupIndices(
806833
return originalIndicesMap;
807834
}
808835
}
809-
810836
}

0 commit comments

Comments
 (0)