diff --git a/docs/changelog/133188.yaml b/docs/changelog/133188.yaml new file mode 100644 index 0000000000000..42d474fcf97c1 --- /dev/null +++ b/docs/changelog/133188.yaml @@ -0,0 +1,6 @@ +pr: 133188 +summary: Don't fail search if bottom doc can't be formatted +area: Search +type: bug +issues: + - 125321 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 8827ed239829c..9c7722ec79528 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -174,7 +174,17 @@ && getRequest().scroll() == null } } } - bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats()); + try { + bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats()); + } catch (Exception e) { + // In case the collecting fails, e.g. because of a formatting error, we log the error and continue + logger.debug( + "failed to consume top docs for shard [{}] with sort fields [{}]: {}", + result.getShardIndex(), + Arrays.toString(topDocs.fields), + e + ); + } } super.onShardResult(result); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 33fba11ffa330..d78f2ffd7078f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; @@ -26,6 +27,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; @@ -51,6 +53,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -59,6 +62,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; import static java.util.Collections.singletonList; import static org.elasticsearch.test.VersionUtils.allVersions; @@ -745,4 +749,191 @@ public void run() { assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); } } + + static class BadRawDocValueFormat implements DocValueFormat { + @Override + public String getWriteableName() { + return "bad"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public Object format(long value) { + if (value == Long.MAX_VALUE) { + // Simulate a bad value that cannot be formatted correctly + throw new IllegalArgumentException("Cannot format Long.MAX_VALUE"); + } + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public Object format(double value) { + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public Object format(BytesRef value) { + return RawDocValueFormat.INSTANCE.format(value); + } + + @Override + public long parseLong(String value, boolean roundUp, LongSupplier now) { + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); + } + + @Override + public double parseDouble(String value, boolean roundUp, LongSupplier now) { + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); + } + + @Override + public BytesRef parseBytesRef(Object value) { + return RawDocValueFormat.INSTANCE.parseBytesRef(value); + } + + @Override + public Object formatSortValue(Object value) { + return RawDocValueFormat.INSTANCE.formatSortValue(value); + } + } + + // Test what happens if doc formatter fails to format the bottom sort values + public void testBadFormatting() throws Exception { + final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + + Map lookup = new ConcurrentHashMap<>(); + DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1"); + DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2"); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + int numShards = randomIntBetween(10, 20); + int numConcurrent = randomIntBetween(1, 4); + AtomicInteger numWithTopDocs = new AtomicInteger(); + AtomicInteger successfulOps = new AtomicInteger(); + AtomicBoolean canReturnNullResponse = new AtomicBoolean(false); + var transportService = mock(TransportService.class); + when(transportService.getLocalNode()).thenReturn(primaryNode); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) { + @Override + public void sendExecuteQuery( + Transport.Connection connection, + ShardSearchRequest request, + SearchTask task, + ActionListener listener + ) { + int shardId = request.shardId().id(); + if (request.canReturnNullResponseIfMatchNoDocs()) { + canReturnNullResponse.set(true); + } + if (request.getBottomSortValues() != null) { + numWithTopDocs.incrementAndGet(); + } + QuerySearchResult queryResult = new QuerySearchResult( + new ShardSearchContextId("N/A", 123), + new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null), + null + ); + try { + SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG); + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopFieldDocs( + new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), + new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) }, + new SortField[] { sortField } + ), + Float.NaN + ), + new DocValueFormat[] { new BadRawDocValueFormat() } + ); + queryResult.from(0); + queryResult.size(1); + successfulOps.incrementAndGet(); + queryResult.incRef(); + new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start(); + } finally { + queryResult.decRef(); + } + } + }; + CountDownLatch latch = new CountDownLatch(1); + List shardsIter = SearchAsyncActionTests.getShardsIter( + "idx", + new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS), + numShards, + randomBoolean(), + primaryNode, + replicaNode + ); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.setMaxConcurrentShardRequests(numConcurrent); + searchRequest.setBatchedReduceSize(2); + searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp"))); + searchRequest.source().trackTotalHitsUpTo(2); + searchRequest.allowPartialSearchResults(false); + SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + try ( + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task::isCancelled, + task.getProgressListener(), + shardsIter.size(), + exc -> {} + ) + ) { + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( + logger, + null, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", AliasFilter.EMPTY), + Collections.emptyMap(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + resultConsumer, + searchRequest, + null, + shardsIter, + timeProvider, + new ClusterState.Builder(new ClusterName("test")).build(), + task, + SearchResponse.Clusters.EMPTY, + null, + false + ) { + @Override + protected SearchPhase getNextPhase() { + return new SearchPhase("test") { + @Override + protected void run() { + latch.countDown(); + } + }; + } + + @Override + void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) { + latch.countDown(); + fail(e, "Unexpected shard failure"); + } + }; + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(numShards)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); + assertThat(phase.totalHits().value, equalTo(2L)); + } + } + }