diff --git a/docs/changelog/123346.yaml b/docs/changelog/123346.yaml new file mode 100644 index 0000000000000..42c6fbf6931ad --- /dev/null +++ b/docs/changelog/123346.yaml @@ -0,0 +1,6 @@ +pr: 123346 +summary: Reduce license checks in `LicensedWriteLoadForecaster` +area: CRUD +type: bug +issues: + - 123247 diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java index 454fdad7cccc0..d01fd702f79be 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceAction.java @@ -92,6 +92,7 @@ protected void masterOperation( return; } var clusterInfo = clusterInfoService.getClusterInfo(); + writeLoadForecaster.refreshLicense(); listener.onResponse( new DesiredBalanceResponse( desiredBalanceShardsAllocator.getStats(), diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 0c22a17bb1f6b..63a5a792db3c9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -430,6 +430,7 @@ yield new DataStreamAutoShardingEvent( ); } + writeLoadForecaster.refreshLicense(); metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder); metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java index fa4d60c83e5ef..f64a63332a371 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java @@ -44,6 +44,8 @@ public AllocationStatsService( public Map stats() { assert Transports.assertNotTransportThread("too expensive for a transport worker"); + writeLoadForecaster.refreshLicense(); + var state = clusterService.state(); var info = clusterInfoService.getClusterInfo(); var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java index e7ca51eee815e..7bebedd9fdde4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java @@ -21,6 +21,8 @@ public interface WriteLoadForecaster { OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata); + void refreshLicense(); + class DefaultWriteLoadForecaster implements WriteLoadForecaster { @Override public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) { @@ -31,5 +33,8 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { return OptionalDouble.empty(); } + + @Override + public void refreshLicense() {} } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 27087992f9d2b..e58b1ecb73372 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -170,6 +170,11 @@ private static float ensureValidThreshold(float threshold) { @Override public void allocate(RoutingAllocation allocation) { + if (allocation.metadata().indices().isEmpty() == false) { + // must not use licensed features when just starting up + writeLoadForecaster.refreshLicense(); + } + assert allocation.ignoreDisable() == false; if (allocation.routingNodes().size() == 0) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index c8d66f389dab1..f3db9cb50313c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -86,6 +86,9 @@ public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { return indexMetadata.getForecastedWriteLoad(); } + + @Override + public void refreshLicense() {} }; public static MockAllocationService createAllocationService() { diff --git a/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java b/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java index b37b026b853e2..5c174d1bddef2 100644 --- a/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java +++ b/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java @@ -84,6 +84,7 @@ public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata); setHasValidLicense(false); + writeLoadForecaster.refreshLicense(); final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false))); @@ -131,6 +132,7 @@ public void testWriteLoadForecastIsOverriddenBySetting() throws Exception { assertAllPreviousForecastsAreClearedAfterRollover(dataStream, metadata); setHasValidLicense(false); + writeLoadForecaster.refreshLicense(); final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false))); diff --git a/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java index d4a85ce859b2b..45c5abdc61fd6 100644 --- a/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java +++ b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java @@ -19,8 +19,12 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.List; import java.util.Objects; import java.util.OptionalDouble; @@ -30,6 +34,9 @@ import static org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING; class LicensedWriteLoadForecaster implements WriteLoadForecaster { + + private static final Logger logger = LogManager.getLogger(LicensedWriteLoadForecaster.class); + public static final Setting MAX_INDEX_AGE_SETTING = Setting.timeSetting( "write_load_forecaster.max_index_age", TimeValue.timeValueDays(7), @@ -37,23 +44,26 @@ class LicensedWriteLoadForecaster implements WriteLoadForecaster { Setting.Property.NodeScope, Setting.Property.Dynamic ); - private final BooleanSupplier hasValidLicense; + private final BooleanSupplier hasValidLicenseSupplier; private final ThreadPool threadPool; private volatile TimeValue maxIndexAge; + @SuppressWarnings("unused") // modified via VH_HAS_VALID_LICENSE_FIELD + private volatile boolean hasValidLicense; + LicensedWriteLoadForecaster( - BooleanSupplier hasValidLicense, + BooleanSupplier hasValidLicenseSupplier, ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings ) { - this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings)); + this(hasValidLicenseSupplier, threadPool, MAX_INDEX_AGE_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting); } // exposed for tests only - LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) { - this.hasValidLicense = hasValidLicense; + LicensedWriteLoadForecaster(BooleanSupplier hasValidLicenseSupplier, ThreadPool threadPool, TimeValue maxIndexAge) { + this.hasValidLicenseSupplier = hasValidLicenseSupplier; this.threadPool = threadPool; this.maxIndexAge = maxIndexAge; } @@ -64,7 +74,7 @@ private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) { @Override public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) { - if (hasValidLicense.getAsBoolean() == false) { + if (hasValidLicense == false) { return metadata; } @@ -143,7 +153,7 @@ static OptionalDouble forecastIndexWriteLoad(List indicesWriteLo @Override @SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used") public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { - if (hasValidLicense.getAsBoolean() == false) { + if (hasValidLicense == false) { return OptionalDouble.empty(); } @@ -154,4 +164,29 @@ public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { return indexMetadata.getForecastedWriteLoad(); } + + /** + * Used to atomically {@code getAndSet()} the {@link #hasValidLicense} field. This is better than an + * {@link java.util.concurrent.atomic.AtomicBoolean} because it takes one less pointer dereference on each read. + */ + private static final VarHandle VH_HAS_VALID_LICENSE_FIELD; + + static { + try { + VH_HAS_VALID_LICENSE_FIELD = MethodHandles.lookup() + .in(LicensedWriteLoadForecaster.class) + .findVarHandle(LicensedWriteLoadForecaster.class, "hasValidLicense", boolean.class); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public void refreshLicense() { + final var newValue = hasValidLicenseSupplier.getAsBoolean(); + final var oldValue = (boolean) VH_HAS_VALID_LICENSE_FIELD.getAndSet(this, newValue); + if (newValue != oldValue) { + logger.info("license state changed, now [{}]", newValue ? "valid" : "not valid"); + } + } } diff --git a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java index 790af0a201578..162e84b2562c5 100644 --- a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java +++ b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.writeloadforecaster; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataStats; @@ -19,6 +21,7 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -30,9 +33,12 @@ import java.util.OptionalDouble; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad; import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -53,7 +59,13 @@ public void tearDownThreadPool() { public void testWriteLoadForecastIsAddedToWriteIndex() { final TimeValue maxIndexAge = TimeValue.timeValueDays(7); final AtomicBoolean hasValidLicense = new AtomicBoolean(true); - final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge); + final AtomicInteger licenseCheckCount = new AtomicInteger(); + final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> { + licenseCheckCount.incrementAndGet(); + return hasValidLicense.get(); + }, threadPool, maxIndexAge); + + writeLoadForecaster.refreshLicense(); final Metadata.Builder metadataBuilder = Metadata.builder(); final String dataStreamName = "logs-es"; @@ -95,8 +107,12 @@ public void testWriteLoadForecastIsAddedToWriteIndex() { assertThat(forecastedWriteLoad.isPresent(), is(true)); assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0))); + assertThat(licenseCheckCount.get(), equalTo(1)); hasValidLicense.set(false); + writeLoadForecaster.refreshLicense(); + assertThat(licenseCheckCount.get(), equalTo(2)); + final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex); assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false)); } @@ -136,6 +152,7 @@ public void testUptimeIsUsedToWeightWriteLoad() { metadataBuilder.put(dataStream); final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge); + writeLoadForecaster.refreshLicense(); final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( dataStream.getName(), @@ -154,6 +171,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() { final TimeValue maxIndexAge = TimeValue.timeValueDays(7); final AtomicBoolean hasValidLicense = new AtomicBoolean(true); final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge); + writeLoadForecaster.refreshLicense(); final Metadata.Builder metadataBuilder = Metadata.builder(); final String dataStreamName = "logs-es"; @@ -197,6 +215,7 @@ public void testForecastedWriteLoadIsOverriddenBySetting() { assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6))); hasValidLicense.set(false); + writeLoadForecaster.refreshLicense(); final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex); assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false)); @@ -327,4 +346,56 @@ private DataStream createDataStream(String name, List backingIndices) { .setIndexMode(IndexMode.STANDARD) .build(); } + + public void testLicenseStateLogging() { + + final var seenMessages = new ArrayList(); + + final var collectingLoggingAssertion = new MockLog.SeenEventExpectation( + "seen event", + LicensedWriteLoadForecaster.class.getCanonicalName(), + Level.INFO, + "*" + ) { + @Override + public boolean innerMatch(LogEvent event) { + final var message = event.getMessage().getFormattedMessage(); + if (message.startsWith("license state changed, now [")) { + seenMessages.add(message); + return true; + } + + return false; + } + }; + + MockLog.assertThatLogger(() -> { + final var hasValidLicense = new AtomicBoolean(); + final var writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, randomTimeValue()); + assertThat(seenMessages, empty()); + writeLoadForecaster.refreshLicense(); + assertThat(seenMessages, empty()); + + hasValidLicense.set(true); + writeLoadForecaster.refreshLicense(); + assertThat(seenMessages, contains("license state changed, now [valid]")); + writeLoadForecaster.refreshLicense(); + assertThat(seenMessages, contains("license state changed, now [valid]")); + + hasValidLicense.set(false); + writeLoadForecaster.refreshLicense(); + assertThat(seenMessages, contains("license state changed, now [valid]", "license state changed, now [not valid]")); + + hasValidLicense.set(true); + ESTestCase.startInParallel(between(1, 10), ignored -> writeLoadForecaster.refreshLicense()); + assertThat( + seenMessages, + contains( + "license state changed, now [valid]", + "license state changed, now [not valid]", + "license state changed, now [valid]" + ) + ); + }, LicensedWriteLoadForecaster.class, collectingLoggingAssertion); + } }