1818package org .apache .uniffle .test ;
1919
2020import java .io .File ;
21+ import java .util .ArrayList ;
2122import java .util .List ;
2223import java .util .Map ;
2324import java .util .Set ;
25+ import java .util .concurrent .ThreadLocalRandom ;
26+ import java .util .stream .Collectors ;
2427
2528import com .google .common .collect .Lists ;
2629import com .google .common .collect .Maps ;
@@ -89,8 +92,11 @@ public static MockedShuffleServer createServer(int id, File tmpDir, int coordina
8992 throws Exception {
9093 ShuffleServerConf shuffleServerConf = getShuffleServerConf (ServerType .GRPC );
9194 shuffleServerConf .setInteger ("rss.rpc.server.port" , 0 );
92- shuffleServerConf .setLong ("rss.server.app.expired.withoutHeartbeat" , 8000 );
93- shuffleServerConf .setLong ("rss.server.heartbeat.interval" , 5000 );
95+ // app expires time should be greater than the client send data time which is 180s by default.
96+ // (rpcTimeout*retryTimes)
97+ // this can avoid get NO_REGISTER when second data to secondaryServer
98+ shuffleServerConf .setLong ("rss.server.app.expired.withoutHeartbeat" , 240 * 1000L );
99+ shuffleServerConf .setLong ("rss.server.heartbeat.interval" , 5000L );
94100 File dataDir1 = new File (tmpDir , id + "_1" );
95101 File dataDir2 = new File (tmpDir , id + "_2" );
96102 String basePath = dataDir1 .getAbsolutePath () + "," + dataDir2 .getAbsolutePath ();
@@ -101,6 +107,21 @@ public static MockedShuffleServer createServer(int id, File tmpDir, int coordina
101107 return new MockedShuffleServer (shuffleServerConf );
102108 }
103109
110+ private List <Integer > generateFakePort (int num ) {
111+ Set <Integer > portExistsSet =
112+ grpcShuffleServers .stream ().map (ShuffleServer ::getGrpcPort ).collect (Collectors .toSet ());
113+ int i = 0 ;
114+ List <Integer > fakePorts = new ArrayList <>(num );
115+ while (i < num ) {
116+ int port = ThreadLocalRandom .current ().nextInt (1 , 65535 );
117+ if (portExistsSet .add (port )) {
118+ fakePorts .add (port );
119+ i ++;
120+ }
121+ }
122+ return fakePorts ;
123+ }
124+
104125 @ BeforeEach
105126 public void initCluster (@ TempDir File tmpDir ) throws Exception {
106127 CoordinatorConf coordinatorConf = getCoordinatorConf ();
@@ -152,31 +173,22 @@ public void initCluster(@TempDir File tmpDir) throws Exception {
152173 grpcShuffleServers .get (4 ).getGrpcPort ());
153174
154175 // simulator of failed servers
176+ List <Integer > fakePortList = generateFakePort (5 );
155177 fakedShuffleServerInfo0 =
156178 new ShuffleServerInfo (
157- "127.0.0.1-20001" ,
158- grpcShuffleServers .get (0 ).getIp (),
159- grpcShuffleServers .get (0 ).getGrpcPort () + 100 );
179+ "127.0.0.1-20001" , grpcShuffleServers .get (0 ).getIp (), fakePortList .get (0 ));
160180 fakedShuffleServerInfo1 =
161181 new ShuffleServerInfo (
162- "127.0.0.1-20002" ,
163- grpcShuffleServers .get (1 ).getIp (),
164- grpcShuffleServers .get (0 ).getGrpcPort () + 200 );
182+ "127.0.0.1-20002" , grpcShuffleServers .get (1 ).getIp (), fakePortList .get (1 ));
165183 fakedShuffleServerInfo2 =
166184 new ShuffleServerInfo (
167- "127.0.0.1-20003" ,
168- grpcShuffleServers .get (2 ).getIp (),
169- grpcShuffleServers .get (0 ).getGrpcPort () + 300 );
185+ "127.0.0.1-20003" , grpcShuffleServers .get (2 ).getIp (), fakePortList .get (2 ));
170186 fakedShuffleServerInfo3 =
171187 new ShuffleServerInfo (
172- "127.0.0.1-20004" ,
173- grpcShuffleServers .get (2 ).getIp (),
174- grpcShuffleServers .get (0 ).getGrpcPort () + 400 );
188+ "127.0.0.1-20004" , grpcShuffleServers .get (2 ).getIp (), fakePortList .get (3 ));
175189 fakedShuffleServerInfo4 =
176190 new ShuffleServerInfo (
177- "127.0.0.1-20005" ,
178- grpcShuffleServers .get (2 ).getIp (),
179- grpcShuffleServers .get (0 ).getGrpcPort () + 500 );
191+ "127.0.0.1-20005" , grpcShuffleServers .get (2 ).getIp (), fakePortList .get (4 ));
180192
181193 // spark.rss.data.replica=3
182194 // spark.rss.data.replica.write=2
@@ -339,7 +351,7 @@ private void registerShuffleServer(
339351 new MockedShuffleWriteClientImpl (
340352 ShuffleClientFactory .newWriteBuilder ()
341353 .clientType (ClientType .GRPC .name ())
342- .retryMax (3 )
354+ .retryMax (2 )
343355 .retryIntervalMax (1000 )
344356 .heartBeatThreadNum (1 )
345357 .replica (replica )
@@ -945,8 +957,8 @@ public void case10() throws Exception {
945957 shuffleServerInfo3 ,
946958 shuffleServerInfo4 ));
947959 SendShuffleDataResult result = shuffleWriteClientImpl .sendShuffleData (testAppId , blocks );
948- assertTrue ( result .getSuccessBlockIds ().size () == 3 );
949- assertTrue ( result .getFailedBlockIds ().size () == 0 );
960+ assertEquals ( 3 , result .getSuccessBlockIds ().size ());
961+ assertEquals ( 0 , result .getFailedBlockIds ().size ());
950962 }
951963
952964 // we cannot read any blocks from server 1 due to failures
0 commit comments