Skip to content

Commit 9df606b

Browse files
Harshil Goeldarkcoderrises
authored and committed
perf(core): Optimize reading data during mutation for scalar types (#9290)
1 parent 80863c6 commit 9df606b

File tree

5 files changed

+88
-26
lines changed

5 files changed

+88
-26
lines changed

posting/lists.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ package posting
1818

1919
import (
2020
"bytes"
21+
"context"
2122
"fmt"
2223
"sync"
24+
"time"
2325

26+
ostats "go.opencensus.io/stats"
27+
"go.opencensus.io/tag"
2428
"google.golang.org/protobuf/proto"
2529

2630
"github.com/dgraph-io/badger/v4"
@@ -300,6 +304,33 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
300304
return lc.SetIfAbsent(skey, pl), nil
301305
}
302306

307+
// readPostingListAt fetches the value stored under key from pstore, using a
// transaction opened at lc.startTs, and unmarshals it into a pb.PostingList.
//
// An error from txn.Get is returned unwrapped together with a nil list. If
// unmarshalling the value fails, the list is returned alongside that error.
// The elapsed time of the call is recorded to x.BadgerReadLatencyMs on every
// exit path via the deferred function.
func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
308+
start := time.Now()
309+
defer func() {
310+
// The parse error is discarded; pk is only used to label the metric below.
pk, _ := x.Parse(key)
311+
ms := x.SinceMs(start)
312+
var tags []tag.Mutator
313+
tags = append(tags, tag.Upsert(x.KeyMethod, "get"))
314+
// NOTE(review): the key's Attr is recorded under x.KeyStatus — confirm this
// tag key is the intended one for a predicate name.
tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr))
315+
// The result of recording the measurement is deliberately ignored.
_ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms))
316+
}()
317+
318+
pl := &pb.PostingList{}
319+
// NOTE(review): the second argument (false) presumably requests a read-only
// transaction — confirm against the badger API.
txn := pstore.NewTransactionAt(lc.startTs, false)
320+
defer txn.Discard()
321+
322+
item, err := txn.Get(key)
323+
if err != nil {
324+
return nil, err
325+
}
326+
327+
// Decode inside the Value callback, where the raw bytes are handed to us.
err = item.Value(func(val []byte) error {
328+
return proto.Unmarshal(val, pl)
329+
})
330+
331+
return pl, err
332+
}
333+
303334
// GetSinglePosting retrieves the cached version of the first item in the list associated with the
304335
// given key. This is used for retrieving the value of a scalar predicate.
305336
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
@@ -332,20 +363,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
332363
return pl, err
333364
}
334365

335-
pl = &pb.PostingList{}
336-
txn := pstore.NewTransactionAt(lc.startTs, false)
337-
defer txn.Discard()
338-
339-
item, err := txn.Get(key)
340-
if err != nil {
341-
return nil, err
342-
}
343-
344-
err = item.Value(func(val []byte) error {
345-
return proto.Unmarshal(val, pl)
346-
})
347-
348-
return pl, err
366+
return lc.readPostingListAt(key)
349367
}
350368

351369
pl, err := getPostings()

posting/mvcc.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/golang/glog"
3030
"github.com/pkg/errors"
3131
ostats "go.opencensus.io/stats"
32+
"go.opencensus.io/tag"
3233
"google.golang.org/protobuf/proto"
3334

3435
"github.com/dgraph-io/badger/v4"
@@ -547,6 +548,16 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
547548
if err != nil {
548549
return nil, errors.Wrapf(err, "while reading posting list with key [%v]", key)
549550
}
551+
552+
start := time.Now()
553+
defer func() {
554+
ms := x.SinceMs(start)
555+
var tags []tag.Mutator
556+
tags = append(tags, tag.Upsert(x.KeyMethod, "iterate"))
557+
tags = append(tags, tag.Upsert(x.KeyStatus, pk.Attr))
558+
_ = ostats.RecordWithTags(context.Background(), tags, x.BadgerReadLatencyMs.M(ms))
559+
}()
560+
550561
if pk.HasStartUid {
551562
// Trying to read a single part of a multi part list. This type of list
552563
// should be read using the main key because the information needed

posting/oracle.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync/atomic"
2525
"time"
2626

27+
"github.com/dgraph-io/badger/v4"
2728
"github.com/golang/glog"
2829
ostats "go.opencensus.io/stats"
2930

@@ -163,6 +164,28 @@ func (txn *Txn) GetFromDelta(key []byte) (*List, error) {
163164
return txn.cache.GetFromDelta(key)
164165
}
165166

167+
// GetScalarList returns the List for key obtained from txn.cache.GetFromDelta.
// When that list has an empty mutationMap and no postings in its plist, the
// posting list stored under key is additionally read via
// txn.cache.readPostingListAt and loaded into the list's mutationMap.
//
// A badger.ErrKeyNotFound from that read is not treated as a failure: the
// list from GetFromDelta is returned unchanged. Any other error is returned
// with a nil list.
func (txn *Txn) GetScalarList(key []byte) (*List, error) {
168+
l, err := txn.cache.GetFromDelta(key)
169+
if err != nil {
170+
return nil, err
171+
}
172+
// Only do the extra read when the list carries no data at all.
if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 {
173+
pl, err := txn.cache.readPostingListAt(key)
174+
if err == badger.ErrKeyNotFound {
175+
return l, nil
176+
}
177+
if err != nil {
178+
return nil, err
179+
}
180+
// NOTE(review): CommitTs == 0 presumably marks a posting list without a
// commit timestamp, which is stored as the current entries at
// txn.StartTs; otherwise it is inserted as committed postings — confirm
// against setCurrentEntries / insertCommittedPostings.
if pl.CommitTs == 0 {
181+
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
182+
} else {
183+
l.mutationMap.insertCommittedPostings(pl)
184+
}
185+
}
186+
return l, nil
187+
}
188+
166189
// Update calls UpdateDeltasAndDiscardLists on the local cache.
167190
func (txn *Txn) Update() {
168191
txn.cache.UpdateDeltasAndDiscardLists()

worker/mutation.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,25 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
9090
// The following is a performance optimization which allows us to not read a posting list from
9191
// disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if
9292
// we're not using the read posting list, we don't need to retrieve it. We need the posting list
93-
// if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can
94-
// just create a posting list facade in memory and use it to store the delta in Badger. Later,
95-
// the rollup operation would consolidate all these deltas into a posting list.
93+
// if we're doing count index or delete operation. For scalar predicates, we just get the last item merged.
94+
// In other cases, we can just create a posting list facade in memory and use it to store the delta in Badger.
95+
// Later, the rollup operation would consolidate all these deltas into a posting list.
96+
isList := su.GetList()
9697
var getFn func(key []byte) (*posting.List, error)
9798
switch {
98-
case len(su.GetTokenizer()) > 0 || su.GetCount():
99-
// Any index or count index.
100-
getFn = txn.Get
101-
case su.GetValueType() == pb.Posting_UID && !su.GetList():
102-
// Single UID, not a list.
99+
case len(edge.Lang) == 0 && !isList:
100+
// Scalar Predicates, without lang
101+
getFn = txn.GetScalarList
102+
case len(edge.Lang) > 0 || su.GetCount():
103+
// Language or Count Index
103104
getFn = txn.Get
104105
case edge.Op == pb.DirectedEdge_DEL:
105106
// Covers various delete cases to keep things simple.
106107
getFn = txn.Get
107108
default:
108-
// Reverse index doesn't need the posting list to be read. We already covered count index,
109-
// single uid and delete all above.
110-
// Values, whether single or list, don't need to be read.
111-
// Uid list doesn't need to be read.
109+
// Only count index needs to be read. For other indexes on list, we don't need to read any data.
110+
// For indexes on scalar predicates, only the last element needs to be left.
111+
// Delete cases covered above.
112112
getFn = txn.GetFromDelta
113113
}
114114

x/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ var (
7171
// LatencyMs is the latency of the various Dgraph operations.
7272
LatencyMs = ostats.Float64("latency",
7373
"Latency of the various methods", ostats.UnitMilliseconds)
74+
// BadgerReadLatencyMs is the latency of various different predicate reads from badger.
75+
BadgerReadLatencyMs = ostats.Float64("badger_read_latency_ms",
76+
"Latency of the various methods", ostats.UnitMilliseconds)
7477

7578
// Point-in-time metrics.
7679

@@ -201,6 +204,13 @@ var (
201204
Aggregation: defaultLatencyMsDistribution,
202205
TagKeys: allTagKeys,
203206
},
207+
{
208+
Name: BadgerReadLatencyMs.Name(),
209+
Measure: BadgerReadLatencyMs,
210+
Description: BadgerReadLatencyMs.Description(),
211+
Aggregation: defaultLatencyMsDistribution,
212+
TagKeys: allTagKeys,
213+
},
204214
{
205215
Name: NumQueries.Name(),
206216
Measure: NumQueries,

0 commit comments

Comments
 (0)