Skip to content

Commit a6b9b65

Browse files
authored
[8.19] Don't fail search if bottom doc can't be formatted (#133188) (#133243)
* Don't fail search if bottom doc can't be formatted (#133188) * Don't fail search if bottom doc can't be formatted (cherry picked from commit 84443ee) # Conflicts: # server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java * Fix test
1 parent 0617f2a commit a6b9b65

File tree

3 files changed

+208
-1
lines changed

3 files changed

+208
-1
lines changed

docs/changelog/133188.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133188
2+
summary: Don't fail search if bottom doc can't be formatted
3+
area: Search
4+
type: bug
5+
issues:
6+
- 125321

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,17 @@ && getRequest().scroll() == null
174174
}
175175
}
176176
}
177-
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
177+
try {
178+
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
179+
} catch (Exception e) {
180+
// In case the collecting fails, e.g. because of a formatting error, we log the error and continue
181+
logger.debug(
182+
"failed to consume top docs for shard [{}] with sort fields [{}]: {}",
183+
result.getShardIndex(),
184+
Arrays.toString(topDocs.fields),
185+
e
186+
);
187+
}
178188
}
179189
super.onShardResult(result);
180190
}

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.search.SortField;
1414
import org.apache.lucene.search.TopFieldDocs;
1515
import org.apache.lucene.search.TotalHits;
16+
import org.apache.lucene.util.BytesRef;
1617
import org.elasticsearch.Version;
1718
import org.elasticsearch.action.ActionListener;
1819
import org.elasticsearch.action.OriginalIndices;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.cluster.routing.UnassignedInfo;
2728
import org.elasticsearch.common.breaker.CircuitBreaker;
2829
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
30+
import org.elasticsearch.common.io.stream.StreamOutput;
2931
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
3032
import org.elasticsearch.common.util.concurrent.EsExecutors;
3133
import org.elasticsearch.core.TimeValue;
@@ -51,6 +53,7 @@
5153
import org.elasticsearch.transport.Transport;
5254
import org.elasticsearch.transport.TransportService;
5355

56+
import java.io.IOException;
5457
import java.util.ArrayList;
5558
import java.util.Collections;
5659
import java.util.List;
@@ -59,6 +62,7 @@
5962
import java.util.concurrent.CountDownLatch;
6063
import java.util.concurrent.atomic.AtomicBoolean;
6164
import java.util.concurrent.atomic.AtomicInteger;
65+
import java.util.function.LongSupplier;
6266

6367
import static java.util.Collections.singletonList;
6468
import static org.elasticsearch.test.VersionUtils.allVersions;
@@ -745,4 +749,191 @@ public void run() {
745749
assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
746750
}
747751
}
752+
753+
static class BadRawDocValueFormat implements DocValueFormat {
754+
@Override
755+
public String getWriteableName() {
756+
return "bad";
757+
}
758+
759+
@Override
760+
public void writeTo(StreamOutput out) throws IOException {}
761+
762+
@Override
763+
public Object format(long value) {
764+
if (value == Long.MAX_VALUE) {
765+
// Simulate a bad value that cannot be formatted correctly
766+
throw new IllegalArgumentException("Cannot format Long.MAX_VALUE");
767+
}
768+
return RawDocValueFormat.INSTANCE.format(value);
769+
}
770+
771+
@Override
772+
public Object format(double value) {
773+
return RawDocValueFormat.INSTANCE.format(value);
774+
}
775+
776+
@Override
777+
public Object format(BytesRef value) {
778+
return RawDocValueFormat.INSTANCE.format(value);
779+
}
780+
781+
@Override
782+
public long parseLong(String value, boolean roundUp, LongSupplier now) {
783+
return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now);
784+
}
785+
786+
@Override
787+
public double parseDouble(String value, boolean roundUp, LongSupplier now) {
788+
return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now);
789+
}
790+
791+
@Override
792+
public BytesRef parseBytesRef(Object value) {
793+
return RawDocValueFormat.INSTANCE.parseBytesRef(value);
794+
}
795+
796+
@Override
797+
public Object formatSortValue(Object value) {
798+
return RawDocValueFormat.INSTANCE.formatSortValue(value);
799+
}
800+
}
801+
802+
// Test what happens if doc formatter fails to format the bottom sort values
803+
public void testBadFormatting() throws Exception {
804+
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
805+
0,
806+
System.nanoTime(),
807+
System::nanoTime
808+
);
809+
810+
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
811+
DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1");
812+
DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2");
813+
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
814+
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
815+
816+
int numShards = randomIntBetween(10, 20);
817+
int numConcurrent = randomIntBetween(1, 4);
818+
AtomicInteger numWithTopDocs = new AtomicInteger();
819+
AtomicInteger successfulOps = new AtomicInteger();
820+
AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
821+
var transportService = mock(TransportService.class);
822+
when(transportService.getLocalNode()).thenReturn(primaryNode);
823+
SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
824+
@Override
825+
public void sendExecuteQuery(
826+
Transport.Connection connection,
827+
ShardSearchRequest request,
828+
SearchTask task,
829+
ActionListener<SearchPhaseResult> listener
830+
) {
831+
int shardId = request.shardId().id();
832+
if (request.canReturnNullResponseIfMatchNoDocs()) {
833+
canReturnNullResponse.set(true);
834+
}
835+
if (request.getBottomSortValues() != null) {
836+
numWithTopDocs.incrementAndGet();
837+
}
838+
QuerySearchResult queryResult = new QuerySearchResult(
839+
new ShardSearchContextId("N/A", 123),
840+
new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
841+
null
842+
);
843+
try {
844+
SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG);
845+
queryResult.topDocs(
846+
new TopDocsAndMaxScore(
847+
new TopFieldDocs(
848+
new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
849+
new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) },
850+
new SortField[] { sortField }
851+
),
852+
Float.NaN
853+
),
854+
new DocValueFormat[] { new BadRawDocValueFormat() }
855+
);
856+
queryResult.from(0);
857+
queryResult.size(1);
858+
successfulOps.incrementAndGet();
859+
queryResult.incRef();
860+
new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
861+
} finally {
862+
queryResult.decRef();
863+
}
864+
}
865+
};
866+
CountDownLatch latch = new CountDownLatch(1);
867+
List<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter(
868+
"idx",
869+
new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS),
870+
numShards,
871+
randomBoolean(),
872+
primaryNode,
873+
replicaNode
874+
);
875+
final SearchRequest searchRequest = new SearchRequest();
876+
searchRequest.setMaxConcurrentShardRequests(numConcurrent);
877+
searchRequest.setBatchedReduceSize(2);
878+
searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp")));
879+
searchRequest.source().trackTotalHitsUpTo(2);
880+
searchRequest.allowPartialSearchResults(false);
881+
SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
882+
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
883+
try (
884+
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
885+
searchRequest,
886+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
887+
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
888+
controller,
889+
task::isCancelled,
890+
task.getProgressListener(),
891+
shardsIter.size(),
892+
exc -> {}
893+
)
894+
) {
895+
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
896+
logger,
897+
null,
898+
searchTransportService,
899+
(clusterAlias, node) -> lookup.get(node),
900+
Collections.singletonMap("_na_", AliasFilter.EMPTY),
901+
Collections.emptyMap(),
902+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
903+
resultConsumer,
904+
searchRequest,
905+
null,
906+
shardsIter,
907+
timeProvider,
908+
new ClusterState.Builder(new ClusterName("test")).build(),
909+
task,
910+
SearchResponse.Clusters.EMPTY,
911+
null,
912+
false
913+
) {
914+
@Override
915+
protected SearchPhase getNextPhase() {
916+
return new SearchPhase("test") {
917+
@Override
918+
protected void run() {
919+
latch.countDown();
920+
}
921+
};
922+
}
923+
924+
@Override
925+
void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) {
926+
latch.countDown();
927+
fail(e, "Unexpected shard failure");
928+
}
929+
};
930+
action.start();
931+
latch.await();
932+
assertThat(successfulOps.get(), equalTo(numShards));
933+
SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
934+
assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
935+
assertThat(phase.totalHits().value, equalTo(2L));
936+
}
937+
}
938+
748939
}

0 commit comments

Comments
 (0)