Skip to content

Commit 355797c

Browse files
Switch to controlled time in unit test, address code review comments
1 parent 3930dcd commit 355797c

File tree

2 files changed

+72
-36
lines changed

2 files changed

+72
-36
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
3333
import org.elasticsearch.common.settings.Setting;
34+
import org.elasticsearch.common.settings.Settings;
3435
import org.elasticsearch.common.util.concurrent.EsExecutors;
3536
import org.elasticsearch.core.Nullable;
3637
import org.elasticsearch.core.TimeValue;
@@ -51,11 +52,10 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
5152

5253
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
5354

54-
public static final long CACHE_DISABLED = 0L;
5555
public static final Setting<TimeValue> CACHE_MAX_AGE_SETTING = Setting.timeSetting(
56-
"cluster.transport.get.allocation.stats.action.cache.max_age",
56+
"cluster.routing.allocation.stats.cache.ttl",
5757
TimeValue.timeValueMinutes(1),
58-
TimeValue.timeValueMillis(CACHE_DISABLED),
58+
TimeValue.ZERO,
5959
TimeValue.timeValueMinutes(10),
6060
Setting.Property.NodeScope
6161
);
@@ -66,6 +66,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
6666

6767
@Inject
6868
public TransportGetAllocationStatsAction(
69+
Settings settings,
6970
TransportService transportService,
7071
ClusterService clusterService,
7172
ThreadPool threadPool,
@@ -85,12 +86,13 @@ public TransportGetAllocationStatsAction(
8586
EsExecutors.DIRECT_EXECUTOR_SERVICE
8687
);
8788
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
88-
this.allocationStatsCache = new AllocationStatsCache(clusterService.getClusterSettings().get(CACHE_MAX_AGE_SETTING).millis());
89+
this.allocationStatsCache = new AllocationStatsCache(CACHE_MAX_AGE_SETTING.get(settings).millis(), threadPool);
8990
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
9091
threadPool.getThreadContext(),
9192
l -> managementExecutor.execute(ActionRunnable.supply(l, () -> {
93+
// Check the cache again here to prevent duplicate work when a thread has a cache miss and is just about to fork just as
94+
// other threads are coming off a deduplicator call that is about to finish.
9295
final var cachedStats = allocationStatsCache.get();
93-
9496
if (cachedStats != null) {
9597
return cachedStats;
9698
}
@@ -103,11 +105,6 @@ public TransportGetAllocationStatsAction(
103105
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
104106
}
105107

106-
// Package access, intended for unit testing only.
107-
void setCacheMaxAge(TimeValue maxAge) {
108-
this.allocationStatsCache.setMaxAgeMsecs(maxAge.millis());
109-
}
110-
111108
@Override
112109
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
113110
if (clusterService.state().getMinTransportVersion().before(TransportVersions.V_8_14_0)) {
@@ -122,9 +119,16 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
122119
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
123120
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
124121

125-
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
126-
? SubscribableListener.newForked(allocationStatsSupplier::execute)
127-
: SubscribableListener.newSucceeded(Map.of());
122+
SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep;
123+
124+
if (request.metrics().contains(Metric.ALLOCATIONS)) {
125+
final var cachedStats = allocationStatsCache.get();
126+
allocationStatsStep = cachedStats != null
127+
? SubscribableListener.newSucceeded(cachedStats)
128+
: SubscribableListener.newForked(allocationStatsSupplier::execute);
129+
} else {
130+
allocationStatsStep = SubscribableListener.newSucceeded(Map.of());
131+
}
128132

129133
allocationStatsStep.andThenApply(
130134
allocationStats -> new Response(allocationStats, request.metrics().contains(Metric.FS) ? diskThresholdSettings : null)
@@ -216,27 +220,25 @@ public DiskThresholdSettings getDiskThresholdSettings() {
216220
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMsecs) {}
217221

218222
private static class AllocationStatsCache {
219-
private volatile long maxAgeMsecs;
223+
private final long maxAgeMillis;
224+
private final ThreadPool threadPool;
220225
private final AtomicReference<CachedAllocationStats> cachedStats;
221226

222-
AllocationStatsCache(long maxAgeMsecs) {
223-
this.maxAgeMsecs = maxAgeMsecs;
227+
AllocationStatsCache(long maxAgeMillis, ThreadPool threadPool) {
228+
this.maxAgeMillis = maxAgeMillis;
229+
this.threadPool = threadPool;
224230
this.cachedStats = new AtomicReference<>();
225231
}
226232

227-
void setMaxAgeMsecs(long maxAgeMsecs) {
228-
this.maxAgeMsecs = maxAgeMsecs;
229-
}
230-
231233
Map<String, NodeAllocationStats> get() {
232234

233-
if (maxAgeMsecs == CACHE_DISABLED) {
235+
if (maxAgeMillis == 0L) {
234236
return null;
235237
}
236238

237239
final var stats = cachedStats.get();
238240

239-
if (stats == null || System.currentTimeMillis() - stats.timestampMsecs > maxAgeMsecs) {
241+
if (stats == null || threadPool.relativeTimeInMillis() - stats.timestampMsecs > maxAgeMillis) {
240242
return null;
241243
}
242244

@@ -245,8 +247,8 @@ Map<String, NodeAllocationStats> get() {
245247

246248
void put(Map<String, NodeAllocationStats> stats) {
247249

248-
if (maxAgeMsecs > CACHE_DISABLED) {
249-
cachedStats.set(new CachedAllocationStats(stats, System.currentTimeMillis()));
250+
if (maxAgeMillis > 0L) {
251+
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
250252
}
251253
}
252254
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
1919
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
2020
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.settings.Settings;
2122
import org.elasticsearch.core.CheckedConsumer;
2223
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.node.Node;
2325
import org.elasticsearch.tasks.Task;
2426
import org.elasticsearch.tasks.TaskId;
27+
import org.elasticsearch.telemetry.metric.MeterRegistry;
2528
import org.elasticsearch.test.ClusterServiceUtils;
2629
import org.elasticsearch.test.ESTestCase;
2730
import org.elasticsearch.test.transport.CapturingTransport;
28-
import org.elasticsearch.threadpool.TestThreadPool;
31+
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
2932
import org.elasticsearch.threadpool.ThreadPool;
3033
import org.elasticsearch.transport.TransportService;
3134
import org.junit.After;
@@ -52,7 +55,10 @@
5255

5356
public class TransportGetAllocationStatsActionTests extends ESTestCase {
5457

55-
private ThreadPool threadPool;
58+
private static final long CACHE_MAX_AGE_MILLIS = 30000;
59+
60+
private long startTimeMillis;
61+
private ControlledRelativeTimeThreadPool threadPool;
5662
private ClusterService clusterService;
5763
private TransportService transportService;
5864
private AllocationStatsService allocationStatsService;
@@ -63,7 +69,8 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6369
@Before
6470
public void setUp() throws Exception {
6571
super.setUp();
66-
threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
72+
startTimeMillis = CACHE_MAX_AGE_MILLIS;
73+
threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
6774
clusterService = ClusterServiceUtils.createClusterService(threadPool);
6875
transportService = new CapturingTransport().createTransportService(
6976
clusterService.getSettings(),
@@ -75,6 +82,12 @@ public void setUp() throws Exception {
7582
);
7683
allocationStatsService = mock(AllocationStatsService.class);
7784
action = new TransportGetAllocationStatsAction(
85+
Settings.builder()
86+
.put(
87+
TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING.getKey(),
88+
TimeValue.timeValueMillis(CACHE_MAX_AGE_MILLIS).toString()
89+
)
90+
.build(),
7891
transportService,
7992
clusterService,
8093
threadPool,
@@ -187,7 +200,7 @@ public void testGetStatsWithCachingEnabled() throws Exception {
187200

188201
final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
189202
final var request = new TransportGetAllocationStatsAction.Request(
190-
TimeValue.ONE_MINUTE,
203+
TEST_REQUEST_TIMEOUT,
191204
new TaskId(randomIdentifier(), randomNonNegativeLong()),
192205
EnumSet.of(Metric.ALLOCATIONS)
193206
);
@@ -200,24 +213,45 @@ public void testGetStatsWithCachingEnabled() throws Exception {
200213
l.onResponse(null);
201214
};
202215

203-
// Start with a high cache max age, all threads should get the same value.
204-
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
216+
// Initial cache miss, all threads should get the same value.
205217
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
206218
verify(allocationStatsService, times(1)).stats();
207219

208220
// Force the cached stats to expire.
209-
action.setCacheMaxAge(TimeValue.timeValueMillis(1));
210-
Thread.sleep(2L);
221+
threadPool.setCurrentTimeInMillis(startTimeMillis + (CACHE_MAX_AGE_MILLIS * 2));
211222

212-
// Expect a call to the stats service.
223+
// Expect a single call to the stats service on the cache miss.
213224
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
214225
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
215-
threadTask.accept(ActionListener.noop());
226+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
216227
verify(allocationStatsService, times(2)).stats();
217228

218-
// Set a large max age for the cache again so all the threads in the next run can get the cached value.
219-
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
229+
// All subsequent requests should get the cached value.
220230
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
221231
verify(allocationStatsService, times(2)).stats();
222232
}
233+
234+
private static class ControlledRelativeTimeThreadPool extends ThreadPool {
235+
236+
private long currentTimeInMillis;
237+
238+
ControlledRelativeTimeThreadPool(String name, long startTimeMillis) {
239+
super(
240+
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(),
241+
MeterRegistry.NOOP,
242+
new DefaultBuiltInExecutorBuilders()
243+
);
244+
this.currentTimeInMillis = startTimeMillis;
245+
stopCachedTimeThread();
246+
}
247+
248+
@Override
249+
public long relativeTimeInMillis() {
250+
return currentTimeInMillis;
251+
}
252+
253+
void setCurrentTimeInMillis(long currentTimeInMillis) {
254+
this.currentTimeInMillis = currentTimeInMillis;
255+
}
256+
}
223257
}

0 commit comments

Comments
 (0)