Skip to content

Commit c8fd983

Browse files
authored
Deleting random sample configurations if their indices are deleted (elastic#136653)
This updates the cluster state listener in SamplingService to delete the sampling configuration if its index has been deleted.
1 parent a87502c commit c8fd983

File tree

2 files changed

+109
-39
lines changed

2 files changed

+109
-39
lines changed

server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,22 @@
1212
import org.elasticsearch.ResourceNotFoundException;
1313
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1414
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
15+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
16+
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
1517
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
1618
import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction;
1719
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
1820
import org.elasticsearch.action.bulk.BulkRequest;
1921
import org.elasticsearch.action.bulk.BulkResponse;
2022
import org.elasticsearch.action.bulk.TransportBulkAction;
2123
import org.elasticsearch.action.index.IndexRequest;
24+
import org.elasticsearch.cluster.metadata.ProjectId;
2225
import org.elasticsearch.core.TimeValue;
2326
import org.elasticsearch.test.ESIntegTestCase;
2427
import org.junit.After;
2528

2629
import java.util.HashMap;
30+
import java.util.List;
2731
import java.util.Map;
2832

2933
import static org.elasticsearch.ingest.SamplingService.RANDOM_SAMPLING_FEATURE_FLAG;
@@ -66,6 +70,38 @@ public void testTTL() throws Exception {
6670
});
6771
}
6872

73+
public void testDeleteIndex() throws Exception {
74+
assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG);
75+
String indexName = randomIdentifier();
76+
client().execute(TransportCreateIndexAction.TYPE, new CreateIndexRequest(indexName)).actionGet();
77+
ensureYellow(indexName);
78+
PutSampleConfigurationAction.Request putSampleConfigRequest = new PutSampleConfigurationAction.Request(
79+
new SamplingConfiguration(1.0d, 10, null, null, null),
80+
TimeValue.THIRTY_SECONDS,
81+
TimeValue.THIRTY_SECONDS
82+
).indices(indexName);
83+
client().execute(PutSampleConfigurationAction.INSTANCE, putSampleConfigRequest).actionGet();
84+
for (int i = 0; i < 5; i++) {
85+
BulkRequest bulkRequest = new BulkRequest();
86+
for (int j = 0; j < 20; j++) {
87+
IndexRequest indexRequest = new IndexRequest(indexName);
88+
indexRequest.source(Map.of("foo", randomBoolean() ? 3L : randomLong(), "bar", randomBoolean()));
89+
bulkRequest.add(indexRequest);
90+
}
91+
BulkResponse bulkResponse = client().execute(TransportBulkAction.TYPE, bulkRequest).actionGet();
92+
assertThat(bulkResponse.hasFailures(), equalTo(false));
93+
}
94+
GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName))
95+
.actionGet();
96+
assertThat(getSampleResponse.getSample().size(), equalTo(10));
97+
client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(indexName)).actionGet();
98+
assertBusy(() -> {
99+
for (SamplingService samplingService : internalCluster().getInstances(SamplingService.class)) {
100+
assertThat(samplingService.getLocalSample(ProjectId.DEFAULT, indexName), equalTo(List.of()));
101+
}
102+
});
103+
}
104+
69105
@After
70106
public void cleanup() {
71107
Map<String, Object> clearedSettings = new HashMap<>();

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.ClusterStateAckListener;
2323
import org.elasticsearch.cluster.ClusterStateListener;
2424
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
25+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2526
import org.elasticsearch.cluster.metadata.Metadata;
2627
import org.elasticsearch.cluster.metadata.ProjectId;
2728
import org.elasticsearch.cluster.metadata.ProjectMetadata;
@@ -424,7 +425,11 @@ public void clusterChanged(ClusterChangedEvent event) {
424425
cancelJob();
425426
}
426427
}
427-
if (samples.isEmpty()) {
428+
if (isMaster == false && samples.isEmpty()) {
429+
/*
430+
* The remaining code potentially removes entries from samples, and deletes sampling configurations if this is the master. So if this is
431+
* not the master and has no samples, we can just bail out here.
432+
*/
428433
return;
429434
}
430435
// We want to remove any samples if their sampling configuration has been deleted or modified.
@@ -438,46 +443,75 @@ public void clusterChanged(ClusterChangedEvent event) {
438443
event.previousState().metadata().projects().keySet()
439444
);
440445
for (ProjectId projectId : allProjectIds) {
441-
if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) {
442-
Map<String, SamplingConfiguration> oldSampleConfigsMap = Optional.ofNullable(
443-
event.previousState().metadata().projects().get(projectId)
444-
)
445-
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
446-
.map(SamplingMetadata::getIndexToSamplingConfigMap)
447-
.orElse(Map.of());
448-
Map<String, SamplingConfiguration> newSampleConfigsMap = Optional.ofNullable(
449-
event.state().metadata().projects().get(projectId)
450-
)
451-
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
452-
.map(SamplingMetadata::getIndexToSamplingConfigMap)
453-
.orElse(Map.of());
454-
Set<String> indicesWithRemovedConfigs = new HashSet<>(oldSampleConfigsMap.keySet());
455-
indicesWithRemovedConfigs.removeAll(newSampleConfigsMap.keySet());
456-
/*
457-
* These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK
458-
* with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still
459-
* exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case
460-
* we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker
461-
* reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion.
462-
*/
463-
for (String indexName : indicesWithRemovedConfigs) {
464-
logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
465-
samples.remove(new ProjectIndex(projectId, indexName));
466-
}
467-
/*
468-
* Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as
469-
* above, we have a race condition here that we can live with.
470-
*/
471-
for (Map.Entry<String, SamplingConfiguration> entry : newSampleConfigsMap.entrySet()) {
472-
String indexName = entry.getKey();
473-
if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
474-
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
475-
samples.remove(new ProjectIndex(projectId, indexName));
476-
}
477-
}
446+
maybeRemoveStaleSamples(event, projectId);
447+
// Now delete configurations for any indices that have been deleted:
448+
if (isMaster) {
449+
maybeDeleteSamplingConfigurations(event, projectId);
450+
}
451+
}
452+
}
453+
}
454+
455+
/*
456+
* This method removes any samples from the samples Map that have had their sampling configuration removed or changed in this event.
457+
*/
458+
private void maybeRemoveStaleSamples(ClusterChangedEvent event, ProjectId projectId) {
459+
if (samples.isEmpty() == false && event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) {
460+
Map<String, SamplingConfiguration> oldSampleConfigsMap = Optional.ofNullable(
461+
event.previousState().metadata().projects().get(projectId)
462+
)
463+
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
464+
.map(SamplingMetadata::getIndexToSamplingConfigMap)
465+
.orElse(Map.of());
466+
Map<String, SamplingConfiguration> newSampleConfigsMap = Optional.ofNullable(event.state().metadata().projects().get(projectId))
467+
.map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE))
468+
.map(SamplingMetadata::getIndexToSamplingConfigMap)
469+
.orElse(Map.of());
470+
Set<String> indicesWithRemovedConfigs = new HashSet<>(oldSampleConfigsMap.keySet());
471+
indicesWithRemovedConfigs.removeAll(newSampleConfigsMap.keySet());
472+
/*
473+
* These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK
474+
* with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still
475+
* exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case
476+
* we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker
477+
* reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion.
478+
*/
479+
for (String indexName : indicesWithRemovedConfigs) {
480+
logger.debug("Removing sample info for {} because its configuration has been removed", indexName);
481+
samples.remove(new ProjectIndex(projectId, indexName));
482+
}
483+
/*
484+
* Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as
485+
* above, we have a race condition here that we can live with.
486+
*/
487+
for (Map.Entry<String, SamplingConfiguration> entry : newSampleConfigsMap.entrySet()) {
488+
String indexName = entry.getKey();
489+
if (oldSampleConfigsMap.containsKey(indexName) && entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
490+
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
491+
samples.remove(new ProjectIndex(projectId, indexName));
492+
}
493+
}
494+
}
495+
}
496+
497+
/*
498+
* This method deletes the sampling configuration for any index that has been deleted in this event.
499+
*/
500+
private void maybeDeleteSamplingConfigurations(ClusterChangedEvent event, ProjectId projectId) {
501+
ProjectMetadata currentProject = event.state().metadata().projects().get(projectId);
502+
ProjectMetadata previousProject = event.previousState().metadata().projects().get(projectId);
503+
if (currentProject == null || previousProject == null) {
504+
return;
505+
}
506+
if (currentProject.indices() != previousProject.indices()) {
507+
for (IndexMetadata index : previousProject.indices().values()) {
508+
IndexMetadata current = currentProject.index(index.getIndex());
509+
if (current == null) {
510+
String indexName = index.getIndex().getName();
511+
logger.debug("Deleting sample configuration for {} because the index has been deleted", indexName);
512+
deleteSampleConfiguration(projectId, indexName);
478513
}
479514
}
480-
// TODO: If an index has been deleted, we want to remove its sampling configuration
481515
}
482516
}
483517

0 commit comments

Comments
 (0)