Skip to content

Commit 687c80a

Browse files
committed
Fix: SendAsync callback was not invoked when producer is in reconnecting
1 parent 712e14c commit 687c80a

File tree

2 files changed

+115
-21
lines changed

2 files changed

+115
-21
lines changed

pulsar/producer_partition.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ type partitionProducer struct {
110110
compressionProvider compression.Provider
111111

112112
// Channel where app is posting messages to be published
113+
writeChan chan *pendingItem
113114
dataChan chan *sendRequest
114115
cmdChan chan interface{}
115116
connectClosedCh chan *connectionClosed
@@ -184,6 +185,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
184185
options: options,
185186
producerID: client.rpcClient.NewProducerID(),
186187
dataChan: make(chan *sendRequest, maxPendingMessages),
188+
writeChan: make(chan *pendingItem),
187189
cmdChan: make(chan interface{}, 10),
188190
connectClosedCh: make(chan *connectionClosed, 1),
189191
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
@@ -559,31 +561,42 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
559561
}
560562

561563
func (p *partitionProducer) runEventsLoop() {
564+
go func() {
565+
for {
566+
select {
567+
case data, ok := <-p.dataChan:
568+
if !ok {
569+
return
570+
}
571+
p.internalSend(data)
572+
case <-p.batchFlushTicker.C:
573+
p.internalFlushCurrentBatch()
574+
case cmd, ok := <-p.cmdChan:
575+
// when doClose() is call, p.dataChan will be closed, cmd will be nil
576+
if !ok {
577+
return
578+
}
579+
switch v := cmd.(type) {
580+
case *flushRequest:
581+
p.internalFlush(v)
582+
case *closeProducer:
583+
p.internalClose(v)
584+
return
585+
}
586+
}
587+
}
588+
}()
589+
562590
for {
563591
select {
564-
case data, ok := <-p.dataChan:
565-
// when doClose() is call, p.dataChan will be closed, data will be nil
592+
case pi, ok := <-p.writeChan:
566593
if !ok {
567594
return
568595
}
569-
p.internalSend(data)
570-
case cmd, ok := <-p.cmdChan:
571-
// when doClose() is call, p.dataChan will be closed, cmd will be nil
572-
if !ok {
573-
return
574-
}
575-
switch v := cmd.(type) {
576-
case *flushRequest:
577-
p.internalFlush(v)
578-
case *closeProducer:
579-
p.internalClose(v)
580-
return
581-
}
596+
p._getConn().WriteData(pi.ctx, pi.buffer)
582597
case connectionClosed := <-p.connectClosedCh:
583598
p.log.Info("runEventsLoop will reconnect in producer")
584599
p.reconnectToBroker(connectionClosed)
585-
case <-p.batchFlushTicker.C:
586-
p.internalFlushCurrentBatch()
587600
}
588601
}
589602
}
@@ -898,16 +911,17 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
898911
default:
899912
now := time.Now()
900913
ctx, cancel := context.WithCancel(context.Background())
901-
p.pendingQueue.Put(&pendingItem{
914+
item := pendingItem{
902915
ctx: ctx,
903916
cancel: cancel,
904917
createdAt: now,
905918
sentAt: now,
906919
buffer: buffer,
907920
sequenceID: sequenceID,
908921
sendRequests: callbacks,
909-
})
910-
p._getConn().WriteData(ctx, buffer)
922+
}
923+
p.pendingQueue.Put(&item)
924+
p.writeChan <- &item
911925
}
912926
}
913927

@@ -1443,6 +1457,7 @@ func (p *partitionProducer) doClose(reason error) {
14431457

14441458
p.log.Info("Closing producer")
14451459
defer close(p.dataChan)
1460+
defer close(p.writeChan)
14461461
defer close(p.cmdChan)
14471462

14481463
id := p.client.rpcClient.NewRequestID()

pulsar/producer_test.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2345,7 +2345,7 @@ func TestProducerSendWithContext(t *testing.T) {
23452345
// Make ctx be canceled to invalidate the context immediately
23462346
cancel()
23472347
_, err = producer.Send(ctx, &ProducerMessage{
2348-
Payload: make([]byte, 1024*1024),
2348+
Payload: make([]byte, 1024),
23492349
})
23502350
// producer.Send should fail and return err context.Canceled
23512351
assert.True(t, errors.Is(err, context.Canceled))
@@ -2605,3 +2605,82 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
26052605

26062606
client.Close()
26072607
}
2608+
2609+
func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
2610+
testProducerKeepReconnectingAndThenCallSendAsync(t, false)
2611+
testProducerKeepReconnectingAndThenCallSendAsync(t, true)
2612+
}
2613+
2614+
func testProducerKeepReconnectingAndThenCallSendAsync(t *testing.T, isEnabledBatching bool) {
2615+
t.Helper()
2616+
2617+
req := testcontainers.ContainerRequest{
2618+
Image: getPulsarTestImage(),
2619+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
2620+
WaitingFor: wait.ForExposedPort(),
2621+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
2622+
}
2623+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
2624+
ContainerRequest: req,
2625+
Started: true,
2626+
})
2627+
require.NoError(t, err, "Failed to start the pulsar container")
2628+
defer c.Terminate(context.Background())
2629+
2630+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
2631+
require.NoError(t, err, "Failed to get the pulsar endpoint")
2632+
2633+
client, err := NewClient(ClientOptions{
2634+
URL: endpoint,
2635+
ConnectionTimeout: 5 * time.Second,
2636+
OperationTimeout: 5 * time.Second,
2637+
})
2638+
require.NoError(t, err)
2639+
defer client.Close()
2640+
2641+
var testProducer Producer
2642+
require.Eventually(t, func() bool {
2643+
testProducer, err = client.CreateProducer(ProducerOptions{
2644+
Topic: newTopicName(),
2645+
Schema: NewBytesSchema(nil),
2646+
SendTimeout: 3 * time.Second,
2647+
DisableBatching: isEnabledBatching,
2648+
})
2649+
return err == nil
2650+
}, 30*time.Second, 1*time.Second)
2651+
2652+
// send a message
2653+
errChan := make(chan error)
2654+
defer close(errChan)
2655+
2656+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2657+
Payload: []byte("test"),
2658+
}, func(_ MessageID, _ *ProducerMessage, err error) {
2659+
errChan <- err
2660+
})
2661+
select {
2662+
case <-time.After(10 * time.Second):
2663+
t.Fatal("test timeout")
2664+
case err := <-errChan:
2665+
require.NoError(t, err)
2666+
}
2667+
2668+
// stop pulsar server
2669+
timeout := 10 * time.Second
2670+
err = c.Stop(context.Background(), &timeout)
2671+
require.NoError(t, err)
2672+
2673+
// send again
2674+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2675+
Payload: []byte("test"),
2676+
}, func(_ MessageID, _ *ProducerMessage, err error) {
2677+
errChan <- err
2678+
})
2679+
select {
2680+
case <-time.After(10 * time.Second):
2681+
t.Fatal("test timeout")
2682+
case err = <-errChan:
2683+
// should get a timeout error
2684+
require.ErrorIs(t, err, ErrSendTimeout)
2685+
}
2686+
}

0 commit comments

Comments
 (0)