3939import org .elasticsearch .core .TimeValue ;
4040import org .elasticsearch .env .Environment ;
4141import org .elasticsearch .env .NodeEnvironment ;
42+ import org .elasticsearch .index .shard .ShardId ;
4243import org .elasticsearch .index .store .LuceneFilesExtensions ;
4344import org .elasticsearch .monitor .fs .FsProbe ;
4445import org .elasticsearch .node .NodeRoleSettings ;
7879/**
7980 * A caching layer on a local node to minimize network roundtrips to the remote blob store.
8081 */
81- public class SharedBlobCacheService <KeyType > implements Releasable {
82+ public class SharedBlobCacheService <KeyType extends SharedBlobCacheService .KeyBase > implements Releasable {
83+
84+ public interface KeyBase {
85+ ShardId shardId ();
86+ }
8287
8388 private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache." ;
8489
@@ -301,6 +306,8 @@ private interface Cache<K, T> extends Releasable {
301306 int forceEvict (Predicate <K > cacheKeyPredicate );
302307
303308 void forceEvictAsync (Predicate <K > cacheKey );
309+
310+ int forceEvict (ShardId shard , Predicate <K > cacheKeyPredicate );
304311 }
305312
306313 private abstract static class CacheEntry <T > {
@@ -759,7 +766,7 @@ public Stats getStats() {
759766 }
760767
761768 public void removeFromCache (KeyType cacheKey ) {
762- forceEvict (cacheKey ::equals );
769+ forceEvict (cacheKey . shardId (), cacheKey ::equals );
763770 }
764771
765772 /**
@@ -770,7 +777,10 @@ public void removeFromCache(KeyType cacheKey) {
770777 */
771778 public int forceEvict (Predicate <KeyType > cacheKeyPredicate ) {
772779 return cache .forceEvict (cacheKeyPredicate );
780+ }
773781
782+ public int forceEvict (ShardId shard , Predicate <KeyType > cacheKeyPredicate ) {
783+ return cache .forceEvict (shard , cacheKeyPredicate );
774784 }
775785
776786 /**
@@ -867,7 +877,7 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f
867877 * always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen
868878 * (see {@link SharedBlobCacheService.LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
869879 */
870- static class CacheFileRegion <KeyType > extends EvictableRefCounted {
880+ static class CacheFileRegion <KeyType extends KeyBase > extends EvictableRefCounted {
871881
872882 private static final VarHandle VH_IO = findIOVarHandle ();
873883
@@ -1634,7 +1644,9 @@ void touch() {
16341644 }
16351645 }
16361646
1637- private final ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry > keyMapping = new ConcurrentHashMap <>();
1647+ // only put/remove through computeXXX to ensure we do not lose the data.
1648+ private final ConcurrentHashMap <ShardId , ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry >> keyMapping =
1649+ new ConcurrentHashMap <>();
16381650 private final LFUCacheEntry [] freqs ;
16391651 private final int maxFreq ;
16401652 private final DecayAndNewEpochTask decayAndNewEpochTask ;
@@ -1654,7 +1666,7 @@ public void close() {
16541666 }
16551667
16561668 int getFreq (CacheFileRegion <KeyType > cacheFileRegion ) {
1657- return keyMapping .get (cacheFileRegion .regionKey ).freq ;
1669+ return keyMapping .get (cacheFileRegion .regionKey . file (). shardId ()). get ( cacheFileRegion . regionKey ).freq ;
16581670 }
16591671
16601672 @ Override
@@ -1663,10 +1675,11 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
16631675 final long now = epoch .get ();
16641676 // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
16651677 // if we did not find an entry
1666- var entry = keyMapping .get (regionKey );
1678+ var perShardMapping = keyMapping .computeIfAbsent (cacheKey .shardId (), key -> new ConcurrentHashMap <>());
1679+ var entry = perShardMapping .get (regionKey );
16671680 if (entry == null ) {
16681681 final int effectiveRegionSize = computeCacheFileRegionSize (fileLength , region );
1669- entry = keyMapping .computeIfAbsent (
1682+ entry = perShardMapping .computeIfAbsent (
16701683 regionKey ,
16711684 key -> new LFUCacheEntry (new CacheFileRegion <KeyType >(SharedBlobCacheService .this , key , effectiveRegionSize ), now )
16721685 );
@@ -1692,10 +1705,12 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
16921705 @ Override
16931706 public int forceEvict (Predicate <KeyType > cacheKeyPredicate ) {
16941707 final List <LFUCacheEntry > matchingEntries = new ArrayList <>();
1695- keyMapping .forEach ((key , value ) -> {
1696- if (cacheKeyPredicate .test (key .file )) {
1697- matchingEntries .add (value );
1698- }
1708+ keyMapping .forEach ((shard , value ) -> {
1709+ value .forEach ((key , entry ) -> {
1710+ if (cacheKeyPredicate .test (key .file )) {
1711+ matchingEntries .add (entry );
1712+ }
1713+ });
16991714 });
17001715 var evictedCount = 0 ;
17011716 var nonZeroFrequencyEvictedCount = 0 ;
@@ -1706,7 +1721,9 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
17061721 boolean evicted = entry .chunk .forceEvict ();
17071722 if (evicted && entry .chunk .volatileIO () != null ) {
17081723 unlink (entry );
1709- keyMapping .remove (entry .chunk .regionKey , entry );
1724+ // todo: can this be null? Should not, need to assert.
1725+ ShardId shard = entry .chunk .regionKey .file .shardId ();
1726+ removeKeyMappingForEntry (entry , shard );
17101727 evictedCount ++;
17111728 if (frequency > 0 ) {
17121729 nonZeroFrequencyEvictedCount ++;
@@ -1719,6 +1736,29 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
17191736 return evictedCount ;
17201737 }
17211738
1739+ private boolean removeKeyMappingForEntry (LFUCacheEntry entry ) {
1740+ return removeKeyMappingForEntry (entry , entry .chunk .regionKey .file ().shardId ());
1741+ }
1742+
1743+ private boolean removeKeyMappingForEntry (LFUCacheEntry entry , ShardId shard ) {
1744+ ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry > map = keyMapping .get (shard );
1745+ if (map != null ) {
1746+ boolean removed = map .remove (entry .chunk .regionKey , entry );
1747+ if (map .isEmpty ()) {
1748+ keyMapping .computeIfPresent (shard , (shard1 , entries ) -> {
1749+ if (entries .isEmpty ()) {
1750+ return null ;
1751+ } else {
1752+ return entries ;
1753+ }
1754+ });
1755+ }
1756+ return removed ;
1757+ } else {
1758+ return false ;
1759+ }
1760+ }
1761+
17221762 @ Override
17231763 public void forceEvictAsync (Predicate <KeyType > cacheKeyPredicate ) {
17241764 asyncEvictionsRunner .enqueueTask (new ActionListener <>() {
@@ -1739,10 +1779,58 @@ public void onFailure(Exception e) {
17391779 });
17401780 }
17411781
1782+ @ Override
1783+ public int forceEvict (ShardId shard , Predicate <KeyType > cacheKeyPredicate ) {
1784+ final List <LFUCacheEntry > matchingEntries = new ArrayList <>();
1785+ ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry > entries = keyMapping .get (shard );
1786+ if (entries != null ) {
1787+ entries .forEach ((key , entry ) -> {
1788+ if (cacheKeyPredicate .test (key .file )) {
1789+ matchingEntries .add (entry );
1790+ }
1791+ });
1792+ }
1793+
1794+ var evictedCount = 0 ;
1795+ var nonZeroFrequencyEvictedCount = 0 ;
1796+ if (matchingEntries .isEmpty () == false ) {
1797+ // todo: can this be null? Should not, need to assert.
1798+ ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry > map = keyMapping .get (shard );
1799+ synchronized (SharedBlobCacheService .this ) {
1800+ for (LFUCacheEntry entry : matchingEntries ) {
1801+ int frequency = entry .freq ;
1802+ boolean evicted = entry .chunk .forceEvict ();
1803+ if (evicted && entry .chunk .volatileIO () != null ) {
1804+ unlink (entry );
1805+ if (map != null ) {
1806+ map .remove (entry .chunk .regionKey , entry );
1807+ }
1808+ evictedCount ++;
1809+ if (frequency > 0 ) {
1810+ nonZeroFrequencyEvictedCount ++;
1811+ }
1812+ }
1813+ }
1814+ }
1815+ if (map != null && map .isEmpty ()) {
1816+ keyMapping .computeIfPresent (shard , (shard1 , entries1 ) -> {
1817+ if (entries1 .isEmpty ()) {
1818+ return null ;
1819+ } else {
1820+ return entries1 ;
1821+ }
1822+ });
1823+ }
1824+ }
1825+ blobCacheMetrics .getEvictedCountNonZeroFrequency ().incrementBy (nonZeroFrequencyEvictedCount );
1826+ return evictedCount ;
1827+ }
1828+
17421829 private LFUCacheEntry initChunk (LFUCacheEntry entry ) {
17431830 assert Thread .holdsLock (entry .chunk );
17441831 RegionKey <KeyType > regionKey = entry .chunk .regionKey ;
1745- if (keyMapping .get (regionKey ) != entry ) {
1832+ ConcurrentHashMap <RegionKey <KeyType >, LFUCacheEntry > perShardMapping = keyMapping .get (regionKey .file ().shardId ());
1833+ if (perShardMapping == null || perShardMapping .get (regionKey ) != entry ) {
17461834 throwAlreadyClosed ("no free region found (contender)" );
17471835 }
17481836 // new item
@@ -1765,7 +1853,7 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) {
17651853 if (io != null ) {
17661854 assignToSlot (entry , io );
17671855 } else {
1768- boolean removed = keyMapping . remove ( regionKey , entry );
1856+ boolean removed = removeKeyMappingForEntry ( entry );
17691857 assert removed ;
17701858 throwAlreadyClosed ("no free region found" );
17711859 }
@@ -1780,7 +1868,7 @@ private void assignToSlot(LFUCacheEntry entry, SharedBytes.IO freeSlot) {
17801868 if (entry .chunk .isEvicted ()) {
17811869 assert regionOwners .remove (freeSlot ) == entry .chunk ;
17821870 freeRegions .add (freeSlot );
1783- keyMapping . remove ( entry . chunk . regionKey , entry );
1871+ removeKeyMappingForEntry ( entry );
17841872 throwAlreadyClosed ("evicted during free region allocation" );
17851873 }
17861874 pushEntryToBack (entry );
@@ -1985,7 +2073,7 @@ private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotificatio
19852073 }
19862074 } finally {
19872075 unlink (entry );
1988- keyMapping . remove ( entry . chunk . regionKey , entry );
2076+ removeKeyMappingForEntry ( entry );
19892077 }
19902078 }
19912079 } finally {
@@ -2020,7 +2108,7 @@ public boolean maybeEvictLeastUsed() {
20202108 boolean evicted = entry .chunk .tryEvict ();
20212109 if (evicted && entry .chunk .volatileIO () != null ) {
20222110 unlink (entry );
2023- keyMapping . remove ( entry . chunk . regionKey , entry );
2111+ removeKeyMappingForEntry ( entry );
20242112 return true ;
20252113 }
20262114 }
0 commit comments