Skip to content

Commit cefe696

Browse files
committed
Reduce licence checks in LicensedWriteLoadForecaster
Rather than checking the license (updating the usage map) on every single shard, just do it once at the start of a computation that needs to forecast write loads. Backport of elastic#123346 to 8.x Closes elastic#123247
1 parent 58a92f4 commit cefe696

File tree

10 files changed

+139
-8
lines changed

10 files changed

+139
-8
lines changed

docs/changelog/123346.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123346
2+
summary: Reduce license checks in `LicensedWriteLoadForecaster`
3+
area: CRUD
4+
type: bug
5+
issues:
6+
- 123247

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ protected void masterOperation(
9292
return;
9393
}
9494
var clusterInfo = clusterInfoService.getClusterInfo();
95+
writeLoadForecaster.refreshLicense();
9596
listener.onResponse(
9697
new DesiredBalanceResponse(
9798
desiredBalanceShardsAllocator.getStats(),

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent(
430430
);
431431
}
432432

433+
writeLoadForecaster.refreshLicense();
433434
metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
434435
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
435436

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public AllocationStatsService(
4444
public Map<String, NodeAllocationStats> stats() {
4545
assert Transports.assertNotTransportThread("too expensive for a transport worker");
4646

47+
writeLoadForecaster.refreshLicense();
48+
4749
var state = clusterService.state();
4850
var info = clusterInfoService.getClusterInfo();
4951
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface WriteLoadForecaster {
2121

2222
OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);
2323

24+
void refreshLicense();
25+
2426
class DefaultWriteLoadForecaster implements WriteLoadForecaster {
2527
@Override
2628
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
@@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
3133
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
3234
return OptionalDouble.empty();
3335
}
36+
37+
@Override
38+
public void refreshLicense() {}
3439
}
3540
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ private static float ensureValidThreshold(float threshold) {
170170

171171
@Override
172172
public void allocate(RoutingAllocation allocation) {
173+
if (allocation.metadata().indices().isEmpty() == false) {
174+
// must not use licensed features when just starting up
175+
writeLoadForecaster.refreshLicense();
176+
}
177+
173178
assert allocation.ignoreDisable() == false;
174179

175180
if (allocation.routingNodes().size() == 0) {

test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
8686
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
8787
return indexMetadata.getForecastedWriteLoad();
8888
}
89+
90+
@Override
91+
public void refreshLicense() {}
8992
};
9093

9194
public static MockAllocationService createAllocationService() {

x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception
8484
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
8585

8686
setHasValidLicense(false);
87+
writeLoadForecaster.refreshLicense();
8788

8889
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
8990
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
@@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception {
131132
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);
132133

133134
setHasValidLicense(false);
135+
writeLoadForecaster.refreshLicense();
134136

135137
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
136138
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));

x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import org.elasticsearch.core.SuppressForbidden;
2020
import org.elasticsearch.core.TimeValue;
2121
import org.elasticsearch.index.Index;
22+
import org.elasticsearch.logging.LogManager;
23+
import org.elasticsearch.logging.Logger;
2224
import org.elasticsearch.threadpool.ThreadPool;
2325

26+
import java.lang.invoke.MethodHandles;
27+
import java.lang.invoke.VarHandle;
2428
import java.util.List;
2529
import java.util.Objects;
2630
import java.util.OptionalDouble;
@@ -30,30 +34,36 @@
3034
import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING;
3135

3236
class LicensedWriteLoadForecaster implements WriteLoadForecaster {
37+
38+
private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class);
39+
3340
public static final Setting<TimeValue> MAX_INDEX_AGE_SETTING = Setting.timeSetting(
3441
"write_load_forecaster.max_index_age",
3542
TimeValue.timeValueDays(7),
3643
TimeValue.timeValueHours(1),
3744
Setting.Property.NodeScope,
3845
Setting.Property.Dynamic
3946
);
40-
private final BooleanSupplier hasValidLicense;
47+
private final BooleanSupplier hasValidLicenseSupplier;
4148
private final ThreadPool threadPool;
4249
private volatile TimeValue maxIndexAge;
4350

51+
@SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENSE_FIELD
52+
private volatile boolean hasValidLicense;
53+
4454
LicensedWriteLoadForecaster(
45-
BooleanSupplier hasValidLicense,
55+
BooleanSupplier hasValidLicenseSupplier,
4656
ThreadPool threadPool,
4757
Settings settings,
4858
ClusterSettings clusterSettings
4959
) {
50-
this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
60+
this(hasValidLicenseSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
5161
clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
5262
}
5363

5464
// exposed for tests only
55-
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) {
56-
this.hasValidLicense = hasValidLicense;
65+
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenseSupplier, ThreadPool threadPool, TimeValue maxIndexAge) {
66+
this.hasValidLicenseSupplier = hasValidLicenseSupplier;
5767
this.threadPool = threadPool;
5868
this.maxIndexAge = maxIndexAge;
5969
}
@@ -64,7 +74,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) {
6474

6575
@Override
6676
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
67-
if (hasValidLicense.getAsBoolean() == false) {
77+
if (hasValidLicense == false) {
6878
return metadata;
6979
}
7080

@@ -143,7 +153,7 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
143153
@Override
144154
@SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used")
145155
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
146-
if (hasValidLicense.getAsBoolean() == false) {
156+
if (hasValidLicense == false) {
147157
return OptionalDouble.empty();
148158
}
149159

@@ -154,4 +164,29 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
154164

155165
return indexMetadata.getForecastedWriteLoad();
156166
}
167+
168+
/**
169+
* Used to atomically {@code getAndSet()} the {@link #hasValidLicense} field. This is better than an
170+
* {@link java.util.concurrent.atomic.AtomicBoolean} because it takes one less pointer dereference on each read.
171+
*/
172+
private static final VarHandle VH_HAS_VALID_LICENSE_FIELD;
173+
174+
static {
175+
try {
176+
VH_HAS_VALID_LICENSE_FIELD = MethodHandles.lookup()
177+
.in(LicensedWriteLoadForecaster.class)
178+
.findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicense", boolean.class);
179+
} catch (NoSuchFieldException | IllegalAccessException e) {
180+
throw new RuntimeException(e);
181+
}
182+
}
183+
184+
@Override
185+
public void refreshLicense() {
186+
final var newValue = hasValidLicenseSupplier.getAsBoolean();
187+
final var oldValue = (boolean) VH_HAS_VALID_LICENSE_FIELD.getAndSet(this, newValue);
188+
if (newValue != oldValue) {
189+
logger.info("license state changed, now [{}]", newValue ? "valid" : "not valid");
190+
}
191+
}
157192
}

x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.writeloadforecaster;
99

10+
import org.apache.logging.log4j.Level;
11+
import org.apache.logging.log4j.core.LogEvent;
1012
import org.elasticsearch.cluster.metadata.DataStream;
1113
import org.elasticsearch.cluster.metadata.IndexMetadata;
1214
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -19,6 +21,7 @@
1921
import org.elasticsearch.index.IndexMode;
2022
import org.elasticsearch.index.IndexVersion;
2123
import org.elasticsearch.test.ESTestCase;
24+
import org.elasticsearch.test.MockLog;
2225
import org.elasticsearch.threadpool.TestThreadPool;
2326
import org.elasticsearch.threadpool.ThreadPool;
2427
import org.junit.After;
@@ -30,9 +33,12 @@
3033
import java.util.OptionalDouble;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3337

3438
import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
3539
import static org.hamcrest.Matchers.closeTo;
40+
import static org.hamcrest.Matchers.contains;
41+
import static org.hamcrest.Matchers.empty;
3642
import static org.hamcrest.Matchers.equalTo;
3743
import static org.hamcrest.Matchers.greaterThan;
3844
import static org.hamcrest.Matchers.is;
@@ -53,7 +59,13 @@ public void tearDownThreadPool() {
5359
public void testWriteLoadForecastIsAddedToWriteIndex() {
5460
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
5561
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
56-
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
62+
final AtomicInteger licenseCheckCount = new AtomicInteger();
63+
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> {
64+
licenseCheckCount.incrementAndGet();
65+
return hasValidLicense.get();
66+
}, threadPool, maxIndexAge);
67+
68+
writeLoadForecaster.refreshLicense();
5769

5870
final Metadata.Builder metadataBuilder = Metadata.builder();
5971
final String dataStreamName = "logs-es";
@@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() {
95107
assertThat(forecastedWriteLoad.isPresent(), is(true));
96108
assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));
97109

110+
assertThat(licenseCheckCount.get(), equalTo(1));
98111
hasValidLicense.set(false);
99112

113+
writeLoadForecaster.refreshLicense();
114+
assertThat(licenseCheckCount.get(), equalTo(2));
115+
100116
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
101117
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
102118
}
@@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() {
136152
metadataBuilder.put(dataStream);
137153

138154
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
155+
writeLoadForecaster.refreshLicense();
139156

140157
final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
141158
dataStream.getName(),
@@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
154171
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
155172
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
156173
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
174+
writeLoadForecaster.refreshLicense();
157175

158176
final Metadata.Builder metadataBuilder = Metadata.builder();
159177
final String dataStreamName = "logs-es";
@@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
197215
assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6)));
198216

199217
hasValidLicense.set(false);
218+
writeLoadForecaster.refreshLicense();
200219

201220
final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
202221
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
@@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List<Index> backingIndices) {
327346
.setIndexMode(IndexMode.STANDARD)
328347
.build();
329348
}
349+
350+
public void testLicenseStateLogging() {
351+
352+
final var seenMessages = new ArrayList<String>();
353+
354+
final var collectingLoggingAssertion = new MockLog.SeenEventExpectation(
355+
"seen event",
356+
LicensedWriteLoadForecaster.class.getCanonicalName(),
357+
Level.INFO,
358+
"*"
359+
) {
360+
@Override
361+
public boolean innerMatch(LogEvent event) {
362+
final var message = event.getMessage().getFormattedMessage();
363+
if (message.startsWith("license state changed, now [")) {
364+
seenMessages.add(message);
365+
return true;
366+
}
367+
368+
return false;
369+
}
370+
};
371+
372+
MockLog.assertThatLogger(() -> {
373+
final var hasValidLicense = new AtomicBoolean();
374+
final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, randomTimeValue());
375+
assertThat(seenMessages, empty());
376+
writeLoadForecaster.refreshLicense();
377+
assertThat(seenMessages, empty());
378+
379+
hasValidLicense.set(true);
380+
writeLoadForecaster.refreshLicense();
381+
assertThat(seenMessages, contains("license state changed, now [valid]"));
382+
writeLoadForecaster.refreshLicense();
383+
assertThat(seenMessages, contains("license state changed, now [valid]"));
384+
385+
hasValidLicense.set(false);
386+
writeLoadForecaster.refreshLicense();
387+
assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]"));
388+
389+
hasValidLicense.set(true);
390+
ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicense());
391+
assertThat(
392+
seenMessages,
393+
contains(
394+
"license state changed, now [valid]",
395+
"license state changed, now [not valid]",
396+
"license state changed, now [valid]"
397+
)
398+
);
399+
}, LicensedWriteLoadForecaster.class, collectingLoggingAssertion);
400+
}
330401
}

0 commit comments

Comments
 (0)