46
46
import java .util .concurrent .atomic .AtomicReference ;
47
47
import java .util .function .Consumer ;
48
48
49
- import static org .elasticsearch .reservedstate .service .ReservedStateUpdateTask .checkMetadataVersion ;
50
49
import static org .hamcrest .Matchers .anyOf ;
51
50
import static org .hamcrest .Matchers .contains ;
52
51
import static org .hamcrest .Matchers .containsString ;
53
52
import static org .hamcrest .Matchers .empty ;
54
53
import static org .hamcrest .Matchers .equalTo ;
55
54
import static org .hamcrest .Matchers .instanceOf ;
56
55
import static org .hamcrest .Matchers .is ;
56
+ import static org .hamcrest .Matchers .not ;
57
57
import static org .hamcrest .Matchers .notNullValue ;
58
+ import static org .hamcrest .Matchers .sameInstance ;
58
59
import static org .hamcrest .Matchers .startsWith ;
59
60
import static org .mockito .ArgumentMatchers .any ;
60
61
import static org .mockito .ArgumentMatchers .anyString ;
@@ -74,6 +75,65 @@ private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mo
74
75
return (MasterServiceTaskQueue <T >) mock (MasterServiceTaskQueue .class );
75
76
}
76
77
78
+ private static class TestTaskContext <T extends ClusterStateTaskListener > implements ClusterStateTaskExecutor .TaskContext <T > {
79
+ private final T task ;
80
+
81
+ private TestTaskContext (T task ) {
82
+ this .task = task ;
83
+ }
84
+
85
+ @ Override
86
+ public T getTask () {
87
+ return task ;
88
+ }
89
+
90
+ @ Override
91
+ public void success (Runnable onPublicationSuccess ) {
92
+ onPublicationSuccess .run ();
93
+ }
94
+
95
+ @ Override
96
+ public void success (Consumer <ClusterState > publishedStateConsumer ) {}
97
+
98
+ @ Override
99
+ public void success (Runnable onPublicationSuccess , ClusterStateAckListener clusterStateAckListener ) {}
100
+
101
+ @ Override
102
+ public void success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener clusterStateAckListener ) {}
103
+
104
+ @ Override
105
+ public void onFailure (Exception failure ) {}
106
+
107
+ @ Override
108
+ public Releasable captureResponseHeaders () {
109
+ return null ;
110
+ }
111
+ }
112
+
113
+ private static class TestStateHandler implements ReservedClusterStateHandler <Map <String , Object >> {
114
+ private final String name ;
115
+
116
+ private TestStateHandler (String name ) {
117
+ this .name = name ;
118
+ }
119
+
120
+ @ Override
121
+ public String name () {
122
+ return name ;
123
+ }
124
+
125
+ @ Override
126
+ public TransformState transform (Object source , TransformState prevState ) throws Exception {
127
+ ClusterState newState = new ClusterState .Builder (prevState .state ()).build ();
128
+ return new TransformState (newState , prevState .keys ());
129
+ }
130
+
131
+ @ Override
132
+ public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
133
+ return parser .map ();
134
+ }
135
+ }
136
+
77
137
public void testOperatorController () throws IOException {
78
138
ClusterSettings clusterSettings = new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
79
139
ClusterService clusterService = mock (ClusterService .class );
@@ -147,8 +207,7 @@ public void testInitEmptyTask() {
147
207
148
208
// grab the update task when it gets given to us
149
209
when (clusterService .createTaskQueue (ArgumentMatchers .contains ("reserved state update" ), any (), any ())).thenAnswer (i -> {
150
- @ SuppressWarnings ("unchecked" )
151
- MasterServiceTaskQueue <ReservedStateUpdateTask > queue = mock (MasterServiceTaskQueue .class );
210
+ MasterServiceTaskQueue <ReservedStateUpdateTask > queue = mockTaskQueue ();
152
211
doNothing ().when (queue ).submitTask (any (), updateTask .capture (), any ());
153
212
return queue ;
154
213
});
@@ -181,34 +240,12 @@ public void testUpdateStateTasks() throws Exception {
181
240
182
241
doReturn (state ).when (task ).execute (any ());
183
242
184
- ClusterStateTaskExecutor .TaskContext <ReservedStateUpdateTask > taskContext = new ClusterStateTaskExecutor .TaskContext <>() {
185
- @ Override
186
- public ReservedStateUpdateTask getTask () {
187
- return task ;
188
- }
189
-
243
+ ClusterStateTaskExecutor .TaskContext <ReservedStateUpdateTask > taskContext = new TestTaskContext <>(task ) {
190
244
@ Override
191
245
public void success (Runnable onPublicationSuccess ) {
192
- onPublicationSuccess . run ( );
246
+ super . success ( onPublicationSuccess );
193
247
successCalled .set (true );
194
248
}
195
-
196
- @ Override
197
- public void success (Consumer <ClusterState > publishedStateConsumer ) {}
198
-
199
- @ Override
200
- public void success (Runnable onPublicationSuccess , ClusterStateAckListener clusterStateAckListener ) {}
201
-
202
- @ Override
203
- public void success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener clusterStateAckListener ) {}
204
-
205
- @ Override
206
- public void onFailure (Exception failure ) {}
207
-
208
- @ Override
209
- public Releasable captureResponseHeaders () {
210
- return null ;
211
- }
212
249
};
213
250
214
251
ClusterState newState = taskExecutor .execute (
@@ -227,8 +264,7 @@ public void testUpdateErrorState() {
227
264
ClusterState state = ClusterState .builder (new ClusterName ("test" )).build ();
228
265
229
266
ArgumentCaptor <ReservedStateErrorTask > updateTask = ArgumentCaptor .captor ();
230
- @ SuppressWarnings ("unchecked" )
231
- MasterServiceTaskQueue <ReservedStateErrorTask > errorQueue = mock (MasterServiceTaskQueue .class );
267
+ MasterServiceTaskQueue <ReservedStateErrorTask > errorQueue = mockTaskQueue ();
232
268
doNothing ().when (errorQueue ).submitTask (any (), updateTask .capture (), any ());
233
269
234
270
// grab the update task when it gets given to us
@@ -276,40 +312,8 @@ public void testErrorStateTask() throws Exception {
276
312
)
277
313
);
278
314
279
- ReservedStateErrorTaskExecutor .TaskContext <ReservedStateErrorTask > taskContext =
280
- new ReservedStateErrorTaskExecutor .TaskContext <>() {
281
- @ Override
282
- public ReservedStateErrorTask getTask () {
283
- return task ;
284
- }
285
-
286
- @ Override
287
- public void success (Runnable onPublicationSuccess ) {
288
- onPublicationSuccess .run ();
289
- }
290
-
291
- @ Override
292
- public void success (Consumer <ClusterState > publishedStateConsumer ) {}
293
-
294
- @ Override
295
- public void success (Runnable onPublicationSuccess , ClusterStateAckListener clusterStateAckListener ) {}
296
-
297
- @ Override
298
- public void success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener clusterStateAckListener ) {}
299
-
300
- @ Override
301
- public void onFailure (Exception failure ) {}
302
-
303
- @ Override
304
- public Releasable captureResponseHeaders () {
305
- return null ;
306
- }
307
- };
308
-
309
- ReservedStateErrorTaskExecutor executor = new ReservedStateErrorTaskExecutor ();
310
-
311
- ClusterState newState = executor .execute (
312
- new ClusterStateTaskExecutor .BatchExecutionContext <>(state , List .of (taskContext ), () -> null )
315
+ ClusterState newState = new ReservedStateErrorTaskExecutor ().execute (
316
+ new ClusterStateTaskExecutor .BatchExecutionContext <>(state , List .of (new TestTaskContext <>(task )), () -> null )
313
317
);
314
318
315
319
verify (task , times (1 )).execute (any ());
@@ -324,39 +328,12 @@ public Releasable captureResponseHeaders() {
324
328
}
325
329
326
330
public void testUpdateTaskDuplicateError () {
327
- ReservedClusterStateHandler <Map <String , Object >> newStateMaker = new ReservedClusterStateHandler <>() {
328
- @ Override
329
- public String name () {
330
- return "maker" ;
331
- }
332
-
333
- @ Override
334
- public TransformState transform (Object source , TransformState prevState ) throws Exception {
335
- ClusterState newState = new ClusterState .Builder (prevState .state ()).build ();
336
- return new TransformState (newState , prevState .keys ());
337
- }
338
-
339
- @ Override
340
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
341
- return parser .map ();
342
- }
343
- };
344
-
345
- ReservedClusterStateHandler <Map <String , Object >> exceptionThrower = new ReservedClusterStateHandler <>() {
346
- @ Override
347
- public String name () {
348
- return "one" ;
349
- }
350
-
331
+ ReservedClusterStateHandler <Map <String , Object >> newStateMaker = new TestStateHandler ("maker" );
332
+ ReservedClusterStateHandler <Map <String , Object >> exceptionThrower = new TestStateHandler ("one" ) {
351
333
@ Override
352
334
public TransformState transform (Object source , TransformState prevState ) throws Exception {
353
335
throw new Exception ("anything" );
354
336
}
355
-
356
- @ Override
357
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
358
- return parser .map ();
359
- }
360
337
};
361
338
362
339
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata ("one" , Set .of ("a" , "b" ));
@@ -435,22 +412,40 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
435
412
public void testCheckMetadataVersion () {
436
413
ReservedStateMetadata operatorMetadata = ReservedStateMetadata .builder ("test" ).version (123L ).build ();
437
414
438
- assertTrue (checkMetadataVersion ("operator" , operatorMetadata , new ReservedStateVersion (124L , Version .CURRENT )));
439
-
440
- assertFalse (checkMetadataVersion ("operator" , operatorMetadata , new ReservedStateVersion (123L , Version .CURRENT )));
441
-
442
- assertFalse (
443
- checkMetadataVersion ("operator" , operatorMetadata , new ReservedStateVersion (124L , Version .fromId (Version .CURRENT .id + 1 )))
415
+ ClusterState state = ClusterState .builder (new ClusterName ("test" )).metadata (Metadata .builder ().put (operatorMetadata )).build ();
416
+ ReservedStateUpdateTask task = new ReservedStateUpdateTask (
417
+ "test" ,
418
+ new ReservedStateChunk (Map .of (), new ReservedStateVersion (124L , Version .CURRENT )),
419
+ Map .of (),
420
+ List .of (),
421
+ e -> {},
422
+ ActionListener .noop ()
423
+ );
424
+ assertThat ("Cluster state should be modified" , task .execute (state ), not (sameInstance (state )));
425
+
426
+ task = new ReservedStateUpdateTask (
427
+ "test" ,
428
+ new ReservedStateChunk (Map .of (), new ReservedStateVersion (123L , Version .CURRENT )),
429
+ Map .of (),
430
+ List .of (),
431
+ e -> {},
432
+ ActionListener .noop ()
433
+ );
434
+ assertThat ("Cluster state should not be modified" , task .execute (state ), sameInstance (state ));
435
+
436
+ task = new ReservedStateUpdateTask (
437
+ "test" ,
438
+ new ReservedStateChunk (Map .of (), new ReservedStateVersion (124L , Version .fromId (Version .CURRENT .id + 1 ))),
439
+ Map .of (),
440
+ List .of (),
441
+ e -> {},
442
+ ActionListener .noop ()
444
443
);
444
+ assertThat ("Cluster state should not be modified" , task .execute (state ), sameInstance (state ));
445
445
}
446
446
447
- private ReservedClusterStateHandler <Map <String , Object >> makeHandlerHelper (final String name , final List <String > deps ) {
448
- return new ReservedClusterStateHandler <>() {
449
- @ Override
450
- public String name () {
451
- return name ;
452
- }
453
-
447
+ private ReservedClusterStateHandler <Map <String , Object >> makeHandlerHelper (String name , List <String > deps ) {
448
+ return new TestStateHandler (name ) {
454
449
@ Override
455
450
public TransformState transform (Object source , TransformState prevState ) throws Exception {
456
451
return null ;
@@ -460,11 +455,6 @@ public TransformState transform(Object source, TransformState prevState) throws
460
455
public Collection <String > dependencies () {
461
456
return deps ;
462
457
}
463
-
464
- @ Override
465
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
466
- return parser .map ();
467
- }
468
458
};
469
459
}
470
460
@@ -519,7 +509,12 @@ public void testDuplicateHandlerNames() {
519
509
() -> new ReservedClusterStateService (
520
510
clusterService ,
521
511
mock (RerouteService .class ),
522
- List .of (new ReservedClusterSettingsAction (clusterSettings ), new TestHandler ())
512
+ List .of (new ReservedClusterSettingsAction (clusterSettings ), new TestStateHandler (ReservedClusterSettingsAction .NAME ) {
513
+ @ Override
514
+ public TransformState transform (Object source , TransformState prevState ) throws Exception {
515
+ return prevState ;
516
+ }
517
+ })
523
518
)
524
519
).getMessage (),
525
520
startsWith ("Duplicate key cluster_settings" )
@@ -545,39 +540,12 @@ public void testCheckAndReportError() {
545
540
}
546
541
547
542
public void testTrialRunExtractsNonStateActions () {
548
- ReservedClusterStateHandler <Map <String , Object >> newStateMaker = new ReservedClusterStateHandler <>() {
549
- @ Override
550
- public String name () {
551
- return "maker" ;
552
- }
553
-
554
- @ Override
555
- public TransformState transform (Object source , TransformState prevState ) throws Exception {
556
- ClusterState newState = new ClusterState .Builder (prevState .state ()).build ();
557
- return new TransformState (newState , prevState .keys ());
558
- }
559
-
560
- @ Override
561
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
562
- return parser .map ();
563
- }
564
- };
565
-
566
- ReservedClusterStateHandler <Map <String , Object >> exceptionThrower = new ReservedClusterStateHandler <>() {
567
- @ Override
568
- public String name () {
569
- return "non-state" ;
570
- }
571
-
543
+ ReservedClusterStateHandler <Map <String , Object >> newStateMaker = new TestStateHandler ("maker" );
544
+ ReservedClusterStateHandler <Map <String , Object >> exceptionThrower = new TestStateHandler ("non-state" ) {
572
545
@ Override
573
546
public TransformState transform (Object source , TransformState prevState ) {
574
547
return new TransformState (prevState .state (), prevState .keys ());
575
548
}
576
-
577
- @ Override
578
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
579
- return parser .map ();
580
- }
581
549
};
582
550
583
551
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata ("non-state" , Set .of ("a" , "b" ));
@@ -607,22 +575,4 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
607
575
var trialRunErrors = controller .trialRun ("namespace_one" , state , chunk , new LinkedHashSet <>(orderedHandlers ));
608
576
assertThat (trialRunErrors , empty ());
609
577
}
610
-
611
- static class TestHandler implements ReservedClusterStateHandler <Map <String , Object >> {
612
-
613
- @ Override
614
- public String name () {
615
- return ReservedClusterSettingsAction .NAME ;
616
- }
617
-
618
- @ Override
619
- public TransformState transform (Object source , TransformState prevState ) {
620
- return prevState ;
621
- }
622
-
623
- @ Override
624
- public Map <String , Object > fromXContent (XContentParser parser ) throws IOException {
625
- return parser .map ();
626
- }
627
- }
628
578
}
0 commit comments