Skip to content

Commit fb889be

Browse files
committed
Delete sample config when a data stream is deleted
1 parent 774dabd commit fb889be

File tree

4 files changed

+114
-0
lines changed

4 files changed

+114
-0
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,13 @@
273273
index: sample_test
274274
catch: missing
275275

276+
---
277+
teardown:
278+
279+
- do:
280+
indices.delete_data_stream:
281+
name: sample_test*
282+
283+
- do:
284+
indices.get_all_sample_configuration: {}
285+
- length: { $body: 0 }

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,3 +433,14 @@
433433
- match: { samples_rejected_for_size: 0 }
434434
- match: { samples_accepted: 0 }
435435
- match: { last_exception.type: "script_exception" }
436+
437+
---
438+
teardown:
439+
440+
- do:
441+
indices.delete_data_stream:
442+
name: sample_test*
443+
444+
- do:
445+
indices.get_all_sample_configuration: {}
446+
- length: { $body: 0 }

server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
1818
import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction;
1919
import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration;
20+
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2021
import org.elasticsearch.action.bulk.BulkRequest;
2122
import org.elasticsearch.action.bulk.BulkResponse;
2223
import org.elasticsearch.action.bulk.TransportBulkAction;
24+
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
25+
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
2326
import org.elasticsearch.action.index.IndexRequest;
27+
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2428
import org.elasticsearch.cluster.metadata.ProjectId;
29+
import org.elasticsearch.cluster.metadata.Template;
30+
import org.elasticsearch.common.compress.CompressedXContent;
31+
import org.elasticsearch.common.settings.Settings;
2532
import org.elasticsearch.core.TimeValue;
33+
import org.elasticsearch.datastreams.DataStreamsPlugin;
34+
import org.elasticsearch.plugins.Plugin;
2635
import org.elasticsearch.test.ESIntegTestCase;
2736
import org.junit.After;
2837

38+
import java.util.Collection;
2939
import java.util.HashMap;
3040
import java.util.List;
3141
import java.util.Map;
@@ -36,6 +46,12 @@
3646
import static org.hamcrest.Matchers.equalTo;
3747

3848
public class SamplingServiceIT extends ESIntegTestCase {
49+
50+
@Override
51+
protected Collection<Class<? extends Plugin>> nodePlugins() {
52+
return List.of(DataStreamsPlugin.class);
53+
}
54+
3955
public void testTTL() throws Exception {
4056
assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG);
4157
assertAcked(
@@ -102,6 +118,66 @@ public void testDeleteIndex() throws Exception {
102118
});
103119
}
104120

121+
public void testDeleteDataStream() throws Exception {
122+
assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG);
123+
String indexName = randomIdentifier();
124+
var template = ComposableIndexTemplate.builder()
125+
.indexPatterns(List.of(indexName))
126+
127+
.template(new Template(Settings.EMPTY, CompressedXContent.fromJSON("""
128+
{
129+
"_doc":{
130+
"dynamic":true,
131+
"properties":{
132+
"foo":{
133+
"type":"text"
134+
},
135+
"bar":{
136+
"type":"text"
137+
}
138+
}
139+
}
140+
}
141+
"""), null))
142+
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
143+
.build();
144+
var request = new TransportPutComposableIndexTemplateAction.Request("logs-template");
145+
request.indexTemplate(template);
146+
safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));
147+
client().execute(
148+
CreateDataStreamAction.INSTANCE,
149+
new CreateDataStreamAction.Request(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, indexName)
150+
).actionGet();
151+
ensureYellow(indexName);
152+
PutSampleConfigurationAction.Request putSampleConfigRequest = new PutSampleConfigurationAction.Request(
153+
new SamplingConfiguration(1.0d, 10, null, null, null),
154+
TimeValue.THIRTY_SECONDS,
155+
TimeValue.THIRTY_SECONDS
156+
).indices(indexName);
157+
client().execute(PutSampleConfigurationAction.INSTANCE, putSampleConfigRequest).actionGet();
158+
for (int i = 0; i < 5; i++) {
159+
BulkRequest bulkRequest = new BulkRequest();
160+
for (int j = 0; j < 20; j++) {
161+
IndexRequest indexRequest = new IndexRequest(indexName);
162+
indexRequest.create(true);
163+
indexRequest.source(Map.of("@timestamp", 12345, "foo", randomBoolean() ? 3L : randomLong(), "bar", randomBoolean()));
164+
bulkRequest.add(indexRequest);
165+
}
166+
BulkResponse bulkResponse = client().execute(TransportBulkAction.TYPE, bulkRequest).actionGet();
167+
assertThat(bulkResponse.hasFailures(), equalTo(false));
168+
}
169+
GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName))
170+
.actionGet();
171+
assertThat(getSampleResponse.getSample().size(), equalTo(10));
172+
client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TimeValue.THIRTY_SECONDS, indexName))
173+
.actionGet();
174+
assertBusy(() -> {
175+
for (SamplingService samplingService : internalCluster().getInstances(SamplingService.class)) {
176+
assertThat(samplingService.getLocalSample(ProjectId.DEFAULT, indexName), equalTo(List.of()));
177+
}
178+
});
179+
}
180+
105181
@After
106182
public void cleanup() {
107183
Map<String, Object> clearedSettings = new HashMap<>();

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.ClusterStateAckListener;
2626
import org.elasticsearch.cluster.ClusterStateListener;
2727
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
28+
import org.elasticsearch.cluster.metadata.DataStream;
2829
import org.elasticsearch.cluster.metadata.IndexMetadata;
2930
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3031
import org.elasticsearch.cluster.metadata.Metadata;
@@ -525,6 +526,22 @@ private void maybeDeleteSamplingConfigurations(ClusterChangedEvent event, Projec
525526
}
526527
}
527528
}
529+
if (currentProject.dataStreams() != previousProject.dataStreams()) {
530+
for (DataStream dataStream : previousProject.dataStreams().values()) {
531+
DataStream current = currentProject.dataStreams().get(dataStream.getName());
532+
if (current == null) {
533+
String dataStreamName = dataStream.getName();
534+
SamplingConfiguration samplingConfiguration = getSamplingConfiguration(
535+
event.state().projectState(projectId).metadata(),
536+
dataStreamName
537+
);
538+
if (samplingConfiguration != null) {
539+
logger.debug("Deleting sample configuration for {} because the data stream has been deleted", dataStreamName);
540+
deleteSampleConfiguration(projectId, dataStreamName);
541+
}
542+
}
543+
}
544+
}
528545
}
529546

530547
private void maybeScheduleJob() {

0 commit comments

Comments
 (0)