Skip to content

Commit 4410665

Browse files
rockwotjmmatczuk
authored andcommitted
protobuf: remove hyperpb
Shows signs of a memory leak
1 parent cba4dc7 commit 4410665

File tree

11 files changed

+8
-218
lines changed

11 files changed

+8
-218
lines changed

go.mod

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ require (
2020
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2
2121
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1
2222
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1
23-
buf.build/go/hyperpb v0.1.3
2423
cloud.google.com/go/aiplatform v1.104.0
2524
cloud.google.com/go/bigquery v1.71.0
2625
cloud.google.com/go/pubsub v1.50.1
@@ -283,7 +282,6 @@ require (
283282
github.com/tidwall/gjson v1.18.0 // indirect
284283
github.com/tidwall/match v1.1.1 // indirect
285284
github.com/tidwall/pretty v1.2.1 // indirect
286-
github.com/timandy/routine v1.1.5 // indirect
287285
github.com/twpayne/go-geom v1.6.1 // indirect
288286
github.com/x448/float16 v0.8.4 // indirect
289287
github.com/xanzy/ssh-agent v0.3.3 // indirect

go.sum

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1 h1:bFnppdLYActzr2F0iomSrkjUnGgVufb0DtZxjKgTLGc=
2-
buf.build/gen/go/bufbuild/hyperpb-examples/protocolbuffers/go v1.36.7-20250725192734-0dd56aa9cbbc.1/go.mod h1:x7jYNX5/7EPnsKHEq596krkOGzvR97/MsZw2fw3Mrq0=
31
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 h1:31on4W/yPcV4nZHL4+UCiCvLPsMqe/vJcNg8Rci0scc=
42
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1/go.mod h1:fUl8CEN/6ZAMk6bP8ahBJPUJw7rbp+j4x+wCcYi2IG4=
53
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 h1:vK2m7N3SPeHRqfVBj4FpmjlNCBEhR05OgCgJ+xIGfAs=
@@ -8,10 +6,6 @@ buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8
86
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1/go.mod h1:dDSnTB/bSMAA9z59+0E2JWab9LyGnb+spW8nrVeEAqA=
97
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1 h1:4jqc94IBC9Ea9GaMbmgfhczXZzCkA4ZWfon3/uI3KV0=
108
buf.build/gen/go/redpandadata/otel/protocolbuffers/go v1.36.11-20251216164002-58c749b888d8.1/go.mod h1:akvBCH3f6fL10sDu4NppgjHrQITLe1m5YWLt/yiLEKI=
11-
buf.build/go/hyperpb v0.1.3 h1:wiw2F7POvAe2VA2kkB0TAsFwj91lXbFrKM41D3ZgU1w=
12-
buf.build/go/hyperpb v0.1.3/go.mod h1:IHXAM5qnS0/Fsnd7/HGDghFNvUET646WoHmq1FDZXIE=
13-
buf.build/go/protovalidate v0.14.0 h1:kr/rC/no+DtRyYX+8KXLDxNnI1rINz0imk5K44ZpZ3A=
14-
buf.build/go/protovalidate v0.14.0/go.mod h1:+F/oISho9MO7gJQNYC2VWLzcO1fTPmaTA08SDYJZncA=
159
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
1610
cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw=
1711
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
@@ -850,8 +844,6 @@ github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUS
850844
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
851845
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
852846
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
853-
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
854-
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
855847
github.com/apache/arrow-go/v18 v18.4.1 h1:q/jVkBWCJOB9reDgaIZIdruLQUb1kbkvOnOFezVH1C4=
856848
github.com/apache/arrow-go/v18 v18.4.1/go.mod h1:tLyFubsAl17bvFdUAy24bsSvA/6ww95Iqi67fTpGu3E=
857849
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
@@ -1419,8 +1411,6 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
14191411
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
14201412
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
14211413
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
1422-
github.com/google/cel-go v0.26.0 h1:DPGjXackMpJWH680oGY4lZhYjIameYmR+/6RBdDGmaI=
1423-
github.com/google/cel-go v0.26.0/go.mod h1:A9O8OU9rdvrK5MQyrqfIxo1a0u4g3sF8KB6PUIaryMM=
14241414
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
14251415
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
14261416
github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
@@ -2010,8 +2000,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
20102000
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
20112001
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
20122002
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
2013-
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9 h1:arwj11zP0yJIxIRiDn22E0H8PxfF7TsTrc2wIPFIsf4=
2014-
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9/go.mod h1:SKZx6stCn03JN3BOWTwvVIO2ajMkb/zQdTceXYhKw/4=
20152003
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91 h1:s1LvMaU6mVwoFtbxv/rCZKE7/fwDmDY684FfUe4c1Io=
20162004
github.com/protocolbuffers/txtpbfmt v0.0.0-20251016062345-16587c79cd91/go.mod h1:JSbkp0BviKovYYt9XunS95M3mLPibE9bGg+Y95DsEEY=
20172005
github.com/pusher/pusher-http-go v4.0.1+incompatible h1:4u6tomPG1WhHaST7Wi9mw83Y+MS/j2EplR2YmDh8Xp4=
@@ -2124,8 +2112,6 @@ github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3A
21242112
github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo=
21252113
github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs=
21262114
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
2127-
github.com/stoewer/go-strcase v1.3.1 h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs=
2128-
github.com/stoewer/go-strcase v1.3.1/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
21292115
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
21302116
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
21312117
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
@@ -2173,8 +2159,6 @@ github.com/tigerbeetle/tigerbeetle-go v0.16.61 h1:ciGSFxBhpXRbTorxPV7O/vXQKupVKe
21732159
github.com/tigerbeetle/tigerbeetle-go v0.16.61/go.mod h1:d6G7n4OlD7GLHd62x0VlWPXeI/L0SoNNTfm/ee24GJI=
21742160
github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw=
21752161
github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs=
2176-
github.com/timandy/routine v1.1.5 h1:LSpm7Iijwb9imIPlucl4krpr2EeCeAUvifiQ9Uf5X+M=
2177-
github.com/timandy/routine v1.1.5/go.mod h1:kXslgIosdY8LW0byTyPnenDgn4/azt2euufAq9rK51w=
21782162
github.com/timeplus-io/proton-go-driver/v2 v2.1.2 h1:XPHvI4irUBBuVGAyvAzpb170IiyWK5DEBfGpC7h8bgU=
21792163
github.com/timeplus-io/proton-go-driver/v2 v2.1.2/go.mod h1:rUs4zvXvKsmuyFpzdJnnid6p8IvRJTa/n/jNQ2B6Dfw=
21802164
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=

internal/impl/confluent/serde_protobuf.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,7 @@ func (s *schemaRegistryDecoder) getProtobufDecoder(
8585
defer mu.Unlock()
8686
if msgDesc.FullName() != cachedMessageName {
8787
cachedMessageName = msgDesc.FullName()
88-
cachedDecoder = common.NewHyperPbDecoder(msgDesc, common.ProfilingOptions{
89-
Rate: 0.01,
90-
RecompileInterval: 100_000,
91-
})
88+
cachedDecoder = common.NewDynamicPbDecoder(msgDesc)
9289
}
9390
return cachedDecoder
9491
}

internal/impl/protobuf/common/bench_test.go

Lines changed: 3 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func loadTestFileDescriptorSet(t testing.TB) (protoreflect.MessageDescriptor, *p
5656

5757
// BenchmarkProtobufToMessage benchmarks the complete pipeline of decoding protobuf
5858
// and converting to a Benthos message, testing the matrix of:
59-
// - Decoding: dynamicpb vs hyperpb (with PGO)
59+
// - Decoding: dynamicpb
6060
// - Conversion: Fast (SetStructuredMut) vs Slow (SetBytes)
6161
func BenchmarkProtobufToMessage(b *testing.B) {
6262
md, types := loadTestFileDescriptorSet(b)
@@ -109,18 +109,8 @@ func BenchmarkProtobufToMessage(b *testing.B) {
109109
},
110110
}
111111

112-
b.StopTimer()
113-
// Profile-guided optimization settings for hyperpb
114-
pgoOpts := ProfilingOptions{
115-
Rate: 0.01, // Profile every message during priming
116-
RecompileInterval: 100000, // Recompile after 1000 messages
117-
}
118-
119-
// Create decoders
120-
dynamicpbDecoder := NewDynamicPbDecoder(md, ProfilingOptions{})
121-
hyperpbDecoder := NewHyperPbDecoder(md, pgoOpts)
122-
123-
b.StartTimer()
112+
// Create decoder
113+
dynamicpbDecoder := NewDynamicPbDecoder(md)
124114

125115
marshalOpts := protojson.MarshalOptions{Resolver: types}
126116

@@ -137,18 +127,6 @@ func BenchmarkProtobufToMessage(b *testing.B) {
137127
b.Fatal(err)
138128
}
139129

140-
// Prime the hyperpb decoder with sample data to build profile
141-
// Run with enough iterations to trigger at least one recompilation
142-
for range pgoOpts.RecompileInterval * 2 {
143-
err := hyperpbDecoder.WithDecoded(pbBytes, func(proto.Message) error {
144-
return nil
145-
})
146-
if err != nil {
147-
b.Fatal(err)
148-
}
149-
}
150-
b.StartTimer()
151-
152130
// Benchmark: dynamicpb decode + fast conversion + read
153131
b.Run(tc.name+"/dynamicpb/fast", func(b *testing.B) {
154132
b.ReportAllocs()
@@ -185,40 +163,5 @@ func BenchmarkProtobufToMessage(b *testing.B) {
185163
}
186164
})
187165

188-
// Benchmark: hyperpb decode + fast conversion + read
189-
b.Run(tc.name+"/hyperpb/fast", func(b *testing.B) {
190-
b.ReportAllocs()
191-
for b.Loop() {
192-
msg := service.NewMessage(nil)
193-
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
194-
return ToMessageFast(decoded.(protoreflect.Message), marshalOpts, msg)
195-
})
196-
if err != nil {
197-
b.Fatal(err)
198-
}
199-
_, err = msg.AsStructured()
200-
if err != nil {
201-
b.Fatal(err)
202-
}
203-
}
204-
})
205-
206-
// Benchmark: hyperpb decode + slow conversion + read
207-
b.Run(tc.name+"/hyperpb/slow", func(b *testing.B) {
208-
b.ReportAllocs()
209-
for b.Loop() {
210-
msg := service.NewMessage(nil)
211-
err := hyperpbDecoder.WithDecoded(pbBytes, func(decoded proto.Message) error {
212-
return ToMessageSlow(decoded.(protoreflect.Message), marshalOpts, msg)
213-
})
214-
if err != nil {
215-
b.Fatal(err)
216-
}
217-
_, err = msg.AsStructured()
218-
if err != nil {
219-
b.Fatal(err)
220-
}
221-
}
222-
})
223166
}
224167
}

internal/impl/protobuf/common/decode_common.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import "google.golang.org/protobuf/proto"
1515

1616
// ProtobufDecoder is an interface for different methods to parse protobuf
1717
// (the binary format) in a dynamic and reflective way.
18-
//
19-
// Currently, there are two supported approaches: dynamicpb and hyperpb
2018
type ProtobufDecoder interface {
2119
// Decode the buffer into a proto message that is passed into the callback.
2220
//
@@ -25,10 +23,3 @@ type ProtobufDecoder interface {
2523
// the provided callback.
2624
WithDecoded(buf []byte, cb func(msg proto.Message) error) error
2725
}
28-
29-
// ProfilingOptions specifies the profiling rate and how often we recompile
30-
// for ProtobufDecoders that support profile-guided optimizations in flight (PGO)
31-
type ProfilingOptions struct {
32-
Rate float64
33-
RecompileInterval int64
34-
}

internal/impl/protobuf/common/decode_dynamicpb.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,8 @@ import (
2020
)
2121

2222
// NewDynamicPbDecoder returns a new ProtobufDecoder based on standard proto reflection
23-
// in the offical protobuf library.
24-
func NewDynamicPbDecoder(
25-
md protoreflect.MessageDescriptor,
26-
_ ProfilingOptions,
27-
) ProtobufDecoder {
23+
// in the official protobuf library.
24+
func NewDynamicPbDecoder(md protoreflect.MessageDescriptor) ProtobufDecoder {
2825
return &dynamicPbParser{dynamicpb.NewMessageType(md)}
2926
}
3027

internal/impl/protobuf/common/decode_hyperpb.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

internal/impl/protobuf/common/decode_hyperpb_fallback.go

Lines changed: 0 additions & 25 deletions
This file was deleted.

internal/impl/protobuf/processor_protobuf.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -329,12 +329,7 @@ func newProtobufToJSONOperator(
329329
if err != nil {
330330
return nil, fmt.Errorf("unable to find protobuf type %q: %w", msg, err)
331331
}
332-
decoder := common.NewHyperPbDecoder(
333-
msgType.Descriptor(),
334-
common.ProfilingOptions{
335-
Rate: 0.01,
336-
RecompileInterval: 100_000,
337-
})
332+
decoder := common.NewDynamicPbDecoder(msgType.Descriptor())
338333
opts.Resolver = types
339334
return func(part *service.Message) error {
340335
partBytes, err := part.AsBytes()
@@ -403,12 +398,7 @@ func newProtobufToJSONBSROperator(
403398
if err != nil {
404399
return nil, fmt.Errorf("unable to find message '%v' definition: %w", msg, err)
405400
}
406-
decoder := common.NewHyperPbDecoder(
407-
d.Descriptor(),
408-
common.ProfilingOptions{
409-
Rate: 0.01,
410-
RecompileInterval: 100_000,
411-
})
401+
decoder := common.NewDynamicPbDecoder(d.Descriptor())
412402
opts.Resolver = multiModuleWatcher
413403
return func(part *service.Message) error {
414404
partBytes, err := part.AsBytes()

public/bundle/enterprise/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ require (
88
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 // indirect
99
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.19.1-20240117202343-bf8f65e8876c.2 // indirect
1010
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.36.10-20240117202343-bf8f65e8876c.1 // indirect
11-
buf.build/go/hyperpb v0.1.3 // indirect
1211
cel.dev/expr v0.24.0 // indirect
1312
cloud.google.com/go/aiplatform v1.104.0 // indirect
1413
cloud.google.com/go/bigquery v1.71.0 // indirect

0 commit comments

Comments
 (0)