1818import  org .elasticsearch .cluster .routing .allocation .NodeAllocationStats ;
1919import  org .elasticsearch .cluster .routing .allocation .NodeAllocationStatsTests ;
2020import  org .elasticsearch .cluster .service .ClusterService ;
21+ import  org .elasticsearch .common .settings .ClusterSettings ;
2122import  org .elasticsearch .common .settings .Settings ;
2223import  org .elasticsearch .core .CheckedConsumer ;
2324import  org .elasticsearch .core .TimeValue ;
4243import  java .util .concurrent .atomic .AtomicBoolean ;
4344import  java .util .concurrent .atomic .AtomicInteger ;
4445import  java .util .concurrent .atomic .AtomicReference ;
46+ import  java .util .function .Consumer ;
4547
4648import  static  org .hamcrest .Matchers .anEmptyMap ;
4749import  static  org .hamcrest .Matchers .containsString ;
5557
5658public  class  TransportGetAllocationStatsActionTests  extends  ESTestCase  {
5759
58-     private  static  final  long  CACHE_MAX_AGE_MILLIS  = 30000 ;
59- 
6060    private  long  startTimeMillis ;
61+     private  TimeValue  cacheTTL ;
6162    private  ControlledRelativeTimeThreadPool  threadPool ;
6263    private  ClusterService  clusterService ;
6364    private  TransportService  transportService ;
@@ -69,9 +70,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6970    @ Before 
7071    public  void  setUp () throws  Exception  {
7172        super .setUp ();
72-         startTimeMillis  = CACHE_MAX_AGE_MILLIS ;
73+         startTimeMillis  = 0L ;
74+         cacheTTL  = TimeValue .timeValueMinutes (1 );
7375        threadPool  = new  ControlledRelativeTimeThreadPool (TransportClusterAllocationExplainActionTests .class .getName (), startTimeMillis );
74-         clusterService  = ClusterServiceUtils .createClusterService (threadPool );
76+         clusterService  = ClusterServiceUtils .createClusterService (
77+             threadPool ,
78+             new  ClusterSettings (
79+                 Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), cacheTTL .toString ()).build (),
80+                 ClusterSettings .BUILT_IN_CLUSTER_SETTINGS 
81+             )
82+         );
7583        transportService  = new  CapturingTransport ().createTransportService (
7684            clusterService .getSettings (),
7785            threadPool ,
@@ -82,12 +90,6 @@ public void setUp() throws Exception {
8290        );
8391        allocationStatsService  = mock (AllocationStatsService .class );
8492        action  = new  TransportGetAllocationStatsAction (
85-             Settings .builder ()
86-                 .put (
87-                     TransportGetAllocationStatsAction .CACHE_MAX_AGE_SETTING .getKey (),
88-                     TimeValue .timeValueMillis (CACHE_MAX_AGE_MILLIS ).toString ()
89-                 )
90-                 .build (),
9193            transportService ,
9294            clusterService ,
9395            threadPool ,
@@ -195,8 +197,20 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
195197    public  void  testGetStatsWithCachingEnabled () throws  Exception  {
196198
197199        final  AtomicReference <Map <String , NodeAllocationStats >> allocationStats  = new  AtomicReference <>();
198-         allocationStats .set (Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ()));
199-         when (allocationStatsService .stats ()).thenReturn (allocationStats .get ());
200+         int  numExpectedAllocationStatsServiceCalls  = 0 ;
201+ 
202+         final  Runnable  resetExpectedAllocationStats  = () -> {
203+             final  var  stats  = Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ());
204+             allocationStats .set (stats );
205+             when (allocationStatsService .stats ()).thenReturn (stats );
206+         };
207+ 
208+         final  Consumer <TimeValue > setCacheTTL  = (ttl ) -> {
209+             clusterService .getClusterSettings ()
210+                 .applySettings (
211+                     Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl .toString ()).build ()
212+                 );
213+         };
200214
201215        final  CheckedConsumer <ActionListener <Void >, Exception > threadTask  = l  -> {
202216            final  var  request  = new  TransportGetAllocationStatsAction .Request (
@@ -214,21 +228,30 @@ public void testGetStatsWithCachingEnabled() throws Exception {
214228        };
215229
216230        // Initial cache miss, all threads should get the same value. 
231+         resetExpectedAllocationStats .run ();
217232        ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
218-         verify (allocationStatsService , times (1 )).stats ();
233+         verify (allocationStatsService , times (++ numExpectedAllocationStatsServiceCalls )).stats ();
219234
220235        // Force the cached stats to expire. 
221-         threadPool .setCurrentTimeInMillis (startTimeMillis  + (CACHE_MAX_AGE_MILLIS  * 2 ));
236+         threadPool .setCurrentTimeInMillis (startTimeMillis  + (2  * cacheTTL . getMillis () ));
222237
223238        // Expect a single call to the stats service on the cache miss. 
224-         allocationStats .set (Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ()));
225-         when (allocationStatsService .stats ()).thenReturn (allocationStats .get ());
239+         resetExpectedAllocationStats .run ();
226240        ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
227-         verify (allocationStatsService , times (2 )).stats ();
228- 
229-         // All subsequent requests should get the cached value. 
241+         verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
242+ 
243+         // Update the TTL setting to disable the cache, we expect a service call each time. 
244+         setCacheTTL .accept (TimeValue .ZERO );
245+         threadTask .accept (ActionListener .noop ());
246+         threadTask .accept (ActionListener .noop ());
247+         numExpectedAllocationStatsServiceCalls  += 2 ;
248+         verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
249+ 
250+         // Re-enable the cache, only one thread should call the stats service. 
251+         setCacheTTL .accept (TimeValue .timeValueMinutes (5 ));
252+         resetExpectedAllocationStats .run ();
230253        ESTestCase .startInParallel (between (1 , 5 ), threadNumber  -> safeAwait (threadTask ));
231-         verify (allocationStatsService , times (2 )).stats ();
254+         verify (allocationStatsService , times (++ numExpectedAllocationStatsServiceCalls )).stats ();
232255    }
233256
234257    private  static  class  ControlledRelativeTimeThreadPool  extends  ThreadPool  {
0 commit comments