File tree Expand file tree Collapse file tree 1 file changed +8
-7
lines changed
server/src/test/java/org/opensearch/indices/pollingingest Expand file tree Collapse file tree 1 file changed +8
-7
lines changed Original file line number Diff line number Diff line change @@ -650,6 +650,13 @@ public void testConsumerReinitializationWithNoInitialMessages() throws Exception
650650 // Start with no messages
651651 messages .clear ();
652652
653+ // Set up latch to wait for message processing
654+ CountDownLatch latch = new CountDownLatch (1 );
655+ doAnswer (invocation -> {
656+ latch .countDown ();
657+ return null ;
658+ }).when (processor ).process (any (), any ());
659+
653660 FakeIngestionSource .FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource .FakeIngestionConsumerFactory (messages );
654661
655662 poller = new DefaultStreamPoller (
@@ -684,13 +691,7 @@ public void testConsumerReinitializationWithNoInitialMessages() throws Exception
684691 messages .add ("{\" _id\" :\" 1\" ,\" _source\" :{\" name\" :\" bob\" , \" age\" : 24}}" .getBytes (StandardCharsets .UTF_8 ));
685692
686693 // Wait for the message to be processed
687- CountDownLatch latch = new CountDownLatch (1 );
688- doAnswer (invocation -> {
689- latch .countDown ();
690- return null ;
691- }).when (processor ).process (any (), any ());
692-
693- latch .await ();
694+ assertTrue ("Message should be processed within timeout" , latch .await (30 , TimeUnit .SECONDS ));
694695
695696 // Verify 1 message was processed
696697 verify (processor , times (1 )).process (any (), any ());
You can’t perform that action at this time.
0 commit comments