
Commit d207f52

Fix batch race
1 parent f59a4b8 commit d207f52

File tree

1 file changed (+69, -34 lines)

pulsar/producer_partition.go

Lines changed: 69 additions & 34 deletions
@@ -112,6 +112,7 @@ type partitionProducer struct {
 	// Channel where app is posting messages to be published
 	dataChan         chan *pendingItem
 	cmdChan          chan interface{}
+	batchChan        chan interface{}
 	connectClosedCh  chan *connectionClosed
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -228,6 +229,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	p.setProducerState(producerReady)

 	if !p.options.DisableBatching {
+		p.batchChan = make(chan interface{})
 		p.batchFlushTicker = time.NewTicker(batchingMaxPublishDelay)
 		go p.listenBatch()
 	}
@@ -562,7 +564,47 @@ func (p *partitionProducer) listenBatch() {
 	for {
 		select {
 		case <-p.batchFlushTicker.C:
-			p.internalFlushCurrentBatch()
+			p.internalFlushCurrentBatch(false)
+		case req, ok := <-p.batchChan:
+			if !ok {
+				return
+			}
+
+			switch req.(type) {
+			case *sendRequest:
+				sr := req.(*sendRequest)
+				smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
+				multiSchemaEnabled := !p.options.DisableMultiSchema
+
+				added := addRequestToBatch(
+					smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+				if !added {
+					// The current batch is full. flush it and retry
+					p.internalFlushCurrentBatch(false)
+
+					// after flushing try again to add the current payload
+					ok := addRequestToBatch(
+						smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
+					if !ok {
+						p.log.WithField("size", sr.uncompressedSize).
+							WithField("properties", sr.msg.Properties).
+							Error("unable to add message to batch")
+						sr.done(nil, ErrFailAddToBatch)
+						continue
+					}
+				}
+				if sr.flushImmediately {
+					p.internalFlushCurrentBatch(false)
+					continue
+				}
+			case *flushRequest:
+				fr := req.(*flushRequest)
+				// Flush request happened on the event loop.
+				p.internalFlushCurrentBatch(true)
+				fr.doneCh <- struct{}{}
+			default:
+				p.log.Error("Unknown request type")
+			}
 		}
 	}
 }
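The race this commit addresses comes from the batch builder being touched by more than one goroutine: internalSend appended messages to the current batch while the ticker-driven listenBatch goroutine flushed it. The hunk above serializes that work by funneling both *sendRequest and *flushRequest values through batchChan into the single listenBatch goroutine, which now owns the batch exclusively. A minimal, self-contained sketch of this serialization pattern (eventLoop, sendReq, and flushReq are illustrative names, not Pulsar APIs):

package main

import (
	"fmt"
	"time"
)

type sendReq struct{ payload string }

type flushReq struct{ done chan struct{} }

// eventLoop owns the batch slice exclusively; no mutex is needed because
// every append and flush happens on this one goroutine.
func eventLoop(reqs <-chan interface{}, ticker *time.Ticker) {
	var batch []string
	flush := func() {
		if len(batch) > 0 {
			fmt.Println("flushing", batch)
			batch = nil
		}
	}
	for {
		select {
		case <-ticker.C:
			flush()
		case req, ok := <-reqs:
			if !ok {
				flush()
				return
			}
			switch r := req.(type) {
			case *sendReq:
				batch = append(batch, r.payload)
			case *flushReq:
				flush()
				r.done <- struct{}{} // let the waiting caller resume
			}
		}
	}
}

func main() {
	reqs := make(chan interface{})
	ticker := time.NewTicker(10 * time.Millisecond)
	go eventLoop(reqs, ticker)

	reqs <- &sendReq{payload: "a"} // producers only talk over the channel
	fr := &flushReq{done: make(chan struct{})}
	reqs <- fr
	<-fr.done // synchronous flush, mirroring internalFlush below
}

Because batchChan is unbuffered, a producer goroutine hands its request directly to the event loop and never mutates the batch itself.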
@@ -615,30 +657,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 	p.log.Debug("Received send request: ", *sr.msg)

 	if sr.sendAsBatch {
-		smm := p.genSingleMessageMetadataInBatch(sr.msg, int(sr.uncompressedSize))
-		multiSchemaEnabled := !p.options.DisableMultiSchema
-
-		added := addRequestToBatch(
-			smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
-		if !added {
-			// The current batch is full. flush it and retry
-			p.internalFlushCurrentBatch()
-
-			// after flushing try again to add the current payload
-			ok := addRequestToBatch(
-				smm, p, sr.uncompressedPayload, sr, sr.msg, sr.deliverAt, sr.schemaVersion, multiSchemaEnabled)
-			if !ok {
-				p.log.WithField("size", sr.uncompressedSize).
-					WithField("properties", sr.msg.Properties).
-					Error("unable to add message to batch")
-				sr.done(nil, ErrFailAddToBatch)
-				return
-			}
-		}
-
-		if sr.flushImmediately {
-			p.internalFlushCurrentBatch()
-		}
+		p.batchChan <- sr
 		return
 	}

@@ -843,7 +862,7 @@ func (p *partitionProducer) internalSingleSend(
 		return
 	}

-	p.writeData(buffer, sid, []interface{}{sr})
+	p.writeData(buffer, sid, []interface{}{sr}, false)
 }

 type pendingItem struct {
@@ -859,7 +878,7 @@ type pendingItem struct {
 	flushCallback func(err error)
 }

-func (p *partitionProducer) internalFlushCurrentBatch() {
+func (p *partitionProducer) internalFlushCurrentBatch(inEventLoop bool) {
 	if p.batchBuilder == nil {
 		// batch is not enabled
 		// the batch flush ticker should be stopped but it might still called once
@@ -868,7 +887,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}
 	if p.batchBuilder.IsMultiBatches() {
-		p.internalFlushCurrentBatches()
+		p.internalFlushCurrentBatches(inEventLoop)
 		return
 	}

@@ -894,10 +913,11 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		return
 	}

-	p.writeData(batchData, sequenceID, callbacks)
+	p.writeData(batchData, sequenceID, callbacks, inEventLoop)
 }

-func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
+func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{},
+	inEventLoop bool) {
 	select {
 	case <-p.ctx.Done():
 		for _, cb := range callbacks {
@@ -919,7 +939,12 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 			sendRequests: callbacks,
 		}
 		p.pendingQueue.Put(&item)
-		p.dataChan <- &item
+		if inEventLoop {
+			// When the flush request is called on the event loop, we can directly write the data.
+			p.internalWriteData(&item)
+		} else {
+			p.dataChan <- &item
+		}
 	}
 }

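The new inEventLoop parameter threads through the flush path so writeData knows whether it is already running on the goroutine that serves batchChan. Per the comment in the hunk above, a flush triggered by a flushRequest writes its pendingItem directly via internalWriteData instead of sending it into dataChan; a plausible reading is that this avoids a cycle in which the batch goroutine blocks sending to dataChan while the goroutine that drains dataChan is itself blocked waiting on the flush's doneCh. A contrived sketch of that blocking shape, with hypothetical names (this program deadlocks by design):

// Goroutine A waits for B's completion signal while B waits to hand A an
// item over an unbuffered channel that only A drains: both block forever,
// and the Go runtime aborts with "all goroutines are asleep - deadlock!".
package main

type item struct{}

func main() {
	data := make(chan item)     // drained only by main, like dataChan
	done := make(chan struct{}) // flush-completion signal, like doneCh

	go func() { // stands in for the batch goroutine serving a flush
		data <- item{} // blocks: main is not receiving yet...
		done <- struct{}{}
	}()

	<-done // ...because main first waits here for the flush to finish
	<-data // never reached
}

Writing the item directly, as the internalWriteData branch does, removes the blocking channel send from the cycle.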
@@ -1015,7 +1040,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 	}
 }

-func (p *partitionProducer) internalFlushCurrentBatches() {
+func (p *partitionProducer) internalFlushCurrentBatches(inEventLoop bool) {
 	flushBatches := p.batchBuilder.FlushBatches()
 	if flushBatches == nil {
 		return
@@ -1041,7 +1066,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 		if b.BatchData == nil {
 			continue
 		}
-		p.writeData(b.BatchData, b.SequenceID, b.Callbacks)
+		p.writeData(b.BatchData, b.SequenceID, b.Callbacks, inEventLoop)
 	}

 }
@@ -1050,7 +1075,17 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 	p.clearPendingSendRequests()

 	if !p.options.DisableBatching {
-		p.internalFlushCurrentBatch()
+		// Flush batch
+		batchFlushRequest := &flushRequest{
+			doneCh: make(chan struct{}),
+		}
+		p.batchChan <- batchFlushRequest
+		<-batchFlushRequest.doneCh
+		if batchFlushRequest.err != nil {
+			fr.err = batchFlushRequest.err
+			close(fr.doneCh)
+			return
+		}
 	}

 	pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
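internalFlush now performs a synchronous round trip over batchChan: it builds its own flushRequest, blocks on doneCh until the batch goroutine has flushed, and copies any failure into the caller's flushRequest before returning. A compact sketch of that error-propagating handoff (request and server are illustrative names; note that the listenBatch handler shown in this diff never actually assigns err before signaling doneCh, so the err check presumably guards code paths not visible here):

package main

import (
	"errors"
	"fmt"
)

// request mirrors flushRequest: a reply channel plus an error slot the
// serving goroutine may fill in before signaling completion.
type request struct {
	done chan struct{}
	err  error
}

func server(reqs <-chan *request) {
	for r := range reqs {
		r.err = errors.New("flush failed") // simulated failure
		r.done <- struct{}{}               // wake the waiting caller
	}
}

func main() {
	reqs := make(chan *request)
	go server(reqs)

	inner := &request{done: make(chan struct{})}
	reqs <- inner
	<-inner.done // block until the event loop finishes the flush
	if inner.err != nil {
		fmt.Println("propagating:", inner.err) // as internalFlush does into fr
	}
}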
