File tree Expand file tree Collapse file tree 1 file changed +4
-7
lines changed
src/test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 1 file changed +4
-7
lines changed Original file line number Diff line number Diff line change @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
203203 environment .consumerBuilder ().stream (stream )
204204 .offset (OffsetSpecification .first ())
205205 .flow ()
206- .strategy (creditWhenHalfMessagesProcessed ())
206+ .strategy (creditWhenHalfMessagesProcessed (1 ))
207207 .builder ();
208208
209209 List <MessageHandler .Context > messageContexts = synchronizedList (new ArrayList <>());
@@ -244,14 +244,13 @@ void asynchronousProcessingWithFlowControl() {
244244 int messageCount = 100_000 ;
245245 publishAndWaitForConfirms (cf , messageCount , stream );
246246
247- ExecutorService executorService =
248- Executors .newFixedThreadPool (getRuntime ().availableProcessors ());
249- try {
247+ try (ExecutorService executorService =
248+ Executors .newFixedThreadPool (getRuntime ().availableProcessors ())) {
250249 CountDownLatch latch = new CountDownLatch (messageCount );
251250 environment .consumerBuilder ().stream (stream )
252251 .offset (OffsetSpecification .first ())
253252 .flow ()
254- .strategy (creditWhenHalfMessagesProcessed ())
253+ .strategy (creditWhenHalfMessagesProcessed (1 ))
255254 .builder ()
256255 .messageHandler (
257256 (ctx , message ) ->
@@ -262,8 +261,6 @@ void asynchronousProcessingWithFlowControl() {
262261 }))
263262 .build ();
264263 assertThat (latch ).is (completed ());
265- } finally {
266- executorService .shutdownNow ();
267264 }
268265 }
269266
You can’t perform that action at this time.
0 commit comments