File tree Expand file tree Collapse file tree 1 file changed +11
-4
lines changed
pulsar-broker/src/test/java/org/apache/pulsar/client/api Expand file tree Collapse file tree 1 file changed +11
-4
lines changed Original file line number Diff line number Diff line change @@ -871,14 +871,15 @@ public void testMsgDropStat() throws Exception {
871871 .messageRoutingMode (MessageRoutingMode .SinglePartition )
872872 .create ();
873873 @ Cleanup ("shutdownNow" )
874- ExecutorService executor = Executors .newFixedThreadPool (5 );
874+ ExecutorService executor = Executors .newFixedThreadPool (10 );
875875 byte [] msgData = "testData" .getBytes ();
876876 final int totalProduceMessages = 1000 ;
877877 CountDownLatch latch = new CountDownLatch (1 );
878878 AtomicInteger messagesSent = new AtomicInteger (0 );
879879 for (int i = 0 ; i < totalProduceMessages ; i ++) {
880880 executor .submit (() -> {
881- producer .sendAsync (msgData ).handle ((msgId , e ) -> {
881+ try {
882+ MessageId msgId = producer .send (msgData );
882883 int count = messagesSent .incrementAndGet ();
883884 // process at least 20% of messages before signalling the latch
884885 // a non-persistent message will return entryId as -1 when it has been dropped
@@ -888,8 +889,14 @@ public void testMsgDropStat() throws Exception {
888889 && ((MessageIdImpl ) msgId ).getEntryId () == -1 ) {
889890 latch .countDown ();
890891 }
891- return null ;
892- });
892+
893+ Thread .sleep (10 );
894+ } catch (PulsarClientException e ) {
895+ throw new RuntimeException (e );
896+ } catch (InterruptedException e ) {
897+ Thread .currentThread ().interrupt ();
898+ throw new RuntimeException (e );
899+ }
893900 });
894901 }
895902 assertTrue (latch .await (5 , TimeUnit .SECONDS ));
You can’t perform that action at this time.
0 commit comments