Skip to content

Commit 6e167a2

Browse files
Make the cache TTL a dynamic setting
1 parent 115c0bf commit 6e167a2

File tree

3 files changed

+65
-33
lines changed

3 files changed

+65
-33
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
3333
import org.elasticsearch.common.settings.Setting;
34-
import org.elasticsearch.common.settings.Settings;
3534
import org.elasticsearch.common.util.concurrent.EsExecutors;
3635
import org.elasticsearch.core.Nullable;
3736
import org.elasticsearch.core.TimeValue;
@@ -52,12 +51,14 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
5251

5352
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
5453

55-
public static final Setting<TimeValue> CACHE_MAX_AGE_SETTING = Setting.timeSetting(
54+
public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1);
55+
public static final Setting<TimeValue> CACHE_TTL_SETTING = Setting.timeSetting(
5656
"cluster.routing.allocation.stats.cache.ttl",
57-
TimeValue.timeValueMinutes(1),
57+
DEFAULT_CACHE_TTL,
5858
TimeValue.ZERO,
5959
TimeValue.timeValueMinutes(10),
60-
Setting.Property.NodeScope
60+
Setting.Property.NodeScope,
61+
Setting.Property.Dynamic
6162
);
6263

6364
private final AllocationStatsCache allocationStatsCache;
@@ -66,7 +67,6 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
6667

6768
@Inject
6869
public TransportGetAllocationStatsAction(
69-
Settings settings,
7070
TransportService transportService,
7171
ClusterService clusterService,
7272
ThreadPool threadPool,
@@ -86,7 +86,7 @@ public TransportGetAllocationStatsAction(
8686
EsExecutors.DIRECT_EXECUTOR_SERVICE
8787
);
8888
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
89-
this.allocationStatsCache = new AllocationStatsCache(CACHE_MAX_AGE_SETTING.get(settings).millis(), threadPool);
89+
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
9090
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
9191
final var cachedStats = allocationStatsCache.get();
9292
if (cachedStats != null) {
@@ -101,6 +101,7 @@ public TransportGetAllocationStatsAction(
101101
}));
102102
});
103103
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
104+
clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
104105
}
105106

106107
@Override
@@ -211,25 +212,33 @@ public DiskThresholdSettings getDiskThresholdSettings() {
211212
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}
212213

213214
private static class AllocationStatsCache {
214-
private final long maxAgeMillis;
215+
private volatile long ttlMillis;
215216
private final ThreadPool threadPool;
216217
private final AtomicReference<CachedAllocationStats> cachedStats;
217218

218-
AllocationStatsCache(long maxAgeMillis, ThreadPool threadPool) {
219-
this.maxAgeMillis = maxAgeMillis;
219+
AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
220220
this.threadPool = threadPool;
221221
this.cachedStats = new AtomicReference<>();
222+
setTTL(ttl);
223+
}
224+
225+
void setTTL(TimeValue ttl) {
226+
ttlMillis = ttl.millis();
227+
228+
if (ttlMillis == 0L) {
229+
cachedStats.set(null);
230+
}
222231
}
223232

224233
Map<String, NodeAllocationStats> get() {
225234

226-
if (maxAgeMillis == 0L) {
235+
if (ttlMillis == 0L) {
227236
return null;
228237
}
229238

230239
final var stats = cachedStats.get();
231240

232-
if (stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > maxAgeMillis) {
241+
if (stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis) {
233242
return null;
234243
}
235244

@@ -238,7 +247,7 @@ Map<String, NodeAllocationStats> get() {
238247

239248
void put(Map<String, NodeAllocationStats> stats) {
240249

241-
if (maxAgeMillis > 0L) {
250+
if (ttlMillis > 0L) {
242251
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
243252
}
244253
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,6 @@ public void apply(Settings value, Settings current, Settings previous) {
635635
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
636636
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
637637
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,
638-
TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING
638+
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
639639
).filter(Objects::nonNull).collect(toSet());
640640
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
1919
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
2020
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.settings.ClusterSettings;
2122
import org.elasticsearch.common.settings.Settings;
2223
import org.elasticsearch.core.CheckedConsumer;
2324
import org.elasticsearch.core.TimeValue;
@@ -42,6 +43,7 @@
4243
import java.util.concurrent.atomic.AtomicBoolean;
4344
import java.util.concurrent.atomic.AtomicInteger;
4445
import java.util.concurrent.atomic.AtomicReference;
46+
import java.util.function.Consumer;
4547

4648
import static org.hamcrest.Matchers.anEmptyMap;
4749
import static org.hamcrest.Matchers.containsString;
@@ -55,9 +57,8 @@
5557

5658
public class TransportGetAllocationStatsActionTests extends ESTestCase {
5759

58-
private static final long CACHE_MAX_AGE_MILLIS = 30000;
59-
6060
private long startTimeMillis;
61+
private TimeValue cacheTTL;
6162
private ControlledRelativeTimeThreadPool threadPool;
6263
private ClusterService clusterService;
6364
private TransportService transportService;
@@ -69,9 +70,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6970
@Before
7071
public void setUp() throws Exception {
7172
super.setUp();
72-
startTimeMillis = CACHE_MAX_AGE_MILLIS;
73+
startTimeMillis = 0L;
74+
cacheTTL = TimeValue.timeValueMinutes(1);
7375
threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
74-
clusterService = ClusterServiceUtils.createClusterService(threadPool);
76+
clusterService = ClusterServiceUtils.createClusterService(
77+
threadPool,
78+
new ClusterSettings(
79+
Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), cacheTTL.toString()).build(),
80+
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
81+
)
82+
);
7583
transportService = new CapturingTransport().createTransportService(
7684
clusterService.getSettings(),
7785
threadPool,
@@ -82,12 +90,6 @@ public void setUp() throws Exception {
8290
);
8391
allocationStatsService = mock(AllocationStatsService.class);
8492
action = new TransportGetAllocationStatsAction(
85-
Settings.builder()
86-
.put(
87-
TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING.getKey(),
88-
TimeValue.timeValueMillis(CACHE_MAX_AGE_MILLIS).toString()
89-
)
90-
.build(),
9193
transportService,
9294
clusterService,
9395
threadPool,
@@ -195,8 +197,20 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
195197
public void testGetStatsWithCachingEnabled() throws Exception {
196198

197199
final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
198-
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
199-
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
200+
int numExpectedAllocationStatsServiceCalls = 0;
201+
202+
final Runnable resetExpectedAllocationStats = () -> {
203+
final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats());
204+
allocationStats.set(stats);
205+
when(allocationStatsService.stats()).thenReturn(stats);
206+
};
207+
208+
final Consumer<TimeValue> setCacheTTL = (ttl) -> {
209+
clusterService.getClusterSettings()
210+
.applySettings(
211+
Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl.toString()).build()
212+
);
213+
};
200214

201215
final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
202216
final var request = new TransportGetAllocationStatsAction.Request(
@@ -214,21 +228,30 @@ public void testGetStatsWithCachingEnabled() throws Exception {
214228
};
215229

216230
// Initial cache miss, all threads should get the same value.
231+
resetExpectedAllocationStats.run();
217232
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
218-
verify(allocationStatsService, times(1)).stats();
233+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
219234

220235
// Force the cached stats to expire.
221-
threadPool.setCurrentTimeInMillis(startTimeMillis + (CACHE_MAX_AGE_MILLIS * 2));
236+
threadPool.setCurrentTimeInMillis(startTimeMillis + (2 * cacheTTL.getMillis()));
222237

223238
// Expect a single call to the stats service on the cache miss.
224-
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
225-
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
239+
resetExpectedAllocationStats.run();
226240
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
227-
verify(allocationStatsService, times(2)).stats();
228-
229-
// All subsequent requests should get the cached value.
241+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
242+
243+
// Update the TTL setting to disable the cache, we expect a service call each time.
244+
setCacheTTL.accept(TimeValue.ZERO);
245+
threadTask.accept(ActionListener.noop());
246+
threadTask.accept(ActionListener.noop());
247+
numExpectedAllocationStatsServiceCalls += 2;
248+
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();
249+
250+
// Re-enable the cache, only one thread should call the stats service.
251+
setCacheTTL.accept(TimeValue.timeValueMinutes(5));
252+
resetExpectedAllocationStats.run();
230253
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
231-
verify(allocationStatsService, times(2)).stats();
254+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
232255
}
233256

234257
private static class ControlledRelativeTimeThreadPool extends ThreadPool {

0 commit comments

Comments
 (0)