Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/123346.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 123346
summary: Reduce licence checks in `LicensedWriteLoadForecaster`
area: CRUD
type: bug
issues:
- 123247
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected void masterOperation(
return;
}
var clusterInfo = clusterInfoService.getClusterInfo();
writeLoadForecaster.refreshLicence();
listener.onResponse(
new DesiredBalanceResponse(
desiredBalanceShardsAllocator.getStats(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent(
);
}

writeLoadForecaster.refreshLicence();
metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
ClusterInfo clusterInfo,
@Nullable DesiredBalance desiredBalance
) {
writeLoadForecaster.refreshLicence();
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface WriteLoadForecaster {

OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);

void refreshLicence();

class DefaultWriteLoadForecaster implements WriteLoadForecaster {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
Expand All @@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return OptionalDouble.empty();
}

@Override
public void refreshLicence() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ public BalancedShardsAllocator(ClusterSettings clusterSettings, WriteLoadForecas

@Override
public void allocate(RoutingAllocation allocation) {
writeLoadForecaster.refreshLicence();

assert allocation.ignoreDisable() == false;

if (allocation.routingNodes().size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return indexMetadata.getForecastedWriteLoad();
}

@Override
public void refreshLicence() {}
};

public static MockAllocationService createAllocationService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);

setHasValidLicense(false);
writeLoadForecaster.refreshLicence();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
Expand Down Expand Up @@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception {
assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata);

setHasValidLicense(false);
writeLoadForecaster.refreshLicence();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.Objects;
import java.util.OptionalDouble;
Expand All @@ -30,30 +34,37 @@
import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING;

class LicensedWriteLoadForecaster implements WriteLoadForecaster {

private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class);

public static final Setting<TimeValue> MAX_INDEX_AGE_SETTING = Setting.timeSetting(
"write_load_forecaster.max_index_age",
TimeValue.timeValueDays(7),
TimeValue.timeValueHours(1),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
private final BooleanSupplier hasValidLicense;

private final BooleanSupplier hasValidLicenceSupplier;
private final ThreadPool threadPool;
private volatile TimeValue maxIndexAge;

@SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENCE_FIELD
private volatile boolean hasValidLicence;

LicensedWriteLoadForecaster(
BooleanSupplier hasValidLicense,
BooleanSupplier hasValidLicenceSupplier,
ThreadPool threadPool,
Settings settings,
ClusterSettings clusterSettings
) {
this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
this(hasValidLicenceSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
}

// exposed for tests only
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) {
this.hasValidLicense = hasValidLicense;
LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenceSupplier, ThreadPool threadPool, TimeValue maxIndexAge) {
this.hasValidLicenceSupplier = hasValidLicenceSupplier;
this.threadPool = threadPool;
this.maxIndexAge = maxIndexAge;
}
Expand All @@ -64,7 +75,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) {

@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
if (hasValidLicense.getAsBoolean() == false) {
if (hasValidLicence == false) {
return metadata;
}

Expand Down Expand Up @@ -143,7 +154,7 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
@Override
@SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used")
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
if (hasValidLicense.getAsBoolean() == false) {
if (hasValidLicence == false) {
return OptionalDouble.empty();
}

Expand All @@ -154,4 +165,25 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {

return indexMetadata.getForecastedWriteLoad();
}

private static final VarHandle VH_HAS_VALID_LICENCE_FIELD;

static {
try {
VH_HAS_VALID_LICENCE_FIELD = MethodHandles.lookup()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to use a VarHandle here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're checking the license state in a hot loop so I didn't want to introduce another layer of indirection on the read path, but I wanted an atomic getAndSet on the write path for the sake of getting the logging right. I'll add a comment.

.in(LicensedWriteLoadForecaster.class)
.findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicence", boolean.class);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

@Override
public void refreshLicence() {
final var newValue = hasValidLicenceSupplier.getAsBoolean();
final var oldValue = (boolean) VH_HAS_VALID_LICENCE_FIELD.getAndSet(this, newValue);
if (newValue != oldValue) {
logger.info("licence state changed, now [{}]", newValue ? "valid" : "not valid");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.writeloadforecaster;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
Expand All @@ -30,9 +33,12 @@
import java.util.OptionalDouble;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand All @@ -53,7 +59,13 @@ public void tearDownThreadPool() {
public void testWriteLoadForecastIsAddedToWriteIndex() {
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
final AtomicInteger licenceCheckCount = new AtomicInteger();
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> {
licenceCheckCount.incrementAndGet();
return hasValidLicense.get();
}, threadPool, maxIndexAge);

writeLoadForecaster.refreshLicence();

final Metadata.Builder metadataBuilder = Metadata.builder();
final String dataStreamName = "logs-es";
Expand Down Expand Up @@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() {
assertThat(forecastedWriteLoad.isPresent(), is(true));
assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));

assertThat(licenceCheckCount.get(), equalTo(1));
hasValidLicense.set(false);

writeLoadForecaster.refreshLicence();
assertThat(licenceCheckCount.get(), equalTo(2));

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
}
Expand Down Expand Up @@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() {
metadataBuilder.put(dataStream);

final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
writeLoadForecaster.refreshLicence();

final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
dataStream.getName(),
Expand All @@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge);
writeLoadForecaster.refreshLicence();

final Metadata.Builder metadataBuilder = Metadata.builder();
final String dataStreamName = "logs-es";
Expand Down Expand Up @@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() {
assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6)));

hasValidLicense.set(false);
writeLoadForecaster.refreshLicence();

final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false));
Expand Down Expand Up @@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List<Index> backingIndices) {
.setIndexMode(IndexMode.STANDARD)
.build();
}

public void testLicenseStateLogging() {

final var seenMessages = new ArrayList<String>();

final var collectingLoggingAssertion = new MockLog.SeenEventExpectation(
"seen event",
LicensedWriteLoadForecaster.class.getCanonicalName(),
Level.INFO,
"*"
) {
@Override
public boolean innerMatch(LogEvent event) {
final var message = event.getMessage().getFormattedMessage();
if (message.startsWith("license state changed, now [")) {
seenMessages.add(message);
return true;
}

return false;
}
};

MockLog.assertThatLogger(() -> {
final var hasValidLicence = new AtomicBoolean();
final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicence::get, threadPool, randomTimeValue());
assertThat(seenMessages, empty());
writeLoadForecaster.refreshLicence();
assertThat(seenMessages, empty());

hasValidLicence.set(true);
writeLoadForecaster.refreshLicence();
assertThat(seenMessages, contains("license state changed, now [valid]"));
writeLoadForecaster.refreshLicence();
assertThat(seenMessages, contains("license state changed, now [valid]"));

hasValidLicence.set(false);
writeLoadForecaster.refreshLicence();
assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]"));

hasValidLicence.set(true);
ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicence());
assertThat(
seenMessages,
contains(
"license state changed, now [valid]",
"license state changed, now [not valid]",
"license state changed, now [valid]"
)
);
}, LicensedWriteLoadForecaster.class, collectingLoggingAssertion);
}
}