-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Make MutableSearchResponse ref-counted to prevent use-after-close in async search #134359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f4a1a81
025e9e3
47f554e
4a2cdc0
e3b7c64
f7db807
beb3f23
0c7e61d
b02ce52
966ad7b
987434e
91eac66
f23f78f
dd9567d
c643cda
dd09251
497a736
029c0d1
4f8e55b
b504e2a
85ec60f
746caa3
8758981
81a6a55
e1d33dc
99610b2
2cb90cd
a2f7f7a
ab343f3
aed9d1f
b82dbe8
a6a45cb
07825fd
3412045
e4fedfc
c26b15d
770a902
fc178a9
2c9d1c7
0359f76
9fa88b9
286b6a9
9b57b99
dab85e4
a452322
89b3f3f
ee8fbca
30fe790
26f105c
bfcd31e
547a050
adada6a
9ce4cc7
634e698
ce9f54d
e552d16
2ac0077
063b5a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 134359 | ||
| summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async | ||
| search | ||
| area: Search | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,271 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.search; | ||
|
|
||
| import org.elasticsearch.ElasticsearchStatusException; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.search.aggregations.AggregationBuilders; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; | ||
| import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Queue; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| @SuiteScopeTestCase | ||
| public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase { | ||
| private static String indexName; | ||
| private static int numShards; | ||
|
|
||
| private static int numKeywords; | ||
|
|
||
| @Override | ||
| public void setupSuiteScopeCluster() { | ||
| indexName = "test-async"; | ||
| numShards = randomIntBetween(1, 20); | ||
| int numDocs = randomIntBetween(100, 1000); | ||
| createIndex(indexName, Settings.builder().put("index.number_of_shards", numShards).build()); | ||
| numKeywords = randomIntBetween(50, 100); | ||
| Set<String> keywordSet = new HashSet<>(); | ||
| for (int i = 0; i < numKeywords; i++) { | ||
| keywordSet.add(randomAlphaOfLengthBetween(10, 20)); | ||
| } | ||
| numKeywords = keywordSet.size(); | ||
| String[] keywords = keywordSet.toArray(String[]::new); | ||
| List<IndexRequestBuilder> reqs = new ArrayList<>(); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| float metric = randomFloat(); | ||
| String keyword = keywords[randomIntBetween(0, numKeywords - 1)]; | ||
| reqs.add(prepareIndex(indexName).setSource("terms", keyword, "metric", metric)); | ||
| } | ||
| indexRandom(true, true, reqs); | ||
| } | ||
|
|
||
| /** | ||
| * This test spins up a set of poller threads that repeatedly call | ||
| * {@code _async_search/{id}}. Each poller starts immediately, and once enough | ||
| * requests have been issued they signal a latch to indicate the group is "warmed up". | ||
| * The test waits on this latch to deterministically ensure pollers are active. | ||
| * In parallel, a consumer thread drives the async search to completion using the | ||
| * blocking iterator. This coordinated overlap exercises the window where the task | ||
| * is closing and some status calls may return {@code 410 GONE}. | ||
| */ | ||
| public void testConcurrentStatusFetchWhileTaskCloses() throws Exception { | ||
drempapis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final TimeValue timeout = TimeValue.timeValueSeconds(3); | ||
| final String aggName = "terms"; | ||
| final SearchSourceBuilder source = new SearchSourceBuilder().aggregation( | ||
| AggregationBuilders.terms(aggName).field("terms.keyword").size(Math.max(1, numKeywords)) | ||
| ); | ||
|
|
||
| final int progressStep = (numShards > 2) ? randomIntBetween(2, numShards) : 2; | ||
| try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, 0, progressStep)) { | ||
| String id = getAsyncId(it); | ||
|
|
||
| PollStats stats = new PollStats(); | ||
|
|
||
| // Pick a random number of status-poller threads, at least 1, up to (4×numShards) | ||
| int pollerThreads = randomIntBetween(1, 4 * numShards); | ||
|
|
||
| // Wait for pollers to be active | ||
| CountDownLatch warmed = new CountDownLatch(1); | ||
|
|
||
| // Executor and coordination for pollers | ||
| ExecutorService pollerExec = Executors.newFixedThreadPool(pollerThreads); | ||
| AtomicBoolean running = new AtomicBoolean(true); | ||
| Queue<Throwable> failures = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| CompletableFuture<Void> pollers = createPollers(id, pollerThreads, stats, warmed, pollerExec, running, failures); | ||
|
|
||
| // Wait until pollers are issuing requests (warming period) | ||
| assertTrue("pollers did not warm up in time", warmed.await(timeout.millis(), TimeUnit.MILLISECONDS)); | ||
|
|
||
| // Start consumer on a separate thread and capture errors | ||
| var consumerExec = Executors.newSingleThreadExecutor(); | ||
| AtomicReference<Throwable> consumerError = new AtomicReference<>(); | ||
| Future<?> consumer = consumerExec.submit(() -> { | ||
| try { | ||
| consumeAllResponses(it, aggName); | ||
| } catch (Throwable t) { | ||
| consumerError.set(t); | ||
| } | ||
| }); | ||
|
|
||
| // Join consumer & surface errors | ||
| try { | ||
| consumer.get(timeout.millis(), TimeUnit.MILLISECONDS); | ||
|
|
||
| if (consumerError.get() != null) { | ||
| fail("consumeAllResponses failed: " + consumerError.get()); | ||
| } | ||
| } catch (TimeoutException e) { | ||
| consumer.cancel(true); | ||
| fail(e, "Consumer thread did not finish within timeout"); | ||
| } catch (Exception ignored) { | ||
| // ignored | ||
| } finally { | ||
| // Stop pollers | ||
| running.set(false); | ||
| try { | ||
| pollers.get(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (TimeoutException te) { | ||
| // The finally block will shut down the pollers forcibly | ||
| } catch (ExecutionException ee) { | ||
| failures.add(ExceptionsHelper.unwrapCause(ee.getCause())); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| pollerExec.shutdownNow(); | ||
| try { | ||
| pollerExec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| fail("Interrupted while stopping pollers: " + ie.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| // Shut down the consumer executor | ||
| consumerExec.shutdown(); | ||
drempapis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| try { | ||
| consumerExec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
|
|
||
| assertNoWorkerFailures(failures); | ||
| assertStats(stats); | ||
| } | ||
| } | ||
|
|
||
| private void assertNoWorkerFailures(Queue<Throwable> failures) { | ||
| assertTrue( | ||
| "Unexpected worker failures:\n" + failures.stream().map(ExceptionsHelper::stackTrace).reduce("", (a, b) -> a + "\n---\n" + b), | ||
| failures.isEmpty() | ||
| ); | ||
| } | ||
|
|
||
| private void assertStats(PollStats stats) { | ||
| assertEquals(stats.totalCalls.sum(), stats.runningResponses.sum() + stats.completedResponses.sum()); | ||
| assertEquals("There should be no exceptions other than GONE", 0, stats.exceptions.sum()); | ||
| } | ||
|
|
||
| private String getAsyncId(SearchResponseIterator it) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| assertNotNull(response.getId()); | ||
| return response.getId(); | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
|
|
||
| private void consumeAllResponses(SearchResponseIterator it, String aggName) throws Exception { | ||
| while (it.hasNext()) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| if (response.getSearchResponse() != null && response.getSearchResponse().getAggregations() != null) { | ||
| assertNotNull(response.getSearchResponse().getAggregations().get(aggName)); | ||
| } | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private CompletableFuture<Void> createPollers( | ||
| String id, | ||
| int threads, | ||
| PollStats stats, | ||
| CountDownLatch warmed, | ||
| ExecutorService pollerExec, | ||
| AtomicBoolean running, | ||
| Queue<Throwable> failures | ||
| ) { | ||
| @SuppressWarnings("unchecked") | ||
| final CompletableFuture<Void>[] tasks = IntStream.range(0, threads).mapToObj(i -> CompletableFuture.runAsync(() -> { | ||
| while (running.get()) { | ||
| AsyncSearchResponse resp = null; | ||
| try { | ||
| resp = getAsyncSearch(id); | ||
| stats.totalCalls.increment(); | ||
|
|
||
| // Once enough requests have been sent, consider pollers "warmed". | ||
| if (stats.totalCalls.sum() >= threads) { | ||
| warmed.countDown(); | ||
| } | ||
|
|
||
| if (resp.isRunning()) { | ||
| stats.runningResponses.increment(); | ||
| } else { | ||
| // Success-only assertions: if reported completed, we must have a proper search response | ||
| assertNull("Async search reported completed with failure", resp.getFailure()); | ||
| assertNotNull("Completed async search must carry a SearchResponse", resp.getSearchResponse()); | ||
| assertNotNull("Completed async search must have aggregations", resp.getSearchResponse().getAggregations()); | ||
| assertNotNull( | ||
| "Completed async search must contain the expected aggregation", | ||
| resp.getSearchResponse().getAggregations().get("terms") | ||
| ); | ||
| stats.completedResponses.increment(); | ||
| } | ||
| } catch (Exception e) { | ||
| Throwable cause = ExceptionsHelper.unwrapCause(e); | ||
| if (cause instanceof ElasticsearchStatusException) { | ||
| RestStatus status = ExceptionsHelper.status(cause); | ||
| if (status == RestStatus.GONE) { | ||
| stats.gone410.increment(); | ||
| } else { | ||
| stats.exceptions.increment(); | ||
| failures.add(cause); | ||
| } | ||
| } else { | ||
| stats.exceptions.increment(); | ||
| failures.add(cause); | ||
| } | ||
| } finally { | ||
| if (resp != null) { | ||
| resp.decRef(); | ||
| } | ||
| } | ||
| } | ||
| }, pollerExec).whenComplete((v, ex) -> { | ||
| if (ex != null) { | ||
| failures.add(ExceptionsHelper.unwrapCause(ex)); | ||
| } | ||
| })).toArray(CompletableFuture[]::new); | ||
|
|
||
| return CompletableFuture.allOf(tasks); | ||
| } | ||
|
|
||
/**
 * Thread-safe counters shared by the poller threads. {@link LongAdder} is
 * used because many threads increment concurrently and the sums are only
 * read for assertions after the pollers have been stopped.
 */
static final class PollStats {
    // Status calls that returned successfully (GONE/other failures excluded).
    final LongAdder totalCalls = new LongAdder();
    // Successful calls that reported the async search as still running.
    final LongAdder runningResponses = new LongAdder();
    // Successful calls that reported the async search as completed.
    final LongAdder completedResponses = new LongAdder();
    // Unexpected exceptions (anything other than 410 GONE).
    final LongAdder exceptions = new LongAdder();
    // Calls that failed with 410 GONE — expected while the task is closing.
    final LongAdder gone410 = new LongAdder();
}
| } | ||
Uh oh!
There was an error while loading. Please reload this page.