-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Make MutableSearchResponse ref-counted to prevent use-after-close in async search #134359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 39 commits
f4a1a81
025e9e3
47f554e
4a2cdc0
e3b7c64
f7db807
beb3f23
0c7e61d
b02ce52
966ad7b
987434e
91eac66
f23f78f
dd9567d
c643cda
dd09251
497a736
029c0d1
4f8e55b
b504e2a
85ec60f
746caa3
8758981
81a6a55
e1d33dc
99610b2
2cb90cd
a2f7f7a
ab343f3
aed9d1f
b82dbe8
a6a45cb
07825fd
3412045
e4fedfc
c26b15d
770a902
fc178a9
2c9d1c7
0359f76
9fa88b9
286b6a9
9b57b99
dab85e4
a452322
89b3f3f
ee8fbca
30fe790
26f105c
bfcd31e
547a050
adada6a
9ce4cc7
634e698
ce9f54d
e552d16
2ac0077
063b5a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 134359 | ||
| summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async | ||
| search | ||
| area: Search | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,266 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.search; | ||
|
|
||
| import org.elasticsearch.ElasticsearchStatusException; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.search.aggregations.AggregationBuilders; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; | ||
| import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
|
|
||
| @SuiteScopeTestCase | ||
| public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase { | ||
| private static String indexName; | ||
| private static int numShards; | ||
|
|
||
| private static int numKeywords; | ||
| private static Map<String, AtomicInteger> keywordFreqs; | ||
| private static float maxMetric = Float.NEGATIVE_INFINITY; | ||
| private static float minMetric = Float.POSITIVE_INFINITY; | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @Override | ||
| public void setupSuiteScopeCluster() { | ||
| indexName = "test-async"; | ||
| numShards = randomIntBetween(1, 20); | ||
| int numDocs = randomIntBetween(100, 1000); | ||
| createIndex(indexName, Settings.builder().put("index.number_of_shards", numShards).build()); | ||
| numKeywords = randomIntBetween(50, 100); | ||
| keywordFreqs = new HashMap<>(); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Set<String> keywordSet = new HashSet<>(); | ||
| for (int i = 0; i < numKeywords; i++) { | ||
| keywordSet.add(randomAlphaOfLengthBetween(10, 20)); | ||
| } | ||
| numKeywords = keywordSet.size(); | ||
| String[] keywords = keywordSet.toArray(String[]::new); | ||
| List<IndexRequestBuilder> reqs = new ArrayList<>(); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| float metric = randomFloat(); | ||
| maxMetric = Math.max(metric, maxMetric); | ||
| minMetric = Math.min(metric, minMetric); | ||
| String keyword = keywords[randomIntBetween(0, numKeywords - 1)]; | ||
| keywordFreqs.compute(keyword, (k, v) -> { | ||
| if (v == null) { | ||
| return new AtomicInteger(1); | ||
| } | ||
| v.incrementAndGet(); | ||
| return v; | ||
| }); | ||
| reqs.add(prepareIndex(indexName).setSource("terms", keyword, "metric", metric)); | ||
| } | ||
| indexRandom(true, true, reqs); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that concurrent async search status requests behave correctly | ||
| * while the underlying async search task is still executing and during its close/cleanup. | ||
| */ | ||
| public void testConcurrentStatusFetchWhileTaskCloses() throws Exception { | ||
drempapis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final String aggName = "terms"; | ||
| final SearchSourceBuilder source = new SearchSourceBuilder().aggregation( | ||
| AggregationBuilders.terms(aggName).field("terms.keyword").size(Math.max(1, numKeywords)) | ||
| ); | ||
|
|
||
| final int progressStep = (numShards > 2) ? randomIntBetween(2, numShards) : 2; | ||
| try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, 0, progressStep)) { | ||
| String id = getAsyncId(it); | ||
|
|
||
| PollStats stats = new PollStats(); | ||
|
|
||
| int statusThreads = randomIntBetween(1, Math.max(2, 4 * numShards)); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| StartableThreadGroup pollers = startGetStatusThreadsHot(id, statusThreads, aggName, stats); | ||
| pollers.startHotThreads.countDown(); // release pollers | ||
|
|
||
| // Finish consumption on a separate thread and capture errors | ||
| var consumerExec = Executors.newSingleThreadExecutor(); | ||
| AtomicReference<Throwable> consumerError = new AtomicReference<>(); | ||
| Future<?> consumer = consumerExec.submit(() -> { | ||
| try { | ||
| consumeAllResponses(it, aggName); | ||
| } catch (Throwable t) { | ||
| consumerError.set(t); | ||
| } | ||
| }); | ||
|
|
||
| Thread.sleep(randomIntBetween(100, 200)); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pollers.stopAndAwait(TimeValue.timeValueMillis(randomIntBetween(500, 1000))); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Join consumer & surface errors | ||
| try { | ||
| consumer.get(); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } catch (Exception ignored) {} finally { | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| consumerExec.shutdown(); | ||
drempapis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| assertNull(consumerError.get()); | ||
| assertNoWorkerFailures(pollers); | ||
| assertStats(stats); | ||
| } | ||
| } | ||
|
|
||
| private void assertNoWorkerFailures(StartableThreadGroup pollers) { | ||
| List<Throwable> failures = pollers.getFailures(); | ||
| assertTrue( | ||
| "Unexpected worker failures:\n" + failures.stream().map(ExceptionsHelper::stackTrace).reduce("", (a, b) -> a + "\n---\n" + b), | ||
| failures.isEmpty() | ||
| ); | ||
| } | ||
|
|
||
| private void assertStats(PollStats stats) { | ||
| assertEquals(stats.totalCalls.sum(), stats.runningResponses.sum() + stats.completedResponses.sum()); | ||
| assertEquals("There should be no exceptions other than GONE", 0, stats.exceptions.sum()); | ||
| } | ||
|
|
||
| private String getAsyncId(SearchResponseIterator it) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| assertNotNull(response.getId()); | ||
| return response.getId(); | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
|
|
||
| private void consumeAllResponses(SearchResponseIterator it, String aggName) throws Exception { | ||
| while (it.hasNext()) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| if (response.getSearchResponse() != null && response.getSearchResponse().getAggregations() != null) { | ||
| assertNotNull(response.getSearchResponse().getAggregations().get(aggName)); | ||
| } | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private StartableThreadGroup startGetStatusThreadsHot(String id, int threads, String aggName, PollStats stats) { | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| final ExecutorService exec = Executors.newFixedThreadPool(threads); | ||
| final List<Future<?>> futures = new ArrayList<>(threads); | ||
| final AtomicBoolean running = new AtomicBoolean(true); | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| final CountDownLatch start = new CountDownLatch(1); | ||
|
|
||
| for (int i = 0; i < threads; i++) { | ||
| futures.add(exec.submit(() -> { | ||
| start.await(); | ||
| while (running.get()) { | ||
| AsyncSearchResponse resp = null; | ||
| try { | ||
| resp = getAsyncSearch(id); | ||
| stats.totalCalls.increment(); | ||
|
|
||
| if (resp.isRunning()) { | ||
| stats.runningResponses.increment(); | ||
| } else { | ||
| // Success-only assertions: if reported completed, we must have a proper search response | ||
| assertNull("Async search reported completed with failure", resp.getFailure()); | ||
| assertNotNull("Completed async search must carry a SearchResponse", resp.getSearchResponse()); | ||
| assertNotNull("Completed async search must have aggregations", resp.getSearchResponse().getAggregations()); | ||
| assertNotNull( | ||
| "Completed async search must contain the expected aggregation", | ||
| resp.getSearchResponse().getAggregations().get(aggName) | ||
| ); | ||
| stats.completedResponses.increment(); | ||
| } | ||
| } catch (Exception e) { | ||
| Throwable cause = ExceptionsHelper.unwrapCause(e); | ||
| if (cause instanceof ElasticsearchStatusException) { | ||
| RestStatus status = ExceptionsHelper.status(cause); | ||
| if (status == RestStatus.GONE) { | ||
| stats.gone410.increment(); | ||
| } | ||
| } else { | ||
| stats.exceptions.increment(); | ||
| } | ||
| } finally { | ||
| if (resp != null) { | ||
| resp.decRef(); | ||
| } | ||
| } | ||
| } | ||
| return null; | ||
| })); | ||
| } | ||
| return new StartableThreadGroup(exec, futures, running, start); | ||
| } | ||
|
|
||
| static final class PollStats { | ||
| final LongAdder totalCalls = new LongAdder(); | ||
| final LongAdder runningResponses = new LongAdder(); | ||
| final LongAdder completedResponses = new LongAdder(); | ||
| final LongAdder exceptions = new LongAdder(); | ||
| final LongAdder gone410 = new LongAdder(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use keep_alive values that prevent us from ever seeing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test itself completes quickly, so the default The class
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we randomise the keep_alive value to sometimes be:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The minimum This ensures the async search document is deleted before pollers stop. In practice, though, we’ll usually see Practically both 410 GONE and ResourceNotFoundException represent the same expired state through different APIs (REST vs transport). Handling both wouldn’t test new logic, it would only duplicate coverage of the same lifecycle behavior, adding complexity without improving the test’s value.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My point was mostly around the test being prescriptive around what it is indeed expecting. It's currently, I believe, mixing success and some 410 as "everything is cool". I was suggesting we separate the cases where 410s are not even an option. I appreciate you don't consider this valuable, and I'm happy to go forward as this is now (hence why I approved the PR) |
||
| } | ||
|
|
||
| static class StartableThreadGroup extends ThreadGroup { | ||
drempapis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private final CountDownLatch startHotThreads; | ||
|
|
||
| StartableThreadGroup(ExecutorService exec, List<Future<?>> futures, AtomicBoolean running, CountDownLatch startHotThreads) { | ||
| super(exec, futures, running); | ||
| this.startHotThreads = startHotThreads; | ||
| } | ||
| } | ||
|
|
||
| static class ThreadGroup { | ||
| private final ExecutorService exec; | ||
| private final List<Future<?>> futures; | ||
| private final AtomicBoolean running; | ||
|
|
||
| private ThreadGroup(ExecutorService exec, List<Future<?>> futures, AtomicBoolean running) { | ||
| this.exec = exec; | ||
| this.futures = futures; | ||
| this.running = running; | ||
| } | ||
|
|
||
| void stopAndAwait(TimeValue timeout) throws InterruptedException { | ||
| running.set(false); | ||
| exec.shutdown(); | ||
| if (exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS) == false) { | ||
| exec.shutdownNow(); | ||
| exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } | ||
| } | ||
|
|
||
| List<Throwable> getFailures() { | ||
| List<Throwable> failures = new ArrayList<>(); | ||
| for (Future<?> f : futures) { | ||
| try { | ||
| f.get(); | ||
| } catch (CancellationException ignored) {} catch (ExecutionException ee) { | ||
| failures.add(ExceptionsHelper.unwrapCause(ee.getCause())); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| if (failures.isEmpty()) failures.add(ie); | ||
| } | ||
| } | ||
| return failures; | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.