4646import  java .util .concurrent .atomic .AtomicReference ;
4747import  java .util .function .Consumer ;
4848
49- import  static  org .elasticsearch .reservedstate .service .ReservedStateUpdateTask .checkMetadataVersion ;
5049import  static  org .hamcrest .Matchers .anyOf ;
5150import  static  org .hamcrest .Matchers .contains ;
5251import  static  org .hamcrest .Matchers .containsString ;
5352import  static  org .hamcrest .Matchers .empty ;
5453import  static  org .hamcrest .Matchers .equalTo ;
5554import  static  org .hamcrest .Matchers .instanceOf ;
5655import  static  org .hamcrest .Matchers .is ;
56+ import  static  org .hamcrest .Matchers .not ;
5757import  static  org .hamcrest .Matchers .notNullValue ;
58+ import  static  org .hamcrest .Matchers .sameInstance ;
5859import  static  org .hamcrest .Matchers .startsWith ;
5960import  static  org .mockito .ArgumentMatchers .any ;
6061import  static  org .mockito .ArgumentMatchers .anyString ;
@@ -74,6 +75,65 @@ private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mo
7475        return  (MasterServiceTaskQueue <T >) mock (MasterServiceTaskQueue .class );
7576    }
7677
78+     private  static  class  TestTaskContext <T  extends  ClusterStateTaskListener > implements  ClusterStateTaskExecutor .TaskContext <T > {
79+         private  final  T  task ;
80+ 
81+         private  TestTaskContext (T  task ) {
82+             this .task  = task ;
83+         }
84+ 
85+         @ Override 
86+         public  T  getTask () {
87+             return  task ;
88+         }
89+ 
90+         @ Override 
91+         public  void  success (Runnable  onPublicationSuccess ) {
92+             onPublicationSuccess .run ();
93+         }
94+ 
95+         @ Override 
96+         public  void  success (Consumer <ClusterState > publishedStateConsumer ) {}
97+ 
98+         @ Override 
99+         public  void  success (Runnable  onPublicationSuccess , ClusterStateAckListener  clusterStateAckListener ) {}
100+ 
101+         @ Override 
102+         public  void  success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener  clusterStateAckListener ) {}
103+ 
104+         @ Override 
105+         public  void  onFailure (Exception  failure ) {}
106+ 
107+         @ Override 
108+         public  Releasable  captureResponseHeaders () {
109+             return  null ;
110+         }
111+     }
112+ 
113+     private  static  class  TestStateHandler  implements  ReservedClusterStateHandler <Map <String , Object >> {
114+         private  final  String  name ;
115+ 
116+         private  TestStateHandler (String  name ) {
117+             this .name  = name ;
118+         }
119+ 
120+         @ Override 
121+         public  String  name () {
122+             return  name ;
123+         }
124+ 
125+         @ Override 
126+         public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
127+             ClusterState  newState  = new  ClusterState .Builder (prevState .state ()).build ();
128+             return  new  TransformState (newState , prevState .keys ());
129+         }
130+ 
131+         @ Override 
132+         public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
133+             return  parser .map ();
134+         }
135+     }
136+ 
77137    public  void  testOperatorController () throws  IOException  {
78138        ClusterSettings  clusterSettings  = new  ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS );
79139        ClusterService  clusterService  = mock (ClusterService .class );
@@ -147,8 +207,7 @@ public void testInitEmptyTask() {
147207
148208        // grab the update task when it gets given to us 
149209        when (clusterService .createTaskQueue (ArgumentMatchers .contains ("reserved state update" ), any (), any ())).thenAnswer (i  -> {
150-             @ SuppressWarnings ("unchecked" )
151-             MasterServiceTaskQueue <ReservedStateUpdateTask > queue  = mock (MasterServiceTaskQueue .class );
210+             MasterServiceTaskQueue <ReservedStateUpdateTask > queue  = mockTaskQueue ();
152211            doNothing ().when (queue ).submitTask (any (), updateTask .capture (), any ());
153212            return  queue ;
154213        });
@@ -181,34 +240,12 @@ public void testUpdateStateTasks() throws Exception {
181240
182241        doReturn (state ).when (task ).execute (any ());
183242
184-         ClusterStateTaskExecutor .TaskContext <ReservedStateUpdateTask > taskContext  = new  ClusterStateTaskExecutor .TaskContext <>() {
185-             @ Override 
186-             public  ReservedStateUpdateTask  getTask () {
187-                 return  task ;
188-             }
189- 
243+         ClusterStateTaskExecutor .TaskContext <ReservedStateUpdateTask > taskContext  = new  TestTaskContext <>(task ) {
190244            @ Override 
191245            public  void  success (Runnable  onPublicationSuccess ) {
192-                 onPublicationSuccess . run ( );
246+                 super . success ( onPublicationSuccess );
193247                successCalled .set (true );
194248            }
195- 
196-             @ Override 
197-             public  void  success (Consumer <ClusterState > publishedStateConsumer ) {}
198- 
199-             @ Override 
200-             public  void  success (Runnable  onPublicationSuccess , ClusterStateAckListener  clusterStateAckListener ) {}
201- 
202-             @ Override 
203-             public  void  success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener  clusterStateAckListener ) {}
204- 
205-             @ Override 
206-             public  void  onFailure (Exception  failure ) {}
207- 
208-             @ Override 
209-             public  Releasable  captureResponseHeaders () {
210-                 return  null ;
211-             }
212249        };
213250
214251        ClusterState  newState  = taskExecutor .execute (
@@ -227,8 +264,7 @@ public void testUpdateErrorState() {
227264        ClusterState  state  = ClusterState .builder (new  ClusterName ("test" )).build ();
228265
229266        ArgumentCaptor <ReservedStateErrorTask > updateTask  = ArgumentCaptor .captor ();
230-         @ SuppressWarnings ("unchecked" )
231-         MasterServiceTaskQueue <ReservedStateErrorTask > errorQueue  = mock (MasterServiceTaskQueue .class );
267+         MasterServiceTaskQueue <ReservedStateErrorTask > errorQueue  = mockTaskQueue ();
232268        doNothing ().when (errorQueue ).submitTask (any (), updateTask .capture (), any ());
233269
234270        // grab the update task when it gets given to us 
@@ -276,40 +312,8 @@ public void testErrorStateTask() throws Exception {
276312            )
277313        );
278314
279-         ReservedStateErrorTaskExecutor .TaskContext <ReservedStateErrorTask > taskContext  =
280-             new  ReservedStateErrorTaskExecutor .TaskContext <>() {
281-                 @ Override 
282-                 public  ReservedStateErrorTask  getTask () {
283-                     return  task ;
284-                 }
285- 
286-                 @ Override 
287-                 public  void  success (Runnable  onPublicationSuccess ) {
288-                     onPublicationSuccess .run ();
289-                 }
290- 
291-                 @ Override 
292-                 public  void  success (Consumer <ClusterState > publishedStateConsumer ) {}
293- 
294-                 @ Override 
295-                 public  void  success (Runnable  onPublicationSuccess , ClusterStateAckListener  clusterStateAckListener ) {}
296- 
297-                 @ Override 
298-                 public  void  success (Consumer <ClusterState > publishedStateConsumer , ClusterStateAckListener  clusterStateAckListener ) {}
299- 
300-                 @ Override 
301-                 public  void  onFailure (Exception  failure ) {}
302- 
303-                 @ Override 
304-                 public  Releasable  captureResponseHeaders () {
305-                     return  null ;
306-                 }
307-             };
308- 
309-         ReservedStateErrorTaskExecutor  executor  = new  ReservedStateErrorTaskExecutor ();
310- 
311-         ClusterState  newState  = executor .execute (
312-             new  ClusterStateTaskExecutor .BatchExecutionContext <>(state , List .of (taskContext ), () -> null )
315+         ClusterState  newState  = new  ReservedStateErrorTaskExecutor ().execute (
316+             new  ClusterStateTaskExecutor .BatchExecutionContext <>(state , List .of (new  TestTaskContext <>(task )), () -> null )
313317        );
314318
315319        verify (task , times (1 )).execute (any ());
@@ -324,39 +328,12 @@ public Releasable captureResponseHeaders() {
324328    }
325329
326330    public  void  testUpdateTaskDuplicateError () {
327-         ReservedClusterStateHandler <Map <String , Object >> newStateMaker  = new  ReservedClusterStateHandler <>() {
328-             @ Override 
329-             public  String  name () {
330-                 return  "maker" ;
331-             }
332- 
333-             @ Override 
334-             public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
335-                 ClusterState  newState  = new  ClusterState .Builder (prevState .state ()).build ();
336-                 return  new  TransformState (newState , prevState .keys ());
337-             }
338- 
339-             @ Override 
340-             public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
341-                 return  parser .map ();
342-             }
343-         };
344- 
345-         ReservedClusterStateHandler <Map <String , Object >> exceptionThrower  = new  ReservedClusterStateHandler <>() {
346-             @ Override 
347-             public  String  name () {
348-                 return  "one" ;
349-             }
350- 
331+         ReservedClusterStateHandler <Map <String , Object >> newStateMaker  = new  TestStateHandler ("maker" );
332+         ReservedClusterStateHandler <Map <String , Object >> exceptionThrower  = new  TestStateHandler ("one" ) {
351333            @ Override 
352334            public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
353335                throw  new  Exception ("anything" );
354336            }
355- 
356-             @ Override 
357-             public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
358-                 return  parser .map ();
359-             }
360337        };
361338
362339        ReservedStateHandlerMetadata  hmOne  = new  ReservedStateHandlerMetadata ("one" , Set .of ("a" , "b" ));
@@ -435,22 +412,40 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
435412    public  void  testCheckMetadataVersion () {
436413        ReservedStateMetadata  operatorMetadata  = ReservedStateMetadata .builder ("test" ).version (123L ).build ();
437414
438-         assertTrue (checkMetadataVersion ("operator" , operatorMetadata , new  ReservedStateVersion (124L , Version .CURRENT )));
439- 
440-         assertFalse (checkMetadataVersion ("operator" , operatorMetadata , new  ReservedStateVersion (123L , Version .CURRENT )));
441- 
442-         assertFalse (
443-             checkMetadataVersion ("operator" , operatorMetadata , new  ReservedStateVersion (124L , Version .fromId (Version .CURRENT .id  + 1 )))
415+         ClusterState  state  = ClusterState .builder (new  ClusterName ("test" )).metadata (Metadata .builder ().put (operatorMetadata )).build ();
416+         ReservedStateUpdateTask  task  = new  ReservedStateUpdateTask (
417+             "test" ,
418+             new  ReservedStateChunk (Map .of (), new  ReservedStateVersion (124L , Version .CURRENT )),
419+             Map .of (),
420+             List .of (),
421+             e  -> {},
422+             ActionListener .noop ()
423+         );
424+         assertThat ("Cluster state should be modified" , task .execute (state ), not (sameInstance (state )));
425+ 
426+         task  = new  ReservedStateUpdateTask (
427+             "test" ,
428+             new  ReservedStateChunk (Map .of (), new  ReservedStateVersion (123L , Version .CURRENT )),
429+             Map .of (),
430+             List .of (),
431+             e  -> {},
432+             ActionListener .noop ()
433+         );
434+         assertThat ("Cluster state should not be modified" , task .execute (state ), sameInstance (state ));
435+ 
436+         task  = new  ReservedStateUpdateTask (
437+             "test" ,
438+             new  ReservedStateChunk (Map .of (), new  ReservedStateVersion (124L , Version .fromId (Version .CURRENT .id  + 1 ))),
439+             Map .of (),
440+             List .of (),
441+             e  -> {},
442+             ActionListener .noop ()
444443        );
444+         assertThat ("Cluster state should not be modified" , task .execute (state ), sameInstance (state ));
445445    }
446446
447-     private  ReservedClusterStateHandler <Map <String , Object >> makeHandlerHelper (final  String  name , final  List <String > deps ) {
448-         return  new  ReservedClusterStateHandler <>() {
449-             @ Override 
450-             public  String  name () {
451-                 return  name ;
452-             }
453- 
447+     private  ReservedClusterStateHandler <Map <String , Object >> makeHandlerHelper (String  name , List <String > deps ) {
448+         return  new  TestStateHandler (name ) {
454449            @ Override 
455450            public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
456451                return  null ;
@@ -460,11 +455,6 @@ public TransformState transform(Object source, TransformState prevState) throws
460455            public  Collection <String > dependencies () {
461456                return  deps ;
462457            }
463- 
464-             @ Override 
465-             public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
466-                 return  parser .map ();
467-             }
468458        };
469459    }
470460
@@ -519,7 +509,12 @@ public void testDuplicateHandlerNames() {
519509                () -> new  ReservedClusterStateService (
520510                    clusterService ,
521511                    mock (RerouteService .class ),
522-                     List .of (new  ReservedClusterSettingsAction (clusterSettings ), new  TestHandler ())
512+                     List .of (new  ReservedClusterSettingsAction (clusterSettings ), new  TestStateHandler (ReservedClusterSettingsAction .NAME ) {
513+                         @ Override 
514+                         public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
515+                             return  prevState ;
516+                         }
517+                     })
523518                )
524519            ).getMessage (),
525520            startsWith ("Duplicate key cluster_settings" )
@@ -545,39 +540,12 @@ public void testCheckAndReportError() {
545540    }
546541
547542    public  void  testTrialRunExtractsNonStateActions () {
548-         ReservedClusterStateHandler <Map <String , Object >> newStateMaker  = new  ReservedClusterStateHandler <>() {
549-             @ Override 
550-             public  String  name () {
551-                 return  "maker" ;
552-             }
553- 
554-             @ Override 
555-             public  TransformState  transform (Object  source , TransformState  prevState ) throws  Exception  {
556-                 ClusterState  newState  = new  ClusterState .Builder (prevState .state ()).build ();
557-                 return  new  TransformState (newState , prevState .keys ());
558-             }
559- 
560-             @ Override 
561-             public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
562-                 return  parser .map ();
563-             }
564-         };
565- 
566-         ReservedClusterStateHandler <Map <String , Object >> exceptionThrower  = new  ReservedClusterStateHandler <>() {
567-             @ Override 
568-             public  String  name () {
569-                 return  "non-state" ;
570-             }
571- 
543+         ReservedClusterStateHandler <Map <String , Object >> newStateMaker  = new  TestStateHandler ("maker" );
544+         ReservedClusterStateHandler <Map <String , Object >> exceptionThrower  = new  TestStateHandler ("non-state" ) {
572545            @ Override 
573546            public  TransformState  transform (Object  source , TransformState  prevState ) {
574547                return  new  TransformState (prevState .state (), prevState .keys ());
575548            }
576- 
577-             @ Override 
578-             public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
579-                 return  parser .map ();
580-             }
581549        };
582550
583551        ReservedStateHandlerMetadata  hmOne  = new  ReservedStateHandlerMetadata ("non-state" , Set .of ("a" , "b" ));
@@ -607,22 +575,4 @@ public Map<String, Object> fromXContent(XContentParser parser) throws IOExceptio
607575        var  trialRunErrors  = controller .trialRun ("namespace_one" , state , chunk , new  LinkedHashSet <>(orderedHandlers ));
608576        assertThat (trialRunErrors , empty ());
609577    }
610- 
611-     static  class  TestHandler  implements  ReservedClusterStateHandler <Map <String , Object >> {
612- 
613-         @ Override 
614-         public  String  name () {
615-             return  ReservedClusterSettingsAction .NAME ;
616-         }
617- 
618-         @ Override 
619-         public  TransformState  transform (Object  source , TransformState  prevState ) {
620-             return  prevState ;
621-         }
622- 
623-         @ Override 
624-         public  Map <String , Object > fromXContent (XContentParser  parser ) throws  IOException  {
625-             return  parser .map ();
626-         }
627-     }
628578}
0 commit comments