1111
1212import org .apache .logging .log4j .Level ;
1313import org .elasticsearch .action .ActionListener ;
14+ import org .elasticsearch .cluster .ClusterChangedEvent ;
1415import org .elasticsearch .cluster .ClusterState ;
1516import org .elasticsearch .cluster .NodeConnectionsService ;
17+ import org .elasticsearch .cluster .TimeoutClusterStateListener ;
1618import org .elasticsearch .common .Priority ;
1719import org .elasticsearch .common .settings .ClusterSettings ;
1820import org .elasticsearch .common .settings .Settings ;
1921import org .elasticsearch .common .util .concurrent .DeterministicTaskQueue ;
2022import org .elasticsearch .common .util .concurrent .PrioritizedEsThreadPoolExecutor ;
2123import org .elasticsearch .core .TimeValue ;
22- import org .elasticsearch .logging .LogManager ;
23- import org .elasticsearch .logging .Logger ;
2424import org .elasticsearch .test .ESTestCase ;
2525import org .elasticsearch .test .MockLog ;
2626
3232
3333public class ClusterApplierServiceWatchdogTests extends ESTestCase {
3434
35- private static final Logger logger = LogManager .getLogger (ClusterApplierServiceWatchdogTests .class );
36-
3735 public void testThreadWatchdogLogging () {
3836 final var deterministicTaskQueue = new DeterministicTaskQueue ();
3937
@@ -77,8 +75,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7775
7876 final AtomicBoolean completedTask = new AtomicBoolean ();
7977
80- clusterApplierService .runOnApplierThread ("blocking task" , randomFrom (Priority .values ()), ignored -> {
81-
78+ final Runnable hotThreadsDumpsAsserter = () -> {
8279 final var startMillis = deterministicTaskQueue .getCurrentTimeMillis ();
8380
8481 for (int i = 0 ; i < 3 ; i ++) {
@@ -98,12 +95,49 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
9895
9996 mockLog .assertAllExpectationsMatched ();
10097 }
101- }, ActionListener .running (() -> completedTask .set (true )));
98+ };
99+
100+ if (randomBoolean ()) {
101+ clusterApplierService .runOnApplierThread (
102+ "slow task" ,
103+ randomFrom (Priority .values ()),
104+ ignored -> hotThreadsDumpsAsserter .run (),
105+ ActionListener .running (() -> assertTrue (completedTask .compareAndSet (false , true )))
106+ );
107+ } else {
108+ class TestListener implements TimeoutClusterStateListener {
109+ @ Override
110+ public void postAdded () {
111+ hotThreadsDumpsAsserter .run ();
112+ }
113+
114+ @ Override
115+ public void onClose () {
116+ fail ("should time out before closing" );
117+ }
118+
119+ @ Override
120+ public void onTimeout (TimeValue timeout ) {
121+ assertTrue (completedTask .compareAndSet (false , true ));
122+ clusterApplierService .removeTimeoutListener (TestListener .this );
123+ }
124+
125+ @ Override
126+ public void clusterChanged (ClusterChangedEvent event ) {
127+ fail ("no cluster state updates expected" );
128+ }
129+ }
130+
131+ clusterApplierService .addTimeoutListener (
132+ // timeout sufficiently short that it elapses while postAdded() is still running
133+ TimeValue .timeValueMillis (randomLongBetween (0 , 2 * intervalMillis + 2 * quietTimeMillis )),
134+ new TestListener ()
135+ );
136+ }
102137
103138 deterministicTaskQueue .runAllRunnableTasks ();
104139
105140 assertTrue (completedTask .get ());
106141 }
107142 }
108-
109143}
0 commit comments