21
21
import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
22
22
import org .elasticsearch .cluster .ClusterState ;
23
23
import org .elasticsearch .cluster .ClusterStateUpdateTask ;
24
- import org .elasticsearch .cluster .ack .ClusterStateUpdateRequest ;
25
24
import org .elasticsearch .cluster .routing .allocation .allocator .AllocationActionListener ;
26
25
import org .elasticsearch .cluster .service .ClusterService ;
27
26
import org .elasticsearch .common .Priority ;
@@ -143,13 +142,19 @@ public ClusterState createDataStream(
143
142
);
144
143
}
145
144
146
- public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest <
147
- CreateDataStreamClusterStateUpdateRequest > {
148
-
149
- private final boolean performReroute ;
150
- private final String name ;
151
- private final long startTime ;
152
- private final SystemDataStreamDescriptor descriptor ;
145
+ public record CreateDataStreamClusterStateUpdateRequest (
146
+ String name ,
147
+ long startTime ,
148
+ @ Nullable SystemDataStreamDescriptor systemDataStreamDescriptor ,
149
+ TimeValue masterNodeTimeout ,
150
+ TimeValue ackTimeout ,
151
+ boolean performReroute
152
+ ) {
153
+ public CreateDataStreamClusterStateUpdateRequest {
154
+ Objects .requireNonNull (name );
155
+ Objects .requireNonNull (masterNodeTimeout );
156
+ Objects .requireNonNull (ackTimeout );
157
+ }
153
158
154
159
public CreateDataStreamClusterStateUpdateRequest (String name ) {
155
160
this (name , System .currentTimeMillis (), null , TimeValue .ZERO , TimeValue .ZERO , true );
@@ -159,42 +164,14 @@ public CreateDataStreamClusterStateUpdateRequest(
159
164
String name ,
160
165
SystemDataStreamDescriptor systemDataStreamDescriptor ,
161
166
TimeValue masterNodeTimeout ,
162
- TimeValue timeout ,
167
+ TimeValue ackTimeout ,
163
168
boolean performReroute
164
169
) {
165
- this (name , System .currentTimeMillis (), systemDataStreamDescriptor , masterNodeTimeout , timeout , performReroute );
166
- }
167
-
168
- public CreateDataStreamClusterStateUpdateRequest (
169
- String name ,
170
- long startTime ,
171
- SystemDataStreamDescriptor systemDataStreamDescriptor ,
172
- TimeValue masterNodeTimeout ,
173
- TimeValue timeout ,
174
- boolean performReroute
175
- ) {
176
- this .name = name ;
177
- this .startTime = startTime ;
178
- this .descriptor = systemDataStreamDescriptor ;
179
- this .performReroute = performReroute ;
180
- masterNodeTimeout (masterNodeTimeout );
181
- ackTimeout (timeout );
170
+ this (name , System .currentTimeMillis (), systemDataStreamDescriptor , masterNodeTimeout , ackTimeout , performReroute );
182
171
}
183
172
184
173
public boolean isSystem () {
185
- return descriptor != null ;
186
- }
187
-
188
- public boolean performReroute () {
189
- return performReroute ;
190
- }
191
-
192
- public SystemDataStreamDescriptor getSystemDataStreamDescriptor () {
193
- return descriptor ;
194
- }
195
-
196
- long getStartTime () {
197
- return startTime ;
174
+ return systemDataStreamDescriptor != null ;
198
175
}
199
176
}
200
177
@@ -243,7 +220,7 @@ static ClusterState createDataStream(
243
220
boolean initializeFailureStore
244
221
) throws Exception {
245
222
String dataStreamName = request .name ;
246
- SystemDataStreamDescriptor systemDataStreamDescriptor = request .getSystemDataStreamDescriptor ();
223
+ SystemDataStreamDescriptor systemDataStreamDescriptor = request .systemDataStreamDescriptor ();
247
224
boolean isSystemDataStreamName = metadataCreateIndexService .getSystemIndices ().isSystemDataStream (request .name );
248
225
assert (isSystemDataStreamName && systemDataStreamDescriptor != null )
249
226
|| (isSystemDataStreamName == false && systemDataStreamDescriptor == null )
@@ -292,13 +269,13 @@ static ClusterState createDataStream(
292
269
if (isSystem ) {
293
270
throw new IllegalArgumentException ("Failure stores are not supported on system data streams" );
294
271
}
295
- String failureStoreIndexName = DataStream .getDefaultFailureStoreName (dataStreamName , initialGeneration , request .getStartTime ());
272
+ String failureStoreIndexName = DataStream .getDefaultFailureStoreName (dataStreamName , initialGeneration , request .startTime ());
296
273
currentState = createFailureStoreIndex (
297
274
metadataCreateIndexService ,
298
275
"initialize_data_stream" ,
299
276
settings ,
300
277
currentState ,
301
- request .getStartTime (),
278
+ request .startTime (),
302
279
dataStreamName ,
303
280
template ,
304
281
failureStoreIndexName ,
@@ -308,7 +285,7 @@ static ClusterState createDataStream(
308
285
}
309
286
310
287
if (writeIndex == null ) {
311
- String firstBackingIndexName = DataStream .getDefaultBackingIndexName (dataStreamName , initialGeneration , request .getStartTime ());
288
+ String firstBackingIndexName = DataStream .getDefaultBackingIndexName (dataStreamName , initialGeneration , request .startTime ());
312
289
currentState = createBackingIndex (
313
290
metadataCreateIndexService ,
314
291
currentState ,
@@ -397,7 +374,7 @@ private static ClusterState createBackingIndex(
397
374
firstBackingIndexName
398
375
).dataStreamName (dataStreamName )
399
376
.systemDataStreamDescriptor (systemDataStreamDescriptor )
400
- .nameResolvedInstant (request .getStartTime ())
377
+ .nameResolvedInstant (request .startTime ())
401
378
.performReroute (request .performReroute ())
402
379
.setMatchingTemplate (template );
403
380
0 commit comments