Merged
Commits
34 commits
c86859c
Evict from the shared blob cache asynchronously
nicktindall Apr 10, 2025
c143dba
Only evict from shared cache when index is partial (SharedSnapshotInd…
nicktindall Apr 10, 2025
a62ac00
Update docs/changelog/126581.yaml
nicktindall Apr 10, 2025
6ae1e7c
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall Apr 10, 2025
ff3a25d
Fix changelog
nicktindall Apr 10, 2025
2ef16c9
Update x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobc…
nicktindall Apr 14, 2025
d3ce506
Fix indenting
nicktindall Apr 14, 2025
bd35686
evictionsRunner -> asyncEvictionsRunner
nicktindall Apr 14, 2025
83c1bda
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall Apr 15, 2025
632afbc
Only evict asynchronously for shards we know are not coming back
nicktindall Apr 14, 2025
3274c1c
Merge remote-tracking branch 'origin/main' into evict_from_the_shared…
nicktindall Apr 15, 2025
253dba1
Merge remote-tracking branch 'origin/main' into evict_from_the_shared…
nicktindall Apr 22, 2025
f035f25
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall Apr 23, 2025
7ac3220
Propagate IndexRemovalReason to deletion listeners
nicktindall Apr 23, 2025
8e18644
Fix naming (reasonMessage/reason)
nicktindall Apr 23, 2025
410fb35
Fix naming (reasonText/reason)
nicktindall Apr 23, 2025
2372056
Naming
nicktindall Apr 23, 2025
8c91b45
[CI] Auto commit changes from spotless
Apr 23, 2025
87d1ba4
Naming/javadoc
nicktindall Apr 23, 2025
ea43b2d
randomReason()
nicktindall Apr 23, 2025
c6e7a05
Don't evict shards when IndexRemovalReason is FAILURE
nicktindall Apr 23, 2025
7eebc42
javadoc/naming
nicktindall Apr 23, 2025
250df4c
Merge remote-tracking branch 'origin/main' into evict_from_the_shared…
nicktindall May 20, 2025
d3cd806
Make IndexRemovalReason a top-level enum for sharing
nicktindall May 21, 2025
5eabc0f
Fix eviction logic
nicktindall May 21, 2025
53ad877
Comment
nicktindall May 21, 2025
fa17c66
Fix eviction logic
nicktindall May 21, 2025
69e748f
Improve change summary
nicktindall May 21, 2025
f498216
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall May 21, 2025
185b390
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall May 22, 2025
c2f3b0f
Add tests
nicktindall May 23, 2025
63f4b1f
Work with any number of nodes
nicktindall May 23, 2025
cd09f7d
Randomise number of docs
nicktindall May 23, 2025
69eb2e5
Merge branch 'main' into evict_from_the_shared_blob_cache_asynchronously
nicktindall May 23, 2025
5 changes: 5 additions & 0 deletions docs/changelog/126581.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126581
summary: Evict from the shared blob cache asynchronously
area: Searchable Snapshots
type: enhancement
issues: []
@@ -27,7 +27,8 @@ public List<Setting<?>> getSettings() {
SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING,
SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING,
SharedBlobCacheService.SHARED_CACHE_MMAP,
SharedBlobCacheService.SHARED_CACHE_COUNT_READS
SharedBlobCacheService.SHARED_CACHE_COUNT_READS,
SharedBlobCacheService.SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING
);
}
}
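The setting registered above is defined further down in the diff with a default of 5 and a minimum of 1. As a rough sketch of what an integer node setting with a default and a lower bound amounts to (hypothetical `IntSetting` stand-in, not the actual Elasticsearch `Setting` API):

```java
import java.util.Map;

// Hypothetical stand-in for Setting.intSetting(key, defaultValue, minValue, ...):
// a missing key falls back to the default, values below the minimum are rejected.
class IntSetting {
    final String key;
    final int defaultValue;
    final int minValue;

    IntSetting(String key, int defaultValue, int minValue) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.minValue = minValue;
    }

    int get(Map<String, String> settings) {
        String raw = settings.get(key);
        if (raw == null) {
            return defaultValue; // unset: use the default
        }
        int value = Integer.parseInt(raw);
        if (value < minValue) {
            throw new IllegalArgumentException(
                "value [" + raw + "] for setting [" + key + "] must be >= " + minValue
            );
        }
        return value;
    }
}
```

With the PR's values (default 5, minimum 1), an unset key yields 5 and a value of 0 is rejected at parse time; the key name below is illustrative.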
@@ -28,6 +28,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
@@ -97,6 +98,13 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
Setting.Property.NodeScope
);

public static final Setting<Integer> SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING = Setting.intSetting(
SHARED_CACHE_SETTINGS_PREFIX + "concurrent_evictions",
5,
1,
Setting.Property.NodeScope
);

private static Setting.Validator<ByteSizeValue> getPageSizeAlignedByteSizeValueValidator(String settingName) {
return value -> {
if (value.getBytes() == -1) {
@@ -283,6 +291,8 @@ private interface Cache<K, T> extends Releasable {
CacheEntry<T> get(K cacheKey, long fileLength, int region);

int forceEvict(Predicate<K> cacheKeyPredicate);

void forceEvictAsync(Predicate<K> cacheKey);
}

private abstract static class CacheEntry<T> {
@@ -328,6 +338,7 @@ private CacheEntry(T chunk) {
private final Runnable evictIncrementer;

private final LongSupplier relativeTimeInNanosSupplier;
private final ThrottledTaskRunner evictionsRunner;

public SharedBlobCacheService(
NodeEnvironment environment,
@@ -388,6 +399,11 @@ public SharedBlobCacheService(
this.blobCacheMetrics = blobCacheMetrics;
this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment;
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
this.evictionsRunner = new ThrottledTaskRunner(
"shared_blob_cache_evictions",
SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING.get(settings),
threadPool.generic()
);
}

public static long calculateCacheSize(Settings settings, long totalFsSize) {
@@ -704,6 +720,15 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {

}

/**
 * Evict entries from the cache that match the given predicate, asynchronously
 *
 * @param cacheKeyPredicate the predicate that selects which cache entries to evict
 */
public void forceEvictAsync(Predicate<KeyType> cacheKeyPredicate) {
cache.forceEvictAsync(cacheKeyPredicate);
}

// used by tests
int getFreq(CacheFileRegion<KeyType> cacheFileRegion) {
if (cache instanceof LFUCache lfuCache) {
@@ -1606,6 +1631,24 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
return evictedCount;
}

@Override
public void forceEvictAsync(Predicate<KeyType> cacheKeyPredicate) {
evictionsRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
forceEvict(cacheKeyPredicate);
}

@Override
public void onFailure(Exception e) {
// should be impossible, GENERIC pool doesn't reject anything
final String message = "unexpected failure evicting from shared blob cache";
logger.error(message, e);
assert false : new AssertionError(message, e);
}
});
}

private LFUCacheEntry initChunk(LFUCacheEntry entry) {
assert Thread.holdsLock(entry.chunk);
RegionKey<KeyType> regionKey = entry.chunk.regionKey;
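The `ThrottledTaskRunner` wired in above bounds how many evictions run on the generic pool at once. A simplified sketch of that pattern (hypothetical `SimpleThrottledRunner`, not the actual Elasticsearch class): tasks queue up, and at most `maxConcurrent` of them are in flight at any time.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of a throttled task runner: tasks are queued, and at most
// maxConcurrent of them execute on the underlying executor at once.
class SimpleThrottledRunner {
    private final int maxConcurrent;
    private final Executor executor;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger running = new AtomicInteger();

    SimpleThrottledRunner(int maxConcurrent, Executor executor) {
        this.maxConcurrent = maxConcurrent;
        this.executor = executor;
    }

    void enqueueTask(Runnable task) {
        queue.add(task);
        maybeSpawn();
    }

    private void maybeSpawn() {
        while (true) {
            int current = running.get();
            if (current >= maxConcurrent) {
                return; // already at the concurrency limit
            }
            if (running.compareAndSet(current, current + 1) == false) {
                continue; // lost a race, re-read the counter
            }
            Runnable next = queue.poll();
            if (next == null) {
                running.decrementAndGet();
                if (queue.isEmpty()) {
                    return;
                }
                continue; // a task slipped in concurrently, try again
            }
            executor.execute(() -> {
                try {
                    next.run();
                } finally {
                    running.decrementAndGet();
                    maybeSpawn(); // pull the next queued task, if any
                }
            });
            return;
        }
    }
}
```

The design choice mirrors the PR: callers of `forceEvictAsync` return immediately, while the eviction work itself is serialized down to a small, configurable number of concurrent tasks instead of flooding the pool.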
@@ -272,6 +272,44 @@ public void testForceEvictResponse() throws IOException {
}
}

public void testAsynchronousEviction() throws Exception {
Settings settings = Settings.builder()
.put(NODE_NAME_SETTING.getKey(), "node")
.put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(500)).getStringRep())
.put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(100)).getStringRep())
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
environment,
settings,
taskQueue.getThreadPool(),
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
BlobCacheMetrics.NOOP
)
) {
final var cacheKey1 = generateCacheKey();
final var cacheKey2 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
final var region0 = cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.freeRegionCount());
final var region1 = cacheService.get(cacheKey2, size(250), 1);
assertEquals(3, cacheService.freeRegionCount());
assertFalse(region0.isEvicted());
assertFalse(region1.isEvicted());
cacheService.forceEvictAsync(ck -> ck == cacheKey1);
assertFalse(region0.isEvicted());
assertFalse(region1.isEvicted());
// run the async task
taskQueue.runAllRunnableTasks();
assertTrue(region0.isEvicted());
assertFalse(region1.isEvicted());
assertEquals(4, cacheService.freeRegionCount());
}
}

public void testDecay() throws IOException {
// we have 8 regions
Settings settings = Settings.builder()
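The test above relies on `DeterministicTaskQueue` so the async eviction does not run until the test chooses, letting it assert nothing was evicted before `runAllRunnableTasks()`. The essential idea can be sketched as an `Executor` that defers everything (hypothetical `TestTaskQueue`, far simpler than the real test utility):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Sketch of a deterministic executor for tests: submitted tasks are queued
// rather than run, so the test controls exactly when async work happens.
class TestTaskQueue implements Executor {
    private final Deque<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task); // defer instead of running immediately
    }

    boolean hasRunnableTasks() {
        return tasks.isEmpty() == false;
    }

    void runAllRunnableTasks() {
        while (tasks.isEmpty() == false) {
            tasks.poll().run(); // a task may enqueue follow-up tasks; run those too
        }
    }
}
```

This is what makes the `assertFalse(region0.isEvicted())` check after `forceEvictAsync` meaningful: the eviction task exists but deterministically has not run yet.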
@@ -113,8 +113,10 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea
shardId
);
}
if (sharedBlobCacheService != null) {
sharedBlobCacheService.forceEvict(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
if (indexSettings.getIndexMetadata().isPartialSearchableSnapshot() && sharedBlobCacheService != null) {
sharedBlobCacheService.forceEvictAsync(
SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings())
);
}
}
}
@@ -72,8 +72,12 @@ private void markShardAsEvictedInCache(ShardId shardId, IndexSettings indexSetti
shardId
);

final SharedBlobCacheService<CacheKey> sharedBlobCacheService = this.frozenCacheServiceSupplier.get();
assert sharedBlobCacheService != null : "frozen cache service not initialized";
sharedBlobCacheService.forceEvict(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
// Only partial searchable snapshots use the SharedBlobCacheService
if (indexSettings.getIndexMetadata().isPartialSearchableSnapshot()) {
final SharedBlobCacheService<CacheKey> sharedBlobCacheService =
SearchableSnapshotIndexFoldersDeletionListener.this.frozenCacheServiceSupplier.get();
assert sharedBlobCacheService != null : "frozen cache service not initialized";
sharedBlobCacheService.forceEvictAsync(SearchableSnapshots.forceEvictPredicate(shardId, indexSettings.getSettings()));
Member

I'm afraid I don't remember all the decisions around evicting cache entries for partially mounted shards 😞

I suspect that we made it this way to allow cache regions to be reused sooner without waiting for them to decay. It was also useful at the beginning when some corrupted data got written in the cache for some reason, as the forced eviction would clear the mess for us.

But besides this, for partially mounted shards I don't see much reason to force the eviction of cache entries vs. just letting them expire in cache. And if the shard is quickly relocated then reassigned to the same node, I think there is a risk that the async force eviction now runs concurrently with a shard recovery?

So maybe we could only force-evict asynchronously when the shard is deleted or failed, and leave the entries in the cache if it's no longer assigned.

Contributor Author

Thanks! That sounds reasonable. It's easy to implement in SearchableSnapshotIndexEventListener#beforeIndexRemoved because we have the IndexRemovalReason. It's a bit trickier in SearchableSnapshotIndexFoldersDeletionListener#before(Index|Shard)FoldersDeleted because we lack that context; I'll trace back to where those deletions originate to see if there's an obvious way to propagate it.

Contributor Author

I had a go at propagating the reason for the deletion to the listeners. This allows the listener to trigger the asynchronous eviction when we know the shards/indices aren't coming back (i.e. only on DELETE). It meant changes in a few places.

  • I used the IndexRemovalReason to communicate the reason for deletion. I don't like borrowing that from an unrelated interface, but we did already have it in scope in some of these places. If we think it's right to use it I could break it out to be a top-level enum rather than being under IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.
  • There are some places that now take a reasonText and an IndexRemovalReason. We could get rid of the reason text if we don't feel it's adding anything, but it would mean some log messages would change. It sometimes offers more context: for example, the text is different when the IndexService executes a pending delete vs. when it succeeds on the first attempt, and "delete unassigned index" specifies that the index is being deleted despite not being assigned to the local node.
  • I think the only time it's safe to schedule an asynchronous delete is on an IndexRemovalReason.DELETE. I don't think FAILURE is appropriate, because I assume we could retry after one of those? I don't think I have the context to make that call.

Member

Thanks Nick.

  • If we think it's right to use it I could break it out to be a top-level enum rather than being under IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.

That makes sense.

  • There are some places that now take a reasonText and an IndexRemovalReason

Thanks for having kept the reason as text. It provides a bit more context, and people are also used to searching for it in logs.

  • I think the only time it's safe to schedule an asynchronous delete is on an IndexRemovalReason.DELETE. I don't think FAILURE is appropriate, because I assume we could retry after one of those?

Yes, it is possible that the failed shard gets reassigned to the same node after it failed. But in that case we don't really know the cause of the failure, and it would be preferable to synchronously evict the cache, I think. That makes sure cached data are cleaned up so that retries fetch them again from the source of truth (in case the cached data are the cause of the failure; if we were not evicting them, the shard would never have a chance to recover).

It goes against the purpose of this PR, but shard failures should be the exception, so I think keeping the synchronous eviction is OK for failures.

Contributor Author (@nicktindall, May 21, 2025)

I have moved IndexRemovalReason to be a top-level enum in the org.elasticsearch.indices.cluster package. I wasn't sure if this was the best location for it, but I think it has most meaning in the org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices interface where it came from, so I left it close to there.

I changed the logic to

  • evict DELETED shards/indices asynchronously
  • evict FAILED shards/indices synchronously
  • leave everything else to age out of the cache

I'll investigate what testing might be appropriate

}
}
}
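The policy the thread converges on (async eviction for deletes, sync eviction for failures, age-out otherwise) can be sketched as a switch on the removal reason. The enum constants and helper names below are illustrative simplifications, not the exact Elasticsearch identifiers:

```java
// Illustrative sketch of the agreed eviction policy, keyed on a simplified
// removal-reason enum (names approximate the PR's IndexRemovalReason).
enum RemovalReason { DELETED, FAILURE, NO_LONGER_ASSIGNED, CLOSED }

class EvictionPolicy {
    /** How the shared blob cache should treat a removed shard's entries. */
    static String evictionMode(RemovalReason reason) {
        switch (reason) {
            case DELETED:
                return "async"; // shard is not coming back: evict in the background
            case FAILURE:
                return "sync";  // cached data may be the cause: clear it before any retry
            default:
                return "none";  // e.g. no longer assigned: let entries decay out of the cache
        }
    }
}
```

Keeping the failure path synchronous, as the reviewer argues, guarantees the cache is clean before any retry of the shard on the same node, while the common delete path gets the latency benefit of running in the background.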