diff --git a/CHANGELOG.md b/CHANGELOG.md index 97983a27ee0..91beabbe333 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ * [ENHANCEMENT] OTLP: Add metric `cortex_distributor_otlp_requests_by_content_type_total` to track content type (json or proto) of OTLP packets. #13525 * [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525 * [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586 +* [ENHANCEMENT] Add experimental flag `common.instrument-reference-leaks-percentage` to leaked references to gRPC buffers. #13609 * [BUGFIX] Compactor: Fix potential concurrent map writes. #13053 * [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084 * [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 02ef88fddf3..179a02d8761 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -20769,6 +20769,49 @@ ], "fieldValue": null, "fieldDefaultValue": null + }, + { + "kind": "block", + "name": "instrument_ref_leaks", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "percentage", + "required": false, + "desc": "Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "common.instrument-reference-leaks.percentage", + "fieldType": "float", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "before_reuse_period", + "required": false, + "desc": "Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.", + "fieldValue": null, + "fieldDefaultValue": 120000000000, + "fieldFlag": "common.instrument-reference-leaks.before-reuse-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_inflight_instrumented_bytes", + "required": false, + "desc": "Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "common.instrument-reference-leaks.max-inflight-instrumented-bytes", + "fieldType": "int", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 0ec43dfcb4a..0503c04b38d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -857,6 +857,12 @@ Usage of ./cmd/mimir/mimir: TSDB WAL segments files max size (bytes). (default 134217728) -common.client-cluster-validation.label string [experimental] Primary cluster validation label. + -common.instrument-reference-leaks.before-reuse-period duration + [experimental] Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection. (default 2m0s) + -common.instrument-reference-leaks.max-inflight-instrumented-bytes uint + [experimental] Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit. + -common.instrument-reference-leaks.percentage float + [experimental] Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable. -common.storage.azure.account-key string Azure storage account key. If unset, Azure managed identities will be used for authentication instead. -common.storage.azure.account-name string diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 4931767ac34..deee11a8d63 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -251,6 +251,13 @@ The following features are currently experimental: - Assuming that a gRPC client configuration can be reached via `-`, cluster validation label is configured via: `-.cluster-validation.label`. - The cluster validation label of all gRPC clients can be configured via `-common.client-cluster-validation.label`. - Requests with invalid cluster validation labels are tracked via the `cortex_client_invalid_cluster_validation_label_requests_total` metric. +- Common + - Instrument a fraction of pooled objects for references that outlive their lifetime. + - Only implemented for objects embedding `mimirpb.BufferHolder`. + - Flags: + - `-common.instrument-reference-leaks.percentage` + - `-common.instrument-reference-leaks.before-reuse-period` + - `-common.instrument-reference-leaks.max-inflight-instrumented-bytes` - Preferred available zone for querying ingesters and store-gateways - `-querier.prefer-availability-zone` - Memberlist zone-aware routing diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 8bcc2688e99..a964a8323cd 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -518,6 +518,25 @@ client_cluster_validation: # (experimental) Primary cluster validation label. # CLI flag: -common.client-cluster-validation.label [label: | default = ""] + +instrument_ref_leaks: + # (experimental) Percentage [0-100] of request or message buffers to + # instrument for reference leaks. Set to 0 to disable. + # CLI flag: -common.instrument-reference-leaks.percentage + [percentage: | default = 0] + + # (experimental) Period after a buffer instrumented for referenced leaks is + # nominally freed until the buffer is uninstrumented and effectively freed to + # be reused. After this period, any lingering references to the buffer may + # potentially be dereferenced again with no detection. + # CLI flag: -common.instrument-reference-leaks.before-reuse-period + [before_reuse_period: | default = 2m] + + # (experimental) Maximum sum of length of buffers instrumented at any given + # time, in bytes. When surpassed, incoming buffers will not be instrumented, + # regardless of the configured percentage. Zero means no limit. + # CLI flag: -common.instrument-reference-leaks.max-inflight-instrumented-bytes + [max_inflight_instrumented_bytes: | default = 0] ``` ### server diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 3b020a6732a..070b2580f71 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -1475,6 +1475,9 @@ "common.storage.swift.request-timeout": 5000000000, "common.storage.filesystem.dir": "", "common.client-cluster-validation.label": "", + "common.instrument-reference-leaks.percentage": 0, + "common.instrument-reference-leaks.before-reuse-period": 120000000000, + "common.instrument-reference-leaks.max-inflight-instrumented-bytes": 0, "timeseries-unmarshal-caching-optimization-enabled": true, "cost-attribution.eviction-interval": 1200000000000, "cost-attribution.registry-path": "", diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index 34d0a209a32..dc23f6d806b 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -80,6 +80,7 @@ func NewTSDBBuilder(logger log.Logger, dataDir string, partitionID int32, blocks // PushToStorageAndReleaseRequest implements ingest.Pusher. // It puts the samples in the TSDB. Some parts taken from (*Ingester).pushSamplesToAppender. func (b *TSDBBuilder) PushToStorageAndReleaseRequest(ctx context.Context, req *mimirpb.WriteRequest) error { + defer req.FreeBuffer() defer mimirpb.ReuseSlice(req.Timeseries) tenantID, err := dskittenant.TenantID(ctx) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f1200e3cf19..567e1aa8c29 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -72,6 +72,7 @@ import ( "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/globalerror" util_log "github.com/grafana/mimir/pkg/util/log" + "github.com/grafana/mimir/pkg/util/rw2util" util_test "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/validation" ) @@ -9136,6 +9137,44 @@ func TestIngesterMetadataMetrics(t *testing.T) { } +func TestIngesterNoRW2MetadataRefLeaks(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.MetadataRetainPeriod = 20 * time.Second + + ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, defaultLimitsTestConfig(), nil, "", reg) + require.NoError(t, err) + startAndWaitHealthy(t, ing, r) + + syms := rw2util.NewSymbolTableBuilder(nil) + orig := makeTestRW2WriteRequest(syms) + buf, err := orig.Marshal() + require.NoError(t, err) + + var wr mimirpb.PreallocWriteRequest + wr.UnmarshalFromRW2 = true + wr.SkipNormalizeMetadataMetricName = true + wr.SkipDeduplicateMetadata = true + err = mimirpb.Unmarshal(buf, &wr) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), userID) + _, err = ing.Push(ctx, &wr.WriteRequest) + require.NoError(t, err) + + meta, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{ + Metric: "test_metric_total", + Limit: 1, LimitPerMetric: 1, + }) + require.NoError(t, err) + require.Equal(t, []*mimirpb.MetricMetadata{{ + MetricFamilyName: "test_metric_total", + Type: mimirpb.COUNTER, + Help: "test_metric_help", + Unit: "test_metric_unit", + }}, meta.Metadata) +} + func TestIngesterSendsOnlySeriesWithData(t *testing.T) { ing, r, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), defaultLimitsTestConfig(), nil, "", nil) require.NoError(t, err) @@ -12255,3 +12294,34 @@ func TestIngester_NotifyPreCommit(t *testing.T) { // As there are three users, fsync should have been called at least three times assert.GreaterOrEqual(t, fsyncCountAfter-fsyncCountBefore, uint64(3)) } + +func makeTestRW2WriteRequest(syms *rw2util.SymbolTableBuilder) *mimirpb.WriteRequest { + req := &mimirpb.WriteRequest{ + TimeseriesRW2: []mimirpb.TimeSeriesRW2{ + { + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("job"), syms.GetSymbol("test_job")}, + Samples: []mimirpb.Sample{ + { + Value: 123.456, + TimestampMs: 1234567890, + }, + }, + Exemplars: []mimirpb.ExemplarRW2{ + { + Value: 123.456, + Timestamp: 1234567890, + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_metric_total"), syms.GetSymbol("traceID"), syms.GetSymbol("1234567890abcdef")}, + }, + }, + Metadata: mimirpb.MetadataRW2{ + Type: mimirpb.METRIC_TYPE_COUNTER, + HelpRef: syms.GetSymbol("test_metric_help"), + UnitRef: syms.GetSymbol("test_metric_unit"), + }, + }, + }, + } + req.SymbolsRW2 = syms.GetSymbols() + + return req +} diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 3f0b2570ce7..b857251694d 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -713,11 +713,16 @@ func UnmarshalCommonYAML(value *yaml.Node, inheriters ...CommonConfigInheriter) for name, loc := range inheritance.ClientClusterValidation { specificClusterValidationLocations[name] = loc } + specificInstrumentRefLeaksLocations := specificLocationsUnmarshaler{} + for name, loc := range inheritance.InstrumentRefLeaksConfig { + specificInstrumentRefLeaksLocations[name] = loc + } common := configWithCustomCommonUnmarshaler{ Common: &commonConfigUnmarshaler{ Storage: &specificStorageLocations, ClientClusterValidation: &specificClusterValidationLocations, + InstrumentRefLeaks: &specificInstrumentRefLeaksLocations, }, } @@ -787,17 +792,32 @@ func inheritFlags(log log.Logger, orig flagext.RegisteredFlagsTracker, dest flag type CommonConfig struct { Storage bucket.StorageBackendConfig `yaml:"storage"` ClientClusterValidation clusterutil.ClusterValidationConfig `yaml:"client_cluster_validation" category:"experimental"` + InstrumentRefLeaks InstrumentRefLeaksConfig `yaml:"instrument_ref_leaks" category:"experimental"` +} + +type InstrumentRefLeaksConfig struct { + Percentage float64 `yaml:"percentage" category:"experimental"` + BeforeReusePeriod time.Duration `yaml:"before_reuse_period" category:"experimental"` + MaxInflightInstrumentedBytes uint64 `yaml:"max_inflight_instrumented_bytes" category:"experimental"` +} + +func (c *InstrumentRefLeaksConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.Float64Var(&c.Percentage, prefix+"percentage", 0, `Percentage [0-100] of request or message buffers to instrument for reference leaks. Set to 0 to disable.`) + f.DurationVar(&c.BeforeReusePeriod, prefix+"before-reuse-period", 2*time.Minute, `Period after a buffer instrumented for referenced leaks is nominally freed until the buffer is uninstrumented and effectively freed to be reused. After this period, any lingering references to the buffer may potentially be dereferenced again with no detection.`) + f.Uint64Var(&c.MaxInflightInstrumentedBytes, prefix+"max-inflight-instrumented-bytes", 0, `Maximum sum of length of buffers instrumented at any given time, in bytes. When surpassed, incoming buffers will not be instrumented, regardless of the configured percentage. Zero means no limit.`) } type CommonConfigInheritance struct { - Storage map[string]*bucket.StorageBackendConfig - ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig + Storage map[string]*bucket.StorageBackendConfig + ClientClusterValidation map[string]*clusterutil.ClusterValidationConfig + InstrumentRefLeaksConfig map[string]*InstrumentRefLeaksConfig } // RegisterFlags registers flag. func (c *CommonConfig) RegisterFlags(f *flag.FlagSet) { c.Storage.RegisterFlagsWithPrefix("common.storage.", f) c.ClientClusterValidation.RegisterFlagsWithPrefix("common.client-cluster-validation.", f) + c.InstrumentRefLeaks.RegisterFlagsWithPrefix("common.instrument-reference-leaks.", f) } // configWithCustomCommonUnmarshaler unmarshals config with custom unmarshaler for the `common` field. @@ -814,6 +834,7 @@ type configWithCustomCommonUnmarshaler struct { type commonConfigUnmarshaler struct { Storage *specificLocationsUnmarshaler `yaml:"storage"` ClientClusterValidation *specificLocationsUnmarshaler `yaml:"client_cluster_validation"` + InstrumentRefLeaks *specificLocationsUnmarshaler `yaml:"instrument_ref_leaks"` } // specificLocationsUnmarshaler will unmarshal yaml into specific locations. @@ -909,6 +930,12 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) { setUpGoRuntimeMetrics(cfg, reg) + mimirpb.CustomCodecConfig{ + InstrumentRefLeaksPct: cfg.Common.InstrumentRefLeaks.Percentage, + WaitBeforeReuseInstrumentedBuffer: cfg.Common.InstrumentRefLeaks.BeforeReusePeriod, + MaxInflightInstrumentedBytes: cfg.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes, + }.RegisterGlobally() + if cfg.TenantFederation.Enabled && cfg.Ruler.TenantFederation.Enabled { util_log.WarnExperimentalUse("ruler.tenant-federation") } diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index 61f66004f22..e6fdcea5939 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -25,6 +25,9 @@ func TestCommonConfigCanBeExtended(t *testing.T) { args := []string{ "-common.storage.backend", "s3", "-common.client-cluster-validation.label", "client-cluster", + "-common.instrument-reference-leaks.percentage", "13.37", + "-common.instrument-reference-leaks.before-reuse-period", "20h", + "-common.instrument-reference-leaks.max-inflight-instrumented-bytes", "1048576", } require.NoError(t, fs.Parse(args)) @@ -36,6 +39,11 @@ func TestCommonConfigCanBeExtended(t *testing.T) { // Mimir's inheritance should still work. checkAllClusterValidationLabels(t, cfg, "client-cluster") + + // Non-inherited flags still work. + require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaks.Percentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaks.BeforeReusePeriod) + require.Equal(t, uint64(1048576), cfg.MimirConfig.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -45,6 +53,10 @@ common: backend: s3 client_cluster_validation: label: client-cluster + instrument_ref_leaks: + percentage: 13.37 + before_reuse_period: 20h + max_inflight_instrumented_bytes: 2097152 ` var cfg customExtendedConfig @@ -60,6 +72,11 @@ common: // Mimir's inheritance should still work. checkAllClusterValidationLabels(t, cfg, "client-cluster") + + // Non-inherited flags should still work. + require.Equal(t, 13.37, cfg.MimirConfig.Common.InstrumentRefLeaks.Percentage) + require.Equal(t, 20*time.Hour, cfg.MimirConfig.Common.InstrumentRefLeaks.BeforeReusePeriod) + require.Equal(t, uint64(2097152), cfg.MimirConfig.Common.InstrumentRefLeaks.MaxInflightInstrumentedBytes) }) } diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index e042da838bf..9d2e72be141 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -6,9 +6,15 @@ import ( "bytes" "fmt" "math" + "sync" + "syscall" + "testing" + "time" + "unsafe" gogoproto "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/histogram" + "go.uber.org/atomic" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/proto" "google.golang.org/grpc/mem" @@ -16,9 +22,46 @@ import ( "google.golang.org/protobuf/protoadapt" ) +type CustomCodecConfig struct { + InstrumentRefLeaksPct float64 + WaitBeforeReuseInstrumentedBuffer time.Duration + MaxInflightInstrumentedBytes uint64 +} + +var baseCodecV2Name = encoding.GetCodecV2(proto.Name).Name() + +func (cfg CustomCodecConfig) codec() *codecV2 { + c := &codecV2{} + if cfg.InstrumentRefLeaksPct > 0 { + c.instrumentRefLeaksOneIn = uint64(math.Trunc(100 / cfg.InstrumentRefLeaksPct)) + c.waitBeforeReuse = cfg.WaitBeforeReuseInstrumentedBuffer + c.maxInflightInstrumentedBytes = cfg.MaxInflightInstrumentedBytes + if c.maxInflightInstrumentedBytes == 0 { + c.maxInflightInstrumentedBytes = math.MaxUint64 + } + } + return c +} + +var globalCodec encoding.CodecV2 + +func (cfg CustomCodecConfig) RegisterGlobally() { + if cfg.InstrumentRefLeaksPct > 0 && cfg.WaitBeforeReuseInstrumentedBuffer > 0 { + startFreeingInstrumentedBuffers() + } + globalCodec = cfg.codec() + encoding.RegisterCodecV2(globalCodec) +} + func init() { - c := encoding.GetCodecV2(proto.Name) - encoding.RegisterCodecV2(&codecV2{codec: c}) + config := CustomCodecConfig{} + if testing.Testing() { + // Instrument all buffers when testing. + config.InstrumentRefLeaksPct = 100 + config.WaitBeforeReuseInstrumentedBuffer = 0 + config.MaxInflightInstrumentedBytes = 0 + } + config.RegisterGlobally() } // codecV2 customizes gRPC marshalling and unmarshalling. @@ -26,7 +69,12 @@ func init() { // We customize unmarshalling in order to use an optimized path when possible, // and to retain the unmarshalling buffer when necessary. type codecV2 struct { - codec encoding.CodecV2 + instrumentRefLeaksOneIn uint64 + waitBeforeReuse time.Duration + maxInflightInstrumentedBytes uint64 + + unmarshaledWithBufferRefCount atomic.Uint64 + inflightInstrumentedBytes atomic.Uint64 } var _ encoding.CodecV2 = &codecV2{} @@ -113,11 +161,60 @@ func unmarshalSlicePoolSizes() []int { return sizes } +// Unmarshal unmarshals an object using the global codec. Prefer this over +// calling the Unmarshal method directly, as it will take advantage of leak +// detection. +func Unmarshal(data []byte, v gogoproto.Unmarshaler) error { + return globalCodec.Unmarshal(mem.BufferSlice{mem.SliceBuffer(data)}, v) +} + +var pageSize = syscall.Getpagesize() + // Unmarshal customizes gRPC unmarshalling. // If v implements MessageWithBufferRef, its SetBuffer method is called with the unmarshalling buffer and the buffer's reference count gets incremented. // The latter means that v's FreeBuffer method should be called when v is no longer used. func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { - buf := data.MaterializeToBuffer(unmarshalSlicePool) + holder, isBufferHolder := v.(MessageWithBufferRef) + instrumentLeaks := data.Len() > 0 && isBufferHolder && c.instrumentRefLeaksOneIn > 0 && c.unmarshaledWithBufferRefCount.Add(1)%c.instrumentRefLeaksOneIn == 0 + + var pageAlignedLen int + if instrumentLeaks { + pageAlignedLen = roundUpToMultiple(data.Len(), pageSize) + inflight := c.inflightInstrumentedBytes.Add(uint64(pageAlignedLen)) + if inflight > c.maxInflightInstrumentedBytes { + c.inflightInstrumentedBytes.Sub(uint64(pageAlignedLen)) + instrumentLeaks = false + } + } + + var buf mem.Buffer + if instrumentLeaks { + // Allocate separate pages for this buffer. We'll detect ref leaks by + // munmaping the pages on Free, after which trying to access them will + // segfault. + b, err := syscall.Mmap(-1, 0, pageAlignedLen, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) + if err != nil { + panic(fmt.Errorf("mmap: %w", err)) + } + b = b[:data.Len()] + data.CopyTo(b) + buf = mem.NewBuffer(&b, nil) + instrumentedBuf := &instrumentLeaksBuf{ + Buffer: buf, + waitBeforeReuse: c.waitBeforeReuse, + inflightInstrumentedBytes: &c.inflightInstrumentedBytes, + } + instrumentedBuf.refCount.Inc() + buf = instrumentedBuf + } else if len(data) == 1 { + // BufferSlice.MaterializeToBuffer already has this behavior when + // len(data) == 1, but we reproduce it here for explicitness and for + // ensuring forward-compatibility. + data[0].Ref() + buf = data[0] + } else { + buf = data.MaterializeToBuffer(unmarshalSlicePool) + } // Decrement buf's reference count. Note though that if v implements MessageWithBufferRef, // we increase buf's reference count first so it doesn't go to zero. defer buf.Free() @@ -133,7 +230,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { } } - if holder, ok := v.(MessageWithBufferRef); ok { + if isBufferHolder { buf.Ref() holder.SetBuffer(buf) } @@ -142,7 +239,7 @@ func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error { } func (c *codecV2) Name() string { - return c.codec.Name() + return baseCodecV2Name } // MessageWithBufferRef is an unmarshalling buffer retaining protobuf message. @@ -158,6 +255,10 @@ type BufferHolder struct { buffer mem.Buffer } +func (m *BufferHolder) Buffer() mem.Buffer { + return m.buffer +} + func (m *BufferHolder) SetBuffer(buf mem.Buffer) { m.buffer = buf } @@ -171,6 +272,67 @@ func (m *BufferHolder) FreeBuffer() { var _ MessageWithBufferRef = &BufferHolder{} +type instrumentLeaksBuf struct { + mem.Buffer + refCount atomic.Int64 + waitBeforeReuse time.Duration + inflightInstrumentedBytes *atomic.Uint64 +} + +func (b *instrumentLeaksBuf) Ref() { + b.Buffer.Ref() + b.refCount.Inc() +} + +func (b *instrumentLeaksBuf) Free() { + b.Buffer.Free() + + if b.refCount.Dec() == 0 { + buf := b.ReadOnlyData() + ptr := unsafe.SliceData(buf) + allPages := unsafe.Slice(ptr, roundUpToMultiple(len(buf), pageSize)) + if b.waitBeforeReuse > 0 { + err := syscall.Mprotect(allPages, syscall.PROT_NONE) + if err != nil { + panic(fmt.Errorf("mprotect: %w", err)) + } + select { + case unmapQueue <- unmapTask{buf: allPages, at: time.Now().Add(b.waitBeforeReuse), inflightInstrumentedBytes: b.inflightInstrumentedBytes}: + return + default: + // Queue is full, munmap right away. + } + } + unmap(allPages, b.inflightInstrumentedBytes) + } +} + +type unmapTask struct { + buf []byte + at time.Time + inflightInstrumentedBytes *atomic.Uint64 +} + +var unmapQueue chan unmapTask + +var startFreeingInstrumentedBuffers = sync.OnceFunc(func() { + unmapQueue = make(chan unmapTask, 1000) + go func() { + for t := range unmapQueue { + time.Sleep(time.Until(t.at)) + unmap(t.buf, t.inflightInstrumentedBytes) + } + }() +}) + +func unmap(buf []byte, inflightInstrumentedBytes *atomic.Uint64) { + inflightInstrumentedBytes.Sub(uint64(len(buf))) + err := syscall.Munmap(buf) + if err != nil { + panic(fmt.Errorf("munmap: %w", err)) + } +} + // MinTimestamp returns the minimum timestamp (milliseconds) among all series // in the WriteRequest. Returns math.MaxInt64 if the request is empty. func (m *WriteRequest) MinTimestamp() int64 { @@ -534,3 +696,7 @@ type orderAwareMetricMetadata struct { // order is the 0-based index of this metadata object in a wider metadata array. order int } + +func roundUpToMultiple(n, of int) int { + return ((n + of - 1) / of) * of +} diff --git a/pkg/mimirpb/custom_test.go b/pkg/mimirpb/custom_test.go index b20625d34ed..723b47bc4dc 100644 --- a/pkg/mimirpb/custom_test.go +++ b/pkg/mimirpb/custom_test.go @@ -6,14 +6,13 @@ package mimirpb import ( + "fmt" + "runtime/debug" "testing" "github.com/prometheus/prometheus/model/histogram" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/encoding" - "google.golang.org/grpc/encoding/proto" - "google.golang.org/grpc/mem" "github.com/grafana/mimir/pkg/util/test" ) @@ -195,7 +194,7 @@ func TestIsFloatHistogram(t *testing.T) { } func TestCodecV2_Unmarshal(t *testing.T) { - c := codecV2{codec: fakeCodecV2{}} + c := codecV2{} var origReq WriteRequest data, err := c.Marshal(&origReq) @@ -206,18 +205,10 @@ func TestCodecV2_Unmarshal(t *testing.T) { require.True(t, origReq.Equal(req)) - require.NotNil(t, req.buffer) + require.NotNil(t, req.Buffer()) req.FreeBuffer() } -type fakeCodecV2 struct { - encoding.CodecV2 -} - -func (c fakeCodecV2) Marshal(v any) (mem.BufferSlice, error) { - return encoding.GetCodecV2(proto.Name).Marshal(v) -} - func TestHistogram_BucketsCount(t *testing.T) { tests := []struct { name string @@ -247,3 +238,34 @@ func TestHistogram_BucketsCount(t *testing.T) { }) } } + +func TestInstrumentRefLeaks(t *testing.T) { + src := WriteRequest{Timeseries: []PreallocTimeseries{{TimeSeries: &TimeSeries{ + Labels: []UnsafeMutableLabel{{Name: "labelName", Value: "labelValue"}}, + Samples: []Sample{{TimestampMs: 1234, Value: 1337}}, + }}}} + buf, err := src.Marshal() + require.NoError(t, err) + + var leakingLabelName UnsafeMutableString + + var req WriteRequest + err = Unmarshal(buf, &req) + require.NoError(t, err) + + // Label names are UnsafeMutableStrings pointing to buf. They shouldn't outlive + // the call to req.FreeBuffer. + leakingLabelName = req.Timeseries[0].Labels[0].Name + + req.FreeBuffer() // leakingLabelName becomes a leak here + + debug.SetPanicOnFault(true) + var recovered any + func() { + defer func() { + recovered = recover() + }() + t.Log(leakingLabelName) // Just forcing a read on leakingLabelName here + }() + require.Equal(t, fmt.Sprint(recovered), "runtime error: invalid memory address or nil pointer dereference") +} diff --git a/pkg/storage/ingest/version.go b/pkg/storage/ingest/version.go index f15554b7a7f..03c6f39a70d 100644 --- a/pkg/storage/ingest/version.go +++ b/pkg/storage/ingest/version.go @@ -182,7 +182,7 @@ func DeserializeRecordContent(content []byte, wr *mimirpb.PreallocWriteRequest, } func deserializeRecordContentV1(content []byte, wr *mimirpb.PreallocWriteRequest) error { - return wr.Unmarshal(content) + return mimirpb.Unmarshal(content, wr) } func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest) error { @@ -191,7 +191,7 @@ func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest wr.RW2CommonSymbols = V2CommonSymbols.GetSlice() wr.SkipNormalizeMetadataMetricName = true wr.SkipDeduplicateMetadata = true - return wr.Unmarshal(content) + return mimirpb.Unmarshal(content, wr) } // splitRequestVersionTwo adapts mimirpb.SplitWriteRequestByMaxMarshalSizeRW2 to requestSplitter diff --git a/pkg/storage/ingest/version_test.go b/pkg/storage/ingest/version_test.go index 53ed7b63935..ae58627fab7 100644 --- a/pkg/storage/ingest/version_test.go +++ b/pkg/storage/ingest/version_test.go @@ -97,6 +97,8 @@ func TestDeserializeRecordContent(t *testing.T) { require.NoError(t, err) wr.ClearTimeseriesUnmarshalData() + defer wr.FreeBuffer() + wr.BufferHolder = mimirpb.BufferHolder{} // We don't want to compare this. require.Equal(t, reqv0, &wr) }) @@ -124,6 +126,7 @@ func TestDeserializeRecordContent(t *testing.T) { require.NoError(t, err) wr.ClearTimeseriesUnmarshalData() + wr.BufferHolder = mimirpb.BufferHolder{} // We don't want to compare this. require.Equal(t, reqv1, &wr) })