Skip to content

Commit 55450fe

Browse files
authored
Use a time supplier interface instead of passing ThreadPool (#116333)
An attempt to use a basic interface for time supplier based on #115511 (comment). (TLDR: sometimes we pass around a ThreadPool instance just to be able to get time. It might be more reasonable to separate those use cases)
1 parent 65de0f0 commit 55450fe

File tree

8 files changed

+148
-79
lines changed

8 files changed

+148
-79
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.metrics.MeanMetric;
2323
import org.elasticsearch.common.settings.ClusterSettings;
2424
import org.elasticsearch.common.settings.Setting;
25+
import org.elasticsearch.common.time.TimeProvider;
2526
import org.elasticsearch.common.util.Maps;
2627
import org.elasticsearch.core.Strings;
2728
import org.elasticsearch.core.TimeValue;
@@ -37,7 +38,6 @@
3738
import java.util.Set;
3839
import java.util.TreeMap;
3940
import java.util.TreeSet;
40-
import java.util.function.LongSupplier;
4141
import java.util.function.Predicate;
4242

4343
import static java.util.stream.Collectors.toUnmodifiableSet;
@@ -50,7 +50,7 @@ public class DesiredBalanceComputer {
5050
private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
5151

5252
private final ShardsAllocator delegateAllocator;
53-
private final LongSupplier timeSupplierMillis;
53+
private final TimeProvider timeProvider;
5454

5555
// stats
5656
protected final MeanMetric iterations = new MeanMetric();
@@ -73,9 +73,9 @@ public class DesiredBalanceComputer {
7373
private TimeValue progressLogInterval;
7474
private long maxBalanceComputationTimeDuringIndexCreationMillis;
7575

76-
public DesiredBalanceComputer(ClusterSettings clusterSettings, LongSupplier timeSupplierMillis, ShardsAllocator delegateAllocator) {
76+
public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
7777
this.delegateAllocator = delegateAllocator;
78-
this.timeSupplierMillis = timeSupplierMillis;
78+
this.timeProvider = timeProvider;
7979
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
8080
clusterSettings.initializeAndWatch(
8181
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
@@ -275,7 +275,7 @@ public DesiredBalance compute(
275275

276276
final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
277277
final long timeWarningInterval = progressLogInterval.millis();
278-
final long computationStartedTime = timeSupplierMillis.getAsLong();
278+
final long computationStartedTime = timeProvider.relativeTimeInMillis();
279279
long nextReportTime = computationStartedTime + timeWarningInterval;
280280

281281
int i = 0;
@@ -323,7 +323,7 @@ public DesiredBalance compute(
323323

324324
i++;
325325
final int iterations = i;
326-
final long currentTime = timeSupplierMillis.getAsLong();
326+
final long currentTime = timeProvider.relativeTimeInMillis();
327327
final boolean reportByTime = nextReportTime <= currentTime;
328328
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
329329
if (reportByTime || reportByIterationCount) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public DesiredBalanceShardsAllocator(
9393
delegateAllocator,
9494
threadPool,
9595
clusterService,
96-
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator),
96+
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
9797
reconciler,
9898
telemetryProvider,
9999
nodeAllocationStatsProvider
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.time;
11+
12+
/**
13+
* An interface encapsulating the different methods for getting relative and absolute time. The main
14+
* implementation of this is {@link org.elasticsearch.threadpool.ThreadPool}. To make it clear that a
15+
* {@code ThreadPool} is being passed around only to get time, it is preferred to use this interface.
16+
*/
17+
public interface TimeProvider {
18+
19+
/**
20+
* Returns a value of milliseconds that may be used for relative time calculations.
21+
*
22+
* This method should only be used for calculating time deltas. For an epoch based
23+
* timestamp, see {@link #absoluteTimeInMillis()}.
24+
*/
25+
long relativeTimeInMillis();
26+
27+
/**
28+
* Returns a value of nanoseconds that may be used for relative time calculations.
29+
*
30+
* This method should only be used for calculating time deltas. For an epoch based
31+
* timestamp, see {@link #absoluteTimeInMillis()}.
32+
*/
33+
long relativeTimeInNanos();
34+
35+
/**
36+
* Returns a value of milliseconds that may be used for relative time calculations. Similar to {@link #relativeTimeInMillis()} except
37+
* that this method is more expensive: the return value is computed directly from {@link System#nanoTime} and is not cached. You should
38+
* use {@link #relativeTimeInMillis()} unless the extra accuracy offered by this method is worth the costs.
39+
*
40+
* When computing a time interval by comparing relative times in milliseconds, you should make sure that both endpoints use cached
41+
* values returned from {@link #relativeTimeInMillis()} or that they both use raw values returned from this method. It doesn't really
42+
* make sense to compare a raw value to a cached value, even if in practice the result of such a comparison will be approximately
43+
* sensible.
44+
*/
45+
long rawRelativeTimeInMillis();
46+
47+
/**
48+
* Returns the value of milliseconds since UNIX epoch.
49+
*
50+
* This method should only be used for exact date/time formatting. For calculating
51+
* time deltas that should not suffer from negative deltas, which are possible with
52+
* this method, see {@link #relativeTimeInMillis()}.
53+
*/
54+
long absoluteTimeInMillis();
55+
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.common.settings.Setting;
1818
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.time.TimeProvider;
1920
import org.elasticsearch.common.unit.ByteSizeUnit;
2021
import org.elasticsearch.common.unit.ByteSizeValue;
2122
import org.elasticsearch.common.unit.SizeValue;
@@ -65,7 +66,7 @@
6566
* Manages all the Java thread pools we create. {@link Names} contains a list of the thread pools, but plugins can dynamically add more
6667
* thread pools to instantiate.
6768
*/
68-
public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
69+
public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler, TimeProvider {
6970

7071
private static final Logger logger = LogManager.getLogger(ThreadPool.class);
7172

@@ -362,12 +363,7 @@ protected ThreadPool() {
362363
this.scheduler = null;
363364
}
364365

365-
/**
366-
* Returns a value of milliseconds that may be used for relative time calculations.
367-
*
368-
* This method should only be used for calculating time deltas. For an epoch based
369-
* timestamp, see {@link #absoluteTimeInMillis()}.
370-
*/
366+
@Override
371367
public long relativeTimeInMillis() {
372368
return cachedTimeThread.relativeTimeInMillis();
373369
}
@@ -379,37 +375,17 @@ public LongSupplier relativeTimeInMillisSupplier() {
379375
return relativeTimeInMillisSupplier;
380376
}
381377

382-
/**
383-
* Returns a value of nanoseconds that may be used for relative time calculations.
384-
*
385-
* This method should only be used for calculating time deltas. For an epoch based
386-
* timestamp, see {@link #absoluteTimeInMillis()}.
387-
*/
378+
@Override
388379
public long relativeTimeInNanos() {
389380
return cachedTimeThread.relativeTimeInNanos();
390381
}
391382

392-
/**
393-
* Returns a value of milliseconds that may be used for relative time calculations. Similar to {@link #relativeTimeInMillis()} except
394-
* that this method is more expensive: the return value is computed directly from {@link System#nanoTime} and is not cached. You should
395-
* use {@link #relativeTimeInMillis()} unless the extra accuracy offered by this method is worth the costs.
396-
*
397-
* When computing a time interval by comparing relative times in milliseconds, you should make sure that both endpoints use cached
398-
* values returned from {@link #relativeTimeInMillis()} or that they both use raw values returned from this method. It doesn't really
399-
* make sense to compare a raw value to a cached value, even if in practice the result of such a comparison will be approximately
400-
* sensible.
401-
*/
383+
@Override
402384
public long rawRelativeTimeInMillis() {
403385
return TimeValue.nsecToMSec(System.nanoTime());
404386
}
405387

406-
/**
407-
* Returns the value of milliseconds since UNIX epoch.
408-
*
409-
* This method should only be used for exact date/time formatting. For calculating
410-
* time deltas that should not suffer from negative deltas, which are possible with
411-
* this method, see {@link #relativeTimeInMillis()}.
412-
*/
388+
@Override
413389
public long absoluteTimeInMillis() {
414390
return cachedTimeThread.absoluteTimeInMillis();
415391
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testDeleteDesiredBalance() throws Exception {
101101
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
102102

103103
var delegate = new BalancedShardsAllocator();
104-
var computer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegate) {
104+
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {
105105

106106
final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();
107107

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.elasticsearch.common.Randomness;
4343
import org.elasticsearch.common.UUIDs;
4444
import org.elasticsearch.common.settings.Settings;
45+
import org.elasticsearch.common.time.TimeProvider;
46+
import org.elasticsearch.common.time.TimeProviderUtils;
4547
import org.elasticsearch.common.unit.ByteSizeValue;
4648
import org.elasticsearch.common.util.Maps;
4749
import org.elasticsearch.index.IndexVersion;
@@ -1203,42 +1205,40 @@ public void testShouldLogComputationIteration() {
12031205

12041206
private void checkIterationLogging(int iterations, long eachIterationDuration, MockLog.AbstractEventExpectation expectation) {
12051207
var currentTime = new AtomicLong(0L);
1208+
TimeProvider timeProvider = TimeProviderUtils.create(() -> currentTime.addAndGet(eachIterationDuration));
1209+
12061210
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
12071211
// prevents interrupting a long computation.
12081212
var clusterSettings = createBuiltInClusterSettings(
12091213
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
12101214
);
1211-
var desiredBalanceComputer = new DesiredBalanceComputer(
1212-
clusterSettings,
1213-
() -> currentTime.addAndGet(eachIterationDuration),
1214-
new ShardsAllocator() {
1215-
@Override
1216-
public void allocate(RoutingAllocation allocation) {
1217-
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
1218-
while (unassignedIterator.hasNext()) {
1219-
final var shardRouting = unassignedIterator.next();
1220-
if (shardRouting.primary()) {
1221-
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
1222-
} else {
1223-
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
1224-
}
1225-
}
1226-
1227-
// move shard on each iteration
1228-
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
1229-
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
1230-
}
1231-
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
1232-
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
1215+
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, timeProvider, new ShardsAllocator() {
1216+
@Override
1217+
public void allocate(RoutingAllocation allocation) {
1218+
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
1219+
while (unassignedIterator.hasNext()) {
1220+
final var shardRouting = unassignedIterator.next();
1221+
if (shardRouting.primary()) {
1222+
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
1223+
} else {
1224+
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
12331225
}
12341226
}
12351227

1236-
@Override
1237-
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
1238-
throw new AssertionError("only used for allocation explain");
1228+
// move shard on each iteration
1229+
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
1230+
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
1231+
}
1232+
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
1233+
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
12391234
}
12401235
}
1241-
);
1236+
1237+
@Override
1238+
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
1239+
throw new AssertionError("only used for allocation explain");
1240+
}
1241+
});
12421242

12431243
assertThatLogger(() -> {
12441244
var iteration = new AtomicInteger(0);
@@ -1346,7 +1346,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
13461346
}
13471347

13481348
private static DesiredBalanceComputer createDesiredBalanceComputer(ShardsAllocator allocator) {
1349-
return new DesiredBalanceComputer(createBuiltInClusterSettings(), () -> 0L, allocator);
1349+
return new DesiredBalanceComputer(createBuiltInClusterSettings(), TimeProviderUtils.create(() -> 0L), allocator);
13501350
}
13511351

13521352
private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map<ShardId, ShardAssignment> expected) {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
4545
import org.elasticsearch.common.UUIDs;
4646
import org.elasticsearch.common.settings.Settings;
47+
import org.elasticsearch.common.time.TimeProviderUtils;
4748
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
4849
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
4950
import org.elasticsearch.core.TimeValue;
@@ -398,7 +399,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
398399
shardsAllocator,
399400
threadPool,
400401
clusterService,
401-
new DesiredBalanceComputer(clusterSettings, time::get, shardsAllocator) {
402+
new DesiredBalanceComputer(clusterSettings, TimeProviderUtils.create(time::get), shardsAllocator) {
402403
@Override
403404
public DesiredBalance compute(
404405
DesiredBalance previousDesiredBalance,
@@ -525,7 +526,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
525526
shardsAllocator,
526527
threadPool,
527528
clusterService,
528-
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
529+
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
529530
@Override
530531
public DesiredBalance compute(
531532
DesiredBalance previousDesiredBalance,
@@ -629,7 +630,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
629630
shardsAllocator,
630631
threadPool,
631632
clusterService,
632-
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
633+
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
633634
@Override
634635
public DesiredBalance compute(
635636
DesiredBalance previousDesiredBalance,
@@ -717,7 +718,7 @@ public void testResetDesiredBalance() {
717718
var delegateAllocator = createShardsAllocator();
718719
var clusterSettings = createBuiltInClusterSettings();
719720

720-
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator) {
721+
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {
721722

722723
final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();
723724

@@ -786,11 +787,7 @@ public void testResetDesiredBalanceOnNoLongerMaster() {
786787
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
787788

788789
var delegateAllocator = createShardsAllocator();
789-
var desiredBalanceComputer = new DesiredBalanceComputer(
790-
createBuiltInClusterSettings(),
791-
threadPool::relativeTimeInMillis,
792-
delegateAllocator
793-
);
790+
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
794791
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
795792
delegateAllocator,
796793
threadPool,
@@ -840,11 +837,7 @@ public void testResetDesiredBalanceOnNodeShutdown() {
840837

841838
final var resetCalled = new AtomicBoolean();
842839
var delegateAllocator = createShardsAllocator();
843-
var desiredBalanceComputer = new DesiredBalanceComputer(
844-
createBuiltInClusterSettings(),
845-
threadPool::relativeTimeInMillis,
846-
delegateAllocator
847-
);
840+
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
848841
var desiredBalanceAllocator = new DesiredBalanceShardsAllocator(
849842
delegateAllocator,
850843
threadPool,

0 commit comments

Comments
 (0)