Skip to content

Commit ee99a7a

Browse files
author
Harshil Goel
authored
perf(core): Fixing ristretto cache to pass jepsen tests (#9237)
1 parent 837e334 commit ee99a7a

File tree

21 files changed

+411
-194
lines changed

21 files changed

+411
-194
lines changed

dgraph/cmd/alpha/run.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication.
145145
Flag("percentage",
146146
"Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+
147147
"PstoreBlockCache,PstoreIndexCache)").
148+
Flag("keep-updates",
149+
"Should carry updates in cache or not (bool)").
148150
String())
149151

150152
flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults).
@@ -633,6 +635,7 @@ func run() {
633635
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
634636

635637
cachePercentage := cache.GetString("percentage")
638+
keepUpdates := cache.GetBool("keep-updates")
636639
cachePercent, err := x.GetCachePercentages(cachePercentage, 3)
637640
x.Check(err)
638641
postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
@@ -655,6 +658,7 @@ func run() {
655658
WALDir: Alpha.Conf.GetString("wal"),
656659
CacheMb: totalCache,
657660
CachePercentage: cachePercentage,
661+
KeepUpdates: keepUpdates,
658662

659663
MutationsMode: worker.AllowMutations,
660664
AuthToken: security.GetString("token"),
@@ -782,7 +786,7 @@ func run() {
782786
// Posting will initialize index which requires schema. Hence, initialize
783787
// schema before calling posting.Init().
784788
schema.Init(worker.State.Pstore)
785-
posting.Init(worker.State.Pstore, postingListCacheSize)
789+
posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates)
786790
defer posting.Cleanup()
787791
worker.Init(worker.State.Pstore)
788792

dgraph/cmd/debug/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ func run() {
10241024
db, err := badger.OpenManaged(bopts)
10251025
x.Check(err)
10261026
// Not using posting list cache
1027-
posting.Init(db, 0)
1027+
posting.Init(db, 0, false)
10281028
defer db.Close()
10291029

10301030
printSummary(db)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/dgraph-io/gqlgen v0.13.2
1616
github.com/dgraph-io/gqlparser/v2 v2.2.2
1717
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
18-
github.com/dgraph-io/ristretto/v2 v2.0.1
18+
github.com/dgraph-io/ristretto/v2 v2.1.0
1919
github.com/dgraph-io/simdjson-go v0.3.0
2020
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
2121
github.com/dgryski/go-groupvarint v0.0.0-20230630160417-2bfb7969fb3c
@@ -58,7 +58,7 @@ require (
5858
golang.org/x/mod v0.22.0
5959
golang.org/x/net v0.33.0
6060
golang.org/x/sync v0.10.0
61-
golang.org/x/sys v0.28.0
61+
golang.org/x/sys v0.29.0
6262
golang.org/x/term v0.27.0
6363
golang.org/x/text v0.21.0
6464
golang.org/x/tools v0.28.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.2 h1:CnxXOKL4EPguKqcGV/z4u4VoW5izUkOTIsNM
145145
github.com/dgraph-io/gqlparser/v2 v2.2.2/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
146146
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
147147
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
148-
github.com/dgraph-io/ristretto/v2 v2.0.1 h1:7W0LfEP+USCmtrUjJsk+Jv2jbhJmb72N4yRI7GrLdMI=
149-
github.com/dgraph-io/ristretto/v2 v2.0.1/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo=
148+
github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I=
149+
github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4=
150150
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
151151
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
152152
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
@@ -853,8 +853,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
853853
golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
854854
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
855855
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
856-
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
857-
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
856+
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
857+
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
858858
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
859859
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
860860
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=

posting/index.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
10111011
return nil, err
10121012
}
10131013

1014+
memoryLayer.del(key)
10141015
return &bpb.KVList{Kv: kvs}, nil
10151016
}
10161017
tmpStream.Send = func(buf *z.Buffer) error {

posting/index_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
157157
}
158158

159159
txn.Update()
160+
txn.UpdateCachedKeys(commitTs)
160161
writer := NewTxnWriter(pstore)
161162
require.NoError(t, txn.CommitToDisk(writer, commitTs))
162163
require.NoError(t, writer.Flush())
@@ -271,6 +272,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,
271272

272273
func TestCountReverseIndexWithData(t *testing.T) {
273274
require.NoError(t, pstore.DropAll())
275+
memoryLayer.clear()
274276
indexNameCountVal := "testcount: [uid] @count @reverse ."
275277

276278
attr := x.GalaxyAttr("testcount")
@@ -305,6 +307,7 @@ func TestCountReverseIndexWithData(t *testing.T) {
305307

306308
func TestCountReverseIndexEmptyPosting(t *testing.T) {
307309
require.NoError(t, pstore.DropAll())
310+
memoryLayer.clear()
308311
indexNameCountVal := "testcount: [uid] @count @reverse ."
309312

310313
attr := x.GalaxyAttr("testcount")

posting/list.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer {
121121
}
122122
}
123123

124+
func (mm *MutableLayer) setTs(readTs uint64) {
125+
if mm == nil {
126+
return
127+
}
128+
mm.readTs = readTs
129+
}
130+
124131
// This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right
125132
// things from the existing mutable layer for the new list. It basically copies committedEntries using reference and
126133
// ignores currentEntires and readTs. Similarly, all the cache items related to currentEntries are ignored and
@@ -866,6 +873,12 @@ func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
866873
return conflictKey
867874
}
868875

876+
// SetTs allows us to set the transaction timestamp in mutation map. Should be used before the posting list is passed
877+
// on to the functions that would read the data.
878+
func (l *List) SetTs(readTs uint64) {
879+
l.mutationMap.setTs(readTs)
880+
}
881+
869882
func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
870883
l.AssertLock()
871884

@@ -978,6 +991,9 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
978991
}
979992

980993
l.mutationMap.committedUids[mpost.Uid] = mpost
994+
if l.mutationMap.length == math.MaxInt64 {
995+
l.mutationMap.length = 0
996+
}
981997
l.mutationMap.length += getLengthDelta(mpost.Op)
982998
}
983999

@@ -999,7 +1015,6 @@ func (l *List) setMutation(startTs uint64, data []byte) {
9991015
l.mutationMap = newMutableLayer()
10001016
}
10011017
l.mutationMap.setCurrentEntries(startTs, pl)
1002-
10031018
if pl.CommitTs != 0 {
10041019
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
10051020
}
@@ -1258,6 +1273,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb
12581273
var count int
12591274
var found bool
12601275
var post *pb.Posting
1276+
12611277
err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
12621278
if p.Uid == uid {
12631279
post = p

posting/list_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@ func TestAddMutation_mrjn1(t *testing.T) {
504504
func TestReadSingleValue(t *testing.T) {
505505
defer setMaxListSize(maxListSize)
506506
maxListSize = math.MaxInt32
507+
require.Equal(t, nil, pstore.DropAll())
507508

508509
// We call pl.Iterate and then stop iterating in the first loop when we are reading
509510
// single values. This test confirms that the two functions, getFirst from this file
@@ -518,6 +519,7 @@ func TestReadSingleValue(t *testing.T) {
518519
Value: []byte(fmt.Sprintf("ho hey there%d", i)),
519520
}
520521
txn := Txn{StartTs: i}
522+
ol.mutationMap.setTs(i)
521523
addMutationHelper(t, ol, edge, Set, &txn)
522524
require.NoError(t, ol.commitMutation(i, i+1))
523525
kData := ol.getMutation(i + 1)
@@ -532,6 +534,8 @@ func TestReadSingleValue(t *testing.T) {
532534
kvs, err := ol.Rollup(nil, txn.StartTs-3)
533535
require.NoError(t, err)
534536
require.NoError(t, writePostingListToDisk(kvs))
537+
// Delete item from global cache before reading, as we are not updating the cache in the test
538+
memoryLayer.del(key)
535539
ol, err = getNew(key, ps, math.MaxUint64)
536540
require.NoError(t, err)
537541
}
@@ -1803,7 +1807,7 @@ func TestMain(m *testing.M) {
18031807
ps, err = badger.OpenManaged(badger.DefaultOptions(dir))
18041808
x.Panic(err)
18051809
// Not using posting list cache
1806-
Init(ps, 0)
1810+
Init(ps, 0, false)
18071811
schema.Init(ps)
18081812

18091813
m.Run()

posting/lists.go

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,13 @@ package posting
1818

1919
import (
2020
"bytes"
21-
"context"
2221
"fmt"
2322
"sync"
24-
"time"
2523

26-
ostats "go.opencensus.io/stats"
2724
"google.golang.org/protobuf/proto"
2825

2926
"github.com/dgraph-io/badger/v4"
3027
"github.com/dgraph-io/dgo/v240/protos/api"
31-
"github.com/dgraph-io/ristretto/v2"
3228
"github.com/dgraph-io/ristretto/v2/z"
3329
"github.com/hypermodeinc/dgraph/v24/protos/pb"
3430
"github.com/hypermodeinc/dgraph/v24/tok/index"
@@ -42,41 +38,15 @@ const (
4238
var (
4339
pstore *badger.DB
4440
closer *z.Closer
45-
lCache *ristretto.Cache[[]byte, *List]
4641
)
4742

4843
// Init initializes the posting lists package, the in memory and dirty list hash.
49-
func Init(ps *badger.DB, cacheSize int64) {
44+
func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
5045
pstore = ps
5146
closer = z.NewCloser(1)
5247
go x.MonitorMemoryMetrics(closer)
5348

54-
// Initialize cache.
55-
if cacheSize == 0 {
56-
return
57-
}
58-
59-
var err error
60-
lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
61-
// Use 5% of cache memory for storing counters.
62-
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
63-
MaxCost: int64(float64(cacheSize) * 0.95),
64-
BufferItems: 64,
65-
Metrics: true,
66-
Cost: func(val *List) int64 {
67-
return 0
68-
},
69-
})
70-
x.Check(err)
71-
go func() {
72-
m := lCache.Metrics
73-
ticker := time.NewTicker(10 * time.Second)
74-
defer ticker.Stop()
75-
for range ticker.C {
76-
// Record the posting list cache hit ratio
77-
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
78-
}
79-
}()
49+
memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
8050
}
8151

8252
func UpdateMaxCost(maxCost int64) {
@@ -145,7 +115,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro
145115
func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
146116
value := pl.findStaticValue(vc.delegate.startTs)
147117

148-
if value == nil {
118+
if value == nil || len(value.Postings) == 0 {
149119
return nil, ErrNoValue
150120
}
151121

@@ -314,8 +284,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
314284
}
315285
} else {
316286
pl = &List{
317-
key: key,
318-
plist: new(pb.PostingList),
287+
key: key,
288+
plist: new(pb.PostingList),
289+
mutationMap: newMutableLayer(),
319290
}
320291
}
321292

@@ -421,8 +392,6 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() {
421392
}
422393

423394
for key, pl := range lc.plists {
424-
//pk, _ := x.Parse([]byte(key))
425-
//fmt.Printf("{TXN} Closing %v\n", pk)
426395
data := pl.getMutation(lc.startTs)
427396
if len(data) > 0 {
428397
lc.deltas[key] = data

0 commit comments

Comments
 (0)