Skip to content

Commit a1bed05

Browse files
committed
Deleting sample when sampling configuration is deleted or changed
1 parent 1dd1cf9 commit a1bed05

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@
4141
import java.io.IOException;
4242
import java.lang.ref.SoftReference;
4343
import java.util.Arrays;
44+
import java.util.HashSet;
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Objects;
48+
import java.util.Set;
4749
import java.util.concurrent.ConcurrentHashMap;
4850
import java.util.concurrent.atomic.AtomicInteger;
4951
import java.util.concurrent.atomic.LongAdder;
@@ -250,7 +252,62 @@ public boolean atLeastOneSampleConfigured() {
250252

251253
@Override
252254
public void clusterChanged(ClusterChangedEvent event) {
253-
// TODO: React to sampling config changes
255+
// We want to remove any samples if their sampling configuration has been deleted or modified.
256+
if (event.metadataChanged()) {
257+
/*
258+
* First, we collect the union of all project ids in the current state and the previous one. We include the project ids from the
259+
* previous state in case an entire project has been deleted -- in that case we would want to delete all of its samples.
260+
*/
261+
Set<ProjectId> allProjectIds = new HashSet<>(
262+
event.state().metadata().projects().values().stream().map(ProjectMetadata::id).toList()
263+
);
264+
allProjectIds.addAll(event.previousState().metadata().projects().values().stream().map(ProjectMetadata::id).toList());
265+
for (ProjectId projectId : allProjectIds) {
266+
if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) {
267+
SamplingMetadata oldSamplingConfig = event.previousState().metadata().hasProject(projectId)
268+
? event.previousState().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
269+
: null;
270+
SamplingMetadata newSamplingConfig = event.state().metadata().hasProject(projectId)
271+
? event.state().projectState(projectId).metadata().custom(SamplingMetadata.TYPE)
272+
: null;
273+
Map<String, SamplingConfiguration> newSampleConfigsMap = newSamplingConfig == null
274+
? Map.of()
275+
: newSamplingConfig.getIndexToSamplingConfigMap();
276+
Set<String> currentlyConfiguredIndexNames = newSampleConfigsMap.keySet();
277+
Set<String> previouslyConfiguredIndexNames = oldSamplingConfig == null
278+
? Set.of()
279+
: oldSamplingConfig.getIndexToSamplingConfigMap().keySet();
280+
Set<String> removedIndexNames = new HashSet<>(previouslyConfiguredIndexNames);
281+
removedIndexNames.removeAll(currentlyConfiguredIndexNames);
282+
/*
283+
* These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK
284+
* with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still
285+
* exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case
286+
* we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker
287+
* reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion.
288+
*/
289+
removedIndexNames.forEach(indexName -> {
290+
logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
291+
samples.remove(new ProjectIndex(projectId, indexName));
292+
});
293+
Map<String, SamplingConfiguration> oldSampleConfigsMap = oldSamplingConfig == null
294+
? Map.of()
295+
: oldSamplingConfig.getIndexToSamplingConfigMap();
296+
/*
297+
* Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as
298+
* above, we have a race condition here that we can live with.
299+
*/
300+
newSampleConfigsMap.entrySet().forEach(entry -> {
301+
String indexName = entry.getKey();
302+
if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
303+
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
304+
samples.remove(new ProjectIndex(projectId, indexName));
305+
}
306+
});
307+
}
308+
}
309+
// TODO: If an index has been deleted, we want to remove its sampling configuration
310+
}
254311
}
255312

256313
private boolean evaluateCondition(

server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
1313
import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata;
1414
import org.elasticsearch.action.index.IndexRequest;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.ClusterState;
1517
import org.elasticsearch.cluster.metadata.ProjectId;
1618
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1719
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -224,6 +226,85 @@ public void testMaybeSampleMaxSamples() {
224226
assertThat(stats.getTimeEvaluatingCondition(), equalTo(TimeValue.ZERO));
225227
}
226228

229+
public void testClusterChanged() {
230+
String indexName = randomIdentifier();
231+
SamplingService samplingService = getTestSamplingService();
232+
Map<String, Object> inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
233+
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource);
234+
235+
// Test that the sample is removed if the new state does not have the project that the sample was configured in:
236+
ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()))
237+
.putCustom(
238+
SamplingMetadata.TYPE,
239+
new SamplingMetadata(
240+
Map.of(
241+
indexName,
242+
new SamplingConfiguration(
243+
1.0,
244+
randomIntBetween(1, 1000),
245+
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
246+
TimeValue.timeValueDays(randomIntBetween(1, 10)),
247+
null
248+
)
249+
)
250+
)
251+
)
252+
.build();
253+
samplingService.maybeSample(projectMetadata, indexRequest);
254+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
255+
ClusterState oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
256+
ClusterState newState = ClusterState.builder(ClusterState.EMPTY_STATE)
257+
.putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())))
258+
.build();
259+
ClusterChangedEvent event = new ClusterChangedEvent("test", newState, oldState);
260+
samplingService.clusterChanged(event);
261+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
262+
263+
// Test that the sample is removed if the sampling metadata is removed from the project:
264+
samplingService.maybeSample(projectMetadata, indexRequest);
265+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
266+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
267+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(ProjectMetadata.builder(projectMetadata.id())).build();
268+
event = new ClusterChangedEvent("test", newState, oldState);
269+
samplingService.clusterChanged(event);
270+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
271+
272+
// Test that the sample is removed if the sampling configuration is changed
273+
samplingService.maybeSample(projectMetadata, indexRequest);
274+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
275+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
276+
projectMetadata = ProjectMetadata.builder(projectMetadata.id())
277+
.putCustom(
278+
SamplingMetadata.TYPE,
279+
new SamplingMetadata(
280+
Map.of(
281+
indexName,
282+
new SamplingConfiguration(
283+
1.0,
284+
1001,
285+
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
286+
TimeValue.timeValueDays(randomIntBetween(1, 10)),
287+
null
288+
)
289+
)
290+
)
291+
)
292+
.build();
293+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
294+
event = new ClusterChangedEvent("test", newState, oldState);
295+
samplingService.clusterChanged(event);
296+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
297+
298+
// Test that the sample is _not_ removed if the sampling configuration does not change:
299+
samplingService.maybeSample(projectMetadata, indexRequest);
300+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
301+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
302+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
303+
event = new ClusterChangedEvent("test", newState, oldState);
304+
samplingService.clusterChanged(event);
305+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
306+
}
307+
227308
private SamplingService getTestSamplingService() {
228309
final ScriptService scriptService = new ScriptService(
229310
Settings.EMPTY,

0 commit comments

Comments
 (0)