99
1010package  org .elasticsearch .action .admin .cluster .allocation ;
1111
12+ import  org .elasticsearch .action .ActionListener ;
1213import  org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequestParameters .Metric ;
1314import  org .elasticsearch .action .support .ActionFilters ;
1415import  org .elasticsearch .action .support .PlainActionFuture ;
1516import  org .elasticsearch .cluster .ClusterState ;
1617import  org .elasticsearch .cluster .routing .allocation .AllocationStatsService ;
18+ import  org .elasticsearch .cluster .routing .allocation .NodeAllocationStats ;
1719import  org .elasticsearch .cluster .routing .allocation .NodeAllocationStatsTests ;
1820import  org .elasticsearch .cluster .service .ClusterService ;
21+ import  org .elasticsearch .common .settings .ClusterSettings ;
22+ import  org .elasticsearch .common .settings .Settings ;
23+ import  org .elasticsearch .core .CheckedConsumer ;
1924import  org .elasticsearch .core .TimeValue ;
2025import  org .elasticsearch .features .FeatureService ;
26+ import  org .elasticsearch .node .Node ;
2127import  org .elasticsearch .tasks .Task ;
2228import  org .elasticsearch .tasks .TaskId ;
29+ import  org .elasticsearch .telemetry .metric .MeterRegistry ;
2330import  org .elasticsearch .test .ClusterServiceUtils ;
2431import  org .elasticsearch .test .ESTestCase ;
2532import  org .elasticsearch .test .transport .CapturingTransport ;
26- import  org .elasticsearch .threadpool .TestThreadPool ;
33+ import  org .elasticsearch .threadpool .DefaultBuiltInExecutorBuilders ;
2734import  org .elasticsearch .threadpool .ThreadPool ;
2835import  org .elasticsearch .transport .TransportService ;
2936import  org .junit .After ;
3542import  java .util .concurrent .CyclicBarrier ;
3643import  java .util .concurrent .atomic .AtomicBoolean ;
3744import  java .util .concurrent .atomic .AtomicInteger ;
45+ import  java .util .concurrent .atomic .AtomicReference ;
3846
3947import  static  org .hamcrest .Matchers .anEmptyMap ;
4048import  static  org .hamcrest .Matchers .containsString ;
4452import  static  org .mockito .ArgumentMatchers .eq ;
4553import  static  org .mockito .Mockito .mock ;
4654import  static  org .mockito .Mockito .never ;
55+ import  static  org .mockito .Mockito .times ;
4756import  static  org .mockito .Mockito .verify ;
4857import  static  org .mockito .Mockito .when ;
4958
5059public  class  TransportGetAllocationStatsActionTests  extends  ESTestCase  {
5160
52-     private  ThreadPool  threadPool ;
61+     private  long  startTimeMillis ;
62+     private  TimeValue  allocationStatsCacheTTL ;
63+     private  ControlledRelativeTimeThreadPool  threadPool ;
5364    private  ClusterService  clusterService ;
5465    private  TransportService  transportService ;
5566    private  AllocationStatsService  allocationStatsService ;
@@ -61,8 +72,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6172    @ Before 
6273    public  void  setUp () throws  Exception  {
6374        super .setUp ();
64-         threadPool  = new  TestThreadPool (TransportClusterAllocationExplainActionTests .class .getName ());
65-         clusterService  = ClusterServiceUtils .createClusterService (threadPool );
75+         startTimeMillis  = 0L ;
76+         allocationStatsCacheTTL  = TimeValue .timeValueMinutes (1 );
77+         threadPool  = new  ControlledRelativeTimeThreadPool (TransportClusterAllocationExplainActionTests .class .getName (), startTimeMillis );
78+         clusterService  = ClusterServiceUtils .createClusterService (
79+             threadPool ,
80+             new  ClusterSettings (
81+                 Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), allocationStatsCacheTTL ).build (),
82+                 ClusterSettings .BUILT_IN_CLUSTER_SETTINGS 
83+             )
84+         );
6685        transportService  = new  CapturingTransport ().createTransportService (
6786            clusterService .getSettings (),
6887            threadPool ,
@@ -92,7 +111,17 @@ public void tearDown() throws Exception {
92111        transportService .close ();
93112    }
94113
114+     private  void  disableAllocationStatsCache () {
115+         setAllocationStatsCacheTTL (TimeValue .ZERO );
116+     }
117+ 
118+     private  void  setAllocationStatsCacheTTL (TimeValue  ttl ) {
119+         clusterService .getClusterSettings ()
120+             .applySettings (Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl ).build ());
121+     };
122+ 
95123    public  void  testReturnsOnlyRequestedStats () throws  Exception  {
124+         disableAllocationStatsCache ();
96125
97126        var  metrics  = EnumSet .copyOf (randomSubsetOf (Metric .values ().length , Metric .values ()));
98127
@@ -126,6 +155,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
126155    }
127156
128157    public  void  testDeduplicatesStatsComputations () throws  InterruptedException  {
158+         disableAllocationStatsCache ();
129159        final  var  requestCounter  = new  AtomicInteger ();
130160        final  var  isExecuting  = new  AtomicBoolean ();
131161        when (allocationStatsService .stats ()).thenAnswer (invocation  -> {
@@ -170,4 +200,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
170200            thread .join ();
171201        }
172202    }
203+ 
204+     public  void  testGetStatsWithCachingEnabled () throws  Exception  {
205+ 
206+         final  AtomicReference <Map <String , NodeAllocationStats >> allocationStats  = new  AtomicReference <>();
207+         int  numExpectedAllocationStatsServiceCalls  = 0 ;
208+ 
209+         final  Runnable  resetExpectedAllocationStats  = () -> {
210+             final  var  stats  = Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ());
211+             allocationStats .set (stats );
212+             when (allocationStatsService .stats ()).thenReturn (stats );
213+         };
214+ 
215+         final  CheckedConsumer <ActionListener <Void >, Exception > threadTask  = l  -> {
216+             final  var  request  = new  TransportGetAllocationStatsAction .Request (
217+                 TEST_REQUEST_TIMEOUT ,
218+                 new  TaskId (randomIdentifier (), randomNonNegativeLong ()),
219+                 EnumSet .of (Metric .ALLOCATIONS )
220+             );
221+ 
222+             action .masterOperation (mock (Task .class ), request , ClusterState .EMPTY_STATE , l .map (response  -> {
223+                 assertSame ("Expected the cached allocation stats to be returned" , response .getNodeAllocationStats (), allocationStats .get ());
224+                 return  null ;
225+             }));
226+         };
227+ 
228+         // Initial cache miss, all threads should get the same value. 
229+         resetExpectedAllocationStats .run ();
230+         ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
231+         verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
232+ 
233+         // Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats. 
234+         threadPool .setCurrentTimeInMillis (startTimeMillis  + between (0 , (int ) allocationStatsCacheTTL .millis ()));
235+         ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
236+         verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
237+ 
238+         // Force the cached stats to expire. 
239+         threadPool .setCurrentTimeInMillis (startTimeMillis  + allocationStatsCacheTTL .getMillis () + 1 );
240+ 
241+         // Expect a single call to the stats service on the cache miss. 
242+         resetExpectedAllocationStats .run ();
243+         ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
244+         verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
245+ 
246+         // Update the TTL setting to disable the cache, we expect a service call each time. 
247+         setAllocationStatsCacheTTL (TimeValue .ZERO );
248+         safeAwait (threadTask );
249+         safeAwait (threadTask );
250+         numExpectedAllocationStatsServiceCalls  += 2 ;
251+         verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
252+ 
253+         // Re-enable the cache, only one thread should call the stats service. 
254+         setAllocationStatsCacheTTL (TimeValue .timeValueMinutes (5 ));
255+         resetExpectedAllocationStats .run ();
256+         ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
257+         verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
258+     }
259+ 
260+     private  static  class  ControlledRelativeTimeThreadPool  extends  ThreadPool  {
261+ 
262+         private  long  currentTimeInMillis ;
263+ 
264+         ControlledRelativeTimeThreadPool (String  name , long  startTimeMillis ) {
265+             super (
266+                 Settings .builder ().put (Node .NODE_NAME_SETTING .getKey (), name ).build (),
267+                 MeterRegistry .NOOP ,
268+                 new  DefaultBuiltInExecutorBuilders ()
269+             );
270+             this .currentTimeInMillis  = startTimeMillis ;
271+             stopCachedTimeThread ();
272+         }
273+ 
274+         @ Override 
275+         public  long  relativeTimeInMillis () {
276+             return  currentTimeInMillis ;
277+         }
278+ 
279+         void  setCurrentTimeInMillis (long  currentTimeInMillis ) {
280+             this .currentTimeInMillis  = currentTimeInMillis ;
281+         }
282+     }
173283}
0 commit comments