@@ -30,6 +30,8 @@ import (
3030 "testing"
3131 "time"
3232
33+ "github.com/docker/docker/api/types/container"
34+ "github.com/docker/go-connections/nat"
3335 "github.com/stretchr/testify/require"
3436 "github.com/testcontainers/testcontainers-go"
3537 "github.com/testcontainers/testcontainers-go/wait"
@@ -2575,6 +2577,104 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
25752577 }, 30 * time .Second , 1 * time .Second )
25762578}
25772579
2580+ func TestProducerKeepReconnectingAndThenCallSendAsync (t * testing.T ) {
2581+ req := testcontainers.ContainerRequest {
2582+ Image : getPulsarTestImage (),
2583+ ExposedPorts : []string {"6650/tcp" , "8080/tcp" },
2584+ WaitingFor : wait .ForExposedPort (),
2585+ Cmd : []string {"bin/pulsar" , "standalone" , "-nfw" },
2586+ // use fixed port binding so that it can be reconnected after restart
2587+ HostConfigModifier : func (hostConfig * container.HostConfig ) {
2588+ hostConfig .PortBindings = map [nat.Port ][]nat.PortBinding {
2589+ "6650/tcp" : {{HostIP : "0.0.0.0" , HostPort : "6650" }},
2590+ "8080/tcp" : {{HostIP : "0.0.0.0" , HostPort : "8080" }},
2591+ }
2592+ },
2593+ }
2594+ c , err := testcontainers .GenericContainer (context .Background (), testcontainers.GenericContainerRequest {
2595+ ContainerRequest : req ,
2596+ Started : true ,
2597+ })
2598+ require .NoError (t , err , "Failed to start the pulsar container" )
2599+ endpoint , err := c .PortEndpoint (context .Background (), "6650" , "pulsar" )
2600+ require .NoError (t , err , "Failed to get the pulsar endpoint" )
2601+
2602+ client , err := NewClient (ClientOptions {
2603+ URL : endpoint ,
2604+ ConnectionTimeout : 5 * time .Second ,
2605+ OperationTimeout : 5 * time .Second ,
2606+ })
2607+ require .NoError (t , err )
2608+ defer client .Close ()
2609+
2610+ var testProducer Producer
2611+ require .Eventually (t , func () bool {
2612+ testProducer , err = client .CreateProducer (ProducerOptions {
2613+ Topic : newTopicName (),
2614+ Schema : NewBytesSchema (nil ),
2615+ SendTimeout : 3 * time .Second ,
2616+ })
2617+ return err == nil
2618+ }, 30 * time .Second , 1 * time .Second )
2619+
2620+ // send a message
2621+ errChan := make (chan error )
2622+ defer close (errChan )
2623+
2624+ testProducer .SendAsync (context .Background (), & ProducerMessage {
2625+ Payload : []byte ("test" ),
2626+ }, func (messageID MessageID , message * ProducerMessage , err error ) {
2627+ errChan <- err
2628+ })
2629+ // should success
2630+ err = <- errChan
2631+ require .NoError (t , err )
2632+
2633+ // stop pulsar server
2634+ timeout := 10 * time .Second
2635+ _ = c .Stop (context .Background (), & timeout )
2636+
2637+ // send again
2638+ testProducer .SendAsync (context .Background (), & ProducerMessage {
2639+ Payload : []byte ("test" ),
2640+ }, func (messageID MessageID , message * ProducerMessage , err error ) {
2641+ errChan <- err
2642+ })
2643+ // should get a timeout error
2644+ err = <- errChan
2645+ require .True (t , errors .Is (err , ErrSendTimeout ))
2646+
2647+ oldConn := testProducer .(* producer ).producers [0 ].(* partitionProducer )._getConn ()
2648+ // restart pulsar server
2649+ err = c .Start (context .Background ())
2650+ require .NoError (t , err )
2651+ defer c .Terminate (context .Background ())
2652+
2653+ // wait for reconnection success
2654+ waitTime := 0
2655+ for {
2656+ newConn := testProducer .(* producer ).producers [0 ].(* partitionProducer )._getConn ()
2657+ if oldConn != newConn {
2658+ break
2659+ }
2660+ time .Sleep (5 * time .Second )
2661+ waitTime += 5
2662+ if waitTime > 60 {
2663+ break
2664+ }
2665+ }
2666+
2667+ // send again
2668+ testProducer .SendAsync (context .Background (), & ProducerMessage {
2669+ Payload : []byte ("test" ),
2670+ }, func (messageID MessageID , message * ProducerMessage , err error ) {
2671+ errChan <- err
2672+ })
2673+ // should success
2674+ err = <- errChan
2675+ require .NoError (t , err )
2676+ }
2677+
25782678func TestSelectConnectionForSameProducer (t * testing.T ) {
25792679 topicName := newTopicName ()
25802680
0 commit comments