Skip to content

Commit 7bbb5b2

Browse files
authored
Fix close blocked (#1308)
1 parent 95232de commit 7bbb5b2

File tree

3 files changed

+52
-8
lines changed

3 files changed

+52
-8
lines changed

pulsar/consumer_partition.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ type partitionConsumer struct {
190190

191191
dispatcherSeekingControlCh chan struct{}
192192
isSeeking atomic.Bool
193+
ctx context.Context
194+
cancelFunc context.CancelFunc
193195
}
194196

195197
// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
@@ -344,6 +346,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
344346
boFunc = backoff.NewDefaultBackoff
345347
}
346348

349+
ctx, cancelFunc := context.WithCancel(context.Background())
347350
pc := &partitionConsumer{
348351
parentConsumer: parent,
349352
client: client,
@@ -367,6 +370,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
367370
schemaInfoCache: newSchemaInfoCache(client, options.topic),
368371
backoffPolicyFunc: boFunc,
369372
dispatcherSeekingControlCh: make(chan struct{}),
373+
ctx: ctx,
374+
cancelFunc: cancelFunc,
370375
}
371376
if pc.options.autoReceiverQueueSize {
372377
pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -938,6 +943,8 @@ func (pc *partitionConsumer) Close() {
938943
return
939944
}
940945

946+
pc.cancelFunc()
947+
941948
// flush all pending ACK requests and terminate the timer goroutine
942949
pc.ackGroupingTracker.close()
943950

@@ -1866,7 +1873,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
18661873

18671874
return struct{}{}, err
18681875
}
1869-
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
1876+
_, _ = internal.Retry(pc.ctx, opFn, func(_ error) time.Duration {
18701877
delayReconnectTime := bo.Next()
18711878
pc.log.WithFields(log.Fields{
18721879
"assignedBrokerURL": assignedBrokerURL,

pulsar/consumer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import (
3030
"testing"
3131
"time"
3232

33+
"github.com/stretchr/testify/require"
34+
"github.com/testcontainers/testcontainers-go"
35+
"github.com/testcontainers/testcontainers-go/wait"
36+
3337
"github.com/apache/pulsar-client-go/pulsar/backoff"
3438

3539
"github.com/apache/pulsar-client-go/pulsaradmin"
@@ -4940,3 +4944,42 @@ func TestAckResponseNotBlocked(t *testing.T) {
49404944
}
49414945
}
49424946
}
4947+
4948+
func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
4949+
req := testcontainers.ContainerRequest{
4950+
Image: getPulsarTestImage(),
4951+
ExposedPorts: []string{"6650/tcp", "8080/tcp"},
4952+
WaitingFor: wait.ForExposedPort(),
4953+
Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
4954+
}
4955+
c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
4956+
ContainerRequest: req,
4957+
Started: true,
4958+
})
4959+
require.NoError(t, err, "Failed to start the pulsar container")
4960+
endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
4961+
require.NoError(t, err, "Failed to get the pulsar endpoint")
4962+
4963+
client, err := NewClient(ClientOptions{
4964+
URL: endpoint,
4965+
ConnectionTimeout: 5 * time.Second,
4966+
OperationTimeout: 5 * time.Second,
4967+
})
4968+
require.NoError(t, err)
4969+
defer client.Close()
4970+
4971+
var testConsumer Consumer
4972+
require.Eventually(t, func() bool {
4973+
testConsumer, err = client.Subscribe(ConsumerOptions{
4974+
Topic: newTopicName(),
4975+
Schema: NewBytesSchema(nil),
4976+
SubscriptionName: "test-sub",
4977+
})
4978+
return err == nil
4979+
}, 30*time.Second, 1*time.Second)
4980+
_ = c.Terminate(context.Background())
4981+
require.Eventually(t, func() bool {
4982+
testConsumer.Close()
4983+
return true
4984+
}, 30*time.Second, 1*time.Second)
4985+
}

pulsar/producer_partition.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -491,12 +491,6 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
491491
return struct{}{}, nil
492492
}
493493

494-
select {
495-
case <-p.ctx.Done():
496-
return struct{}{}, nil
497-
default:
498-
}
499-
500494
if p.getProducerState() != producerReady {
501495
// Producer is already closing
502496
p.log.Info("producer state not ready, exit reconnect")
@@ -552,7 +546,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
552546

553547
return struct{}{}, err
554548
}
555-
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
549+
_, _ = internal.Retry(p.ctx, opFn, func(_ error) time.Duration {
556550
delayReconnectTime := bo.Next()
557551
p.log.WithFields(log.Fields{
558552
"assignedBrokerURL": assignedBrokerURL,

0 commit comments

Comments
 (0)