3434#define ILLEGAL_AUXKEY 0xFFFFFFFF // This is an offset into the cache which is out of range
3535
3636// Lockless behavior.
37- // 'data' is a Ptr<IOBufferData> with the lower 3 bits used for read marking.
38- // When reading the entry, readers increment the lowest 3 bits.
39- // When attempting to replace an entry, the writer will skip entries with readers .
37+ // 'data' is a Ptr<IOBufferData> with the lower 3 bits used for marking.
38+ // When accessing the entry the lowest 3 bits are incremented .
39+ // When attempting to replace an entry, the writer will skip marked entries .
4040
// One slot of a cache bucket.
4141struct RamCacheLocklessLRUEntry {
// Pointer word for the cached IOBufferData; the low bits (masked by LOCK) hold the
// access mark count, and a value with no pointer bits set means the slot is empty.
4242 std::atomic<uint64_t > data;
// Secondary key compared together with 'key' on lookup; set to ILLEGAL_AUXKEY when
// the entry is removed.
43- std::atomic< uint64_t > auxkey;
43+ uint64_t auxkey;
// Full hash key of the cached object; its first byte is what gets stored as the
// slot's tag in the bucket's tag word.
4444 CryptoHash key;
4545};
4646
@@ -71,7 +71,6 @@ struct RamCacheLocklessLRU : public RamCache {
7171
7272 bool remove (int i, RamCacheLocklessLRUEntry *e);
7373 int64_t remove_one ();
74- int find_lru (RamCacheLocklessLRUTags *t);
7574 void get_bucket (int buckekt, RamCacheLocklessLRUTags **t, RamCacheLocklessLRUEntry **e);
7675};
7776
@@ -109,29 +108,29 @@ RamCacheLocklessLRU::init(int64_t abytes, Vol *avol)
109108}
110109
111110static uint64_t
112- increment_reader (std::atomic<uint64_t > *p)
111+ increment_mark (std::atomic<uint64_t > *p)
113112{
114113 uint64_t d;
115114 while (true ) {
116- uint64_t data = p->load ();
115+ uint64_t data = p->load (std::memory_order_relaxed );
117116 if ((data & LOCK) == LOCK) { // Max count, just spin.
118117 continue ;
119118 }
120119 d = data + 1 ;
121- if (p->compare_exchange_weak (data, d)) {
120+ if (p->compare_exchange_weak (data, d, std::memory_order_acquire, std::memory_order_relaxed )) {
122121 return d;
123122 }
124123 }
125124}
126125
127126static uint64_t
128- decrement_reader (std::atomic<uint64_t > *p)
127+ decrement_mark (std::atomic<uint64_t > *p)
129128{
130129 uint64_t d;
131130 while (true ) {
132- uint64_t data = p->load ();
131+ uint64_t data = p->load (std::memory_order_relaxed );
133132 d = data - 1 ;
134- if (p->compare_exchange_weak (data, d)) {
133+ if (p->compare_exchange_weak (data, d, std::memory_order_release, std::memory_order_relaxed )) {
135134 return d;
136135 }
137136 }
@@ -143,14 +142,15 @@ update_lru(int i, RamCacheLocklessLRUTags *t)
143142 while (true ) {
144143 uint64_t lru = t->lru .load (std::memory_order_relaxed); // Handled at a higher level.
145144 uint64_t new_lru = lru;
145+ int shift = i * 8 ;
146146 // Update the row so that there is a zero for i and 1 for all others.
147147 // Set row to all ones.
148- uint64_t m = 0xFF << i ;
148+ uint64_t m = 0xFFull << shift ;
149149 new_lru |= m;
150150 // Clear the column.
151- m = 0x11111111 << i ;
151+ m = 0x11111111ull << shift ;
152152 new_lru &= m;
153- if (t->lru .compare_exchange_weak (lru, new_lru)) {
153+ if (lru == new_lru || t->lru .compare_exchange_weak (lru, new_lru, std::memory_order_relaxed )) {
154154 return ;
155155 }
156156 }
@@ -162,12 +162,13 @@ update_tag(int i, RamCacheLocklessLRUTags *t, CryptoHash *key)
162162 while (true ) {
163163 uint64_t tags = t->tags .load (std::memory_order_relaxed); // Handled at a higher level.
164164 uint64_t new_tags = tags;
165- uint64_t m = 0xFF << i;
165+ int shift = i * 8 ;
166+ uint64_t m = 0xFFull << shift;
166167 new_tags &= ~m;
167168 uint8_t *entry_tag_bits = reinterpret_cast <uint8_t *>(key);
168- m = *entry_tag_bits << i ;
169+ m = static_cast < uint64_t >( *entry_tag_bits) << shift ;
169170 new_tags |= m;
170- if (t->lru .compare_exchange_weak (tags, new_tags)) {
171+ if (tags == new_tags || t->tags .compare_exchange_weak (tags, new_tags, std::memory_order_relaxed )) {
171172 return ;
172173 }
173174 }
@@ -191,88 +192,95 @@ RamCacheLocklessLRU::get(CryptoHash *key, Ptr<IOBufferData> *ret_data, uint64_t
191192 RamCacheLocklessLRUEntry *b;
192193 RamCacheLocklessLRUTags *t;
193194 get_bucket (key->slice32 (3 ) % nbuckets, &t, &b);
194- uint64_t tags = t->tags .load (std::memory_order_acquire );
195+ uint64_t tags = t->tags .load (std::memory_order_relaxed );
195196 for (int i = 0 ; i < ASSOCIATIVITY; i++) {
196197 uint8_t *tag_bits = reinterpret_cast <uint8_t *>(&tags);
197198 RamCacheLocklessLRUEntry *e = &b[i];
198199 uint8_t *entry_tag_bits = reinterpret_cast <uint8_t *>(&e->key );
199200 if (*entry_tag_bits != *tag_bits) {
200201 continue ;
201202 }
202- uint64_t d = increment_reader (&e->data );
203+ uint64_t d = increment_mark (&e->data );
203204 uint64_t dptr = d & ~LOCK;
204205 if (!dptr) { // Empty
205- decrement_reader (&e->data );
206+ decrement_mark (&e->data );
206207 continue ;
207208 }
208- if (e->key == *key && e->auxkey . load () == auxkey) {
209+ if (e->key == *key && e->auxkey == auxkey) {
209210 (*ret_data) = *reinterpret_cast <Ptr<IOBufferData> *>(&dptr);
210211 DDebug (" ram_cache" , " get %X %" PRIu64 " HIT" , key->slice32 (3 ), auxkey);
211212 CACHE_SUM_DYN_STAT_THREAD (cache_ram_cache_hits_stat, 1 );
212213 update_lru (i, t);
213- decrement_reader (&e->data );
214+ decrement_mark (&e->data );
214215 return 1 ;
215216 }
217+ decrement_mark (&e->data );
216218 }
217219 DDebug (" ram_cache" , " get %X %" PRIu64 " MISS" , key->slice32 (3 ), auxkey);
218220 CACHE_SUM_DYN_STAT_THREAD (cache_ram_cache_misses_stat, 1 );
219221 return 0 ;
220222}
221223
224+ // Succeeds if the entry is already empty or if it is removed.
222225bool
223- RamCacheLocklessLRU::remove (int i, RamCacheLocklessLRUEntry *b )
226+ RamCacheLocklessLRU::remove (int i, RamCacheLocklessLRUEntry *bucket )
224227{
228+ RamCacheLocklessLRUEntry *e = &bucket[i];
225229 uint64_t d;
226230 while (true ) {
227- uint64_t data = b ->data .load (std::memory_order_acquire);
231+ uint64_t data = e ->data .load (std::memory_order_acquire);
228232 if ((data & LOCK)) {
229233 return false ;
230234 }
231235 if ((data & ~LOCK) == 0 ) {
232236 return false ;
233237 }
234238 d = data + 1 ;
235- if (b ->data .compare_exchange_weak (data, d)) {
239+ if (e ->data .compare_exchange_weak (data, d)) {
236240 break ;
237241 }
238242 }
239- IOBufferData *block = reinterpret_cast <IOBufferData *>(data);
240- auto size = block->block_size ();
241- RamCacheLocklessLRUEntry *e = &b[i];
242- uint64_t new_data = 0 ;
243- if (!e->data .compare_exchange_strong (d, new_data)) {
244- decrement_reader (&e->data );
243+ IOBufferData *block = reinterpret_cast <IOBufferData *>(data);
244+ auto size = ENTRY_OVERHEAD + block->block_size ();
245+ uint64_t new_data = 1 ;
246+ if (!e->data .compare_exchange_strong (d, new_data, std::memory_order_relaxed, std::memory_order_relaxed)) {
247+ decrement_mark (&e->data );
245248 return false ;
246249 }
247- e->auxkey .store (ILLEGAL_AUXKEY);
250+ e->auxkey = ILLEGAL_AUXKEY;
251+ decrement_mark (&e->data );
252+
253+ block->refcount_dec ();
248254 bytes -= size;
255+ CACHE_SUM_DYN_STAT_THREAD (cache_ram_cache_bytes_stat, -size);
249256 objects--;
250257 return true ;
251258}
252259
253- int64_t
254- RamCacheLocklessLRU::remove_one ( )
260+ static int
261+ find_lru_victim (RamCacheLocklessLRUTags *t, RamCacheLocklessLRUEntry *b )
255262{
256- RamCacheLocklessLRUEntry *b;
257- RamCacheLocklessLRUTags *t;
258- int i;
259- while (true ) {
260- auto bucket = reclaim_sweep++;
261- get_bucket (bucket % nbuckets, &t, &b);
262- i = find_lru (t);
263- if (i < 0 ) {
263+ // Find the row with the most bits set.
264+ uint64_t lru = t->lru .load (std::memory_order_acquire);
265+ int i = -1 ;
266+ int max_p = -1 ;
267+ for (int j = 0 ; j < 8 ; j++) {
268+ if (!(b[j].data .load (std::memory_order_relaxed) & ~LOCK)) { // Skip empty entries.
264269 continue ;
265270 }
266- if (!remove (i, b)) {
267- continue ;
271+ uint64_t c = lru >> (8 * j);
272+ c &= 0xFF ;
273+ int p = std::__popcount (c);
274+ if (p > max_p) {
275+ i = j;
276+ max_p = p;
268277 }
269- break ;
270278 }
271- return bytes ;
279+ return i ;
272280}
273281
274- int
275- RamCacheLocklessLRU:: find_lru (RamCacheLocklessLRUTags *t)
282+ static int
283+ find_lru (RamCacheLocklessLRUTags *t)
276284{
277285 // Find the row with the most bits set.
278286 uint64_t lru = t->lru .load (std::memory_order_acquire);
@@ -290,6 +298,27 @@ RamCacheLocklessLRU::find_lru(RamCacheLocklessLRUTags *t)
290298 return i;
291299}
292300
301+ int64_t
302+ RamCacheLocklessLRU::remove_one ()
303+ {
304+ RamCacheLocklessLRUEntry *b;
305+ RamCacheLocklessLRUTags *t;
306+ int i;
307+ while (true ) {
308+ auto bucket = reclaim_sweep++;
309+ get_bucket (bucket % nbuckets, &t, &b);
310+ i = find_lru_victim (t, b);
311+ if (i < 0 ) {
312+ continue ;
313+ }
314+ if (!remove (i, b)) {
315+ continue ;
316+ }
317+ break ;
318+ }
319+ return bytes;
320+ }
321+
293322int
294323RamCacheLocklessLRU::put (CryptoHash *key, IOBufferData *data, uint32_t len, bool , uint64_t auxkey)
295324{
@@ -300,7 +329,7 @@ RamCacheLocklessLRU::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool
300329 RamCacheLocklessLRUEntry *b;
301330 RamCacheLocklessLRUTags *t;
302331 get_bucket (key->slice32 (3 ) % nbuckets, &t, &b);
303- uint64_t tags = t->tags .load (std::memory_order_acquire );
332+ uint64_t tags = t->tags .load (std::memory_order_relaxed );
304333 int empty = -1 ;
305334 for (int i = 0 ; i < ASSOCIATIVITY; i++) {
306335 uint8_t *tag_bits = reinterpret_cast <uint8_t *>(&tags);
@@ -309,19 +338,20 @@ RamCacheLocklessLRU::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool
309338 if (*entry_tag_bits != *tag_bits) {
310339 continue ;
311340 }
312- uint64_t d = increment_reader (&e->data );
341+ uint64_t d = increment_mark (&e->data );
313342 uint64_t dptr = d & ~LOCK;
314343 if (!dptr) { // Empty
315- decrement_reader (&e->data );
344+ decrement_mark (&e->data );
316345 empty = i;
317346 continue ;
318347 }
319348 if (e->key == *key && e->auxkey == auxkey) {
320- decrement_reader (&e->data );
349+ decrement_mark (&e->data );
321350 return 0 ;
322351 }
323- decrement_reader (&e->data );
352+ decrement_mark (&e->data );
324353 }
354+ // Not found.
325355
326356 // Free enough space.
327357 int size = ENTRY_OVERHEAD + data->block_size ();
@@ -331,39 +361,46 @@ RamCacheLocklessLRU::put(CryptoHash *key, IOBufferData *data, uint32_t len, bool
331361 bb = remove_one ();
332362 }
333363
364+ int free = -1 ;
334365 // Find a cache line.
335366 if (empty < 0 ) {
336- empty = find_lru (t);
337- if (empty < 0 ) {
367+ free = find_lru (t);
368+ if (free < 0 ) {
338369 bytes -= size;
339370 return 0 ;
340371 }
341372 }
342373
343374 // Remove current entry.
344- if (!remove (empty, b)) {
345- bytes -= size;
346- return 0 ;
375+ if (empty < 0 ) {
376+ if (!remove (free, b)) {
377+ bytes -= size;
378+ return 0 ;
379+ }
380+ empty = free;
347381 }
348382
349383 // Swap in new pointer.
350384 RamCacheLocklessLRUEntry *e = &b[empty];
351385 uint64_t d = 0 ;
352- auto new_data = reinterpret_cast <uint64_t >(data) + 1 ; // Mark reader.
353- if (!e->data .compare_exchange_strong (d, reinterpret_cast <uint64_t >(new_data))) {
386+ uint64_t new_data = reinterpret_cast <uint64_t >(data) + 1 ;
387+ data->refcount_inc ();
388+ if (!e->data .compare_exchange_strong (d, new_data, std::memory_order_relaxed)) {
389+ data->refcount_dec ();
354390 return 0 ;
355391 }
356392
357393 // Update the key and auxkey.
358- e->key = *key;
394+ e->key = *key;
395+ e->auxkey = auxkey;
396+
397+ decrement_mark (&e->data );
398+
359399 update_lru (empty, t);
360400 update_tag (empty, t, key);
361- e->auxkey .store (auxkey, std::memory_order_release);
362-
363- decrement_reader (&e->data );
364401
365402 objects++;
366- CACHE_SUM_DYN_STAT_THREAD (cache_ram_cache_bytes_stat, ENTRY_OVERHEAD + data-> block_size () );
403+ CACHE_SUM_DYN_STAT_THREAD (cache_ram_cache_bytes_stat, size );
367404 DDebug (" ram_cache" , " put %X %" PRIu64 " INSERTED" , key->slice32 (3 ), auxkey);
368405 return 1 ;
369406}
@@ -385,17 +422,18 @@ RamCacheLocklessLRU::fixup(const CryptoHash *key, uint64_t old_auxkey, uint64_t
385422 if (*entry_tag_bits != *tag_bits) {
386423 continue ;
387424 }
388- uint64_t d = increment_reader (&e->data );
425+ uint64_t d = increment_mark (&e->data );
389426 uint64_t dptr = d & ~LOCK;
390427 if (!dptr) { // Empty
391- decrement_reader (&e->data );
428+ decrement_mark (&e->data );
392429 continue ;
393430 }
394- if (e->key == *key && e->auxkey . load () == old_auxkey) {
395- decrement_reader (& e->data ) ;
396- e->auxkey . store (new_auxkey, std::memory_order_release );
431+ if (e->key == *key && e->auxkey == old_auxkey) {
432+ e->auxkey = new_auxkey ;
433+ decrement_mark (& e->data );
397434 return 1 ;
398435 }
436+ decrement_mark (&e->data );
399437 }
400438 return 0 ;
401439}
0 commit comments