Skip to content

Commit 3145f88

Browse files
author
Harshil Goel
authored
fix(core): delete all before set for scalar postings (#9378) (#9380)
1 parent c1c99ae commit 3145f88

File tree

2 files changed

+73
-24
lines changed

2 files changed

+73
-24
lines changed

posting/list.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,14 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
741741
return p
742742
}
743743

744+
func createDeleteAllPosting() *pb.Posting {
745+
return &pb.Posting{
746+
Op: Del,
747+
Value: []byte(x.Star),
748+
Uid: math.MaxUint64,
749+
}
750+
}
751+
744752
func hasDeleteAll(mpost *pb.Posting) bool {
745753
return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0
746754
}
@@ -772,30 +780,11 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI
772780
// be done because the fingerprint for the value is not math.MaxUint64 as is
773781
// the case with the rest of the scalar predicates.
774782
newPlist := &pb.PostingList{}
775-
newPlist.Postings = append(newPlist.Postings, mpost)
776-
777-
// Add the deletions in the existing plist because those postings are not picked
778-
// up by iterating. Not doing so would result in delete operations that are not
779-
// applied when the transaction is committed.
780-
l.mutationMap.currentEntries = &pb.PostingList{}
781-
err := l.iterate(mpost.StartTs, 0, func(obj *pb.Posting) error {
782-
// Ignore values which have the same uid as they will get replaced
783-
// by the current value.
784-
if obj.Uid == mpost.Uid {
785-
return nil
786-
}
787-
788-
// Mark all other values as deleted. By the end of the iteration, the
789-
// list of postings will contain deleted operations and only one set
790-
// for the mutation stored in mpost.
791-
objCopy := proto.Clone(obj).(*pb.Posting)
792-
objCopy.Op = Del
793-
newPlist.Postings = append(newPlist.Postings, objCopy)
794-
return nil
795-
})
796-
if err != nil {
797-
return err
783+
if mpost.Op != Del {
784+
// If we are setting a new value then we can just delete all the older values.
785+
newPlist.Postings = append(newPlist.Postings, createDeleteAllPosting())
798786
}
787+
newPlist.Postings = append(newPlist.Postings, mpost)
799788
l.mutationMap.setCurrentEntries(mpost.StartTs, newPlist)
800789
return nil
801790
}

worker/sort_test.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,67 @@ func TestScalarPredicateCount(t *testing.T) {
405405
l.RUnlock()
406406
}
407407

408-
func TestSingleUid(t *testing.T) {
408+
func TestSingleUidReplacement(t *testing.T) {
409+
dir, err := os.MkdirTemp("", "storetest_")
410+
x.Check(err)
411+
defer os.RemoveAll(dir)
412+
413+
opt := badger.DefaultOptions(dir)
414+
ps, err := badger.OpenManaged(opt)
415+
x.Check(err)
416+
pstore = ps
417+
posting.Init(ps, 0, false)
418+
Init(ps)
419+
err = schema.ParseBytes([]byte("singleUidReplaceTest: uid ."), 1)
420+
require.NoError(t, err)
421+
422+
ctx := context.Background()
423+
txn := posting.Oracle().RegisterStartTs(5)
424+
attr := x.GalaxyAttr("singleUidReplaceTest")
425+
426+
// Txn 1. Set 1 -> 2
427+
x.Check(runMutation(ctx, &pb.DirectedEdge{
428+
ValueId: 2,
429+
Attr: attr,
430+
Entity: 1,
431+
Op: pb.DirectedEdge_SET,
432+
}, txn))
433+
434+
txn.Update()
435+
writer := posting.NewTxnWriter(pstore)
436+
require.NoError(t, txn.CommitToDisk(writer, 7))
437+
require.NoError(t, writer.Flush())
438+
txn.UpdateCachedKeys(7)
439+
440+
// Txn 2. Set 1 -> 3
441+
txn = posting.Oracle().RegisterStartTs(9)
442+
443+
x.Check(runMutation(ctx, &pb.DirectedEdge{
444+
ValueId: 3,
445+
Attr: attr,
446+
Entity: 1,
447+
Op: pb.DirectedEdge_SET,
448+
}, txn))
449+
450+
txn.Update()
451+
writer = posting.NewTxnWriter(pstore)
452+
require.NoError(t, txn.CommitToDisk(writer, 11))
453+
require.NoError(t, writer.Flush())
454+
txn.UpdateCachedKeys(11)
455+
456+
key := x.DataKey(attr, 1)
457+
458+
// Reading the david index, we should see 2 inserted, 1 deleted
459+
txn = posting.Oracle().RegisterStartTs(15)
460+
l, err := txn.Get(key)
461+
require.NoError(t, err)
462+
463+
uids, err := l.Uids(posting.ListOptions{ReadTs: 15})
464+
require.NoError(t, err)
465+
require.Equal(t, uids.Uids, []uint64{3})
466+
}
467+
468+
func TestSingleString(t *testing.T) {
409469
dir, err := os.MkdirTemp("", "storetest_")
410470
x.Check(err)
411471
defer os.RemoveAll(dir)

0 commit comments

Comments
 (0)