13
13
import org .elasticsearch .Version ;
14
14
import org .elasticsearch .action .ActionListener ;
15
15
import org .elasticsearch .action .ActionResponse ;
16
- import org .elasticsearch .action .support .RefCountingListener ;
17
16
import org .elasticsearch .cluster .ClusterState ;
18
17
import org .elasticsearch .cluster .metadata .ReservedStateErrorMetadata ;
19
18
import org .elasticsearch .cluster .metadata .ReservedStateMetadata ;
22
21
import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
23
22
import org .elasticsearch .common .Priority ;
24
23
import org .elasticsearch .core .Tuple ;
25
- import org .elasticsearch .reservedstate .NonStateTransformResult ;
26
24
import org .elasticsearch .reservedstate .ReservedClusterStateHandler ;
27
25
import org .elasticsearch .reservedstate .TransformState ;
28
26
import org .elasticsearch .xcontent .ConstructingObjectParser ;
29
27
import org .elasticsearch .xcontent .ParseField ;
30
28
import org .elasticsearch .xcontent .XContentParser ;
31
29
32
30
import java .util .ArrayList ;
33
- import java .util .Collection ;
34
- import java .util .Collections ;
35
31
import java .util .HashMap ;
36
32
import java .util .LinkedHashSet ;
37
33
import java .util .List ;
@@ -156,7 +152,6 @@ public void initEmpty(String namespace, ActionListener<ActionResponse.Empty> lis
156
152
new ReservedStateUpdateTask (
157
153
namespace ,
158
154
emptyState ,
159
- List .of (),
160
155
Map .of (),
161
156
List .of (),
162
157
// error state should not be possible since there is no metadata being parsed or processed
@@ -210,65 +205,44 @@ public void process(String namespace, ReservedStateChunk reservedStateChunk, Con
210
205
return ;
211
206
}
212
207
213
- // We trial run all handler validations to ensure that we can process all of the cluster state error free. During
214
- // the trial run we collect 'consumers' (functions) for any non cluster state transforms that need to run.
215
- var trialRunResult = trialRun (namespace , state , reservedStateChunk , orderedHandlers );
208
+ // We trial run all handler validations to ensure that we can process all of the cluster state error free.
209
+ var trialRunErrors = trialRun (namespace , state , reservedStateChunk , orderedHandlers );
216
210
// this is not using the modified trial state above, but that doesn't matter, we're just setting errors here
217
- var error = checkAndReportError (namespace , trialRunResult . errors , reservedStateVersion );
211
+ var error = checkAndReportError (namespace , trialRunErrors , reservedStateVersion );
218
212
219
213
if (error != null ) {
220
214
errorListener .accept (error );
221
215
return ;
222
216
}
223
-
224
- // Since we have validated that the cluster state update can be correctly performed in the trial run, we now
225
- // execute the non cluster state transforms. These are assumed to be async and we continue with the cluster state update
226
- // after all have completed. This part of reserved cluster state update is non-atomic, some or all of the non-state
227
- // transformations can succeed, and we can fail to eventually write the reserved cluster state.
228
- executeNonStateTransformationSteps (trialRunResult .nonStateTransforms , new ActionListener <>() {
229
- @ Override
230
- public void onResponse (Collection <NonStateTransformResult > nonStateTransformResults ) {
231
- // Once all of the non-state transformation results complete, we can proceed to
232
- // do the final save of the cluster state. The non-state transformation reserved keys are applied
233
- // to the reserved state after all other key handlers.
234
- updateTaskQueue .submitTask (
235
- "reserved cluster state [" + namespace + "]" ,
236
- new ReservedStateUpdateTask (
237
- namespace ,
238
- reservedStateChunk ,
239
- nonStateTransformResults ,
240
- handlers ,
241
- orderedHandlers ,
242
- ReservedClusterStateService .this ::updateErrorState ,
243
- new ActionListener <>() {
244
- @ Override
245
- public void onResponse (ActionResponse .Empty empty ) {
246
- logger .info ("Successfully applied new reserved cluster state for namespace [{}]" , namespace );
247
- errorListener .accept (null );
248
- }
249
-
250
- @ Override
251
- public void onFailure (Exception e ) {
252
- // Don't spam the logs on repeated errors
253
- if (isNewError (existingMetadata , reservedStateVersion .version ())) {
254
- logger .debug ("Failed to apply reserved cluster state" , e );
255
- errorListener .accept (e );
256
- } else {
257
- errorListener .accept (null );
258
- }
259
- }
217
+ updateTaskQueue .submitTask (
218
+ "reserved cluster state [" + namespace + "]" ,
219
+ new ReservedStateUpdateTask (
220
+ namespace ,
221
+ reservedStateChunk ,
222
+ handlers ,
223
+ orderedHandlers ,
224
+ ReservedClusterStateService .this ::updateErrorState ,
225
+ new ActionListener <>() {
226
+ @ Override
227
+ public void onResponse (ActionResponse .Empty empty ) {
228
+ logger .info ("Successfully applied new reserved cluster state for namespace [{}]" , namespace );
229
+ errorListener .accept (null );
230
+ }
231
+
232
+ @ Override
233
+ public void onFailure (Exception e ) {
234
+ // Don't spam the logs on repeated errors
235
+ if (isNewError (existingMetadata , reservedStateVersion .version ())) {
236
+ logger .debug ("Failed to apply reserved cluster state" , e );
237
+ errorListener .accept (e );
238
+ } else {
239
+ errorListener .accept (null );
260
240
}
261
- ),
262
- null
263
- );
264
- }
265
-
266
- @ Override
267
- public void onFailure (Exception e ) {
268
- // If we encounter an error while runnin the non-state transforms, we avoid saving any cluster state.
269
- errorListener .accept (checkAndReportError (namespace , List .of (stackTrace (e )), reservedStateVersion ));
270
- }
271
- });
241
+ }
242
+ }
243
+ ),
244
+ null
245
+ );
272
246
}
273
247
274
248
// package private for testing
@@ -324,14 +298,13 @@ public void onFailure(Exception e) {
324
298
/**
325
299
* Goes through all of the handlers, runs the validation and the transform part of the cluster state.
326
300
* <p>
327
- * While running the handlers we also collect any non cluster state transformation consumer actions that
328
- * need to be performed asynchronously before we attempt to save the cluster state. The trial run does not
329
- * result in an update of the cluster state, it's only purpose is to verify if we can correctly perform a
330
- * cluster state update with the given reserved state chunk.
301
+ * The trial run does not result in an update of the cluster state, it's only purpose is to verify
302
+ * if we can correctly perform a cluster state update with the given reserved state chunk.
331
303
*
332
304
* Package private for testing
305
+ * @return Any errors that occured
333
306
*/
334
- TrialRunResult trialRun (
307
+ List < String > trialRun (
335
308
String namespace ,
336
309
ClusterState currentState ,
337
310
ReservedStateChunk stateChunk ,
@@ -341,7 +314,6 @@ TrialRunResult trialRun(
341
314
Map <String , Object > reservedState = stateChunk .state ();
342
315
343
316
List <String > errors = new ArrayList <>();
344
- List <Consumer <ActionListener <NonStateTransformResult >>> nonStateTransforms = new ArrayList <>();
345
317
346
318
ClusterState state = currentState ;
347
319
@@ -351,39 +323,12 @@ TrialRunResult trialRun(
351
323
Set <String > existingKeys = keysForHandler (existingMetadata , handlerName );
352
324
TransformState transformState = handler .transform (reservedState .get (handlerName ), new TransformState (state , existingKeys ));
353
325
state = transformState .state ();
354
- if (transformState .nonStateTransform () != null ) {
355
- nonStateTransforms .add (transformState .nonStateTransform ());
356
- }
357
326
} catch (Exception e ) {
358
327
errors .add (format ("Error processing %s state change: %s" , handler .name (), stackTrace (e )));
359
328
}
360
329
}
361
330
362
- return new TrialRunResult (nonStateTransforms , errors );
363
- }
364
-
365
- /**
366
- * Runs the non cluster state transformations asynchronously, collecting the {@link NonStateTransformResult} objects.
367
- * <p>
368
- * Once all non cluster state transformations have completed, we submit the cluster state update task, which
369
- * updates all of the handler state, including the keys produced by the non cluster state transforms. The new reserved
370
- * state version isn't written to the cluster state until the cluster state task runs.
371
- *
372
- * Package private for testing
373
- */
374
- static void executeNonStateTransformationSteps (
375
- List <Consumer <ActionListener <NonStateTransformResult >>> nonStateTransforms ,
376
- ActionListener <Collection <NonStateTransformResult >> listener
377
- ) {
378
- final List <NonStateTransformResult > result = Collections .synchronizedList (new ArrayList <>(nonStateTransforms .size ()));
379
- try (var listeners = new RefCountingListener (listener .map (ignored -> result ))) {
380
- for (var transform : nonStateTransforms ) {
381
- // non cluster state transforms don't modify the cluster state, they however are given a chance to return a more
382
- // up-to-date version of the modified keys we should save in the reserved state. These calls are
383
- // async and report back when they are done through the postTasksListener.
384
- transform .accept (listeners .acquire (result ::add ));
385
- }
386
- }
331
+ return errors ;
387
332
}
388
333
389
334
/**
@@ -449,9 +394,4 @@ private void addStateHandler(String key, Set<String> keys, LinkedHashSet<String>
449
394
public void installStateHandler (ReservedClusterStateHandler <?> handler ) {
450
395
this .handlers .put (handler .name (), handler );
451
396
}
452
-
453
- /**
454
- * Helper record class to combine the result of a trial run, non cluster state actions and any errors
455
- */
456
- record TrialRunResult (List <Consumer <ActionListener <NonStateTransformResult >>> nonStateTransforms , List <String > errors ) {}
457
397
}
0 commit comments