Skip to content
Draft
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8b48ad9
Add mechanism to detect leaked references to mimirpb buffers.
tcard Nov 21, 2025
56fd9b0
Simplify test.
tcard Nov 21, 2025
0b57a52
Plumb ingester with ref leaks detection.
tcard Nov 21, 2025
f27be92
Fix lint, test.
tcard Nov 21, 2025
86258b7
More fixing.
tcard Nov 21, 2025
71f14aa
Fix racy test
tcard Nov 21, 2025
f1ab684
make doc
tcard Nov 21, 2025
f6a6b41
CHANGELOG.
tcard Nov 21, 2025
ef1d54e
about-versioning.md
tcard Nov 21, 2025
69ca33a
Revert "mimirpb: Clone metadata fields rather than maintaining refere…
tcard Nov 21, 2025
9d6d2d0
Make test deterministic.
tcard Nov 24, 2025
efc7fec
Fix mimirpb build.
tcard Nov 24, 2025
9012300
Reapply "mimirpb: Clone metadata fields rather than maintaining refer…
tcard Nov 24, 2025
a6eb3e6
Lint.
tcard Nov 24, 2025
a6e1c78
make license
tcard Nov 24, 2025
de97dd5
Fixes from Cursor review.
tcard Nov 24, 2025
da56796
Apply suggestions from code review
tcard Nov 25, 2025
7ca73a3
pct -> percentage
tcard Nov 26, 2025
d9afc39
Don't rely on GC; use mmap/munmap instead.
tcard Nov 25, 2025
466a6de
Don't instrument empty buffers.
tcard Nov 26, 2025
628ee24
Atomic ref count.
tcard Nov 26, 2025
2dbb9a6
make reference-help
tcard Nov 26, 2025
4180411
Merge branch 'main' into instrument-ref-leaks
tcard Nov 27, 2025
de52a8a
Move things around to ensure no mmap leak.
tcard Dec 2, 2025
4b0636e
Reword description.
tcard Dec 2, 2025
c10028f
Register default global codec on init.
tcard Dec 2, 2025
25fef18
Don't wrap encoding.CodecV2.
tcard Dec 2, 2025
a47d9c2
Fewer public globals.
tcard Dec 2, 2025
49fc9a2
Document that pool won't be used, and make it explicitly so.
tcard Dec 2, 2025
b39485d
Wait before munmapping.
tcard Dec 2, 2025
8bff504
Consistent naming.
tcard Dec 2, 2025
e5f7446
Fix YAML unmarshaling.
tcard Dec 2, 2025
375561b
Propagate InstrumentRefLeaksBeforeReusePeriod.
tcard Dec 2, 2025
325f3c6
Forgot to mprotect.
tcard Dec 3, 2025
a3ff570
Merge branch 'main' of github.com:grafana/mimir into instrument-ref-l…
tcard Dec 3, 2025
739133c
Missing FreeBuffer in test.
tcard Dec 4, 2025
fc76511
blockbuilder: defer WriteRequest.FreeBuffer
tcard Dec 4, 2025
5785b01
Fix test build.
tcard Dec 4, 2025
f44250c
Add max-inflight-instrumented-bytes.
tcard Dec 4, 2025
ff97b8c
Pass inflightInstrumentedBytes around.
tcard Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586
* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-percentage` to leaked references to gRPC buffers. #13609
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
* [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -20769,6 +20769,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "instrument_ref_leaks_percentage",
"required": false,
"desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "common.instrument-reference-leaks-percentage",
"fieldType": "float",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,8 @@ Usage of ./cmd/mimir/mimir:
TSDB WAL segments files max size (bytes). (default 134217728)
-common.client-cluster-validation.label string
[experimental] Primary cluster validation label.
-common.instrument-reference-leaks-percentage float
[experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.
-common.storage.azure.account-key string
Azure storage account key. If unset, Azure managed identities will be used for authentication instead.
-common.storage.azure.account-name string
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ The following features are currently experimental:
- Assuming that a gRPC client configuration can be reached via `-<grpc-client-config-path>`, cluster validation label is configured via: `-<grpc-client-config-path>.cluster-validation.label`.
- The cluster validation label of all gRPC clients can be configured via `-common.client-cluster-validation.label`.
- Requests with invalid cluster validation labels are tracked via the `cortex_client_invalid_cluster_validation_label_requests_total` metric.
- Common
- Instrument a fraction of pooled objects for references that outlive their lifetime.
- Only implemented for objects embedding `mimirpb.BufferHolder`.
- Flag: `-common.instrument-reference-leaks-percentage`
- Preferred available zone for querying ingesters and store-gateways
- `-querier.prefer-availability-zone`
- Memberlist zone-aware routing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ client_cluster_validation:
# (experimental) Primary cluster validation label.
# CLI flag: -common.client-cluster-validation.label
[label: <string> | default = ""]

# (experimental) Percentage [0-100] of request or message buffers to instrument
# for reference leaks. 0 to disable.
# CLI flag: -common.instrument-reference-leaks-percentage
[instrument_ref_leaks_percentage: <float> | default = 0]
```

### server
Expand Down
1 change: 1 addition & 0 deletions operations/mimir/mimir-flags-defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@
"common.storage.swift.request-timeout": 5000000000,
"common.storage.filesystem.dir": "",
"common.client-cluster-validation.label": "",
"common.instrument-reference-leaks-percentage": 0,
"timeseries-unmarshal-caching-optimization-enabled": true,
"cost-attribution.eviction-interval": 1200000000000,
"cost-attribution.registry-path": "",
Expand Down
69 changes: 69 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9136,6 +9136,44 @@ func TestIngesterMetadataMetrics(t *testing.T) {

}

func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.MetadataRetainPeriod = 20 * time.Second

ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, "", reg)
require.NoError(t, err)
startAndWaitHealthy(t, ing, r)

syms := util_test.NewSymbolTableBuilder(nil)
orig := makeTestRW2WriteRequest(syms)
buf, err := orig.Marshal()
require.NoError(t, err)

var wr mimirpb.PreallocWriteRequest
wr.UnmarshalFromRW2 = true
wr.SkipNormalizeMetadataMetricName = true
wr.SkipDeduplicateMetadata = true
err = mimirpb.Unmarshal(buf, &wr)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing.Push(ctx, &wr.WriteRequest)
require.NoError(t, err)

meta, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{
Metric: "test_metric_total",
Limit: 1, LimitPerMetric: 1,
})
require.NoError(t, err)
require.Equal(t, []*mimirpb.MetricMetadata{{
MetricFamilyName: "test_metric_total",
Type: mimirpb.COUNTER,
Help: "test_metric_help",
Unit: "test_metric_unit",
}}, meta.Metadata)
}

func TestIngesterSendsOnlySeriesWithData(t *testing.T) {
ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), defaultLimitsTestConfig(), nil, "", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -12255,3 +12293,34 @@ func TestIngester_NotifyPreCommit(t *testing.T) {
// As there are three users, fsync should have been called at least three times
assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3))
}

func makeTestRW2WriteRequest(syms *util_test.SymbolTableBuilder) *mimirpb.WriteRequest {
req := &mimirpb.WriteRequest{
TimeseriesRW2: []mimirpb.TimeSeriesRW2{
{
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")},
Samples: []mimirpb.Sample{
{
Value: 123.456,
TimestampMs: 1234567890,
},
},
Exemplars: []mimirpb.ExemplarRW2{
{
Value: 123.456,
Timestamp: 1234567890,
LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("traceID"), syms.GetSymbol("1234567890abcdef")},
},
},
Metadata: mimirpb.MetadataRW2{
Type: mimirpb.METRIC_TYPE_COUNTER,
HelpRef: syms.GetSymbol("test_metric_help"),
UnitRef: syms.GetSymbol("test_metric_unit"),
},
},
},
}
req.SymbolsRW2 = syms.GetSymbols()

return req
}
13 changes: 9 additions & 4 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,9 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag
}

type CommonConfig struct {
Storage bucket.StorageBackendConfig `yaml:"storage"`
ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"`
Storage bucket.StorageBackendConfig `yaml:"storage"`
ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"`
InstrumentRefLeaksPercentage float64 `yaml:"instrument_ref_leaks_percentage" category:"experimental"`
}

type CommonConfigInheritance struct {
Expand All @@ -798,6 +799,7 @@ type CommonConfigInheritance struct {
func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) {
c.Storage.RegisterFlagsWithPrefix("common.storage.", f)
c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f)
f.Float64Var(&c.InstrumentRefLeaksPercentage, "common.instrument-reference-leaks-percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. 0 to disable.`)
}

// configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field.
Expand All @@ -812,8 +814,9 @@ type configWithCustomCommonUnmarshaler struct {

// commonConfigUnmarshaler will unmarshal each field of the common config into specific locations.
type commonConfigUnmarshaler struct {
Storage *specificLocationsUnmarshaler `yaml:"storage"`
ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"`
Storage *specificLocationsUnmarshaler `yaml:"storage"`
ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"`
InstrumentRefLeaksPercentage *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_percentage"`
}

// specificLocationsUnmarshaler will unmarshal yaml into specific locations.
Expand Down Expand Up @@ -909,6 +912,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) {

setUpGoRuntimeMetrics(cfg, reg)

mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPercentage}.RegisterGlobally()

if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled {
util_log.WarnExperimentalUse("ruler.tenant-federation")
}
Expand Down
110 changes: 104 additions & 6 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,57 @@ import (
"bytes"
"fmt"
"math"
"syscall"
"testing"
"unsafe"

gogoproto "github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/model/histogram"
"go.uber.org/atomic"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/mem"
protobufproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)

type CustomCodecConfig struct {
InstrumentRefLeaksPct float64
}

var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name()

func (cfg CustomCodecConfig) codec() *codecV2 {
c := &codecV2{}
if cfg.InstrumentRefLeaksPct > 0 {
c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct))
}
return c
}

var globalCodec encoding.CodecV2

func (cfg CustomCodecConfig) RegisterGlobally() {
globalCodec = cfg.codec()
encoding.RegisterCodecV2(globalCodec)
}

func init() {
c := encoding.GetCodecV2(proto.Name)
encoding.RegisterCodecV2(&codecV2{codec: c})
config := CustomCodecConfig{}
if testing.Testing() {
// Instrument all buffers when testing.
config.InstrumentRefLeaksPct = 100
}
config.RegisterGlobally()
}

// codecV2 customizes gRPC marshalling and unmarshalling.
// We customize marshalling in order to use optimized paths when possible.
// We customize unmarshalling in order to use an optimized path when possible,
// and to retain the unmarshalling buffer when necessary.
type codecV2 struct {
codec encoding.CodecV2
instrumentRefLeaksOneIn uint64
unmarshaledWithBufferRefCount atomic.Uint64
}

var _ encoding.CodecV2 = &codecV2{}
Expand Down Expand Up @@ -113,11 +143,47 @@ func unmarshalSlicePoolSizes() []int {
return sizes
}

// Unmarshal unmarshals an object using the global codec. Prefer this over
// calling the Unmarshal method directly, as it will take advantage of leak
// detection.
func Unmarshal(data []byte, v gogoproto.Unmarshaler) error {
return globalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v)
}

var pageSize = syscall.Getpagesize()

// Unmarshal customizes gRPC unmarshalling.
// If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented.
// The latter means that v's FreeBuffer method should be called when v is no longer used.
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
buf := data.MaterializeToBuffer(unmarshalSlicePool)
holder, isBufferHolder := v.(MessageWithBufferRef)
instrumentLeaks := data.Len() > 0 && isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0

var buf mem.Buffer
if instrumentLeaks {
// Allocate separate pages for this buffer. We'll detect ref leaks by
// munmaping the pages on Free, after which trying to access them will
// segfault.
pageAlignedLen := roundUpToMultiple(data.Len(), pageSize)
b, err := syscall.Mmap(0, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON)
if err != nil {
panic(fmt.Errorf("mmap: %w", err))
}
b = b[:data.Len()]
data.CopyTo(b)
buf = mem.NewBuffer(&b, nil)
instrumentedBuf := &instrumentLeaksBuf{Buffer: buf}
instrumentedBuf.refCount.Inc()
buf = instrumentedBuf
} else if len(data) == 1 {
// BufferSlice.MaterializeToBuffer already has this behavior when
// len(data) == 1, but we reproduce it here for explicitness and for
// ensuring forward-compatibility.
data[0].Ref()
buf = data[0]
} else {
buf = data.MaterializeToBuffer(unmarshalSlicePool)
}
// Decrement buf's reference count. Note though that if v implements MessageWithBufferRef,
// we increase buf's reference count first so it doesn't go to zero.
defer buf.Free()
Expand All @@ -133,7 +199,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
}
}

if holder, ok := v.(MessageWithBufferRef); ok {
if isBufferHolder {
buf.Ref()
holder.SetBuffer(buf)
}
Expand All @@ -142,7 +208,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
}

func (c *codecV2) Name() string {
return c.codec.Name()
return baseCodecV2Name
}

// MessageWithBufferRef is an unmarshalling buffer retaining protobuf message.
Expand All @@ -158,6 +224,10 @@ type BufferHolder struct {
buffer mem.Buffer
}

func (m *BufferHolder) Buffer() mem.Buffer {
return m.buffer
}

func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}
Expand All @@ -171,6 +241,30 @@ func (m *BufferHolder) FreeBuffer() {

var _ MessageWithBufferRef = &BufferHolder{}

type instrumentLeaksBuf struct {
mem.Buffer
refCount atomic.Int64
}

func (b *instrumentLeaksBuf) Ref() {
b.Buffer.Ref()
b.refCount.Inc()
}

func (b *instrumentLeaksBuf) Free() {
b.Buffer.Free()

if b.refCount.Dec() == 0 {
buf := b.ReadOnlyData()
ptr := unsafe.SliceData(buf)
allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize))
err := syscall.Munmap(allPages)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buf is (say) (0x70000000, 0x74000000] and immediately after this Munmap call returns, another instrumented pool buffer is mmapped that at least partially overlaps with that same range, then there can be times when dangling references to this buf are permitted to dereference data in the new buf without error. It may be more confidence-inspiring to claim "our new build served 500,000 requests out of the leak-instrumented pool with zero use-after-frees" if the guard mechanism is precise.

The idea I had was to have a long-lived MMapped buffer that has a cool-down time that is >> any reasonable request processing time. Which would have the same problem described above but only if there's a pathological request that runs for more than the cooldown time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same concern initially, but then noticed the docs for munmap say:

The munmap() system call deletes the mappings for the specified address range, and causes further references to addresses within the range to generate invalid memory references.

It seems to me the emphasized part guarantees that the address space range won't be reused, but maybe there's an implicit "... until the addresses are reused" in there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more googling/LLMing and a short foray into OpenBSD's implementation point in the direction that you are right, and the address space can potentially be reused. Thanks for challenging this!

See b39485d

if err != nil {
panic(fmt.Errorf("munmap: %w", err))
}
}
}

// MinTimestamp returns the minimum timestamp (milliseconds) among all series
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
func (m *WriteRequest) MinTimestamp() int64 {
Expand Down Expand Up @@ -534,3 +628,7 @@ type orderAwareMetricMetadata struct {
// order is the 0-based index of this metadata object in a wider metadata array.
order int
}

func roundUpToMultiple(n, of int) int {
return ((n + of - 1) / of) * of
}
Loading