Skip to content

Commit 98035bd

Browse files
authored
Merge branch 'main' into manage_ad_state_indices
2 parents 8ba07e0 + a01ab1e commit 98035bd

File tree

10 files changed

+178
-20
lines changed

10 files changed

+178
-20
lines changed

docs/changelog/136889.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 136889
2+
summary: Remove early phase failure in batched
3+
area: Search
4+
type: bug
5+
issues:
6+
- 134151

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,12 @@ static IndexMode resolveMode(
224224
indexMode = Enum.valueOf(IndexMode.class, rawMode.toUpperCase(Locale.ROOT));
225225
}
226226
}
227+
if (indexMode == null) {
228+
String rawMode = settings.get(IndexSettings.MODE.getKey());
229+
if (rawMode != null) {
230+
indexMode = Enum.valueOf(IndexMode.class, rawMode.toUpperCase(Locale.ROOT));
231+
}
232+
}
227233
return indexMode;
228234
}
229235

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,6 @@ tests:
258258
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
259259
method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item}
260260
issue: https://github.com/elastic/elasticsearch/issues/122414
261-
- class: org.elasticsearch.search.SearchWithRejectionsIT
262-
method: testOpenContextsAfterRejections
263-
issue: https://github.com/elastic/elasticsearch/issues/130821
264261
- class: org.elasticsearch.packaging.test.DockerTests
265262
method: test090SecurityCliPackaging
266263
issue: https://github.com/elastic/elasticsearch/issues/131107

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 143 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<S
8585
private static final Logger logger = LogManager.getLogger(SearchQueryThenFetchAsyncAction.class);
8686

8787
private static final TransportVersion BATCHED_QUERY_PHASE_VERSION = TransportVersion.fromName("batched_query_phase_version");
88+
private static final TransportVersion BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE = TransportVersion.fromName(
89+
"batched_response_might_include_reduction_failure"
90+
);
8891

8992
private final SearchProgressListener progressListener;
9093

@@ -226,20 +229,32 @@ public static final class NodeQueryResponse extends TransportResponse {
226229
private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted());
227230

228231
private final Object[] results;
232+
private final Exception reductionFailure;
229233
private final SearchPhaseController.TopDocsStats topDocsStats;
230234
private final QueryPhaseResultConsumer.MergeResult mergeResult;
231235

232236
public NodeQueryResponse(StreamInput in) throws IOException {
233237
this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new);
234-
this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
235-
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
238+
if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) {
239+
this.reductionFailure = in.readException();
240+
this.mergeResult = null;
241+
this.topDocsStats = null;
242+
} else {
243+
this.reductionFailure = null;
244+
this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in);
245+
this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in);
246+
}
236247
}
237248

238249
// public for tests
239250
public Object[] getResults() {
240251
return results;
241252
}
242253

254+
Exception getReductionFailure() {
255+
return reductionFailure;
256+
}
257+
243258
@Override
244259
public void writeTo(StreamOutput out) throws IOException {
245260
out.writeVInt(results.length);
@@ -250,7 +265,17 @@ public void writeTo(StreamOutput out) throws IOException {
250265
writePerShardResult(out, (QuerySearchResult) result);
251266
}
252267
}
253-
writeMergeResult(out, mergeResult, topDocsStats);
268+
if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) {
269+
boolean hasReductionFailure = reductionFailure != null;
270+
out.writeBoolean(hasReductionFailure);
271+
if (hasReductionFailure) {
272+
out.writeException(reductionFailure);
273+
} else {
274+
writeMergeResult(out, mergeResult, topDocsStats);
275+
}
276+
} else {
277+
writeMergeResult(out, mergeResult, topDocsStats);
278+
}
254279
}
255280

256281
@Override
@@ -515,7 +540,12 @@ public Executor executor() {
515540
@Override
516541
public void handleResponse(NodeQueryResponse response) {
517542
if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) {
518-
queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult);
543+
Exception reductionFailure = response.getReductionFailure();
544+
if (reductionFailure != null) {
545+
queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure);
546+
} else {
547+
queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult);
548+
}
519549
}
520550
for (int i = 0; i < response.results.length; i++) {
521551
var s = request.shards.get(i);
@@ -537,6 +567,21 @@ public void handleResponse(NodeQueryResponse response) {
537567

538568
@Override
539569
public void handleException(TransportException e) {
570+
if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) {
571+
bwcHandleException(e);
572+
return;
573+
}
574+
Exception cause = (Exception) ExceptionsHelper.unwrapCause(e);
575+
logger.debug("handling node search exception coming from [" + nodeId + "]", cause);
576+
onNodeQueryFailure(e, request, routing);
577+
}
578+
579+
/**
580+
* This code is strictly for _snapshot_ backwards compatibility. The feature flag
581+
* {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version
582+
* {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced.
583+
*/
584+
private void bwcHandleException(TransportException e) {
540585
Exception cause = (Exception) ExceptionsHelper.unwrapCause(e);
541586
logger.debug("handling node search exception coming from [" + nodeId + "]", cause);
542587
if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) {
@@ -817,13 +862,101 @@ void onShardDone() {
817862
if (countDown.countDown() == false) {
818863
return;
819864
}
865+
if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) {
866+
bwcRespond();
867+
return;
868+
}
869+
var channelListener = new ChannelActionListener<>(channel);
870+
RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream();
871+
out.setTransportVersion(channel.getVersion());
872+
try (queryPhaseResultConsumer) {
873+
Exception reductionFailure = queryPhaseResultConsumer.failure.get();
874+
if (reductionFailure == null) {
875+
writeSuccessfulResponse(out);
876+
} else {
877+
writeReductionFailureResponse(out, reductionFailure);
878+
}
879+
} catch (IOException e) {
880+
releaseAllResultsContexts();
881+
channelListener.onFailure(e);
882+
return;
883+
}
884+
ActionListener.respondAndRelease(
885+
channelListener,
886+
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
887+
);
888+
}
889+
890+
// Writes the "successful" response (see NodeQueryResponse for the corresponding read logic)
891+
private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException {
892+
final QueryPhaseResultConsumer.MergeResult mergeResult;
893+
try {
894+
mergeResult = Objects.requireNonNullElse(
895+
queryPhaseResultConsumer.consumePartialMergeResultDataNode(),
896+
EMPTY_PARTIAL_MERGE_RESULT
897+
);
898+
} catch (Exception e) {
899+
writeReductionFailureResponse(out, e);
900+
return;
901+
}
902+
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
903+
// also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other
904+
// indices without a roundtrip to the coordinating node
905+
final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size());
906+
if (mergeResult.reducedTopDocs() != null) {
907+
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
908+
final int localIndex = scoreDoc.shardIndex;
909+
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
910+
relevantShardIndices.set(localIndex);
911+
}
912+
}
913+
final int resultCount = queryPhaseResultConsumer.getNumShards();
914+
out.writeVInt(resultCount);
915+
for (int i = 0; i < resultCount; i++) {
916+
var result = queryPhaseResultConsumer.results.get(i);
917+
if (result == null) {
918+
NodeQueryResponse.writePerShardException(out, failures.remove(i));
919+
} else {
920+
// free context id and remove it from the result right away in case we don't need it anymore
921+
maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry);
922+
NodeQueryResponse.writePerShardResult(out, result);
923+
}
924+
}
925+
out.writeBoolean(false); // does not have a reduction failure
926+
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
927+
}
928+
929+
// Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic)
930+
private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException {
931+
final int resultCount = queryPhaseResultConsumer.getNumShards();
932+
out.writeVInt(resultCount);
933+
for (int i = 0; i < resultCount; i++) {
934+
var result = queryPhaseResultConsumer.results.get(i);
935+
if (result == null) {
936+
NodeQueryResponse.writePerShardException(out, failures.remove(i));
937+
} else {
938+
NodeQueryResponse.writePerShardResult(out, result);
939+
}
940+
}
941+
out.writeBoolean(true); // does have a reduction failure
942+
out.writeException(reductionFailure);
943+
releaseAllResultsContexts();
944+
}
945+
946+
/**
947+
* This code is strictly for _snapshot_ backwards compatibility. The feature flag
948+
* {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version
949+
* {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced.
950+
*/
951+
void bwcRespond() {
820952
RecyclerBytesStreamOutput out = null;
821953
boolean success = false;
822954
var channelListener = new ChannelActionListener<>(channel);
823955
try (queryPhaseResultConsumer) {
824956
var failure = queryPhaseResultConsumer.failure.get();
825957
if (failure != null) {
826-
handleMergeFailure(failure, channelListener, namedWriteableRegistry);
958+
releaseAllResultsContexts();
959+
channelListener.onFailure(failure);
827960
return;
828961
}
829962
final QueryPhaseResultConsumer.MergeResult mergeResult;
@@ -833,7 +966,8 @@ void onShardDone() {
833966
EMPTY_PARTIAL_MERGE_RESULT
834967
);
835968
} catch (Exception e) {
836-
handleMergeFailure(e, channelListener, namedWriteableRegistry);
969+
releaseAllResultsContexts();
970+
channelListener.onFailure(e);
837971
return;
838972
}
839973
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -865,7 +999,8 @@ void onShardDone() {
865999
NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats);
8661000
success = true;
8671001
} catch (IOException e) {
868-
handleMergeFailure(e, channelListener, namedWriteableRegistry);
1002+
releaseAllResultsContexts();
1003+
channelListener.onFailure(e);
8691004
return;
8701005
}
8711006
} finally {
@@ -897,11 +1032,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi
8971032
}
8981033
}
8991034

900-
private void handleMergeFailure(
901-
Exception e,
902-
ChannelActionListener<TransportResponse> channelListener,
903-
NamedWriteableRegistry namedWriteableRegistry
904-
) {
1035+
private void releaseAllResultsContexts() {
9051036
queryPhaseResultConsumer.getSuccessfulResults()
9061037
.forEach(
9071038
searchPhaseResult -> releaseLocalContext(
@@ -911,7 +1042,6 @@ private void handleMergeFailure(
9111042
namedWriteableRegistry
9121043
)
9131044
);
914-
channelListener.onFailure(e);
9151045
}
9161046

9171047
void consumeResult(QuerySearchResult queryResult) {

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,6 @@ public void writeToNoId(StreamOutput out) throws IOException {
504504
out.writeBoolean(true);
505505
writeTopDocs(out, topDocsAndMaxScore);
506506
} else {
507-
assert isPartiallyReduced();
508507
out.writeBoolean(false);
509508
}
510509
} else {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9213000,9185007,9112012
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
initial_9.1.7,9112011
1+
batched_response_might_include_reduction_failure,9112012
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ilm_searchable_snapshot_opt_out_clone,9185006
1+
batched_response_might_include_reduction_failure,9185007
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
resolved_fields_caps,9212000
1+
batched_response_might_include_reduction_failure,9213000
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'default index mode':
2+
- requires:
3+
test_runner_features: [ capabilities ]
4+
capabilities:
5+
- method: PUT
6+
path: /{index}
7+
capabilities: [ logsdb_index_mode ]
8+
reason: "Support for 'logsdb' index mode capability required"
9+
10+
- do:
11+
indices.create_data_stream:
12+
name: logs-test-1
13+
14+
- do:
15+
indices.get_data_stream:
16+
name: logs-test-1
17+
18+
- match: { data_streams.0.indices.0.index_mode: logsdb}
19+
- match: { data_streams.0.index_mode: logsdb}

0 commit comments

Comments (0)