Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -264,7 +266,68 @@ public boolean atLeastOneSampleConfigured() {

/**
 * Reacts to a cluster state change by discarding locally held samples whose sampling configuration has been removed or
 * modified. Nothing is done if the random sampling feature flag is disabled, if no samples are currently held, or if the
 * cluster metadata did not change.
 * <p>
 * No locking is performed. It is possible that maybeSample() still sees a configuration and recreates a sample right
 * after this method removed it. In the worst case a small amount of memory stays in use until the sampling configuration
 * is recreated or the TTL checker reclaims it. The advantage is that ingestion is not slowed down by locking here.
 *
 * @param event the cluster state change, giving access to both the previous and the current state
 */
@Override
public void clusterChanged(ClusterChangedEvent event) {
    if (RANDOM_SAMPLING_FEATURE_FLAG == false) {
        return;
    }
    if (samples.isEmpty()) {
        return;
    }
    if (event.metadataChanged() == false) {
        return;
    }
    /*
     * Collect the union of all project ids in the current state and the previous one. The project ids from the previous
     * state are included in case an entire project has been deleted -- in that case all of its samples must be removed.
     */
    Set<ProjectId> allProjectIds = new HashSet<>();
    for (ProjectMetadata project : event.state().metadata().projects().values()) {
        allProjectIds.add(project.id());
    }
    for (ProjectMetadata project : event.previousState().metadata().projects().values()) {
        allProjectIds.add(project.id());
    }
    for (ProjectId projectId : allProjectIds) {
        if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE) == false) {
            continue;
        }
        // A project that is missing from one of the two states simply has no sampling metadata on that side.
        SamplingMetadata oldSamplingConfig = event.previousState().metadata().hasProject(projectId)
            ? event.previousState().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
            : null;
        SamplingMetadata newSamplingConfig = event.state().metadata().hasProject(projectId)
            ? event.state().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
            : null;
        Map<String, SamplingConfiguration> oldSampleConfigsMap = indexToSamplingConfigs(oldSamplingConfig);
        Map<String, SamplingConfiguration> newSampleConfigsMap = indexToSamplingConfigs(newSamplingConfig);

        // Indices that were configured before but are not configured any more: their samples are dropped.
        for (String indexName : oldSampleConfigsMap.keySet()) {
            if (newSampleConfigsMap.containsKey(indexName) == false) {
                logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
                samples.remove(new ProjectIndex(projectId, indexName));
            }
        }

        // Indices whose configuration differs from the previous one (including newly added ones): their samples are dropped.
        for (Map.Entry<String, SamplingConfiguration> entry : newSampleConfigsMap.entrySet()) {
            String indexName = entry.getKey();
            if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
                logger.debug("Removing sample info for {} because its configuration has changed", indexName);
                samples.remove(new ProjectIndex(projectId, indexName));
            }
        }
    }
    // TODO: If an index has been deleted, we want to remove its sampling configuration
}

/**
 * Returns the per-index sampling configurations held by the given metadata, or an empty map if the metadata is null.
 *
 * @param samplingMetadata the sampling metadata of a project, possibly null
 * @return the index name to sampling configuration map, never null
 */
private static Map<String, SamplingConfiguration> indexToSamplingConfigs(SamplingMetadata samplingMetadata) {
    return samplingMetadata == null ? Map.of() : samplingMetadata.getIndexToSamplingConfigMap();
}

private boolean evaluateCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
Expand Down Expand Up @@ -256,6 +258,85 @@ public void testMaybeSampleMaxSize() {
assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2));
}

/**
 * Exercises SamplingService.clusterChanged() in four scenarios: the sampled project disappears, the project loses its
 * sampling metadata, the sampling configuration of the index changes, and the sampling configuration stays the same.
 * A sample is taken before each scenario; only the last scenario expects it to survive.
 */
public void testClusterChanged() {
    final String index = randomIdentifier();
    final SamplingService service = getTestSamplingService();
    final Map<String, Object> docSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
    final IndexRequest request = new IndexRequest(index).id("_id").source(docSource);

    final ProjectMetadata.Builder initialBuilder = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()));
    final SamplingConfiguration initialConfig = new SamplingConfiguration(
        1.0,
        randomIntBetween(1, 1000),
        ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
        TimeValue.timeValueDays(randomIntBetween(1, 10)),
        null
    );
    final ProjectMetadata initialProject = initialBuilder.putCustom(
        SamplingMetadata.TYPE,
        new SamplingMetadata(Map.of(index, initialConfig))
    ).build();
    final ProjectId projectId = initialProject.id();

    // Scenario 1: the new state does not contain the project the sample was configured in, so the sample goes away.
    {
        service.maybeSample(initialProject, request);
        assertThat(service.getLocalSampleStats(projectId, index).getSamples(), equalTo(1L));
        final ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(initialProject).build();
        final ClusterState after = ClusterState.builder(ClusterState.EMPTY_STATE)
            .putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())))
            .build();
        service.clusterChanged(new ClusterChangedEvent("test", after, before));
        assertThat(service.getLocalSampleStats(projectId, index).getSamples(), equalTo(0L));
    }

    // Scenario 2: the project still exists but no longer carries sampling metadata, so the sample goes away.
    {
        service.maybeSample(initialProject, request);
        assertThat(service.getLocalSampleStats(projectId, index).getSamples(), equalTo(1L));
        final ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(initialProject).build();
        final ClusterState after = ClusterState.builder(ClusterState.EMPTY_STATE)
            .putProjectMetadata(ProjectMetadata.builder(projectId))
            .build();
        service.clusterChanged(new ClusterChangedEvent("test", after, before));
        assertThat(service.getLocalSampleStats(projectId, index).getSamples(), equalTo(0L));
    }

    // Scenario 3: the sampling configuration of the index is modified, so the sample goes away.
    final ProjectMetadata updatedProject;
    {
        service.maybeSample(initialProject, request);
        assertThat(service.getLocalSampleStats(projectId, index).getSamples(), equalTo(1L));
        final ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(initialProject).build();
        // 1001 lies outside the range the initial max-samples value was drawn from, so the two configurations always differ.
        final SamplingConfiguration changedConfig = new SamplingConfiguration(
            1.0,
            1001,
            ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
            TimeValue.timeValueDays(randomIntBetween(1, 10)),
            null
        );
        updatedProject = ProjectMetadata.builder(projectId)
            .putCustom(SamplingMetadata.TYPE, new SamplingMetadata(Map.of(index, changedConfig)))
            .build();
        final ClusterState after = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(updatedProject).build();
        service.clusterChanged(new ClusterChangedEvent("test", after, before));
        assertThat(service.getLocalSampleStats(updatedProject.id(), index).getSamples(), equalTo(0L));
    }

    // Scenario 4: the sampling configuration is identical in both states, so the sample is _not_ removed.
    {
        service.maybeSample(updatedProject, request);
        assertThat(service.getLocalSampleStats(updatedProject.id(), index).getSamples(), equalTo(1L));
        final ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(updatedProject).build();
        final ClusterState after = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(updatedProject).build();
        service.clusterChanged(new ClusterChangedEvent("test", after, before));
        assertThat(service.getLocalSampleStats(updatedProject.id(), index).getSamples(), equalTo(1L));
    }
}

private SamplingService getTestSamplingService() {
final ScriptService scriptService = new ScriptService(
Settings.EMPTY,
Expand Down