diff --git a/pkg/cortexpb/codec.go b/pkg/cortexpb/codec.go index d263c55897f..0bf037ac97a 100644 --- a/pkg/cortexpb/codec.go +++ b/pkg/cortexpb/codec.go @@ -14,7 +14,10 @@ import ( const Name = "proto" func init() { - encoding.RegisterCodecV2(&cortexCodec{}) + encoding.RegisterCodecV2(&cortexCodec{ + noOpBufferPool: mem.NopBufferPool{}, + defaultBufferPool: mem.DefaultBufferPool(), + }) } type ReleasableMessage interface { @@ -25,7 +28,10 @@ type GogoProtoMessage interface { MarshalToSizedBuffer(dAtA []byte) (int, error) } -type cortexCodec struct{} +type cortexCodec struct { + noOpBufferPool mem.BufferPool + defaultBufferPool mem.BufferPool +} func (c cortexCodec) Name() string { return Name @@ -64,7 +70,7 @@ func (c *cortexCodec) Marshal(v any) (data mem.BufferSlice, err error) { data = append(data, buf) } else { - pool := mem.DefaultBufferPool() + pool := c.defaultBufferPool buf := pool.Get(size) // If v implements MarshalToSizedBuffer we should use it as it is more optimized @@ -94,10 +100,17 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error { return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) } - // To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically. - // This should simulate the same behavior of grpc v1.65.0 and before. - buf := data.MaterializeToBuffer(mem.DefaultBufferPool()) + // To be safe, we avoid automatically releasing the buffer used to unmarshal the message. + // Additionally, we avoid using a pooled byte slice unless the message implements ReleasableMessage. + // This mimics the behavior of gRPC versions 1.65.0 and earlier. + rm, ok := v.(ReleasableMessage) + bufferPool := c.defaultBufferPool + + if !ok { + bufferPool = c.noOpBufferPool + } + buf := data.MaterializeToBuffer(bufferPool) err := proto.Unmarshal(buf.ReadOnlyData(), vv) if err != nil { @@ -106,8 +119,8 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error { } // If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used - if fm, ok := v.(ReleasableMessage); ok { - fm.RegisterBuffer(buf) + if rm != nil { + rm.RegisterBuffer(buf) } return err diff --git a/pkg/cortexpb/codec_test.go b/pkg/cortexpb/codec_test.go new file mode 100644 index 00000000000..69038687da4 --- /dev/null +++ b/pkg/cortexpb/codec_test.go @@ -0,0 +1,88 @@ +package cortexpb + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/mem" +) + +type wrappedBufferPool struct { + inner mem.BufferPool + getCount int + putCount int +} + +func (w *wrappedBufferPool) Get(length int) *[]byte { + w.getCount++ + return w.inner.Get(length) +} + +func (w *wrappedBufferPool) Put(b *[]byte) { + w.putCount++ + w.inner.Put(b) +} + +func (w *wrappedBufferPool) Reset() { + w.getCount = 0 + w.putCount = 0 +} + +func TestNoopBufferWhenNotReleasableMessage(t *testing.T) { + codec := &cortexCodec{ + noOpBufferPool: &wrappedBufferPool{inner: mem.NopBufferPool{}}, + defaultBufferPool: &wrappedBufferPool{inner: mem.DefaultBufferPool()}, + } + + tc := map[string]struct { + noopBufferGets int + defaultBufferGets int + m any + }{ + "releasable": { + noopBufferGets: 0, + defaultBufferGets: 1, + m: &WriteRequest{ + Metadata: []*MetricMetadata{ + { + Unit: strings.Repeat("a", 10000), + }, + }, + }, + }, + "not releasable": { + noopBufferGets: 1, + defaultBufferGets: 0, + m: &MetricMetadata{ + Unit: strings.Repeat("a", 10000), + }, + }, + } + + for name, tc := range tc { + t.Run(name, func(t *testing.T) { + data, err := codec.Marshal(tc.m) + require.NoError(t, err) + + // lets split the buffer into 2 so we force get another buffer from the pool + r := data.Reader() + size := r.Remaining() + b1 := make([]byte, size/2) + b2 := make([]byte, (size/2)+1) + buffer1 := mem.NewBuffer(&b1, mem.NopBufferPool{}) + buffer2 := mem.NewBuffer(&b2, mem.NopBufferPool{}) + + _, err = r.Read(b1) + require.NoError(t, err) + _, err = r.Read(b2) + require.NoError(t, err) + + codec.noOpBufferPool.(*wrappedBufferPool).Reset() + codec.defaultBufferPool.(*wrappedBufferPool).Reset() + require.NoError(t, codec.Unmarshal(mem.BufferSlice{buffer1, buffer2}, tc.m)) + require.Equal(t, tc.noopBufferGets, codec.noOpBufferPool.(*wrappedBufferPool).getCount) + require.Equal(t, tc.defaultBufferGets, codec.defaultBufferPool.(*wrappedBufferPool).getCount) + }) + } +}