Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions pkg/cortexpb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
const Name = "proto"

func init() {
encoding.RegisterCodecV2(&cortexCodec{})
encoding.RegisterCodecV2(&cortexCodec{
noOpBufferPool: mem.NopBufferPool{},
defaultBufferPool: mem.DefaultBufferPool(),
})
}

type ReleasableMessage interface {
Expand All @@ -25,7 +28,10 @@ type GogoProtoMessage interface {
MarshalToSizedBuffer(dAtA []byte) (int, error)
}

type cortexCodec struct{}
type cortexCodec struct {
noOpBufferPool mem.BufferPool
defaultBufferPool mem.BufferPool
}

func (c cortexCodec) Name() string {
return Name
Expand Down Expand Up @@ -64,7 +70,7 @@ func (c *cortexCodec) Marshal(v any) (data mem.BufferSlice, err error) {

data = append(data, buf)
} else {
pool := mem.DefaultBufferPool()
pool := c.defaultBufferPool
buf := pool.Get(size)

// If v implements MarshalToSizedBuffer we should use it as it is more optimized
Expand Down Expand Up @@ -94,10 +100,17 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

// To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically.
// This should simulate the same behavior of grpc v1.65.0 and before.
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
// To be safe, we avoid automatically releasing the buffer used to unmarshal the message.
// Additionally, we avoid using a pooled byte slice unless the message implements ReleasableMessage.
// This mimics the behavior of gRPC versions 1.65.0 and earlier.
rm, ok := v.(ReleasableMessage)
bufferPool := c.defaultBufferPool

if !ok {
bufferPool = c.noOpBufferPool
}

buf := data.MaterializeToBuffer(bufferPool)
err := proto.Unmarshal(buf.ReadOnlyData(), vv)

if err != nil {
Expand All @@ -106,8 +119,8 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
}

// If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used
if fm, ok := v.(ReleasableMessage); ok {
fm.RegisterBuffer(buf)
if rm != nil {
rm.RegisterBuffer(buf)
}

return err
Expand Down
88 changes: 88 additions & 0 deletions pkg/cortexpb/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cortexpb

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/mem"
)

type wrappedBufferPool struct {
inner mem.BufferPool
getCount int
putCount int
}

func (w *wrappedBufferPool) Get(length int) *[]byte {
w.getCount++
return w.inner.Get(length)
}

func (w *wrappedBufferPool) Put(b *[]byte) {
w.putCount++
w.inner.Put(b)
}

func (w *wrappedBufferPool) Reset() {
w.getCount = 0
w.putCount = 0
}

func TestNoopBufferWhenNotReleasableMessage(t *testing.T) {
codec := &cortexCodec{
noOpBufferPool: &wrappedBufferPool{inner: mem.NopBufferPool{}},
defaultBufferPool: &wrappedBufferPool{inner: mem.DefaultBufferPool()},
}

tc := map[string]struct {
noopBufferGets int
defaultBufferGets int
m any
}{
"releasable": {
noopBufferGets: 0,
defaultBufferGets: 1,
m: &WriteRequest{
Metadata: []*MetricMetadata{
{
Unit: strings.Repeat("a", 10000),
},
},
},
},
"not releasable": {
noopBufferGets: 1,
defaultBufferGets: 0,
m: &MetricMetadata{
Unit: strings.Repeat("a", 10000),
},
},
}

for name, tc := range tc {
t.Run(name, func(t *testing.T) {
data, err := codec.Marshal(tc.m)
require.NoError(t, err)

// lets split the buffer into 2 so we force get another buffer from the pool
r := data.Reader()
size := r.Remaining()
b1 := make([]byte, size/2)
b2 := make([]byte, (size/2)+1)
buffer1 := mem.NewBuffer(&b1, mem.NopBufferPool{})
buffer2 := mem.NewBuffer(&b2, mem.NopBufferPool{})

_, err = r.Read(b1)
require.NoError(t, err)
_, err = r.Read(b2)
require.NoError(t, err)

codec.noOpBufferPool.(*wrappedBufferPool).Reset()
codec.defaultBufferPool.(*wrappedBufferPool).Reset()
require.NoError(t, codec.Unmarshal(mem.BufferSlice{buffer1, buffer2}, tc.m))
require.Equal(t, tc.noopBufferGets, codec.noOpBufferPool.(*wrappedBufferPool).getCount)
require.Equal(t, tc.defaultBufferGets, codec.defaultBufferPool.(*wrappedBufferPool).getCount)
})
}
}