Skip to content

Commit a345f56

Browse files
authored
Fix counting skipped shards with filters (elastic#131737)
* Fix counting skipped shards with filters
1 parent 989f72b commit a345f56

File tree

5 files changed

+47
-17
lines changed

5 files changed

+47
-17
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,6 @@ tests:
307307
- class: org.elasticsearch.xpack.search.CrossClusterAsyncSearchIT
308308
method: testCCSClusterDetailsWhereAllShardsSkippedInCanMatch
309309
issue: https://github.com/elastic/elasticsearch/issues/128418
310-
- class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithFiltersIT
311-
method: testTimestampFilterFromQuery
312-
issue: https://github.com/elastic/elasticsearch/issues/127332
313310
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
314311
method: testSearchWhileRelocating
315312
issue: https://github.com/elastic/elasticsearch/issues/128500

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ record Doc(int id, String color, long data) {
8181
final String lookupIndexLocal = "test-lookup-index-local";
8282
final String lookupIndexRemote = "test-lookup-index-remote";
8383
final String lookupAlias = "test-lookup-index";
84+
private Boolean shouldCheckShardCounts = null;
8485

8586
@Before
8687
public void setUpIndices() throws Exception {
@@ -211,6 +212,17 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques
211212
}
212213
}
213214

215+
private boolean checkShardCounts() {
216+
if (shouldCheckShardCounts == null) {
217+
try {
218+
shouldCheckShardCounts = capabilitiesSupportedNewAndOld(List.of("correct_skipped_shard_count"));
219+
} catch (IOException e) {
220+
shouldCheckShardCounts = false;
221+
}
222+
}
223+
return shouldCheckShardCounts;
224+
}
225+
214226
private <C, V> void assertResultMapWithCapabilities(
215227
boolean includeCCSMetadata,
216228
Map<String, Object> result,
@@ -341,11 +353,16 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
341353
assertThat(
342354
remoteClusterShards,
343355
matchesMap().entry("total", greaterThanOrEqualTo(0))
344-
.entry("successful", remoteClusterShards.get("total"))
356+
.entry("successful", greaterThanOrEqualTo(0))
345357
.entry("skipped", greaterThanOrEqualTo(0))
346358
.entry("failed", 0)
347359
);
348-
360+
if (checkShardCounts()) {
361+
assertThat(
362+
(int) remoteClusterShards.get("successful") + (int) remoteClusterShards.get("skipped"),
363+
equalTo(remoteClusterShards.get("total"))
364+
);
365+
}
349366
if (remoteOnly == false) {
350367
@SuppressWarnings("unchecked")
351368
Map<String, Object> localCluster = (Map<String, Object>) details.get("(local)");
@@ -359,10 +376,16 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
359376
assertThat(
360377
localClusterShards,
361378
matchesMap().entry("total", greaterThanOrEqualTo(0))
362-
.entry("successful", localClusterShards.get("total"))
379+
.entry("successful", greaterThanOrEqualTo(0))
363380
.entry("skipped", greaterThanOrEqualTo(0))
364381
.entry("failed", 0)
365382
);
383+
if (checkShardCounts()) {
384+
assertThat(
385+
(int) localClusterShards.get("successful") + (int) localClusterShards.get("skipped"),
386+
equalTo(localClusterShards.get("total"))
387+
);
388+
}
366389
}
367390
}
368391

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ protected void assertClusterMetadata(EsqlExecutionInfo.Cluster clusterMetatata,
6262
protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) {
6363
assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL);
6464
assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
65-
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
66-
assertThat(clusterMetatata.getSkippedShards(), equalTo(0));
65+
// We should have at least one successful shard for data
66+
assertThat(clusterMetatata.getSuccessfulShards(), greaterThanOrEqualTo(1));
67+
// Some shards may be skipped, but total sum of the shards should match up
68+
assertThat(clusterMetatata.getSkippedShards() + clusterMetatata.getSuccessfulShards(), equalTo(shards));
6769
}
6870

6971
protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) {
@@ -81,7 +83,7 @@ protected void assertClusterMetadataSkippedShards(
8183
) {
8284
assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL);
8385
assertThat(clusterMetatata.getTotalShards(), equalTo(shards));
84-
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards));
86+
assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0));
8587
assertThat(clusterMetatata.getSkippedShards(), equalTo(shards));
8688
}
8789

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,12 @@ public enum Cap {
12991299
/**
13001300
* Support for the options field of CATEGORIZE.
13011301
*/
1302-
CATEGORIZE_OPTIONS;
1302+
CATEGORIZE_OPTIONS,
1303+
1304+
/**
1305+
* Support correct counting of skipped shards.
1306+
*/
1307+
CORRECT_SKIPPED_SHARDS_COUNT;
13031308

13041309
private final boolean enabled;
13051310

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,20 @@ final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTa
134134
var computeListener = new ComputeListener(
135135
transportService.getThreadPool(),
136136
runOnTaskFailure,
137-
listener.map(
138-
completionInfo -> new ComputeResponse(
137+
listener.map(completionInfo -> {
138+
final int totalSkipShards = targetShards.skippedShards() + skippedShards.get();
139+
final int failedShards = shardFailures.size();
140+
final int successfulShards = targetShards.totalShards() - totalSkipShards - failedShards;
141+
return new ComputeResponse(
139142
completionInfo,
140143
timeValueNanos(System.nanoTime() - startTimeInNanos),
141144
targetShards.totalShards(),
142-
targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
143-
targetShards.skippedShards() + skippedShards.get(),
144-
shardFailures.size(),
145+
successfulShards,
146+
totalSkipShards,
147+
failedShards,
145148
selectFailures()
146-
)
147-
)
149+
);
150+
})
148151
)
149152
) {
150153
pendingShardIds.addAll(order(targetShards));

0 commit comments

Comments
 (0)