Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -264,7 +266,68 @@ public boolean atLeastOneSampleConfigured() {

@Override
public void clusterChanged(ClusterChangedEvent event) {
// TODO: React to sampling config changes
if (RANDOM_SAMPLING_FEATURE_FLAG == false) {
return;
}
if (samples.isEmpty()) {
return;
}
// We want to remove any samples if their sampling configuration has been deleted or modified.
if (event.metadataChanged()) {
/*
* First, we collect the union of all project ids in the current state and the previous one. We include the project ids from the
* previous state in case an entire project has been deleted -- in that case we would want to delete all of its samples.
*/
Set<ProjectId> allProjectIds = new HashSet<>(
event.state().metadata().projects().values().stream().map(ProjectMetadata::id).toList()
);
allProjectIds.addAll(event.previousState().metadata().projects().values().stream().map(ProjectMetadata::id).toList());
for (ProjectId projectId : allProjectIds) {
if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're retrieving the SamplingMetadata from both projects below, it doesn't really make sense to do this customMetadataChanged here, as that gets the SamplingMetadata from both projects as well. We might as well just get the two customs below and check their equality here. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That won't really save us anything other than a two hash map lookups will it? And it's possible that someone could optimize customMetadataChanged in the future and then we'd miss out on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd save the project lookup and the customs lookup, both twice. I agree that that's not much. I'll leave it up to you 👍

SamplingMetadata oldSamplingConfig = event.previousState().metadata().hasProject(projectId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we rename this to oldSamplingMetadata and newSamplingMetadata? Otherwise it's easy to confuse this with the actual sampling configs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good catch.

? event.previousState().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
: null;
SamplingMetadata newSamplingConfig = event.state().metadata().hasProject(projectId)
? event.state().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
: null;
Map<String, SamplingConfiguration> newSampleConfigsMap = newSamplingConfig == null
? Map.of()
: newSamplingConfig.getIndexToSamplingConfigMap();
Set<String> currentlyConfiguredIndexNames = newSampleConfigsMap.keySet();
Set<String> previouslyConfiguredIndexNames = oldSamplingConfig == null
? Set.of()
: oldSamplingConfig.getIndexToSamplingConfigMap().keySet();
Set<String> removedIndexNames = new HashSet<>(previouslyConfiguredIndexNames);
removedIndexNames.removeAll(currentlyConfiguredIndexNames);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of an optional code styling suggestion, so feel free to ignore if you prefer your current implementation.

Suggested change
SamplingMetadata oldSamplingConfig = event.previousState().metadata().hasProject(projectId)
? event.previousState().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
: null;
SamplingMetadata newSamplingConfig = event.state().metadata().hasProject(projectId)
? event.state().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
: null;
Map<String, SamplingConfiguration> newSampleConfigsMap = newSamplingConfig == null
? Map.of()
: newSamplingConfig.getIndexToSamplingConfigMap();
Set<String> currentlyConfiguredIndexNames = newSampleConfigsMap.keySet();
Set<String> previouslyConfiguredIndexNames = oldSamplingConfig == null
? Set.of()
: oldSamplingConfig.getIndexToSamplingConfigMap().keySet();
Set<String> removedIndexNames = new HashSet<>(previouslyConfiguredIndexNames);
removedIndexNames.removeAll(currentlyConfiguredIndexNames);
Map<String, SamplingConfiguration> oldSampleConfigsMap = Optional.ofNullable(event.previousState().metadata().getProject(projectId))
.map(p -> p.custom(SamplingMetadata.TYPE))
.map(SamplingMetadata::getIndexToSamplingConfigMap)
.orElse(Map.of());
Map<String, SamplingConfiguration> newSampleConfigsMap = Optional.ofNullable(event.state().metadata().getProject(projectId))
.map(p -> p.custom(SamplingMetadata.TYPE))
.map(SamplingMetadata::getIndexToSamplingConfigMap)
.orElse(Map.of());
Set<String> removedIndexNames = new HashSet<>(oldSampleConfigsMap.keySet());
removedIndexNames.removeAll(newSampleConfigsMap.keySet());

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that too. Unfortunately though getProject() throws an exception rather than returning null if the project doesn't exist.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh but using projects().get(projectId) works fine. I'll switch to that.

/*
* These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK
* with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still
* exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case
* we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker
* reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion.
*/
removedIndexNames.forEach(indexName -> {
logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
samples.remove(new ProjectIndex(projectId, indexName));
});
Map<String, SamplingConfiguration> oldSampleConfigsMap = oldSamplingConfig == null
? Map.of()
: oldSamplingConfig.getIndexToSamplingConfigMap();
/*
* Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as
* above, we have a race condition here that we can live with.
*/
newSampleConfigsMap.entrySet().forEach(entry -> {
String indexName = entry.getKey();
if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
samples.remove(new ProjectIndex(projectId, indexName));
}
});
}
}
// TODO: If an index has been deleted, we want to remove its sampling configuration
}
}

private boolean evaluateCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
Expand Down Expand Up @@ -256,6 +258,85 @@ public void testMaybeSampleMaxSize() {
assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2));
}

public void testClusterChanged() {
String indexName = randomIdentifier();
SamplingService samplingService = getTestSamplingService();
Map<String, Object> inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource);

// Test that the sample is removed if the new state does not have the project that the sample was configured in:
ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()))
.putCustom(
SamplingMetadata.TYPE,
new SamplingMetadata(
Map.of(
indexName,
new SamplingConfiguration(
1.0,
randomIntBetween(1, 1000),
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
TimeValue.timeValueDays(randomIntBetween(1, 10)),
null
)
)
)
)
.build();
samplingService.maybeSample(projectMetadata, indexRequest);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
ClusterState oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
ClusterState newState = ClusterState.builder(ClusterState.EMPTY_STATE)
.putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())))
.build();
ClusterChangedEvent event = new ClusterChangedEvent("test", newState, oldState);
samplingService.clusterChanged(event);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));

// Test that the sample is removed if the sampling metadata is removed from the project:
samplingService.maybeSample(projectMetadata, indexRequest);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(ProjectMetadata.builder(projectMetadata.id())).build();
event = new ClusterChangedEvent("test", newState, oldState);
samplingService.clusterChanged(event);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));

// Test that the sample is removed if the sampling configuration is changed
samplingService.maybeSample(projectMetadata, indexRequest);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
projectMetadata = ProjectMetadata.builder(projectMetadata.id())
.putCustom(
SamplingMetadata.TYPE,
new SamplingMetadata(
Map.of(
indexName,
new SamplingConfiguration(
1.0,
1001,
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
TimeValue.timeValueDays(randomIntBetween(1, 10)),
null
)
)
)
)
.build();
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
event = new ClusterChangedEvent("test", newState, oldState);
samplingService.clusterChanged(event);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));

// Test that the sample is _not_ removed if the sampling configuration does not change:
samplingService.maybeSample(projectMetadata, indexRequest);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
event = new ClusterChangedEvent("test", newState, oldState);
samplingService.clusterChanged(event);
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
}

private SamplingService getTestSamplingService() {
final ScriptService scriptService = new ScriptService(
Settings.EMPTY,
Expand Down