Skip to content

Commit 48c434f

Browse files
updated cache
1 parent daf01c0 commit 48c434f

File tree

3 files changed

+116
-79
lines changed

3 files changed

+116
-79
lines changed

posting/lists.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,13 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
152152
return updated
153153
}
154154

155-
func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
155+
func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) {
156156
getNewPlistNil := func() (*List, error) {
157157
lc.RLock()
158158
defer lc.RUnlock()
159159
if lc.plists == nil {
160-
return getNew(key, pstore, lc.startTs)
160+
pl, err, _ := GetSingleValueForKey(key, lc.startTs)
161+
return pl, err
161162
}
162163
return nil, nil
163164
}
@@ -172,9 +173,10 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
172173
}
173174

174175
var pl *List
176+
var k int
175177
if readFromDisk {
176178
var err error
177-
pl, err = getNew(key, pstore, lc.startTs)
179+
pl, err, k = GetSingleValueForKey(key, lc.startTs)
178180
if err != nil {
179181
return nil, err
180182
}
@@ -185,33 +187,63 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
185187
}
186188
}
187189

190+
fmt.Println(k)
191+
188192
// If we just brought this posting list into memory and we already have a delta for it, let's
189193
// apply it before returning the list.
190194
lc.RLock()
191195
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
192196
pl.setMutation(lc.startTs, delta)
193197
}
194198
lc.RUnlock()
195-
return lc.SetIfAbsent(skey, pl), nil
199+
return pl, nil
196200
}
197201

198-
func (lc *LocalCache) GetSingleItem(key []byte) (*List, error) {
199-
pl, err, _ := GetSingleValueForKey(key, lc.startTs)
200-
if err != nil {
201-
return nil, err
202+
func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
203+
getNewPlistNil := func() (*List, error) {
204+
lc.RLock()
205+
defer lc.RUnlock()
206+
if lc.plists == nil {
207+
return getNew(key, pstore, lc.startTs)
208+
}
209+
return nil, nil
210+
}
211+
212+
if l, err := getNewPlistNil(); l != nil || err != nil {
213+
return l, err
202214
}
203215

204-
l := new(List)
205-
l.key = key
206-
l.plist = pl
216+
skey := string(key)
217+
if pl := lc.getNoStore(skey); pl != nil {
218+
return pl, nil
219+
}
207220

221+
var pl *List
222+
if readFromDisk {
223+
var err error
224+
pl, err = getNew(key, pstore, lc.startTs)
225+
if err != nil {
226+
return nil, err
227+
}
228+
} else {
229+
pl = &List{
230+
key: key,
231+
plist: new(pb.PostingList),
232+
}
233+
}
234+
235+
// If we just brought this posting list into memory and we already have a delta for it, let's
236+
// apply it before returning the list.
208237
lc.RLock()
209-
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
210-
l.setMutation(lc.startTs, delta)
238+
if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 {
239+
pl.setMutation(lc.startTs, delta)
211240
}
212241
lc.RUnlock()
242+
return lc.SetIfAbsent(skey, pl), nil
243+
}
213244

214-
return l, nil
245+
func (lc *LocalCache) GetSingle(key []byte) (*List, error) {
246+
return lc.getSingleInternal(key, true)
215247
}
216248

217249
// Get retrieves the cached version of the list associated with the given key.

posting/mvcc.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,19 +457,49 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
457457
return l, nil
458458
}
459459

460-
func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, int) {
460+
func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) {
461+
cachedVal, ok := lCache.Get(key)
462+
if ok {
463+
l, ok := cachedVal.(*List)
464+
if ok && l != nil {
465+
// No need to clone the immutable layer or the key since mutations will not modify it.
466+
lCopy := &List{
467+
minTs: l.minTs,
468+
maxTs: l.maxTs,
469+
key: key,
470+
plist: l.plist,
471+
}
472+
l.RLock()
473+
if l.mutationMap != nil {
474+
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
475+
for ts, pl := range l.mutationMap {
476+
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
477+
}
478+
}
479+
l.RUnlock()
480+
return lCopy, nil, 0
481+
}
482+
}
483+
484+
if pstore.IsClosed() {
485+
return nil, badger.ErrDBClosed, 0
486+
}
487+
488+
l := new(List)
489+
l.key = key
490+
l.plist = new(pb.PostingList)
491+
461492
//fmt.Println("KEY:", key)
462493
txn := pstore.NewTransactionAt(readTs, false)
463494
item, err := txn.Get(key)
464495
if err != nil {
465496
return nil, err, 0
466497
}
467-
pl := &pb.PostingList{}
468498
k := 0
469499

470500
err = item.Value(func(val []byte) error {
471501
k = len(key) + len(val)
472-
if err := pl.Unmarshal(val); err != nil {
502+
if err := l.plist.Unmarshal(val); err != nil {
473503
return err
474504
}
475505
return nil
@@ -479,7 +509,7 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, in
479509
return nil, err, 0
480510
}
481511

482-
return pl, nil, k
512+
return l, nil, k
483513
}
484514

485515
func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {

worker/task.go

Lines changed: 36 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -399,75 +399,50 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
399399

400400
if pickMultiplePostings {
401401
pl, err = qs.cache.Get(key)
402-
if err != nil {
403-
return err
404-
}
405-
// If count is being requested, there is no need to populate value and facets matrix.
406-
if q.DoCount {
407-
count, err := countForValuePostings(args, pl, facetsTree, listType)
408-
if err != nil && err != posting.ErrNoValue {
409-
return err
410-
}
411-
out.Counts = append(out.Counts, uint32(count))
412-
// Add an empty UID list to make later processing consistent.
413-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
414-
continue
415-
}
416-
417-
if q.ExpandAll {
418-
langTags, err := pl.GetLangTags(args.q.ReadTs)
419-
if err != nil {
420-
return err
421-
}
422-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
423-
}
424-
425-
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
426-
switch {
427-
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
428-
// This branch is taken when the value does not exist in the pl or
429-
// the number of values retrieved is zero (there could still be facets).
430-
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
431-
// LangMatrix so that all these data structure have predictable layouts.
432-
out.UidMatrix = append(out.UidMatrix, &pb.List{})
433-
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
434-
out.ValueMatrix = append(out.ValueMatrix,
435-
&pb.ValueList{Values: []*pb.TaskValue{}})
436-
if q.ExpandAll {
437-
// To keep the cardinality same as that of ValueMatrix.
438-
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
439-
}
440-
continue
441-
case err != nil:
402+
} else {
403+
pl, err = qs.cache.GetSingle(key)
404+
}
405+
if err != nil {
406+
return err
407+
}
408+
// If count is being requested, there is no need to populate value and facets matrix.
409+
if q.DoCount {
410+
count, err := countForValuePostings(args, pl, facetsTree, listType)
411+
if err != nil && err != posting.ErrNoValue {
442412
return err
443413
}
414+
out.Counts = append(out.Counts, uint32(count))
415+
// Add an empty UID list to make later processing consistent.
416+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
417+
continue
418+
}
444419

445-
} else {
446-
pl, err, _ := posting.GetSingleValueForKey(key, q.ReadTs)
420+
if q.ExpandAll {
421+
langTags, err := pl.GetLangTags(args.q.ReadTs)
447422
if err != nil {
448423
return err
449424
}
425+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
426+
}
450427

451-
for _, p := range pl.Postings {
452-
vals = append(vals, types.Val{
453-
Tid: types.TypeID(p.ValType),
454-
Value: p.Value,
455-
})
456-
457-
if q.FacetParam != nil {
458-
fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)})
459-
}
428+
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)
429+
switch {
430+
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
431+
// This branch is taken when the value does not exist in the pl or
432+
// the number of values retrieved is zero (there could still be facets).
433+
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
434+
// LangMatrix so that all these data structure have predictable layouts.
435+
out.UidMatrix = append(out.UidMatrix, &pb.List{})
436+
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
437+
out.ValueMatrix = append(out.ValueMatrix,
438+
&pb.ValueList{Values: []*pb.TaskValue{}})
439+
if q.ExpandAll {
440+
// To keep the cardinality same as that of ValueMatrix.
441+
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
460442
}
461-
//pl, err = qs.cache.GetSingleItem(key)
462-
//if err != nil {
463-
// return err
464-
//}
465-
466-
//val, err := pl.AllValues(q.ReadTs)
467-
//if err != nil {
468-
// return err
469-
//}
470-
//vals = append(vals, val...)
443+
continue
444+
case err != nil:
445+
return err
471446
}
472447

473448
uidList := new(pb.List)

0 commit comments

Comments
 (0)