
Commit f59a4b8

Fix: SendAsync callback was not invoked when producer is in reconnecting
1 parent 6c83f56 commit f59a4b8
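
Root cause: SendAsync queued each *sendRequest on dataChan and relied on runEventsLoop to pick it up and call internalSend. While the producer was reconnecting, the event loop was occupied inside reconnectToBroker, so queued requests never reached pendingQueue and their callbacks were never invoked, not even after SendTimeout elapsed. This change serializes the message on the caller's side instead: internalSendAsync calls internalSend directly, writeData registers the resulting pendingItem in pendingQueue before pushing it onto dataChan, and the event loop is left with only the network write (internalWriteData). Because every in-flight message now sits in pendingQueue even during a reconnect, failTimeoutMessages can fail it and fire its callback. Batch flushing, which previously shared the event loop, moves to a dedicated listenBatch goroutine with a lazily created batchFlushTicker.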

File tree

2 files changed: +118 -24 lines


pulsar/producer_partition.go

Lines changed: 39 additions & 24 deletions
@@ -110,7 +110,7 @@ type partitionProducer struct {
 	compressionProvider compression.Provider

 	// Channel where app is posting messages to be published
-	dataChan        chan *sendRequest
+	dataChan        chan *pendingItem
 	cmdChan         chan interface{}
 	connectClosedCh chan *connectionClosed
 	publishSemaphore internal.Semaphore
@@ -177,16 +177,15 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions

 	ctx, cancelFunc := context.WithCancel(context.Background())
 	p := &partitionProducer{
-		client:           client,
-		topic:            topic,
-		log:              logger,
-		cnxKeySuffix:     client.cnxPool.GenerateRoundRobinIndex(),
-		options:          options,
-		producerID:       client.rpcClient.NewProducerID(),
-		dataChan:         make(chan *sendRequest, maxPendingMessages),
-		cmdChan:          make(chan interface{}, 10),
-		connectClosedCh:  make(chan *connectionClosed, 1),
-		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
+		client:          client,
+		topic:           topic,
+		log:             logger,
+		cnxKeySuffix:    client.cnxPool.GenerateRoundRobinIndex(),
+		options:         options,
+		producerID:      client.rpcClient.NewProducerID(),
+		dataChan:        make(chan *pendingItem, maxPendingMessages),
+		cmdChan:         make(chan interface{}, 10),
+		connectClosedCh: make(chan *connectionClosed, 1),
 		compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
 			compression.Level(options.CompressionLevel)),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
@@ -200,9 +199,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		cancelFunc:        cancelFunc,
 		backOffPolicyFunc: boFunc,
 	}
-	if p.options.DisableBatching {
-		p.batchFlushTicker.Stop()
-	}
 	p.setProducerState(producerInit)

 	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
@@ -219,7 +215,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 	err := p.grabCnx("")
 	if err != nil {
-		p.batchFlushTicker.Stop()
 		logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
 		return nil, err
 	}
@@ -232,6 +227,11 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
 	p.setProducerState(producerReady)

+	if !p.options.DisableBatching {
+		p.batchFlushTicker = time.NewTicker(batchingMaxPublishDelay)
+		go p.listenBatch()
+	}
+
 	if p.options.SendTimeout > 0 {
 		go p.failTimeoutMessages()
 	}
@@ -558,6 +558,15 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
 	})
 }

+func (p *partitionProducer) listenBatch() {
+	for {
+		select {
+		case <-p.batchFlushTicker.C:
+			p.internalFlushCurrentBatch()
+		}
+	}
+}
+
 func (p *partitionProducer) runEventsLoop() {
 	for {
 		select {
@@ -566,7 +575,7 @@ func (p *partitionProducer) runEventsLoop() {
 			if !ok {
 				return
 			}
-			p.internalSend(data)
+			p.internalWriteData(data)
 		case cmd, ok := <-p.cmdChan:
 			// when doClose() is call, p.dataChan will be closed, cmd will be nil
 			if !ok {
@@ -582,8 +591,6 @@ func (p *partitionProducer) runEventsLoop() {
 		case connectionClosed := <-p.connectClosedCh:
 			p.log.Info("runEventsLoop will reconnect in producer")
 			p.reconnectToBroker(connectionClosed)
-		case <-p.batchFlushTicker.C:
-			p.internalFlushCurrentBatch()
 		}
 	}
 }
@@ -688,6 +695,10 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 	}
 }

+func (p *partitionProducer) internalWriteData(item *pendingItem) {
+	p._getConn().WriteData(item.ctx, item.buffer)
+}
+
 func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
 	uncompressedPayload []byte,
 	request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
@@ -898,16 +909,17 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 	default:
 		now := time.Now()
 		ctx, cancel := context.WithCancel(context.Background())
-		p.pendingQueue.Put(&pendingItem{
+		item := pendingItem{
 			ctx:          ctx,
 			cancel:       cancel,
 			createdAt:    now,
 			sentAt:       now,
 			buffer:       buffer,
 			sequenceID:   sequenceID,
 			sendRequests: callbacks,
-		})
-		p._getConn().WriteData(ctx, buffer)
+		}
+		p.pendingQueue.Put(&item)
+		p.dataChan <- &item
 	}
 }

@@ -1077,7 +1089,7 @@ func (p *partitionProducer) clearPendingSendRequests() {
 	for i := 0; i < sizeBeforeFlushing; i++ {
 		select {
 		case pendingData := <-p.dataChan:
-			p.internalSend(pendingData)
+			p.internalWriteData(pendingData)

 		default:
 			return
@@ -1352,7 +1364,7 @@ func (p *partitionProducer) internalSendAsync(
 		return
 	}

-	p.dataChan <- sr
+	p.internalSend(sr)
 }

 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1466,7 +1478,10 @@ func (p *partitionProducer) doClose(reason error) {

 	p.setProducerState(producerClosed)
 	p._getConn().UnregisterListener(p.producerID)
-	p.batchFlushTicker.Stop()
+
+	if p.batchFlushTicker != nil {
+		p.batchFlushTicker.Stop()
+	}
 }

 func (p *partitionProducer) failPendingMessages(err error) {
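
For applications, the observable effect is that the SendAsync callback now always fires, at the latest with ErrSendTimeout, even while the producer is stuck reconnecting. A minimal sketch of that contract from the caller's side (the URL, topic name, and timings are illustrative, not part of this commit):

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "my-topic", // illustrative topic name
		SendTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("payload"),
	}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
		// Before this commit the callback could be skipped entirely while the
		// producer was reconnecting; with the fix it fires with ErrSendTimeout.
		if errors.Is(err, pulsar.ErrSendTimeout) {
			log.Println("send timed out, consider retrying:", err)
		}
	})

	time.Sleep(5 * time.Second) // give the callback time to fire in this sketch
}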

pulsar/producer_test.go

Lines changed: 79 additions & 0 deletions
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {

 	client.Close()
 }
+
+func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
+	testProducerKeepReconnectingAndThenCallSendAsync(t, false)
+	testProducerKeepReconnectingAndThenCallSendAsync(t, true)
+}
+
+func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, isEnabledBatching bool) {
+	t.Helper()
+
+	req := testcontainers.ContainerRequest{
+		Image:        getPulsarTestImage(),
+		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+		WaitingFor:   wait.ForExposedPort(),
+		Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+	}
+	c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	require.NoError(t, err, "Failed to start the pulsar container")
+	defer c.Terminate(context.Background())
+
+	endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+	require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+	client, err := NewClient(ClientOptions{
+		URL:               endpoint,
+		ConnectionTimeout: 5 * time.Second,
+		OperationTimeout:  5 * time.Second,
+	})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var testProducer Producer
+	require.Eventually(t, func() bool {
+		testProducer, err = client.CreateProducer(ProducerOptions{
+			Topic:           newTopicName(),
+			Schema:          NewBytesSchema(nil),
+			SendTimeout:     3 * time.Second,
+			DisableBatching: isEnabledBatching,
+		})
+		return err == nil
+	}, 30*time.Second, 1*time.Second)
+
+	// send a message
+	errChan := make(chan error)
+	defer close(errChan)
+
+	testProducer.SendAsync(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		errChan <- err
+	})
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timeout")
+	case <-errChan:
+		// fine
+	}
+
+	// stop pulsar server
+	timeout := 10 * time.Second
+	err = c.Stop(context.Background(), &timeout)
+	require.NoError(t, err)
+
+	// send again
+	testProducer.SendAsync(context.Background(), &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		errChan <- err
+	})
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timeout")
+	case err = <-errChan:
+		// should get a timeout error
+		require.True(t, errors.Is(err, ErrSendTimeout))
+	}
+}
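
The regression test boots a Pulsar standalone broker in a container, sends one message while the broker is up, then stops the container and sends again, asserting that the second callback still fires, with errors.Is(err, ErrSendTimeout), rather than hanging. The helper's isEnabledBatching argument is assigned to DisableBatching, so the two calls from the top-level test cover both batching modes.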
