Skip to content

Commit fa52364

Browse files
Minor Adjustments to make SharedBlobCacheService more generic (#93234)
Some naming adjustments, the ability to set up the cache on a non-frozen search node and a small API adjustment that makes it so a `ThreadPool` is not a dependency for using the cache.
1 parent 43c8806 commit fa52364

File tree

3 files changed

+13
-14
lines changed

3 files changed

+13
-14
lines changed

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.concurrent.atomic.AtomicReference;
5454
import java.util.concurrent.atomic.LongAdder;
5555
import java.util.function.Consumer;
56-
import java.util.function.LongSupplier;
5756
import java.util.function.Predicate;
5857
import java.util.stream.Collectors;
5958

@@ -130,7 +129,8 @@ public void validate(final RelativeByteSizeValue value, final Map<Setting<?>, Ob
130129
if (value.isNonZeroSize()) {
131130
@SuppressWarnings("unchecked")
132131
final List<DiscoveryNodeRole> roles = (List<DiscoveryNodeRole>) settings.get(NodeRoleSettings.NODE_ROLES_SETTING);
133-
if (DataTier.isFrozenNode(Set.of(roles.toArray(DiscoveryNodeRole[]::new))) == false) {
132+
final var rolesSet = Set.copyOf(roles);
133+
if (DataTier.isFrozenNode(rolesSet) == false && rolesSet.contains(DiscoveryNodeRole.SEARCH_ROLE) == false) {
134134
throw new SettingsException(
135135
"setting [{}] to be positive [{}] is only permitted on nodes with the data_frozen role, roles are [{}]",
136136
SHARED_CACHE_SETTINGS_PREFIX + "size",
@@ -229,8 +229,7 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool
229229
private static final Logger logger = LogManager.getLogger(SharedBlobCacheService.class);
230230

231231
private final ConcurrentHashMap<RegionKey<KeyType>, Entry<CacheFileRegion>> keyMapping;
232-
233-
private final LongSupplier currentTimeSupplier;
232+
private final ThreadPool threadPool;
234233

235234
private final KeyedLock<KeyType> keyedLock = new KeyedLock<>();
236235

@@ -260,7 +259,7 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool
260259

261260
@SuppressWarnings({ "unchecked", "rawtypes" })
262261
public SharedBlobCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool) {
263-
this.currentTimeSupplier = threadPool::relativeTimeInMillis;
262+
this.threadPool = threadPool;
264263
long totalFsSize;
265264
try {
266265
totalFsSize = FsProbe.getTotal(Environment.getFileStore(environment.nodeDataPaths()[0]));
@@ -372,7 +371,7 @@ public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
372371
final long effectiveRegionSize = getRegionSize(fileLength, region);
373372
try (Releasable ignore = keyedLock.acquire(cacheKey)) {
374373
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
375-
final long now = currentTimeSupplier.getAsLong();
374+
final long now = threadPool.relativeTimeInMillis();
376375
final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
377376
regionKey,
378377
key -> new Entry<>(new CacheFileRegion(key, effectiveRegionSize), now)
@@ -543,7 +542,7 @@ private void unlink(final Entry<CacheFileRegion> entry) {
543542

544543
private void computeDecay() {
545544
synchronized (this) {
546-
long now = currentTimeSupplier.getAsLong();
545+
long now = threadPool.relativeTimeInMillis();
547546
for (int i = 0; i < maxFreq; i++) {
548547
for (Entry<CacheFileRegion> entry = freqs[i]; entry != null; entry = entry.next) {
549548
if (now - entry.lastAccessed >= 2 * minTimeDelta) {
@@ -616,7 +615,7 @@ protected String getThreadPool() {
616615

617616
@Override
618617
public String toString() {
619-
return "frozen_cache_decay_task";
618+
return "shared_cache_decay_task";
620619
}
621620
}
622621

@@ -830,7 +829,7 @@ public int populateAndRead(
830829
final ByteRange rangeToRead,
831830
final RangeAvailableHandler reader,
832831
final RangeMissingHandler writer,
833-
final Executor executor
832+
final String executor
834833
) throws Exception {
835834
StepListener<Integer> stepListener = null;
836835
final long writeStart = rangeToWrite.start();
@@ -863,7 +862,7 @@ public int populateAndRead(
863862
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
864863
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
865864
},
866-
executor
865+
threadPool.executor(executor)
867866
);
868867
assert lis != null;
869868
if (stepListener == null) {
@@ -878,11 +877,11 @@ public int populateAndRead(
878877

879878
@Override
880879
public String toString() {
881-
return "FrozenCacheFile{" + "cacheKey=" + cacheKey + ", length=" + length + '}';
880+
return "SharedCacheFile{" + "cacheKey=" + cacheKey + ", length=" + length + '}';
882881
}
883882
}
884883

885-
public CacheFile getFrozenCacheFile(KeyType cacheKey, long length) {
884+
public CacheFile getCacheFile(KeyType cacheKey, long length) {
886885
return new CacheFile(cacheKey, length);
887886
}
888887

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ public void putCachedBlob(String name, ByteRange range, BytesReference content,
712712
}
713713

714714
public SharedBlobCacheService<CacheKey>.CacheFile getFrozenCacheFile(String fileName, long length) {
715-
return sharedBlobCacheService.getFrozenCacheFile(createCacheKey(fileName), length);
715+
return sharedBlobCacheService.getCacheFile(createCacheKey(fileName), length);
716716
}
717717

718718
private static Repository repositoryByUuid(Map<String, Repository> repositories, String repositoryUuid, String originalName) {

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
160160
writeCacheFile(channel, input, channelPos, relativePos, len, progressUpdater, startTimeNanos);
161161
}
162162
},
163-
directory.cacheFetchAsyncExecutor()
163+
SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
164164
);
165165
assert bytesRead == length : bytesRead + " vs " + length;
166166
assert luceneByteBufLock.getReadHoldCount() == 0;

0 commit comments

Comments
 (0)