Skip to content

Commit f66040e

Browse files
committed
handle the case of producer is closed by the broker
1 parent 41193b5 commit f66040e

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

pulsar/producer_partition.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ type partitionProducer struct {
113113
dataChan chan *sendRequest
114114
cmdChan chan interface{}
115115
connectClosedCh chan *connectionClosed
116+
connectClosed atomic.Bool
116117
publishSemaphore internal.Semaphore
117118
pendingQueue internal.BlockingQueue
118119
schemaInfo *SchemaInfo
@@ -394,7 +395,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
394395
pi.sentAt = time.Now()
395396
pi.Unlock()
396397
p.pendingQueue.Put(pi)
397-
err = p._getConn().WriteData(pi.ctx, pi.buffer)
398+
err = p.writeConn(pi.ctx, pi.buffer)
398399
if err != nil {
399400
p.log.WithError(err).Warn("failed to write data, it will be retried later")
400401
}
@@ -432,6 +433,8 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
432433
closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
433434
}
434435

436+
// mark the connection as closed
437+
p.connectClosed.Store(true)
435438
select {
436439
case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}:
437440
default:
@@ -914,7 +917,7 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
914917

915918
// If the connection is closed, WriteData() will failed, but it is fine, the buffer is still kept in p.pendingQueue,
916919
// it will be sent out or timeout finally.
917-
err := p._getConn().WriteData(ctx, buffer)
920+
err := p.writeConn(ctx, buffer)
918921
if err != nil {
919922
p.log.WithError(err).Warn("failed to write data, it will be retried later")
920923
}
@@ -1768,6 +1771,7 @@ func (i *pendingItem) done(err error) {
17681771
// Note: should only be called by this partition producer when a new connection is available.
17691772
func (p *partitionProducer) _setConn(conn internal.Connection) {
17701773
p.conn.Store(conn)
1774+
p.connectClosed.Store(false)
17711775
}
17721776

17731777
// _getConn returns internal connection field of this partition producer atomically.
@@ -1779,6 +1783,14 @@ func (p *partitionProducer) _getConn() internal.Connection {
17791783
return p.conn.Load().(internal.Connection)
17801784
}
17811785

1786+
func (p *partitionProducer) writeConn(ctx context.Context, data internal.Buffer) error {
1787+
if p.connectClosed.Load() {
1788+
return errors.New("connection is closed")
1789+
}
1790+
1791+
return p._getConn().WriteData(ctx, data)
1792+
}
1793+
17821794
type chunkRecorder struct {
17831795
chunkedMsgID chunkMessageID
17841796
}

0 commit comments

Comments
 (0)