Skip to content

Commit 3b685cb

Browse files
authored
Fix test fail on unavailable shards resolver (#125096)
1 parent be553e3 commit 3b685cb

File tree

6 files changed

+97
-49
lines changed

6 files changed

+97
-49
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas)
166166
}
167167

168168
protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
169+
return run(esqlCommands, pragmas, filter, null);
170+
}
171+
172+
protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter, Boolean allowPartialResults) {
169173
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
170174
request.query(esqlCommands);
171175
if (pragmas != null) {
@@ -174,6 +178,9 @@ protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, Query
174178
if (filter != null) {
175179
request.filter(filter);
176180
}
181+
if (allowPartialResults != null) {
182+
request.allowPartialResults(allowPartialResults);
183+
}
177184
return run(request);
178185
}
179186

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.index.query.RangeQueryBuilder;
1919
import org.elasticsearch.index.shard.ShardId;
2020
import org.elasticsearch.plugins.Plugin;
21-
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
2221
import org.elasticsearch.test.transport.MockTransportService;
2322
import org.elasticsearch.transport.TransportService;
2423
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
@@ -30,6 +29,7 @@
3029
import java.util.Map;
3130
import java.util.Set;
3231

32+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3333
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
3434
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
3535
import static org.hamcrest.Matchers.containsString;
@@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
4848
* Make sure that we don't send data-node requests to the target shards which won't match the query
4949
*/
5050
public void testCanMatch() {
51-
ElasticsearchAssertions.assertAcked(
51+
assertAcked(
5252
client().admin()
5353
.indices()
5454
.prepareCreate("events_2022")
@@ -60,9 +60,7 @@ public void testCanMatch() {
6060
.add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1"))
6161
.add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1"))
6262
.get();
63-
ElasticsearchAssertions.assertAcked(
64-
client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")
65-
);
63+
assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword"));
6664
client().prepareBulk("events_2023")
6765
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
6866
.add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2"))
@@ -72,15 +70,17 @@ public void testCanMatch() {
7270
.get();
7371
try {
7472
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
75-
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
76-
MockTransportService transportService = (MockTransportService) ts;
77-
transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
78-
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
79-
for (ShardId shardId : dataNodeRequest.shardIds()) {
80-
queriedIndices.add(shardId.getIndexName());
73+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
74+
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
75+
ComputeService.DATA_ACTION_NAME,
76+
(handler, request, channel, task) -> {
77+
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
78+
for (ShardId shardId : dataNodeRequest.shardIds()) {
79+
queriedIndices.add(shardId.getIndexName());
80+
}
81+
handler.messageReceived(request, channel, task);
8182
}
82-
handler.messageReceived(request, channel, task);
83-
});
83+
);
8484
}
8585
try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) {
8686
assertThat(getValuesList(resp), hasSize(4));
@@ -118,14 +118,14 @@ public void testCanMatch() {
118118
queriedIndices.clear();
119119
}
120120
} finally {
121-
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
122-
((MockTransportService) ts).clearAllRules();
121+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
122+
as(transportService, MockTransportService.class).clearAllRules();
123123
}
124124
}
125125
}
126126

127127
public void testAliasFilters() {
128-
ElasticsearchAssertions.assertAcked(
128+
assertAcked(
129129
client().admin()
130130
.indices()
131131
.prepareCreate("employees")
@@ -141,7 +141,7 @@ public void testAliasFilters() {
141141
.add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1))
142142
.get();
143143

144-
ElasticsearchAssertions.assertAcked(
144+
assertAcked(
145145
client().admin()
146146
.indices()
147147
.prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
@@ -209,11 +209,10 @@ public void testAliasFilters() {
209209
}
210210
}
211211

212-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749")
213212
public void testFailOnUnavailableShards() throws Exception {
214213
internalCluster().ensureAtLeastNumDataNodes(2);
215214
String logsOnlyNode = internalCluster().startDataOnlyNode();
216-
ElasticsearchAssertions.assertAcked(
215+
assertAcked(
217216
client().admin()
218217
.indices()
219218
.prepareCreate("events")
@@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
230229
.add(new IndexRequest().source("timestamp", 2, "message", "b"))
231230
.add(new IndexRequest().source("timestamp", 3, "message", "c"))
232231
.get();
233-
ElasticsearchAssertions.assertAcked(
232+
assertAcked(
234233
client().admin()
235234
.indices()
236235
.prepareCreate("logs")
@@ -246,12 +245,28 @@ public void testFailOnUnavailableShards() throws Exception {
246245
.add(new IndexRequest().source("timestamp", 10, "message", "aa"))
247246
.add(new IndexRequest().source("timestamp", 11, "message", "bb"))
248247
.get();
248+
249+
// when all shards available
249250
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) {
250251
assertThat(getValuesList(resp), hasSize(5));
251-
internalCluster().stopNode(logsOnlyNode);
252-
ensureClusterSizeConsistency();
253-
Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message"));
254-
assertThat(error.getMessage(), containsString("no shard copies found"));
252+
}
253+
254+
internalCluster().stopNode(logsOnlyNode);
255+
ensureClusterSizeConsistency();
256+
257+
// when one shard is unavailable
258+
expectThrows(
259+
Exception.class,
260+
containsString("index [logs] has no active shard copy"),
261+
() -> run("from events,logs | KEEP timestamp,message")
262+
);
263+
expectThrows(
264+
Exception.class,
265+
containsString("index [logs] has no active shard copy"),
266+
() -> run("from * | KEEP timestamp,message")
267+
);
268+
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message", null, null, true)) {
269+
assertThat(getValuesList(resp), hasSize(3));
255270
}
256271
}
257272

@@ -261,9 +276,7 @@ public void testSkipOnIndexName() {
261276
Map<String, Integer> indexToNumDocs = new HashMap<>();
262277
for (int i = 0; i < numIndices; i++) {
263278
String index = "events-" + i;
264-
ElasticsearchAssertions.assertAcked(
265-
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
266-
);
279+
assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword"));
267280
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
268281
int docs = between(1, 5);
269282
long timestamp = 1;
@@ -274,15 +287,17 @@ public void testSkipOnIndexName() {
274287
indexToNumDocs.put(index, docs);
275288
}
276289
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
277-
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
278-
MockTransportService mockTransportService = as(ts, MockTransportService.class);
279-
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
280-
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
281-
for (ShardId shardId : dataNodeRequest.shardIds()) {
282-
queriedIndices.add(shardId.getIndexName());
290+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
291+
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
292+
ComputeService.DATA_ACTION_NAME,
293+
(handler, request, channel, task) -> {
294+
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
295+
for (ShardId shardId : dataNodeRequest.shardIds()) {
296+
queriedIndices.add(shardId.getIndexName());
297+
}
298+
handler.messageReceived(request, channel, task);
283299
}
284-
handler.messageReceived(request, channel, task);
285-
});
300+
);
286301
}
287302
try {
288303
for (int i = 0; i < numIndices; i++) {
@@ -294,9 +309,8 @@ public void testSkipOnIndexName() {
294309
assertThat(queriedIndices, equalTo(Set.of(index)));
295310
}
296311
} finally {
297-
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
298-
MockTransportService mockTransportService = as(ts, MockTransportService.class);
299-
mockTransportService.clearAllRules();
312+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
313+
as(transportService, MockTransportService.class).clearAllRules();
300314
}
301315
}
302316
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
*/
77
package org.elasticsearch.xpack.esql.index;
88

9+
import org.elasticsearch.action.NoShardAvailableActionException;
910
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1011
import org.elasticsearch.core.Nullable;
1112

12-
import java.util.Collections;
1313
import java.util.Map;
1414
import java.util.Objects;
1515
import java.util.Set;
@@ -19,30 +19,33 @@ public final class IndexResolution {
1919
/**
2020
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
2121
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
22+
* @param unavailableShards Set of shards that were unavailable during index resolution
2223
* @param unavailableClusters Remote clusters that could not be contacted during planning
2324
* @return valid IndexResolution
2425
*/
2526
public static IndexResolution valid(
2627
EsIndex index,
2728
Set<String> resolvedIndices,
29+
Set<NoShardAvailableActionException> unavailableShards,
2830
Map<String, FieldCapabilitiesFailure> unavailableClusters
2931
) {
3032
Objects.requireNonNull(index, "index must not be null if it was found");
3133
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
34+
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
3235
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
33-
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
36+
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
3437
}
3538

3639
/**
3740
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
3841
*/
3942
public static IndexResolution valid(EsIndex index) {
40-
return valid(index, index.concreteIndices(), Collections.emptyMap());
43+
return valid(index, index.concreteIndices(), Set.of(), Map.of());
4144
}
4245

4346
public static IndexResolution invalid(String invalid) {
4447
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
45-
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
48+
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
4649
}
4750

4851
public static IndexResolution notFound(String name) {
@@ -56,18 +59,21 @@ public static IndexResolution notFound(String name) {
5659

5760
// all indices found by field-caps
5861
private final Set<String> resolvedIndices;
62+
private final Set<NoShardAvailableActionException> unavailableShards;
5963
// remote clusters included in the user's index expression that could not be connected to
6064
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;
6165

6266
private IndexResolution(
6367
EsIndex index,
6468
@Nullable String invalid,
6569
Set<String> resolvedIndices,
70+
Set<NoShardAvailableActionException> unavailableShards,
6671
Map<String, FieldCapabilitiesFailure> unavailableClusters
6772
) {
6873
this.index = index;
6974
this.invalid = invalid;
7075
this.resolvedIndices = resolvedIndices;
76+
this.unavailableShards = unavailableShards;
7177
this.unavailableClusters = unavailableClusters;
7278
}
7379

@@ -109,6 +115,13 @@ public Set<String> resolvedIndices() {
109115
return resolvedIndices;
110116
}
111117

118+
/**
119+
* @return set of unavailable shards during index resolution
120+
*/
121+
public Set<NoShardAvailableActionException> getUnavailableShards() {
122+
return unavailableShards;
123+
}
124+
112125
@Override
113126
public boolean equals(Object obj) {
114127
if (obj == null || obj.getClass() != getClass()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,13 @@ private void preAnalyzeIndices(
482482
indexExpressionToResolve,
483483
result.fieldNames,
484484
requestFilter,
485-
listener.map(indexResolution -> result.withIndexResolution(indexResolution))
485+
listener.delegateFailure((l, indexResolution) -> {
486+
if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) {
487+
l.onFailure(indexResolution.getUnavailableShards().iterator().next());
488+
} else {
489+
l.onResponse(result.withIndexResolution(indexResolution));
490+
}
491+
})
486492
);
487493
}
488494
} else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.esql.session;
88

99
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.NoShardAvailableActionException;
1011
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1112
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
1213
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@@ -152,6 +153,13 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
152153
fieldCapsResponse.getFailures()
153154
);
154155

156+
Set<NoShardAvailableActionException> unavailableShards = new HashSet<>();
157+
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
158+
if (failure.getException() instanceof NoShardAvailableActionException e) {
159+
unavailableShards.add(e);
160+
}
161+
}
162+
155163
Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
156164
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
157165
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
@@ -163,7 +171,7 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
163171
}
164172
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
165173
var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields);
166-
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableRemotes);
174+
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes);
167175
}
168176

169177
private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {

0 commit comments

Comments
 (0)