2727import java .util .concurrent .CountDownLatch ;
2828import java .util .concurrent .CyclicBarrier ;
2929import java .util .concurrent .TimeUnit ;
30- import java .util .concurrent .atomic .AtomicBoolean ;
3130import java .util .concurrent .locks .LockSupport ;
3231
3332import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
3433import static org .elasticsearch .xcontent .XContentFactory .jsonBuilder ;
3534
35+ @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .SUITE , numDataNodes = 1 )
3636public class BulkSourceReleaseIT extends ESIntegTestCase {
3737
3838 @ Override
@@ -58,7 +58,7 @@ public void testBulkSourceReleaseWhenIngestReplacesSource() throws Exception {
5858 indexRequest .setPipeline (pipelineId );
5959
6060 CountDownLatch blockLatch = new CountDownLatch (1 );
61- blockWritePool (internalCluster ().getInstance (ThreadPool .class ), blockLatch );
61+ blockWritePool (internalCluster ().getDataNodeInstance (ThreadPool .class ), blockLatch );
6262
6363 PlainActionFuture <BulkResponse > future = new PlainActionFuture <>();
6464
@@ -103,14 +103,14 @@ public void testBytesReferencedByTwoSourcesNotReleasedIfOnlyOneIngestPipeline()
103103 assertTrue (originalBytes .hasReferences ());
104104
105105 CountDownLatch blockLatch = new CountDownLatch (1 );
106- blockWritePool (internalCluster ().getInstance (ThreadPool .class ), blockLatch );
106+ blockWritePool (internalCluster ().getDataNodeInstance (ThreadPool .class ), blockLatch );
107107
108108 PlainActionFuture <BulkResponse > future = new PlainActionFuture <>();
109109 try {
110110 handler .lastItems (List .of (indexRequest , indexRequestNoIngest ), future );
111111
112112 // Pause briefly to allow bytes to theoretically be released after ingest processing
113- LockSupport .parkNanos (TimeUnit .MILLISECONDS .toNanos (50 ));
113+ LockSupport .parkNanos (TimeUnit .MILLISECONDS .toNanos (500 ));
114114
115115 assertTrue (originalBytes .hasReferences ());
116116 } finally {
@@ -159,7 +159,7 @@ public void testSomeReferencesCanBeReleasedWhileOthersRetained() throws Exceptio
159159 assertTrue (retainedBytes .hasReferences ());
160160
161161 CountDownLatch blockLatch = new CountDownLatch (1 );
162- blockWritePool (internalCluster ().getInstance (ThreadPool .class ), blockLatch );
162+ blockWritePool (internalCluster ().getDataNodeInstance (ThreadPool .class ), blockLatch );
163163
164164 PlainActionFuture <BulkResponse > future = new PlainActionFuture <>();
165165 try {
@@ -218,29 +218,4 @@ public boolean isForceExecution() {
218218 }
219219 safeAwait (startBarrier );
220220 }
221-
222- private static void fillWriteQueue (ThreadPool threadPool ) {
223- final var queueSize = Math .toIntExact (threadPool .info (ThreadPool .Names .WRITE ).getQueueSize ().singles ());
224- final var queueFilled = new AtomicBoolean (false );
225- final var queueFillingTask = new AbstractRunnable () {
226- @ Override
227- public void onFailure (Exception e ) {
228- fail (e );
229- }
230-
231- @ Override
232- protected void doRun () {
233- assertTrue ("thread pool not blocked" , queueFilled .get ());
234- }
235-
236- @ Override
237- public boolean isForceExecution () {
238- return true ;
239- }
240- };
241- for (int i = 0 ; i < queueSize ; i ++) {
242- threadPool .executor (ThreadPool .Names .WRITE ).execute (queueFillingTask );
243- }
244- queueFilled .set (true );
245- }
246221}
0 commit comments