@@ -111,6 +111,8 @@ type LocalCache struct {
111111
112112	// plists are posting lists in memory. They can be discarded to reclaim space. 
113113	plists  map [string ]* List 
114+ 
115+ 	postings  map [string ]* pb.PostingList 
114116}
115117
116118// NewLocalCache returns a new LocalCache instance. 
@@ -120,13 +122,17 @@ func NewLocalCache(startTs uint64) *LocalCache {
120122		deltas :      make (map [string ][]byte ),
121123		plists :      make (map [string ]* List ),
122124		maxVersions : make (map [string ]uint64 ),
125+ 		postings :    make (map [string ]* pb.PostingList ),
123126	}
124127}
125128
126129// NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs 
127130// around. 
128131func  NoCache (startTs  uint64 ) * LocalCache  {
129- 	return  & LocalCache {startTs : startTs }
132+ 	return  & LocalCache {
133+ 		startTs :  startTs ,
134+ 		postings : make (map [string ]* pb.PostingList ),
135+ 	}
130136}
131137
132138func  (lc  * LocalCache ) getNoStore (key  string ) * List  {
@@ -138,6 +144,20 @@ func (lc *LocalCache) getNoStore(key string) *List {
138144	return  nil 
139145}
140146
147+ // SetIfAbsent adds the list for the specified key to the cache. If a list for the same 
148+ // key already exists, the cache will not be modified and the existing list 
149+ // will be returned instead. This behavior is meant to prevent the goroutines 
150+ // using the cache from ending up with an orphaned version of a list. 
151+ func  (lc  * LocalCache ) SetPostingIfAbsent (key  string , updated  * pb.PostingList ) * pb.PostingList  {
152+ 	lc .Lock ()
153+ 	defer  lc .Unlock ()
154+ 	if  pl , ok  :=  lc .postings [key ]; ok  {
155+ 		return  pl 
156+ 	}
157+ 	lc .postings [key ] =  updated 
158+ 	return  updated 
159+ }
160+ 
141161// SetIfAbsent adds the list for the specified key to the cache. If a list for the same 
142162// key already exists, the cache will not be modified and the existing list 
143163// will be returned instead. This behavior is meant to prevent the goroutines 
@@ -200,8 +220,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
200220	remaining_keys  :=  make ([][]byte , 0 )
201221	lc .RLock ()
202222	for  i , key  :=  range  keys  {
203- 		pl  :=  & pb.PostingList {}
204- 		if  delta , ok  :=  lc .deltas [string (key )]; ok  &&  len (delta ) >  0  {
223+ 		if  pl , ok  :=  lc .postings [string (key )]; ok  &&  pl  !=  nil  {
224+ 			results [i ] =  pl 
225+ 		} else  if  delta , ok  :=  lc .deltas [string (key )]; ok  &&  len (delta ) >  0  {
226+ 			pl  :=  & pb.PostingList {}
205227			err  :=  pl .Unmarshal (delta )
206228			if  err  !=  nil  {
207229				results [i ] =  pl 
@@ -214,6 +236,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
214236
215237	txn  :=  pstore .NewTransactionAt (lc .startTs , false )
216238	items , err  :=  txn .GetBatch (remaining_keys )
239+ 	if  err  !=  nil  {
240+ 		fmt .Println (err , keys )
241+ 		return  nil , err 
242+ 	}
217243	idx  :=  0 
218244
219245	for  i  :=  0 ; i  <  len (results ); i ++  {
@@ -245,6 +271,7 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
245271		}
246272		pl .Postings  =  pl .Postings [:idx ]
247273		results [i ] =  pl 
274+ 		lc .SetPostingIfAbsent (string (keys [i ]), pl )
248275	}
249276
250277	return  results , err 
@@ -311,6 +338,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
311338		}
312339	}
313340	pl .Postings  =  pl .Postings [:idx ]
341+ 	lc .SetPostingIfAbsent (string (key ), pl )
314342	return  pl , nil 
315343}
316344
0 commit comments