Skip to content

Commit cbaa2d0

Browse files
committed
Don't fail search if bottom doc can't be formatted (elastic#133188)
* Don't fail search if bottom doc can't be formatted

(cherry picked from commit 84443ee)

# Conflicts:
#	server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java
1 parent 26e4333 commit cbaa2d0

File tree

3 files changed

+208
-1
lines changed

3 files changed

+208
-1
lines changed

docs/changelog/133188.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133188
2+
summary: Don't fail search if bottom doc can't be formatted
3+
area: Search
4+
type: bug
5+
issues:
6+
- 125321

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,17 @@ && getRequest().scroll() == null
121121
}
122122
}
123123
}
124-
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
124+
try {
125+
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
126+
} catch (Exception e) {
127+
// In case the collecting fails, e.g. because of a formatting error, we log the error and continue
128+
logger.debug(
129+
"failed to consume top docs for shard [{}] with sort fields [{}]: {}",
130+
result.getShardIndex(),
131+
Arrays.toString(topDocs.fields),
132+
e
133+
);
134+
}
125135
}
126136
super.onShardResult(result, shardIt);
127137
}

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.search.TopFieldDocs;
1515
import org.apache.lucene.search.TotalHits;
1616
import org.elasticsearch.Version;
17+
import org.apache.lucene.util.BytesRef;
1718
import org.elasticsearch.action.ActionListener;
1819
import org.elasticsearch.action.OriginalIndices;
1920
import org.elasticsearch.cluster.ClusterName;
@@ -27,6 +28,7 @@
2728
import org.elasticsearch.cluster.routing.UnassignedInfo;
2829
import org.elasticsearch.common.breaker.CircuitBreaker;
2930
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
31+
import org.elasticsearch.common.io.stream.StreamOutput;
3032
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
3133
import org.elasticsearch.common.util.concurrent.EsExecutors;
3234
import org.elasticsearch.core.TimeValue;
@@ -52,13 +54,15 @@
5254
import org.elasticsearch.transport.Transport;
5355

5456
import java.util.ArrayList;
57+
import java.io.IOException;
5558
import java.util.Collections;
5659
import java.util.List;
5760
import java.util.Map;
5861
import java.util.concurrent.ConcurrentHashMap;
5962
import java.util.concurrent.CountDownLatch;
6063
import java.util.concurrent.atomic.AtomicBoolean;
6164
import java.util.concurrent.atomic.AtomicInteger;
65+
import java.util.function.LongSupplier;
6266

6367
import static java.util.Collections.singletonList;
6468
import static org.elasticsearch.test.VersionUtils.allVersions;
@@ -740,4 +744,191 @@ public void run() {
740744
assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
741745
}
742746
}
747+
748+
static class BadRawDocValueFormat implements DocValueFormat {
749+
@Override
750+
public String getWriteableName() {
751+
return "bad";
752+
}
753+
754+
@Override
755+
public void writeTo(StreamOutput out) throws IOException {}
756+
757+
@Override
758+
public Object format(long value) {
759+
if (value == Long.MAX_VALUE) {
760+
// Simulate a bad value that cannot be formatted correctly
761+
throw new IllegalArgumentException("Cannot format Long.MAX_VALUE");
762+
}
763+
return RawDocValueFormat.INSTANCE.format(value);
764+
}
765+
766+
@Override
767+
public Object format(double value) {
768+
return RawDocValueFormat.INSTANCE.format(value);
769+
}
770+
771+
@Override
772+
public Object format(BytesRef value) {
773+
return RawDocValueFormat.INSTANCE.format(value);
774+
}
775+
776+
@Override
777+
public long parseLong(String value, boolean roundUp, LongSupplier now) {
778+
return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now);
779+
}
780+
781+
@Override
782+
public double parseDouble(String value, boolean roundUp, LongSupplier now) {
783+
return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now);
784+
}
785+
786+
@Override
787+
public BytesRef parseBytesRef(Object value) {
788+
return RawDocValueFormat.INSTANCE.parseBytesRef(value);
789+
}
790+
791+
@Override
792+
public Object formatSortValue(Object value) {
793+
return RawDocValueFormat.INSTANCE.formatSortValue(value);
794+
}
795+
}
796+
797+
// Test what happens if the doc value formatter fails to format the bottom sort values.
// Every shard answers with a single hit whose sort value is Long.MAX_VALUE, paired with
// BadRawDocValueFormat, which throws when asked to format that value. The test expects that
// no shard failure is reported, that every shard was queried, and that the reduced total hits
// equal 2 (the trackTotalHitsUpTo limit set on the request).
798+
public void testBadFormatting() throws Exception {
799+
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
800+
0,
801+
System.nanoTime(),
802+
System::nanoTime
803+
);
804+
805+
// Node id -> connection lookup handed to the action further down
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
806+
DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1");
807+
DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2");
808+
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
809+
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
810+
811+
int numShards = randomIntBetween(10, 20);
812+
int numConcurrent = randomIntBetween(1, 4);
813+
// NOTE(review): numWithTopDocs and canReturnNullResponse are updated by the stub below but
// never asserted on in this test.
AtomicInteger numWithTopDocs = new AtomicInteger();
814+
AtomicInteger successfulOps = new AtomicInteger();
815+
AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
816+
var transportService = mock(TransportService.class);
817+
when(transportService.getLocalNode()).thenReturn(primaryNode);
818+
// Stub transport layer: answers every shard query locally with a canned result
SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
819+
@Override
820+
public void sendExecuteQuery(
821+
Transport.Connection connection,
822+
ShardSearchRequest request,
823+
SearchTask task,
824+
ActionListener<SearchPhaseResult> listener
825+
) {
826+
int shardId = request.shardId().id();
827+
if (request.canReturnNullResponseIfMatchNoDocs()) {
828+
canReturnNullResponse.set(true);
829+
}
830+
if (request.getBottomSortValues() != null) {
831+
numWithTopDocs.incrementAndGet();
832+
}
833+
QuerySearchResult queryResult = new QuerySearchResult(
834+
new ShardSearchContextId("N/A", 123),
835+
new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
836+
null
837+
);
838+
try {
839+
SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG);
840+
// One hit per shard whose only sort value is Long.MAX_VALUE, the value that
// BadRawDocValueFormat.format(long) refuses to format
queryResult.topDocs(
841+
new TopDocsAndMaxScore(
842+
new TopFieldDocs(
843+
new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
844+
new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) },
845+
new SortField[] { sortField }
846+
),
847+
Float.NaN
848+
),
849+
new DocValueFormat[] { new BadRawDocValueFormat() }
850+
);
851+
queryResult.from(0);
852+
queryResult.size(1);
853+
successfulOps.incrementAndGet();
854+
// Take an extra reference for the responder thread before handing the result over;
// presumably respondAndRelease drops it after notifying the listener (confirm against
// ActionListener), while the finally block below drops this method's own reference.
queryResult.incRef();
855+
// Respond from a separate thread rather than on the caller's thread
new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
856+
} finally {
857+
queryResult.decRef();
858+
}
859+
}
860+
};
861+
// Released either when the next phase runs or when a shard failure is reported
CountDownLatch latch = new CountDownLatch(1);
862+
List<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter(
863+
"idx",
864+
new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS),
865+
numShards,
866+
randomBoolean(),
867+
primaryNode,
868+
replicaNode
869+
);
870+
final SearchRequest searchRequest = new SearchRequest();
871+
searchRequest.setMaxConcurrentShardRequests(numConcurrent);
872+
searchRequest.setBatchedReduceSize(2);
873+
// Sorted, size-1 request with total hits tracked up to 2.
// NOTE(review): the request sorts on "timestamp" while the canned shard results use a
// "RegistrationDate" sort field; confirm the mismatch is irrelevant to what is being tested.
searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp")));
874+
searchRequest.source().trackTotalHitsUpTo(2);
875+
searchRequest.allowPartialSearchResults(false);
876+
SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
877+
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
878+
try (
879+
QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
880+
searchRequest,
881+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
882+
new NoopCircuitBreaker(CircuitBreaker.REQUEST),
883+
controller,
884+
task::isCancelled,
885+
task.getProgressListener(),
886+
shardsIter.size(),
887+
exc -> {}
888+
)
889+
) {
890+
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
891+
logger,
892+
null,
893+
searchTransportService,
894+
(clusterAlias, node) -> lookup.get(node),
895+
Collections.singletonMap("_na_", AliasFilter.EMPTY),
896+
Collections.emptyMap(),
897+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
898+
resultConsumer,
899+
searchRequest,
900+
null,
901+
shardsIter,
902+
timeProvider,
903+
new ClusterState.Builder(new ClusterName("test")).build(),
904+
task,
905+
SearchResponse.Clusters.EMPTY,
906+
null,
907+
false
908+
) {
909+
// Replace the next phase with one that only releases the latch
@Override
910+
protected SearchPhase getNextPhase() {
911+
return new SearchPhase("test") {
912+
@Override
913+
protected void run() {
914+
latch.countDown();
915+
}
916+
};
917+
}
918+
919+
// Any shard failure is unexpected here: release the latch first so the waiting test
// thread is not left blocked, then fail.
@Override
920+
void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) {
921+
latch.countDown();
922+
fail(e, "Unexpected shard failure");
923+
}
924+
};
925+
action.start();
926+
latch.await();
927+
// Every shard must have been queried, and the reduce must still produce the capped total
assertThat(successfulOps.get(), equalTo(numShards));
928+
SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
929+
assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
930+
assertThat(phase.totalHits().value(), equalTo(2L));
931+
}
932+
}
933+
743934
}

0 commit comments

Comments
 (0)