diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index d1cf0a1ce0477..31f4c002bb7d7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -42,14 +42,18 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class SamplingService implements ClusterStateListener { public static final boolean RANDOM_SAMPLING_FEATURE_FLAG = new FeatureFlag("random_sampling").isEnabled(); @@ -264,7 +268,69 @@ public boolean atLeastOneSampleConfigured() { @Override public void clusterChanged(ClusterChangedEvent event) { - // TODO: React to sampling config changes + if (RANDOM_SAMPLING_FEATURE_FLAG == false) { + return; + } + if (samples.isEmpty()) { + return; + } + // We want to remove any samples if their sampling configuration has been deleted or modified. + if (event.metadataChanged()) { + /* + * First, we collect the union of all project ids in the current state and the previous one. We include the project ids from the + * previous state in case an entire project has been deleted -- in that case we would want to delete all of its samples. + */ + Set allProjectIds = Stream.concat( + event.state().metadata().projects().values().stream().map(ProjectMetadata::id), + event.previousState().metadata().projects().values().stream().map(ProjectMetadata::id) + ).collect(Collectors.toSet()); + for (ProjectId projectId : allProjectIds) { + if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) { + SamplingMetadata oldSamplingConfig = event.previousState().metadata().hasProject(projectId) + ? event.previousState().projectState(projectId).metadata().custom(SamplingMetadata.TYPE) + : null; + SamplingMetadata newSamplingConfig = event.state().metadata().hasProject(projectId) + ? event.state().projectState(projectId).metadata().custom(SamplingMetadata.TYPE) + : null; + Map newSampleConfigsMap = newSamplingConfig == null + ? Map.of() + : newSamplingConfig.getIndexToSamplingConfigMap(); + Set currentlyConfiguredIndexNames = newSampleConfigsMap.keySet(); + Set previouslyConfiguredIndexNames = oldSamplingConfig == null + ? Set.of() + : oldSamplingConfig.getIndexToSamplingConfigMap().keySet(); + Set removedIndexNames = new HashSet<>(previouslyConfiguredIndexNames); + removedIndexNames.removeAll(currentlyConfiguredIndexNames); + /* + * These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK + * with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still + * exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case + * we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker + * reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion. + */ + for (String indexName : removedIndexNames) { + logger.debug("Removing sample info for {} because its configuration has been removed", indexName); + samples.remove(new ProjectIndex(projectId, indexName)); + } + ; + Map oldSampleConfigsMap = oldSamplingConfig == null + ? Map.of() + : oldSamplingConfig.getIndexToSamplingConfigMap(); + /* + * Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as + * above, we have a race condition here that we can live with. + */ + for (Map.Entry entry : newSampleConfigsMap.entrySet()) { + String indexName = entry.getKey(); + if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) { + logger.debug("Removing sample info for {} because its configuration has changed", indexName); + samples.remove(new ProjectIndex(projectId, indexName)); + } + } + } + } + // TODO: If an index has been deleted, we want to remove its sampling configuration + } } private boolean evaluateCondition( diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java index d15874dcf7942..ad4309740f608 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; @@ -256,6 +258,85 @@ public void testMaybeSampleMaxSize() { assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2)); } + public void testClusterChanged() { + String indexName = randomIdentifier(); + SamplingService samplingService = getTestSamplingService(); + Map inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10))); + final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource); + + // Test that the sample is removed if the new state does not have the project that the sample was configured in: + ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())) + .putCustom( + SamplingMetadata.TYPE, + new SamplingMetadata( + Map.of( + indexName, + new SamplingConfiguration( + 1.0, + randomIntBetween(1, 1000), + ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)), + TimeValue.timeValueDays(randomIntBetween(1, 10)), + null + ) + ) + ) + ) + .build(); + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + ClusterState oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + ClusterState newState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()))) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is removed if the sampling metadata is removed from the project: + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(ProjectMetadata.builder(projectMetadata.id())).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is removed if the sampling configuration is changed + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + projectMetadata = ProjectMetadata.builder(projectMetadata.id()) + .putCustom( + SamplingMetadata.TYPE, + new SamplingMetadata( + Map.of( + indexName, + new SamplingConfiguration( + 1.0, + 1001, + ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)), + TimeValue.timeValueDays(randomIntBetween(1, 10)), + null + ) + ) + ) + ) + .build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is _not_ removed if the sampling configuration does not change: + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + } + private SamplingService getTestSamplingService() { final ScriptService scriptService = new ScriptService( Settings.EMPTY,