Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/123346.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 123346
summary: Reduce license checks in `LicensedWriteLoadForecaster`
area: CRUD
type: bug
issues:
- 123247
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected void masterOperation(
return;
}
var clusterInfo = clusterInfoService.getClusterInfo();
writeLoadForecaster.refreshLicense();
listener.onResponse(
new DesiredBalanceResponse(
desiredBalanceShardsAllocator.getStats(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent(
);
}

writeLoadForecaster.refreshLicense();
metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public AllocationStatsService(
public Map<String, NodeAllocationStats> stats() {
assert Transports.assertNotTransportThread("too expensive for a transport worker");

writeLoadForecaster.refreshLicense();

var state = clusterService.state();
var info = clusterInfoService.getClusterInfo();
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface WriteLoadForecaster {

OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);

void refreshLicense();

class DefaultWriteLoadForecaster implements WriteLoadForecaster {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
Expand All @@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return OptionalDouble.empty();
}

@Override
public void refreshLicense() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ private static float ensureValidThreshold(float threshold) {

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.metadata().indices().isEmpty() == false) {
// must not use licensed features when just starting up
writeLoadForecaster.refreshLicense();
}

assert allocation.ignoreDisable() == false;

if (allocation.routingNodes().size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return indexMetadata.getForecastedWriteLoad();
}

@Override
public void refreshLicense() {}
};

public static MockAllocationService createAllocationService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);

setHasValidLicense(false);
writeLoadForecaster.refreshLicense();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
Expand Down Expand Up @@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception {
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);

setHasValidLicense(false);
writeLoadForecaster.refreshLicense();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.Objects;
import java.util.OptionalDouble;
Expand All @@ -30,30 +34,36 @@
import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING;

class LicensedWriteLoadForecaster implements WriteLoadForecaster {

private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class);

public static final Setting<TimeValue> MAX_INDEX_AGE_SETTING = Setting.timeSetting(
"write_load_forecaster.max_index_age",
TimeValue.timeValueDays(7),
TimeValue.timeValueHours(1),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
private final BooleanSupplier hasValidLicense;
private final BooleanSupplier hasValidLicenseSupplier;
private final ThreadPool threadPool;
private volatile TimeValue maxIndexAge;

@SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENSE_FIELD
private volatile boolean hasValidLicense;

LicensedWriteLoadForecaster(
BooleanSupplier hasValidLicense,
BooleanSupplier hasValidLicenseSupplier,
ThreadPool threadPool,
Settings settings,
ClusterSettings clusterSettings
) {
this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
this(hasValidLicenseSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
}

// exposed for tests only
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) {
this.hasValidLicense = hasValidLicense;
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenseSupplier, ThreadPool threadPool, TimeValue maxIndexAge) {
this.hasValidLicenseSupplier = hasValidLicenseSupplier;
this.threadPool = threadPool;
this.maxIndexAge = maxIndexAge;
}
Expand All @@ -64,7 +74,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) {

@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
if (hasValidLicense.getAsBoolean() == false) {
if (hasValidLicense == false) {
return metadata;
}

Expand Down Expand Up @@ -143,7 +153,7 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
@Override
@SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used")
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
if (hasValidLicense.getAsBoolean() == false) {
if (hasValidLicense == false) {
return OptionalDouble.empty();
}

Expand All @@ -154,4 +164,29 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {

return indexMetadata.getForecastedWriteLoad();
}

/**
* Used to atomically {@code getAndSet()} the {@link #hasValidLicense} field. This is better than an
* {@link java.util.concurrent.atomic.AtomicBoolean} because it takes one less pointer dereference on each read.
*/
private static final VarHandle VH_HAS_VALID_LICENSE_FIELD;

static {
try {
VH_HAS_VALID_LICENSE_FIELD = MethodHandles.lookup()
.in(LicensedWriteLoadForecaster.class)
.findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicense", boolean.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

@Override
public void refreshLicense() {
final var newValue = hasValidLicenseSupplier.getAsBoolean();
final var oldValue = (boolean) VH_HAS_VALID_LICENSE_FIELD.getAndSet(this, newValue);
if (newValue != oldValue) {
logger.info("license state changed, now [{}]", newValue ? "valid" : "not valid");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.writeloadforecaster;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
Expand All @@ -30,9 +33,12 @@
import java.util.OptionalDouble;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand All @@ -53,7 +59,13 @@ public void tearDownThreadPool() {
public void testWriteLoadForecastIsAddedToWriteIndex() {
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
final AtomicInteger licenseCheckCount = new AtomicInteger();
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> {
licenseCheckCount.incrementAndGet();
return hasValidLicense.get();
}, threadPool, maxIndexAge);

writeLoadForecaster.refreshLicense();

final Metadata.Builder metadataBuilder = Metadata.builder();
final String dataStreamName = "logs-es";
Expand Down Expand Up @@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() {
assertThat(forecastedWriteLoad.isPresent(), is(true));
assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));

assertThat(licenseCheckCount.get(), equalTo(1));
hasValidLicense.set(false);

writeLoadForecaster.refreshLicense();
assertThat(licenseCheckCount.get(), equalTo(2));

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
}
Expand Down Expand Up @@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() {
metadataBuilder.put(dataStream);

final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
writeLoadForecaster.refreshLicense();

final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
dataStream.getName(),
Expand All @@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
writeLoadForecaster.refreshLicense();

final Metadata.Builder metadataBuilder = Metadata.builder();
final String dataStreamName = "logs-es";
Expand Down Expand Up @@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6)));

hasValidLicense.set(false);
writeLoadForecaster.refreshLicense();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
Expand Down Expand Up @@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List<Index> backingIndices) {
.setIndexMode(IndexMode.STANDARD)
.build();
}

public void testLicenseStateLogging() {

final var seenMessages = new ArrayList<String>();

final var collectingLoggingAssertion = new MockLog.SeenEventExpectation(
"seen event",
LicensedWriteLoadForecaster.class.getCanonicalName(),
Level.INFO,
"*"
) {
@Override
public boolean innerMatch(LogEvent event) {
final var message = event.getMessage().getFormattedMessage();
if (message.startsWith("license state changed, now [")) {
seenMessages.add(message);
return true;
}

return false;
}
};

MockLog.assertThatLogger(() -> {
final var hasValidLicense = new AtomicBoolean();
final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, randomTimeValue());
assertThat(seenMessages, empty());
writeLoadForecaster.refreshLicense();
assertThat(seenMessages, empty());

hasValidLicense.set(true);
writeLoadForecaster.refreshLicense();
assertThat(seenMessages, contains("license state changed, now [valid]"));
writeLoadForecaster.refreshLicense();
assertThat(seenMessages, contains("license state changed, now [valid]"));

hasValidLicense.set(false);
writeLoadForecaster.refreshLicense();
assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]"));

hasValidLicense.set(true);
ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicense());
assertThat(
seenMessages,
contains(
"license state changed, now [valid]",
"license state changed, now [not valid]",
"license state changed, now [valid]"
)
);
}, LicensedWriteLoadForecaster.class, collectingLoggingAssertion);
}
}