|
13 | 13 | import org.apache.lucene.search.SortField;
|
14 | 14 | import org.apache.lucene.search.TopFieldDocs;
|
15 | 15 | import org.apache.lucene.search.TotalHits;
|
| 16 | +import org.apache.lucene.util.BytesRef; |
16 | 17 | import org.elasticsearch.action.ActionListener;
|
17 | 18 | import org.elasticsearch.action.OriginalIndices;
|
18 | 19 | import org.elasticsearch.cluster.ClusterName;
|
|
21 | 22 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
|
22 | 23 | import org.elasticsearch.common.breaker.CircuitBreaker;
|
23 | 24 | import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
| 25 | +import org.elasticsearch.common.io.stream.StreamOutput; |
24 | 26 | import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
25 | 27 | import org.elasticsearch.common.util.concurrent.EsExecutors;
|
26 | 28 | import org.elasticsearch.core.TimeValue;
|
|
41 | 43 | import org.elasticsearch.transport.Transport;
|
42 | 44 | import org.elasticsearch.transport.TransportService;
|
43 | 45 |
|
| 46 | +import java.io.IOException; |
44 | 47 | import java.util.Collections;
|
45 | 48 | import java.util.List;
|
46 | 49 | import java.util.Map;
|
47 | 50 | import java.util.concurrent.ConcurrentHashMap;
|
48 | 51 | import java.util.concurrent.CountDownLatch;
|
49 | 52 | import java.util.concurrent.atomic.AtomicBoolean;
|
50 | 53 | import java.util.concurrent.atomic.AtomicInteger;
|
| 54 | +import java.util.function.LongSupplier; |
51 | 55 |
|
52 | 56 | import static org.hamcrest.Matchers.equalTo;
|
53 | 57 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
@@ -243,4 +247,191 @@ protected void run() {
|
243 | 247 | assertThat(((FieldDoc) phase.sortedTopDocs().scoreDocs()[0]).fields[0], equalTo(0));
|
244 | 248 | }
|
245 | 249 | }
|
| 250 | + |
| 251 | + static class BadRawDocValueFormat implements DocValueFormat { |
| 252 | + @Override |
| 253 | + public String getWriteableName() { |
| 254 | + return "bad"; |
| 255 | + } |
| 256 | + |
| 257 | + @Override |
| 258 | + public void writeTo(StreamOutput out) throws IOException {} |
| 259 | + |
| 260 | + @Override |
| 261 | + public Object format(long value) { |
| 262 | + if (value == Long.MAX_VALUE) { |
| 263 | + // Simulate a bad value that cannot be formatted correctly |
| 264 | + throw new IllegalArgumentException("Cannot format Long.MAX_VALUE"); |
| 265 | + } |
| 266 | + return RawDocValueFormat.INSTANCE.format(value); |
| 267 | + } |
| 268 | + |
| 269 | + @Override |
| 270 | + public Object format(double value) { |
| 271 | + return RawDocValueFormat.INSTANCE.format(value); |
| 272 | + } |
| 273 | + |
| 274 | + @Override |
| 275 | + public Object format(BytesRef value) { |
| 276 | + return RawDocValueFormat.INSTANCE.format(value); |
| 277 | + } |
| 278 | + |
| 279 | + @Override |
| 280 | + public long parseLong(String value, boolean roundUp, LongSupplier now) { |
| 281 | + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); |
| 282 | + } |
| 283 | + |
| 284 | + @Override |
| 285 | + public double parseDouble(String value, boolean roundUp, LongSupplier now) { |
| 286 | + return RawDocValueFormat.INSTANCE.parseLong(value, roundUp, now); |
| 287 | + } |
| 288 | + |
| 289 | + @Override |
| 290 | + public BytesRef parseBytesRef(Object value) { |
| 291 | + return RawDocValueFormat.INSTANCE.parseBytesRef(value); |
| 292 | + } |
| 293 | + |
| 294 | + @Override |
| 295 | + public Object formatSortValue(Object value) { |
| 296 | + return RawDocValueFormat.INSTANCE.formatSortValue(value); |
| 297 | + } |
| 298 | + } |
| 299 | + |
    // Test what happens if doc formatter fails to format the bottom sort values
    public void testBadFormatting() throws Exception {
        final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
            0,
            System.nanoTime(),
            System::nanoTime
        );

        // Two mock nodes so the generated shard iterators can target a primary or a replica.
        Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
        DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node1");
        DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node2");
        lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
        lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));

        int numShards = randomIntBetween(10, 20);
        int numConcurrent = randomIntBetween(1, 4);
        AtomicInteger numWithTopDocs = new AtomicInteger();
        AtomicInteger successfulOps = new AtomicInteger();
        AtomicBoolean canReturnNullResponse = new AtomicBoolean(false);
        var transportService = mock(TransportService.class);
        when(transportService.getLocalNode()).thenReturn(primaryNode);
        // Stub transport: every shard query answers with a single hit whose sort value is
        // Long.MAX_VALUE — exactly the value BadRawDocValueFormat refuses to format.
        SearchTransportService searchTransportService = new SearchTransportService(transportService, null, null) {
            @Override
            public void sendExecuteQuery(
                Transport.Connection connection,
                ShardSearchRequest request,
                SearchTask task,
                ActionListener<SearchPhaseResult> listener
            ) {
                int shardId = request.shardId().id();
                if (request.canReturnNullResponseIfMatchNoDocs()) {
                    canReturnNullResponse.set(true);
                }
                // Requests carrying bottom sort values only appear once an earlier shard
                // response has been reduced; count them for diagnostics.
                if (request.getBottomSortValues() != null) {
                    numWithTopDocs.incrementAndGet();
                }
                QuerySearchResult queryResult = new QuerySearchResult(
                    new ShardSearchContextId("N/A", 123),
                    new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
                    null
                );
                try {
                    // NOTE(review): the result is sorted on "RegistrationDate" while the
                    // request below sorts on "timestamp" — presumably irrelevant to the
                    // formatting failure under test, but confirm the mismatch is intended.
                    SortField sortField = new SortField("RegistrationDate", SortField.Type.LONG);
                    queryResult.topDocs(
                        new TopDocsAndMaxScore(
                            new TopFieldDocs(
                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
                                new FieldDoc[] { new FieldDoc(0, Float.NaN, new Object[] { Long.MAX_VALUE }) },
                                new SortField[] { sortField }
                            ),
                            Float.NaN
                        ),
                        new DocValueFormat[] { new BadRawDocValueFormat() }
                    );
                    queryResult.from(0);
                    queryResult.size(1);
                    successfulOps.incrementAndGet();
                    // Extra ref so the result survives until the async respondAndRelease
                    // below runs; the finally block drops this method's own reference.
                    queryResult.incRef();
                    new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
                } finally {
                    queryResult.decRef();
                }
            }
        };
        CountDownLatch latch = new CountDownLatch(1);
        List<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter(
            "idx",
            new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS),
            numShards,
            randomBoolean(),
            primaryNode,
            replicaNode
        );
        final SearchRequest searchRequest = new SearchRequest();
        searchRequest.setMaxConcurrentShardRequests(numConcurrent);
        // Small batched-reduce size forces several partial reductions, so bottom sort
        // values get computed (and formatted) while shard requests are still in flight.
        searchRequest.setBatchedReduceSize(2);
        searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp")));
        searchRequest.source().trackTotalHitsUpTo(2);
        // Any shard failure must fail the whole search — see onShardFailure override below.
        searchRequest.allowPartialSearchResults(false);
        SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
        SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
        try (
            QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
                searchRequest,
                EsExecutors.DIRECT_EXECUTOR_SERVICE,
                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
                controller,
                task::isCancelled,
                task.getProgressListener(),
                shardsIter.size(),
                exc -> {}
            )
        ) {
            SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
                logger,
                null,
                searchTransportService,
                (clusterAlias, node) -> lookup.get(node),
                Collections.singletonMap("_na_", AliasFilter.EMPTY),
                Collections.emptyMap(),
                EsExecutors.DIRECT_EXECUTOR_SERVICE,
                resultConsumer,
                searchRequest,
                null,
                shardsIter,
                timeProvider,
                new ClusterState.Builder(new ClusterName("test")).build(),
                task,
                SearchResponse.Clusters.EMPTY,
                null,
                false
            ) {
                @Override
                protected SearchPhase getNextPhase() {
                    // Replace the fetch phase with a no-op that just unblocks the test thread.
                    return new SearchPhase("test") {
                        @Override
                        protected void run() {
                            latch.countDown();
                        }
                    };
                }

                @Override
                void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) {
                    // The formatting failure must not surface as a shard failure.
                    latch.countDown();
                    fail(e, "Unexpected shard failure");
                }
            };
            action.start();
            latch.await();
            assertThat(successfulOps.get(), equalTo(numShards));
            SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
            assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
            assertThat(phase.totalHits().value(), equalTo(2L));
        }
    }
| 436 | + |
246 | 437 | }
|
0 commit comments