diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java index 64d4c8d4dc511..4f9ac3eb99348 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java @@ -27,8 +27,7 @@ public List> getSettings() { SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING, SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING, SharedBlobCacheService.SHARED_CACHE_MMAP, - SharedBlobCacheService.SHARED_CACHE_COUNT_READS, - SharedBlobCacheService.SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING + SharedBlobCacheService.SHARED_CACHE_COUNT_READS ); } } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 4a202562e5e3d..d74ade1783fe2 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.RelativeByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; @@ -53,11 +52,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -98,13 +99,6 @@ public class SharedBlobCacheService implements Releasable { Setting.Property.NodeScope ); - public static final Setting SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING = Setting.intSetting( - SHARED_CACHE_SETTINGS_PREFIX + "concurrent_evictions", - 5, - 1, - Setting.Property.NodeScope - ); - private static Setting.Validator getPageSizeAlignedByteSizeValueValidator(String settingName) { return value -> { if (value.getBytes() == -1) { @@ -338,7 +332,7 @@ private CacheEntry(T chunk) { private final Runnable evictIncrementer; private final LongSupplier relativeTimeInNanosSupplier; - private final ThrottledTaskRunner asyncEvictionsRunner; + private final ExecutorService asyncEvictionsExecutor; public SharedBlobCacheService( NodeEnvironment environment, @@ -399,11 +393,7 @@ public SharedBlobCacheService( this.blobCacheMetrics = blobCacheMetrics; this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment; this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; - this.asyncEvictionsRunner = new ThrottledTaskRunner( - "shared_blob_cache_evictions", - SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING.get(settings), - threadPool.generic() - ); + this.asyncEvictionsExecutor = threadPool.generic(); } public static long calculateCacheSize(Settings settings, long totalFsSize) { @@ -1590,6 +1580,8 @@ void touch() { private final DecayAndNewEpochTask decayAndNewEpochTask; private final AtomicLong epoch = new AtomicLong(); + private final Queue> evictionQueue = new ConcurrentLinkedQueue<>(); + private final AtomicInteger evictionRunnerActive = new AtomicInteger(0); @SuppressWarnings("unchecked") LFUCache(Settings settings) { @@ -1671,22 +1663,20 @@ public int forceEvict(Predicate cacheKeyPredicate) { @Override public void forceEvictAsync(Predicate cacheKeyPredicate) { - asyncEvictionsRunner.enqueueTask(new ActionListener<>() { - @Override - public void onResponse(Releasable releasable) { - try (releasable) { - forceEvict(cacheKeyPredicate); - } - } + evictionQueue.add(cacheKeyPredicate); + startRunnerIfIdle(); + } - @Override - public void onFailure(Exception e) { - // should be impossible, GENERIC pool doesn't reject anything - final String message = "unexpected failure evicting from shared blob cache"; - logger.error(message, e); - assert false : new AssertionError(message, e); - } - }); + private void startRunnerIfIdle() { + if (evictionRunnerActive.getAndIncrement() == 0) { + asyncEvictionsExecutor.submit(() -> { + int evictionRunnerValue = -1; + while (evictionQueue.isEmpty() == false || evictionRunnerActive.compareAndSet(evictionRunnerValue, 0) == false) { + forceEvict(evictionQueue.poll()); + evictionRunnerValue = evictionRunnerActive.get(); + } + }); + } } private LFUCacheEntry initChunk(LFUCacheEntry entry) {