99
1010package org .elasticsearch .action .admin .cluster .allocation ;
1111
12+ import org .elasticsearch .action .ActionListener ;
1213import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequestParameters .Metric ;
1314import org .elasticsearch .action .support .ActionFilters ;
1415import org .elasticsearch .action .support .PlainActionFuture ;
1516import org .elasticsearch .cluster .ClusterState ;
1617import org .elasticsearch .cluster .routing .allocation .AllocationStatsService ;
18+ import org .elasticsearch .cluster .routing .allocation .NodeAllocationStats ;
1719import org .elasticsearch .cluster .routing .allocation .NodeAllocationStatsTests ;
1820import org .elasticsearch .cluster .service .ClusterService ;
21+ import org .elasticsearch .common .settings .ClusterSettings ;
22+ import org .elasticsearch .common .settings .Settings ;
23+ import org .elasticsearch .core .CheckedConsumer ;
1924import org .elasticsearch .core .TimeValue ;
2025import org .elasticsearch .features .FeatureService ;
26+ import org .elasticsearch .node .Node ;
2127import org .elasticsearch .tasks .Task ;
2228import org .elasticsearch .tasks .TaskId ;
29+ import org .elasticsearch .telemetry .metric .MeterRegistry ;
2330import org .elasticsearch .test .ClusterServiceUtils ;
2431import org .elasticsearch .test .ESTestCase ;
2532import org .elasticsearch .test .transport .CapturingTransport ;
26- import org .elasticsearch .threadpool .TestThreadPool ;
33+ import org .elasticsearch .threadpool .DefaultBuiltInExecutorBuilders ;
2734import org .elasticsearch .threadpool .ThreadPool ;
2835import org .elasticsearch .transport .TransportService ;
2936import org .junit .After ;
3542import java .util .concurrent .CyclicBarrier ;
3643import java .util .concurrent .atomic .AtomicBoolean ;
3744import java .util .concurrent .atomic .AtomicInteger ;
45+ import java .util .concurrent .atomic .AtomicReference ;
3846
3947import static org .hamcrest .Matchers .anEmptyMap ;
4048import static org .hamcrest .Matchers .containsString ;
4452import static org .mockito .ArgumentMatchers .eq ;
4553import static org .mockito .Mockito .mock ;
4654import static org .mockito .Mockito .never ;
55+ import static org .mockito .Mockito .times ;
4756import static org .mockito .Mockito .verify ;
4857import static org .mockito .Mockito .when ;
4958
5059public class TransportGetAllocationStatsActionTests extends ESTestCase {
5160
52- private ThreadPool threadPool ;
61+ private long startTimeMillis ;
62+ private TimeValue allocationStatsCacheTTL ;
63+ private ControlledRelativeTimeThreadPool threadPool ;
5364 private ClusterService clusterService ;
5465 private TransportService transportService ;
5566 private AllocationStatsService allocationStatsService ;
@@ -61,8 +72,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6172 @ Before
6273 public void setUp () throws Exception {
6374 super .setUp ();
64- threadPool = new TestThreadPool (TransportClusterAllocationExplainActionTests .class .getName ());
65- clusterService = ClusterServiceUtils .createClusterService (threadPool );
75+ startTimeMillis = 0L ;
76+ allocationStatsCacheTTL = TimeValue .timeValueMinutes (1 );
77+ threadPool = new ControlledRelativeTimeThreadPool (TransportClusterAllocationExplainActionTests .class .getName (), startTimeMillis );
78+ clusterService = ClusterServiceUtils .createClusterService (
79+ threadPool ,
80+ new ClusterSettings (
81+ Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), allocationStatsCacheTTL ).build (),
82+ ClusterSettings .BUILT_IN_CLUSTER_SETTINGS
83+ )
84+ );
6685 transportService = new CapturingTransport ().createTransportService (
6786 clusterService .getSettings (),
6887 threadPool ,
@@ -92,7 +111,17 @@ public void tearDown() throws Exception {
92111 transportService .close ();
93112 }
94113
114+ private void disableAllocationStatsCache () {
115+ setAllocationStatsCacheTTL (TimeValue .ZERO );
116+ }
117+
118+ private void setAllocationStatsCacheTTL (TimeValue ttl ) {
119+ clusterService .getClusterSettings ()
120+ .applySettings (Settings .builder ().put (TransportGetAllocationStatsAction .CACHE_TTL_SETTING .getKey (), ttl ).build ());
121+ };
122+
95123 public void testReturnsOnlyRequestedStats () throws Exception {
124+ disableAllocationStatsCache ();
96125
97126 var metrics = EnumSet .copyOf (randomSubsetOf (Metric .values ().length , Metric .values ()));
98127
@@ -126,6 +155,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
126155 }
127156
128157 public void testDeduplicatesStatsComputations () throws InterruptedException {
158+ disableAllocationStatsCache ();
129159 final var requestCounter = new AtomicInteger ();
130160 final var isExecuting = new AtomicBoolean ();
131161 when (allocationStatsService .stats ()).thenAnswer (invocation -> {
@@ -170,4 +200,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
170200 thread .join ();
171201 }
172202 }
203+
204+ public void testGetStatsWithCachingEnabled () throws Exception {
205+
206+ final AtomicReference <Map <String , NodeAllocationStats >> allocationStats = new AtomicReference <>();
207+ int numExpectedAllocationStatsServiceCalls = 0 ;
208+
209+ final Runnable resetExpectedAllocationStats = () -> {
210+ final var stats = Map .of (randomIdentifier (), NodeAllocationStatsTests .randomNodeAllocationStats ());
211+ allocationStats .set (stats );
212+ when (allocationStatsService .stats ()).thenReturn (stats );
213+ };
214+
215+ final CheckedConsumer <ActionListener <Void >, Exception > threadTask = l -> {
216+ final var request = new TransportGetAllocationStatsAction .Request (
217+ TEST_REQUEST_TIMEOUT ,
218+ new TaskId (randomIdentifier (), randomNonNegativeLong ()),
219+ EnumSet .of (Metric .ALLOCATIONS )
220+ );
221+
222+ action .masterOperation (mock (Task .class ), request , ClusterState .EMPTY_STATE , l .map (response -> {
223+ assertSame ("Expected the cached allocation stats to be returned" , response .getNodeAllocationStats (), allocationStats .get ());
224+ return null ;
225+ }));
226+ };
227+
228+ // Initial cache miss, all threads should get the same value.
229+ resetExpectedAllocationStats .run ();
230+ ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
231+ verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
232+
233+ // Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats.
234+ threadPool .setCurrentTimeInMillis (startTimeMillis + between (0 , (int ) allocationStatsCacheTTL .millis ()));
235+ ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
236+ verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
237+
238+ // Force the cached stats to expire.
239+ threadPool .setCurrentTimeInMillis (startTimeMillis + allocationStatsCacheTTL .getMillis () + 1 );
240+
241+ // Expect a single call to the stats service on the cache miss.
242+ resetExpectedAllocationStats .run ();
243+ ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
244+ verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
245+
246+ // Update the TTL setting to disable the cache, we expect a service call each time.
247+ setAllocationStatsCacheTTL (TimeValue .ZERO );
248+ safeAwait (threadTask );
249+ safeAwait (threadTask );
250+ numExpectedAllocationStatsServiceCalls += 2 ;
251+ verify (allocationStatsService , times (numExpectedAllocationStatsServiceCalls )).stats ();
252+
253+ // Re-enable the cache, only one thread should call the stats service.
254+ setAllocationStatsCacheTTL (TimeValue .timeValueMinutes (5 ));
255+ resetExpectedAllocationStats .run ();
256+ ESTestCase .startInParallel (between (1 , 5 ), threadNumber -> safeAwait (threadTask ));
257+ verify (allocationStatsService , times (++numExpectedAllocationStatsServiceCalls )).stats ();
258+ }
259+
260+ private static class ControlledRelativeTimeThreadPool extends ThreadPool {
261+
262+ private long currentTimeInMillis ;
263+
264+ ControlledRelativeTimeThreadPool (String name , long startTimeMillis ) {
265+ super (
266+ Settings .builder ().put (Node .NODE_NAME_SETTING .getKey (), name ).build (),
267+ MeterRegistry .NOOP ,
268+ new DefaultBuiltInExecutorBuilders ()
269+ );
270+ this .currentTimeInMillis = startTimeMillis ;
271+ stopCachedTimeThread ();
272+ }
273+
274+ @ Override
275+ public long relativeTimeInMillis () {
276+ return currentTimeInMillis ;
277+ }
278+
279+ void setCurrentTimeInMillis (long currentTimeInMillis ) {
280+ this .currentTimeInMillis = currentTimeInMillis ;
281+ }
282+ }
173283}
0 commit comments