Skip to content

Commit 3cc823b

Browse files
Use get batch api badger
1 parent 921446e commit 3cc823b

File tree

5 files changed

+125
-29
lines changed

5 files changed

+125
-29
lines changed

go.mod

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/IBM/sarama v1.45.0
1111
github.com/Masterminds/semver/v3 v3.3.1
1212
github.com/blevesearch/bleve/v2 v2.4.4
13-
github.com/dgraph-io/badger/v4 v4.5.1
13+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e
1414
github.com/dgraph-io/dgo/v240 v240.1.0
1515
github.com/dgraph-io/gqlgen v0.13.2
1616
github.com/dgraph-io/gqlparser/v2 v2.2.2
@@ -98,7 +98,7 @@ require (
9898
github.com/gogo/protobuf v1.3.2 // indirect
9999
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
100100
github.com/golang/protobuf v1.5.4 // indirect
101-
github.com/google/flatbuffers v25.1.24+incompatible // indirect
101+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
102102
github.com/google/pprof v0.0.0-20250128161936-077ca0a936bf // indirect
103103
github.com/hashicorp/errwrap v1.1.0 // indirect
104104
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
@@ -156,7 +156,6 @@ require (
156156
go.opentelemetry.io/otel v1.34.0 // indirect
157157
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect
158158
go.opentelemetry.io/otel/metric v1.34.0 // indirect
159-
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
160159
go.opentelemetry.io/otel/trace v1.34.0 // indirect
161160
go.uber.org/multierr v1.11.0 // indirect
162161
golang.org/x/time v0.9.0 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
127127
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
128128
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
129129
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
130-
github.com/dgraph-io/badger/v4 v4.5.1 h1:7DCIXrQjo1LKmM96YD+hLVJ2EEsyyoWxJfpdd56HLps=
131-
github.com/dgraph-io/badger/v4 v4.5.1/go.mod h1:qn3Be0j3TfV4kPbVoK0arXCD1/nr1ftth6sbL5jxdoA=
130+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e h1:sZmnvDqloFjehWjr6f/G5O8ANbhenwSYdkGxkTR2Bww=
131+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e/go.mod h1:aSwx/bXKT3/WRl9rn2BrTU+tfRQlFPKlOsqRTdcpHB8=
132132
github.com/dgraph-io/dgo/v240 v240.1.0 h1:xd8z9kEXDWOAblaLJ2HLg2tXD6ngMQwq3ehLUS7GKNg=
133133
github.com/dgraph-io/dgo/v240 v240.1.0/go.mod h1:r8WASETKfodzKqThSAhhTNIzcEMychArKKlZXQufWuA=
134134
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
@@ -267,8 +267,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z
267267
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
268268
github.com/google/codesearch v1.2.0 h1:VlyAH+AntnIbGGArOUs6sEBdPVwYvf1e8Uw3/TC77cA=
269269
github.com/google/codesearch v1.2.0/go.mod h1:9wQjQDVAP7Mvt96tw1KqVeXncdBLOWUYdxRiHlsG6Xc=
270-
github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLKOaJN/AC6puCca6Jm7o=
271-
github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
270+
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
271+
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
272272
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
273273
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
274274
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

posting/list_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,10 @@ func TestAddMutation_mrjn1(t *testing.T) {
493493
func TestReadSingleValue(t *testing.T) {
494494
defer setMaxListSize(maxListSize)
495495
maxListSize = math.MaxInt32
496+
<<<<<<< HEAD
496497
require.Equal(t, nil, pstore.DropAll())
498+
=======
499+
>>>>>>> 6fbd525d2 (Use get batch api badger)
497500

498501
// We call pl.Iterate and then stop iterating in the first loop when we are reading
499502
// single values. This test confirms that the two functions, getFirst from this file
@@ -502,6 +505,7 @@ func TestReadSingleValue(t *testing.T) {
502505
key := x.DataKey(x.GalaxyAttr("value"), 1240)
503506
ol, err := getNew(key, ps, math.MaxUint64)
504507
require.NoError(t, err)
508+
<<<<<<< HEAD
505509
N := uint64(10000)
506510
for i := uint64(2); i <= N; i += 2 {
507511
edge := &pb.DirectedEdge{
@@ -514,6 +518,19 @@ func TestReadSingleValue(t *testing.T) {
514518
kData := ol.getMutation(i + 1)
515519
writer := NewTxnWriter(pstore)
516520
if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil {
521+
=======
522+
N := int(10000)
523+
for i := 2; i <= N; i += 2 {
524+
edge := &pb.DirectedEdge{
525+
Value: []byte("ho hey there" + strconv.Itoa(i)),
526+
}
527+
txn := Txn{StartTs: uint64(i)}
528+
addMutationHelper(t, ol, edge, Set, &txn)
529+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
530+
kData := ol.getMutation(uint64(i))
531+
writer := NewTxnWriter(pstore)
532+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
533+
>>>>>>> 6fbd525d2 (Use get batch api badger)
517534
require.NoError(t, err)
518535
}
519536
writer.Flush()
@@ -523,12 +540,16 @@ func TestReadSingleValue(t *testing.T) {
523540
kvs, err := ol.Rollup(nil, txn.StartTs-3)
524541
require.NoError(t, err)
525542
require.NoError(t, writePostingListToDisk(kvs))
543+
<<<<<<< HEAD
526544
// Delete item from global cache before reading, as we are not updating the cache in the test
527545
memoryLayer.del(key)
546+
=======
547+
>>>>>>> 6fbd525d2 (Use get batch api badger)
528548
ol, err = getNew(key, ps, math.MaxUint64)
529549
require.NoError(t, err)
530550
}
531551

552+
<<<<<<< HEAD
532553
j := uint64(3)
533554
if j < ol.minTs {
534555
j = ol.minTs
@@ -538,6 +559,17 @@ func TestReadSingleValue(t *testing.T) {
538559
k, err := tx.cache.GetSinglePosting(key)
539560
require.NoError(t, err)
540561
checkValue(t, ol, string(k.Postings[0].Value), j)
562+
=======
563+
j := 2
564+
if j < int(ol.minTs) {
565+
j = int(ol.minTs)
566+
}
567+
for ; j < i+6; j++ {
568+
tx := NewTxn(uint64(j))
569+
k, err := tx.cache.GetSinglePosting(key)
570+
require.NoError(t, err)
571+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
572+
>>>>>>> 6fbd525d2 (Use get batch api badger)
541573
}
542574
}
543575
}

posting/lists.go

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -323,32 +323,32 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
323323
return pl, err
324324
}
325325

326-
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
327-
// given key. This is used for retrieving the value of a scalar predicats.
328-
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
329-
// This would return an error if there is some data in the local cache, but we couldn't read it.
330-
getListFromLocalCache := func() (*pb.PostingList, error) {
331-
lc.RLock()
332-
333-
pl := &pb.PostingList{}
334-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
335-
err := proto.Unmarshal(delta, pl)
336-
lc.RUnlock()
337-
return pl, err
338-
}
326+
func (lc *LocalCache) GetSinglePostingFromLocalCache(key []byte) (*pb.PostingList, error) {
327+
lc.RLock()
339328

340-
l := lc.plists[string(key)]
329+
pl := &pb.PostingList{}
330+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
331+
err := proto.Unmarshal(delta, pl)
341332
lc.RUnlock()
333+
return pl, err
334+
}
342335

343-
if l != nil {
344-
return l.StaticValue(lc.startTs)
345-
}
336+
l := lc.plists[string(key)]
337+
lc.RUnlock()
346338

347-
return nil, nil
339+
if l != nil {
340+
return l.StaticValue(lc.startTs)
348341
}
349342

343+
return nil, nil
344+
}
345+
346+
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
347+
// given key. This is used for retrieving the value of a scalar predicate.
348+
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
349+
// This would return an error if there is some data in the local cache, but we couldn't read it.
350350
getPostings := func() (*pb.PostingList, error) {
351-
pl, err := getListFromLocalCache()
351+
pl, err := lc.GetSinglePostingFromLocalCache(key)
352352
// If both pl and err are empty, that means that there was no data in local cache, hence we should
353353
// read the data from badger.
354354
if pl != nil || err != nil {
@@ -381,6 +381,59 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
381381
return pl, nil
382382
}
383383

384+
func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
385+
results := make([]*pb.PostingList, len(keys))
386+
remaining_keys := make([][]byte, 0)
387+
for i, key := range keys {
388+
if pl, err := lc.GetSinglePostingFromLocalCache(key); pl != nil && err != nil {
389+
results[i] = pl
390+
} else {
391+
remaining_keys = append(remaining_keys, key)
392+
}
393+
}
394+
395+
txn := pstore.NewTransactionAt(lc.startTs, false)
396+
items, err := txn.GetBatch(remaining_keys)
397+
if err != nil {
398+
fmt.Println(err, keys)
399+
return nil, err
400+
}
401+
idx := 0
402+
403+
for i := 0; i < len(results); i++ {
404+
if results[i] != nil {
405+
continue
406+
}
407+
pl := &pb.PostingList{}
408+
err = items[idx].Value(func(val []byte) error {
409+
if err := proto.Unmarshal(val, pl); err != nil {
410+
return err
411+
}
412+
return nil
413+
})
414+
idx += 1
415+
results[i] = pl
416+
}
417+
418+
for i := 0; i < len(results); i++ {
419+
pl := results[i]
420+
idx := 0
421+
for _, postings := range pl.Postings {
422+
if hasDeleteAll(postings) {
423+
return nil, nil
424+
}
425+
if postings.Op != Del {
426+
pl.Postings[idx] = postings
427+
idx++
428+
}
429+
}
430+
pl.Postings = pl.Postings[:idx]
431+
results[i] = pl
432+
}
433+
434+
return results, err
435+
}
436+
384437
// Get retrieves the cached version of the list associated with the given key.
385438
func (lc *LocalCache) Get(key []byte) (*List, error) {
386439
return lc.getInternal(key, true)

worker/task.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
423423
out := &pb.Result{}
424424
outputs[start/width] = out
425425

426+
cache := make([]*pb.PostingList, 0)
426427
for i := start; i < end; i++ {
427428
select {
428429
case <-ctx.Done():
@@ -437,9 +438,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
437438
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
438439

439440
if !getMultiplePosting {
440-
pl, err := qs.cache.GetSinglePosting(key)
441-
if err != nil {
442-
return err
441+
if len(cache) == 0 {
442+
keys := make([][]byte, 10)
443+
keys[0] = key
444+
for j := i + 1; j < i+10 && j < end; j++ {
445+
keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j])
446+
}
447+
cache, err = qs.cache.GetBatchSinglePosting(keys)
448+
if err != nil {
449+
return err
450+
}
451+
}
452+
pl := cache[0]
453+
if len(cache) > 1 {
454+
cache = cache[1:]
443455
}
444456
if pl == nil || len(pl.Postings) == 0 {
445457
out.UidMatrix = append(out.UidMatrix, &pb.List{})

0 commit comments

Comments
 (0)