Skip to content

Commit 6267bc3

Browse files
committed
stop writing if ctx is done
1 parent 5df06bd commit 6267bc3

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

pulsar/internal/connection.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ type request struct {
130130
callback func(command *pb.BaseCommand, err error)
131131
}
132132

133+
type dataRequest struct {
134+
ctx context.Context
135+
data Buffer
136+
}
137+
133138
type connection struct {
134139
started int32
135140
connectionTimeout time.Duration
@@ -158,7 +163,7 @@ type connection struct {
158163
incomingRequestsCh chan *request
159164
closeCh chan struct{}
160165
readyCh chan struct{}
161-
writeRequestsCh chan Buffer
166+
writeRequestsCh chan *dataRequest
162167

163168
pendingLock sync.Mutex
164169
pendingReqs map[uint64]*request
@@ -210,7 +215,7 @@ func newConnection(opts connectionOptions) *connection {
210215
// partition produces writing on a single connection. In general it's
211216
// good to keep this above the number of partition producers assigned
212217
// to a single connection.
213-
writeRequestsCh: make(chan Buffer, 256),
218+
writeRequestsCh: make(chan *dataRequest, 256),
214219
listeners: make(map[uint64]ConnectionListener),
215220
consumerHandlers: make(map[uint64]ConsumerHandler),
216221
metrics: opts.metrics,
@@ -422,11 +427,11 @@ func (c *connection) run() {
422427
return // TODO: this never gonna be happen
423428
}
424429
c.internalSendRequest(req)
425-
case data := <-c.writeRequestsCh:
426-
if data == nil {
430+
case req := <-c.writeRequestsCh:
431+
if req == nil {
427432
return
428433
}
429-
c.internalWriteData(data)
434+
c.internalWriteData(req.ctx, req.data)
430435

431436
case <-pingSendTicker.C:
432437
c.sendPing()
@@ -453,7 +458,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
453458

454459
func (c *connection) WriteData(ctx context.Context, data Buffer) {
455460
select {
456-
case c.writeRequestsCh <- data:
461+
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
457462
// Channel is not full
458463
return
459464
case <-ctx.Done():
@@ -465,7 +470,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
465470

466471
for {
467472
select {
468-
case c.writeRequestsCh <- data:
473+
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
469474
// Successfully wrote on the channel
470475
return
471476
case <-ctx.Done():
@@ -486,11 +491,25 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
486491

487492
}
488493

489-
func (c *connection) internalWriteData(data Buffer) {
494+
func (c *connection) internalWriteData(ctx context.Context, data Buffer) {
490495
c.log.Debug("Write data: ", data.ReadableBytes())
491-
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
492-
c.log.WithError(err).Warn("Failed to write on connection")
493-
c.Close()
496+
if ctx == nil {
497+
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
498+
c.log.WithError(err).Warn("Failed to write on connection")
499+
c.Close()
500+
}
501+
502+
return
503+
}
504+
505+
select {
506+
case <-ctx.Done():
507+
return
508+
default:
509+
if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
510+
c.log.WithError(err).Warn("Failed to write on connection")
511+
c.Close()
512+
}
494513
}
495514
}
496515

@@ -515,7 +534,7 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) {
515534
}
516535

517536
c.writeBuffer.WrittenBytes(cmdSize)
518-
c.internalWriteData(c.writeBuffer)
537+
c.internalWriteData(nil, c.writeBuffer)
519538
}
520539

521540
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {

0 commit comments

Comments
 (0)