@intkuroky

Motivation

This PR fixes an issue where the sequenceID was generated when a message was created rather than when it was sent, which could produce out-of-order sequenceIDs when batching is enabled.
The sequenceID lifecycle (generation -> local storage -> network transmission) must be serialized to keep the IDs consistent.

Modifications

  1. sequenceID generation is deferred until a message is actually processed (local storage and network transmission). The generation method is now non-thread-safe, since concurrent access is neither needed nor supported.
  2. When batching, messages are first stored with an estimated size; sequenceIDs are then generated and assigned to the whole batch before final processing. This follows directly from deferring sequenceID generation.
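
As a rough illustration of the two modifications above, here is a minimal sketch of deferred assignment. It is a toy model, not the client's code: the `message` and `producer` types and the `genSeqID`, `addToBatch`, `flushBatch`, and `sendSingle` names are hypothetical, and the single goroutine of the event loop is modelled by plain sequential calls.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a pending producer message.
type message struct {
	payload    string
	sequenceID uint64 // stays 0 until the message is actually sent
}

// producer is a toy model of a producer whose state is only touched
// from one event loop, so the counter needs no atomics or locks.
type producer struct {
	nextSeqID uint64
	batch     []*message // buffered messages, no sequenceID yet
	sent      []*message // messages in the order they reach the "wire"
}

// genSeqID is deliberately not thread-safe: it assumes a single caller.
func (p *producer) genSeqID() uint64 {
	id := p.nextSeqID
	p.nextSeqID++
	return id
}

// addToBatch only buffers the message; no sequenceID is assigned here.
func (p *producer) addToBatch(m *message) {
	p.batch = append(p.batch, m)
}

// flushBatch assigns sequenceIDs to the whole batch right before sending.
func (p *producer) flushBatch() {
	for _, m := range p.batch {
		m.sequenceID = p.genSeqID()
		p.sent = append(p.sent, m)
	}
	p.batch = nil
}

// sendSingle assigns a sequenceID and sends in one step.
func (p *producer) sendSingle(m *message) {
	m.sequenceID = p.genSeqID()
	p.sent = append(p.sent, m)
}

func main() {
	p := &producer{nextSeqID: 1}
	p.addToBatch(&message{payload: "batch messages"}) // buffered, no ID yet
	p.sendSingle(&message{payload: "single message"}) // sent first
	p.flushBatch()                                    // flushed later
	for _, m := range p.sent {
		fmt.Printf("%s seq=%d\n", m.payload, m.sequenceID)
	}
}
```

Because IDs are handed out at send time in this sketch, the order on the wire and the order of the sequenceIDs agree regardless of which message was created first.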

Member

@RobertIndie RobertIndie left a comment

Are you able to reproduce the ordering issue?
Flushing batch messages and publishing a single message should happen in the same event loop:

```go
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is called, p.dataChan will be closed, data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is called, p.cmdChan will be closed, cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case connectionClosed := <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker(connectionClosed)
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}
```

How could the out-of-order issue happen?

@intkuroky
Author

intkuroky commented Sep 9, 2025

How could the out-of-order issue happen?

@RobertIndie
Yes, the send itself runs within the same event loop, but sequenceID generation does not.
That mismatch is what makes out-of-order sequenceIDs possible, and this PR moves sequenceID generation into the same event loop to fix it.
For example:
```go
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	// A long publish delay makes the batch flush happen after the single-message send.
	BatchingMaxPublishDelay: time.Second,
})
producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("batch messages"),
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
	// id: -1:-1:0
})

// Ensure that genSingleMessageMetadataInBatch is executed first, with sequenceID set to 1.
time.Sleep(500 * time.Millisecond)

// 1. genSingleMessageMetadataInBatch -> updateSingleMessageMetadataSeqID -> sequenceID = 1

producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
	Payload:   []byte("single message"),
	DeliverAt: time.Now(), // sent as a single message, not added to the batch
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
	// no-op
})
// 2. updateMetaData -> sequenceID = 2
// 3. The single message is published (payload: "single message", sequenceID: 2)
// 4. The 1s interval ticker fires -> the batch is flushed (payload: "batch messages", sequenceID: 1)
```
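
The four steps can also be replayed without a broker. The following is only a sketch of the pre-fix timeline under simplified assumptions: `msg`, `replayTimeline`, and `inOrder` are hypothetical names, and the sequenceIDs are assigned eagerly, as in steps 1 and 2 above.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for a message as it appears on the wire.
type msg struct {
	payload string
	seqID   uint64
}

// replayTimeline replays steps 1-4 with sequenceIDs assigned when each
// message is created, before it is actually sent.
func replayTimeline() []msg {
	var next uint64 = 1
	var wire []msg

	// Step 1: the batched message gets its ID when it is added to the batch.
	batched := msg{payload: "batch messages", seqID: next}
	next++

	// Step 2: the single message gets the next ID.
	single := msg{payload: "single message", seqID: next}
	next++

	// Step 3: the single message is published immediately.
	wire = append(wire, single)

	// Step 4: the ticker fires later and the batch is flushed.
	wire = append(wire, batched)
	return wire
}

// inOrder reports whether sequenceIDs strictly increase in wire order.
func inOrder(wire []msg) bool {
	for i := 1; i < len(wire); i++ {
		if wire[i].seqID <= wire[i-1].seqID {
			return false
		}
	}
	return true
}

func main() {
	wire := replayTimeline()
	for _, m := range wire {
		fmt.Printf("%s seq=%d\n", m.payload, m.seqID)
	}
	fmt.Println("in order:", inOrder(wire))
}
```

In this replay the wire order is sequenceID 2 followed by sequenceID 1, which is the inversion described in steps 3 and 4.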
