1818import org .elasticsearch .cluster .routing .allocation .NodeAllocationStats ;
1919import org .elasticsearch .cluster .routing .allocation .NodeAllocationStatsTests ;
2020import org .elasticsearch .cluster .service .ClusterService ;
21+ import org .elasticsearch .common .settings .ClusterSettings ;
2122import org .elasticsearch .common .settings .Settings ;
2223import org .elasticsearch .core .CheckedConsumer ;
2324import org .elasticsearch .core .TimeValue ;
4243import java .util .concurrent .atomic .AtomicBoolean ;
4344import java .util .concurrent .atomic .AtomicInteger ;
4445import java .util .concurrent .atomic .AtomicReference ;
46+ import java .util .function .Consumer ;
4547
4648import static org .hamcrest .Matchers .anEmptyMap ;
4749import static org .hamcrest .Matchers .containsString ;
5557
5658public class TransportGetAllocationStatsActionTests extends ESTestCase {
5759
58- private static final long CACHE_MAX_AGE_MILLIS = 30000 ;
59-
6060 private long startTimeMillis ;
61+ private TimeValue cacheTTL ;
6162 private ControlledRelativeTimeThreadPool threadPool ;
6263 private ClusterService clusterService ;
6364 private TransportService transportService ;
@@ -69,9 +70,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6970 @ Before
7071 public void setUp () throws Exception {
7172 super .setUp ();
72- startTimeMillis = CACHE_MAX_AGE_MILLIS ;
73+ startTimeMillis = 0L ;
74+ cacheTTL = TimeValue .timeValueMinutes (1 );
7375 threadPool = new ControlledRelativeTimeThreadPool (TransportClusterAllocationExplainActionTests .class .getName (), startTimeMillis );
74- clusterService = ClusterServiceUtils .createClusterService (threadPool );
76+ clusterService = ClusterServiceUtils .createClusterService (
77+ threadPool ,
78+ new ClusterSettings (
79+ Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), cacheTTL .toString ()).build (),
80+ ClusterSettings .BUILT_IN_CLUSTER_SETTINGS
81+ )
82+ );
7583 transportService = new CapturingTransport ().createTransportService (
7684 clusterService .getSettings (),
7785 threadPool ,
@@ -82,12 +90,6 @@ public void setUp() throws Exception {
8290 );
8391 allocationStatsService = mock (AllocationStatsService .class );
8492 action = new TransportGetAllocationStatsAction (
85- Settings .builder ()
86- .put (
87- TransportGetAllocationStatsAction .CACHE_MAX_AGE_SETTING .getKey (),
88- TimeValue .timeValueMillis (CACHE_MAX_AGE_MILLIS ).toString ()
89- )
90- .build (),
9193 transportService ,
9294 clusterService ,
9395 threadPool ,
@@ -195,8 +197,20 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
195197 public void testGetStatsWithCachingEnabled () throws Exception {
196198
197199 final AtomicReference <Map <String , NodeAllocationStats >> allocationStats = new AtomicReference <>();
198- allocationStats .set (Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ()));
199- when (allocationStatsService .stats ()).thenReturn (allocationStats .get ());
200+ int numExpectedAllocationStatsServiceCalls = 0 ;
201+
202+ final Runnable resetExpectedAllocationStats = () -> {
203+ final var stats = Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ());
204+ allocationStats .set (stats );
205+ when (allocationStatsService .stats ()).thenReturn (stats );
206+ };
207+
208+ final Consumer <TimeValue > setCacheTTL = (ttl ) -> {
209+ clusterService .getClusterSettings ()
210+ .applySettings (
211+ Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl .toString ()).build ()
212+ );
213+ };
200214
201215 final CheckedConsumer <ActionListener <Void >, Exception > threadTask = l -> {
202216 final var request = new TransportGetAllocationStatsAction .Request (
@@ -214,21 +228,30 @@ public void testGetStatsWithCachingEnabled() throws Exception {
214228 };
215229
216230 // Initial cache miss, all threads should get the same value.
231+ resetExpectedAllocationStats .run ();
217232 ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
218- verify (allocationStatsService , times (1 )).stats ();
233+ verify (allocationStatsService , times (++ numExpectedAllocationStatsServiceCalls )).stats ();
219234
220235 // Force the cached stats to expire.
221- threadPool .setCurrentTimeInMillis (startTimeMillis + (CACHE_MAX_AGE_MILLIS * 2 ));
236+ threadPool .setCurrentTimeInMillis (startTimeMillis + (2 * cacheTTL . getMillis () ));
222237
223238 // Expect a single call to the stats service on the cache miss.
224- allocationStats .set (Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ()));
225- when (allocationStatsService .stats ()).thenReturn (allocationStats .get ());
239+ resetExpectedAllocationStats .run ();
226240 ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
227- verify (allocationStatsService , times (2 )).stats ();
228-
229- // All subsequent requests should get the cached value.
241+ verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
242+
243+ // Update the TTL setting to disable the cache, we expect a service call each time.
244+ setCacheTTL .accept (TimeValue .ZERO );
245+ threadTask .accept (ActionListener .noop ());
246+ threadTask .accept (ActionListener .noop ());
247+ numExpectedAllocationStatsServiceCalls += 2 ;
248+ verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
249+
250+ // Re-enable the cache, only one thread should call the stats service.
251+ setCacheTTL .accept (TimeValue .timeValueMinutes (5 ));
252+ resetExpectedAllocationStats .run ();
230253 ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
231- verify (allocationStatsService , times (2 )).stats ();
254+ verify (allocationStatsService , times (++ numExpectedAllocationStatsServiceCalls )).stats ();
232255 }
233256
234257 private static class ControlledRelativeTimeThreadPool extends ThreadPool {
0 commit comments