Skip to content

Commit c18f605

Browse files
committed
Fix: SendAsync callback was not invoked when producer is in reconnecting
1 parent 274a10f commit c18f605

File tree

2 files changed

+102
-1
lines changed

2 files changed

+102
-1
lines changed

pulsar/producer_partition.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,8 @@ func (p *partitionProducer) runEventsLoop() {
581581
}
582582
case connectionClosed := <-p.connectClosedCh:
583583
p.log.Info("runEventsLoop will reconnect in producer")
584-
p.reconnectToBroker(connectionClosed)
584+
// reconnect to broker in a new goroutine so that it won't block the event loop, see issue #1332
585+
go p.reconnectToBroker(connectionClosed)
585586
case <-p.batchFlushTicker.C:
586587
p.internalFlushCurrentBatch()
587588
}

pulsar/producer_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"testing"
3131
"time"
3232

33+
"github.com/docker/docker/api/types/container"
34+
"github.com/docker/go-connections/nat"
3335
"github.com/stretchr/testify/require"
3436
"github.com/testcontainers/testcontainers-go"
3537
"github.com/testcontainers/testcontainers-go/wait"
@@ -2575,6 +2577,104 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
25752577
}, 30*time.Second, 1*time.Second)
25762578
}
25772579

2580+
func TestProducerKeepReconnectingAndThenCallSendAsync(t *testing.T) {
2581+
req := testcontainers.ContainerRequest{
2582+
Image: getPulsarTestImage(),
2583+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
2584+
WaitingFor: wait.ForExposedPort(),
2585+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
2586+
// use fixed port binding so that it can be reconnected after restart
2587+
HostConfigModifier: func(hostConfig *container.HostConfig) {
2588+
hostConfig.PortBindings = map[nat.Port][]nat.PortBinding{
2589+
"6650/tcp": {{HostIP: "0.0.0.0", HostPort: "6650"}},
2590+
"8080/tcp": {{HostIP: "0.0.0.0", HostPort: "8080"}},
2591+
}
2592+
},
2593+
}
2594+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
2595+
ContainerRequest: req,
2596+
Started: true,
2597+
})
2598+
require.NoError(t, err, "Failed to start the pulsar container")
2599+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
2600+
require.NoError(t, err, "Failed to get the pulsar endpoint")
2601+
2602+
client, err := NewClient(ClientOptions{
2603+
URL: endpoint,
2604+
ConnectionTimeout: 5 * time.Second,
2605+
OperationTimeout: 5 * time.Second,
2606+
})
2607+
require.NoError(t, err)
2608+
defer client.Close()
2609+
2610+
var testProducer Producer
2611+
require.Eventually(t, func() bool {
2612+
testProducer, err = client.CreateProducer(ProducerOptions{
2613+
Topic: newTopicName(),
2614+
Schema: NewBytesSchema(nil),
2615+
SendTimeout: 3 * time.Second,
2616+
})
2617+
return err == nil
2618+
}, 30*time.Second, 1*time.Second)
2619+
2620+
// send a message
2621+
errChan := make(chan error)
2622+
defer close(errChan)
2623+
2624+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2625+
Payload: []byte("test"),
2626+
}, func(messageID MessageID, message *ProducerMessage, err error) {
2627+
errChan <- err
2628+
})
2629+
// should success
2630+
err = <-errChan
2631+
require.NoError(t, err)
2632+
2633+
// stop pulsar server
2634+
timeout := 10 * time.Second
2635+
_ = c.Stop(context.Background(), &timeout)
2636+
2637+
// send again
2638+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2639+
Payload: []byte("test"),
2640+
}, func(messageID MessageID, message *ProducerMessage, err error) {
2641+
errChan <- err
2642+
})
2643+
// should get a timeout error
2644+
err = <-errChan
2645+
require.True(t, errors.Is(err, ErrSendTimeout))
2646+
2647+
oldConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
2648+
// restart pulsar server
2649+
err = c.Start(context.Background())
2650+
require.NoError(t, err)
2651+
defer c.Terminate(context.Background())
2652+
2653+
// wait for reconnection success
2654+
waitTime := 0
2655+
for {
2656+
newConn := testProducer.(*producer).producers[0].(*partitionProducer)._getConn()
2657+
if oldConn != newConn {
2658+
break
2659+
}
2660+
time.Sleep(5 * time.Second)
2661+
waitTime += 5
2662+
if waitTime > 60 {
2663+
break
2664+
}
2665+
}
2666+
2667+
// send again
2668+
testProducer.SendAsync(context.Background(), &ProducerMessage{
2669+
Payload: []byte("test"),
2670+
}, func(messageID MessageID, message *ProducerMessage, err error) {
2671+
errChan <- err
2672+
})
2673+
// should success
2674+
err = <-errChan
2675+
require.NoError(t, err)
2676+
}
2677+
25782678
func TestSelectConnectionForSameProducer(t *testing.T) {
25792679
topicName := newTopicName()
25802680

0 commit comments

Comments
 (0)