Skip to content
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8b48ad9
Add mechanism to detect leaked references to mimirpb buffers.
tcard Nov 21, 2025
56fd9b0
Simplify test.
tcard Nov 21, 2025
0b57a52
Plumb ingester with ref leaks detection.
tcard Nov 21, 2025
f27be92
Fix lint, test.
tcard Nov 21, 2025
86258b7
More fixing.
tcard Nov 21, 2025
71f14aa
Fix racy test
tcard Nov 21, 2025
f1ab684
make doc
tcard Nov 21, 2025
f6a6b41
CHANGELOG.
tcard Nov 21, 2025
ef1d54e
about-versioning.md
tcard Nov 21, 2025
69ca33a
Revert "mimirpb: Clone metadata fields rather than maintaining refere…
tcard Nov 21, 2025
9d6d2d0
Make test deterministic.
tcard Nov 24, 2025
efc7fec
Fix mimirpb build.
tcard Nov 24, 2025
9012300
Reapply "mimirpb: Clone metadata fields rather than maintaining refer…
tcard Nov 24, 2025
a6eb3e6
Lint.
tcard Nov 24, 2025
a6e1c78
make license
tcard Nov 24, 2025
de97dd5
Fixes from Cursor review.
tcard Nov 24, 2025
da56796
Apply suggestions from code review
tcard Nov 25, 2025
7ca73a3
pct -> percentage
tcard Nov 26, 2025
d9afc39
Don't rely on GC; use mmap/munmap instead.
tcard Nov 25, 2025
466a6de
Don't instrument empty buffers.
tcard Nov 26, 2025
628ee24
Atomic ref count.
tcard Nov 26, 2025
2dbb9a6
make reference-help
tcard Nov 26, 2025
4180411
Merge branch 'main' into instrument-ref-leaks
tcard Nov 27, 2025
de52a8a
Move things around to ensure no mmap leak.
tcard Dec 2, 2025
4b0636e
Reword description.
tcard Dec 2, 2025
c10028f
Register default global codec on init.
tcard Dec 2, 2025
25fef18
Don't wrap encoding.CodecV2.
tcard Dec 2, 2025
a47d9c2
Fewer public globals.
tcard Dec 2, 2025
49fc9a2
Document that pool won't be used, and make it explicitly so.
tcard Dec 2, 2025
b39485d
Wait before munmapping.
tcard Dec 2, 2025
8bff504
Consistent naming.
tcard Dec 2, 2025
e5f7446
Fix YAML unmarshaling.
tcard Dec 2, 2025
375561b
Propagate InstrumentRefLeaksBeforeReusePeriod.
tcard Dec 2, 2025
325f3c6
Forgot to mprotect.
tcard Dec 3, 2025
a3ff570
Merge branch 'main' of github.com:grafana/mimir into instrument-ref-l…
tcard Dec 3, 2025
739133c
Missing FreeBuffer in test.
tcard Dec 4, 2025
fc76511
blockbuilder: defer WriteRequest.FreeBuffer
tcard Dec 4, 2025
5785b01
Fix test build.
tcard Dec 4, 2025
f44250c
Add max-inflight-instrumented-bytes.
tcard Dec 4, 2025
ff97b8c
Pass inflightInstrumentedBytes around.
tcard Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* [ENHANCEMENT] Usage tracker: `loadSnapshot()` checks shard emptiness instead of using explicit `first` parameter. #13534
* [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-pct` to leaked references to gRPC buffers. #13609
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
* [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -20749,6 +20749,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "instrument_ref_leaks_pct",
"required": false,
"desc": "Percentage of buffers from pools to instrument for reference leaks. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "common.instrument-reference-leaks-pct",
"fieldType": "float",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,8 @@ Usage of ./cmd/mimir/mimir:
TSDB WAL segments files max size (bytes). (default 134217728)
-common.client-cluster-validation.label string
[experimental] Primary cluster validation label.
-common.instrument-reference-leaks-pct float
[experimental] Percentage of buffers from pools to instrument for reference leaks. 0 to disable.
-common.storage.azure.account-key string
Azure storage account key. If unset, Azure managed identities will be used for authentication instead.
-common.storage.azure.account-name string
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ The following features are currently experimental:
- Assuming that a gRPC client configuration can be reached via `-<grpc-client-config-path>`, cluster validation label is configured via: `-<grpc-client-config-path>.cluster-validation.label`.
- The cluster validation label of all gRPC clients can be configured via `-common.client-cluster-validation.label`.
- Requests with invalid cluster validation labels are tracked via the `cortex_client_invalid_cluster_validation_label_requests_total` metric.
- Common
- Instrument a fraction of pooled objects for references that outlive their lifetime
- Only implemented for objects embedding `mimirpb.BufferHolder` so far.
- Flag: `-common.instrument-reference-leaks-pct`
- Preferred available zone for querying ingesters and store-gateways
- `-querier.prefer-availability-zone`
- Memberlist zone-aware routing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ client_cluster_validation:
# (experimental) Primary cluster validation label.
# CLI flag: -common.client-cluster-validation.label
[label: <string> | default = ""]

# (experimental) Percentage of buffers from pools to instrument for reference
# leaks. 0 to disable.
# CLI flag: -common.instrument-reference-leaks-pct
[instrument_ref_leaks_pct: <float> | default = 0]
```

### server
Expand Down
1 change: 1 addition & 0 deletions operations/mimir/mimir-flags-defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,7 @@
"common.storage.swift.request-timeout": 5000000000,
"common.storage.filesystem.dir": "",
"common.client-cluster-validation.label": "",
"common.instrument-reference-leaks-pct": 0,
"timeseries-unmarshal-caching-optimization-enabled": true,
"cost-attribution.eviction-interval": 1200000000000,
"cost-attribution.registry-path": "",
Expand Down
68 changes: 68 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
mimirpbtest "github.com/grafana/mimir/pkg/mimirpb/testutil"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
Expand Down Expand Up @@ -9136,6 +9137,42 @@ func TestIngesterMetadataMetrics(t *testing.T) {

}

func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.MetadataRetainPeriod = 20 * time.Second

ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, "", reg)
require.NoError(t, err)
startAndWaitHealthy(t, ing, r)

syms := util_test.NewSymbolTableBuilder(nil)
buf, err := makeTestRW2WriteRequest(syms).Marshal()
require.NoError(t, err)

var wr mimirpb.PreallocWriteRequest
wr.UnmarshalFromRW2 = true
wr.SkipNormalizeMetadataMetricName = true
wr.SkipDeduplicateMetadata = true
err = mimirpb.Unmarshal(buf, &wr)
require.NoError(t, err)

nextLeak := mimirpbtest.NextRefLeakCheck(t.Context(), wr.Buffer().ReadOnlyData())

ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing.Push(ctx, &wr.WriteRequest)
require.NoError(t, err)

select {
case leaked := <-nextLeak:
if leaked {
require.Fail(t, "reference leak detected", "addr: %p", buf)
}
case <-time.After(1 * time.Second):
require.Fail(t, "expected reference leak check", "addr: %p", buf)
}
}

func TestIngesterSendsOnlySeriesWithData(t *testing.T) {
ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), defaultLimitsTestConfig(), nil, "", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -12255,3 +12292,34 @@ func TestIngester_NotifyPreCommit(t *testing.T) {
// As there are three users, fsync should have been called at least three times
assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3))
}

func makeTestRW2WriteRequest(syms *util_test.SymbolTableBuilder) *mimirpb.WriteRequest {
req := &mimirpb.WriteRequest{
TimeseriesRW2: []mimirpb.TimeSeriesRW2{
{
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
Samples: []mimirpb.Sample{
{
Value: 123.456,
TimestampMs: 1234567890,
},
},
Exemplars: []mimirpb.ExemplarRW2{
{
Value: 123.456,
Timestamp: 1234567890,
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("traceID"), syms.GetSymbol("1234567890abcdef")},
},
},
Metadata: mimirpb.MetadataRW2{
Type: mimirpb.METRIC_TYPE_COUNTER,
HelpRef: syms.GetSymbol("test_metric_help"),
UnitRef: syms.GetSymbol("test_metric_unit"),
},
},
},
}
req.SymbolsRW2 = syms.GetSymbols()

return req
}
5 changes: 5 additions & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag
type CommonConfig struct {
Storage bucket.StorageBackendConfig `yaml:"storage"`
ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"`
InstrumentRefLeaksPct float64 `yaml:"instrument_ref_leaks_pct" category:"experimental"`
}

type CommonConfigInheritance struct {
Expand All @@ -798,6 +799,7 @@ type CommonConfigInheritance struct {
func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) {
c.Storage.RegisterFlagsWithPrefix("common.storage.", f)
c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f)
f.Float64Var(&c.InstrumentRefLeaksPct, "common.instrument-reference-leaks-pct", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I suggest to change "pct" to "percentage". It will be consistent with other percentage-based settings we have.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field.
Expand All @@ -814,6 +816,7 @@ type configWithCustomCommonUnmarshaler struct {
type commonConfigUnmarshaler struct {
Storage *specificLocationsUnmarshaler `yaml:"storage"`
ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"`
InstrumentRefLeaksPct *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_pct"`
}

// specificLocationsUnmarshaler will unmarshal yaml into specific locations.
Expand Down Expand Up @@ -909,6 +912,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) {

setUpGoRuntimeMetrics(cfg, reg)

mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPct}.RegisterGlobally()

if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled {
util_log.WarnExperimentalUse("ruler.tenant-federation")
}
Expand Down
119 changes: 114 additions & 5 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,47 @@ import (
"bytes"
"fmt"
"math"
"runtime"
"testing"
"unsafe"
"weak"

gogoproto "github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/model/histogram"
"go.uber.org/atomic"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
protobufproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"

"github.com/grafana/mimir/pkg/mimirpb/internal"
)

type CustomCodecConfig struct {
InstrumentRefLeaksPct float64
}

func (cfg CustomCodecConfig) Codec() encoding.CodecV2 {
c := &codecV2{codec: encoding.GetCodecV2(proto.Name)}
if cfg.InstrumentRefLeaksPct > 0 {
c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct))
}
return c
}

var GlobalCodec encoding.CodecV2

func (cfg CustomCodecConfig) RegisterGlobally() {
GlobalCodec = cfg.Codec()
encoding.RegisterCodecV2(GlobalCodec)
}

func init() {
c := encoding.GetCodecV2(proto.Name)
encoding.RegisterCodecV2(&codecV2{codec: c})
// Instrument all buffers when testing.
if testing.Testing() {
CustomCodecConfig{InstrumentRefLeaksPct: 100}.RegisterGlobally()
}
}

// codecV2 customizes gRPC marshalling and unmarshalling.
Expand All @@ -27,6 +55,9 @@ func init() {
// and to retain the unmarshalling buffer when necessary.
type codecV2 struct {
codec encoding.CodecV2

instrumentRefLeaksOneIn uint64
unmarshaledWithBufferRefCount atomic.Uint64
}

var _ encoding.CodecV2 = &codecV2{}
Expand Down Expand Up @@ -113,14 +144,38 @@ func unmarshalSlicePoolSizes() []int {
return sizes
}

// Unmarshal unmarshals an object using the global codec. Prefer this over calling
// the Unmarshal method directly, as it will take advantage of pools and leak
// detection.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using it everywhere and add a faillint rule (defined in the Makefile) if the Unmarshal-function-with-receiver is called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good idea long-term, but maybe for now it's better to restrict its usage to the places I know work fine, and then introduce it everywhere else progressively. What do you think?

func Unmarshal(data []byte, v gogoproto.Unmarshaler) error {
if GlobalCodec == nil {
return v.Unmarshal(data)
}
return GlobalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v)
}

// Unmarshal customizes gRPC unmarshalling.
// If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented.
// The latter means that v's FreeBuffer method should be called when v is no longer used.
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
buf := data.MaterializeToBuffer(unmarshalSlicePool)
holder, isBufferHolder := v.(MessageWithBufferRef)
instrumentLeaks := isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0

var buf mem.Buffer
if instrumentLeaks {
// Always allocate; we'll detect leaks by checking if it's garbage-collected.
b := make([]byte, data.Len())
data.CopyTo(b)
buf = mem.NewBuffer(&b, nil)
} else {
buf = data.MaterializeToBuffer(unmarshalSlicePool)
}
// Decrement buf's reference count. Note though that if v implements MessageWithBufferRef,
// we increase buf's reference count first so it doesn't go to zero.
defer buf.Free()
//
// We're possibly replacing buf below, so it's important to keep this a closure
// rather than a direct call to buf.Free.
defer func() { buf.Free() }()

if unmarshaler, ok := v.(gogoproto.Unmarshaler); ok {
if err := unmarshaler.Unmarshal(buf.ReadOnlyData()); err != nil {
Expand All @@ -133,7 +188,10 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
}
}

if holder, ok := v.(MessageWithBufferRef); ok {
if isBufferHolder {
if instrumentLeaks {
buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1}
}
buf.Ref()
holder.SetBuffer(buf)
}
Expand All @@ -158,6 +216,10 @@ type BufferHolder struct {
buffer mem.Buffer
}

func (m *BufferHolder) Buffer() mem.Buffer {
return m.buffer
}

func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}
Expand All @@ -171,6 +233,53 @@ func (m *BufferHolder) FreeBuffer() {

var _ MessageWithBufferRef = &BufferHolder{}

type instrumentLeaksBuf struct {
mem.Buffer
refCount int
}

func (b *instrumentLeaksBuf) Ref() {
b.Buffer.Ref()
b.refCount++
}

func (b *instrumentLeaksBuf) Free() {
b.Buffer.Free()
b.refCount--

if b.refCount == 0 {
buf := b.ReadOnlyData()
// Taint data, in case the buffer is accessed before the panic below,
// or after the panic if it's recovered.
for i := range buf {
const taint = "KAEL"
buf[i] = taint[i%len(taint)]
}
// Remove our ref; no (strong) refs should remain (except in the stack).
b.Buffer = nil
ptr := unsafe.SliceData(buf)
addr := uintptr(unsafe.Pointer(ptr))
weakRef := weak.Make(ptr)

// Check in a separate goroutine, because this stack still has locals
// pointing to the buffer.
go func() {
runtime.GC()
runtime.GC()

leaked := weakRef.Value() != nil

select {
case internal.RefLeakChecks <- internal.RefLeakCheck{Addr: addr, Leaked: leaked}:
default:
if leaked {
panic(fmt.Sprintf("reference leak for object 0x%x", addr))
}
}
}()
}
}

// MinTimestamp returns the minimum timestamp (milliseconds) among all series
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
func (m *WriteRequest) MinTimestamp() int64 {
Expand Down
Loading
Loading