|
14 | 14 | import org.apache.lucene.search.TopFieldDocs; |
15 | 15 | import org.apache.lucene.search.TotalHits; |
16 | 16 | import org.elasticsearch.Version; |
| 17 | +import org.apache.lucene.util.BytesRef; |
17 | 18 | import org.elasticsearch.action.ActionListener; |
18 | 19 | import org.elasticsearch.action.OriginalIndices; |
19 | 20 | import org.elasticsearch.cluster.ClusterName; |
|
26 | 27 | import org.elasticsearch.cluster.routing.UnassignedInfo; |
27 | 28 | import org.elasticsearch.common.breaker.CircuitBreaker; |
28 | 29 | import org.elasticsearch.common.breaker.NoopCircuitBreaker; |
| 30 | +import org.elasticsearch.common.io.stream.StreamOutput; |
29 | 31 | import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; |
30 | 32 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
31 | 33 | import org.elasticsearch.core.TimeValue; |
|
51 | 53 | import org.elasticsearch.transport.Transport; |
52 | 54 | import org.elasticsearch.transport.TransportService; |
53 | 55 |
|
54 | | -import java.util.ArrayList; |
| 56 | +import java.io.IOException; |
55 | 57 | import java.util.Collections; |
56 | 58 | import java.util.List; |
57 | 59 | import java.util.Map; |
58 | 60 | import java.util.concurrent.ConcurrentHashMap; |
59 | 61 | import java.util.concurrent.CountDownLatch; |
60 | 62 | import java.util.concurrent.atomic.AtomicBoolean; |
61 | 63 | import java.util.concurrent.atomic.AtomicInteger; |
| 64 | +import java.util.function.LongSupplier; |
62 | 65 |
|
63 | 66 | import static java.util.Collections.singletonList; |
64 | 67 | import static org.elasticsearch.test.VersionUtils.allVersions; |
@@ -745,4 +748,191 @@ public void run() { |
745 | 748 | assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); |
746 | 749 | } |
747 | 750 | } |
| 751 | + |
| 752 | + static class BadRawDocValueFormat implements DocValueFormat { |
| 753 | + @Override |
| 754 | + public String getWriteableName() { |
| 755 | + return "bad"; |
| 756 | + } |
| 757 | + |
| 758 | + @Override |
| 759 | + public void writeTo(StreamOutput out) throws IOException {} |
| 760 | + |
| 761 | + @Override |
| 762 | + public Object format(long value) { |
| 763 | + if (value == Long.MAX_VALUE) { |
| 764 | + // Simulate a bad value that cannot be formatted correctly |
| 765 | + throw new IllegalArgumentException("Cannot format Long.MAX_VALUE"); |
| 766 | + } |
| 767 | + return RawDocValueFormat.INSTANCE.format(value); |
| 768 | + } |
| 769 | + |
| 770 | + @Override |
| 771 | + public Object format(double value) { |
| 772 | + return RawDocValueFormat.INSTANCE.format(value); |
| 773 | + } |
| 774 | + |
| 775 | + @Override |
| 776 | + public Object format(BytesRef value) { |
| 777 | + return RawDocValueFormat.INSTANCE.format(value); |
| 778 | + } |
| 779 | + |
| 780 | + @Override |
| 781 | + public long parseLong(String value, boolean roundUp, LongSupplier now) { |
| 782 | + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); |
| 783 | + } |
| 784 | + |
| 785 | + @Override |
| 786 | + public double parseDouble(String value, boolean roundUp, LongSupplier now) { |
| 787 | + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); |
| 788 | + } |
| 789 | + |
| 790 | + @Override |
| 791 | + public BytesRef parseBytesRef(Object value) { |
| 792 | + return RawDocValueFormat.INSTANCE.parseBytesRef(value); |
| 793 | + } |
| 794 | + |
| 795 | + @Override |
| 796 | + public Object formatSortValue(Object value) { |
| 797 | + return RawDocValueFormat.INSTANCE.formatSortValue(value); |
| 798 | + } |
| 799 | + } |
| 800 | + |
    // Test what happens if the doc value formatter fails to format the bottom sort values:
    // each shard reports a top doc whose sort value is Long.MAX_VALUE, which
    // BadRawDocValueFormat.format(long) refuses to format. The expectation is that no shard
    // fails (onShardFailure below trips the test) and the reduce still completes.
    public void testBadFormatting() throws Exception {
        final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
            0,
            System.nanoTime(),
            System::nanoTime
        );

        // Two mock nodes so the shard iterators can alternate primary/replica targets.
        Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
        DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1");
        DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2");
        lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
        lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));

        int numShards = randomIntBetween(10, 20);
        int numConcurrent = randomIntBetween(1, 4);
        // Counts requests that carried bottom sort values (i.e. shard-level "can match" hints).
        AtomicInteger numWithTopDocs = new AtomicInteger();
        AtomicInteger successfulOps = new AtomicInteger();
        AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
        var transportService = mock(TransportService.class);
        when(transportService.getLocalNode()).thenReturn(primaryNode);
        SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
            @Override
            public void sendExecuteQuery(
                Transport.Connection connection,
                ShardSearchRequest request,
                SearchTask task,
                ActionListener<SearchPhaseResult> listener
            ) {
                int shardId = request.shardId().id();
                if (request.canReturnNullResponseIfMatchNoDocs()) {
                    canReturnNullResponse.set(true);
                }
                if (request.getBottomSortValues() != null) {
                    numWithTopDocs.incrementAndGet();
                }
                QuerySearchResult queryResult = new QuerySearchResult(
                    new ShardSearchContextId("N/A", 123),
                    new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
                    null
                );
                try {
                    SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG);
                    // Each shard answers with a single hit whose sort value is Long.MAX_VALUE —
                    // the value BadRawDocValueFormat cannot format.
                    queryResult.topDocs(
                        new TopDocsAndMaxScore(
                            new TopFieldDocs(
                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
                                new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) },
                                new SortField[] { sortField }
                            ),
                            Float.NaN
                        ),
                        new DocValueFormat[] { new BadRawDocValueFormat() }
                    );
                    queryResult.from(0);
                    queryResult.size(1);
                    successfulOps.incrementAndGet();
                    // Extra ref for the listener thread; respondAndRelease drops it, while the
                    // finally block below releases this method's own reference.
                    queryResult.incRef();
                    new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
                } finally {
                    queryResult.decRef();
                }
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        List<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter(
            "idx",
            new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS),
            numShards,
            randomBoolean(),
            primaryNode,
            replicaNode
        );
        final SearchRequest searchRequest = new SearchRequest();
        searchRequest.setMaxConcurrentShardRequests(numConcurrent);
        // Small batched-reduce size to force multiple partial reduces over the bad sort values.
        searchRequest.setBatchedReduceSize(2);
        searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp")));
        searchRequest.source().trackTotalHitsUpTo(2);
        // No partial results allowed: a formatting-induced shard failure would fail the request.
        searchRequest.allowPartialSearchResults(false);
        SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
        SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
        try (
            QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
                searchRequest,
                EsExecutors.DIRECT_EXECUTOR_SERVICE,
                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
                controller,
                task::isCancelled,
                task.getProgressListener(),
                shardsIter.size(),
                exc -> {}
            )
        ) {
            SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
                logger,
                null,
                searchTransportService,
                (clusterAlias, node) -> lookup.get(node),
                Collections.singletonMap("_na_", AliasFilter.EMPTY),
                Collections.emptyMap(),
                EsExecutors.DIRECT_EXECUTOR_SERVICE,
                resultConsumer,
                searchRequest,
                null,
                shardsIter,
                timeProvider,
                new ClusterState.Builder(new ClusterName("test")).build(),
                task,
                SearchResponse.Clusters.EMPTY,
                null,
                false
            ) {
                @Override
                protected SearchPhase getNextPhase() {
                    // Stop after the query phase; just signal completion.
                    return new SearchPhase("test") {
                        @Override
                        protected void run() {
                            latch.countDown();
                        }
                    };
                }

                @Override
                void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) {
                    // Any shard failure (e.g. caused by the bad formatter) fails the test.
                    latch.countDown();
                    fail(e, "Unexpected shard failure");
                }
            };
            action.start();
            latch.await();
            // Every shard must have completed successfully despite the unformattable sort value.
            assertThat(successfulOps.get(), equalTo(numShards));
            SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
            assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
            // trackTotalHitsUpTo(2) caps the accumulated total hits at 2 (relation is GTE).
            assertThat(phase.totalHits().value(), equalTo(2L));
        }
    }
| 937 | + |
748 | 938 | } |
0 commit comments