Skip to content

Commit 9785f9a

Browse files
added single get call
1 parent 170f37c commit 9785f9a

File tree

4 files changed

+131
-33
lines changed

4 files changed

+131
-33
lines changed

posting/list_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,41 @@ func TestAddMutation_mrjn1(t *testing.T) {
435435
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
436436
}
437437

438+
func TestReadSingleValue(t *testing.T) {
439+
defer setMaxListSize(maxListSize)
440+
maxListSize = math.MaxInt32
441+
442+
// We call pl.Iterate and then stop iterating in the first loop when we are reading
443+
// single values. This test confirms that the two functions, getFirst from this file
444+
// and GetSingeValueForKey works without an issue.
445+
446+
key := x.DataKey(x.GalaxyAttr("value"), 1240)
447+
ol, err := getNew(key, ps, math.MaxUint64)
448+
require.NoError(t, err)
449+
N := int(1e2)
450+
for i := 2; i <= N; i += 2 {
451+
edge := &pb.DirectedEdge{
452+
Value: []byte("ho hey there" + strconv.Itoa(i)),
453+
}
454+
txn := Txn{StartTs: uint64(i)}
455+
addMutationHelper(t, ol, edge, Set, &txn)
456+
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
457+
kData := ol.getMutation(uint64(i))
458+
writer := NewTxnWriter(pstore)
459+
if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil {
460+
require.NoError(t, err)
461+
}
462+
writer.Flush()
463+
464+
for j := 3; j < i+6; j++ {
465+
k, err, _ := GetSingleValueForKey(key, uint64(j))
466+
require.NoError(t, err)
467+
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
468+
}
469+
470+
}
471+
}
472+
438473
func TestRollupMaxTsIsSet(t *testing.T) {
439474
defer setMaxListSize(maxListSize)
440475
maxListSize = math.MaxInt32

posting/lists.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,25 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
195195
return lc.SetIfAbsent(skey, pl), nil
196196
}
197197

198+
func (lc *LocalCache) GetSingleItem(key []byte) (*List, error) {
199+
pl, err, _ := GetSingleValueForKey(key, lc.startTs)
200+
if err != nil {
201+
return nil, err
202+
}
203+
204+
l := new(List)
205+
l.key = key
206+
l.plist = pl
207+
208+
lc.RLock()
209+
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
210+
l.setMutation(lc.startTs, delta)
211+
}
212+
lc.RUnlock()
213+
214+
return l, nil
215+
}
216+
198217
// Get retrieves the cached version of the list associated with the given key.
199218
func (lc *LocalCache) Get(key []byte) (*List, error) {
200219
return lc.getInternal(key, true)

posting/mvcc.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,31 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
457457
return l, nil
458458
}
459459

460+
func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, int) {
461+
//fmt.Println("KEY:", key)
462+
txn := pstore.NewTransactionAt(readTs, false)
463+
item, err := txn.Get(key)
464+
if err != nil {
465+
return nil, err, 0
466+
}
467+
pl := &pb.PostingList{}
468+
k := 0
469+
470+
err = item.Value(func(val []byte) error {
471+
k = len(key) + len(val)
472+
if err := pl.Unmarshal(val); err != nil {
473+
return err
474+
}
475+
return nil
476+
})
477+
478+
if err != nil {
479+
return nil, err, 0
480+
}
481+
482+
return pl, nil, k
483+
}
484+
460485
func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
461486
cachedVal, ok := lCache.Get(key)
462487
if ok {

worker/task.go

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -391,49 +391,68 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
391391
key := x.DataKey(q.Attr, q.UidList.Uids[i])
392392

393393
// Get or create the posting list for an entity, attribute combination.
394-
pl, err := qs.cache.Get(key)
395-
if err != nil {
396-
return err
397-
}
394+
var pl *posting.List
395+
pickMultiplePostings := q.ExpandAll || (listType && len(q.Langs) == 0)
396+
397+
var vals []types.Val
398+
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored
398399

399-
// If count is being requested, there is no need to populate value and facets matrix.
400-
if q.DoCount {
401-
count, err := countForValuePostings(args, pl, facetsTree, listType)
402-
if err != nil && err != posting.ErrNoValue {
400+
if pickMultiplePostings {
401+
pl, err = qs.cache.Get(key)
402+
if err != nil {
403403
return err
404404
}
405-
out.Counts = append(out.Counts, uint32(count))
406-
// Add an empty UID list to make later processing consistent.
407-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
408-
continue
409-
}
405+
// If count is being requested, there is no need to populate value and facets matrix.
406+
if q.DoCount {
407+
count, err := countForValuePostings(args, pl, facetsTree, listType)
408+
if err != nil && err != posting.ErrNoValue {
409+
return err
410+
}
411+
out.Counts = append(out.Counts, uint32(count))
412+
// Add an empty UID list to make later processing consistent.
413+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
414+
continue
415+
}
410416

411-
vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
412-
switch {
413-
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
414-
// This branch is taken when the value does not exist in the pl or
415-
// the number of values retrieved is zero (there could still be facets).
416-
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
417-
// LangMatrix so that all these data structure have predictable layouts.
418-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
419-
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
420-
out.ValueMatrix = append(out.ValueMatrix,
421-
&pb.ValueList{Values: []*pb.TaskValue{}})
422417
if q.ExpandAll {
423-
// To keep the cardinality same as that of ValueMatrix.
424-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
418+
langTags, err := pl.GetLangTags(args.q.ReadTs)
419+
if err != nil {
420+
return err
421+
}
422+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
423+
}
424+
425+
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
426+
switch {
427+
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
428+
// This branch is taken when the value does not exist in the pl or
429+
// the number of values retrieved is zero (there could still be facets).
430+
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
431+
// LangMatrix so that all these data structure have predictable layouts.
432+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
433+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
434+
out.ValueMatrix = append(out.ValueMatrix,
435+
&pb.ValueList{Values: []*pb.TaskValue{}})
436+
if q.ExpandAll {
437+
// To keep the cardinality same as that of ValueMatrix.
438+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
439+
}
440+
continue
441+
case err != nil:
442+
return err
443+
}
444+
445+
} else {
446+
pl, err = qs.cache.GetSingleItem(key)
447+
if err != nil {
448+
return err
425449
}
426-
continue
427-
case err != nil:
428-
return err
429-
}
430450

431-
if q.ExpandAll {
432-
langTags, err := pl.GetLangTags(args.q.ReadTs)
451+
val, err := pl.AllValues(q.ReadTs)
433452
if err != nil {
434453
return err
435454
}
436-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
455+
vals = append(vals, val...)
437456
}
438457

439458
uidList := new(pb.List)

0 commit comments

Comments
 (0)