Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions pkg/cortexpb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
// Name is the name registered for the proto codec.
const Name = "proto"

var noOpBufferPool = &mem.NopBufferPool{}

func init() {
encoding.RegisterCodecV2(&cortexCodec{})
}
Expand Down Expand Up @@ -94,10 +96,17 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

// To be in the safe side, we will never automatically release the buffer used to Unmarshal the message automatically.
// This should simulate the same behavior of grpc v1.65.0 and before.
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
// To be safe, we avoid automatically releasing the buffer used to unmarshal the message.
// Additionally, we avoid using a pooled byte slice unless the message implements ReleasableMessage.
// This mimics the behavior of gRPC versions 1.65.0 and earlier.
rm, ok := v.(ReleasableMessage)
bufferPool := mem.DefaultBufferPool()

if !ok {
bufferPool = noOpBufferPool
}

buf := data.MaterializeToBuffer(bufferPool)
err := proto.Unmarshal(buf.ReadOnlyData(), vv)

if err != nil {
Expand All @@ -106,8 +115,8 @@ func (c *cortexCodec) Unmarshal(data mem.BufferSlice, v any) error {
}

// If v implements ReleasableMessage interface, we add the buff to be freed later when the request is no longer being used
if fm, ok := v.(ReleasableMessage); ok {
fm.RegisterBuffer(buf)
if rm != nil {
rm.RegisterBuffer(buf)
}

return err
Expand Down