2828
2929import java .util .Collections ;
3030import java .util .concurrent .CompletableFuture ;
31+ import java .util .concurrent .CountDownLatch ;
3132import java .util .concurrent .ExecutorService ;
3233import java .util .concurrent .Executors ;
34+ import java .util .concurrent .TimeUnit ;
3335
3436import static org .mockito .ArgumentMatchers .any ;
3537import static org .mockito .Mockito .doAnswer ;
@@ -57,9 +59,13 @@ public void testClosingOffsetReaderWhenOffsetStoreHangs() throws Exception {
5759 OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl (
5860 offsetBackingStore , "namespace" , taskKeyConverter , taskValueConverter );
5961
62+ CountDownLatch latch = new CountDownLatch (1 );
63+
64+ // Hanging `offsetBackingStore.get()`
6065 doAnswer (invocation -> {
61- // Sleep for a long time to simulate a hanging offset store
62- Thread .sleep (9999 * 1000 );
66+ latch .countDown ();
67+ CompletableFuture <Void > future = new CompletableFuture <>();
68+ future .get (9999 , TimeUnit .SECONDS );
6369 throw new RuntimeException ("Should never get here" );
6470 }).when (offsetBackingStore ).get (any ());
6571
@@ -68,13 +74,20 @@ public void testClosingOffsetReaderWhenOffsetStoreHangs() throws Exception {
6874 // Does call offsetBackingStore.get() and hangs
6975 offsetStorageReaderImpl .offsets (Collections .emptyList ());
7076 });
71- Thread .sleep (3000 );
77+
78+ // Ensure the task is hanging
79+ latch .await ();
7280
7381 verify (offsetBackingStore , times (1 )).get (any ());
7482
7583 // The herder thread should not block when trying to close `offsetStorageReaderImpl`
7684 // and complete before test timeout
7785 offsetStorageReaderImpl .close ();
86+
87+ executor .shutdownNow ();
88+ if (!executor .awaitTermination (5 , TimeUnit .SECONDS )) {
89+ throw new RuntimeException ("Failed to shutdown executor" );
90+ }
7891 }
7992
8093 @ Test
@@ -89,9 +102,13 @@ public void testClosingOffsetReaderWhenOffsetStoreHangsAndHasIncompleteFutures()
89102 OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl (
90103 offsetBackingStore , "namespace" , taskKeyConverter , taskValueConverter );
91104
105+ CountDownLatch latchTask1 = new CountDownLatch (1 );
106+ CountDownLatch latchTask2 = new CountDownLatch (1 );
107+
92108 // Mock hanging future
93109 doAnswer (invocation -> {
94- Thread .sleep (9999 * 1000 );
110+ CompletableFuture <Void > future = new CompletableFuture <>();
111+ future .get (9999 , TimeUnit .SECONDS );
95112 throw new RuntimeException ("Should never get here" );
96113 }).when (hangingFuture ).get ();
97114
@@ -104,10 +121,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
104121 if (callCount == 0 ) {
105122 callCount += 1 ;
106123 // First connector task
124+ latchTask1 .countDown ();
107125 return hangingFuture ;
108126 } else {
109127 // Second connector task
110- Thread .sleep (9999 * 1000 );
128+ latchTask2 .countDown ();
129+ CompletableFuture <Void > future = new CompletableFuture <>();
130+ future .get (9999 , TimeUnit .SECONDS );
111131 throw new RuntimeException ("Should never get here" );
112132 }
113133 }
@@ -120,7 +140,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
120140 executor .submit (() -> {
121141 offsetStorageReaderImpl .offsets (Collections .emptyList ());
122142 });
123- Thread .sleep (3000 );
143+ // Ensure first task is hanging
144+ latchTask1 .await ();
124145
125146 verify (offsetBackingStore , times (1 )).get (any ());
126147 verify (hangingFuture , times (1 )).get ();
@@ -130,7 +151,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
130151 executor .submit (() -> {
131152 offsetStorageReaderImpl .offsets (Collections .emptyList ());
132153 });
133- Thread .sleep (3000 );
154+ // Ensure second task is hanging
155+ latchTask2 .await ();
134156
135157 verify (offsetBackingStore , times (2 )).get (any ());
136158
@@ -140,5 +162,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
140162
141163 // The hanging future should be cancelled by `close()`
142164 verify (hangingFuture , times (1 )).cancel (true );
165+
166+ executor .shutdownNow ();
167+ if (!executor .awaitTermination (5 , TimeUnit .SECONDS )) {
168+ throw new RuntimeException ("Failed to shutdown executor" );
169+ }
143170 }
144171}
0 commit comments