Skip to content

Commit fd9faf7

Browse files
committed
Adding an extension point to call data stream lifecycle code in xpack
1 parent 33e00ca commit fd9faf7

File tree

8 files changed

+287
-21
lines changed

8 files changed

+287
-21
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction;
4848
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction;
4949
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction;
50+
import org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions;
5051
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
5152
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
5253
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
@@ -86,6 +87,7 @@
8687
import org.elasticsearch.health.HealthIndicatorService;
8788
import org.elasticsearch.index.IndexSettingProvider;
8889
import org.elasticsearch.plugins.ActionPlugin;
90+
import org.elasticsearch.plugins.ExtensiblePlugin;
8991
import org.elasticsearch.plugins.HealthPlugin;
9092
import org.elasticsearch.plugins.Plugin;
9193
import org.elasticsearch.rest.RestController;
@@ -102,7 +104,7 @@
102104

103105
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;
104106

105-
public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin {
107+
public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, ExtensiblePlugin {
106108

107109
public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
108110
"time_series.poll_interval",
@@ -153,6 +155,7 @@ public static TimeValue getLookAheadTime(Settings settings) {
153155
private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
154156
private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
155157
private final Settings settings;
158+
private AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions;
156159

157160
public DataStreamsPlugin(Settings settings) {
158161
this.settings = settings;
@@ -220,7 +223,8 @@ public Collection<?> createComponents(PluginServices services) {
220223
errorStoreInitialisationService.get(),
221224
services.allocationService(),
222225
dataStreamLifecycleErrorsPublisher.get(),
223-
services.dataStreamGlobalRetentionSettings()
226+
services.dataStreamGlobalRetentionSettings(),
227+
additionalDataStreamLifecycleActions
224228
)
225229
);
226230
dataLifecycleInitialisationService.get().init();
@@ -314,4 +318,15 @@ public void close() throws IOException {
314318
public Collection<HealthIndicatorService> getHealthIndicatorServices() {
315319
return List.of(dataStreamLifecycleHealthIndicatorService.get());
316320
}
321+
322+
@Override
323+
public void loadExtensions(ExtensionLoader loader) {
324+
List<AdditionalDataStreamLifecycleActions> dataStreamLifecycleActions = loader.loadExtensions(
325+
AdditionalDataStreamLifecycleActions.class
326+
);
327+
assert dataStreamLifecycleActions.size() <= 1;
328+
if (dataStreamLifecycleActions.isEmpty() == false) {
329+
this.additionalDataStreamLifecycleActions = dataStreamLifecycleActions.getFirst();
330+
}
331+
}
317332
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.lifecycle;
11+
12+
import java.util.List;
13+
14+
public interface AdditionalDataStreamLifecycleActions {
15+
List<DataStreamLifecycleAction> getDataStreamLifecycleActions();
16+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.datastreams.lifecycle;
11+
12+
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.cluster.ProjectState;
14+
import org.elasticsearch.cluster.metadata.DataStream;
15+
import org.elasticsearch.index.Index;
16+
17+
import java.util.Set;
18+
19+
@FunctionalInterface
20+
public interface DataStreamLifecycleAction {
21+
/**
22+
* This takes some action on the data stream. The action is expected to be fast, or run asynchronously. It returns a set of indices
23+
* that ought to be ignored by subsequent actions in the current pass.
24+
*
25+
* @param projectState The current ProjectState
26+
* @param dataStream The data stream to be acted upon
27+
* @param indicesToExcludeForRemainingRun A set of indices that ought to be ignored by this action.
28+
*/
29+
Set<Index> apply(
30+
ProjectState projectState,
31+
DataStream dataStream,
32+
Set<Index> indicesToExcludeForRemainingRun,
33+
Client client,
34+
DataStreamLifecycleErrorStore errorStore
35+
);
36+
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
180180
private final MasterServiceTaskQueue<DeleteSourceAndAddDownsampleToDS> swapSourceWithDownsampleIndexQueue;
181181
private volatile ByteSizeValue targetMergePolicyFloorSegment;
182182
private volatile int targetMergePolicyFactor;
183+
private final AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions;
183184
/**
184185
* The number of retries for a particular index and error after which DSL will emmit a signal (e.g. log statement)
185186
*/
@@ -218,7 +219,8 @@ public DataStreamLifecycleService(
218219
DataStreamLifecycleErrorStore errorStore,
219220
AllocationService allocationService,
220221
DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
221-
DataStreamGlobalRetentionSettings globalRetentionSettings
222+
DataStreamGlobalRetentionSettings globalRetentionSettings,
223+
@Nullable AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions
222224
) {
223225
this.settings = settings;
224226
this.client = client;
@@ -248,6 +250,7 @@ public DataStreamLifecycleService(
248250
new DeleteSourceAndAddDownsampleIndexExecutor(allocationService)
249251
);
250252
this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher;
253+
this.additionalDataStreamLifecycleActions = additionalDataStreamLifecycleActions;
251254
}
252255

253256
/**
@@ -378,7 +381,16 @@ private void run(ProjectState projectState) {
378381
// the following indices should not be considered for the remainder of this service run, for various reasons.
379382
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();
380383
for (DataStreamLifecycleAction action : actions) {
381-
indicesToExcludeForRemainingRun.addAll(action.apply(projectState, dataStream, indicesToExcludeForRemainingRun));
384+
indicesToExcludeForRemainingRun.addAll(
385+
action.apply(projectState, dataStream, indicesToExcludeForRemainingRun, client, errorStore)
386+
);
387+
}
388+
if (additionalDataStreamLifecycleActions != null) {
389+
for (DataStreamLifecycleAction action : additionalDataStreamLifecycleActions.getDataStreamLifecycleActions()) {
390+
indicesToExcludeForRemainingRun.addAll(
391+
action.apply(projectState, dataStream, indicesToExcludeForRemainingRun, client, errorStore)
392+
);
393+
}
382394
}
383395
affectedIndices += indicesToExcludeForRemainingRun.size();
384396
affectedDataStreams++;
@@ -396,7 +408,9 @@ private void run(ProjectState projectState) {
396408
private Set<Index> timeSeriesIndicesStillWithinTimeBounds(
397409
ProjectState projectState,
398410
DataStream dataStream,
399-
Set<Index> indicesToExcludeForRemainingRun
411+
Set<Index> indicesToExcludeForRemainingRun,
412+
Client client,
413+
DataStreamLifecycleErrorStore errorStore
400414
) {
401415
return timeSeriesIndicesStillWithinTimeBounds(projectState, dataStream, indicesToExcludeForRemainingRun, nowSupplier);
402416
}
@@ -449,7 +463,13 @@ static Set<Index> timeSeriesIndicesStillWithinTimeBounds(
449463
* replacing an index in the data stream, deleting a source index, or downsampling itself) so these indices can be skipped in case
450464
* there are other operations to be executed by the data stream lifecycle after downsampling.
451465
*/
452-
Set<Index> maybeExecuteDownsampling(ProjectState projectState, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
466+
Set<Index> maybeExecuteDownsampling(
467+
ProjectState projectState,
468+
DataStream dataStream,
469+
Set<Index> indicesToExcludeForRemainingRun,
470+
Client client,
471+
DataStreamLifecycleErrorStore errorStore
472+
) {
453473
Set<Index> affectedIndices = new HashSet<>();
454474
try {
455475
List<Index> targetIndices = getTargetIndices(
@@ -836,7 +856,13 @@ private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStr
836856
}
837857
}
838858

839-
private Set<Index> maybeExecuteRollover(ProjectState projectState, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
859+
private Set<Index> maybeExecuteRollover(
860+
ProjectState projectState,
861+
DataStream dataStream,
862+
Set<Index> indicesToExcludeForRemainingRun,
863+
Client client,
864+
DataStreamLifecycleErrorStore errorStore
865+
) {
840866
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
841867
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);
842868
// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
@@ -919,7 +945,13 @@ private Index maybeExecuteRollover(
919945
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
920946
* @return The set of indices that delete requests have been sent for
921947
*/
922-
Set<Index> maybeExecuteRetention(ProjectState projectState, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
948+
Set<Index> maybeExecuteRetention(
949+
ProjectState projectState,
950+
DataStream dataStream,
951+
Set<Index> indicesToExcludeForRemainingRun,
952+
Client client,
953+
DataStreamLifecycleErrorStore errorStore
954+
) {
923955
Set<Index> indicesToBeRemoved = new HashSet<>();
924956
try {
925957
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
@@ -1012,7 +1044,9 @@ Set<Index> maybeExecuteRetention(ProjectState projectState, DataStream dataStrea
10121044
private Set<Index> maybeExecuteForceMerge(
10131045
ProjectState projectState,
10141046
DataStream dataStream,
1015-
Set<Index> indicesToExcludeForRemainingRun
1047+
Set<Index> indicesToExcludeForRemainingRun,
1048+
Client client,
1049+
DataStreamLifecycleErrorStore errorStore
10161050
) {
10171051
Set<Index> affectedIndices = new HashSet<>();
10181052
try {
@@ -1725,16 +1759,4 @@ public int hashCode() {
17251759
}
17261760
}
17271761

1728-
@FunctionalInterface
1729-
public interface DataStreamLifecycleAction {
1730-
/**
1731-
*
1732-
* This takes some action on the data stream. The action is expected to be fast, or run asynchronously. It returns a set of indices
1733-
* that ought to be ignored by subsequent actions in the current pass.
1734-
* @param projectState The current ProjectState
1735-
* @param dataStream The data stream to be acted upon
1736-
* @param indicesToExcludeForRemainingRun A set of indices that ought to be ignored by this action.
1737-
*/
1738-
Set<Index> apply(ProjectState projectState, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun);
1739-
}
17401762
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
apply plugin: 'elasticsearch.internal-es-plugin'
2+
apply plugin: 'elasticsearch.internal-cluster-test'
3+
apply plugin: 'elasticsearch.internal-java-rest-test'
4+
5+
esplugin {
6+
name = 'x-pack-data-streams'
7+
description = 'Elasticsearch Expanded Pack Plugin - Data Stream Lifecycle'
8+
classname = 'org.elasticsearch.xpack.datastreams.XPackDataStreamsPlugin'
9+
extendedPlugins = ['data-streams', 'x-pack-core']
10+
hasNativeController =false
11+
requiresKeystore =true
12+
}
13+
base {
14+
archivesName = 'x-pack-data-stream-lifecycle'
15+
}
16+
17+
dependencies {
18+
compileOnly project(path: xpackModule('core'))
19+
compileOnly project(':modules:data-streams')
20+
testImplementation(testArtifact(project(xpackModule('core'))))
21+
}
22+
23+
addQaCheckDependencies(project)

0 commit comments

Comments
 (0)