Skip to content

Commit 1b1c061

Browse files
committed
Reduce licence checks in LicensedWriteLoadForecaster
Rather than checking the license (updating the usage map) on every single shard, just do it once at the start of a computation that needs to forecast write loads. Closes #123247
1 parent 4bd1f81 commit 1b1c061

File tree

9 files changed

+126
-8
lines changed

9 files changed

+126
-8
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ protected void masterOperation(
9292
return;
9393
}
9494
var clusterInfo = clusterInfoService.getClusterInfo();
95+
writeLoadForecaster.refreshLicence();
9596
listener.onResponse(
9697
new DesiredBalanceResponse(
9798
desiredBalanceShardsAllocator.getStats(),

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent(
430430
);
431431
}
432432

433+
writeLoadForecaster.refreshLicence();
433434
metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
434435
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
435436

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
7070
ClusterInfo clusterInfo,
7171
@Nullable DesiredBalance desiredBalance
7272
) {
73+
writeLoadForecaster.refreshLicence();
7374
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
7475
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
7576
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface WriteLoadForecaster {
2121

2222
OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);
2323

24+
void refreshLicence();
25+
2426
class DefaultWriteLoadForecaster implements WriteLoadForecaster {
2527
@Override
2628
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
@@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
3133
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
3234
return OptionalDouble.empty();
3335
}
36+
37+
@Override
38+
public void refreshLicence() {}
3439
}
3540
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public BalancedShardsAllocator(ClusterSettings clusterSettings, WriteLoadForecas
145145

146146
@Override
147147
public void allocate(RoutingAllocation allocation) {
148+
writeLoadForecaster.refreshLicence();
149+
148150
assert allocation.ignoreDisable() == false;
149151

150152
if (allocation.routingNodes().size() == 0) {

test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
8989
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
9090
return indexMetadata.getForecastedWriteLoad();
9191
}
92+
93+
@Override
94+
public void refreshLicence() {}
9295
};
9396

9497
public static MockAllocationService createAllocationService() {

x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception
8484
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
8585

8686
setHasValidLicense(false);
87+
writeLoadForecaster.refreshLicence();
8788

8889
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
8990
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
@@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception {
131132
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
132133

133134
setHasValidLicense(false);
135+
writeLoadForecaster.refreshLicence();
134136

135137
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
136138
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));

x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import org.elasticsearch.core.SuppressForbidden;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.index.Index;
22+
import org.elasticsearch.logging.LogManager;
23+
import org.elasticsearch.logging.Logger;
2224
import org.elasticsearch.threadpool.ThreadPool;
2325

26+
import java.lang.invoke.MethodHandles;
27+
import java.lang.invoke.VarHandle;
2428
import java.util.List;
2529
import java.util.Objects;
2630
import java.util.OptionalDouble;
@@ -30,30 +34,37 @@
3034
import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING;
3135

3236
class LicensedWriteLoadForecaster implements WriteLoadForecaster {
37+
38+
private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class);
39+
3340
public static final Setting<TimeValue> MAX_INDEX_AGE_SETTING = Setting.timeSetting(
3441
"write_load_forecaster.max_index_age",
3542
TimeValue.timeValueDays(7),
3643
TimeValue.timeValueHours(1),
3744
Setting.Property.NodeScope,
3845
Setting.Property.Dynamic
3946
);
40-
private final BooleanSupplier hasValidLicense;
47+
48+
private final BooleanSupplier hasValidLicenceSupplier;
4149
private final ThreadPool threadPool;
4250
private volatile TimeValue maxIndexAge;
4351

52+
@SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENCE_FIELD
53+
private volatile boolean hasValidLicence;
54+
4455
LicensedWriteLoadForecaster(
45-
BooleanSupplier hasValidLicense,
56+
BooleanSupplier hasValidLicenceSupplier,
4657
ThreadPool threadPool,
4758
Settings settings,
4859
ClusterSettings clusterSettings
4960
) {
50-
this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
61+
this(hasValidLicenceSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
5162
clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
5263
}
5364

5465
// exposed for tests only
55-
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) {
56-
this.hasValidLicense = hasValidLicense;
66+
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenceSupplier, ThreadPool threadPool, TimeValue maxIndexAge) {
67+
this.hasValidLicenceSupplier = hasValidLicenceSupplier;
5768
this.threadPool = threadPool;
5869
this.maxIndexAge = maxIndexAge;
5970
}
@@ -64,7 +75,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) {
6475

6576
@Override
6677
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
67-
if (hasValidLicense.getAsBoolean() == false) {
78+
if (hasValidLicence == false) {
6879
return metadata;
6980
}
7081

@@ -143,7 +154,7 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
143154
@Override
144155
@SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used")
145156
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
146-
if (hasValidLicense.getAsBoolean() == false) {
157+
if (hasValidLicence == false) {
147158
return OptionalDouble.empty();
148159
}
149160

@@ -154,4 +165,25 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
154165

155166
return indexMetadata.getForecastedWriteLoad();
156167
}
168+
169+
private static final VarHandle VH_HAS_VALID_LICENCE_FIELD;
170+
171+
static {
172+
try {
173+
VH_HAS_VALID_LICENCE_FIELD = MethodHandles.lookup()
174+
.in(LicensedWriteLoadForecaster.class)
175+
.findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicence", boolean.class);
176+
} catch (NoSuchFieldException | IllegalAccessException e) {
177+
throw new RuntimeException(e);
178+
}
179+
}
180+
181+
@Override
182+
public void refreshLicence() {
183+
final var newValue = hasValidLicenceSupplier.getAsBoolean();
184+
final var oldValue = (boolean) VH_HAS_VALID_LICENCE_FIELD.getAndSet(this, newValue);
185+
if (newValue != oldValue) {
186+
logger.info("licence state changed, now [{}]", newValue ? "valid" : "not valid");
187+
}
188+
}
157189
}

x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.writeloadforecaster;
99

10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.core.LogEvent;
1012
import org.elasticsearch.cluster.metadata.DataStream;
1113
import org.elasticsearch.cluster.metadata.IndexMetadata;
1214
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.index.IndexMode;
2022
import org.elasticsearch.index.IndexVersion;
2123
import org.elasticsearch.test.ESTestCase;
24+
import org.elasticsearch.test.MockLog;
2225
import org.elasticsearch.threadpool.TestThreadPool;
2326
import org.elasticsearch.threadpool.ThreadPool;
2427
import org.junit.After;
@@ -30,9 +33,12 @@
3033
import java.util.OptionalDouble;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3337

3438
import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
3539
import static org.hamcrest.Matchers.closeTo;
40+
import static org.hamcrest.Matchers.contains;
41+
import static org.hamcrest.Matchers.empty;
3642
import static org.hamcrest.Matchers.equalTo;
3743
import static org.hamcrest.Matchers.greaterThan;
3844
import static org.hamcrest.Matchers.is;
@@ -53,7 +59,13 @@ public void tearDownThreadPool() {
5359
public void testWriteLoadForecastIsAddedToWriteIndex() {
5460
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
5561
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
56-
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
62+
final AtomicInteger licenceCheckCount = new AtomicInteger();
63+
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> {
64+
licenceCheckCount.incrementAndGet();
65+
return hasValidLicense.get();
66+
}, threadPool, maxIndexAge);
67+
68+
writeLoadForecaster.refreshLicence();
5769

5870
final Metadata.Builder metadataBuilder = Metadata.builder();
5971
final String dataStreamName = "logs-es";
@@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() {
95107
assertThat(forecastedWriteLoad.isPresent(), is(true));
96108
assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));
97109

110+
assertThat(licenceCheckCount.get(), equalTo(1));
98111
hasValidLicense.set(false);
99112

113+
writeLoadForecaster.refreshLicence();
114+
assertThat(licenceCheckCount.get(), equalTo(2));
115+
100116
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
101117
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
102118
}
@@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() {
136152
metadataBuilder.put(dataStream);
137153

138154
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
155+
writeLoadForecaster.refreshLicence();
139156

140157
final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
141158
dataStream.getName(),
@@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
154171
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
155172
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
156173
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
174+
writeLoadForecaster.refreshLicence();
157175

158176
final Metadata.Builder metadataBuilder = Metadata.builder();
159177
final String dataStreamName = "logs-es";
@@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
197215
assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6)));
198216

199217
hasValidLicense.set(false);
218+
writeLoadForecaster.refreshLicence();
200219

201220
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
202221
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
@@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List<Index> backingIndices) {
327346
.setIndexMode(IndexMode.STANDARD)
328347
.build();
329348
}
349+
350+
public void testLicenseStateLogging() {
351+
352+
final var seenMessages = new ArrayList<String>();
353+
354+
final var collectingLoggingAssertion = new MockLog.SeenEventExpectation(
355+
"seen event",
356+
LicensedWriteLoadForecaster.class.getCanonicalName(),
357+
Level.INFO,
358+
"*"
359+
) {
360+
@Override
361+
public boolean innerMatch(LogEvent event) {
362+
final var message = event.getMessage().getFormattedMessage();
363+
if (message.startsWith("license state changed, now [")) {
364+
seenMessages.add(message);
365+
return true;
366+
}
367+
368+
return false;
369+
}
370+
};
371+
372+
MockLog.assertThatLogger(() -> {
373+
final var hasValidLicence = new AtomicBoolean();
374+
final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicence::get, threadPool, randomTimeValue());
375+
assertThat(seenMessages, empty());
376+
writeLoadForecaster.refreshLicence();
377+
assertThat(seenMessages, empty());
378+
379+
hasValidLicence.set(true);
380+
writeLoadForecaster.refreshLicence();
381+
assertThat(seenMessages, contains("license state changed, now [valid]"));
382+
writeLoadForecaster.refreshLicence();
383+
assertThat(seenMessages, contains("license state changed, now [valid]"));
384+
385+
hasValidLicence.set(false);
386+
writeLoadForecaster.refreshLicence();
387+
assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]"));
388+
389+
hasValidLicence.set(true);
390+
ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicence());
391+
assertThat(
392+
seenMessages,
393+
contains(
394+
"license state changed, now [valid]",
395+
"license state changed, now [not valid]",
396+
"license state changed, now [valid]"
397+
)
398+
);
399+
}, LicensedWriteLoadForecaster.class, collectingLoggingAssertion);
400+
}
330401
}

0 commit comments

Comments
 (0)