13
13
import org .elasticsearch .test .ESTestCase ;
14
14
import org .elasticsearch .xpack .core .security .authc .RealmConfig ;
15
15
import org .elasticsearch .xpack .core .security .authc .jwt .JwtRealmSettings ;
16
+ import org .mockito .Mockito ;
16
17
17
18
import java .io .IOException ;
18
19
import java .nio .charset .StandardCharsets ;
28
29
import static org .hamcrest .Matchers .sameInstance ;
29
30
import static org .mockito .Mockito .doAnswer ;
30
31
import static org .mockito .Mockito .mock ;
32
+ import static org .mockito .Mockito .never ;
31
33
import static org .mockito .Mockito .spy ;
32
- import static org .mockito .Mockito .times ;
33
34
import static org .mockito .Mockito .verify ;
34
35
import static org .mockito .Mockito .when ;
35
36
36
37
public class JwkSetLoaderTests extends ESTestCase {
37
38
38
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/92745" )
39
39
public void testConcurrentReloadWillBeQueuedAndShareTheResults () throws IOException , InterruptedException {
40
40
final Path tempDir = createTempDir ();
41
41
final Path path = tempDir .resolve ("jwkset.json" );
@@ -53,23 +53,37 @@ public void testConcurrentReloadWillBeQueuedAndShareTheResults() throws IOExcept
53
53
.mapToObj (i -> new PlainActionFuture <Tuple <Boolean , JwkSetLoader .JwksAlgs >>())
54
54
.toList ();
55
55
56
- final var threadsCountDown = new CountDownLatch (nThreads );
56
+ // Start the first thread for reloading
57
+ // Ensure it is inside the actual loading method and make it wait there to simulate slow processing
58
+ final var loadingLatch = new CountDownLatch (1 );
57
59
final var readyLatch = new CountDownLatch (1 );
58
60
doAnswer (invocation -> {
59
- // Mark the thread to be ready when it enters the concurrency controlling area
60
- threadsCountDown .countDown ();
61
- // Wait till the ready flag to start the racing
61
+ loadingLatch .countDown ();
62
62
assertThat (readyLatch .await (10 , TimeUnit .SECONDS ), is (true ));
63
- return invocation .callRealMethod ();
64
- }).when (jwkSetLoader ).getFuture ();
63
+ invocation .callRealMethod ();
64
+ return null ;
65
+ }).when (jwkSetLoader ).loadInternal (anyActionListener ());
65
66
66
- // Start the threads and toggle flags to start racing
67
- IntStream .range (0 , nThreads ).forEach (i -> new Thread (() -> jwkSetLoader .reload (futures .get (i ))).start ());
67
+ new Thread (() -> jwkSetLoader .reload (futures .get (0 ))).start ();
68
+ assertThat (loadingLatch .await (10 , TimeUnit .SECONDS ), is (true ));
69
+
70
+ // Start rest of the threads for racing and ensure they are all through the concurrency controlling area
71
+ Mockito .reset (jwkSetLoader );
72
+ final var threadsCountDown = new CountDownLatch (nThreads - 1 );
73
+ doAnswer (invocation -> {
74
+ final Object result = invocation .callRealMethod ();
75
+ threadsCountDown .countDown ();
76
+ return result ;
77
+ }).when (jwkSetLoader ).getFuture ();
78
+ IntStream .range (1 , nThreads ).forEach (i -> new Thread (() -> jwkSetLoader .reload (futures .get (i ))).start ());
68
79
assertThat (threadsCountDown .await (10 , TimeUnit .SECONDS ), is (true ));
80
+
81
+ // Notify the first thread to finish the loading work
69
82
readyLatch .countDown ();
70
83
71
- // All concurrent reloading calls will get the same result and the actual reloading work will only happen once
72
- futures .forEach (future -> assertThat (future .actionGet (), sameInstance (futures .get (0 ).actionGet ())));
73
- verify (jwkSetLoader , times (1 )).loadInternal (anyActionListener ());
84
+ // All concurrent reloading calls will get the same result from the first thread and skip the actual loading work
85
+ final Tuple <Boolean , JwkSetLoader .JwksAlgs > tuple = futures .get (0 ).actionGet ();
86
+ futures .subList (1 , nThreads ).forEach (future -> assertThat (future .actionGet (), sameInstance (tuple )));
87
+ verify (jwkSetLoader , never ()).loadInternal (anyActionListener ());
74
88
}
75
89
}
0 commit comments