Skip to content

Commit fd6ecbf

Browse files
committed
Add data stream template validation
to data stream promotion endpoint
1 parent e012784 commit fd6ecbf

File tree

1 file changed

+42
-2
lines changed

1 file changed

+42
-2
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
*/
99
package org.elasticsearch.datastreams.action;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ResourceNotFoundException;
1214
import org.elasticsearch.action.ActionListener;
1315
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
@@ -23,6 +25,8 @@
2325
import org.elasticsearch.cluster.metadata.Metadata;
2426
import org.elasticsearch.cluster.service.ClusterService;
2527
import org.elasticsearch.common.Priority;
28+
import org.elasticsearch.common.logging.HeaderWarning;
29+
import org.elasticsearch.common.regex.Regex;
2630
import org.elasticsearch.common.util.concurrent.EsExecutors;
2731
import org.elasticsearch.core.SuppressForbidden;
2832
import org.elasticsearch.indices.SystemIndices;
@@ -31,9 +35,14 @@
3135
import org.elasticsearch.threadpool.ThreadPool;
3236
import org.elasticsearch.transport.TransportService;
3337

38+
import static org.elasticsearch.core.Strings.format;
39+
3440
public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction<PromoteDataStreamAction.Request> {
3541

42+
private static final Logger logger = LogManager.getLogger(PromoteDataStreamTransportAction.class);
43+
3644
private final SystemIndices systemIndices;
45+
private final ClusterService clusterService;
3746

3847
@Inject
3948
public PromoteDataStreamTransportAction(
@@ -55,6 +64,7 @@ public PromoteDataStreamTransportAction(
5564
EsExecutors.DIRECT_EXECUTOR_SERVICE
5665
);
5766
this.systemIndices = systemIndices;
67+
this.clusterService = clusterService;
5868
}
5969

6070
@Override
@@ -76,7 +86,7 @@ public void onFailure(Exception e) {
7686

7787
@Override
7888
public ClusterState execute(ClusterState currentState) {
79-
return promoteDataStream(currentState, request);
89+
return promoteDataStream(currentState, request, clusterService);
8090
}
8191

8292
@Override
@@ -92,18 +102,48 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
92102
clusterService.submitUnbatchedStateUpdateTask(source, task);
93103
}
94104

95-
static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {
105+
static ClusterState promoteDataStream(
106+
ClusterState currentState,
107+
PromoteDataStreamAction.Request request,
108+
ClusterService clusterService
109+
) {
96110
DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName());
111+
97112
if (dataStream == null) {
98113
throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist");
99114
}
100115

116+
warnIfTemplateMissingForDatastream(dataStream, clusterService);
117+
101118
DataStream promotedDataStream = dataStream.promoteDataStream();
102119
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
103120
metadata.put(promotedDataStream);
104121
return ClusterState.builder(currentState).metadata(metadata).build();
105122
}
106123

124+
private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterService clusterService) {
125+
var datastreamName = dataStream.getName();
126+
127+
var matchingIndex = clusterService.state()
128+
.metadata()
129+
.templatesV2()
130+
.values()
131+
.stream()
132+
.filter(cit -> cit.getDataStreamTemplate() != null)
133+
.flatMap(cit -> cit.indexPatterns().stream())
134+
.anyMatch(pattern -> Regex.simpleMatch(pattern, datastreamName));
135+
136+
if (matchingIndex == false) {
137+
String warningMessage = format(
138+
"Data stream [%s] does not have a matching index template. This will cause rollover to fail until a matching index "
139+
+ "template is created",
140+
datastreamName
141+
);
142+
logger.warn(() -> warningMessage);
143+
HeaderWarning.addWarning(warningMessage);
144+
}
145+
}
146+
107147
@Override
108148
protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) {
109149
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

0 commit comments

Comments
 (0)