Skip to content

Commit e3d549c

Browse files
author
Harshil Goel
authored
fix(core): fix read scalar list with rollups (#9350) (#9351)
1 parent 84386ea commit e3d549c

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

posting/oracle.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) {
169169
if err != nil {
170170
return nil, err
171171
}
172+
l.Lock()
173+
defer l.Unlock()
172174
if l.mutationMap.len() == 0 && len(l.plist.Postings) == 0 {
173175
pl, err := txn.cache.readPostingListAt(key)
174176
if err == badger.ErrKeyNotFound {
@@ -177,10 +179,15 @@ func (txn *Txn) GetScalarList(key []byte) (*List, error) {
177179
if err != nil {
178180
return nil, err
179181
}
180-
if pl.CommitTs == 0 {
181-
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
182+
183+
if pl.Pack != nil {
184+
l.plist = pl
182185
} else {
183-
l.mutationMap.insertCommittedPostings(pl)
186+
if pl.CommitTs == 0 {
187+
l.mutationMap.setCurrentEntries(txn.StartTs, pl)
188+
} else {
189+
l.mutationMap.insertCommittedPostings(pl)
190+
}
184191
}
185192
}
186193
return l, nil

worker/sort_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,61 @@ func TestEmptyTypeSchema(t *testing.T) {
101101
x.ParseNamespaceAttr(types[0].TypeName)
102102
}
103103

104+
func TestGetScalarList(t *testing.T) {
105+
dir, err := os.MkdirTemp("", "storetest_")
106+
x.Check(err)
107+
defer os.RemoveAll(dir)
108+
109+
opt := badger.DefaultOptions(dir)
110+
ps, err := badger.OpenManaged(opt)
111+
x.Check(err)
112+
pstore = ps
113+
posting.Init(ps, 0, false)
114+
Init(ps)
115+
err = schema.ParseBytes([]byte("scalarPredicateCount4: uid ."), 1)
116+
require.NoError(t, err)
117+
118+
runM := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
119+
txn := posting.Oracle().RegisterStartTs(startTs)
120+
for _, edge := range edges {
121+
x.Check(runMutation(context.Background(), edge, txn))
122+
}
123+
txn.Update()
124+
writer := posting.NewTxnWriter(pstore)
125+
require.NoError(t, txn.CommitToDisk(writer, commitTs))
126+
require.NoError(t, writer.Flush())
127+
txn.UpdateCachedKeys(commitTs)
128+
}
129+
130+
attr := x.GalaxyAttr("scalarPredicateCount4")
131+
132+
runM(5, 7, []*pb.DirectedEdge{{
133+
ValueId: 3,
134+
ValueType: pb.Posting_UID,
135+
Attr: attr,
136+
Entity: 1,
137+
Op: pb.DirectedEdge_SET,
138+
}})
139+
140+
key := x.DataKey(attr, 1)
141+
rollup(t, key, ps, 8)
142+
143+
runM(9, 11, []*pb.DirectedEdge{{
144+
ValueId: 5,
145+
ValueType: pb.Posting_UID,
146+
Attr: attr,
147+
Entity: 1,
148+
Op: pb.DirectedEdge_SET,
149+
}})
150+
151+
txn := posting.Oracle().RegisterStartTs(13)
152+
l, err := txn.Get(key)
153+
require.Nil(t, err)
154+
uids, err := l.Uids(posting.ListOptions{ReadTs: 13})
155+
require.Nil(t, err)
156+
require.Equal(t, 1, len(uids.Uids))
157+
}
158+
104159
func TestMultipleTxnListCount(t *testing.T) {
105160
dir, err := os.MkdirTemp("", "storetest_")
106161
x.Check(err)

0 commit comments

Comments
 (0)