From 8b48ad94ae1597f0229448d7d5a0aa50fc46e341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 15:31:35 +0100 Subject: [PATCH 01/38] Add mechanism to detect leaked references to mimirpb buffers. --- pkg/mimir/mimir.go | 4 ++ pkg/mimirpb/custom.go | 112 +++++++++++++++++++++++++++++++++++-- pkg/mimirpb/custom_test.go | 57 +++++++++++++++++++ 3 files changed, 168 insertions(+), 5 deletions(-) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 3f0b2570ce7..4aede03b266 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -787,6 +787,7 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag type CommonConfig struct { Storage bucket.StorageBackendConfig `yaml:"storage"` ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` + InstrumentRefLeaksPct float64 `yaml:"instrument_ref_leaks_pct" category:"experimental"` } type CommonConfigInheritance struct { @@ -798,6 +799,7 @@ type CommonConfigInheritance struct { func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) + f.Float64Var(&c.InstrumentRefLeaksPct, "common.instrument-reference-leaks-pct", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. 
@@ -909,6 +911,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { setUpGoRuntimeMetrics(cfg, reg) + mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPct}.RegisterGlobally() + if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { util_log.WarnExperimentalUse("ruler.tenant-federation") } diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index e042da838bf..2572c55466f 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -6,6 +6,11 @@ import ( "bytes" "fmt" "math" + "runtime" + "sync/atomic" + "testing" + "unsafe" + "weak" gogoproto "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/histogram" @@ -16,9 +21,31 @@ import ( "google.golang.org/protobuf/protoadapt" ) +type CustomCodecConfig struct { + InstrumentRefLeaksPct float64 + OnRefLeakDetected func(uintptr) +} + +func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { + c := &codecV2{codec: encoding.GetCodecV2(proto.Name), onRefLeakDetected: cfg.OnRefLeakDetected} + if cfg.InstrumentRefLeaksPct > 0 { + c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) + } + return c +} + +var GlobalCodec encoding.CodecV2 + +func (cfg CustomCodecConfig) RegisterGlobally() { + GlobalCodec = cfg.Codec() + encoding.RegisterCodecV2(GlobalCodec) +} + func init() { - c := encoding.GetCodecV2(proto.Name) - encoding.RegisterCodecV2(&codecV2{codec: c}) + // Instrument all buffers when testing. + if testing.Testing() { + CustomCodecConfig{InstrumentRefLeaksPct: 100}.RegisterGlobally() + } } // codecV2 customizes gRPC marshalling and unmarshalling. @@ -27,6 +54,10 @@ func init() { // and to retain the unmarshalling buffer when necessary. 
type codecV2 struct { codec encoding.CodecV2 + + instrumentRefLeaksOneIn uint64 + unmarshaledWithBufferRefCount atomic.Uint64 + onRefLeakDetected func(uintptr) } var _ encoding.CodecV2 = &codecV2{} @@ -113,14 +144,38 @@ func unmarshalSlicePoolSizes() []int { return sizes } +// Unmarshal unmarshals an object using the global codec. Prefer this over calling +// the Unmarshal method directly, as it will take advantage of pools and leak +// detection. +func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { + if GlobalCodec == nil { + return v.Unmarshal(data) + } + return GlobalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) +} + // Unmarshal customizes gRPC unmarshalling. // If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented. // The latter means that v's FreeBuffer method should be called when v is no longer used. func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { - buf := data.MaterializeToBuffer(unmarshalSlicePool) + holder, isBufferHolder := v.(MessageWithBufferRef) + instrumentLeaks := isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 + + var buf mem.Buffer + if instrumentLeaks { + // Always allocate; we'll detect leaks by checking if it's garbage-collected. + b := make([]byte, data.Len()) + data.CopyTo(b) + buf = mem.NewBuffer(&b, nil) + } else { + buf = data.MaterializeToBuffer(unmarshalSlicePool) + } // Decrement buf's reference count. Note though that if v implements MessageWithBufferRef, // we increase buf's reference count first so it doesn't go to zero. - defer buf.Free() + // + // We're possibly replacing buf below, so it's important to keep this a closure + // rather than a direct call to buf.Free. 
+ defer func() { buf.Free() }() if unmarshaler, ok := v.(gogoproto.Unmarshaler); ok { if err := unmarshaler.Unmarshal(buf.ReadOnlyData()); err != nil { @@ -133,7 +188,10 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { } } - if holder, ok := v.(MessageWithBufferRef); ok { + if isBufferHolder { + if instrumentLeaks { + buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1, onRefLeakDetected: c.onRefLeakDetected} + } buf.Ref() holder.SetBuffer(buf) } @@ -171,6 +229,50 @@ func (m *BufferHolder) FreeBuffer() { var _ MessageWithBufferRef = &BufferHolder{} +type instrumentLeaksBuf struct { + mem.Buffer + refCount int + onRefLeakDetected func(uintptr) +} + +func (b *instrumentLeaksBuf) Ref() { + b.Buffer.Ref() + b.refCount++ +} + +func (b *instrumentLeaksBuf) Free() { + b.Buffer.Free() + b.refCount-- + + if b.refCount == 0 { + buf := b.ReadOnlyData() + // Taint data, in case the buffer is accessed before the panic below, + // or after the panic if it's recovered. + for i := range buf { + const taint = "KAEL" + buf[i] = taint[i%len(taint)] + } + // Remove our ref; no (strong) refs should remain (except in the stack). + b.Buffer = nil + weakRef := weak.Make(unsafe.SliceData(buf)) + + // Check in a separate goroutine, because this stack still has locals + // pointing to the buffer. + go func() { + runtime.GC() + runtime.GC() + + if v := weakRef.Value(); v != nil { + if f := b.onRefLeakDetected; f != nil { + f(uintptr(unsafe.Pointer(v))) + return + } + panic(fmt.Sprintf("reference leak for object %p", v)) + } + }() + } +} + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. 
func (m *WriteRequest) MinTimestamp() int64 { diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index b20625d34ed..d2c0f3554cb 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -7,6 +7,8 @@ package mimirpb import ( "testing" + "time" + "unsafe" "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/assert" @@ -247,3 +249,58 @@ func TestHistogram_BucketsCount(t *testing.T) { }) } } + +func TestInstrumentRefLeaks(t *testing.T) { + leaks := make(chan uintptr, 1) + + codec := CustomCodecConfig{ + InstrumentRefLeaksPct: 100, + OnRefLeakDetected: func(addr uintptr) { + leaks <- addr + }, + }.Codec() + + src := WriteRequest{Timeseries: []PreallocTimeseries{{TimeSeries: &TimeSeries{ + Labels: []UnsafeMutableLabel{{Name: "labelName", Value: "labelValue"}}, + Samples: []Sample{{TimestampMs: 1234, Value: 1337}}, + }}}} + buf, err := src.Marshal() + require.NoError(t, err) + + var req WriteRequest + err = codec.Unmarshal(mem.BufferSlice{mem.NewBuffer(&buf, nil)}, &req) + require.NoError(t, err) + + bufAddr := uintptr(unsafe.Pointer(unsafe.SliceData(req.buffer.ReadOnlyData()))) + // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive + // the call to req.FreeBuffer. + leakingLabelName := req.Timeseries[0].Labels[0].Name + + req.FreeBuffer() // leakingLabelName becomes a leak here + + var detectedAddr uintptr + recvLeak := func() bool { + select { + case detectedAddr = <-leaks: + return true + default: + return false + } + } + + // Expect to receive a leak detection. + require.Eventually(t, recvLeak, 10*time.Millisecond, 1*time.Millisecond) + require.Equal(t, bufAddr, detectedAddr) + // Expect the label name contents to have been replaced with a taint word. + // Keep this check last, because we need to extend the lifespan of leakingLabelName + // to avoid Go optimizing the leak away. 
+ require.Contains(t, leakingLabelName, "KAEL") + + // Now let's check a non-leak doesn't get falsely detected. + dataNoLeak, err := src.Marshal() + require.NoError(t, err) + var reqNoLeak WriteRequest + err = codec.Unmarshal(mem.BufferSlice{mem.NewBuffer(&dataNoLeak, nil)}, &reqNoLeak) + require.NoError(t, err) + require.Never(t, recvLeak, 10*time.Millisecond, 1*time.Millisecond) +} From 56fd9b0f1577682284a26e8c0b865d4607dc9ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 17:01:28 +0100 Subject: [PATCH 02/38] Simplify test. --- pkg/mimirpb/custom.go | 39 ++++++++++++++++++++++++++++---------- pkg/mimirpb/custom_test.go | 34 +++++++++++++-------------------- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 2572c55466f..4ca890f55e0 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -4,6 +4,7 @@ package mimirpb import ( "bytes" + "context" "fmt" "math" "runtime" @@ -23,11 +24,10 @@ import ( type CustomCodecConfig struct { InstrumentRefLeaksPct float64 - OnRefLeakDetected func(uintptr) } func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { - c := &codecV2{codec: encoding.GetCodecV2(proto.Name), onRefLeakDetected: cfg.OnRefLeakDetected} + c := &codecV2{codec: encoding.GetCodecV2(proto.Name)} if cfg.InstrumentRefLeaksPct > 0 { c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) } @@ -57,7 +57,6 @@ type codecV2 struct { instrumentRefLeaksOneIn uint64 unmarshaledWithBufferRefCount atomic.Uint64 - onRefLeakDetected func(uintptr) } var _ encoding.CodecV2 = &codecV2{} @@ -190,7 +189,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { if isBufferHolder { if instrumentLeaks { - buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1, onRefLeakDetected: c.onRefLeakDetected} + buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1} } buf.Ref() holder.SetBuffer(buf) @@ -231,8 +230,7 @@ var _ MessageWithBufferRef = 
&BufferHolder{} type instrumentLeaksBuf struct { mem.Buffer - refCount int - onRefLeakDetected func(uintptr) + refCount int } func (b *instrumentLeaksBuf) Ref() { @@ -263,16 +261,37 @@ func (b *instrumentLeaksBuf) Free() { runtime.GC() if v := weakRef.Value(); v != nil { - if f := b.onRefLeakDetected; f != nil { - f(uintptr(unsafe.Pointer(v))) - return + select { + case leaks <- v: + default: + panic(fmt.Sprintf("reference leak for object %p", v)) } - panic(fmt.Sprintf("reference leak for object %p", v)) } }() } } +var leaks = make(chan *byte) + +// NextRefLeakChannel returns a channel where the next detected reference leak's +// address will be sent, instead of the default behavior of panicking. +// +// Canceling the context immediately closes the channel and restores the default +// behavior. +// +// Intended for use in tests. +func NextRefLeakChannel(ctx context.Context) <-chan uintptr { + ch := make(chan uintptr, 1) + go func() { + defer close(ch) + select { + case ch <- uintptr(unsafe.Pointer(<-leaks)): + case <-ctx.Done(): + } + }() + return ch +} + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. 
func (m *WriteRequest) MinTimestamp() int64 { diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index d2c0f3554cb..5aa9ee799e2 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -251,15 +251,6 @@ func TestHistogram_BucketsCount(t *testing.T) { } func TestInstrumentRefLeaks(t *testing.T) { - leaks := make(chan uintptr, 1) - - codec := CustomCodecConfig{ - InstrumentRefLeaksPct: 100, - OnRefLeakDetected: func(addr uintptr) { - leaks <- addr - }, - }.Codec() - src := WriteRequest{Timeseries: []PreallocTimeseries{{TimeSeries: &TimeSeries{ Labels: []UnsafeMutableLabel{{Name: "labelName", Value: "labelValue"}}, Samples: []Sample{{TimestampMs: 1234, Value: 1337}}, @@ -268,7 +259,7 @@ func TestInstrumentRefLeaks(t *testing.T) { require.NoError(t, err) var req WriteRequest - err = codec.Unmarshal(mem.BufferSlice{mem.NewBuffer(&buf, nil)}, &req) + err = Unmarshal(buf, &req) require.NoError(t, err) bufAddr := uintptr(unsafe.Pointer(unsafe.SliceData(req.buffer.ReadOnlyData()))) @@ -276,20 +267,20 @@ func TestInstrumentRefLeaks(t *testing.T) { // the call to req.FreeBuffer. leakingLabelName := req.Timeseries[0].Labels[0].Name - req.FreeBuffer() // leakingLabelName becomes a leak here - - var detectedAddr uintptr - recvLeak := func() bool { + recvLeak := func() (uintptr, bool) { select { - case detectedAddr = <-leaks: - return true - default: - return false + case detectedAddr := <-NextRefLeakChannel(t.Context()): + return detectedAddr, true + case <-time.After(10 * time.Millisecond): + return 0, false } } + req.FreeBuffer() // leakingLabelName becomes a leak here + // Expect to receive a leak detection. - require.Eventually(t, recvLeak, 10*time.Millisecond, 1*time.Millisecond) + detectedAddr, ok := recvLeak() + require.True(t, ok, "expected a reference leak") require.Equal(t, bufAddr, detectedAddr) // Expect the label name contents to have been replaced with a taint word. 
// Keep this check last, because we need to extend the lifespan of leakingLabelName @@ -300,7 +291,8 @@ func TestInstrumentRefLeaks(t *testing.T) { dataNoLeak, err := src.Marshal() require.NoError(t, err) var reqNoLeak WriteRequest - err = codec.Unmarshal(mem.BufferSlice{mem.NewBuffer(&dataNoLeak, nil)}, &reqNoLeak) + err = Unmarshal(dataNoLeak, &reqNoLeak) require.NoError(t, err) - require.Never(t, recvLeak, 10*time.Millisecond, 1*time.Millisecond) + detectedAddr, ok = recvLeak() + require.False(t, ok, "unexpected a reference leak %x", detectedAddr) } From 0b57a52f52d2718dbc3d118fda252ea85b0f3fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 17:02:45 +0100 Subject: [PATCH 03/38] Plumb ingester with ref leaks detection. And add regression test for #13573. --- pkg/ingester/ingester_test.go | 64 +++++++++++++++++++++++++++++++++++ pkg/storage/ingest/version.go | 4 +-- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f1200e3cf19..f01a0bccaa7 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -9136,6 +9136,39 @@ func TestIngesterMetadataMetrics(t *testing.T) { } +func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.MetadataRetainPeriod = 20 * time.Second + + ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, "", reg) + require.NoError(t, err) + startAndWaitHealthy(t, ing, r) + + syms := util_test.NewSymbolTableBuilder(nil) + buf, err := makeTestRW2WriteRequest(syms).Marshal() + require.NoError(t, err) + + var wr mimirpb.PreallocWriteRequest + wr.UnmarshalFromRW2 = true + wr.SkipNormalizeMetadataMetricName = true + wr.SkipDeduplicateMetadata = true + err = mimirpb.Unmarshal(buf, &wr) + require.NoError(t, err) + + nextLeak := mimirpb.NextRefLeakChannel(t.Context()) + + ctx := 
user.InjectOrgID(context.Background(), userID) + _, err = ing.Push(ctx, &wr.WriteRequest) + require.NoError(t, err) + + select { + case addr := <-nextLeak: + require.Fail(t, "reference leak detected", "addr: %x", addr) + case <-time.After(10 * time.Millisecond): + } +} + func TestIngesterSendsOnlySeriesWithData(t *testing.T) { ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), defaultLimitsTestConfig(), nil, "", nil) require.NoError(t, err) @@ -12255,3 +12288,34 @@ func TestIngester_NotifyPreCommit(t *testing.T) { // As there are three users, fsync should have been called at least three times assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3)) } + +func makeTestRW2WriteRequest(syms *util_test.SymbolTableBuilder) *mimirpb.WriteRequest { + req := &mimirpb.WriteRequest{ + TimeseriesRW2: []mimirpb.TimeSeriesRW2{ + { + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")}, + Samples: []mimirpb.Sample{ + { + Value: 123.456, + TimestampMs: 1234567890, + }, + }, + Exemplars: []mimirpb.ExemplarRW2{ + { + Value: 123.456, + Timestamp: 1234567890, + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("traceID"), syms.GetSymbol("1234567890abcdef")}, + }, + }, + Metadata: mimirpb.MetadataRW2{ + Type: mimirpb.METRIC_TYPE_COUNTER, + HelpRef: syms.GetSymbol("test_metric_help"), + UnitRef: syms.GetSymbol("test_metric_unit"), + }, + }, + }, + } + req.SymbolsRW2 = syms.GetSymbols() + + return req +} diff --git a/pkg/storage/ingest/version.go b/pkg/storage/ingest/version.go index f15554b7a7f..03c6f39a70d 100644 --- a/pkg/storage/ingest/version.go +++ b/pkg/storage/ingest/version.go @@ -182,7 +182,7 @@ func DeserializeRecordContent(content []byte, wr *mimirpb.PreallocWriteRequest, } func deserializeRecordContentV1(content []byte, wr *mimirpb.PreallocWriteRequest) error { - return wr.Unmarshal(content) 
+ return mimirpb.Unmarshal(content, wr) } func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest) error { @@ -191,7 +191,7 @@ func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest wr.RW2CommonSymbols = V2CommonSymbols.GetSlice() wr.SkipNormalizeMetadataMetricName = true wr.SkipDeduplicateMetadata = true - return wr.Unmarshal(content) + return mimirpb.Unmarshal(content, wr) } // splitRequestVersionTwo adapts mimirpb.SplitWriteRequestByMaxMarshalSizeRW2 to requestSplitter From f27be92b7825aa16f7aeb25d183bc10cee632277 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 17:32:18 +0100 Subject: [PATCH 04/38] Fix lint, test. --- pkg/mimirpb/custom.go | 3 ++- pkg/storage/ingest/version_test.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 4ca890f55e0..90db3d3852e 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -8,11 +8,12 @@ import ( "fmt" "math" "runtime" - "sync/atomic" "testing" "unsafe" "weak" + "go.uber.org/atomic" + gogoproto "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/histogram" "google.golang.org/grpc/encoding" diff --git a/pkg/storage/ingest/version_test.go b/pkg/storage/ingest/version_test.go index d1a9f6aeb90..6300e425ab1 100644 --- a/pkg/storage/ingest/version_test.go +++ b/pkg/storage/ingest/version_test.go @@ -97,6 +97,7 @@ func TestDeserializeRecordContent(t *testing.T) { require.NoError(t, err) wr.ClearTimeseriesUnmarshalData() + wr.BufferHolder = mimirpb.BufferHolder{} // We don't want to compare this. require.Equal(t, reqv0, &wr) }) @@ -124,6 +125,7 @@ func TestDeserializeRecordContent(t *testing.T) { require.NoError(t, err) wr.ClearTimeseriesUnmarshalData() + wr.BufferHolder = mimirpb.BufferHolder{} // We don't want to compare this. 
require.Equal(t, reqv1, &wr) }) From 86258b790b97cd8a39aa4cfc0440c7415e01b229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 18:00:22 +0100 Subject: [PATCH 05/38] More fixing. --- pkg/mimir/mimir.go | 1 + pkg/mimirpb/custom.go | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 4aede03b266..5913ef011d0 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -816,6 +816,7 @@ type configWithCustomCommonUnmarshaler struct { type commonConfigUnmarshaler struct { Storage *specificLocationsUnmarshaler `yaml:"storage"` ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` + InstrumentRefLeaksPct *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_pct"` } // specificLocationsUnmarshaler will unmarshal yaml into specific locations. diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 90db3d3852e..c87f54236c7 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -12,10 +12,9 @@ import ( "unsafe" "weak" - "go.uber.org/atomic" - gogoproto "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/histogram" + "go.uber.org/atomic" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/mem" From 71f14aaf635b78f342c260aefb563e6120ec75b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 18:41:19 +0100 Subject: [PATCH 06/38] Fix racy test --- pkg/mimirpb/custom_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 5aa9ee799e2..eb9390ec4a2 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -267,9 +267,11 @@ func TestInstrumentRefLeaks(t *testing.T) { // the call to req.FreeBuffer. 
leakingLabelName := req.Timeseries[0].Labels[0].Name + leaks := NextRefLeakChannel(t.Context()) + recvLeak := func() (uintptr, bool) { select { - case detectedAddr := <-NextRefLeakChannel(t.Context()): + case detectedAddr := <-leaks: return detectedAddr, true case <-time.After(10 * time.Millisecond): return 0, false @@ -293,6 +295,7 @@ func TestInstrumentRefLeaks(t *testing.T) { var reqNoLeak WriteRequest err = Unmarshal(dataNoLeak, &reqNoLeak) require.NoError(t, err) + leaks = NextRefLeakChannel(t.Context()) detectedAddr, ok = recvLeak() require.False(t, ok, "unexpected a reference leak %x", detectedAddr) } From f1ab6847bc4407fc4958d5ca2bd44b3f6991587a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 18:47:23 +0100 Subject: [PATCH 07/38] make doc --- .../mimir/configure/configuration-parameters/index.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 3abd6735055..126280d4f88 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -518,6 +518,11 @@ client_cluster_validation: # (experimental) Primary cluster validation label. # CLI flag: -common.client-cluster-validation.label [label: | default = ""] + +# (experimental) Percentage of buffers from pools to instrument for reference +# leaks. 0 to disable. +# CLI flag: -common.instrument-reference-leaks-pct +[instrument_ref_leaks_pct: | default = 0] ``` ### server From f6a6b4177c66cbd6472921ed7afc7bb3cd58caad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 19:00:19 +0100 Subject: [PATCH 08/38] CHANGELOG. 
--- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 +++++++++++ cmd/mimir/help-all.txt.tmpl | 2 ++ operations/mimir/mimir-flags-defaults.json | 1 + 4 files changed, 15 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62f3d317cf1..c402104310e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ * [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534 * [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525 * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525 +* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-pct` to detect leaked references to gRPC buffers. #13609 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053 * [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084 * [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 355d28654a3..e86984e44ca 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20749,6 +20749,17 @@ ], "fieldValue": null, "fieldDefaultValue": null + }, + { + "kind": "field", + "name": "instrument_ref_leaks_pct", + "required": false, + "desc": "Percentage of buffers from pools to instrument for reference leaks. 
0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "common.instrument-reference-leaks-pct", + "fieldType": "float", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 637971b8099..3d5f70d33db 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,6 +857,8 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. + -common.instrument-reference-leaks-pct float + [experimental] Percentage of buffers from pools to instrument for reference leaks. 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. -common.storage.azure.account-name string diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 822a3d89f4a..93093907f88 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1473,6 +1473,7 @@ "common.storage.swift.request-timeout": 5000000000, "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", + "common.instrument-reference-leaks-pct": 0, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", From ef1d54e6b78ece366a5ff52ae696d064298b803d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 19:19:26 +0100 Subject: [PATCH 09/38] about-versioning.md --- docs/sources/mimir/configure/about-versioning.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 4931767ac34..2c917696fe2 100644 --- a/docs/sources/mimir/configure/about-versioning.md 
+++ b/docs/sources/mimir/configure/about-versioning.md @@ -251,6 +251,10 @@ The following features are currently experimental: - Assuming that a gRPC client configuration can be reached via `-`, cluster validation label is configured via: `-.cluster-validation.label`. - The cluster validation label of all gRPC clients can be configured via `-common.client-cluster-validation.label`. - Requests with invalid cluster validation labels are tracked via the `cortex_client_invalid_cluster_validation_label_requests_total` metric. +- Common + - Instrument a fraction of pooled objects for references that outlive their lifetime + - Only implemented for objects embedding `mimirpb.BufferHolder` so far. + - Flag: `-common.instrument-reference-leaks-pct` - Preferred available zone for querying ingesters and store-gateways - `-querier.prefer-availability-zone` - Memberlist zone-aware routing From 69ca33a9dd95354e5ae696a021f1f44efbf263d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Fri, 21 Nov 2025 15:53:25 +0100 Subject: [PATCH 10/38] Revert "mimirpb: Clone metadata fields rather than maintaining references (#13573)" This reverts commit cf9a026ea6defdc7ace012be60d593f159a0fd09. --- CHANGELOG.md | 1 - pkg/mimirpb/mimir.pb.go | 6 +++--- pkg/mimirpb/mimir.pb.go.expdiff | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c402104310e..a3de605552d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -519,7 +519,6 @@ * [BUGFIX] Block-builder-scheduler: Fix bugs in handling of partitions with no commit. #12130 * [BUGFIX] Ingester: Fix issue where ingesters can exit read-only mode during idle compactions, resulting in write errors. #12128 * [BUGFIX] otlp: Reverts #11889 which has a pooled memory re-use bug. #12266 -* [BUGFIX] Ingester: Fix issue where metadata stored in ingesters indirectly prevents large Kafka record buffers from being garbage collected, resulting in unusual memory growth. 
#13573 ### Mixin diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go index 3a886f50499..e7d3c6a4392 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -11827,9 +11827,9 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata } if len(unit) > 0 || len(help) > 0 || metricType != 0 { metadata.add(normalizedMetricName, MetricMetadata{ - MetricFamilyName: strings.Clone(normalizedMetricName), - Help: strings.Clone(help), - Unit: strings.Clone(unit), + MetricFamilyName: normalizedMetricName, + Help: help, + Unit: unit, Type: MetricMetadata_MetricType(metricType), }) } diff --git a/pkg/mimirpb/mimir.pb.go.expdiff b/pkg/mimirpb/mimir.pb.go.expdiff index d858944c833..589eb878a1d 100644 --- a/pkg/mimirpb/mimir.pb.go.expdiff +++ b/pkg/mimirpb/mimir.pb.go.expdiff @@ -1,5 +1,5 @@ diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go -index 3a886f5049..dda8609298 100644 +index e7d3c6a439..dda8609298 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -14,7 +14,6 @@ import ( @@ -526,9 +526,9 @@ index 3a886f5049..dda8609298 100644 - } - if len(unit) > 0 || len(help) > 0 || metricType != 0 { - metadata.add(normalizedMetricName, MetricMetadata{ -- MetricFamilyName: strings.Clone(normalizedMetricName), -- Help: strings.Clone(help), -- Unit: strings.Clone(unit), +- MetricFamilyName: normalizedMetricName, +- Help: help, +- Unit: unit, - Type: MetricMetadata_MetricType(metricType), - }) - } From 9d6d2d03d677bea2f16743ab5367c22092693afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 09:23:16 +0100 Subject: [PATCH 11/38] Make test deterministic. 
--- pkg/ingester/ingester_test.go | 12 ++++--- pkg/mimirpb/custom.go | 43 +++++++++--------------- pkg/mimirpb/custom_test.go | 56 ++++++++++++++------------------ pkg/mimirpb/internal/refleaks.go | 48 +++++++++++++++++++++++++++ pkg/mimirpb/testutil/refleaks.go | 22 +++++++++++++ 5 files changed, 117 insertions(+), 64 deletions(-) create mode 100644 pkg/mimirpb/internal/refleaks.go create mode 100644 pkg/mimirpb/testutil/refleaks.go diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f01a0bccaa7..e00557bb532 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -64,6 +64,7 @@ import ( asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" + mimirpbtest "github.com/grafana/mimir/pkg/mimirpb/testutil" "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" @@ -9156,16 +9157,19 @@ func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { err = mimirpb.Unmarshal(buf, &wr) require.NoError(t, err) - nextLeak := mimirpb.NextRefLeakChannel(t.Context()) + nextLeak := mimirpbtest.NextRefLeakCheck(t.Context(), wr.Buffer().ReadOnlyData()) ctx := user.InjectOrgID(context.Background(), userID) _, err = ing.Push(ctx, &wr.WriteRequest) require.NoError(t, err) select { - case addr := <-nextLeak: - require.Fail(t, "reference leak detected", "addr: %x", addr) - case <-time.After(10 * time.Millisecond): + case leaked := <-nextLeak: + if leaked { + require.Fail(t, "reference leak detected", "addr: %p", buf) + } + case <-time.After(1 * time.Second): + require.Fail(t, "expected reference leak check", "addr: %p", buf) } } diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index c87f54236c7..a013d2bf3e0 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -4,7 +4,6 @@ package mimirpb import ( "bytes" - "context" "fmt" "math" 
"runtime" @@ -13,6 +12,7 @@ import ( "weak" gogoproto "github.com/gogo/protobuf/proto" + "github.com/grafana/mimir/pkg/mimirpb/internal" "github.com/prometheus/prometheus/model/histogram" "go.uber.org/atomic" "google.golang.org/grpc/encoding" @@ -215,6 +215,10 @@ type BufferHolder struct { buffer mem.Buffer } +func (m *BufferHolder) Buffer() mem.Buffer { + return m.buffer +} + func (m *BufferHolder) SetBuffer(buf mem.Buffer) { m.buffer = buf } @@ -252,7 +256,9 @@ func (b *instrumentLeaksBuf) Free() { } // Remove our ref; no (strong) refs should remain (except in the stack). b.Buffer = nil - weakRef := weak.Make(unsafe.SliceData(buf)) + ptr := unsafe.SliceData(buf) + addr := uintptr(unsafe.Pointer(ptr)) + weakRef := weak.Make(ptr) // Check in a separate goroutine, because this stack still has locals // pointing to the buffer. @@ -260,38 +266,19 @@ func (b *instrumentLeaksBuf) Free() { runtime.GC() runtime.GC() - if v := weakRef.Value(); v != nil { - select { - case leaks <- v: - default: - panic(fmt.Sprintf("reference leak for object %p", v)) + leaked := weakRef.Value() != nil + + select { + case internal.RefLeakChecks <- internal.RefLeakCheck{Addr: addr, Leaked: leaked}: + default: + if leaked { + panic(fmt.Sprintf("reference leak for object 0x%x", addr)) } } }() } } -var leaks = make(chan *byte) - -// NextRefLeakChannel returns a channel where the next detected reference leak's -// address will be sent, instead of the default behavior of panicking. -// -// Canceling the context immediately closes the channel and restores the default -// behavior. -// -// Intended for use in tests. -func NextRefLeakChannel(ctx context.Context) <-chan uintptr { - ch := make(chan uintptr, 1) - go func() { - defer close(ch) - select { - case ch <- uintptr(unsafe.Pointer(<-leaks)): - case <-ctx.Done(): - } - }() - return ch -} - // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. 
func (m *WriteRequest) MinTimestamp() int64 { diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index eb9390ec4a2..7966f495722 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -8,7 +8,6 @@ package mimirpb import ( "testing" "time" - "unsafe" "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/assert" @@ -17,6 +16,7 @@ import ( "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/mem" + "github.com/grafana/mimir/pkg/mimirpb/internal" "github.com/grafana/mimir/pkg/util/test" ) @@ -208,7 +208,7 @@ func TestCodecV2_Unmarshal(t *testing.T) { require.True(t, origReq.Equal(req)) - require.NotNil(t, req.buffer) + require.NotNil(t, req.Buffer) req.FreeBuffer() } @@ -258,44 +258,36 @@ func TestInstrumentRefLeaks(t *testing.T) { buf, err := src.Marshal() require.NoError(t, err) - var req WriteRequest - err = Unmarshal(buf, &req) - require.NoError(t, err) - - bufAddr := uintptr(unsafe.Pointer(unsafe.SliceData(req.buffer.ReadOnlyData()))) - // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive - // the call to req.FreeBuffer. - leakingLabelName := req.Timeseries[0].Labels[0].Name - - leaks := NextRefLeakChannel(t.Context()) + var leaks <-chan bool + var leakingLabelName UnsafeMutableString + func() { + var req WriteRequest + err = Unmarshal(buf, &req) + require.NoError(t, err) - recvLeak := func() (uintptr, bool) { - select { - case detectedAddr := <-leaks: - return detectedAddr, true - case <-time.After(10 * time.Millisecond): - return 0, false - } - } + // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive + // the call to req.FreeBuffer. + leakingLabelName = req.Timeseries[0].Labels[0].Name - req.FreeBuffer() // leakingLabelName becomes a leak here + leaks = internal.NextRefLeakCheck(t.Context(), req.Buffer.ReadOnlyData()) + req.FreeBuffer() // leakingLabelName becomes a leak here + }() // Expect to receive a leak detection. 
- detectedAddr, ok := recvLeak() - require.True(t, ok, "expected a reference leak") - require.Equal(t, bufAddr, detectedAddr) + require.Eventually(t, func() bool { return <-leaks }, 10*time.Millisecond, 1*time.Second, "expected a reference leak") // Expect the label name contents to have been replaced with a taint word. // Keep this check last, because we need to extend the lifespan of leakingLabelName // to avoid Go optimizing the leak away. require.Contains(t, leakingLabelName, "KAEL") // Now let's check a non-leak doesn't get falsely detected. - dataNoLeak, err := src.Marshal() - require.NoError(t, err) - var reqNoLeak WriteRequest - err = Unmarshal(dataNoLeak, &reqNoLeak) - require.NoError(t, err) - leaks = NextRefLeakChannel(t.Context()) - detectedAddr, ok = recvLeak() - require.False(t, ok, "unexpected a reference leak %x", detectedAddr) + func() { + var reqNoLeak WriteRequest + err = Unmarshal(buf, &reqNoLeak) + require.NoError(t, err) + + leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer.ReadOnlyData()) + reqNoLeak.FreeBuffer() + }() + require.Eventually(t, func() bool { return !<-leaks }, 10*time.Millisecond, 1*time.Second, "expected no reference leaks") } diff --git a/pkg/mimirpb/internal/refleaks.go b/pkg/mimirpb/internal/refleaks.go new file mode 100644 index 00000000000..67e162bd85a --- /dev/null +++ b/pkg/mimirpb/internal/refleaks.go @@ -0,0 +1,48 @@ +package internal + +import ( + "context" + "reflect" + "testing" +) + +type RefLeakCheck struct { + Addr uintptr + Leaked bool +} + +var RefLeakChecks chan RefLeakCheck + +func init() { + if testing.Testing() { + RefLeakChecks = make(chan RefLeakCheck) + } +} + +// NextRefLeakCheck is documented in testutil.NextRefLeakCheck. 
+func NextRefLeakCheck(ctx context.Context, object any) <-chan bool { + if !testing.Testing() { + panic("NextRefLeakCheck can only be used in tests") + } + var addr uintptr + switch o := reflect.ValueOf(object); o.Type().Kind() { + case reflect.Pointer, reflect.Slice, reflect.String: + addr = uintptr(o.UnsafePointer()) + } + + ch := make(chan bool, 1) + go func() { + defer close(ch) + for { + select { + case leak := <-RefLeakChecks: + if leak.Addr == addr { + ch <- leak.Leaked + return + } + case <-ctx.Done(): + } + } + }() + return ch +} diff --git a/pkg/mimirpb/testutil/refleaks.go b/pkg/mimirpb/testutil/refleaks.go new file mode 100644 index 00000000000..8c377f211be --- /dev/null +++ b/pkg/mimirpb/testutil/refleaks.go @@ -0,0 +1,22 @@ +package testutil + +import ( + "context" + + "github.com/grafana/mimir/pkg/mimirpb/internal" +) + +// NextRefLeakChannel returns a channel where the next check for reference leaks +// to the given object will be reported, whether a leak is detected +// or not. It replaces the default behavior of panicking when a leak is detected. +// +// The object must be identified with a reference value: a pointer, a slice, or +// a string. +// +// Canceling the context immediately closes the channel and restores the default +// behavior. +// +// Only works when [testing.Testing]. +func NextRefLeakCheck(ctx context.Context, object any) <-chan bool { + return internal.NextRefLeakCheck(ctx, object) +} From efc7feca8cc7fa8c52ad9f8defd243e47edf68a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 10:10:03 +0100 Subject: [PATCH 12/38] Fix mimirpb build. --- pkg/mimirpb/custom_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 7966f495722..7733c80fdf1 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -269,7 +269,7 @@ func TestInstrumentRefLeaks(t *testing.T) { // the call to req.FreeBuffer. 
leakingLabelName = req.Timeseries[0].Labels[0].Name - leaks = internal.NextRefLeakCheck(t.Context(), req.Buffer.ReadOnlyData()) + leaks = internal.NextRefLeakCheck(t.Context(), req.Buffer().ReadOnlyData()) req.FreeBuffer() // leakingLabelName becomes a leak here }() @@ -286,7 +286,7 @@ func TestInstrumentRefLeaks(t *testing.T) { err = Unmarshal(buf, &reqNoLeak) require.NoError(t, err) - leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer.ReadOnlyData()) + leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer().ReadOnlyData()) reqNoLeak.FreeBuffer() }() require.Eventually(t, func() bool { return !<-leaks }, 10*time.Millisecond, 1*time.Second, "expected no reference leaks") From 9012300b72b8edf8c4f98f9619e3cdda480c8309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 10:10:13 +0100 Subject: [PATCH 13/38] Reapply "mimirpb: Clone metadata fields rather than maintaining references (#13573)" This reverts commit 69ca33a9dd95354e5ae696a021f1f44efbf263d5. --- CHANGELOG.md | 1 + pkg/mimirpb/mimir.pb.go | 6 +++--- pkg/mimirpb/mimir.pb.go.expdiff | 8 ++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3de605552d..c402104310e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -519,6 +519,7 @@ * [BUGFIX] Block-builder-scheduler: Fix bugs in handling of partitions with no commit. #12130 * [BUGFIX] Ingester: Fix issue where ingesters can exit read-only mode during idle compactions, resulting in write errors. #12128 * [BUGFIX] otlp: Reverts #11889 which has a pooled memory re-use bug. #12266 +* [BUGFIX] Ingester: Fix issue where metadata stored in ingesters indirectly prevents large Kafka record buffers from being garbage collected, resulting in unusual memory growth. 
#13573 ### Mixin diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go index e7d3c6a4392..3a886f50499 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -11827,9 +11827,9 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata } if len(unit) > 0 || len(help) > 0 || metricType != 0 { metadata.add(normalizedMetricName, MetricMetadata{ - MetricFamilyName: normalizedMetricName, - Help: help, - Unit: unit, + MetricFamilyName: strings.Clone(normalizedMetricName), + Help: strings.Clone(help), + Unit: strings.Clone(unit), Type: MetricMetadata_MetricType(metricType), }) } diff --git a/pkg/mimirpb/mimir.pb.go.expdiff b/pkg/mimirpb/mimir.pb.go.expdiff index 589eb878a1d..d858944c833 100644 --- a/pkg/mimirpb/mimir.pb.go.expdiff +++ b/pkg/mimirpb/mimir.pb.go.expdiff @@ -1,5 +1,5 @@ diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go -index e7d3c6a439..dda8609298 100644 +index 3a886f5049..dda8609298 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -14,7 +14,6 @@ import ( @@ -526,9 +526,9 @@ index e7d3c6a439..dda8609298 100644 - } - if len(unit) > 0 || len(help) > 0 || metricType != 0 { - metadata.add(normalizedMetricName, MetricMetadata{ -- MetricFamilyName: normalizedMetricName, -- Help: help, -- Unit: unit, +- MetricFamilyName: strings.Clone(normalizedMetricName), +- Help: strings.Clone(help), +- Unit: strings.Clone(unit), - Type: MetricMetadata_MetricType(metricType), - }) - } From a6eb3e630227037ae75c2081bd7eaded9cc8dc0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 10:19:46 +0100 Subject: [PATCH 14/38] Lint. 
--- pkg/mimirpb/custom.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index a013d2bf3e0..fa0571ef9f9 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -12,7 +12,6 @@ import ( "weak" gogoproto "github.com/gogo/protobuf/proto" - "github.com/grafana/mimir/pkg/mimirpb/internal" "github.com/prometheus/prometheus/model/histogram" "go.uber.org/atomic" "google.golang.org/grpc/encoding" @@ -20,6 +19,8 @@ import ( "google.golang.org/grpc/mem" protobufproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/protoadapt" + + "github.com/grafana/mimir/pkg/mimirpb/internal" ) type CustomCodecConfig struct { From a6e1c785c77501c024d9aa974672c9322299f075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 10:39:54 +0100 Subject: [PATCH 15/38] make license --- pkg/mimirpb/internal/refleaks.go | 2 ++ pkg/mimirpb/testutil/refleaks.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/mimirpb/internal/refleaks.go b/pkg/mimirpb/internal/refleaks.go index 67e162bd85a..7db16e81083 100644 --- a/pkg/mimirpb/internal/refleaks.go +++ b/pkg/mimirpb/internal/refleaks.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package internal import ( diff --git a/pkg/mimirpb/testutil/refleaks.go b/pkg/mimirpb/testutil/refleaks.go index 8c377f211be..9caac63dad4 100644 --- a/pkg/mimirpb/testutil/refleaks.go +++ b/pkg/mimirpb/testutil/refleaks.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-only + package testutil import ( From de97dd54502c16c58c46d53a6afc4bf4aac592cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Mon, 24 Nov 2025 13:26:21 +0100 Subject: [PATCH 16/38] Fixes from Cursor review. 
--- pkg/mimirpb/custom_test.go | 4 ++-- pkg/mimirpb/internal/refleaks.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 7733c80fdf1..9430114cc89 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -274,7 +274,7 @@ func TestInstrumentRefLeaks(t *testing.T) { }() // Expect to receive a leak detection. - require.Eventually(t, func() bool { return <-leaks }, 10*time.Millisecond, 1*time.Second, "expected a reference leak") + require.Eventually(t, func() bool { return <-leaks }, 1*time.Second, 10*time.Millisecond, "expected a reference leak") // Expect the label name contents to have been replaced with a taint word. // Keep this check last, because we need to extend the lifespan of leakingLabelName // to avoid Go optimizing the leak away. @@ -289,5 +289,5 @@ func TestInstrumentRefLeaks(t *testing.T) { leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer().ReadOnlyData()) reqNoLeak.FreeBuffer() }() - require.Eventually(t, func() bool { return !<-leaks }, 10*time.Millisecond, 1*time.Second, "expected no reference leaks") + require.Eventually(t, func() bool { return !<-leaks }, 1*time.Second, 10*time.Millisecond, "expected no reference leaks") } diff --git a/pkg/mimirpb/internal/refleaks.go b/pkg/mimirpb/internal/refleaks.go index 7db16e81083..cb630d30a42 100644 --- a/pkg/mimirpb/internal/refleaks.go +++ b/pkg/mimirpb/internal/refleaks.go @@ -43,6 +43,7 @@ func NextRefLeakCheck(ctx context.Context, object any) <-chan bool { return } case <-ctx.Done(): + return } } }() From da56796558bd546a9f4eb23dc45cf9e2036c283d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 25 Nov 2025 13:04:58 +0100 Subject: [PATCH 17/38] Apply suggestions from code review Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- cmd/mimir/help-all.txt.tmpl | 2 +- docs/sources/mimir/configure/about-versioning.md | 4 ++-- 2 files changed, 3 
insertions(+), 3 deletions(-) diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 3d5f70d33db..5b57471456e 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -858,7 +858,7 @@ Usage of ./cmd/mimir/mimir: -common.client-cluster-validation.label string [experimental] Primary cluster validation label. -common.instrument-reference-leaks-pct float - [experimental] Percentage of buffers from pools to instrument for reference leaks. 0 to disable. + [experimental] Percentage of buffers from pools to instrument for reference leaks. Set to 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. -common.storage.azure.account-name string diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 2c917696fe2..36890c02134 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -252,8 +252,8 @@ The following features are currently experimental: - The cluster validation label of all gRPC clients can be configured via `-common.client-cluster-validation.label`. - Requests with invalid cluster validation labels are tracked via the `cortex_client_invalid_cluster_validation_label_requests_total` metric. - Common - - Instrument a fraction of pooled objects for references that outlive their lifetime - - Only implemented for objects embedding `mimirpb.BufferHolder` so far. + - Instrument a fraction of pooled objects for references that outlive their lifetime. + - Only implemented for objects embedding `mimirpb.BufferHolder`. 
- Flag: `-common.instrument-reference-leaks-pct` - Preferred available zone for querying ingesters and store-gateways - `-querier.prefer-availability-zone` From 7ca73a34f9c6779a569c5d781fe6a56c9e09dfd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Wed, 26 Nov 2025 14:13:35 +0100 Subject: [PATCH 18/38] pct -> percentage --- CHANGELOG.md | 2 +- cmd/mimir/config-descriptor.json | 4 ++-- cmd/mimir/help-all.txt.tmpl | 2 +- docs/sources/mimir/configure/about-versioning.md | 2 +- .../configure/configuration-parameters/index.md | 4 ++-- operations/mimir/mimir-flags-defaults.json | 2 +- pkg/mimir/mimir.go | 16 ++++++++-------- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c402104310e..94579289bac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,7 +67,7 @@ * [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534 * [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525 * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525 -* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-pct` to leaked references to gRPC buffers. #13609 +* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-percentage` to leaked references to gRPC buffers. #13609 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053 * [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084 * [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. 
#13084 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index e86984e44ca..4c9a0a48291 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20752,12 +20752,12 @@ }, { "kind": "field", - "name": "instrument_ref_leaks_pct", + "name": "instrument_ref_leaks_percentage", "required": false, "desc": "Percentage of buffers from pools to instrument for reference leaks. 0 to disable.", "fieldValue": null, "fieldDefaultValue": 0, - "fieldFlag": "common.instrument-reference-leaks-pct", + "fieldFlag": "common.instrument-reference-leaks-percentage", "fieldType": "float", "fieldCategory": "experimental" } diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 5b57471456e..f535f5e3231 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,7 +857,7 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. - -common.instrument-reference-leaks-pct float + -common.instrument-reference-leaks-percentage float [experimental] Percentage of buffers from pools to instrument for reference leaks. Set to 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 36890c02134..990d8e5725e 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -254,7 +254,7 @@ The following features are currently experimental: - Common - Instrument a fraction of pooled objects for references that outlive their lifetime. - Only implemented for objects embedding `mimirpb.BufferHolder`. 
- - Flag: `-common.instrument-reference-leaks-pct` + - Flag: `-common.instrument-reference-leaks-percentage` - Preferred available zone for querying ingesters and store-gateways - `-querier.prefer-availability-zone` - Memberlist zone-aware routing diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 126280d4f88..5f76ddf5048 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -521,8 +521,8 @@ client_cluster_validation: # (experimental) Percentage of buffers from pools to instrument for reference # leaks. 0 to disable. -# CLI flag: -common.instrument-reference-leaks-pct -[instrument_ref_leaks_pct: | default = 0] +# CLI flag: -common.instrument-reference-leaks-percentage +[instrument_ref_leaks_percentage: | default = 0] ``` ### server diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 93093907f88..a67e232591a 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1473,7 +1473,7 @@ "common.storage.swift.request-timeout": 5000000000, "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", - "common.instrument-reference-leaks-pct": 0, + "common.instrument-reference-leaks-percentage": 0, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 5913ef011d0..c98fcc518e2 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -785,9 +785,9 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag } type CommonConfig struct { - Storage bucket.StorageBackendConfig `yaml:"storage"` - ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" 
category:"experimental"` - InstrumentRefLeaksPct float64 `yaml:"instrument_ref_leaks_pct" category:"experimental"` + Storage bucket.StorageBackendConfig `yaml:"storage"` + ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` + InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"` } type CommonConfigInheritance struct { @@ -799,7 +799,7 @@ type CommonConfigInheritance struct { func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) - f.Float64Var(&c.InstrumentRefLeaksPct, "common.instrument-reference-leaks-pct", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`) + f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. @@ -814,9 +814,9 @@ type configWithCustomCommonUnmarshaler struct { // commonConfigUnmarshaler will unmarshal each field of the common config into specific locations. type commonConfigUnmarshaler struct { - Storage *specificLocationsUnmarshaler `yaml:"storage"` - ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` - InstrumentRefLeaksPct *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_pct"` + Storage *specificLocationsUnmarshaler `yaml:"storage"` + ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` + InstrumentRefLeaksPercentage *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_percentage"` } // specificLocationsUnmarshaler will unmarshal yaml into specific locations. 
@@ -912,7 +912,7 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { setUpGoRuntimeMetrics(cfg, reg) - mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPct}.RegisterGlobally() + mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPercentage}.RegisterGlobally() if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { util_log.WarnExperimentalUse("ruler.tenant-federation") From d9afc3968f123a98a4f74fc70f86f595685250b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 25 Nov 2025 12:53:46 +0100 Subject: [PATCH 19/38] Don't rely on GC; use mmap/munmap instead. --- pkg/ingester/ingester_test.go | 25 ++++++++------- pkg/mimirpb/custom.go | 54 +++++++++++++------------------- pkg/mimirpb/custom_test.go | 43 ++++++++++--------------- pkg/mimirpb/internal/refleaks.go | 51 ------------------------------ pkg/mimirpb/testutil/refleaks.go | 24 -------------- 5 files changed, 50 insertions(+), 147 deletions(-) delete mode 100644 pkg/mimirpb/internal/refleaks.go delete mode 100644 pkg/mimirpb/testutil/refleaks.go diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index e00557bb532..efb4710399d 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -64,7 +64,6 @@ import ( asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" - mimirpbtest "github.com/grafana/mimir/pkg/mimirpb/testutil" "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" @@ -9147,7 +9146,8 @@ func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { startAndWaitHealthy(t, ing, r) syms := util_test.NewSymbolTableBuilder(nil) - buf, err := makeTestRW2WriteRequest(syms).Marshal() + orig := makeTestRW2WriteRequest(syms) + buf, err := orig.Marshal() require.NoError(t, 
err) var wr mimirpb.PreallocWriteRequest @@ -9157,20 +9157,21 @@ func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { err = mimirpb.Unmarshal(buf, &wr) require.NoError(t, err) - nextLeak := mimirpbtest.NextRefLeakCheck(t.Context(), wr.Buffer().ReadOnlyData()) - ctx := user.InjectOrgID(context.Background(), userID) _, err = ing.Push(ctx, &wr.WriteRequest) require.NoError(t, err) - select { - case leaked := <-nextLeak: - if leaked { - require.Fail(t, "reference leak detected", "addr: %p", buf) - } - case <-time.After(1 * time.Second): - require.Fail(t, "expected reference leak check", "addr: %p", buf) - } + meta, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{ + Metric: "test_metric_total", + Limit: 1, LimitPerMetric: 1, + }) + require.NoError(t, err) + require.Equal(t, []*mimirpb.MetricMetadata{{ + MetricFamilyName: "test_metric_total", + Type: mimirpb.COUNTER, + Help: "test_metric_help", + Unit: "test_metric_unit", + }}, meta.Metadata) } func TestIngesterSendsOnlySeriesWithData(t *testing.T) { diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index fa0571ef9f9..166de1b1993 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -6,10 +6,9 @@ import ( "bytes" "fmt" "math" - "runtime" + "syscall" "testing" "unsafe" - "weak" gogoproto "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/histogram" @@ -19,8 +18,6 @@ import ( "google.golang.org/grpc/mem" protobufproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/protoadapt" - - "github.com/grafana/mimir/pkg/mimirpb/internal" ) type CustomCodecConfig struct { @@ -154,6 +151,8 @@ func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { return GlobalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) } +var pageSize = syscall.Getpagesize() + // Unmarshal customizes gRPC unmarshalling. // If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented. 
// The latter means that v's FreeBuffer method should be called when v is no longer used. @@ -163,8 +162,15 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { var buf mem.Buffer if instrumentLeaks { - // Always allocate; we'll detect leaks by checking if it's garbage-collected. - b := make([]byte, data.Len()) + // Allocate separate pages for this buffer. We'll detect ref leaks by + // munmaping the pages on Free, after which trying to access them will + // segfault. + pageAlignedLen := roundUpToMultiple(data.Len(), pageSize) + b, err := syscall.Mmap(0, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) + if err != nil { + panic(fmt.Errorf("mmap: %w", err)) + } + b = b[:data.Len()] data.CopyTo(b) buf = mem.NewBuffer(&b, nil) } else { @@ -249,34 +255,12 @@ func (b *instrumentLeaksBuf) Free() { if b.refCount == 0 { buf := b.ReadOnlyData() - // Taint data, in case the buffer is accessed before the panic below, - // or after the panic if it's recovered. - for i := range buf { - const taint = "KAEL" - buf[i] = taint[i%len(taint)] - } - // Remove our ref; no (strong) refs should remain (except in the stack). - b.Buffer = nil ptr := unsafe.SliceData(buf) - addr := uintptr(unsafe.Pointer(ptr)) - weakRef := weak.Make(ptr) - - // Check in a separate goroutine, because this stack still has locals - // pointing to the buffer. - go func() { - runtime.GC() - runtime.GC() - - leaked := weakRef.Value() != nil - - select { - case internal.RefLeakChecks <- internal.RefLeakCheck{Addr: addr, Leaked: leaked}: - default: - if leaked { - panic(fmt.Sprintf("reference leak for object 0x%x", addr)) - } - } - }() + allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize)) + err := syscall.Munmap(allPages) + if err != nil { + panic(fmt.Errorf("munmap: %w", err)) + } } } @@ -643,3 +627,7 @@ type orderAwareMetricMetadata struct { // order is the 0-based index of this metadata object in a wider metadata array. 
order int } + +func roundUpToMultiple(n, of int) int { + return ((n + of - 1) / of) * of +} diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 9430114cc89..6f779d90205 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -6,8 +6,9 @@ package mimirpb import ( + "fmt" + "runtime/debug" "testing" - "time" "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/assert" @@ -16,7 +17,6 @@ import ( "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/mem" - "github.com/grafana/mimir/pkg/mimirpb/internal" "github.com/grafana/mimir/pkg/util/test" ) @@ -258,36 +258,25 @@ func TestInstrumentRefLeaks(t *testing.T) { buf, err := src.Marshal() require.NoError(t, err) - var leaks <-chan bool var leakingLabelName UnsafeMutableString - func() { - var req WriteRequest - err = Unmarshal(buf, &req) - require.NoError(t, err) - // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive - // the call to req.FreeBuffer. - leakingLabelName = req.Timeseries[0].Labels[0].Name + var req WriteRequest + err = Unmarshal(buf, &req) + require.NoError(t, err) - leaks = internal.NextRefLeakCheck(t.Context(), req.Buffer().ReadOnlyData()) - req.FreeBuffer() // leakingLabelName becomes a leak here - }() + // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive + // the call to req.FreeBuffer. + leakingLabelName = req.Timeseries[0].Labels[0].Name - // Expect to receive a leak detection. - require.Eventually(t, func() bool { return <-leaks }, 1*time.Second, 10*time.Millisecond, "expected a reference leak") - // Expect the label name contents to have been replaced with a taint word. - // Keep this check last, because we need to extend the lifespan of leakingLabelName - // to avoid Go optimizing the leak away. - require.Contains(t, leakingLabelName, "KAEL") + req.FreeBuffer() // leakingLabelName becomes a leak here - // Now let's check a non-leak doesn't get falsely detected. 
+ debug.SetPanicOnFault(true) + var recovered any func() { - var reqNoLeak WriteRequest - err = Unmarshal(buf, &reqNoLeak) - require.NoError(t, err) - - leaks = internal.NextRefLeakCheck(t.Context(), reqNoLeak.Buffer().ReadOnlyData()) - reqNoLeak.FreeBuffer() + defer func() { + recovered = recover() + }() + t.Log(leakingLabelName) // Just forcing a read on leakingLabelName here }() - require.Eventually(t, func() bool { return !<-leaks }, 1*time.Second, 10*time.Millisecond, "expected no reference leaks") + require.Equal(t, fmt.Sprint(recovered), "runtime error: invalid memory address or nil pointer dereference") } diff --git a/pkg/mimirpb/internal/refleaks.go b/pkg/mimirpb/internal/refleaks.go deleted file mode 100644 index cb630d30a42..00000000000 --- a/pkg/mimirpb/internal/refleaks.go +++ /dev/null @@ -1,51 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package internal - -import ( - "context" - "reflect" - "testing" -) - -type RefLeakCheck struct { - Addr uintptr - Leaked bool -} - -var RefLeakChecks chan RefLeakCheck - -func init() { - if testing.Testing() { - RefLeakChecks = make(chan RefLeakCheck) - } -} - -// NextRefLeakCheck is documented in testutil.NextRefLeakCheck. 
-func NextRefLeakCheck(ctx context.Context, object any) <-chan bool { - if !testing.Testing() { - panic("NextRefLeakCheck can only be used in tests") - } - var addr uintptr - switch o := reflect.ValueOf(object); o.Type().Kind() { - case reflect.Pointer, reflect.Slice, reflect.String: - addr = uintptr(o.UnsafePointer()) - } - - ch := make(chan bool, 1) - go func() { - defer close(ch) - for { - select { - case leak := <-RefLeakChecks: - if leak.Addr == addr { - ch <- leak.Leaked - return - } - case <-ctx.Done(): - return - } - } - }() - return ch -} diff --git a/pkg/mimirpb/testutil/refleaks.go b/pkg/mimirpb/testutil/refleaks.go deleted file mode 100644 index 9caac63dad4..00000000000 --- a/pkg/mimirpb/testutil/refleaks.go +++ /dev/null @@ -1,24 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package testutil - -import ( - "context" - - "github.com/grafana/mimir/pkg/mimirpb/internal" -) - -// NextRefLeakChannel returns a channel where the next check for reference leaks -// to the given object will be reported, whether a leak is detected -// or not. It replaces the default behavior of panicking when a leak is detected. -// -// The object must be identified with a reference value: a pointer, a slice, or -// a string. -// -// Canceling the context immediately closes the channel and restores the default -// behavior. -// -// Only works when [testing.Testing]. -func NextRefLeakCheck(ctx context.Context, object any) <-chan bool { - return internal.NextRefLeakCheck(ctx, object) -} From 466a6de82953c9f5826e19c10e95edc8b90a6c1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Wed, 26 Nov 2025 15:35:25 +0100 Subject: [PATCH 20/38] Don't instrument empty buffers. 
--- pkg/mimirpb/custom.go | 2 +- pkg/mimirpb/custom_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 166de1b1993..064c911c4d1 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -158,7 +158,7 @@ var pageSize = syscall.Getpagesize() // The latter means that v's FreeBuffer method should be called when v is no longer used. func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { holder, isBufferHolder := v.(MessageWithBufferRef) - instrumentLeaks := isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 + instrumentLeaks := data.Len() > 0 && isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 var buf mem.Buffer if instrumentLeaks { diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 6f779d90205..8d8d343592b 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -208,7 +208,7 @@ func TestCodecV2_Unmarshal(t *testing.T) { require.True(t, origReq.Equal(req)) - require.NotNil(t, req.Buffer) + require.NotNil(t, req.Buffer()) req.FreeBuffer() } From 628ee24d8b5160921aa93dfb787608ef6da6e2c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Wed, 26 Nov 2025 15:48:29 +0100 Subject: [PATCH 21/38] Atomic ref count. 
--- pkg/mimirpb/custom.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 064c911c4d1..ca2b9bc65a3 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -196,7 +196,9 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { if isBufferHolder { if instrumentLeaks { - buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1} + instrumentedBuf := &instrumentLeaksBuf{Buffer: buf} + instrumentedBuf.refCount.Inc() + buf = instrumentedBuf } buf.Ref() holder.SetBuffer(buf) @@ -241,19 +243,18 @@ var _ MessageWithBufferRef = &BufferHolder{} type instrumentLeaksBuf struct { mem.Buffer - refCount int + refCount atomic.Int64 } func (b *instrumentLeaksBuf) Ref() { b.Buffer.Ref() - b.refCount++ + b.refCount.Inc() } func (b *instrumentLeaksBuf) Free() { b.Buffer.Free() - b.refCount-- - if b.refCount == 0 { + if b.refCount.Dec() == 0 { buf := b.ReadOnlyData() ptr := unsafe.SliceData(buf) allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize)) From 2dbb9a6b06e49b97e272e0559ff88f0f558426da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Wed, 26 Nov 2025 17:08:38 +0100 Subject: [PATCH 22/38] make reference-help --- cmd/mimir/help-all.txt.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index f535f5e3231..9633e3b82fb 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -858,7 +858,7 @@ Usage of ./cmd/mimir/mimir: -common.client-cluster-validation.label string [experimental] Primary cluster validation label. -common.instrument-reference-leaks-percentage float - [experimental] Percentage of buffers from pools to instrument for reference leaks. Set to 0 to disable. + [experimental] Percentage of buffers from pools to instrument for reference leaks. 0 to disable. -common.storage.azure.account-key string Azure storage account key. 
If unset, Azure managed identities will be used for authentication instead. -common.storage.azure.account-name string From de52a8ac833341419d484e02e6c79f6929518df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:23:23 +0100 Subject: [PATCH 23/38] Move things around to ensure no mmap leak. --- pkg/mimirpb/custom.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index ca2b9bc65a3..37d2d9bc71c 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -173,15 +173,15 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { b = b[:data.Len()] data.CopyTo(b) buf = mem.NewBuffer(&b, nil) + instrumentedBuf := &instrumentLeaksBuf{Buffer: buf} + instrumentedBuf.refCount.Inc() + buf = instrumentedBuf } else { buf = data.MaterializeToBuffer(unmarshalSlicePool) } // Decrement buf's reference count. Note though that if v implements MessageWithBufferRef, // we increase buf's reference count first so it doesn't go to zero. - // - // We're possibly replacing buf below, so it's important to keep this a closure - // rather than a direct call to buf.Free. - defer func() { buf.Free() }() + defer buf.Free() if unmarshaler, ok := v.(gogoproto.Unmarshaler); ok { if err := unmarshaler.Unmarshal(buf.ReadOnlyData()); err != nil { @@ -195,11 +195,6 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { } if isBufferHolder { - if instrumentLeaks { - instrumentedBuf := &instrumentLeaksBuf{Buffer: buf} - instrumentedBuf.refCount.Inc() - buf = instrumentedBuf - } buf.Ref() holder.SetBuffer(buf) } From 4b0636ef1f7c42f5cd1c06d50f3ea1e9f07e418b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:27:22 +0100 Subject: [PATCH 24/38] Reword description. 
--- cmd/mimir/config-descriptor.json | 2 +- cmd/mimir/help-all.txt.tmpl | 2 +- .../sources/mimir/configure/configuration-parameters/index.md | 4 ++-- pkg/mimir/mimir.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 915e5056f69..02610dd6a53 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20774,7 +20774,7 @@ "kind": "field", "name": "instrument_ref_leaks_percentage", "required": false, - "desc": "Percentage of buffers from pools to instrument for reference leaks. 0 to disable.", + "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.", "fieldValue": null, "fieldDefaultValue": 0, "fieldFlag": "common.instrument-reference-leaks-percentage", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index f28621b6375..93aedd7a8bb 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -858,7 +858,7 @@ Usage of ./cmd/mimir/mimir: -common.client-cluster-validation.label string [experimental] Primary cluster validation label. -common.instrument-reference-leaks-percentage float - [experimental] Percentage of buffers from pools to instrument for reference leaks. 0 to disable. + [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. 
-common.storage.azure.account-name string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index fb80f484fb3..cd1b5d01519 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -519,8 +519,8 @@ client_cluster_validation: # CLI flag: -common.client-cluster-validation.label [label: | default = ""] -# (experimental) Percentage of buffers from pools to instrument for reference -# leaks. 0 to disable. +# (experimental) Percentage [0-100] of request or message buffers to instrument +# for reference leaks. 0 to disable. # CLI flag: -common.instrument-reference-leaks-percentage [instrument_ref_leaks_percentage: | default = 0] ``` diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index c98fcc518e2..7c7c8e91810 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -799,7 +799,7 @@ type CommonConfigInheritance struct { func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) - f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`) + f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.`) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. From c10028f4616c4bb1b898f4cf74c9ecd90c018ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:30:09 +0100 Subject: [PATCH 25/38] Register default global codec on init. 
--- pkg/mimirpb/custom.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 37d2d9bc71c..5b4ceadc514 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -40,10 +40,12 @@ func (cfg CustomCodecConfig) RegisterGlobally() { } func init() { - // Instrument all buffers when testing. + config := CustomCodecConfig{} if testing.Testing() { - CustomCodecConfig{InstrumentRefLeaksPct: 100}.RegisterGlobally() + // Instrument all buffers when testing. + config.InstrumentRefLeaksPct = 100 } + config.RegisterGlobally() } // codecV2 customizes gRPC marshalling and unmarshalling. From 25fef1814a320f021dd8a92adbb092c769c2c58a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:33:45 +0100 Subject: [PATCH 26/38] Don't wrap encoding.CodecV2. --- pkg/mimirpb/custom.go | 8 ++++---- pkg/mimirpb/custom_test.go | 13 +------------ 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 5b4ceadc514..8f59fe3b662 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -24,8 +24,10 @@ type CustomCodecConfig struct { InstrumentRefLeaksPct float64 } +var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name() + func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { - c := &codecV2{codec: encoding.GetCodecV2(proto.Name)} + c := &codecV2{} if cfg.InstrumentRefLeaksPct > 0 { c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) } @@ -53,8 +55,6 @@ func init() { // We customize unmarshalling in order to use an optimized path when possible, // and to retain the unmarshalling buffer when necessary. 
type codecV2 struct { - codec encoding.CodecV2 - instrumentRefLeaksOneIn uint64 unmarshaledWithBufferRefCount atomic.Uint64 } @@ -205,7 +205,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { } func (c *codecV2) Name() string { - return c.codec.Name() + return baseCodecV2Name } // MessageWithBufferRef is an unmarshalling buffer retaining protobuf message. diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index 8d8d343592b..723b47bc4dc 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -13,9 +13,6 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/encoding" - "google.golang.org/grpc/encoding/proto" - "google.golang.org/grpc/mem" "github.com/grafana/mimir/pkg/util/test" ) @@ -197,7 +194,7 @@ func TestIsFloatHistogram(t *testing.T) { } func TestCodecV2_Unmarshal(t *testing.T) { - c := codecV2{codec: fakeCodecV2{}} + c := codecV2{} var origReq WriteRequest data, err := c.Marshal(&origReq) @@ -212,14 +209,6 @@ func TestCodecV2_Unmarshal(t *testing.T) { req.FreeBuffer() } -type fakeCodecV2 struct { - encoding.CodecV2 -} - -func (c fakeCodecV2) Marshal(v any) (mem.BufferSlice, error) { - return encoding.GetCodecV2(proto.Name).Marshal(v) -} - func TestHistogram_BucketsCount(t *testing.T) { tests := []struct { name string From a47d9c24ca666af281b7394b9883700bc86bc425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:59:09 +0100 Subject: [PATCH 27/38] Fewer public globals. 
--- pkg/mimirpb/custom.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 8f59fe3b662..070f40b27ac 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -26,7 +26,7 @@ type CustomCodecConfig struct { var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name() -func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { +func (cfg CustomCodecConfig) codec() *codecV2 { c := &codecV2{} if cfg.InstrumentRefLeaksPct > 0 { c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) @@ -34,11 +34,11 @@ func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { return c } -var GlobalCodec encoding.CodecV2 +var globalCodec encoding.CodecV2 func (cfg CustomCodecConfig) RegisterGlobally() { - GlobalCodec = cfg.Codec() - encoding.RegisterCodecV2(GlobalCodec) + globalCodec = cfg.codec() + encoding.RegisterCodecV2(globalCodec) } func init() { @@ -147,10 +147,7 @@ func unmarshalSlicePoolSizes() []int { // the Unmarshal method directly, as it will take advantage of pools and leak // detection. func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { - if GlobalCodec == nil { - return v.Unmarshal(data) - } - return GlobalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) + return globalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) } var pageSize = syscall.Getpagesize() From 49fc9a24d732e5b007dd83083b761229aba052f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 11:59:30 +0100 Subject: [PATCH 28/38] Document that pool won't be used, and make it explicitly so. 
--- pkg/mimirpb/custom.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 070f40b27ac..56a79b4ecb0 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -143,8 +143,8 @@ func unmarshalSlicePoolSizes() []int { return sizes } -// Unmarshal unmarshals an object using the global codec. Prefer this over calling -// the Unmarshal method directly, as it will take advantage of pools and leak +// Unmarshal unmarshals an object using the global codec. Prefer this over +// calling the Unmarshal method directly, as it will take advantage of leak // detection. func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { return globalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) @@ -175,6 +175,12 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { instrumentedBuf := &instrumentLeaksBuf{Buffer: buf} instrumentedBuf.refCount.Inc() buf = instrumentedBuf + } else if len(data) == 1 { + // BufferSlice.MaterializeToBuffer already has this behavior when + // len(data) == 1, but we reproduce it here for explicitness and for + // ensuring forward-compatibility. + data[0].Ref() + buf = data[0] } else { buf = data.MaterializeToBuffer(unmarshalSlicePool) } From b39485d3c4c2693366cfff830e474bdcfa1584f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 13:19:28 +0100 Subject: [PATCH 29/38] Wait before munmapping. 
--- cmd/mimir/config-descriptor.json | 13 +++++- cmd/mimir/help-all.txt.tmpl | 4 +- .../configuration-parameters/index.md | 9 +++- operations/mimir/mimir-flags-defaults.json | 1 + pkg/mimir/mimir.go | 15 ++++--- pkg/mimirpb/custom.go | 45 +++++++++++++++++-- 6 files changed, 75 insertions(+), 12 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 02610dd6a53..8cd992c399d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20774,12 +20774,23 @@ "kind": "field", "name": "instrument_ref_leaks_percentage", "required": false, - "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.", + "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.", "fieldValue": null, "fieldDefaultValue": 0, "fieldFlag": "common.instrument-reference-leaks-percentage", "fieldType": "float", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "instrument_ref_leaks_after_free_period", + "required": false, + "desc": "Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.", + "fieldValue": null, + "fieldDefaultValue": 120000000000, + "fieldFlag": "common.instrument-reference-leaks-after-free-period", + "fieldType": "duration", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 93aedd7a8bb..a468e57149d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,8 +857,10 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. 
+ -common.instrument-reference-leaks-after-free-period duration + [experimental] Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection. (default 2m0s) -common.instrument-reference-leaks-percentage float - [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable. + [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. -common.storage.azure.account-name string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index cd1b5d01519..f08fe1f0637 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -520,9 +520,16 @@ client_cluster_validation: [label: | default = ""] # (experimental) Percentage [0-100] of request or message buffers to instrument -# for reference leaks. 0 to disable. +# for reference leaks. Set to 0 to disable. # CLI flag: -common.instrument-reference-leaks-percentage [instrument_ref_leaks_percentage: | default = 0] + +# (experimental) Period after a buffer instrumented for referenced leaks is +# nominally freed until the buffer is uninstrumented and effectively freed to be +# reused. After this period, any lingering references to the buffer may +# potentially be dereferenced again with no detection. 
+# CLI flag: -common.instrument-reference-leaks-after-free-period +[instrument_ref_leaks_after_free_period: | default = 2m] ``` ### server diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 2839307e2d9..8c0d7066421 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1476,6 +1476,7 @@ "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", "common.instrument-reference-leaks-percentage": 0, + "common.instrument-reference-leaks-after-free-period": 120000000000, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 7c7c8e91810..2fbf0e3870c 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -785,9 +785,10 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag } type CommonConfig struct { - Storage bucket.StorageBackendConfig `yaml:"storage"` - ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` - InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"` + Storage bucket.StorageBackendConfig `yaml:"storage"` + ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` + InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"` + InstrumentRefLeaksBeforeReusePeriod time.Duration `yaml:"instrument_ref_leaks_after_free_period" category:"experimental"` } type CommonConfigInheritance struct { @@ -799,7 +800,8 @@ type CommonConfigInheritance struct { func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) 
c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) - f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.`) + f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.`) + f.DurationVar(&c.InstrumentRefLeaksBeforeReusePeriod, "common.instrument-reference-leaks-after-free-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. @@ -912,7 +914,10 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { setUpGoRuntimeMetrics(cfg, reg) - mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPercentage}.RegisterGlobally() + mimirpb.CustomCodecConfig{ + InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPercentage, + WaitBeforeReuseInstrumentedBuffer: cfg.Common.InstrumentRefLeaksBeforeReusePeriod, + }.RegisterGlobally() if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { util_log.WarnExperimentalUse("ruler.tenant-federation") diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 56a79b4ecb0..ddd34921a24 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -6,8 +6,10 @@ import ( "bytes" "fmt" "math" + "sync" "syscall" "testing" + "time" "unsafe" gogoproto "github.com/gogo/protobuf/proto" @@ -21,7 +23,8 @@ import ( ) type CustomCodecConfig struct { - InstrumentRefLeaksPct float64 + InstrumentRefLeaksPct float64 + 
WaitBeforeReuseInstrumentedBuffer time.Duration } var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name() @@ -37,6 +40,9 @@ func (cfg CustomCodecConfig) codec() *codecV2 { var globalCodec encoding.CodecV2 func (cfg CustomCodecConfig) RegisterGlobally() { + if cfg.InstrumentRefLeaksPct > 0 && cfg.WaitBeforeReuseInstrumentedBuffer > 0 { + startFreeingInstrumentedBuffers() + } globalCodec = cfg.codec() encoding.RegisterCodecV2(globalCodec) } @@ -46,6 +52,7 @@ func init() { if testing.Testing() { // Instrument all buffers when testing. config.InstrumentRefLeaksPct = 100 + config.WaitBeforeReuseInstrumentedBuffer = 0 } config.RegisterGlobally() } @@ -57,6 +64,7 @@ func init() { type codecV2 struct { instrumentRefLeaksOneIn uint64 unmarshaledWithBufferRefCount atomic.Uint64 + waitBeforeReuse time.Duration } var _ encoding.CodecV2 = &codecV2{} @@ -165,14 +173,14 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { // munmaping the pages on Free, after which trying to access them will // segfault. 
pageAlignedLen := roundUpToMultiple(data.Len(), pageSize) - b, err := syscall.Mmap(0, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) + b, err := syscall.Mmap(-1, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) if err != nil { panic(fmt.Errorf("mmap: %w", err)) } b = b[:data.Len()] data.CopyTo(b) buf = mem.NewBuffer(&b, nil) - instrumentedBuf := &instrumentLeaksBuf{Buffer: buf} + instrumentedBuf := &instrumentLeaksBuf{Buffer: buf, waitBeforeReuse: c.waitBeforeReuse} instrumentedBuf.refCount.Inc() buf = instrumentedBuf } else if len(data) == 1 { @@ -243,7 +251,8 @@ var _ MessageWithBufferRef = &BufferHolder{} type instrumentLeaksBuf struct { mem.Buffer - refCount atomic.Int64 + refCount atomic.Int64 + waitBeforeReuse time.Duration } func (b *instrumentLeaksBuf) Ref() { @@ -258,6 +267,14 @@ func (b *instrumentLeaksBuf) Free() { buf := b.ReadOnlyData() ptr := unsafe.SliceData(buf) allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize)) + if b.waitBeforeReuse > 0 { + select { + case unmapQueue <- unmapTask{buf: allPages, at: time.Now().Add(b.waitBeforeReuse)}: + return + default: + // Queue is full, munmap right away. + } + } err := syscall.Munmap(allPages) if err != nil { panic(fmt.Errorf("munmap: %w", err)) @@ -265,6 +282,26 @@ func (b *instrumentLeaksBuf) Free() { } } +type unmapTask struct { + buf []byte + at time.Time +} + +var unmapQueue chan unmapTask + +var startFreeingInstrumentedBuffers = sync.OnceFunc(func() { + unmapQueue = make(chan unmapTask, 1000) + go func() { + for t := range unmapQueue { + time.Sleep(time.Until(t.at)) + err := syscall.Munmap(t.buf) + if err != nil { + panic(fmt.Errorf("munmap: %w", err)) + } + } + }() +}) + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. 
func (m *WriteRequest) MinTimestamp() int64 { From 8bff50476c6da3909d5c73e3654787a21d7c08a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 13:53:59 +0100 Subject: [PATCH 30/38] Consistent naming. --- cmd/mimir/config-descriptor.json | 4 ++-- cmd/mimir/help-all.txt.tmpl | 2 +- .../sources/mimir/configure/configuration-parameters/index.md | 4 ++-- operations/mimir/mimir-flags-defaults.json | 2 +- pkg/mimir/mimir.go | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 8cd992c399d..037e9f5bcea 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20783,12 +20783,12 @@ }, { "kind": "field", - "name": "instrument_ref_leaks_after_free_period", + "name": "instrument_ref_leaks_before_reuse_period", "required": false, "desc": "Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.", "fieldValue": null, "fieldDefaultValue": 120000000000, - "fieldFlag": "common.instrument-reference-leaks-after-free-period", + "fieldFlag": "common.instrument-reference-leaks-before-reuse-period", "fieldType": "duration", "fieldCategory": "experimental" } diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index a468e57149d..0cf4083d0aa 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,7 +857,7 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. 
- -common.instrument-reference-leaks-after-free-period duration + -common.instrument-reference-leaks-before-reuse-period duration [experimental] Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection. (default 2m0s) -common.instrument-reference-leaks-percentage float [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable. diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index f08fe1f0637..a7c2aaee0b2 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -528,8 +528,8 @@ client_cluster_validation: # nominally freed until the buffer is uninstrumented and effectively freed to be # reused. After this period, any lingering references to the buffer may # potentially be dereferenced again with no detection. 
-# CLI flag: -common.instrument-reference-leaks-after-free-period -[instrument_ref_leaks_after_free_period: | default = 2m] +# CLI flag: -common.instrument-reference-leaks-before-reuse-period +[instrument_ref_leaks_before_reuse_period: | default = 2m] ``` ### server diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 8c0d7066421..d0a935e86e7 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1476,7 +1476,7 @@ "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", "common.instrument-reference-leaks-percentage": 0, - "common.instrument-reference-leaks-after-free-period": 120000000000, + "common.instrument-reference-leaks-before-reuse-period": 120000000000, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 2fbf0e3870c..ced280feea3 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -788,7 +788,7 @@ type CommonConfig struct { Storage bucket.StorageBackendConfig `yaml:"storage"` ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"` - InstrumentRefLeaksBeforeReusePeriod time.Duration `yaml:"instrument_ref_leaks_after_free_period" category:"experimental"` + InstrumentRefLeaksBeforeReusePeriod time.Duration `yaml:"instrument_ref_leaks_before_reuse_period" category:"experimental"` } type CommonConfigInheritance struct { @@ -801,7 +801,7 @@ func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) f.Float64Var(&c.InstrumentRefLeaksPercentage, 
"common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.`) - f.DurationVar(&c.InstrumentRefLeaksBeforeReusePeriod, "common.instrument-reference-leaks-after-free-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) + f.DurationVar(&c.InstrumentRefLeaksBeforeReusePeriod, "common.instrument-reference-leaks-before-reuse-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. From e5f74466658d04736195396320170ff096b2969f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 14:23:31 +0100 Subject: [PATCH 31/38] Fix YAML unmarshaling. 
--- pkg/mimir/mimir.go | 27 ++++++++++++++++++++------- pkg/mimir/mimir_config_test.go | 12 ++++++++++++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index ced280feea3..19274ff6f33 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -713,11 +713,21 @@ func UnmarshalCommonYAML(value *yaml.Node, inheriters ...CommonConfigInheriter) for name, loc := range inheritance.ClientClusterValidation { specificClusterValidationLocations[name] = loc } + specificInstrumentRefLeaksPercentageLocations := specificLocationsUnmarshaler{} + for name, loc := range inheritance.InstrumentRefLeaksPercentage { + specificInstrumentRefLeaksPercentageLocations[name] = loc + } + specificInstrumentRefLeaksBeforeReusePeriodLocations := specificLocationsUnmarshaler{} + for name, loc := range inheritance.InstrumentRefLeaksBeforeReusePeriod { + specificInstrumentRefLeaksBeforeReusePeriodLocations[name] = loc + } common := configWithCustomCommonUnmarshaler{ Common: &commonConfigUnmarshaler{ - Storage: &specificStorageLocations, - ClientClusterValidation: &specificClusterValidationLocations, + Storage: &specificStorageLocations, + ClientClusterValidation: &specificClusterValidationLocations, + InstrumentRefLeaksPercentage: &specificInstrumentRefLeaksPercentageLocations, + InstrumentRefLeaksBeforeReusePeriod: &specificInstrumentRefLeaksBeforeReusePeriodLocations, }, } @@ -792,8 +802,10 @@ type CommonConfig struct { } type CommonConfigInheritance struct { - Storage map[string]*bucket.StorageBackendConfig - ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig + Storage map[string]*bucket.StorageBackendConfig + ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig + InstrumentRefLeaksPercentage map[string]*float64 + InstrumentRefLeaksBeforeReusePeriod map[string]*time.Duration } // RegisterFlags registers flag. 
@@ -816,9 +828,10 @@ type configWithCustomCommonUnmarshaler struct { // commonConfigUnmarshaler will unmarshal each field of the common config into specific locations. type commonConfigUnmarshaler struct { - Storage *specificLocationsUnmarshaler `yaml:"storage"` - ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` - InstrumentRefLeaksPercentage *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_percentage"` + Storage *specificLocationsUnmarshaler `yaml:"storage"` + ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` + InstrumentRefLeaksPercentage *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_percentage"` + InstrumentRefLeaksBeforeReusePeriod *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_before_reuse_period"` } // specificLocationsUnmarshaler will unmarshal yaml into specific locations. diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index 61f66004f22..d825d20d220 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -25,6 +25,8 @@ func TestCommonConfigCanBeExtended(t *testing.T) { args := []string{ "-common.storage.backend", "s3", "-common.client-cluster-validation.label", "client-cluster", + "-common.instrument-reference-leaks-percentage", "13.37", + "-common.instrument-reference-leaks-before-reuse-period", "20h", } require.NoError(t, fs.Parse(args)) @@ -36,6 +38,10 @@ func TestCommonConfigCanBeExtended(t *testing.T) { // Mimir's inheritance should still work. checkAllClusterValidationLabels(t, cfg, "client-cluster") + + // Non-inherited flags still work. 
+ require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaksPercentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaksBeforeReusePeriod) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -45,6 +51,8 @@ common: backend: s3 client_cluster_validation: label: client-cluster + instrument_ref_leaks_percentage: 13.37 + instrument_ref_leaks_before_reuse_period: 20h ` var cfg customExtendedConfig @@ -60,6 +68,10 @@ common: // Mimir's inheritance should still work. checkAllClusterValidationLabels(t, cfg, "client-cluster") + + // Non-inherited flags should still work. + require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaksPercentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaksBeforeReusePeriod) }) } From 375561bfa2d6715c66862063ad508721aee6a4f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Tue, 2 Dec 2025 14:26:33 +0100 Subject: [PATCH 32/38] Propagate InstrumentRefLeaksBeforeReusePeriod. --- pkg/mimirpb/custom.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index ddd34921a24..17371159b99 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -33,6 +33,7 @@ func (cfg CustomCodecConfig) codec() *codecV2 { c := &codecV2{} if cfg.InstrumentRefLeaksPct > 0 { c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) + c.waitBeforeReuse = cfg.WaitBeforeReuseInstrumentedBuffer } return c } From 325f3c6bb19182fc89f332b42f025801741ff9de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Wed, 3 Dec 2025 11:31:00 +0100 Subject: [PATCH 33/38] Forgot to mprotect. 
--- pkg/mimirpb/custom.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 17371159b99..04ba5db01ef 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -269,6 +269,10 @@ func (b *instrumentLeaksBuf) Free() { ptr := unsafe.SliceData(buf) allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize)) if b.waitBeforeReuse > 0 { + err := syscall.Mprotect(allPages, syscall.PROT_NONE) + if err != nil { + panic(fmt.Errorf("mprotect: %w", err)) + } select { case unmapQueue <- unmapTask{buf: allPages, at: time.Now().Add(b.waitBeforeReuse)}: return From 739133cb323808bbcf1a98fd252cc33cb889f4c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Thu, 4 Dec 2025 17:57:54 +0100 Subject: [PATCH 34/38] Missing FreeBuffer in test. --- pkg/storage/ingest/version_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/ingest/version_test.go b/pkg/storage/ingest/version_test.go index b43e8796cbd..ae58627fab7 100644 --- a/pkg/storage/ingest/version_test.go +++ b/pkg/storage/ingest/version_test.go @@ -97,6 +97,7 @@ func TestDeserializeRecordContent(t *testing.T) { require.NoError(t, err) wr.ClearTimeseriesUnmarshalData() + defer wr.FreeBuffer() wr.BufferHolder = mimirpb.BufferHolder{} // We don't want to compare this. require.Equal(t, reqv0, &wr) }) From fc76511322aa22b062e95900d25baba15e1386df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Thu, 4 Dec 2025 17:59:55 +0100 Subject: [PATCH 35/38] blockbuilder: defer WriteRequest.FreeBuffer --- pkg/blockbuilder/tsdb.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index 34d0a209a32..dc23f6d806b 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -80,6 +80,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, partitionID int32, blocks // PushToStorageAndReleaseRequest implements ingest.Pusher. // It puts the samples in the TSDB. 
Some parts taken from (*Ingester).pushSamplesToAppender. func (b *TSDBBuilder) PushToStorageAndReleaseRequest(ctx context.Context, req *mimirpb.WriteRequest) error { + defer req.FreeBuffer() defer mimirpb.ReuseSlice(req.Timeseries) tenantID, err := dskittenant.TenantID(ctx) From 5785b01026a451f988a996114401215eb46ef6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Thu, 4 Dec 2025 18:23:16 +0100 Subject: [PATCH 36/38] Fix test build. --- pkg/ingester/ingester_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index efb4710399d..567e1aa8c29 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -72,6 +72,7 @@ import ( "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/globalerror" util_log "github.com/grafana/mimir/pkg/util/log" + "github.com/grafana/mimir/pkg/util/rw2util" util_test "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/validation" ) @@ -9145,7 +9146,7 @@ func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { require.NoError(t, err) startAndWaitHealthy(t, ing, r) - syms := util_test.NewSymbolTableBuilder(nil) + syms := rw2util.NewSymbolTableBuilder(nil) orig := makeTestRW2WriteRequest(syms) buf, err := orig.Marshal() require.NoError(t, err) @@ -12294,7 +12295,7 @@ func TestIngester_NotifyPreCommit(t *testing.T) { assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3)) } -func makeTestRW2WriteRequest(syms *util_test.SymbolTableBuilder) *mimirpb.WriteRequest { +func makeTestRW2WriteRequest(syms *rw2util.SymbolTableBuilder) *mimirpb.WriteRequest { req := &mimirpb.WriteRequest{ TimeseriesRW2: []mimirpb.TimeSeriesRW2{ { From f44250c8d97884ec1517d0c5df99687ca164b042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Thu, 4 Dec 2025 18:43:16 +0100 Subject: [PATCH 37/38] Add max-inflight-instrumented-bytes. 
--- cmd/mimir/config-descriptor.json | 57 ++++++++++++------ cmd/mimir/help-all.txt.tmpl | 6 +- .../mimir/configure/about-versioning.md | 5 +- .../configuration-parameters/index.md | 29 ++++++---- operations/mimir/mimir-flags-defaults.json | 5 +- pkg/mimir/mimir.go | 58 ++++++++++--------- pkg/mimir/mimir_config_test.go | 21 ++++--- pkg/mimirpb/custom.go | 58 ++++++++++++++----- 8 files changed, 154 insertions(+), 85 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 037e9f5bcea..179a02d8761 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20771,26 +20771,47 @@ "fieldDefaultValue": null }, { - "kind": "field", - "name": "instrument_ref_leaks_percentage", - "required": false, - "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.", - "fieldValue": null, - "fieldDefaultValue": 0, - "fieldFlag": "common.instrument-reference-leaks-percentage", - "fieldType": "float", - "fieldCategory": "experimental" - }, - { - "kind": "field", - "name": "instrument_ref_leaks_before_reuse_period", + "kind": "block", + "name": "instrument_ref_leaks", "required": false, - "desc": "Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.", + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "percentage", + "required": false, + "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. 
Set to 0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "common.instrument-reference-leaks.percentage", + "fieldType": "float", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "before_reuse_period", + "required": false, + "desc": "Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.", + "fieldValue": null, + "fieldDefaultValue": 120000000000, + "fieldFlag": "common.instrument-reference-leaks.before-reuse-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_inflight_instrumented_bytes", + "required": false, + "desc": "Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "common.instrument-reference-leaks.max-inflight-instrumented-bytes", + "fieldType": "int", + "fieldCategory": "experimental" + } + ], "fieldValue": null, - "fieldDefaultValue": 120000000000, - "fieldFlag": "common.instrument-reference-leaks-before-reuse-period", - "fieldType": "duration", - "fieldCategory": "experimental" + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index c1078007f50..0503c04b38d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,9 +857,11 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. 
- -common.instrument-reference-leaks-before-reuse-period duration + -common.instrument-reference-leaks.before-reuse-period duration [experimental] Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection. (default 2m0s) - -common.instrument-reference-leaks-percentage float + -common.instrument-reference-leaks.max-inflight-instrumented-bytes uint + [experimental] Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit. + -common.instrument-reference-leaks.percentage float [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 990d8e5725e..deee11a8d63 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -254,7 +254,10 @@ The following features are currently experimental: - Common - Instrument a fraction of pooled objects for references that outlive their lifetime. - Only implemented for objects embedding `mimirpb.BufferHolder`. 
- - Flag: `-common.instrument-reference-leaks-percentage` + - Flags: + - `-common.instrument-reference-leaks.percentage` + - `-common.instrument-reference-leaks.before-reuse-period` + - `-common.instrument-reference-leaks.max-inflight-instrumented-bytes` - Preferred available zone for querying ingesters and store-gateways - `-querier.prefer-availability-zone` - Memberlist zone-aware routing diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index a7c2aaee0b2..a964a8323cd 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -519,17 +519,24 @@ client_cluster_validation: # CLI flag: -common.client-cluster-validation.label [label: | default = ""] -# (experimental) Percentage [0-100] of request or message buffers to instrument -# for reference leaks. Set to 0 to disable. -# CLI flag: -common.instrument-reference-leaks-percentage -[instrument_ref_leaks_percentage: | default = 0] - -# (experimental) Period after a buffer instrumented for referenced leaks is -# nominally freed until the buffer is uninstrumented and effectively freed to be -# reused. After this period, any lingering references to the buffer may -# potentially be dereferenced again with no detection. -# CLI flag: -common.instrument-reference-leaks-before-reuse-period -[instrument_ref_leaks_before_reuse_period: | default = 2m] +instrument_ref_leaks: + # (experimental) Percentage [0-100] of request or message buffers to + # instrument for reference leaks. Set to 0 to disable. + # CLI flag: -common.instrument-reference-leaks.percentage + [percentage: | default = 0] + + # (experimental) Period after a buffer instrumented for referenced leaks is + # nominally freed until the buffer is uninstrumented and effectively freed to + # be reused. 
After this period, any lingering references to the buffer may + # potentially be dereferenced again with no detection. + # CLI flag: -common.instrument-reference-leaks.before-reuse-period + [before_reuse_period: | default = 2m] + + # (experimental) Maximum sum of length of buffers instrumented at any given + # time, in bytes. When surpassed, incoming buffers will not be instrumented, + # regardless of the configured percentage. Zero means no limit. + # CLI flag: -common.instrument-reference-leaks.max-inflight-instrumented-bytes + [max_inflight_instrumented_bytes: | default = 0] ``` ### server diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index d0a935e86e7..070b2580f71 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1475,8 +1475,9 @@ "common.storage.swift.request-timeout": 5000000000, "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", - "common.instrument-reference-leaks-percentage": 0, - "common.instrument-reference-leaks-before-reuse-period": 120000000000, + "common.instrument-reference-leaks.percentage": 0, + "common.instrument-reference-leaks.before-reuse-period": 120000000000, + "common.instrument-reference-leaks.max-inflight-instrumented-bytes": 0, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 19274ff6f33..b857251694d 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -713,21 +713,16 @@ func UnmarshalCommonYAML(value *yaml.Node, inheriters ...CommonConfigInheriter) for name, loc := range inheritance.ClientClusterValidation { specificClusterValidationLocations[name] = loc } - specificInstrumentRefLeaksPercentageLocations := specificLocationsUnmarshaler{} - for name, loc := range inheritance.InstrumentRefLeaksPercentage { - 
specificInstrumentRefLeaksPercentageLocations[name] = loc - } - specificInstrumentRefLeaksBeforeReusePeriodLocations := specificLocationsUnmarshaler{} - for name, loc := range inheritance.InstrumentRefLeaksBeforeReusePeriod { - specificInstrumentRefLeaksBeforeReusePeriodLocations[name] = loc + specificInstrumentRefLeaksLocations := specificLocationsUnmarshaler{} + for name, loc := range inheritance.InstrumentRefLeaksConfig { + specificInstrumentRefLeaksLocations[name] = loc } common := configWithCustomCommonUnmarshaler{ Common: &commonConfigUnmarshaler{ - Storage: &specificStorageLocations, - ClientClusterValidation: &specificClusterValidationLocations, - InstrumentRefLeaksPercentage: &specificInstrumentRefLeaksPercentageLocations, - InstrumentRefLeaksBeforeReusePeriod: &specificInstrumentRefLeaksBeforeReusePeriodLocations, + Storage: &specificStorageLocations, + ClientClusterValidation: &specificClusterValidationLocations, + InstrumentRefLeaks: &specificInstrumentRefLeaksLocations, }, } @@ -795,25 +790,34 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag } type CommonConfig struct { - Storage bucket.StorageBackendConfig `yaml:"storage"` - ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` - InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"` - InstrumentRefLeaksBeforeReusePeriod time.Duration `yaml:"instrument_ref_leaks_before_reuse_period" category:"experimental"` + Storage bucket.StorageBackendConfig `yaml:"storage"` + ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` + InstrumentRefLeaks InstrumentRefLeaksConfig `yaml:"instrument_ref_leaks" category:"experimental"` +} + +type InstrumentRefLeaksConfig struct { + Percentage float64 `yaml:"percentage" category:"experimental"` + BeforeReusePeriod time.Duration `yaml:"before_reuse_period" 
category:"experimental"` + MaxInflightInstrumentedBytes uint64 `yaml:"max_inflight_instrumented_bytes" category:"experimental"` +} + +func (c *InstrumentRefLeaksConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.Float64Var(&c.Percentage, prefix+"percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.`) + f.DurationVar(&c.BeforeReusePeriod, prefix+"before-reuse-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) + f.Uint64Var(&c.MaxInflightInstrumentedBytes, prefix+"max-inflight-instrumented-bytes", 0, `Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit.`) } type CommonConfigInheritance struct { - Storage map[string]*bucket.StorageBackendConfig - ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig - InstrumentRefLeaksPercentage map[string]*float64 - InstrumentRefLeaksBeforeReusePeriod map[string]*time.Duration + Storage map[string]*bucket.StorageBackendConfig + ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig + InstrumentRefLeaksConfig map[string]*InstrumentRefLeaksConfig } // RegisterFlags registers flag. func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) - f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. 
Set to 0 to disable.`) - f.DurationVar(&c.InstrumentRefLeaksBeforeReusePeriod, "common.instrument-reference-leaks-before-reuse-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) + c.InstrumentRefLeaks.RegisterFlagsWithPrefix("common.instrument-reference-leaks.", f) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. @@ -828,10 +832,9 @@ type configWithCustomCommonUnmarshaler struct { // commonConfigUnmarshaler will unmarshal each field of the common config into specific locations. type commonConfigUnmarshaler struct { - Storage *specificLocationsUnmarshaler `yaml:"storage"` - ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` - InstrumentRefLeaksPercentage *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_percentage"` - InstrumentRefLeaksBeforeReusePeriod *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_before_reuse_period"` + Storage *specificLocationsUnmarshaler `yaml:"storage"` + ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` + InstrumentRefLeaks *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks"` } // specificLocationsUnmarshaler will unmarshal yaml into specific locations. 
@@ -928,8 +931,9 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { setUpGoRuntimeMetrics(cfg, reg) mimirpb.CustomCodecConfig{ - InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPercentage, - WaitBeforeReuseInstrumentedBuffer: cfg.Common.InstrumentRefLeaksBeforeReusePeriod, + InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaks.Percentage, + WaitBeforeReuseInstrumentedBuffer: cfg.Common.InstrumentRefLeaks.BeforeReusePeriod, + MaxInflightInstrumentedBytes: cfg.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes, }.RegisterGlobally() if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index d825d20d220..e6fdcea5939 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -25,8 +25,9 @@ func TestCommonConfigCanBeExtended(t *testing.T) { args := []string{ "-common.storage.backend", "s3", "-common.client-cluster-validation.label", "client-cluster", - "-common.instrument-reference-leaks-percentage", "13.37", - "-common.instrument-reference-leaks-before-reuse-period", "20h", + "-common.instrument-reference-leaks.percentage", "13.37", + "-common.instrument-reference-leaks.before-reuse-period", "20h", + "-common.instrument-reference-leaks.max-inflight-instrumented-bytes", "1048576", } require.NoError(t, fs.Parse(args)) @@ -40,8 +41,9 @@ func TestCommonConfigCanBeExtended(t *testing.T) { checkAllClusterValidationLabels(t, cfg, "client-cluster") // Non-inherited flags still work. 
- require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaksPercentage) - require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaksBeforeReusePeriod) + require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaks.Percentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaks.BeforeReusePeriod) + require.Equal(t, uint64(1048576), cfg.MimirConfig.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -51,8 +53,10 @@ common: backend: s3 client_cluster_validation: label: client-cluster - instrument_ref_leaks_percentage: 13.37 - instrument_ref_leaks_before_reuse_period: 20h + instrument_ref_leaks: + percentage: 13.37 + before_reuse_period: 20h + max_inflight_instrumented_bytes: 2097152 ` var cfg customExtendedConfig @@ -70,8 +74,9 @@ common: checkAllClusterValidationLabels(t, cfg, "client-cluster") // Non-inherited flags should still work. - require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaksPercentage) - require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaksBeforeReusePeriod) + require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaks.Percentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaks.BeforeReusePeriod) + require.Equal(t, uint64(2097152), cfg.MimirConfig.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes) }) } diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 04ba5db01ef..8a496ae5c61 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -25,6 +25,7 @@ import ( type CustomCodecConfig struct { InstrumentRefLeaksPct float64 WaitBeforeReuseInstrumentedBuffer time.Duration + MaxInflightInstrumentedBytes uint64 } var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name() @@ -34,6 +35,10 @@ func (cfg CustomCodecConfig) codec() *codecV2 { if cfg.InstrumentRefLeaksPct > 0 { c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) 
c.waitBeforeReuse = cfg.WaitBeforeReuseInstrumentedBuffer + c.maxInflightInstrumentedBytes = cfg.MaxInflightInstrumentedBytes + if c.maxInflightInstrumentedBytes == 0 { + c.maxInflightInstrumentedBytes = math.MaxUint64 + } } return c } @@ -54,6 +59,7 @@ func init() { // Instrument all buffers when testing. config.InstrumentRefLeaksPct = 100 config.WaitBeforeReuseInstrumentedBuffer = 0 + config.MaxInflightInstrumentedBytes = 0 } config.RegisterGlobally() } @@ -63,9 +69,12 @@ func init() { // We customize unmarshalling in order to use an optimized path when possible, // and to retain the unmarshalling buffer when necessary. type codecV2 struct { - instrumentRefLeaksOneIn uint64 + instrumentRefLeaksOneIn uint64 + waitBeforeReuse time.Duration + maxInflightInstrumentedBytes uint64 + unmarshaledWithBufferRefCount atomic.Uint64 - waitBeforeReuse time.Duration + inflightInstrumentedBytes atomic.Uint64 } var _ encoding.CodecV2 = &codecV2{} @@ -168,12 +177,21 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { holder, isBufferHolder := v.(MessageWithBufferRef) instrumentLeaks := data.Len() > 0 && isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 + var pageAlignedLen int + if instrumentLeaks { + pageAlignedLen = roundUpToMultiple(data.Len(), pageSize) + inflight := c.inflightInstrumentedBytes.Add(uint64(pageAlignedLen)) + if inflight > c.maxInflightInstrumentedBytes { + c.inflightInstrumentedBytes.Sub(uint64(pageAlignedLen)) + instrumentLeaks = false + } + } + var buf mem.Buffer if instrumentLeaks { // Allocate separate pages for this buffer. We'll detect ref leaks by // munmaping the pages on Free, after which trying to access them will // segfault. 
- pageAlignedLen := roundUpToMultiple(data.Len(), pageSize) b, err := syscall.Mmap(-1, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) if err != nil { panic(fmt.Errorf("mmap: %w", err)) @@ -181,7 +199,11 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { b = b[:data.Len()] data.CopyTo(b) buf = mem.NewBuffer(&b, nil) - instrumentedBuf := &instrumentLeaksBuf{Buffer: buf, waitBeforeReuse: c.waitBeforeReuse} + instrumentedBuf := &instrumentLeaksBuf{ + Buffer: buf, + waitBeforeReuse: c.waitBeforeReuse, + inflightInstrumentedBytes: &c.inflightInstrumentedBytes, + } instrumentedBuf.refCount.Inc() buf = instrumentedBuf } else if len(data) == 1 { @@ -252,8 +274,9 @@ var _ MessageWithBufferRef = &BufferHolder{} type instrumentLeaksBuf struct { mem.Buffer - refCount atomic.Int64 - waitBeforeReuse time.Duration + refCount atomic.Int64 + waitBeforeReuse time.Duration + inflightInstrumentedBytes *atomic.Uint64 } func (b *instrumentLeaksBuf) Ref() { @@ -280,16 +303,14 @@ func (b *instrumentLeaksBuf) Free() { // Queue is full, munmap right away. 
} } - err := syscall.Munmap(allPages) - if err != nil { - panic(fmt.Errorf("munmap: %w", err)) - } + unmap(allPages, b.inflightInstrumentedBytes) } } type unmapTask struct { - buf []byte - at time.Time + buf []byte + at time.Time + inflightInstrumentedBytes *atomic.Uint64 } var unmapQueue chan unmapTask @@ -299,14 +320,19 @@ var startFreeingInstrumentedBuffers = sync.OnceFunc(func() { go func() { for t := range unmapQueue { time.Sleep(time.Until(t.at)) - err := syscall.Munmap(t.buf) - if err != nil { - panic(fmt.Errorf("munmap: %w", err)) - } + unmap(t.buf, t.inflightInstrumentedBytes) } }() }) +func unmap(buf []byte, inflightInstrumentedBytes *atomic.Uint64) { + inflightInstrumentedBytes.Sub(uint64(len(buf))) + err := syscall.Munmap(buf) + if err != nil { + panic(fmt.Errorf("munmap: %w", err)) + } +} + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. func (m *WriteRequest) MinTimestamp() int64 { From ff97b8c7bf255a13c8bf240b8e3795f5dc030ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20C=C3=A1rdenas?= Date: Thu, 4 Dec 2025 18:52:06 +0100 Subject: [PATCH 38/38] Pass inflightInstrumentedBytes around. --- pkg/mimirpb/custom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 8a496ae5c61..9d2e72be141 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -297,7 +297,7 @@ func (b *instrumentLeaksBuf) Free() { panic(fmt.Errorf("mprotect: %w", err)) } select { - case unmapQueue <- unmapTask{buf: allPages, at: time.Now().Add(b.waitBeforeReuse)}: + case unmapQueue <- unmapTask{buf: allPages, at: time.Now().Add(b.waitBeforeReuse), inflightInstrumentedBytes: b.inflightInstrumentedBytes}: return default: // Queue is full, munmap right away.