Skip to content

Commit 6ec01d9

Browse files
committed
stop sending if the connenction is closed
1 parent e06eec7 commit 6ec01d9

File tree

4 files changed

+34
-11
lines changed

4 files changed

+34
-11
lines changed

pulsar/consumer_multitopic_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,8 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
332332
return nil
333333
}
334334

335-
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
335+
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) error {
336+
return nil
336337
}
337338

338339
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {

pulsar/internal/connection.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type ConnectionListener interface {
7979
type Connection interface {
8080
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
8181
SendRequestNoWait(req *pb.BaseCommand) error
82-
WriteData(ctx context.Context, data Buffer)
82+
WriteData(ctx context.Context, data Buffer) error
8383
RegisterListener(id uint64, listener ConnectionListener) error
8484
UnregisterListener(id uint64)
8585
AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -456,26 +456,38 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
456456
}
457457
}
458458

459-
func (c *connection) WriteData(ctx context.Context, data Buffer) {
459+
func (c *connection) WriteData(ctx context.Context, data Buffer) error {
460+
// If the connection is closed, we should not write on it
461+
if c.getState() != connectionReady {
462+
c.log.Debug("Connection was already closed")
463+
return errConnectionClosed
464+
}
465+
460466
select {
461467
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
462468
// Channel is not full
463-
return
469+
return nil
464470
case <-ctx.Done():
465471
c.log.Debug("Write data context cancelled")
466-
return
472+
return ctx.Err()
467473
default:
468474
// Channel full, fallback to probe if connection is closed
469475
}
470476

477+
// check if the connection is closed again
478+
if c.getState() != connectionReady {
479+
c.log.Debug("Connection was already closed")
480+
return errConnectionClosed
481+
}
482+
471483
for {
472484
select {
473485
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
474486
// Successfully wrote on the channel
475-
return
487+
return nil
476488
case <-ctx.Done():
477489
c.log.Debug("Write data context cancelled")
478-
return
490+
return ctx.Err()
479491
case <-time.After(100 * time.Millisecond):
480492
// The channel is either:
481493
// 1. blocked, in which case we need to wait until we have space
@@ -484,7 +496,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
484496

485497
if c.getState() != connectionReady {
486498
c.log.Debug("Connection was already closed")
487-
return
499+
return errConnectionClosed
488500
}
489501
}
490502
}

pulsar/internal/connection_reader.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import (
2121
"fmt"
2222
"io"
2323

24-
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2524
"google.golang.org/protobuf/proto"
25+
26+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2627
)
2728

2829
type connectionReader struct {

pulsar/producer_partition.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,10 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
394394
pi.sentAt = time.Now()
395395
pi.Unlock()
396396
p.pendingQueue.Put(pi)
397-
p._getConn().WriteData(pi.ctx, pi.buffer)
397+
err = p._getConn().WriteData(pi.ctx, pi.buffer)
398+
if err != nil {
399+
p.log.WithError(err).Warn("failed to write data, it will be retried later")
400+
}
398401

399402
if pi == lastViewItem {
400403
break
@@ -908,7 +911,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
908911
sequenceID: sequenceID,
909912
sendRequests: callbacks,
910913
})
911-
p._getConn().WriteData(ctx, buffer)
914+
915+
// If the connection is closed, WriteData() will failed, but it is fine, the buffer is still kept in p.pendingQueue,
916+
// it will be sent out or timeout finally.
917+
err := p._getConn().WriteData(ctx, buffer)
918+
if err != nil {
919+
p.log.WithError(err).Warn("failed to write data, it will be retried later")
920+
}
912921
}
913922
}
914923

0 commit comments

Comments
 (0)