Skip to content

Commit 307d873

Browse files
Use get batch api badger
1 parent 684cbb2 commit 307d873

File tree

5 files changed

+122
-25
lines changed

5 files changed

+122
-25
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/IBM/sarama v1.45.1
99
github.com/Masterminds/semver/v3 v3.3.1
1010
github.com/blevesearch/bleve/v2 v2.5.1
11-
github.com/dgraph-io/badger/v4 v4.7.0
11+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e
1212
github.com/dgraph-io/dgo/v250 v250.0.0-preview4
1313
github.com/dgraph-io/gqlgen v0.13.2
1414
github.com/dgraph-io/gqlparser/v2 v2.2.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
122122
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
123123
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
124124
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
125-
github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
126-
github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
125+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e h1:sZmnvDqloFjehWjr6f/G5O8ANbhenwSYdkGxkTR2Bww=
126+
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e/go.mod h1:aSwx/bXKT3/WRl9rn2BrTU+tfRQlFPKlOsqRTdcpHB8=
127127
github.com/dgraph-io/dgo/v250 v250.0.0-preview4 h1:DkS6iFI6RwStXRzQxT5v8b6NLqqHQi0xKSK6FvcEwYo=
128128
github.com/dgraph-io/dgo/v250 v250.0.0-preview4/go.mod h1:6nnKW4tYiai9xI6NSCrxaBgUGG1YI/+KlY+Tc7smqXY=
129129
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=

posting/list_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,10 @@ func TestAddMutation_mrjn1(t *testing.T) {
493493
func TestReadSingleValue(t *testing.T) {
494494
defer setMaxListSize(maxListSize)
495495
maxListSize = math.MaxInt32
496+
<<<<<<< HEAD
496497
require.Equal(t, nil, pstore.DropAll())
498+
=======
499+
>>>>>>> 6fbd525d2 (Use get batch api badger)
497500

498501
// We call pl.Iterate and then stop iterating in the first loop when we are reading
499502
// single values. This test confirms that the two functions, getFirst from this file
@@ -502,6 +505,7 @@ func TestReadSingleValue(t *testing.T) {
502505
key := x.DataKey(x.AttrInRootNamespace("value"), 1240)
503506
ol, err := getNew(key, ps, math.MaxUint64)
504507
require.NoError(t, err)
508+
<<<<<<< HEAD
505509
N := uint64(10000)
506510
for i := uint64(2); i <= N; i += 2 {
507511
edge := &pb.DirectedEdge{
@@ -514,6 +518,19 @@ func TestReadSingleValue(t *testing.T) {
514518
kData := ol.getMutation(i + 1)
515519
writer := NewTxnWriter(pstore)
516520
if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil {
521+
=======
522+
N := int(10000)
523+
for i := 2; i <= N; i += 2 {
524+
edge := &pb.DirectedEdge{
525+
Value: []byte("ho hey there" + strconv.Itoa(i)),
526+
}
527+
txn := Txn{StartTs: uint64(i)}
528+
addMutationHelper(t, ol, edge, Set, &txn)
529+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
530+
kData := ol.getMutation(uint64(i))
531+
writer := NewTxnWriter(pstore)
532+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
533+
>>>>>>> 6fbd525d2 (Use get batch api badger)
517534
require.NoError(t, err)
518535
}
519536
writer.Flush()
@@ -523,12 +540,16 @@ func TestReadSingleValue(t *testing.T) {
523540
kvs, err := ol.Rollup(nil, txn.StartTs-3)
524541
require.NoError(t, err)
525542
require.NoError(t, writePostingListToDisk(kvs))
543+
<<<<<<< HEAD
526544
// Delete item from global cache before reading, as we are not updating the cache in the test
527545
memoryLayer.del(key)
546+
=======
547+
>>>>>>> 6fbd525d2 (Use get batch api badger)
528548
ol, err = getNew(key, ps, math.MaxUint64)
529549
require.NoError(t, err)
530550
}
531551

552+
<<<<<<< HEAD
532553
j := uint64(3)
533554
if j < ol.minTs {
534555
j = ol.minTs
@@ -538,6 +559,17 @@ func TestReadSingleValue(t *testing.T) {
538559
k, err := tx.cache.GetSinglePosting(key)
539560
require.NoError(t, err)
540561
checkValue(t, ol, string(k.Postings[0].Value), j)
562+
=======
563+
j := 2
564+
if j < int(ol.minTs) {
565+
j = int(ol.minTs)
566+
}
567+
for ; j < i+6; j++ {
568+
tx := NewTxn(uint64(j))
569+
k, err := tx.cache.GetSinglePosting(key)
570+
require.NoError(t, err)
571+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
572+
>>>>>>> 6fbd525d2 (Use get batch api badger)
541573
}
542574
}
543575
}

posting/lists.go

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -330,32 +330,32 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
330330
return pl, err
331331
}
332332

333-
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
334-
// given key. This is used for retrieving the value of a scalar predicate.
335-
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
336-
// This would return an error if there is some data in the local cache, but we couldn't read it.
337-
getListFromLocalCache := func() (*pb.PostingList, error) {
338-
lc.RLock()
339-
340-
pl := &pb.PostingList{}
341-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
342-
err := proto.Unmarshal(delta, pl)
343-
lc.RUnlock()
344-
return pl, err
345-
}
333+
func (lc *LocalCache) GetSinglePostingFromLocalCache(key []byte) (*pb.PostingList, error) {
334+
lc.RLock()
346335

347-
l := lc.plists[string(key)]
336+
pl := &pb.PostingList{}
337+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
338+
err := proto.Unmarshal(delta, pl)
348339
lc.RUnlock()
340+
return pl, err
341+
}
349342

350-
if l != nil {
351-
return l.StaticValue(lc.startTs)
352-
}
343+
l := lc.plists[string(key)]
344+
lc.RUnlock()
353345

354-
return nil, nil
346+
if l != nil {
347+
return l.StaticValue(lc.startTs)
355348
}
356349

350+
return nil, nil
351+
}
352+
353+
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
354+
// given key. This is used for retrieving the value of a scalar predicate.
355+
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
356+
// This would return an error if there is some data in the local cache, but we couldn't read it.
357357
getPostings := func() (*pb.PostingList, error) {
358-
pl, err := getListFromLocalCache()
358+
pl, err := lc.GetSinglePostingFromLocalCache(key)
359359
// If both pl and err are empty, that means that there was no data in local cache, hence we should
360360
// read the data from badger.
361361
if pl != nil || err != nil {
@@ -388,6 +388,59 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
388388
return pl, nil
389389
}
390390

391+
func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
392+
results := make([]*pb.PostingList, len(keys))
393+
remaining_keys := make([][]byte, 0)
394+
for i, key := range keys {
395+
if pl, err := lc.GetSinglePostingFromLocalCache(key); pl != nil && err != nil {
396+
results[i] = pl
397+
} else {
398+
remaining_keys = append(remaining_keys, key)
399+
}
400+
}
401+
402+
txn := pstore.NewTransactionAt(lc.startTs, false)
403+
items, err := txn.GetBatch(remaining_keys)
404+
if err != nil {
405+
fmt.Println(err, keys)
406+
return nil, err
407+
}
408+
idx := 0
409+
410+
for i := 0; i < len(results); i++ {
411+
if results[i] != nil {
412+
continue
413+
}
414+
pl := &pb.PostingList{}
415+
err = items[idx].Value(func(val []byte) error {
416+
if err := proto.Unmarshal(val, pl); err != nil {
417+
return err
418+
}
419+
return nil
420+
})
421+
idx += 1
422+
results[i] = pl
423+
}
424+
425+
for i := 0; i < len(results); i++ {
426+
pl := results[i]
427+
idx := 0
428+
for _, postings := range pl.Postings {
429+
if hasDeleteAll(postings) {
430+
return nil, nil
431+
}
432+
if postings.Op != Del {
433+
pl.Postings[idx] = postings
434+
idx++
435+
}
436+
}
437+
pl.Postings = pl.Postings[:idx]
438+
results[i] = pl
439+
}
440+
441+
return results, err
442+
}
443+
391444
// Get retrieves the cached version of the list associated with the given key.
392445
func (lc *LocalCache) Get(key []byte) (*List, error) {
393446
return lc.getInternal(key, true)

worker/task.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
428428
out := &pb.Result{}
429429
outputs[start/width] = out
430430

431+
cache := make([]*pb.PostingList, 0)
431432
for i := start; i < end; i++ {
432433
select {
433434
case <-ctx.Done():
@@ -442,9 +443,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
442443
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
443444

444445
if !getMultiplePosting {
445-
pl, err := qs.cache.GetSinglePosting(key)
446-
if err != nil {
447-
return err
446+
if len(cache) == 0 {
447+
keys := make([][]byte, 10)
448+
keys[0] = key
449+
for j := i + 1; j < i+10 && j < end; j++ {
450+
keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j])
451+
}
452+
cache, err = qs.cache.GetBatchSinglePosting(keys)
453+
if err != nil {
454+
return err
455+
}
456+
}
457+
pl := cache[0]
458+
if len(cache) > 1 {
459+
cache = cache[1:]
448460
}
449461
if pl == nil || len(pl.Postings) == 0 {
450462
out.UidMatrix = append(out.UidMatrix, &pb.List{})

0 commit comments

Comments
 (0)