-
Notifications
You must be signed in to change notification settings - Fork 682
Add mechanism to detect leaked references to mimirpb buffers #13609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
8b48ad9
56fd9b0
0b57a52
f27be92
86258b7
71f14aa
f1ab684
f6a6b41
ef1d54e
69ca33a
9d6d2d0
efc7fec
9012300
a6eb3e6
a6e1c78
de97dd5
da56796
7ca73a3
d9afc39
466a6de
628ee24
2dbb9a6
4180411
de52a8a
4b0636e
c10028f
25fef18
a47d9c2
49fc9a2
b39485d
8bff504
e5f7446
375561b
325f3c6
a3ff570
739133c
fc76511
5785b01
f44250c
ff97b8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -787,6 +787,7 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag | |
| type CommonConfig struct { | ||
| Storage bucket.StorageBackendConfig `yaml:"storage"` | ||
| ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` | ||
| InstrumentRefLeaksPct float64 `yaml:"instrument_ref_leaks_pct" category:"experimental"` | ||
| } | ||
|
|
||
| type CommonConfigInheritance struct { | ||
|
|
@@ -798,6 +799,7 @@ type CommonConfigInheritance struct { | |
| func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { | ||
| c.Storage.RegisterFlagsWithPrefix("common.storage.", f) | ||
| c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) | ||
| f.Float64Var(&c.InstrumentRefLeaksPct, "common.instrument-reference-leaks-pct", 0, `Percentage of buffers from pools to instrument for reference leaks. 0 to disable.`) | ||
|
||
| } | ||
|
|
||
| // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. | ||
|
|
@@ -814,6 +816,7 @@ type configWithCustomCommonUnmarshaler struct { | |
| type commonConfigUnmarshaler struct { | ||
| Storage *specificLocationsUnmarshaler `yaml:"storage"` | ||
| ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` | ||
| InstrumentRefLeaksPct *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks_pct"` | ||
| } | ||
|
|
||
| // specificLocationsUnmarshaler will unmarshal yaml into specific locations. | ||
|
|
@@ -909,6 +912,8 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { | |
|
|
||
| setUpGoRuntimeMetrics(cfg, reg) | ||
|
|
||
| mimirpb.CustomCodecConfig{InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaksPct}.RegisterGlobally() | ||
|
|
||
| if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { | ||
| util_log.WarnExperimentalUse("ruler.tenant-federation") | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,19 +6,47 @@ import ( | |
| "bytes" | ||
| "fmt" | ||
| "math" | ||
| "runtime" | ||
| "testing" | ||
| "unsafe" | ||
| "weak" | ||
|
|
||
| gogoproto "github.com/gogo/protobuf/proto" | ||
| "github.com/prometheus/prometheus/model/histogram" | ||
| "go.uber.org/atomic" | ||
| "google.golang.org/grpc/encoding" | ||
| "google.golang.org/grpc/encoding/proto" | ||
| "google.golang.org/grpc/mem" | ||
| protobufproto "google.golang.org/protobuf/proto" | ||
| "google.golang.org/protobuf/protoadapt" | ||
|
|
||
| "github.com/grafana/mimir/pkg/mimirpb/internal" | ||
| ) | ||
|
|
||
// CustomCodecConfig configures the custom gRPC codec built by Codec.
type CustomCodecConfig struct {
	// InstrumentRefLeaksPct is the percentage of unmarshalling buffers to
	// instrument for leaked references. Values that are not greater than zero
	// leave the instrumentation off.
	InstrumentRefLeaksPct float64
}
|
|
||
| func (cfg CustomCodecConfig) Codec() encoding.CodecV2 { | ||
| c := &codecV2{codec: encoding.GetCodecV2(proto.Name)} | ||
| if cfg.InstrumentRefLeaksPct > 0 { | ||
| c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) | ||
tcard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| return c | ||
| } | ||
tcard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| var GlobalCodec encoding.CodecV2 | ||
|
|
||
| func (cfg CustomCodecConfig) RegisterGlobally() { | ||
| GlobalCodec = cfg.Codec() | ||
| encoding.RegisterCodecV2(GlobalCodec) | ||
| } | ||
alexweav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| func init() { | ||
| c := encoding.GetCodecV2(proto.Name) | ||
| encoding.RegisterCodecV2(&codecV2{codec: c}) | ||
| // Instrument all buffers when testing. | ||
| if testing.Testing() { | ||
| CustomCodecConfig{InstrumentRefLeaksPct: 100}.RegisterGlobally() | ||
| } | ||
alexweav marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // codecV2 customizes gRPC marshalling and unmarshalling. | ||
|
|
@@ -27,6 +55,9 @@ func init() { | |
| // and to retain the unmarshalling buffer when necessary. | ||
| type codecV2 struct { | ||
| codec encoding.CodecV2 | ||
|
|
||
| instrumentRefLeaksOneIn uint64 | ||
| unmarshaledWithBufferRefCount atomic.Uint64 | ||
| } | ||
|
|
||
| var _ encoding.CodecV2 = &codecV2{} | ||
|
|
@@ -113,14 +144,38 @@ func unmarshalSlicePoolSizes() []int { | |
| return sizes | ||
| } | ||
|
|
||
| // Unmarshal unmarshals an object using the global codec. Prefer this over calling | ||
| // the Unmarshal method directly, as it will take advantage of pools and leak | ||
| // detection. | ||
|
||
| func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { | ||
| if GlobalCodec == nil { | ||
| return v.Unmarshal(data) | ||
| } | ||
| return GlobalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) | ||
| } | ||
|
|
||
| // Unmarshal customizes gRPC unmarshalling. | ||
| // If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented. | ||
| // The latter means that v's FreeBuffer method should be called when v is no longer used. | ||
| func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { | ||
| buf := data.MaterializeToBuffer(unmarshalSlicePool) | ||
| holder, isBufferHolder := v.(MessageWithBufferRef) | ||
| instrumentLeaks := isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 | ||
|
|
||
| var buf mem.Buffer | ||
| if instrumentLeaks { | ||
| // Always allocate; we'll detect leaks by checking if it's garbage-collected. | ||
| b := make([]byte, data.Len()) | ||
| data.CopyTo(b) | ||
| buf = mem.NewBuffer(&b, nil) | ||
| } else { | ||
| buf = data.MaterializeToBuffer(unmarshalSlicePool) | ||
| } | ||
| // Decrement buf's reference count. Note though that if v implements MessageWithBufferRef, | ||
| // we increase buf's reference count first so it doesn't go to zero. | ||
| defer buf.Free() | ||
| // | ||
| // We're possibly replacing buf below, so it's important to keep this a closure | ||
| // rather than a direct call to buf.Free. | ||
| defer func() { buf.Free() }() | ||
tcard marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if unmarshaler, ok := v.(gogoproto.Unmarshaler); ok { | ||
| if err := unmarshaler.Unmarshal(buf.ReadOnlyData()); err != nil { | ||
|
|
@@ -133,7 +188,10 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { | |
| } | ||
| } | ||
|
|
||
| if holder, ok := v.(MessageWithBufferRef); ok { | ||
| if isBufferHolder { | ||
| if instrumentLeaks { | ||
| buf = &instrumentLeaksBuf{Buffer: buf, refCount: 1} | ||
| } | ||
| buf.Ref() | ||
| holder.SetBuffer(buf) | ||
| } | ||
|
|
@@ -158,6 +216,10 @@ type BufferHolder struct { | |
| buffer mem.Buffer | ||
| } | ||
|
|
||
| func (m *BufferHolder) Buffer() mem.Buffer { | ||
| return m.buffer | ||
| } | ||
|
|
||
| func (m *BufferHolder) SetBuffer(buf mem.Buffer) { | ||
| m.buffer = buf | ||
| } | ||
|
|
@@ -171,6 +233,53 @@ func (m *BufferHolder) FreeBuffer() { | |
|
|
||
| var _ MessageWithBufferRef = &BufferHolder{} | ||
|
|
||
| type instrumentLeaksBuf struct { | ||
| mem.Buffer | ||
| refCount int | ||
| } | ||
|
|
||
| func (b *instrumentLeaksBuf) Ref() { | ||
| b.Buffer.Ref() | ||
| b.refCount++ | ||
| } | ||
|
|
||
| func (b *instrumentLeaksBuf) Free() { | ||
| b.Buffer.Free() | ||
| b.refCount-- | ||
|
|
||
| if b.refCount == 0 { | ||
| buf := b.ReadOnlyData() | ||
| // Taint data, in case the buffer is accessed before the panic below, | ||
| // or after the panic if it's recovered. | ||
| for i := range buf { | ||
| const taint = "KAEL" | ||
| buf[i] = taint[i%len(taint)] | ||
| } | ||
| // Remove our ref; no (strong) refs should remain (except in the stack). | ||
| b.Buffer = nil | ||
| ptr := unsafe.SliceData(buf) | ||
| addr := uintptr(unsafe.Pointer(ptr)) | ||
| weakRef := weak.Make(ptr) | ||
|
|
||
| // Check in a separate goroutine, because this stack still has locals | ||
| // pointing to the buffer. | ||
| go func() { | ||
| runtime.GC() | ||
| runtime.GC() | ||
|
|
||
| leaked := weakRef.Value() != nil | ||
|
|
||
| select { | ||
| case internal.RefLeakChecks <- internal.RefLeakCheck{Addr: addr, Leaked: leaked}: | ||
| default: | ||
| if leaked { | ||
| panic(fmt.Sprintf("reference leak for object 0x%x", addr)) | ||
| } | ||
| } | ||
| }() | ||
| } | ||
| } | ||
tcard marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // MinTimestamp returns the minimum timestamp (milliseconds) among all series | ||
| // in the WriteRequest. Returns math.MaxInt64 if the request is empty. | ||
| func (m *WriteRequest) MinTimestamp() int64 { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.