@@ -266,47 +266,6 @@ void asynchronousProcessingWithFlowControl() {
266266 }
267267 }
268268
269- @ Test
270- void asynchronousProcessingWithInMemoryQueue (TestInfo info ) {
271- int messageCount = 100_000 ;
272- publishAndWaitForConfirms (cf , messageCount , stream );
273-
274- CountDownLatch latch = new CountDownLatch (messageCount );
275- BlockingQueue <Tuples .Pair <MessageHandler .Context , Message >> queue =
276- new ArrayBlockingQueue <>(10_000 );
277- Thread t =
278- ThreadUtils .newInternalThread (
279- info .getTestMethod ().get ().getName (),
280- () -> {
281- try {
282- while (!Thread .currentThread ().isInterrupted ()) {
283- Tuples .Pair <MessageHandler .Context , Message > item =
284- queue .poll (10 , TimeUnit .SECONDS );
285- if (item != null ) {
286- latch .countDown ();
287- item .v1 ().processed ();
288- }
289- }
290- } catch (InterruptedException e ) {
291- // finish the thread
292- }
293- });
294- t .start ();
295-
296- try {
297- environment .consumerBuilder ().stream (stream )
298- .offset (OffsetSpecification .first ())
299- .flow ()
300- .strategy (creditWhenHalfMessagesProcessed (1 ))
301- .builder ()
302- .messageHandler ((ctx , message ) -> queue .add (Tuples .pair (ctx , message )))
303- .build ();
304- org .assertj .core .api .Assertions .assertThat (latch ).is (completed ());
305- } finally {
306- t .interrupt ();
307- }
308- }
309-
310269 @ Test
311270 void closeOnCondition () throws Exception {
312271 int messageCount = 50_000 ;
@@ -1088,4 +1047,89 @@ void resetOffsetTrackingFromEnvironment() {
10881047 assertThat (sync ).completes ();
10891048 consumer .close ();
10901049 }
1050+
1051+ @ Test
1052+ void asynchronousProcessingWithInMemoryQueue (TestInfo info ) {
1053+ int messageCount = 100_000 ;
1054+ publishAndWaitForConfirms (cf , messageCount , stream );
1055+
1056+ CountDownLatch latch = new CountDownLatch (messageCount );
1057+
1058+ MessageHandler handler = (ctx , msg ) -> latch .countDown ();
1059+ DispatchingMessageHandler dispatchingHandler =
1060+ new DispatchingMessageHandler (
1061+ handler , ThreadUtils .threadFactory (info .getTestMethod ().get ().getName ()));
1062+
1063+ try {
1064+ environment .consumerBuilder ().stream (stream )
1065+ .offset (OffsetSpecification .first ())
1066+ .flow ()
1067+ .strategy (creditWhenHalfMessagesProcessed (1 ))
1068+ .builder ()
1069+ .messageHandler (dispatchingHandler )
1070+ .build ();
1071+ org .assertj .core .api .Assertions .assertThat (latch ).is (completed ());
1072+ } finally {
1073+ dispatchingHandler .close ();
1074+ }
1075+ }
1076+
1077+ private static final class DispatchingMessageHandler implements MessageHandler , AutoCloseable {
1078+
1079+ private final MessageHandler delegate ;
1080+ private final BlockingQueue <ContextMessageWrapper > queue = new ArrayBlockingQueue <>(10_000 );
1081+ private final Thread t ;
1082+
1083+ private DispatchingMessageHandler (MessageHandler delegate , ThreadFactory tf ) {
1084+ this .delegate = delegate ;
1085+ t =
1086+ tf .newThread (
1087+ () -> {
1088+ try {
1089+ while (!Thread .currentThread ().isInterrupted ()) {
1090+ ContextMessageWrapper item = queue .poll (10 , TimeUnit .SECONDS );
1091+ if (item != null ) {
1092+ try {
1093+ this .delegate .handle (item .ctx , item .msg ());
1094+ } finally {
1095+ item .ctx .processed ();
1096+ }
1097+ }
1098+ }
1099+ } catch (InterruptedException e ) {
1100+ // finish the thread
1101+ }
1102+ });
1103+ t .start ();
1104+ }
1105+
1106+ @ Override
1107+ public void handle (Context context , Message message ) {
1108+ this .queue .add (new ContextMessageWrapper (context , message ));
1109+ }
1110+
1111+ @ Override
1112+ public void close () {
1113+ this .t .interrupt ();
1114+ }
1115+ }
1116+
1117+ private static final class ContextMessageWrapper {
1118+
1119+ private final MessageHandler .Context ctx ;
1120+ private final Message msg ;
1121+
1122+ private ContextMessageWrapper (MessageHandler .Context ctx , Message msg ) {
1123+ this .ctx = ctx ;
1124+ this .msg = msg ;
1125+ }
1126+
1127+ private MessageHandler .Context ctx () {
1128+ return this .ctx ;
1129+ }
1130+
1131+ private Message msg () {
1132+ return this .msg ;
1133+ }
1134+ }
10911135}
0 commit comments