Skip to content

Commit 5df06bd

Browse files
committed
fix: potential data race
1 parent 042bfcd commit 5df06bd

File tree

3 files changed

+38
-22
lines changed

3 files changed

+38
-22
lines changed

pulsar/consumer_multitopic_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@
1818
package pulsar
1919

2020
import (
21+
"context"
2122
"errors"
2223
"fmt"
2324
"strings"
2425
"testing"
2526
"time"
2627

28+
"github.com/stretchr/testify/assert"
29+
2730
"github.com/apache/pulsar-client-go/pulsar/internal"
2831
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2932
"github.com/apache/pulsar-client-go/pulsaradmin"
3033
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
3134
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
32-
"github.com/stretchr/testify/assert"
3335
)
3436

3537
func TestMultiTopicConsumerReceive(t *testing.T) {
@@ -330,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
330332
return nil
331333
}
332334

333-
func (dummyConnection) WriteData(_ internal.Buffer) {
335+
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
334336
}
335337

336338
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {

pulsar/internal/connection.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package internal
1919

2020
import (
21+
"context"
2122
"crypto/tls"
2223
"crypto/x509"
2324
"errors"
@@ -78,7 +79,7 @@ type ConnectionListener interface {
7879
type Connection interface {
7980
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
8081
SendRequestNoWait(req *pb.BaseCommand) error
81-
WriteData(data Buffer)
82+
WriteData(ctx context.Context, data Buffer)
8283
RegisterListener(id uint64, listener ConnectionListener) error
8384
UnregisterListener(id uint64)
8485
AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -450,12 +451,14 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
450451
}
451452
}
452453

453-
func (c *connection) WriteData(data Buffer) {
454+
func (c *connection) WriteData(ctx context.Context, data Buffer) {
454455
select {
455456
case c.writeRequestsCh <- data:
456457
// Channel is not full
457458
return
458-
459+
case <-ctx.Done():
460+
c.log.Debug("Write data context cancelled")
461+
return
459462
default:
460463
// Channel full, fallback to probe if connection is closed
461464
}
@@ -465,7 +468,9 @@ func (c *connection) WriteData(data Buffer) {
465468
case c.writeRequestsCh <- data:
466469
// Successfully wrote on the channel
467470
return
468-
471+
case <-ctx.Done():
472+
c.log.Debug("Write data context cancelled")
473+
return
469474
case <-time.After(100 * time.Millisecond):
470475
// The channel is either:
471476
// 1. blocked, in which case we need to wait until we have space

pulsar/producer_partition.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
394394
pi.sentAt = time.Now()
395395
pi.Unlock()
396396
p.pendingQueue.Put(pi)
397-
p._getConn().WriteData(pi.buffer)
397+
p._getConn().WriteData(pi.ctx, pi.buffer)
398398

399399
if pi == lastViewItem {
400400
break
@@ -837,6 +837,8 @@ func (p *partitionProducer) internalSingleSend(
837837

838838
type pendingItem struct {
839839
sync.Mutex
840+
ctx context.Context
841+
cancel context.CancelFunc
840842
buffer internal.Buffer
841843
sequenceID uint64
842844
createdAt time.Time
@@ -846,6 +848,21 @@ type pendingItem struct {
846848
flushCallback func(err error)
847849
}
848850

851+
func (i *pendingItem) done(err error) {
852+
if i.isDone {
853+
return
854+
}
855+
i.isDone = true
856+
buffersPool.Put(i.buffer)
857+
if i.flushCallback != nil {
858+
i.flushCallback(err)
859+
}
860+
861+
if i.cancel != nil {
862+
i.cancel()
863+
}
864+
}
865+
849866
func (p *partitionProducer) internalFlushCurrentBatch() {
850867
if p.batchBuilder == nil {
851868
// batch is not enabled
@@ -895,14 +912,17 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
895912
return
896913
default:
897914
now := time.Now()
915+
ctx, cancel := context.WithCancel(context.Background())
898916
p.pendingQueue.Put(&pendingItem{
917+
ctx: ctx,
918+
cancel: cancel,
899919
createdAt: now,
900920
sentAt: now,
901921
buffer: buffer,
902922
sequenceID: sequenceID,
903923
sendRequests: callbacks,
904924
})
905-
p._getConn().WriteData(buffer)
925+
p._getConn().WriteData(ctx, buffer)
906926
}
907927
}
908928

@@ -1579,14 +1599,14 @@ type sendRequest struct {
15791599
uuid string
15801600
chunkRecorder *chunkRecorder
15811601

1582-
/// resource management
1602+
// resource management
15831603

15841604
memLimit internal.MemoryLimitController
15851605
reservedMem int64
15861606
semaphore internal.Semaphore
15871607
reservedSemaphore int
15881608

1589-
/// convey settable state
1609+
// convey settable state
15901610

15911611
sendAsBatch bool
15921612
transaction *transaction
@@ -1659,7 +1679,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
16591679
}
16601680

16611681
func (p *partitionProducer) blockIfQueueFull() bool {
1662-
//DisableBlockIfQueueFull == false means enable block
1682+
// DisableBlockIfQueueFull == false means enable block
16631683
return !p.options.DisableBlockIfQueueFull
16641684
}
16651685

@@ -1732,17 +1752,6 @@ type flushRequest struct {
17321752
err error
17331753
}
17341754

1735-
func (i *pendingItem) done(err error) {
1736-
if i.isDone {
1737-
return
1738-
}
1739-
i.isDone = true
1740-
buffersPool.Put(i.buffer)
1741-
if i.flushCallback != nil {
1742-
i.flushCallback(err)
1743-
}
1744-
}
1745-
17461755
// _setConn sets the internal connection field of this partition producer atomically.
17471756
// Note: should only be called by this partition producer when a new connection is available.
17481757
func (p *partitionProducer) _setConn(conn internal.Connection) {

0 commit comments

Comments
 (0)