Skip to content

Commit 3807a17

Browse files
committed
Fix SendAsync not timing out during producer reconnection
1 parent 0ab28c2 commit 3807a17

File tree

2 files changed

+133
-0
lines changed

2 files changed

+133
-0
lines changed

pulsar/producer_partition.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
907907
sequenceID: sequenceID,
908908
sendRequests: callbacks,
909909
})
910+
for _, cb := range callbacks {
911+
if sr, ok := cb.(*sendRequest); ok {
912+
if sr.enqueued != nil {
913+
close(sr.enqueued)
914+
}
915+
}
916+
}
910917
p._getConn().WriteData(ctx, buffer)
911918
}
912919
}
@@ -1312,6 +1319,7 @@ func (p *partitionProducer) internalSendAsync(
13121319
flushImmediately: flushImmediately,
13131320
publishTime: time.Now(),
13141321
chunkID: -1,
1322+
enqueued: make(chan struct{}),
13151323
}
13161324

13171325
if err := p.prepareTransaction(sr); err != nil {
@@ -1353,6 +1361,17 @@ func (p *partitionProducer) internalSendAsync(
13531361
}
13541362

13551363
p.dataChan <- sr
1364+
select {
1365+
case <-sr.enqueued:
1366+
case <-ctx.Done():
1367+
err := ctx.Err()
1368+
if errors.Is(err, context.DeadlineExceeded) { // Convert DeadlineExceeded error to ErrSendTimeout
1369+
err = ErrSendTimeout
1370+
}
1371+
sr.callbackOnce.Do(func() {
1372+
runCallback(callback, nil, msg, err)
1373+
})
1374+
}
13561375
}
13571376

13581377
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1583,6 +1602,7 @@ type sendRequest struct {
15831602
chunkID int
15841603
uuid string
15851604
chunkRecorder *chunkRecorder
1605+
enqueued chan struct{}
15861606

15871607
// resource management
15881608

pulsar/producer_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2605,3 +2605,116 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
26052605

26062606
client.Close()
26072607
}
2608+
2609+
func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
2610+
testSendAsyncCouldTimeoutWhileReconnecting(t, false)
2611+
testSendAsyncCouldTimeoutWhileReconnecting(t, true)
2612+
}
2613+
2614+
func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
2615+
t.Helper()
2616+
2617+
req := testcontainers.ContainerRequest{
2618+
Image: getPulsarTestImage(),
2619+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
2620+
WaitingFor: wait.ForExposedPort(),
2621+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
2622+
}
2623+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
2624+
ContainerRequest: req,
2625+
Started: true,
2626+
})
2627+
require.NoError(t, err, "Failed to start the pulsar container")
2628+
defer func() {
2629+
err := c.Terminate(context.Background())
2630+
if err != nil {
2631+
t.Fatal("Failed to terminate the pulsar container", err)
2632+
}
2633+
}()
2634+
2635+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
2636+
require.NoError(t, err, "Failed to get the pulsar endpoint")
2637+
2638+
client, err := NewClient(ClientOptions{
2639+
URL: endpoint,
2640+
ConnectionTimeout: 5 * time.Second,
2641+
OperationTimeout: 5 * time.Second,
2642+
})
2643+
require.NoError(t, err)
2644+
defer client.Close()
2645+
2646+
var testProducer Producer
2647+
require.Eventually(t, func() bool {
2648+
testProducer, err = client.CreateProducer(ProducerOptions{
2649+
Topic: newTopicName(),
2650+
Schema: NewBytesSchema(nil),
2651+
SendTimeout: 3 * time.Second,
2652+
DisableBatching: isDisableBatching,
2653+
BatchingMaxMessages: 5,
2654+
MaxPendingMessages: 10,
2655+
})
2656+
return err == nil
2657+
}, 30*time.Second, 1*time.Second)
2658+
2659+
numMessages := 10
2660+
// Send 10 messages synchronously
2661+
for i := 0; i < numMessages; i++ {
2662+
send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
2663+
require.NoError(t, err)
2664+
require.NotNil(t, send)
2665+
}
2666+
2667+
// stop pulsar server
2668+
timeout := 10 * time.Second
2669+
err = c.Stop(context.Background(), &timeout)
2670+
require.NoError(t, err)
2671+
2672+
// Test the SendAsync could be timeout if the producer is reconnecting
2673+
2674+
finalErr := make(chan error, 1)
2675+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2676+
defer cancel()
2677+
testProducer.SendAsync(ctx, &ProducerMessage{
2678+
Payload: []byte("test"),
2679+
}, func(_ MessageID, _ *ProducerMessage, err error) {
2680+
finalErr <- err
2681+
})
2682+
select {
2683+
case <-time.After(10 * time.Second):
2684+
t.Fatal("test timeout")
2685+
case err = <-finalErr:
2686+
// should get a timeout error
2687+
require.ErrorIs(t, err, ErrSendTimeout)
2688+
}
2689+
close(finalErr)
2690+
2691+
// Test that the SendAsync could be timeout if the pending queue is full
2692+
2693+
go func() {
2694+
// Send 10 messages asynchronously to make the pending queue full
2695+
errs := make(chan error, numMessages)
2696+
for i := 0; i < numMessages; i++ {
2697+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2698+
Payload: []byte("test"),
2699+
}, func(id MessageID, producerMessage *ProducerMessage, err error) {
2700+
errs <- err
2701+
})
2702+
}
2703+
}()
2704+
2705+
time.Sleep(3 * time.Second)
2706+
finalErr = make(chan error, 1)
2707+
testProducer.SendAsync(ctx, &ProducerMessage{
2708+
Payload: []byte("test"),
2709+
}, func(_ MessageID, _ *ProducerMessage, err error) {
2710+
finalErr <- err
2711+
})
2712+
select {
2713+
case <-time.After(10 * time.Second):
2714+
t.Fatal("test timeout")
2715+
case err = <-finalErr:
2716+
// should get a timeout error
2717+
require.ErrorIs(t, err, ErrSendTimeout)
2718+
}
2719+
close(finalErr)
2720+
}

0 commit comments

Comments
 (0)