77
88package org .elasticsearch .xpack .security .support ;
99
10- import org .elasticsearch .action .ActionListener ;
1110import org .elasticsearch .action .search .SearchRequest ;
1211import org .elasticsearch .action .search .SearchResponse ;
12+ import org .elasticsearch .action .support .PlainActionFuture ;
13+ import org .elasticsearch .action .support .RefCountingListener ;
1314import org .elasticsearch .action .support .WriteRequest ;
1415import org .elasticsearch .cluster .metadata .IndexMetadata ;
1516import org .elasticsearch .cluster .service .ClusterService ;
1920import org .elasticsearch .plugins .Plugin ;
2021import org .elasticsearch .search .SearchHit ;
2122import org .elasticsearch .test .ESIntegTestCase ;
22- import org .elasticsearch .test .ESTestCase ;
2323import org .elasticsearch .test .SecurityIntegTestCase ;
2424import org .elasticsearch .xcontent .ToXContent ;
2525import org .elasticsearch .xcontent .XContentBuilder ;
@@ -65,8 +65,9 @@ public void testMigrationWithConcurrentUpdates() throws Exception {
6565 waitForMigrationCompletion ();
6666 var roles = createRoles ();
6767 final var nativeRoleStore = internalCluster ().getInstance (NativeRolesStore .class );
68-
69- try (ExecutorService executor = Executors .newSingleThreadExecutor ()) {
68+ PlainActionFuture <Void > roleUpdatesCompleteListener = new PlainActionFuture <>();
69+ ExecutorService executor = Executors .newSingleThreadExecutor ();
70+ try (RefCountingListener refs = new RefCountingListener (roleUpdatesCompleteListener )) {
7071 final AtomicBoolean runUpdateRolesBackground = new AtomicBoolean (true );
7172 executor .submit (() -> {
7273 while (runUpdateRolesBackground .get ()) {
@@ -86,7 +87,7 @@ public void testMigrationWithConcurrentUpdates() throws Exception {
8687 nativeRoleStore .putRole (
8788 WriteRequest .RefreshPolicy .IMMEDIATE ,
8889 updatedRole ,
89- ActionListener . wrap (resp -> {}, ESTestCase :: fail )
90+ refs . acquire (resp -> logger . trace ( "Updated role [{}]" , updatedRole ) )
9091 );
9192 try {
9293 Thread .sleep (10 );
@@ -97,14 +98,14 @@ public void testMigrationWithConcurrentUpdates() throws Exception {
9798 });
9899
99100 resetMigration ();
100- try {
101- waitForMigrationCompletion ();
102- } finally {
103- runUpdateRolesBackground .set (false );
104- executor .shutdown ();
105- }
101+ waitForMigrationCompletion ();
102+ runUpdateRolesBackground .set (false );
103+ assertAllRolesHaveMetadataFlattened ();
104+ } finally {
105+ // Await all role updates before shutting down
106+ roleUpdatesCompleteListener .get ();
107+ terminate (executor );
106108 }
107- assertAllRolesHaveMetadataFlattened ();
108109 }
109110
110111 private void resetMigration () {
0 commit comments