diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 10f4f666607af..e41c43daf00c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -240,6 +240,7 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(batching) + .batchingMaxPublishDelay(30, TimeUnit.SECONDS) .create(); Set sentMessages = new HashSet<>(); @@ -258,8 +259,9 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, for (int i = 0; i < redeliverCount; i++) { Message msg = null; for (int j = 0; j < N; j++) { - msg = consumer.receive(); - log.info("Received message {}", msg.getValue()); + msg = consumer.receive(10, TimeUnit.SECONDS); + assertNotNull(msg); + log.info("Received msgId: {}, message {}", msg.getMessageId(), msg.getValue()); if (!batching) { consumer.negativeAcknowledge(msg); } @@ -275,7 +277,8 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, // All the messages should be received again for (int i = 0; i < N; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(10, TimeUnit.SECONDS); + assertNotNull(msg); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); }