Skip to content

Commit d471a67

Browse files
Change the pulsar_client_sending_buffers_count metric to client level (#1408)
#1394 introduced the `pulsar_client_sending_buffers_count` metric to track how many buffers have been allocated for sending and not yet put back into the pool. However, unlike the other metrics, this metric is not client level, so the `CustomMetricsLabels` configured in the client options cannot be attached to it. When a send buffer is not put back into the pool, it means the `Release` method was not called for some reason. Changing this metric to client level helps locate which client triggered such a bug in an application that has many client instances from different businesses. Here is an example of the metric after this change, with `CustomMetricsLabels: map[string]string{"key": "value"}` configured:

```
pulsar_client_sending_buffers_count{client="go",key="value"} 1
```
1 parent 1a0bfc9 commit d471a67

File tree

6 files changed

+56
-34
lines changed

6 files changed

+56
-34
lines changed

pulsar/internal/batch_builder.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
type BatcherBuilderProvider func(
3434
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
3535
compressionType pb.CompressionType, level compression.Level,
36-
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
36+
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
3737
) (BatchBuilder, error)
3838

3939
// BatchBuilder is an interface of batch builders
@@ -100,6 +100,7 @@ type batchContainer struct {
100100

101101
compressionProvider compression.Provider
102102
buffersPool BuffersPool
103+
metrics *Metrics
103104

104105
log log.Logger
105106

@@ -110,7 +111,7 @@ type batchContainer struct {
110111
func newBatchContainer(
111112
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
112113
compressionType pb.CompressionType, level compression.Level,
113-
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
114+
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
114115
) batchContainer {
115116

116117
bc := batchContainer{
@@ -133,6 +134,7 @@ func newBatchContainer(
133134
callbacks: []interface{}{},
134135
compressionProvider: GetCompressionProvider(compressionType, level),
135136
buffersPool: bufferPool,
137+
metrics: metrics,
136138
log: logger,
137139
encryptor: encryptor,
138140
}
@@ -148,12 +150,12 @@ func newBatchContainer(
148150
func NewBatchBuilder(
149151
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
150152
compressionType pb.CompressionType, level compression.Level,
151-
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
153+
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
152154
) (BatchBuilder, error) {
153155

154156
bc := newBatchContainer(
155157
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType,
156-
level, bufferPool, logger, encryptor,
158+
level, bufferPool, metrics, logger, encryptor,
157159
)
158160

159161
return &bc, nil
@@ -266,6 +268,9 @@ func (bc *batchContainer) Flush() *FlushBatch {
266268
bc.msgMetadata.UncompressedSize = &uncompressedSize
267269

268270
buffer := bc.buffersPool.GetBuffer(int(uncompressedSize * 3 / 2))
271+
bufferCount := bc.metrics.SendingBuffersCount
272+
bufferCount.Inc()
273+
buffer.SetReleaseCallback(func() { bufferCount.Dec() })
269274

270275
sequenceID := uint64(0)
271276
var err error

pulsar/internal/buffer.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,15 @@ type Buffer interface {
7676
Resize(newSize uint32)
7777
ResizeIfNeeded(spaceNeeded uint32)
7878

79+
// Retain increases the reference count
7980
Retain()
81+
// Release decreases the reference count and returns the buffer to the pool
82+
// if it's associated with a buffer pool and the count reaches zero.
8083
Release()
84+
// RefCnt returns the current reference count of the buffer.
8185
RefCnt() int64
86+
// SetReleaseCallback sets a callback function that will be called when the buffer is returned to a pool.
87+
SetReleaseCallback(cb func())
8288

8389
// Clear will clear the current buffer data.
8490
Clear()
@@ -95,7 +101,6 @@ func NewBufferPool() BuffersPool {
95101
}
96102

97103
func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
98-
sendingBuffersCount.Inc()
99104
b, ok := p.Get().(*buffer)
100105
if ok {
101106
b.Clear()
@@ -112,9 +117,14 @@ func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
112117
}
113118

114119
func (p *bufferPoolImpl) Put(buf Buffer) {
115-
sendingBuffersCount.Dec()
116120
if b, ok := buf.(*buffer); ok {
121+
// Get the callback before putting back to the pool because it might be reset after the
122+
// buffer is returned to the pool and reused in GetBuffer.
123+
cb := b.releaseCallback
117124
p.Pool.Put(b)
125+
if cb != nil {
126+
cb()
127+
}
118128
}
119129
}
120130

@@ -126,6 +136,11 @@ type buffer struct {
126136

127137
refCnt atomic.Int64
128138
pool BuffersPool
139+
140+
// releaseCallback is an optional function that is called when the buffer is released back to the pool.
141+
// It allows custom cleanup or notification logic to be executed after the buffer is returned.
142+
// The callback is invoked in bufferPoolImpl.Put, after the buffer is put back into the pool.
143+
releaseCallback func()
129144
}
130145

131146
// NewBuffer creates and initializes a new Buffer using buf as its initial contents.
@@ -277,6 +292,10 @@ func (b *buffer) RefCnt() int64 {
277292
return b.refCnt.Load()
278293
}
279294

295+
func (b *buffer) SetReleaseCallback(cb func()) {
296+
b.releaseCallback = cb
297+
}
298+
280299
func (b *buffer) Clear() {
281300
b.readerIdx = 0
282301
b.writerIdx = 0

pulsar/internal/key_based_batch_builder.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
8686
func NewKeyBasedBatchBuilder(
8787
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
8888
compressionType pb.CompressionType, level compression.Level,
89-
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
89+
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
9090
) (BatchBuilder, error) {
9191

9292
bb := &keyBasedBatchContainer{
9393
batches: newKeyBasedBatches(),
9494
batchContainer: newBatchContainer(
9595
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID,
96-
compressionType, level, bufferPool, logger, encryptor,
96+
compressionType, level, bufferPool, metrics, logger, encryptor,
9797
),
9898
compressionType: compressionType,
9999
level: level,
@@ -155,7 +155,7 @@ func (bc *keyBasedBatchContainer) Add(
155155
// create batchContainer for new key
156156
t := newBatchContainer(
157157
bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID,
158-
bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor,
158+
bc.compressionType, bc.level, bc.buffersPool, bc.metrics, bc.log, bc.encryptor,
159159
)
160160
batchPart = &t
161161
bc.batches.Add(msgKey, &t)

pulsar/internal/key_based_batch_builder_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
2626
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2727
"github.com/apache/pulsar-client-go/pulsar/log"
28+
"github.com/prometheus/client_golang/prometheus"
2829
"github.com/sirupsen/logrus"
2930
"github.com/stretchr/testify/assert"
3031
"google.golang.org/protobuf/proto"
@@ -47,6 +48,7 @@ func TestKeyBasedBatcherOrdering(t *testing.T) {
4748
pb.CompressionType_NONE,
4849
compression.Level(0),
4950
&bufferPoolImpl{},
51+
NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer),
5052
log.NewLoggerWithLogrus(logrus.StandardLogger()),
5153
&mockEncryptor{},
5254
)

pulsar/internal/metrics.go

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,16 @@ import (
2121
"github.com/prometheus/client_golang/prometheus"
2222
)
2323

24-
var (
25-
defaultConstLabels = map[string]string{
26-
"client": "go",
27-
}
28-
29-
sendingBuffersCount = prometheus.NewGauge(prometheus.GaugeOpts{
30-
Name: "pulsar_client_sending_buffers_count",
31-
Help: "Number of sending buffers",
32-
ConstLabels: defaultConstLabels,
33-
})
34-
)
35-
3624
type Metrics struct {
37-
metricsLevel int
38-
messagesPublished *prometheus.CounterVec
39-
bytesPublished *prometheus.CounterVec
40-
messagesPending *prometheus.GaugeVec
41-
bytesPending *prometheus.GaugeVec
42-
publishErrors *prometheus.CounterVec
43-
publishLatency *prometheus.HistogramVec
44-
publishRPCLatency *prometheus.HistogramVec
25+
metricsLevel int
26+
messagesPublished *prometheus.CounterVec
27+
bytesPublished *prometheus.CounterVec
28+
messagesPending *prometheus.GaugeVec
29+
bytesPending *prometheus.GaugeVec
30+
publishErrors *prometheus.CounterVec
31+
publishLatency *prometheus.HistogramVec
32+
publishRPCLatency *prometheus.HistogramVec
33+
SendingBuffersCount prometheus.Gauge
4534

4635
messagesReceived *prometheus.CounterVec
4736
bytesReceived *prometheus.CounterVec
@@ -111,10 +100,7 @@ type LeveledMetrics struct {
111100
// NewMetricsProvider returns metrics registered to registerer.
112101
func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string,
113102
registerer prometheus.Registerer) *Metrics {
114-
constLabels := make(map[string]string)
115-
for k, v := range defaultConstLabels {
116-
constLabels[k] = v
117-
}
103+
constLabels := map[string]string{"client": "go"}
118104
for k, v := range userDefinedLabels {
119105
constLabels[k] = v
120106
}
@@ -180,6 +166,12 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
180166
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
181167
}, metricsLevelLabels),
182168

169+
SendingBuffersCount: prometheus.NewGauge(prometheus.GaugeOpts{
170+
Name: "pulsar_client_sending_buffers_count",
171+
Help: "Number of sending buffers",
172+
ConstLabels: constLabels,
173+
}),
174+
183175
producersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
184176
Name: "pulsar_client_producers_opened",
185177
Help: "Counter of producers created by the client",
@@ -548,7 +540,7 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
548540
metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
549541
}
550542
}
551-
_ = registerer.Register(sendingBuffersCount)
543+
_ = registerer.Register(metrics.SendingBuffersCount)
552544
return metrics
553545
}
554546

pulsar/producer_partition.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
372372
maxMessageSize, p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
373373
compression.Level(p.options.CompressionLevel),
374374
buffersPool,
375+
p.client.metrics,
375376
p.log,
376377
p.encryptor)
377378
if err != nil {
@@ -798,6 +799,9 @@ func (p *partitionProducer) internalSingleSend(
798799
payloadBuf.Write(compressedPayload)
799800

800801
buffer := buffersPool.GetBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
802+
bufferCount := p.client.metrics.SendingBuffersCount
803+
bufferCount.Inc()
804+
buffer.SetReleaseCallback(func() { bufferCount.Dec() })
801805

802806
sid := *mm.SequenceId
803807
var useTxn bool

0 commit comments

Comments
 (0)