diff --git a/docs/changelog/133188.yaml b/docs/changelog/133188.yaml new file mode 100644 index 0000000000000..42d474fcf97c1 --- /dev/null +++ b/docs/changelog/133188.yaml @@ -0,0 +1,6 @@ +pr: 133188 +summary: Don't fail search if bottom doc can't be formatted +area: Search +type: bug +issues: + - 125321 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index b2d5693762dca..fffbd26adce50 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -177,7 +177,17 @@ && getRequest().scroll() == null } } } - bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats()); + try { + bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats()); + } catch (Exception e) { + // In case the collecting fails, e.g. because of a formatting error, we log the error and continue + logger.debug( + "failed to consume top docs for shard [{}] with sort fields [{}]: {}", + result.getShardIndex(), + Arrays.toString(topDocs.fields), + e + ); + } } super.onShardResult(result); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index d7348833c757a..2a9d12b27507d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.ClusterName; @@ -21,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; @@ -41,6 +43,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -48,6 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -243,4 +247,191 @@ protected void run() { assertThat(((FieldDoc) phase.sortedTopDocs().scoreDocs()[0]).fields[0], equalTo(0)); } } + + static class BadRawDocValueFormat implements DocValueFormat { + @Override + public String getWriteableName() { + return "bad"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public Object format(long value) { + if (value == Long.MAX_VALUE) { + // Simulate a bad value that cannot be formatted correctly + throw new IllegalArgumentException("Cannot format Long.MAX_VALUE"); + } + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public Object format(double value) { + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public Object format(BytesRef value) { + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public long parseLong(String value, boolean roundUp, LongSupplier now) { + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); + } + + @Override + public double parseDouble(String value, boolean roundUp, LongSupplier now) { + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); + } + + @Override + public BytesRef parseBytesRef(Object value) { + return RawDocValueFormat.INSTANCE.parseBytesRef(value); + } + + @Override + public Object formatSortValue(Object value) { + return RawDocValueFormat.INSTANCE.formatSortValue(value); + } + } + + // Test what happens if doc formatter fails to format the bottom sort values + public void testBadFormatting() throws Exception { + final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1"); + DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2"); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + int numShards = randomIntBetween(10, 20); + int numConcurrent = randomIntBetween(1, 4); + AtomicInteger numWithTopDocs = new AtomicInteger(); + AtomicInteger successfulOps = new AtomicInteger(); + AtomicBoolean canReturnNullResponse = new AtomicBoolean(false); + var transportService = mock(TransportService.class); + when(transportService.getLocalNode()).thenReturn(primaryNode); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) { + @Override + public void sendExecuteQuery( + Transport.Connection connection, + ShardSearchRequest request, + SearchTask task, + ActionListener listener + ) { + int shardId = request.shardId().id(); + if (request.canReturnNullResponseIfMatchNoDocs()) { + canReturnNullResponse.set(true); + } + if (request.getBottomSortValues() != null) { + numWithTopDocs.incrementAndGet(); + } + QuerySearchResult queryResult = new QuerySearchResult( + new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null), + null + ); + try { + SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG); + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) }, + new SortField[] { sortField } + ), + Float.NaN + ), + new DocValueFormat[] { new BadRawDocValueFormat() } + ); + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + queryResult.incRef(); + new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start(); + } finally { + queryResult.decRef(); + } + } + }; + CountDownLatch latch = new CountDownLatch(1); + List shardsIter = SearchAsyncActionTests.getShardsIter( + "idx", + new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS), + numShards, + randomBoolean(), + primaryNode, + replicaNode + ); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp"))); + searchRequest.source().trackTotalHitsUpTo(2); + searchRequest.allowPartialSearchResults(false); + SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + try ( + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task::isCancelled, + task.getProgressListener(), + shardsIter.size(), + exc -> {} + ) + ) { + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( + logger, + null, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", AliasFilter.EMPTY), + Collections.emptyMap(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + resultConsumer, + searchRequest, + null, + shardsIter, + timeProvider, + new ClusterState.Builder(new ClusterName("test")).build(), + task, + SearchResponse.Clusters.EMPTY, + null, + false + ) { + @Override + protected SearchPhase getNextPhase() { + return new SearchPhase("test") { + @Override + protected void run() { + latch.countDown(); + } + }; + } + + @Override + void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) { + latch.countDown(); + fail(e, "Unexpected shard failure"); + } + }; + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(numShards)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); + assertThat(phase.totalHits().value(), equalTo(2L)); + } + } + }