Skip to content

Commit 5de713f

Browse files
authored
Deleting sample when sampling configuration is deleted or changed (#136123)
This adds logic to SamplingService's clusterChanged method so a samples are removed from memory whenever the sampling configuration for that sample is deleted or modified.
1 parent 47d5eb8 commit 5de713f

File tree

2 files changed

+143
-1
lines changed

2 files changed

+143
-1
lines changed

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.io.stream.Writeable;
3434
import org.elasticsearch.common.settings.Setting;
3535
import org.elasticsearch.common.util.FeatureFlag;
36+
import org.elasticsearch.common.util.set.Sets;
3637
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
3738
import org.elasticsearch.common.xcontent.XContentHelper;
3839
import org.elasticsearch.core.TimeValue;
@@ -53,9 +54,12 @@
5354
import java.lang.ref.SoftReference;
5455
import java.util.Arrays;
5556
import java.util.HashMap;
57+
import java.util.HashSet;
5658
import java.util.List;
5759
import java.util.Map;
5860
import java.util.Objects;
61+
import java.util.Optional;
62+
import java.util.Set;
5963
import java.util.concurrent.ConcurrentHashMap;
6064
import java.util.concurrent.atomic.AtomicInteger;
6165
import java.util.concurrent.atomic.LongAdder;
@@ -322,7 +326,64 @@ public void updateSampleConfiguration(
322326

323327
@Override
324328
public void clusterChanged(ClusterChangedEvent event) {
325-
// TODO: React to sampling config changes
329+
if (RANDOM_SAMPLING_FEATURE_FLAG == false) {
330+
return;
331+
}
332+
if (samples.isEmpty()) {
333+
return;
334+
}
335+
// We want to remove any samples if their sampling configuration has been deleted or modified.
336+
if (event.metadataChanged()) {
337+
/*
338+
* First, we collect the union of all project ids in the current state and the previous one. We include the project ids from the
339+
* previous state in case an entire project has been deleted -- in that case we would want to delete all of its samples.
340+
*/
341+
Set<ProjectId> allProjectIds = Sets.union(
342+
event.state().metadata().projects().keySet(),
343+
event.previousState().metadata().projects().keySet()
344+
);
345+
for (ProjectId projectId : allProjectIds) {
346+
if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) {
347+
Map<String, SamplingConfiguration> oldSampleConfigsMap = Optional.ofNullable(
348+
event.previousState().metadata().projects().get(projectId)
349+
)
350+
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
351+
.map(SamplingMetadata::getIndexToSamplingConfigMap)
352+
.orElse(Map.of());
353+
Map<String, SamplingConfiguration> newSampleConfigsMap = Optional.ofNullable(
354+
event.state().metadata().projects().get(projectId)
355+
)
356+
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
357+
.map(SamplingMetadata::getIndexToSamplingConfigMap)
358+
.orElse(Map.of());
359+
Set<String> indicesWithRemovedConfigs = new HashSet<>(oldSampleConfigsMap.keySet());
360+
indicesWithRemovedConfigs.removeAll(newSampleConfigsMap.keySet());
361+
/*
362+
* These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK
363+
* with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still
364+
* exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case
365+
* we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker
366+
* reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion.
367+
*/
368+
for (String indexName : indicesWithRemovedConfigs) {
369+
logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
370+
samples.remove(new ProjectIndex(projectId, indexName));
371+
}
372+
/*
373+
* Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as
374+
* above, we have a race condition here that we can live with.
375+
*/
376+
for (Map.Entry<String, SamplingConfiguration> entry : newSampleConfigsMap.entrySet()) {
377+
String indexName = entry.getKey();
378+
if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
379+
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
380+
samples.remove(new ProjectIndex(projectId, indexName));
381+
}
382+
}
383+
}
384+
}
385+
// TODO: If an index has been deleted, we want to remove its sampling configuration
386+
}
326387
}
327388

328389
private boolean evaluateCondition(

server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
1313
import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata;
1414
import org.elasticsearch.action.index.IndexRequest;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.ClusterState;
1517
import org.elasticsearch.cluster.metadata.ProjectId;
1618
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1719
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -256,6 +258,85 @@ public void testMaybeSampleMaxSize() {
256258
assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2));
257259
}
258260

261+
public void testClusterChanged() {
262+
String indexName = randomIdentifier();
263+
SamplingService samplingService = getTestSamplingService();
264+
Map<String, Object> inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
265+
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource);
266+
267+
// Test that the sample is removed if the new state does not have the project that the sample was configured in:
268+
ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()))
269+
.putCustom(
270+
SamplingMetadata.TYPE,
271+
new SamplingMetadata(
272+
Map.of(
273+
indexName,
274+
new SamplingConfiguration(
275+
1.0,
276+
randomIntBetween(1, 1000),
277+
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
278+
TimeValue.timeValueDays(randomIntBetween(1, 10)),
279+
null
280+
)
281+
)
282+
)
283+
)
284+
.build();
285+
samplingService.maybeSample(projectMetadata, indexRequest);
286+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
287+
ClusterState oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
288+
ClusterState newState = ClusterState.builder(ClusterState.EMPTY_STATE)
289+
.putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())))
290+
.build();
291+
ClusterChangedEvent event = new ClusterChangedEvent("test", newState, oldState);
292+
samplingService.clusterChanged(event);
293+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
294+
295+
// Test that the sample is removed if the sampling metadata is removed from the project:
296+
samplingService.maybeSample(projectMetadata, indexRequest);
297+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
298+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
299+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(ProjectMetadata.builder(projectMetadata.id())).build();
300+
event = new ClusterChangedEvent("test", newState, oldState);
301+
samplingService.clusterChanged(event);
302+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
303+
304+
// Test that the sample is removed if the sampling configuration is changed
305+
samplingService.maybeSample(projectMetadata, indexRequest);
306+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
307+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
308+
projectMetadata = ProjectMetadata.builder(projectMetadata.id())
309+
.putCustom(
310+
SamplingMetadata.TYPE,
311+
new SamplingMetadata(
312+
Map.of(
313+
indexName,
314+
new SamplingConfiguration(
315+
1.0,
316+
1001,
317+
ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)),
318+
TimeValue.timeValueDays(randomIntBetween(1, 10)),
319+
null
320+
)
321+
)
322+
)
323+
)
324+
.build();
325+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
326+
event = new ClusterChangedEvent("test", newState, oldState);
327+
samplingService.clusterChanged(event);
328+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L));
329+
330+
// Test that the sample is _not_ removed if the sampling configuration does not change:
331+
samplingService.maybeSample(projectMetadata, indexRequest);
332+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
333+
oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
334+
newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
335+
event = new ClusterChangedEvent("test", newState, oldState);
336+
samplingService.clusterChanged(event);
337+
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L));
338+
}
339+
259340
private SamplingService getTestSamplingService() {
260341
final ScriptService scriptService = new ScriptService(
261342
Settings.EMPTY,

0 commit comments

Comments
 (0)