Skip to content

Commit 3f2048b

Browse files
authored
Add virtualStorageFabricEvictImmediatelyOnHoldRelease config. (#18871)
1 parent cf7d232 commit 3f2048b

File tree

5 files changed

+246
-33
lines changed

5 files changed

+246
-33
lines changed

server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ public class SegmentLoaderConfig
7777
@JsonProperty("virtualStorageLoadThreads")
7878
private int virtualStorageLoadThreads = 2 * runtimeInfo.getAvailableProcessors();
7979

80+
/**
81+
* When enabled, weakly-held cache entries are evicted immediately upon release of all holds, rather than
82+
* waiting for space pressure to trigger eviction. This setting is not intended to be configured directly by
83+
* administrators. Instead, it is expected to be set when appropriate via {@link #setVirtualStorage}.
84+
*/
85+
@JsonProperty("virtualStorageFabricEvictImmediatelyOnHoldRelease")
86+
private boolean virtualStorageFabricEvictImmediatelyOnHoldRelease = false;
87+
8088
private long combinedMaxSize = 0;
8189

8290
public List<StorageLocationConfig> getLocations()
@@ -154,6 +162,11 @@ public int getVirtualStorageLoadThreads()
154162
return virtualStorageLoadThreads;
155163
}
156164

165+
public boolean isVirtualStorageFabricEvictImmediatelyOnHoldRelease()
166+
{
167+
return virtualStorageFabricEvictImmediatelyOnHoldRelease;
168+
}
169+
157170
public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
158171
{
159172
SegmentLoaderConfig retVal = new SegmentLoaderConfig();
@@ -163,6 +176,19 @@ public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
163176
return retVal;
164177
}
165178

179+
/**
180+
* Sets {@link #virtualStorage} and {@link #virtualStorageFabricEvictImmediatelyOnHoldRelease}.
181+
*/
182+
public SegmentLoaderConfig setVirtualStorage(
183+
boolean virtualStorage,
184+
boolean virtualStorageFabricEvictImmediatelyOnHoldRelease
185+
)
186+
{
187+
this.virtualStorage = virtualStorage;
188+
this.virtualStorageFabricEvictImmediatelyOnHoldRelease = virtualStorageFabricEvictImmediatelyOnHoldRelease;
189+
return this;
190+
}
191+
166192
/**
167193
* Convert a list of {@link StorageLocationConfig} objects to {@link StorageLocation} objects.
168194
* <p>
@@ -195,6 +221,7 @@ public String toString()
195221
", statusQueueMaxSize=" + statusQueueMaxSize +
196222
", useVirtualStorageFabric=" + virtualStorage +
197223
", virtualStorageFabricLoadThreads=" + virtualStorageLoadThreads +
224+
", virtualStorageFabricEvictImmediatelyOnHoldRelease=" + virtualStorageFabricEvictImmediatelyOnHoldRelease +
198225
", combinedMaxSize=" + combinedMaxSize +
199226
'}';
200227
}

server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public SegmentLocalCacheManager(
145145
if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() > 0) {
146146
throw DruidException.defensive("Invalid configuration: virtualStorage is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnBootstrap");
147147
}
148+
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
149+
for (StorageLocation location : locations) {
150+
location.setEvictImmediatelyOnHoldRelease(true);
151+
}
152+
}
148153
virtualStorageLoadOnDemandExec =
149154
MoreExecutors.listeningDecorator(
150155
// probably replace this with virtual threads once minimum version is java 21
@@ -333,21 +338,20 @@ public void storeInfoFile(final DataSegment segment) throws IOException
333338
@Override
334339
public void removeInfoFile(final DataSegment segment)
335340
{
336-
final Runnable delete = () -> deleteSegmentInfoFile(segment);
337341
final SegmentCacheEntryIdentifier entryId = new SegmentCacheEntryIdentifier(segment.getId());
338342
boolean isCached = false;
339343
// defer deleting until the unmount operation of the cache entry, if possible, so that if the process stops before
340344
// the segment files are deleted, they can be properly managed on startup (since the info entry still exists)
341345
for (StorageLocation location : locations) {
342346
final SegmentCacheEntry cacheEntry = location.getCacheEntry(entryId);
343347
if (cacheEntry != null) {
344-
isCached = isCached || cacheEntry.setOnUnmount(delete);
348+
isCached = isCached || cacheEntry.setDeleteInfoFileOnUnmount();
345349
}
346350
}
347351

348352
// otherwise we are probably deleting for cleanup reasons, so try it anyway if it wasn't present in any location
349353
if (!isCached) {
350-
delete.run();
354+
deleteSegmentInfoFile(segment);
351355
}
352356
}
353357

@@ -430,7 +434,7 @@ public AcquireSegmentAction acquireSegment(final DataSegment dataSegment) throws
430434
jsonMapper.writeValue(out, dataSegment);
431435
return null;
432436
});
433-
hold.getEntry().setOnUnmount(() -> deleteSegmentInfoFile(dataSegment));
437+
hold.getEntry().setDeleteInfoFileOnUnmount();
434438
}
435439

436440
return new AcquireSegmentAction(
@@ -492,6 +496,11 @@ private AcquireSegmentAction acquireExistingSegment(SegmentCacheEntryIdentifier
492496
public void load(final DataSegment dataSegment) throws SegmentLoadingException
493497
{
494498
if (config.isVirtualStorage()) {
499+
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
500+
throw DruidException.defensive(
501+
"load() should not be called when virtualStorageFabricEvictImmediatelyOnHoldRelease is enabled"
502+
);
503+
}
495504
// virtual storage doesn't do anything with loading immediately, but check to see if the segment is already cached
496505
// and if so, clear out the onUnmount action
497506
final ReferenceCountingLock lock = lock(dataSegment);
@@ -533,6 +542,11 @@ public void bootstrap(
533542
) throws SegmentLoadingException
534543
{
535544
if (config.isVirtualStorage()) {
545+
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
546+
throw DruidException.defensive(
547+
"bootstrap() should not be called when virtualStorageFabricEvictImmediatelyOnHoldRelease is enabled"
548+
);
549+
}
536550
// during bootstrap, check if the segment exists in a location and mount it, getCachedSegments already
537551
// did the reserving for us
538552
final SegmentCacheEntryIdentifier id = new SegmentCacheEntryIdentifier(dataSegment.getId());
@@ -1031,6 +1045,10 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException
10311045
);
10321046
unmount();
10331047
}
1048+
1049+
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
1050+
setDeleteInfoFileOnUnmount();
1051+
}
10341052
}
10351053
catch (SegmentLoadingException e) {
10361054
try {
@@ -1104,12 +1122,12 @@ public synchronized Optional<Segment> acquireReference()
11041122
return referenceProvider.acquireReference();
11051123
}
11061124

1107-
public synchronized boolean setOnUnmount(Runnable runnable)
1125+
public synchronized boolean setDeleteInfoFileOnUnmount()
11081126
{
11091127
if (location == null) {
11101128
return false;
11111129
}
1112-
onUnmount.set(runnable);
1130+
onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
11131131
return true;
11141132
}
11151133

server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,7 @@ public class StorageLocation
106106
@GuardedBy("lock")
107107
private WeakCacheEntry hand;
108108

109-
/**
110-
* Current total size of files in bytes, including weak entries.
111-
*/
109+
private volatile boolean evictImmediatelyOnHoldRelease = false;
112110

113111
/**
114112
* Current total size of files in bytes, including weak entries.
@@ -171,6 +169,14 @@ public File getPath()
171169
return path;
172170
}
173171

172+
/**
173+
* Sets whether weak cache entries should be immediately evicted once all holds are released.
174+
*/
175+
public void setEvictImmediatelyOnHoldRelease(final boolean evictImmediatelyOnHoldRelease)
176+
{
177+
this.evictImmediatelyOnHoldRelease = evictImmediatelyOnHoldRelease;
178+
}
179+
174180
public <T extends CacheEntry> T getStaticCacheEntry(CacheEntryIdentifier entryId)
175181
{
176182
lock.readLock().lock();
@@ -340,7 +346,10 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHoldIfExists(
340346
if (existingEntry != null && existingEntry.hold()) {
341347
existingEntry.visited = true;
342348
weakStats.getAndUpdate(WeakStats::hit);
343-
return new ReservationHold<>((T) existingEntry.cacheEntry, existingEntry::release);
349+
return new ReservationHold<>(
350+
(T) existingEntry.cacheEntry,
351+
createWeakEntryReleaseRunnable(existingEntry, false)
352+
);
344353
}
345354
return null;
346355
}
@@ -374,7 +383,10 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHold(
374383
if (retryExistingEntry != null && retryExistingEntry.hold()) {
375384
retryExistingEntry.visited = true;
376385
weakStats.getAndUpdate(WeakStats::hit);
377-
return new ReservationHold<>((T) retryExistingEntry.cacheEntry, retryExistingEntry::release);
386+
return new ReservationHold<>(
387+
(T) retryExistingEntry.cacheEntry,
388+
createWeakEntryReleaseRunnable(retryExistingEntry, false)
389+
);
378390
}
379391
final CacheEntry newEntry = entrySupplier.get();
380392
final ReclaimResult reclaimResult = canHandleWeak(newEntry);
@@ -388,28 +400,7 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHold(
388400
weakStats.getAndUpdate(s -> s.load(newEntry.getSize()));
389401
hold = new ReservationHold<>(
390402
(T) newEntry,
391-
() -> {
392-
newWeakEntry.release();
393-
lock.writeLock().lock();
394-
try {
395-
weakCacheEntries.computeIfPresent(
396-
newEntry.getId(),
397-
(cacheEntryIdentifier, weakCacheEntry) -> {
398-
if (!weakCacheEntry.cacheEntry.isMounted()) {
399-
// if we never successfully mounted, go ahead and remove so we don't have a dead entry
400-
unlinkWeakEntry(weakCacheEntry);
401-
// we call unmount anyway to terminate the phaser
402-
weakCacheEntry.unmount();
403-
return null;
404-
}
405-
return weakCacheEntry;
406-
}
407-
);
408-
}
409-
finally {
410-
lock.writeLock().unlock();
411-
}
412-
}
403+
createWeakEntryReleaseRunnable(newWeakEntry, true)
413404
);
414405
} else {
415406
weakStats.getAndUpdate(WeakStats::reject);
@@ -444,6 +435,52 @@ public void release(CacheEntry entry)
444435
}
445436
}
446437

438+
/**
439+
* Creates a release runnable for a {@link WeakCacheEntry} that handles immediate eviction when configured.
440+
* If {@link #evictImmediatelyOnHoldRelease} is true and there are no more holds after releasing, the entry is immediately
441+
* evicted from the cache. For new entries (isNewEntry=true), an entry that never successfully mounted is also removed.
442+
*/
443+
private Runnable createWeakEntryReleaseRunnable(
444+
final WeakCacheEntry weakEntry,
445+
final boolean isNewEntry
446+
)
447+
{
448+
return () -> {
449+
weakEntry.release();
450+
451+
if (!isNewEntry && !evictImmediatelyOnHoldRelease) {
452+
// No need to consider removal from weakCacheEntries on hold release.
453+
return;
454+
}
455+
456+
lock.writeLock().lock();
457+
try {
458+
weakCacheEntries.computeIfPresent(
459+
weakEntry.cacheEntry.getId(),
460+
(cacheEntryIdentifier, weakCacheEntry) -> {
461+
// If we never successfully mounted, go ahead and remove so we don't have a dead entry.
462+
// Furthermore, if evictImmediatelyOnHoldRelease is set, evict on release if all holds are gone.
463+
final boolean isMounted = weakCacheEntry.cacheEntry.isMounted();
464+
if ((isNewEntry && !isMounted)
465+
|| (evictImmediatelyOnHoldRelease && !weakCacheEntry.isHeld())) {
466+
unlinkWeakEntry(weakCacheEntry);
467+
weakCacheEntry.unmount(); // call even if never mounted, to terminate the phaser
468+
if (isMounted) {
469+
weakStats.getAndUpdate(s -> s.evict(weakCacheEntry.cacheEntry.getSize()));
470+
}
471+
return null;
472+
} else {
473+
return weakCacheEntry;
474+
}
475+
}
476+
);
477+
}
478+
finally {
479+
lock.writeLock().unlock();
480+
}
481+
};
482+
}
483+
447484
/**
448485
* Inserts a new {@link WeakCacheEntry}, inserting it as {@link #head} (or both {@link #head} and {@link #tail} if it
449486
* is the only entry), tracking size in {@link #currSizeBytes} and {@link #currWeakSizeBytes}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.segment.loading;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
public class SegmentLoaderConfigTest
26+
{
27+
@Test
28+
public void testSetVirtualStorage()
29+
{
30+
final SegmentLoaderConfig config = new SegmentLoaderConfig();
31+
32+
// Verify default values
33+
Assert.assertFalse(config.isVirtualStorage());
34+
Assert.assertFalse(config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease());
35+
36+
// Set both to true
37+
config.setVirtualStorage(true, true);
38+
39+
// Verify both fields are set
40+
Assert.assertTrue(config.isVirtualStorage());
41+
Assert.assertTrue(config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease());
42+
}
43+
}

0 commit comments

Comments
 (0)