2929
3030import com .google .common .collect .Sets ;
3131import com .google .common .util .concurrent .Uninterruptibles ;
32- import org .junit .jupiter .api .Disabled ;
32+ import org .junit .jupiter .api .AfterEach ;
3333import org .junit .jupiter .api .Test ;
3434import org .junit .jupiter .api .io .TempDir ;
3535
36- import org .apache .uniffle .client .api .CoordinatorClient ;
37- import org .apache .uniffle .client .factory .CoordinatorClientFactory ;
3836import org .apache .uniffle .client .request .RssAccessClusterRequest ;
3937import org .apache .uniffle .client .response .RssAccessClusterResponse ;
40- import org .apache .uniffle .common .ClientType ;
4138import org .apache .uniffle .common .rpc .ServerType ;
4239import org .apache .uniffle .common .rpc .StatusCode ;
4340import org .apache .uniffle .common .util .Constants ;
4643import org .apache .uniffle .coordinator .access .AccessCheckResult ;
4744import org .apache .uniffle .coordinator .access .AccessInfo ;
4845import org .apache .uniffle .coordinator .access .checker .AccessChecker ;
46+ import org .apache .uniffle .coordinator .metric .CoordinatorMetrics ;
4947import org .apache .uniffle .server .ShuffleServer ;
5048import org .apache .uniffle .server .ShuffleServerConf ;
5149
5250import static org .junit .jupiter .api .Assertions .assertEquals ;
5351import static org .junit .jupiter .api .Assertions .assertTrue ;
5452
55- @ Disabled ("flaky test" )
5653public class AccessClusterTest extends CoordinatorTestBase {
5754
5855 public static class MockedAccessChecker implements AccessChecker {
@@ -83,20 +80,27 @@ public void close() throws IOException {
8380 }
8481 }
8582
83+ @ AfterEach
84+ public void afterEach () throws Exception {
85+ shutdownServers ();
86+ CoordinatorMetrics .clear ();
87+ }
88+
8689 @ Test
8790 public void testUsingCustomExtraProperties () throws Exception {
88- CoordinatorConf coordinatorConf = getCoordinatorConf ();
91+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort ();
8992 coordinatorConf .setString (
9093 "rss.coordinator.access.checkers" ,
9194 "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker" );
92- createCoordinatorServer (coordinatorConf );
93- startServers ();
95+ storeCoordinatorConf (coordinatorConf );
96+ startServersWithRandomPorts ();
9497 Uninterruptibles .sleepUninterruptibly (3 , TimeUnit .SECONDS );
9598 // case1: empty map
9699 String accessID = "acessid" ;
97100 RssAccessClusterRequest request =
98101 new RssAccessClusterRequest (
99102 accessID , Sets .newHashSet (Constants .SHUFFLE_SERVER_VERSION ), 2000 , "user" );
103+ createClient ();
100104 RssAccessClusterResponse response = coordinatorClient .accessCluster (request );
101105 assertEquals (StatusCode .ACCESS_DENIED , response .getStatusCode ());
102106
@@ -125,8 +129,6 @@ public void testUsingCustomExtraProperties() throws Exception {
125129 "user" );
126130 response = coordinatorClient .accessCluster (request );
127131 assertEquals (StatusCode .SUCCESS , response .getStatusCode ());
128-
129- shutdownServers ();
130132 }
131133
132134 @ Test
@@ -140,23 +142,23 @@ public void test(@TempDir File tempDir) throws Exception {
140142 printWriter .flush ();
141143 printWriter .close ();
142144
143- CoordinatorConf coordinatorConf = getCoordinatorConf ();
145+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort ();
144146 coordinatorConf .setInteger ("rss.coordinator.access.loadChecker.serverNum.threshold" , 2 );
145147 coordinatorConf .setString ("rss.coordinator.access.candidates.path" , cfgFile .getAbsolutePath ());
146148 coordinatorConf .setString (
147149 "rss.coordinator.access.checkers" ,
148150 "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
149151 + "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker" );
150- createCoordinatorServer (coordinatorConf );
152+ storeCoordinatorConf (coordinatorConf );
151153
152- ShuffleServerConf shuffleServerConf = getShuffleServerConf (ServerType .GRPC );
153- createShuffleServer (shuffleServerConf );
154- startServers ();
154+ storeShuffleServerConf (shuffleServerConfWithoutPort (0 , tempDir , ServerType .GRPC ));
155+ startServersWithRandomPorts ();
155156 Uninterruptibles .sleepUninterruptibly (3 , TimeUnit .SECONDS );
156157 String accessId = "111111" ;
157158 RssAccessClusterRequest request =
158159 new RssAccessClusterRequest (
159160 accessId , Sets .newHashSet (Constants .SHUFFLE_SERVER_VERSION ), 2000 , "user" );
161+ createClient ();
160162 RssAccessClusterResponse response = coordinatorClient .accessCluster (request );
161163 assertEquals (StatusCode .ACCESS_DENIED , response .getStatusCode ());
162164 assertTrue (response .getMessage ().startsWith ("Denied by AccessCandidatesChecker" ));
@@ -168,32 +170,19 @@ public void test(@TempDir File tempDir) throws Exception {
168170 response = coordinatorClient .accessCluster (request );
169171 assertEquals (StatusCode .ACCESS_DENIED , response .getStatusCode ());
170172 assertTrue (response .getMessage ().startsWith ("Denied by AccessClusterLoadChecker" ));
171-
172- shuffleServerConf .setInteger (
173- "rss.rpc.server.port" , shuffleServerConf .getInteger (ShuffleServerConf .RPC_SERVER_PORT ) + 2 );
174- shuffleServerConf .setInteger (
175- "rss.jetty.http.port" , shuffleServerConf .getInteger (ShuffleServerConf .JETTY_HTTP_PORT ) + 1 );
173+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort (0 , tempDir , ServerType .GRPC );
174+ shuffleServerConf .setString ("rss.coordinator.quorum" , getQuorum ());
176175 ShuffleServer shuffleServer = new ShuffleServer (shuffleServerConf );
177176 shuffleServer .start ();
177+ // this make sure the server can be shutdown
178+ grpcShuffleServers .add (shuffleServer );
178179 Uninterruptibles .sleepUninterruptibly (3 , TimeUnit .SECONDS );
179180
180- CoordinatorClient client =
181- CoordinatorClientFactory .getInstance ()
182- .createCoordinatorClient (ClientType .GRPC , LOCALHOST , COORDINATOR_PORT_1 + 13 );
183- request =
184- new RssAccessClusterRequest (
185- accessId , Sets .newHashSet (Constants .SHUFFLE_SERVER_VERSION ), 2000 , "user" );
186- response = client .accessCluster (request );
187- assertEquals (StatusCode .INTERNAL_ERROR , response .getStatusCode ());
188- assertTrue (response .getMessage ().startsWith ("UNAVAILABLE: io exception" ));
189-
190181 request =
191182 new RssAccessClusterRequest (
192183 accessId , Sets .newHashSet (Constants .SHUFFLE_SERVER_VERSION ), 2000 , "user" );
193184 response = coordinatorClient .accessCluster (request );
194185 assertEquals (StatusCode .SUCCESS , response .getStatusCode ());
195186 assertTrue (response .getMessage ().startsWith ("SUCCESS" ));
196- shuffleServer .stopServer ();
197- shutdownServers ();
198187 }
199188}
0 commit comments