Skip to content

Commit 4c33d20

Browse files
Allow fast blob-cache introspection by shard-id (elastic#138282)
By changing the key-mapping to be two-layer, shard-id, then region-key, we allow introspection by shard-id to be much faster, exemplified in this PR through both removeFromCache and forceEvict(shardId, predicate) options being added. This can be utilized for more purposes like pushing down data for shards as it ages or moves out of the node - or removing data entirely when an index is deleted. It may also remove the need for the async force-evict mechanism.
1 parent f957556 commit 4c33d20

File tree

6 files changed

+311
-32
lines changed

6 files changed

+311
-32
lines changed

docs/changelog/138282.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138282
2+
summary: Allow fast blob-cache introspection by shard-id
3+
area: Searchable Snapshots
4+
type: enhancement
5+
issues: []
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.blobcache.shared;
9+
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
import java.util.function.BiConsumer;
13+
import java.util.function.Function;
14+
15+
/**
16+
* A 2 layer key mapping for the shared cache.
17+
* @param <Key1> The outer layer key type
18+
* @param <Key2> The inner key type
19+
* @param <Value> The value type
20+
*/
21+
class KeyMapping<Key1, Key2, Value> {
22+
private final ConcurrentHashMap<Key1, ConcurrentHashMap<Key2, Value>> mapping = new ConcurrentHashMap<>();
23+
24+
public Value get(Key1 key1, Key2 key2) {
25+
ConcurrentHashMap<Key2, Value> inner = mapping.get(key1);
26+
if (inner != null) {
27+
return inner.get(key2);
28+
} else {
29+
return null;
30+
}
31+
}
32+
33+
/**
34+
* Compute a key if absent. Notice that unlike CHM#computeIfAbsent, locking will be done also when present
35+
* @param key1 The key1 part
36+
* @param key2 The key2 part
37+
* @param function the function to get from key2 to the value
38+
* @return the resulting value.
39+
*/
40+
public Value computeIfAbsent(Key1 key1, Key2 key2, Function<Key2, Value> function) {
41+
AtomicReference<Value> result = new AtomicReference<>();
42+
mapping.compute(key1, (k, current) -> {
43+
ConcurrentHashMap<Key2, Value> map = current == null ? new ConcurrentHashMap<>() : current;
44+
result.setPlain(map.computeIfAbsent(key2, function));
45+
return map;
46+
});
47+
return result.getPlain();
48+
}
49+
50+
public boolean remove(Key1 key1, Key2 key2, Value value) {
51+
ConcurrentHashMap<Key2, Value> inner = mapping.get(key1);
52+
if (inner != null) {
53+
boolean removed = inner.remove(key2, value);
54+
if (removed && inner.isEmpty()) {
55+
mapping.computeIfPresent(key1, (k, v) -> v.isEmpty() ? null : v);
56+
}
57+
return removed;
58+
}
59+
return false;
60+
}
61+
62+
Iterable<Key1> key1s() {
63+
return mapping.keySet();
64+
}
65+
66+
void forEach(Key1 key1, BiConsumer<Key2, Value> consumer) {
67+
ConcurrentHashMap<Key2, Value> map = mapping.get(key1);
68+
if (map != null) {
69+
map.forEach(consumer);
70+
}
71+
}
72+
73+
void forEach(BiConsumer<Key2, Value> consumer) {
74+
for (ConcurrentHashMap<Key2, Value> map : mapping.values()) {
75+
map.forEach(consumer);
76+
}
77+
}
78+
}

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.core.TimeValue;
4040
import org.elasticsearch.env.Environment;
4141
import org.elasticsearch.env.NodeEnvironment;
42+
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.index.store.LuceneFilesExtensions;
4344
import org.elasticsearch.monitor.fs.FsProbe;
4445
import org.elasticsearch.node.NodeRoleSettings;
@@ -78,7 +79,11 @@
7879
/**
7980
* A caching layer on a local node to minimize network roundtrips to the remote blob store.
8081
*/
81-
public class SharedBlobCacheService<KeyType> implements Releasable {
82+
public class SharedBlobCacheService<KeyType extends SharedBlobCacheService.KeyBase> implements Releasable {
83+
84+
public interface KeyBase {
85+
ShardId shardId();
86+
}
8287

8388
private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache.";
8489

@@ -301,6 +306,8 @@ private interface Cache<K, T> extends Releasable {
301306
int forceEvict(Predicate<K> cacheKeyPredicate);
302307

303308
void forceEvictAsync(Predicate<K> cacheKey);
309+
310+
int forceEvict(ShardId shard, Predicate<K> cacheKeyPredicate);
304311
}
305312

306313
private abstract static class CacheEntry<T> {
@@ -759,7 +766,7 @@ public Stats getStats() {
759766
}
760767

761768
public void removeFromCache(KeyType cacheKey) {
762-
forceEvict(cacheKey::equals);
769+
forceEvict(cacheKey.shardId(), cacheKey::equals);
763770
}
764771

765772
/**
@@ -770,7 +777,10 @@ public void removeFromCache(KeyType cacheKey) {
770777
*/
771778
public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
772779
return cache.forceEvict(cacheKeyPredicate);
780+
}
773781

782+
public int forceEvict(ShardId shard, Predicate<KeyType> cacheKeyPredicate) {
783+
return cache.forceEvict(shard, cacheKeyPredicate);
774784
}
775785

776786
/**
@@ -867,7 +877,7 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f
867877
* always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen
868878
* (see {@link SharedBlobCacheService.LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
869879
*/
870-
static class CacheFileRegion<KeyType> extends EvictableRefCounted {
880+
static class CacheFileRegion<KeyType extends KeyBase> extends EvictableRefCounted {
871881

872882
private static final VarHandle VH_IO = findIOVarHandle();
873883

@@ -1634,7 +1644,7 @@ void touch() {
16341644
}
16351645
}
16361646

1637-
private final ConcurrentHashMap<RegionKey<KeyType>, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>();
1647+
private final KeyMapping<ShardId, RegionKey<KeyType>, LFUCacheEntry> keyMapping = new KeyMapping<>();
16381648
private final LFUCacheEntry[] freqs;
16391649
private final int maxFreq;
16401650
private final DecayAndNewEpochTask decayAndNewEpochTask;
@@ -1653,8 +1663,9 @@ public void close() {
16531663
decayAndNewEpochTask.close();
16541664
}
16551665

1666+
// used by tests
16561667
int getFreq(CacheFileRegion<KeyType> cacheFileRegion) {
1657-
return keyMapping.get(cacheFileRegion.regionKey).freq;
1668+
return keyMapping.get(cacheFileRegion.regionKey.file().shardId(), cacheFileRegion.regionKey).freq;
16581669
}
16591670

16601671
@Override
@@ -1663,10 +1674,11 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
16631674
final long now = epoch.get();
16641675
// try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
16651676
// if we did not find an entry
1666-
var entry = keyMapping.get(regionKey);
1677+
var entry = keyMapping.get(cacheKey.shardId(), regionKey);
16671678
if (entry == null) {
16681679
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
16691680
entry = keyMapping.computeIfAbsent(
1681+
cacheKey.shardId(),
16701682
regionKey,
16711683
key -> new LFUCacheEntry(new CacheFileRegion<KeyType>(SharedBlobCacheService.this, key, effectiveRegionSize), now)
16721684
);
@@ -1706,7 +1718,7 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
17061718
boolean evicted = entry.chunk.forceEvict();
17071719
if (evicted && entry.chunk.volatileIO() != null) {
17081720
unlink(entry);
1709-
keyMapping.remove(entry.chunk.regionKey, entry);
1721+
keyMapping.remove(entry.chunk.regionKey.file.shardId(), entry.chunk.regionKey, entry);
17101722
evictedCount++;
17111723
if (frequency > 0) {
17121724
nonZeroFrequencyEvictedCount++;
@@ -1719,6 +1731,10 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
17191731
return evictedCount;
17201732
}
17211733

1734+
private boolean removeKeyMappingForEntry(LFUCacheEntry entry) {
1735+
return keyMapping.remove(entry.chunk.regionKey.file().shardId(), entry.chunk.regionKey, entry);
1736+
}
1737+
17221738
@Override
17231739
public void forceEvictAsync(Predicate<KeyType> cacheKeyPredicate) {
17241740
asyncEvictionsRunner.enqueueTask(new ActionListener<>() {
@@ -1739,10 +1755,42 @@ public void onFailure(Exception e) {
17391755
});
17401756
}
17411757

1758+
@Override
1759+
public int forceEvict(ShardId shard, Predicate<KeyType> cacheKeyPredicate) {
1760+
final List<LFUCacheEntry> matchingEntries = new ArrayList<>();
1761+
keyMapping.forEach(shard, (key, entry) -> {
1762+
if (cacheKeyPredicate.test(key.file)) {
1763+
matchingEntries.add(entry);
1764+
}
1765+
});
1766+
1767+
var evictedCount = 0;
1768+
var nonZeroFrequencyEvictedCount = 0;
1769+
if (matchingEntries.isEmpty() == false) {
1770+
synchronized (SharedBlobCacheService.this) {
1771+
for (LFUCacheEntry entry : matchingEntries) {
1772+
int frequency = entry.freq;
1773+
boolean evicted = entry.chunk.forceEvict();
1774+
if (evicted && entry.chunk.volatileIO() != null) {
1775+
unlink(entry);
1776+
assert shard.equals(entry.chunk.regionKey.file.shardId());
1777+
keyMapping.remove(shard, entry.chunk.regionKey, entry);
1778+
evictedCount++;
1779+
if (frequency > 0) {
1780+
nonZeroFrequencyEvictedCount++;
1781+
}
1782+
}
1783+
}
1784+
}
1785+
}
1786+
blobCacheMetrics.getEvictedCountNonZeroFrequency().incrementBy(nonZeroFrequencyEvictedCount);
1787+
return evictedCount;
1788+
}
1789+
17421790
private LFUCacheEntry initChunk(LFUCacheEntry entry) {
17431791
assert Thread.holdsLock(entry.chunk);
17441792
RegionKey<KeyType> regionKey = entry.chunk.regionKey;
1745-
if (keyMapping.get(regionKey) != entry) {
1793+
if (keyMapping.get(regionKey.file().shardId(), regionKey) != entry) {
17461794
throwAlreadyClosed("no free region found (contender)");
17471795
}
17481796
// new item
@@ -1765,7 +1813,7 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) {
17651813
if (io != null) {
17661814
assignToSlot(entry, io);
17671815
} else {
1768-
boolean removed = keyMapping.remove(regionKey, entry);
1816+
boolean removed = removeKeyMappingForEntry(entry);
17691817
assert removed;
17701818
throwAlreadyClosed("no free region found");
17711819
}
@@ -1780,7 +1828,7 @@ private void assignToSlot(LFUCacheEntry entry, SharedBytes.IO freeSlot) {
17801828
if (entry.chunk.isEvicted()) {
17811829
assert regionOwners.remove(freeSlot) == entry.chunk;
17821830
freeRegions.add(freeSlot);
1783-
keyMapping.remove(entry.chunk.regionKey, entry);
1831+
removeKeyMappingForEntry(entry);
17841832
throwAlreadyClosed("evicted during free region allocation");
17851833
}
17861834
pushEntryToBack(entry);
@@ -1985,7 +2033,7 @@ private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotificatio
19852033
}
19862034
} finally {
19872035
unlink(entry);
1988-
keyMapping.remove(entry.chunk.regionKey, entry);
2036+
removeKeyMappingForEntry(entry);
19892037
}
19902038
}
19912039
} finally {
@@ -2020,7 +2068,7 @@ public boolean maybeEvictLeastUsed() {
20202068
boolean evicted = entry.chunk.tryEvict();
20212069
if (evicted && entry.chunk.volatileIO() != null) {
20222070
unlink(entry);
2023-
keyMapping.remove(entry.chunk.regionKey, entry);
2071+
removeKeyMappingForEntry(entry);
20242072
return true;
20252073
}
20262074
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.blobcache.shared;
9+
10+
import org.elasticsearch.test.ESTestCase;
11+
12+
import java.util.HashSet;
13+
import java.util.List;
14+
import java.util.Set;
15+
import java.util.stream.IntStream;
16+
17+
public class KeyMappingTests extends ESTestCase {
18+
19+
public void testBasics() {
20+
final String k1 = randomAlphanumericOfLength(10);
21+
final String k2 = randomAlphanumericOfLength(10);
22+
final String value = randomAlphanumericOfLength(10);
23+
KeyMapping<String, String, String> mapping = new KeyMapping<>();
24+
assertNull(mapping.get(k1, k2));
25+
26+
assertEquals(value, mapping.computeIfAbsent(k1, k2, (kx) -> value));
27+
assertEquals(value, mapping.get(k1, k2));
28+
29+
mapping.computeIfAbsent(k1, k2, (kx) -> { throw new AssertionError(); });
30+
31+
assertEquals(value, mapping.get(k1, k2));
32+
33+
final String k12 = randomValueOtherThan(k1, () -> randomAlphanumericOfLength(10));
34+
mapping.computeIfAbsent(k12, k2, (kx) -> randomAlphanumericOfLength(10));
35+
36+
assertEquals(value, mapping.get(k1, k2));
37+
38+
assertEquals(Set.of(k1, k12), mapping.key1s());
39+
40+
Set<String> values = new HashSet<>();
41+
mapping.forEach(k1, (ak2, result) -> { assertTrue(values.add(result)); });
42+
assertEquals(Set.of(value), values);
43+
44+
assertTrue(mapping.remove(k1, k2, value));
45+
46+
assertEquals(Set.of(k12), mapping.key1s());
47+
48+
assertNull(mapping.get(k1, k2));
49+
assertNotNull(mapping.get(k12, k2));
50+
51+
assertFalse(mapping.remove(k1, k2, value));
52+
}
53+
54+
public void testMultiThreaded() {
55+
final String k1 = randomAlphanumericOfLength(10);
56+
KeyMapping<String, String, Integer> mapping = new KeyMapping<>();
57+
58+
List<Thread> threads = IntStream.range(0, 10).mapToObj(i -> new Thread(() -> {
59+
final String k2 = Integer.toString(i);
60+
logger.info(k2);
61+
62+
for (int j = 0; j < 1000; ++j) {
63+
Integer finalJ = j;
64+
assertNull(mapping.get(k1, k2));
65+
assertSame(finalJ, mapping.computeIfAbsent(k1, k2, (kx) -> finalJ));
66+
assertEquals(finalJ, mapping.get(k1, k2));
67+
assertTrue(mapping.remove(k1, k2, finalJ));
68+
if ((j & 1) == 0) {
69+
assertFalse(mapping.remove(k1, k2, finalJ));
70+
}
71+
72+
}
73+
assertNull(mapping.get(k1, k2));
74+
}, "test-thread-" + i)).toList();
75+
76+
threads.forEach(Thread::start);
77+
threads.forEach(t -> {
78+
try {
79+
t.join(10000);
80+
} catch (InterruptedException e) {
81+
throw new RuntimeException(e);
82+
}
83+
});
84+
85+
assertEquals(Set.of(), mapping.key1s());
86+
}
87+
88+
public void testMultiThreadedSameKey() {
89+
final String k1 = randomAlphanumericOfLength(10);
90+
KeyMapping<String, String, Integer> mapping = new KeyMapping<>();
91+
92+
List<Thread> threads = IntStream.range(0, 10).mapToObj(i -> new Thread(() -> {
93+
for (int j = 0; j < 1000; ++j) {
94+
Integer computeValue = i * 1000 + j;
95+
Integer value = mapping.computeIfAbsent(k1, k1, (kx) -> computeValue);
96+
assertNotNull(value);
97+
// either our value or another threads value.
98+
assertTrue(value == computeValue || value / 1000 != i);
99+
mapping.remove(k1, k1, value);
100+
}
101+
})).toList();
102+
threads.forEach(Thread::start);
103+
threads.forEach(t -> {
104+
try {
105+
t.join(10000);
106+
} catch (InterruptedException e) {
107+
throw new RuntimeException(e);
108+
}
109+
});
110+
111+
assertEquals(Set.of(), mapping.key1s());
112+
}
113+
}

0 commit comments

Comments
 (0)