@@ -44,13 +44,21 @@ This buffer intentionally weakens the delivery guarantees of the pipeline and th
 
 ## Batching
 
-It is possible to batch up messages sent from this buffer using a [batch policy](/docs/configuration/batching#batch-policy).`).
+It is possible to batch up messages sent from this buffer using a [batch policy](/docs/configuration/batching#batch-policy).
+
+## Metrics
+
+- ` + "`buffer_active`" + ` Gauge metric tracking the current number of bytes in the buffer.
+- ` + "`buffer_spillover`" + ` Counter metric tracking the total number of bytes dropped because of spillover.
+
+`).
 		Field(service.NewIntField("limit").
 			Description(`The maximum buffer size (in bytes) to allow before applying backpressure upstream.`).
 			Default(524288000)).
 		Field(service.NewBoolField("spillover").
 			Description("Whether to drop incoming messages that will exceed the buffer limit.").
 			Advanced().
+			Version("1.13.0").
 			Default(false)).
 		Field(service.NewInternalField(bs))
 }
@@ -97,7 +105,7 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
 		}
 	}
 
-	return newMemoryBuffer(limit, spilloverEnabled, batcher), nil
+	return newMemoryBuffer(limit, spilloverEnabled, batcher, res), nil
 }
 
 //------------------------------------------------------------------------------
@@ -118,14 +126,19 @@ type memoryBuffer struct {
 	closed bool
 
 	batcher *service.Batcher
+
+	activeBytes    *service.MetricGauge
+	spilloverBytes *service.MetricCounter
 }
 
-func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher) *memoryBuffer {
+func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher, res *service.Resources) *memoryBuffer {
 	return &memoryBuffer{
 		cap:              capacity,
 		spilloverEnabled: spilloverEnabled,
 		cond:             sync.NewCond(&sync.Mutex{}),
 		batcher:          batcher,
+		activeBytes:      res.Metrics().NewGauge("buffer_active"),
+		spilloverBytes:   res.Metrics().NewCounter("buffer_spillover"),
 	}
 }
 
@@ -224,6 +237,7 @@ func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, ser
 	defer m.cond.L.Unlock()
 	if err == nil {
 		m.bytes -= outSize
+		m.activeBytes.Set(int64(m.bytes))
 	} else {
 		m.batches = append(batchSources, m.batches...)
 	}
@@ -262,6 +276,7 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
 
 	for (m.bytes + extraBytes) > m.cap {
 		if m.spilloverEnabled {
+			m.spilloverBytes.Incr(int64(extraBytes))
 			return component.ErrLimitReached
 		}
@@ -276,6 +291,7 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
 		size:  extraBytes,
 	})
 	m.bytes += extraBytes
+	m.activeBytes.Set(int64(m.bytes))
 
 	m.cond.Broadcast()
 	return nil
0 commit comments