Skip to content

Commit c5a89d8

Browse files
authored
RATIS-2337. Refactor MiniRaftCluster.PeerChanges. (#1291)
1 parent e26213a commit c5a89d8

File tree

11 files changed

+193
-153
lines changed

11 files changed

+193
-153
lines changed

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.ratis.server.RaftServer;
3131
import org.apache.ratis.server.RaftServerConfigKeys;
3232
import org.apache.ratis.server.impl.MiniRaftCluster;
33+
import org.apache.ratis.server.impl.PeerChanges;
3334
import org.apache.ratis.server.impl.RaftServerTestUtil;
3435
import org.apache.ratis.server.protocol.TermIndex;
3536
import org.apache.ratis.server.raftlog.RaftLog;
@@ -115,13 +116,12 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
115116
Assertions.assertEquals(3, snapshot.getFiles().size());
116117

117118
// add two more peers
118-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
119+
final PeerChanges change = cluster.addNewPeers(2, true,
119120
true);
120121
// trigger setConfiguration
121-
cluster.setConfiguration(change.allPeersInNewConf);
122+
cluster.setConfiguration(change.getPeersInNewConf());
122123

123-
RaftServerTestUtil
124-
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
124+
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
125125

126126
// Check the installed snapshot file number on each Follower matches with the
127127
// leader snapshot.
@@ -161,17 +161,17 @@ private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Excep
161161
}
162162

163163
// add two more peers and install snapshot from leaders
164-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
164+
final PeerChanges change = cluster.addNewPeers(2, true,
165165
true);
166166
try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
167167
final RaftException e = Assertions.assertThrows(RaftException.class,
168-
() -> client.admin().setConfiguration(change.allPeersInNewConf));
168+
() -> client.admin().setConfiguration(change.getPeersInNewConf()));
169169
Assertions.assertTrue( e instanceof RaftRetryFailureException
170170
|| e instanceof ReconfigurationTimeoutException,
171171
() -> "Unexpected exception: " + e);
172172
}
173173

174-
final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
174+
final SnapshotInfo snapshotInfo = cluster.getDivision(change.getAddedPeers().get(0).getId())
175175
.getStateMachine().getLatestSnapshot();
176176
Assertions.assertNotNull(snapshotInfo);
177177

@@ -184,7 +184,7 @@ private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Excep
184184

185185
try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
186186
// successfully setConfiguration during leader switch
187-
final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
187+
final RaftClientReply setConf = client.admin().setConfiguration(change.getPeersInNewConf());
188188
Assertions.assertTrue(setConf.isSuccess());
189189

190190
RaftTestUtil.deIsolate(cluster, leaderId);

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.ratis.server.RaftServer;
2727
import org.apache.ratis.server.RaftServerConfigKeys;
2828
import org.apache.ratis.server.impl.MiniRaftCluster;
29+
import org.apache.ratis.server.impl.PeerChanges;
2930
import org.apache.ratis.server.impl.RaftServerTestUtil;
3031
import org.apache.ratis.server.protocol.TermIndex;
3132
import org.apache.ratis.server.raftlog.RaftLog;
@@ -49,7 +50,6 @@
4950
import java.io.IOException;
5051
import java.nio.file.Files;
5152
import java.nio.file.Path;
52-
import java.util.Arrays;
5353
import java.util.Collections;
5454
import java.util.List;
5555
import java.util.concurrent.CompletableFuture;
@@ -241,13 +241,11 @@ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Except
241241
Assertions.assertTrue(set);
242242

243243
// Add new peer(s)
244-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
244+
final PeerChanges change = cluster.addNewPeers(1, true, true);
245245
// trigger setConfiguration
246-
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
247-
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
246+
RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
248247

249-
RaftServerTestUtil
250-
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
248+
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
251249

252250
// Check the installed snapshot index on each Follower matches with the
253251
// leader snapshot.
@@ -391,12 +389,10 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
391389

392390
// Add new peer(s) who will need snapshots from the leader.
393391
final int numNewPeers = 1;
394-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
392+
final PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
395393
// trigger setConfiguration
396-
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
397-
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
398-
RaftServerTestUtil
399-
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
394+
RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
395+
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
400396

401397
// Generate more data.
402398
try (final RaftClient client = cluster.createClient(leader.getId())) {
@@ -479,13 +475,10 @@ private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws Exception
479475
Assertions.assertTrue(set);
480476

481477
// add one new peer
482-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
478+
final PeerChanges change = cluster.addNewPeers(1, true, true);
483479
// trigger setConfiguration
484-
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
485-
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
486-
487-
RaftServerTestUtil
488-
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
480+
RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
481+
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
489482

490483
// Check the installed snapshot index on each Follower matches with the
491484
// leader snapshot.
@@ -558,12 +551,11 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
558551

559552
// Add new peer(s)
560553
final int numNewPeers = 1;
561-
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
554+
final PeerChanges change = cluster.addNewPeers(numNewPeers, true, true);
562555
// trigger setConfiguration
563-
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
564-
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
556+
RaftServerTestUtil.runWithMinorityPeers(cluster, change.getPeersInNewConf(), cluster::setConfiguration);
565557

566-
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
558+
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.getPeersInNewConf(), 0, null);
567559

568560
// Check the installed snapshot index on each Follower matches with the
569561
// leader snapshot.

ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ratis.server.RaftServer;
2929
import org.apache.ratis.server.RaftServerConfigKeys;
3030
import org.apache.ratis.server.impl.MiniRaftCluster;
31+
import org.apache.ratis.server.impl.PeerChanges;
3132
import org.apache.ratis.server.raftlog.RaftLog;
3233
import org.apache.ratis.server.raftlog.RaftLogIOException;
3334
import org.apache.ratis.util.JavaUtils;
@@ -109,12 +110,12 @@ void runTestNotLeaderExceptionWithReconf(CLUSTER cluster) throws Exception {
109110
final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);
110111

111112
// add two more peers
112-
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{
113+
PeerChanges change = cluster.addNewPeers(new String[]{
113114
"ss1", "ss2"}, true, false);
114115
// trigger setConfiguration
115-
LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
116+
LOG.info("Start changing the configuration: {}", change.getPeersInNewConf());
116117
try (final RaftClient c2 = cluster.createClient(newLeader)) {
117-
RaftClientReply reply = c2.admin().setConfiguration(change.allPeersInNewConf);
118+
RaftClientReply reply = c2.admin().setConfiguration(change.getPeersInNewConf());
118119
Assertions.assertTrue(reply.isSuccess());
119120
}
120121
LOG.info(cluster.printServers());

ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.ratis;
1919

2020
import org.apache.ratis.server.impl.MiniRaftCluster;
21-
import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
21+
import org.apache.ratis.server.impl.PeerChanges;
2222
import org.apache.ratis.RaftTestUtil.SimpleMessage;
2323
import org.apache.ratis.client.RaftClient;
2424
import org.apache.ratis.client.RaftClientRpc;
@@ -39,12 +39,10 @@
3939
import org.junit.jupiter.api.Test;
4040
import org.slf4j.event.Level;
4141

42-
import java.util.Arrays;
4342
import java.util.Collections;
43+
import java.util.List;
4444
import java.util.concurrent.TimeUnit;
4545

46-
import static java.util.Arrays.asList;
47-
4846
public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
4947
extends BaseTest
5048
implements MiniRaftCluster.Factory.Get<CLUSTER> {
@@ -139,10 +137,9 @@ void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception {
139137

140138
// trigger the reconfiguration, make sure the original leader is kicked out
141139
final PeerChanges change = cluster.removePeers(2, true, Collections.emptyList());
142-
final RaftPeer[] allPeers = change.allPeersInNewConf;
140+
final List<RaftPeer> allPeers = change.getPeersInNewConf();
143141
// trigger setConfiguration
144-
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(allPeers),
145-
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
142+
RaftServerTestUtil.runWithMinorityPeers(cluster, allPeers, cluster::setConfiguration);
146143

147144
final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
148145
final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
@@ -153,7 +150,7 @@ void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception {
153150
// same clientId and callId in the request
154151
r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
155152
callId, new SimpleMessage("message"));
156-
rpc.addRaftPeers(Arrays.asList(change.newPeers));
153+
rpc.addRaftPeers(change.getAddedPeers());
157154
for (int i = 0; i < 10; i++) {
158155
try {
159156
assertReply(rpc.sendRequest(r), client, callId);

ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353

5454
import java.io.IOException;
5555
import java.util.ArrayList;
56-
import java.util.Arrays;
5756
import java.util.Collection;
5857
import java.util.Iterator;
5958
import java.util.List;
@@ -65,6 +64,7 @@
6564

6665
import static org.apache.ratis.RaftTestUtil.getPeersWithPriority;
6766
import static org.apache.ratis.RaftTestUtil.waitForLeader;
67+
import static org.apache.ratis.proto.RaftProtos.RaftPeerRole.LISTENER;
6868
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
6969
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_COUNT_METRIC;
7070
import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN;
@@ -156,11 +156,11 @@ public void testAddServerForWaitReady() throws IOException, InterruptedException
156156
}
157157
// add 3 new servers and wait longer time
158158
CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
159-
MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(2, true, false);
159+
final PeerChanges peerChanges = cluster.addNewPeers(2, true, false);
160160
LOG.info("add new 3 servers");
161161
LOG.info(cluster.printServers());
162162
RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
163-
.setServersInNewConf(peerChanges.newPeers)
163+
.setServersInNewConf(peerChanges.getAddedPeers())
164164
.setMode(SetConfigurationRequest.Mode.ADD).build());
165165
assertTrue(reply.isSuccess());
166166
for (RaftServer server : cluster.getServers()) {
@@ -461,14 +461,14 @@ public void testAddListener() throws Exception {
461461
client.io().send(new RaftTestUtil.SimpleMessage("message"));
462462
List<RaftPeer> servers = cluster.getPeers();
463463
assertEquals(servers.size(), 3);
464-
MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1,
465-
true, false, RaftProtos.RaftPeerRole.LISTENER);
466-
RaftClientReply reply = client.admin().setConfiguration(servers, Arrays.asList(changes.newPeers));
464+
final PeerChanges changes = cluster.addNewPeers(1, true, false, LISTENER);
465+
final List<RaftPeer> added = changes.getAddedPeers();
466+
final RaftClientReply reply = client.admin().setConfiguration(servers, added);
467467
assertTrue(reply.isSuccess());
468468
Collection<RaftPeer> listener =
469469
leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
470470
assertEquals(1, listener.size());
471-
assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId());
471+
assertEquals(added.get(0).getId(), listener.iterator().next().getId());
472472
}
473473
cluster.shutdown();
474474
}
@@ -486,8 +486,8 @@ public void testAddFollowerWhenExistsListener() throws Exception {
486486
List<RaftPeer> listener = new ArrayList<>(
487487
leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER));
488488
assertEquals(1, listener.size());
489-
MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, true, false);
490-
ArrayList<RaftPeer> newPeers = new ArrayList<>(Arrays.asList(changes.newPeers));
489+
final PeerChanges changes = cluster.addNewPeers(1, true, false);
490+
final List<RaftPeer> newPeers = new ArrayList<>(changes.getAddedPeers());
491491
newPeers.addAll(leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER));
492492
RaftClientReply reply = client.admin().setConfiguration(newPeers, listener);
493493
assertTrue(reply.isSuccess());
@@ -761,7 +761,7 @@ void runTestLeaderLeaseDuringReconfiguration(CLUSTER cluster, long leaseTimeoutM
761761
RaftServerTestUtil.assertLeaderLease(leader, true);
762762

763763
final List<RaftServer.Division> followers = cluster.getFollowers();
764-
final MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(2, true);
764+
final PeerChanges changes = cluster.addNewPeers(2, true);
765765

766766
// blocking the original 2 followers
767767
BlockRequestHandlingInjection.getInstance().blockReplier(followers.get(0).getId().toString());
@@ -770,7 +770,7 @@ void runTestLeaderLeaseDuringReconfiguration(CLUSTER cluster, long leaseTimeoutM
770770
// start reconfiguration in another thread, shall fail eventually
771771
new Thread(() -> {
772772
try {
773-
client.admin().setConfiguration(changes.allPeersInNewConf);
773+
client.admin().setConfiguration(changes.getPeersInNewConf());
774774
} catch (IOException e) {
775775
System.out.println("as expected: " + e.getMessage());
776776
}

ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -238,18 +238,6 @@ private int getPort(String address) {
238238
}
239239
}
240240

241-
public static class PeerChanges {
242-
public final RaftPeer[] allPeersInNewConf;
243-
public final RaftPeer[] newPeers;
244-
public final RaftPeer[] removedPeers;
245-
246-
public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
247-
this.allPeersInNewConf = all;
248-
this.newPeers = newPeers;
249-
this.removedPeers = removed;
250-
}
251-
}
252-
253241
public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
254242
Stream<RaftPeer> peer = ids.stream()
255243
.map(id -> RaftPeer.newBuilder().setId(id))
@@ -493,12 +481,12 @@ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
493481
}
494482
}
495483

496-
final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
497-
final RaftPeer[] np = newPeers.toArray(RaftPeer.emptyArray());
498-
newPeers.addAll(group.getPeers());
499-
RaftPeer[] p = newPeers.toArray(RaftPeer.emptyArray());
500-
group = RaftGroup.valueOf(group.getGroupId(), p);
501-
return new PeerChanges(p, np, RaftPeer.emptyArray());
484+
final List<RaftPeer> newPeers = toRaftPeers(newServers);
485+
final List<RaftPeer> allPeers = new ArrayList<>(newPeers.size() + group.getPeers().size());
486+
allPeers.addAll(newPeers);
487+
allPeers.addAll(group.getPeers());
488+
group = RaftGroup.valueOf(group.getGroupId(), allPeers);
489+
return new PeerChanges(allPeers, newPeers, Collections.emptyList());
502490
}
503491

504492
void startServers(Iterable<? extends RaftServer> raftServers) throws IOException {
@@ -513,7 +501,7 @@ void startServers(Iterable<? extends RaftServer> raftServers) throws IOException
513501
*/
514502
public PeerChanges removePeers(int number, boolean removeLeader,
515503
Collection<RaftPeer> excluded) throws InterruptedException {
516-
Collection<RaftPeer> raftPeers = new ArrayList<>(group.getPeers());
504+
final List<RaftPeer> raftPeers = new ArrayList<>(group.getPeers());
517505
List<RaftPeer> removedPeers = new ArrayList<>(number);
518506
if (removeLeader) {
519507
final RaftPeer leader = RaftTestUtil.waitForLeader(this).getPeer();
@@ -531,9 +519,8 @@ public PeerChanges removePeers(int number, boolean removeLeader,
531519
removed++;
532520
}
533521
}
534-
final RaftPeer[] p = raftPeers.toArray(RaftPeer.emptyArray());
535-
group = RaftGroup.valueOf(group.getGroupId(), p);
536-
return new PeerChanges(p, RaftPeer.emptyArray(), removedPeers.toArray(RaftPeer.emptyArray()));
522+
group = RaftGroup.valueOf(group.getGroupId(), raftPeers);
523+
return new PeerChanges(raftPeers, Collections.emptyList(), removedPeers);
537524
}
538525

539526
public void killServer(RaftPeerId id) {
@@ -815,15 +802,14 @@ public RaftClientRequest newRaftClientRequest(
815802
}
816803

817804
public SetConfigurationRequest newSetConfigurationRequest(
818-
ClientId clientId, RaftPeerId leaderId,
819-
RaftPeer... raftPeers) {
805+
ClientId clientId, RaftPeerId leaderId, List<RaftPeer> raftPeers) {
820806
return new SetConfigurationRequest(clientId, leaderId, getGroupId(), CallId.getDefault(),
821807
SetConfigurationRequest.Arguments.newBuilder().setServersInNewConf(raftPeers).build());
822808
}
823809

824-
public void setConfiguration(RaftPeer... raftPeers) throws IOException {
810+
public void setConfiguration(List<RaftPeer> raftPeers) throws IOException {
825811
try(RaftClient client = createClient()) {
826-
LOG.info("Start changing the configuration: {}", Arrays.asList(raftPeers));
812+
LOG.info("Start changing the configuration: {}", raftPeers);
827813
final RaftClientReply reply = client.admin().setConfiguration(raftPeers);
828814
Preconditions.assertTrue(reply.isSuccess());
829815
}

0 commit comments

Comments
 (0)