@@ -22,8 +22,10 @@ import (
22
22
"io"
23
23
"log"
24
24
"strings"
25
+ "sync"
25
26
26
27
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
28
+ "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/cache"
27
29
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/confluent"
28
30
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/confluent/types"
29
31
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
@@ -69,7 +71,9 @@ type Serializer struct {
69
71
// Deserializer represents a Protobuf deserializer
70
72
type Deserializer struct {
71
73
serde.BaseDeserializer
72
- ProtoRegistry * protoregistry.Types
74
+ ProtoRegistry * protoregistry.Types
75
+ schemaToDescCache cache.Cache
76
+ schemaToDescCacheLock sync.RWMutex
73
77
}
74
78
75
79
var _ serde.Serializer = new (Serializer )
@@ -337,8 +341,14 @@ func ignoreFile(name string) bool {
337
341
338
342
// NewDeserializer creates a Protobuf deserializer for Protobuf-generated objects
339
343
func NewDeserializer (client schemaregistry.Client , serdeType serde.Type , conf * DeserializerConfig ) (* Deserializer , error ) {
340
- s := & Deserializer {}
341
- err := s .ConfigureDeserializer (client , serdeType , & conf .DeserializerConfig )
344
+ cache , err := cache .NewLRUCache (1000 )
345
+ if err != nil {
346
+ return nil , err
347
+ }
348
+ s := & Deserializer {
349
+ schemaToDescCache : cache ,
350
+ }
351
+ err = s .ConfigureDeserializer (client , serdeType , & conf .DeserializerConfig )
342
352
if err != nil {
343
353
return nil , err
344
354
}
@@ -405,6 +415,12 @@ func (s *Deserializer) DeserializeInto(topic string, payload []byte, msg interfa
405
415
}
406
416
407
417
func (s * Deserializer ) toFileDesc (info schemaregistry.SchemaInfo ) (* desc.FileDescriptor , error ) {
418
+ s .schemaToDescCacheLock .RLock ()
419
+ value , ok := s .schemaToDescCache .Get (info .Schema )
420
+ s .schemaToDescCacheLock .RUnlock ()
421
+ if ok {
422
+ return value .(* desc.FileDescriptor ), nil
423
+ }
408
424
deps := make (map [string ]string )
409
425
err := serde .ResolveReferences (s .Client , info , deps )
410
426
if err != nil {
@@ -433,7 +449,11 @@ func (s *Deserializer) toFileDesc(info schemaregistry.SchemaInfo) (*desc.FileDes
433
449
if len (fileDescriptors ) != 1 {
434
450
return nil , fmt .Errorf ("could not resolve schema" )
435
451
}
436
- return fileDescriptors [0 ], nil
452
+ fd := fileDescriptors [0 ]
453
+ s .schemaToDescCacheLock .Lock ()
454
+ s .schemaToDescCache .Put (info .Schema , fd )
455
+ s .schemaToDescCacheLock .Unlock ()
456
+ return fd , nil
437
457
}
438
458
439
459
func readMessageIndexes (payload []byte ) (int , []int , error ) {
0 commit comments