Skip to content

Commit 09482d0

Browse files
committed
refactor(registry,index,db): centralize key resolution with lazy registry cache
Move key field/value resolution and data-prefix derivation behind registry APIs so nested key paths are resolved consistently across db/index paths. Keep legacy inferred-key precedence and add targeted cache invalidation to avoid behavior drift while reducing repeated key lookup logic.

Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent e237fb2 commit 09482d0

File tree

9 files changed

+490
-46
lines changed

9 files changed

+490
-46
lines changed

internal/db/db.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func Open(ctx context.Context, opts ...Option) (protodb.DB, error) {
7777
}
7878
db := &db{opts: o, reg: reg, smu: mutex.NewKV(), ctxmu: mutex.NewContextKV()}
7979
db.matcher = pf.NewMatcher()
80-
db.idx = idxstore.NewIndexer(db.reg, db.reg.Files, db.unmarshal)
80+
db.idx = idxstore.NewIndexer(db.reg, db.unmarshal)
8181
if o.repl != nil {
8282
h, err := server.NewServer(db)
8383
if err != nil {
@@ -206,7 +206,7 @@ func (db *db) Watch(ctx context.Context, m proto.Message, opts ...protodb.GetOpt
206206
o := makeGetOpts(opts...)
207207
matcher := db.matcher
208208

209-
k, _, _, _ := protodb.DataPrefix(m)
209+
k, _, _, _ := db.reg.DataPrefix(m)
210210
log := logger.C(ctx).WithFields("service", "protodb", "action", "watch", "key", string(k))
211211
log.Debugf("start watching for key %s", string(k))
212212
ch := make(chan protodb.Event, 1)

internal/db/tx.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (tx *tx) get(ctx context.Context, m proto.Message, opts ...protodb.GetOptio
144144
} else if lim := o.Paging.GetLimit(); lim > 0 {
145145
out = make([]proto.Message, 0, lim)
146146
}
147-
prefix, field, value, _ := protodb.DataPrefix(m)
147+
prefix, field, value, _ := tx.db.reg.DataPrefix(m)
148148
span.SetAttributes(
149149
attribute.String("prefix", string(prefix)),
150150
attribute.String("key_field", field),
@@ -499,7 +499,7 @@ func (tx *tx) set(ctx context.Context, m proto.Message, opts ...protodb.SetOptio
499499
if o.TTL != 0 {
500500
expiresAt = uint64(time.Now().Add(o.TTL).Unix())
501501
}
502-
k, field, value, err := protodb.DataPrefix(m)
502+
k, field, value, err := tx.db.reg.DataPrefix(m)
503503
if err != nil {
504504
return nil, err
505505
}
@@ -645,7 +645,7 @@ func (tx *tx) delete(ctx context.Context, m proto.Message) error {
645645
return badger.ErrReadOnlyTxn
646646
}
647647
// TODO(adphi): should we check / read for key first ?
648-
k, field, value, err := protodb.DataPrefix(m)
648+
k, field, value, err := tx.db.reg.DataPrefix(m)
649649
if err != nil {
650650
return err
651651
}
@@ -878,7 +878,7 @@ func (tx *tx) getOrdered(ctx context.Context, m proto.Message, o protodb.GetOpts
878878
)
879879
defer span.End()
880880
}
881-
plan, err := buildOrderPlan(m.ProtoReflect().New(), o.OrderBy)
881+
plan, err := buildOrderPlanWithKeyField(m.ProtoReflect().New(), o.OrderBy, tx.db.reg.KeyFieldName)
882882
if err != nil {
883883
return nil, nil, err
884884
}
@@ -929,7 +929,7 @@ func (tx *tx) getOrderedIndexed(ctx context.Context, m proto.Message, o protodb.
929929
span.SetAttributes(attribute.Bool("fallback", true), attribute.String("fallback_reason", "key_order"))
930930
return nil, nil, false, nil
931931
}
932-
prefix, _, _, _ := protodb.DataPrefix(m)
932+
prefix, _, _, _ := tx.db.reg.DataPrefix(m)
933933
if o.Filter != nil {
934934
ok, err := tx.db.idx.IndexableFilter(m, o.Filter)
935935
if err != nil {
@@ -1082,7 +1082,7 @@ func (tx *tx) getOrderedFallbackScanSort(ctx context.Context, m proto.Message, o
10821082

10831083
ordered := make([]orderedResult, 0, len(all))
10841084
for _, item := range all {
1085-
key, _, _, err := protodb.DataPrefix(item)
1085+
key, _, _, err := tx.db.reg.DataPrefix(item)
10861086
if err != nil {
10871087
return nil, nil, err
10881088
}
@@ -1158,11 +1158,15 @@ func (tx *tx) getOrderedFallbackScanSort(ctx context.Context, m proto.Message, o
11581158
}
11591159

11601160
func buildOrderPlan(msg protoreflect.Message, orderBy *v1alpha1.OrderBy) (orderField, error) {
1161+
return buildOrderPlanWithKeyField(msg, orderBy, protodb.KeyFieldName)
1162+
}
1163+
1164+
func buildOrderPlanWithKeyField(msg protoreflect.Message, orderBy *v1alpha1.OrderBy, keyFieldName func(protoreflect.MessageDescriptor) (string, bool)) (orderField, error) {
11611165
if orderBy == nil {
11621166
return orderField{}, errors.New("order_by cannot be empty")
11631167
}
11641168
md := msg.Descriptor()
1165-
keyField, hasKey := protodb.KeyFieldName(md)
1169+
keyField, hasKey := keyFieldName(md)
11661170
fieldPath := strings.TrimSpace(orderBy.GetField())
11671171
if fieldPath == "" {
11681172
return orderField{}, errors.New("order_by field cannot be empty")

internal/index/indexer.go

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,24 @@ import (
3434
"go.opentelemetry.io/otel"
3535
"go.opentelemetry.io/otel/attribute"
3636
"google.golang.org/protobuf/proto"
37-
"google.golang.org/protobuf/reflect/protodesc"
3837
"google.golang.org/protobuf/reflect/protoreflect"
3938
"google.golang.org/protobuf/types/dynamicpb"
4039

4140
"go.linka.cloud/protodb/internal/badgerd"
4241
"go.linka.cloud/protodb/internal/protodb"
42+
"go.linka.cloud/protodb/internal/registry"
4343
protopts "go.linka.cloud/protodb/protodb"
4444
)
4545

4646
var idxTracer = otel.Tracer("protodb.indexer")
4747

48-
type FileRegistry interface {
49-
RangeFiles(func(protoreflect.FileDescriptor) bool)
50-
}
51-
5248
type Tx interface {
5349
Txn() badgerd.Tx
5450
UID(ctx context.Context, key []byte, inc bool) (uint64, bool, error)
5551
}
5652

5753
type Indexer struct {
58-
reg protodesc.Resolver
59-
freg FileRegistry
54+
reg *registry.Registry
6055
unmarshal func([]byte, proto.Message) error
6156
mu sync.RWMutex
6257
entries map[protoreflect.FullName]entryCache
@@ -73,8 +68,8 @@ type entryCache struct {
7368
entries []string
7469
}
7570

76-
func NewIndexer(reg protodesc.Resolver, freg FileRegistry, unmarshal func([]byte, proto.Message) error) *Indexer {
77-
return &Indexer{reg: reg, freg: freg, unmarshal: unmarshal}
71+
func NewIndexer(reg *registry.Registry, unmarshal func([]byte, proto.Message) error) *Indexer {
72+
return &Indexer{reg: reg, unmarshal: unmarshal}
7873
}
7974

8075
func (idx *Indexer) RebuildIfNeeded(ctx context.Context, tx Tx, files ...protoreflect.FileDescriptor) error {
@@ -88,7 +83,7 @@ func (idx *Indexer) Rebuild(ctx context.Context, tx Tx, files ...protoreflect.Fi
8883
defer span.End()
8984
msgs := collectMessagesFromFiles(files)
9085
if len(files) == 0 {
91-
msgs = collectMessageDescriptors(idx.freg)
86+
msgs = collectMessageDescriptors(idx.reg)
9287
}
9388
span.SetAttributes(
9489
attribute.Int("index.messages", len(msgs)),
@@ -175,7 +170,7 @@ func (idx *Indexer) IndexableFilter(m proto.Message, f protodb.Filter) (bool, er
175170
if f == nil || f.Expr() == nil {
176171
return false, nil
177172
}
178-
return isIndexableExpr(m.ProtoReflect().New(), f.Expr())
173+
return idx.isIndexableExpr(m.ProtoReflect().New(), f.Expr())
179174
}
180175

181176
func (idx *Indexer) EnforceUnique(ctx context.Context, tx Tx, m proto.Message, uid uint64) error {
@@ -820,7 +815,7 @@ func collectIndexEntries(md protoreflect.MessageDescriptor) []string {
820815
walk(fd.Message(), prefix+fmt.Sprintf("%d", fd.Number())+".")
821816
continue
822817
}
823-
if !proto.HasExtension(fd.Options(), protopts.E_Index) || !isIndexableField(fd) {
818+
if !proto.HasExtension(fd.Options(), protopts.E_Index) || !IsIndexableLeaf(fd) {
824819
continue
825820
}
826821
entries = append(entries, fmt.Sprintf("%s%d|%s|%d", prefix, fd.Number(), fd.Kind(), fd.Cardinality()))
@@ -831,12 +826,12 @@ func collectIndexEntries(md protoreflect.MessageDescriptor) []string {
831826
return entries
832827
}
833828

834-
func collectMessageDescriptors(freg FileRegistry) []protoreflect.MessageDescriptor {
835-
if freg == nil {
829+
func collectMessageDescriptors(reg *registry.Registry) []protoreflect.MessageDescriptor {
830+
if reg == nil {
836831
return nil
837832
}
838833
var out []protoreflect.MessageDescriptor
839-
freg.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
834+
reg.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
840835
out = append(out, collectMessages(fd.Messages())...)
841836
return true
842837
})
@@ -937,32 +932,32 @@ func collectUniqueFromMsgList(val protoreflect.Value, fds ...protoreflect.FieldD
937932
return out, nil
938933
}
939934

940-
func isIndexableExpr(msg protoreflect.Message, expr *filters.Expression) (bool, error) {
935+
func (idx *Indexer) isIndexableExpr(msg protoreflect.Message, expr *filters.Expression) (bool, error) {
941936
if expr == nil {
942937
return true, nil
943938
}
944939
if expr.Condition != nil {
945-
ok, err := isIndexableFieldPath(msg, expr.Condition.GetField())
940+
ok, err := idx.isIndexableFieldPath(msg, expr.Condition.GetField())
946941
if err != nil || !ok {
947942
return ok, err
948943
}
949944
}
950945
for _, v := range expr.AndExprs {
951-
ok, err := isIndexableExpr(msg, v)
946+
ok, err := idx.isIndexableExpr(msg, v)
952947
if err != nil || !ok {
953948
return ok, err
954949
}
955950
}
956951
for _, v := range expr.OrExprs {
957-
ok, err := isIndexableExpr(msg, v)
952+
ok, err := idx.isIndexableExpr(msg, v)
958953
if err != nil || !ok {
959954
return ok, err
960955
}
961956
}
962957
return true, nil
963958
}
964959

965-
func isIndexableFieldPath(msg protoreflect.Message, fieldPath string) (bool, error) {
960+
func (idx *Indexer) isIndexableFieldPath(msg protoreflect.Message, fieldPath string) (bool, error) {
966961
if fieldPath == "" {
967962
return false, nil
968963
}
@@ -974,26 +969,33 @@ func isIndexableFieldPath(msg protoreflect.Message, fieldPath string) (bool, err
974969
if proto.HasExtension(fd.Options(), protopts.E_Index) {
975970
return isIndexableFieldPathDescriptors(fds), nil
976971
}
977-
if !isKeyField(msg.Descriptor(), fds) {
972+
if !idx.isKeyField(msg.Descriptor(), fds) {
978973
return false, nil
979974
}
980975
return isIndexableFieldPathDescriptors(fds), nil
981976
}
982977

983-
func isKeyField(md protoreflect.MessageDescriptor, fds []protoreflect.FieldDescriptor) bool {
978+
func (idx *Indexer) isKeyField(md protoreflect.MessageDescriptor, fds []protoreflect.FieldDescriptor) bool {
984979
if md == nil {
985980
return false
986981
}
987982
if len(fds) == 0 {
988983
return false
989984
}
990-
field, ok := protodb.KeyFieldName(md)
985+
field, ok := keyName(md, idx.reg)
991986
if !ok {
992987
return false
993988
}
994989
return field == fieldPathFromNames(fds)
995990
}
996991

992+
func keyName(md protoreflect.MessageDescriptor, reg *registry.Registry) (string, bool) {
993+
if reg == nil {
994+
return protodb.KeyFieldName(md)
995+
}
996+
return reg.KeyFieldName(md)
997+
}
998+
997999
func isIndexableFieldPathDescriptors(fds []protoreflect.FieldDescriptor) bool {
9981000
if len(fds) == 0 {
9991001
return false
@@ -1016,10 +1018,6 @@ func isIndexableFieldPathDescriptors(fds []protoreflect.FieldDescriptor) bool {
10161018
return IsIndexableLeaf(last)
10171019
}
10181020

1019-
func isIndexableField(fd protoreflect.FieldDescriptor) bool {
1020-
return IsIndexableLeaf(fd)
1021-
}
1022-
10231021
// IsIndexableLeaf reports whether a field descriptor supports index ordering/lookup.
10241022
func IsIndexableLeaf(fd protoreflect.FieldDescriptor) bool {
10251023
switch fd.Kind() {

internal/index/indexer_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,17 @@ import (
1212
"go.linka.cloud/protofilters/index/bitmap"
1313
"google.golang.org/protobuf/proto"
1414
"google.golang.org/protobuf/reflect/protoreflect"
15-
"google.golang.org/protobuf/reflect/protoregistry"
1615
"google.golang.org/protobuf/types/descriptorpb"
1716
"google.golang.org/protobuf/types/dynamicpb"
1817

1918
"go.linka.cloud/protodb/internal/badgerd"
2019
"go.linka.cloud/protodb/internal/protodb"
20+
regpkg "go.linka.cloud/protodb/internal/registry"
2121
protopts "go.linka.cloud/protodb/protodb"
2222
)
2323

2424
func TestCollectEntriesCacheAndRefresh(t *testing.T) {
25-
idx := NewIndexer(nil, nil, nil)
25+
idx := NewIndexer(nil, nil)
2626
md1 := buildIndexerDocDescriptorV1(t)
2727
md2 := buildIndexerDocDescriptorV2(t)
2828

@@ -38,7 +38,7 @@ func TestCollectEntriesCacheAndRefresh(t *testing.T) {
3838
}
3939

4040
func TestIndexableFilter(t *testing.T) {
41-
idx := NewIndexer(nil, nil, nil)
41+
idx := NewIndexer(nil, nil)
4242
md := buildIndexerDocDescriptorV1(t)
4343
m := dynamicpb.NewMessage(md)
4444

@@ -66,7 +66,7 @@ func TestEnforceUnique(t *testing.T) {
6666
require.NoError(t, err)
6767
defer db.Close()
6868

69-
idx := NewIndexer(nil, nil, nil)
69+
idx := NewIndexer(nil, nil)
7070
md := buildUniqueDocDescriptor(t)
7171
emailFD := md.Fields().ByName("email")
7272
require.NotNil(t, emailFD)
@@ -105,7 +105,7 @@ func TestEnforceUniqueDeduplicatesCollectedValues(t *testing.T) {
105105
require.NoError(t, err)
106106
defer db.Close()
107107

108-
idx := NewIndexer(nil, nil, nil)
108+
idx := NewIndexer(nil, nil)
109109
md := buildUniqueTagsDescriptor(t)
110110
tagsFD := md.Fields().ByName("tags")
111111
require.NotNil(t, tagsFD)
@@ -169,7 +169,7 @@ func TestOrderedUIDGroupsSeq(t *testing.T) {
169169
require.NoError(t, err)
170170
defer rtx.Close(ctx)
171171

172-
idx := NewIndexer(nil, nil, nil)
172+
idx := NewIndexer(nil, nil)
173173
fds := []protoreflect.FieldDescriptor{statusFD}
174174

175175
ascSeq, err := idx.OrderedUIDGroupsSeq(ctx, indexTx{tx: rtx}, md.FullName(), fds, false)
@@ -218,7 +218,7 @@ func TestOrderedUIDGroupsSeqContextCanceled(t *testing.T) {
218218
cctx, cancel := context.WithCancel(context.Background())
219219
cancel()
220220

221-
idx := NewIndexer(nil, nil, nil)
221+
idx := NewIndexer(nil, nil)
222222
seq, err := idx.OrderedUIDGroupsSeq(cctx, indexTx{tx: rtx}, md.FullName(), []protoreflect.FieldDescriptor{statusFD}, false)
223223
require.NoError(t, err)
224224
for _, err := range seq {
@@ -405,7 +405,9 @@ func buildUniqueTagsDescriptor(t *testing.T) protoreflect.MessageDescriptor {
405405
}
406406

407407
func TestCollectEntriesDeterministic(t *testing.T) {
408-
idx := NewIndexer(&protoregistry.Files{}, nil, nil)
408+
reg, err := regpkg.New()
409+
require.NoError(t, err)
410+
idx := NewIndexer(reg, nil)
409411
md := buildIndexerDocDescriptorV1(t)
410412
first := idx.CollectEntries(md)
411413
second := idx.CollectEntries(md)

internal/index/store.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,18 @@ func lookupByPath(md protoreflect.MessageDescriptor, fieldPath string, lookup fu
605605
return fds, nil
606606
}
607607

608+
type keyFieldNamer interface {
609+
KeyFieldName(protoreflect.MessageDescriptor) (string, bool)
610+
}
611+
612+
func keyFieldName(resolver protodesc.Resolver, md protoreflect.MessageDescriptor) (string, bool) {
613+
r, ok := resolver.(keyFieldNamer)
614+
if ok {
615+
return r.KeyFieldName(md)
616+
}
617+
return protodb.KeyFieldName(md)
618+
}
619+
608620
func buildFieldReader(txn badgerd.Tx, resolver protodesc.Resolver, name protoreflect.FullName, precompute bool, owner *tx) (*fieldReader, error) {
609621
d, err := resolver.FindDescriptorByName(name)
610622
if err != nil {
@@ -615,7 +627,7 @@ func buildFieldReader(txn badgerd.Tx, resolver protodesc.Resolver, name protoref
615627
return nil, fmt.Errorf("descriptor %s is not a message", name)
616628
}
617629
var key *keyField
618-
keyName, ok := protodb.KeyFieldName(md)
630+
keyName, ok := keyFieldName(resolver, md)
619631
if ok {
620632
fds, err := lookupByNamePath(md, keyName)
621633
if err == nil {

0 commit comments

Comments
 (0)