Open
ChimdumebiNebolisa/pulsar
#2
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- broker version: 4.0.8
- client version: 4.1.1 (java)
Issue Description
configs
- producer config
  - enableBatching(true)
  - blockIfQueueFull(true)
  - batchingMaxMessages(100)
- consumer config
  - subscriptionType(SubscriptionType.Exclusive)
  - subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
  - enableBatchIndexAcknowledgment(false)
What I did
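- Produce messages carrying a "sequence" property, then consume them and validate the sequence number.
- Cumulatively acknowledge after every groupSize messages received.
- Call redeliverUnacknowledgedMessages() on a schedule (once per interval) and reset the expected sequence number to the one after the last acknowledged message.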
Expected
Assume the current consumer epoch is 0.
If lastAckNumber = 1000 and the most recently received message is number 1300 (which matches expectedSequenceNumber), then after calling redeliverUnacknowledgedMessages() the next message received should be number 1001, with a higher consumerEpoch and redeliveryCount.
Got
A seemingly random message with a number greater than 1001, consumerEpoch = 0, and redeliveryCount = 0.
The other messages with larger batch indexes in the same batch (entry) are filtered out.
See the logs below.
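To make the expectation concrete, here is a minimal, self-contained sketch of epoch-based filtering. It is a simplified model and not Pulsar's actual implementation: the class EpochFilterSketch, the Msg type, and the rule "drop a message whose epoch is lower than the consumer's epoch" are assumptions inferred only from the "Consumer filter old epoch message" log line in this report.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of epoch-based filtering; NOT Pulsar's actual implementation. */
public class EpochFilterSketch {

    /** Hypothetical stand-in for a received message. */
    public static final class Msg {
        public final long sequence;      // value of the "sequence" property
        public final long consumerEpoch; // epoch the message was dispatched under

        public Msg(long sequence, long consumerEpoch) {
            this.sequence = sequence;
            this.consumerEpoch = consumerEpoch;
        }
    }

    private long consumerEpoch = 0;

    /** Models redeliverUnacknowledgedMessages(): the consumer moves to the next epoch. */
    public void redeliver() {
        consumerEpoch++;
    }

    /** Keeps a message only if it was dispatched under the current epoch or a newer one. */
    public boolean accepts(Msg msg) {
        return msg.consumerEpoch >= consumerEpoch;
    }

    /** Returns the sequence numbers that survive the filter, in arrival order. */
    public List<Long> deliver(List<Msg> incoming) {
        List<Long> delivered = new ArrayList<>();
        for (Msg msg : incoming) {
            if (accepts(msg)) {
                delivered.add(msg.sequence);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        EpochFilterSketch consumer = new EpochFilterSketch();
        long lastAckNumber = 1000;
        consumer.redeliver(); // epoch 0 -> 1

        // Two stale in-flight messages from epoch 0, then the redelivered ones from epoch 1.
        List<Msg> incoming = List.of(
                new Msg(1301, 0), new Msg(1302, 0),
                new Msg(lastAckNumber + 1, 1), new Msg(lastAckNumber + 2, 1));

        System.out.println(consumer.deliver(incoming));
    }
}
```

Under this model, every epoch-0 message that arrives after the redelivery request is dropped, and the first message handed to the application is 1001. The behavior reported here differs: one epoch-0 message still reaches the application.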
Error messages
2026-02-03 02:58:43.202 [pool-2-thread-2] INFO o.e.OrderedCumulativeConsumerWorker - After interval redeliver, resetting expected sequence to 4501
2026-02-03 02:58:43.202 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:76], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:77], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:78], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:79], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:80], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:81], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:82], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:83], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:84], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:85], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:86], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:87], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:88], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:89], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:90], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:91], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.205 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:92], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:93], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:94], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:95], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:96], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:97], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:98], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.206 [pulsar-client-internal-13-1] INFO o.a.pulsar.client.impl.ConsumerBase - Consumer filter old epoch message, topic : [persistent://public/default/order], messageId : [14248:2059:-1:99], messageConsumerEpoch : [0], consumerEpoch : [1]
2026-02-03 02:58:43.703 [pool-2-thread-2] WARN o.e.OrderedCumulativeConsumerWorker - Found out-of-order message: MessageId=14248:2059:-1:75, expectedSequence=4501, actualSequence=5976, uuid=d2363f75-f1db-453b-9b0c-e8d70b91ba0f, consumerEpoch=0, redeliveryCount=0
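When reading these logs it helps to split the message IDs into their fields. The sketch below assumes the four colon-separated values are ledger ID, entry ID, partition index, and batch index, which is how the Java client prints batch message IDs. The class MessageIdSketch and its helpers are hypothetical names used only for illustration.

```java
/** Splits message ID strings such as "14248:2059:-1:75" into their fields. */
public class MessageIdSketch {

    /** Hypothetical holder for the parsed fields. */
    public static final class Parsed {
        public final long ledgerId;
        public final long entryId;
        public final int partitionIndex;
        public final int batchIndex;

        Parsed(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
            this.batchIndex = batchIndex;
        }
    }

    /** Parses "ledgerId:entryId:partitionIndex:batchIndex" (field order is an assumption). */
    public static Parsed parse(String text) {
        String[] parts = text.trim().split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + text);
        }
        return new Parsed(
                Long.parseLong(parts[0]),
                Long.parseLong(parts[1]),
                Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]));
    }

    /** True when both IDs point into the same ledger entry, i.e. the same batch. */
    public static boolean sameEntry(Parsed a, Parsed b) {
        return a.ledgerId == b.ledgerId
                && a.entryId == b.entryId
                && a.partitionIndex == b.partitionIndex;
    }

    public static void main(String[] args) {
        Parsed delivered = parse("14248:2059:-1:75");     // from the WARN line
        Parsed firstFiltered = parse("14248:2059:-1:76"); // first "filter old epoch" line
        Parsed lastFiltered = parse("14248:2059:-1:99");  // last "filter old epoch" line

        System.out.println("same entry: " + sameEntry(delivered, firstFiltered));
        System.out.println("delivered batch index: " + delivered.batchIndex
                + ", filtered batch indexes: " + firstFiltered.batchIndex
                + ".." + lastFiltered.batchIndex);
    }
}
```

Read this way, the logs show batch indexes 76 through 99 of entry 14248:2059 being filtered as old-epoch messages, while batch index 75 of the same entry is delivered to the application with consumerEpoch=0.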
Reproducing the issue
@SneakyThrows
private void produceMessages(Producer<byte[]> producer, int count) {
    for (int i = 0; i < count; i++) {
        producer.newMessage().property("sequence", String.valueOf(i))
                .value(("message-" + i).getBytes())
                .send();
    }
}

@SneakyThrows
private void consumeAndRedeliverRoundtrip(Consumer<byte[]> consumer, int groupSize, long interval) {
    long expectedSequenceNumber = 0;
    long lastAckNumber = 0;
    int count = 0;
    long last = System.currentTimeMillis();
    while (true) {
        var msg = consumer.receive(10, TimeUnit.SECONDS);
        if (msg == null) {
            log.info("No more messages to consume.");
            return;
        }
        // Validate sequence - should be strictly increasing, gap indicates message loss
        String sequenceStr = msg.getProperty("sequence");
        if (sequenceStr != null) {
            long actualSequence = Long.parseLong(sequenceStr);
            if (actualSequence != expectedSequenceNumber) {
                log.error("Sequence gap detected! Expected: {}, Actual: {}", expectedSequenceNumber, actualSequence);
                throw new RuntimeException("Sequence gap detected! Expected: " + expectedSequenceNumber
                        + ", Actual: " + actualSequence);
            }
        }
        count++;
        // Cumulatively acknowledge once per groupSize messages
        if (count == groupSize) {
            consumer.acknowledgeCumulative(msg);
            count = 0;
            lastAckNumber = expectedSequenceNumber;
        }
        expectedSequenceNumber++;
        long cur = System.currentTimeMillis();
        // Once per interval, request redelivery of everything not yet acknowledged
        if (cur - last >= interval) {
            consumer.redeliverUnacknowledgedMessages();
            expectedSequenceNumber = lastAckNumber + 1; // reset expected seq
            last = cur;
            Thread.sleep(100);
        }
    }
}
Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!