Skip to content

Commit 274a10f

Browse files
authored
Fix: Potential data race (#1338)
* fix: potential data race * stop writing if ctx is done * pass a non-nil context * check nil ctx * revert * delete ctx nil check * revert pendingItem.done() to its old position
1 parent ba83732 commit 274a10f

File tree

3 files changed

+50
-23
lines changed

3 files changed

+50
-23
lines changed

pulsar/consumer_multitopic_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@
1818
package pulsar
1919

2020
import (
21+
"context"
2122
"errors"
2223
"fmt"
2324
"strings"
2425
"testing"
2526
"time"
2627

28+
"github.com/stretchr/testify/assert"
29+
2730
"github.com/apache/pulsar-client-go/pulsar/internal"
2831
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2932
"github.com/apache/pulsar-client-go/pulsaradmin"
3033
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
3134
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
32-
"github.com/stretchr/testify/assert"
3335
)
3436

3537
func TestMultiTopicConsumerReceive(t *testing.T) {
@@ -330,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
330332
return nil
331333
}
332334

333-
func (dummyConnection) WriteData(_ internal.Buffer) {
335+
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
334336
}
335337

336338
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {

pulsar/internal/connection.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package internal
1919

2020
import (
21+
"context"
2122
"crypto/tls"
2223
"crypto/x509"
2324
"errors"
@@ -78,7 +79,7 @@ type ConnectionListener interface {
7879
type Connection interface {
7980
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
8081
SendRequestNoWait(req *pb.BaseCommand) error
81-
WriteData(data Buffer)
82+
WriteData(ctx context.Context, data Buffer)
8283
RegisterListener(id uint64, listener ConnectionListener) error
8384
UnregisterListener(id uint64)
8485
AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -129,6 +130,11 @@ type request struct {
129130
callback func(command *pb.BaseCommand, err error)
130131
}
131132

133+
type dataRequest struct {
134+
ctx context.Context
135+
data Buffer
136+
}
137+
132138
type connection struct {
133139
started int32
134140
connectionTimeout time.Duration
@@ -157,7 +163,7 @@ type connection struct {
157163
incomingRequestsCh chan *request
158164
closeCh chan struct{}
159165
readyCh chan struct{}
160-
writeRequestsCh chan Buffer
166+
writeRequestsCh chan *dataRequest
161167

162168
pendingLock sync.Mutex
163169
pendingReqs map[uint64]*request
@@ -209,7 +215,7 @@ func newConnection(opts connectionOptions) *connection {
209215
// partition produces writing on a single connection. In general it's
210216
// good to keep this above the number of partition producers assigned
211217
// to a single connection.
212-
writeRequestsCh: make(chan Buffer, 256),
218+
writeRequestsCh: make(chan *dataRequest, 256),
213219
listeners: make(map[uint64]ConnectionListener),
214220
consumerHandlers: make(map[uint64]ConsumerHandler),
215221
metrics: opts.metrics,
@@ -421,11 +427,11 @@ func (c *connection) run() {
421427
return // TODO: this is never going to happen
422428
}
423429
c.internalSendRequest(req)
424-
case data := <-c.writeRequestsCh:
425-
if data == nil {
430+
case req := <-c.writeRequestsCh:
431+
if req == nil {
426432
return
427433
}
428-
c.internalWriteData(data)
434+
c.internalWriteData(req.ctx, req.data)
429435

430436
case <-pingSendTicker.C:
431437
c.sendPing()
@@ -450,22 +456,26 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
450456
}
451457
}
452458

453-
func (c *connection) WriteData(data Buffer) {
459+
func (c *connection) WriteData(ctx context.Context, data Buffer) {
454460
select {
455-
case c.writeRequestsCh <- data:
461+
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
456462
// Channel is not full
457463
return
458-
464+
case <-ctx.Done():
465+
c.log.Debug("Write data context cancelled")
466+
return
459467
default:
460468
// Channel full, fall back to probing whether the connection is closed
461469
}
462470

463471
for {
464472
select {
465-
case c.writeRequestsCh <- data:
473+
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
466474
// Successfully wrote on the channel
467475
return
468-
476+
case <-ctx.Done():
477+
c.log.Debug("Write data context cancelled")
478+
return
469479
case <-time.After(100 * time.Millisecond):
470480
// The channel is either:
471481
// 1. blocked, in which case we need to wait until we have space
@@ -481,11 +491,17 @@ func (c *connection) WriteData(data Buffer) {
481491

482492
}
483493

484-
func (c *connection) internalWriteData(data Buffer) {
494+
func (c *connection) internalWriteData(ctx context.Context, data Buffer) {
485495
c.log.Debug("Write data: ", data.ReadableBytes())
486-
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
487-
c.log.WithError(err).Warn("Failed to write on connection")
488-
c.Close()
496+
497+
select {
498+
case <-ctx.Done():
499+
return
500+
default:
501+
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
502+
c.log.WithError(err).Warn("Failed to write on connection")
503+
c.Close()
504+
}
489505
}
490506
}
491507

@@ -510,7 +526,7 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) {
510526
}
511527

512528
c.writeBuffer.WrittenBytes(cmdSize)
513-
c.internalWriteData(c.writeBuffer)
529+
c.internalWriteData(context.Background(), c.writeBuffer)
514530
}
515531

516532
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {

pulsar/producer_partition.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
394394
pi.sentAt = time.Now()
395395
pi.Unlock()
396396
p.pendingQueue.Put(pi)
397-
p._getConn().WriteData(pi.buffer)
397+
p._getConn().WriteData(pi.ctx, pi.buffer)
398398

399399
if pi == lastViewItem {
400400
break
@@ -837,6 +837,8 @@ func (p *partitionProducer) internalSingleSend(
837837

838838
type pendingItem struct {
839839
sync.Mutex
840+
ctx context.Context
841+
cancel context.CancelFunc
840842
buffer internal.Buffer
841843
sequenceID uint64
842844
createdAt time.Time
@@ -895,14 +897,17 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
895897
return
896898
default:
897899
now := time.Now()
900+
ctx, cancel := context.WithCancel(context.Background())
898901
p.pendingQueue.Put(&pendingItem{
902+
ctx: ctx,
903+
cancel: cancel,
899904
createdAt: now,
900905
sentAt: now,
901906
buffer: buffer,
902907
sequenceID: sequenceID,
903908
sendRequests: callbacks,
904909
})
905-
p._getConn().WriteData(buffer)
910+
p._getConn().WriteData(ctx, buffer)
906911
}
907912
}
908913

@@ -1579,14 +1584,14 @@ type sendRequest struct {
15791584
uuid string
15801585
chunkRecorder *chunkRecorder
15811586

1582-
/// resource management
1587+
// resource management
15831588

15841589
memLimit internal.MemoryLimitController
15851590
reservedMem int64
15861591
semaphore internal.Semaphore
15871592
reservedSemaphore int
15881593

1589-
/// convey settable state
1594+
// convey settable state
15901595

15911596
sendAsBatch bool
15921597
transaction *transaction
@@ -1659,7 +1664,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
16591664
}
16601665

16611666
func (p *partitionProducer) blockIfQueueFull() bool {
1662-
//DisableBlockIfQueueFull == false means enable block
1667+
// DisableBlockIfQueueFull == false means enable block
16631668
return !p.options.DisableBlockIfQueueFull
16641669
}
16651670

@@ -1741,6 +1746,10 @@ func (i *pendingItem) done(err error) {
17411746
if i.flushCallback != nil {
17421747
i.flushCallback(err)
17431748
}
1749+
1750+
if i.cancel != nil {
1751+
i.cancel()
1752+
}
17441753
}
17451754

17461755
// _setConn sets the internal connection field of this partition producer atomically.

0 commit comments

Comments
 (0)