[Bug] Transaction abort may cause duplicate message consumption #25107

@3pacccccc

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

master branch

Issue Description

When using Pulsar transactions with transaction.abort(), I observed duplicate message consumption. In the reproduction below, one message of a batch is acknowledged inside a transaction and the transaction is then aborted. The whole batch is redelivered afterwards, including the messages of that batch that were acknowledged normally (outside the transaction), so those messages are processed twice.

Error messages

no error message

Reproducing the issue

Add the following test to TransactionConsumeTest to reproduce the issue:

@Test
public void duplicateMessageConsume() throws Exception {
    final String topic = "persistent://public/txn/duplicateMessageConsume";
    
    @Cleanup
    PulsarClient pulsarClient1 = PulsarClient.builder()
            .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
            .enableTransaction(true)
            .build();
    
    @Cleanup
    Producer<String> producer = pulsarClient1
            .newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(true)
            .batchingMaxMessages(5)
            .create();
    
    Consumer<String> consumer = pulsarClient1
            .newConsumer(Schema.STRING)
            .subscriptionType(SubscriptionType.Shared)
            .topic(topic)
            .subscriptionName("mySub")
            .subscribe();

    Transaction transaction = pulsarClient1.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();

    // Send 10 messages
    for (int j = 0; j < 10; j++) {
        producer.newMessage()
                .value(("value-" + j))
                .sendAsync();
    }

    // Consume messages
    for (int j = 0; j < 20; j++) {
        Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
        if (receive == null) {
            break;
        }
        if (j == 0) {
            // Acknowledge the first message in the transaction, then abort the transaction
            consumer.acknowledgeAsync(receive.getMessageId(), transaction).get();
            System.out.println("receive1: " + new String(receive.getData()) + ", msgId: " + receive.getMessageId());
            transaction.abort().get();
            // Sleep 2s to ensure the messages are dispatched again before the acks below.
            Thread.sleep(2000);
        } else {
            // Acknowledge all other messages normally (outside the transaction)
            consumer.acknowledge(receive.getMessageId());
            System.out.println("receive2: " + new String(receive.getData()) + ", msgId: " + receive.getMessageId());
        }
    }
}

Running the test prints the following console output:

receive1: value-0, msgId: 12:0:-1:0
receive2: value-1, msgId: 12:0:-1:1
receive2: value-2, msgId: 12:0:-1:2
receive2: value-3, msgId: 12:0:-1:3
receive2: value-4, msgId: 12:0:-1:4
receive2: value-5, msgId: 12:1:-1:0
receive2: value-6, msgId: 12:1:-1:1
receive2: value-7, msgId: 12:1:-1:2
receive2: value-8, msgId: 12:1:-1:3
receive2: value-9, msgId: 12:1:-1:4
receive2: value-0, msgId: 12:0:-1:0 
receive2: value-1, msgId: 12:0:-1:1  # Duplicate!
receive2: value-2, msgId: 12:0:-1:2  # Duplicate!
receive2: value-3, msgId: 12:0:-1:3  # Duplicate!
receive2: value-4, msgId: 12:0:-1:4  # Duplicate!
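
To make the duplicates easier to spot, here is a minimal sketch of a check that could be run over the delivered message IDs. It is not part of Pulsar or of the test above: the class name DuplicateDeliveryCheck and the method unexpectedRedeliveries are hypothetical, and the message IDs are handled as plain strings copied from the output above. It assumes that redelivery of the message acknowledged in the aborted transaction (value-0) is expected, and flags every other ID that is delivered more than once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not a Pulsar API.
class DuplicateDeliveryCheck {

    // Returns the IDs that were delivered more than once although they were
    // not acknowledged in the aborted transaction.
    static List<String> unexpectedRedeliveries(List<String> deliveredIds, Set<String> abortedIds) {
        Set<String> seen = new HashSet<>();
        List<String> unexpected = new ArrayList<>();
        for (String id : deliveredIds) {
            boolean firstDelivery = seen.add(id);
            // A repeat is only expected for IDs whose ack was rolled back by the abort.
            if (!firstDelivery && !abortedIds.contains(id)) {
                unexpected.add(id);
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        // Message IDs in delivery order, copied from the console output above.
        List<String> delivered = List.of(
                "12:0:-1:0", "12:0:-1:1", "12:0:-1:2", "12:0:-1:3", "12:0:-1:4",
                "12:1:-1:0", "12:1:-1:1", "12:1:-1:2", "12:1:-1:3", "12:1:-1:4",
                "12:0:-1:0", "12:0:-1:1", "12:0:-1:2", "12:0:-1:3", "12:0:-1:4");
        // Only value-0 was acknowledged in the aborted transaction.
        Set<String> aborted = Set.of("12:0:-1:0");
        // prints [12:0:-1:1, 12:0:-1:2, 12:0:-1:3, 12:0:-1:4]
        System.out.println(unexpectedRedeliveries(delivered, aborted));
    }
}
```

In a real test the same idea could be applied to the MessageId objects collected from consumer.receive(), with an assertion that the returned list is empty.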

Additional information

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug: The PR fixed a bug or issue reported a bug
