3838import org .apache .iotdb .consensus .common .response .ConsensusReadResponse ;
3939import org .apache .iotdb .consensus .common .response .ConsensusWriteResponse ;
4040import org .apache .iotdb .consensus .config .ConsensusConfig ;
41+ import org .apache .iotdb .consensus .config .RatisConfig ;
4142import org .apache .iotdb .consensus .exception .ConsensusException ;
4243import org .apache .iotdb .consensus .exception .ConsensusGroupNotExistException ;
4344import org .apache .iotdb .consensus .exception .NodeReadOnlyException ;
6263import org .apache .ratis .protocol .RaftPeerId ;
6364import org .apache .ratis .protocol .SnapshotManagementRequest ;
6465import org .apache .ratis .protocol .exceptions .NotLeaderException ;
66+ import org .apache .ratis .protocol .exceptions .ResourceUnavailableException ;
6567import org .apache .ratis .server .DivisionInfo ;
6668import org .apache .ratis .server .RaftServer ;
6769import org .apache .ratis .server .RaftServerConfigKeys ;
70+ import org .apache .ratis .util .function .CheckedSupplier ;
6871import org .apache .thrift .TException ;
6972import org .slf4j .Logger ;
7073import org .slf4j .LoggerFactory ;
@@ -111,6 +114,8 @@ class RatisConsensus implements IConsensus {
111114 // TODO make it configurable
112115 private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int ) TimeUnit .SECONDS .toMillis (20 );
113116
117+ private final RatisConfig config ;
118+
114119 public RatisConsensus (ConsensusConfig config , IStateMachine .Registry registry )
115120 throws IOException {
116121 myself = Utils .fromTEndPointAndPriorityToRaftPeer (config .getThisNode (), DEFAULT_PRIORITY );
@@ -122,6 +127,7 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
122127 GrpcConfigKeys .Server .setPort (properties , config .getThisNode ().getPort ());
123128
124129 Utils .initRatisConfig (properties , config .getRatisConfig ());
130+ this .config = config .getRatisConfig ();
125131
126132 clientRpc = new GrpcFactory (new Parameters ()).newRaftClientRpc (ClientId .randomId (), properties );
127133
@@ -148,6 +154,49 @@ public void stop() throws IOException {
148154 server .close ();
149155 }
150156
157+ private boolean shouldRetry (RaftClientReply reply ) {
158+ // currently, we only retry when ResourceUnavailableException is caught
159+ return !reply .isSuccess ()
160+ && (reply .getException () != null
161+ && reply .getException () instanceof ResourceUnavailableException );
162+ }
163+ /** launch a consensus write with retry mechanism */
164+ private RaftClientReply writeWithRetry (CheckedSupplier <RaftClientReply , IOException > caller )
165+ throws IOException {
166+
167+ final int maxRetryTimes = config .getRatisConsensus ().getRetryTimesMax ();
168+ final long waitMillis = config .getRatisConsensus ().getRetryWaitMillis ();
169+
170+ int retry = 0 ;
171+ RaftClientReply reply = null ;
172+ while (retry < maxRetryTimes ) {
173+ retry ++;
174+
175+ reply = caller .get ();
176+ if (!shouldRetry (reply )) {
177+ return reply ;
178+ }
179+ logger .debug ("{} sending write request with retry = {} and reply = {}" , this , retry , reply );
180+
181+ try {
182+ Thread .sleep (waitMillis );
183+ } catch (InterruptedException e ) {
184+ logger .warn ("{} retry write sleep is interrupted: {}" , this , e );
185+ Thread .currentThread ().interrupt ();
186+ }
187+ }
188+ return reply ;
189+ }
190+
191+ private RaftClientReply writeLocallyWithRetry (RaftClientRequest request ) throws IOException {
192+ return writeWithRetry (() -> server .submitClientRequest (request ));
193+ }
194+
195+ private RaftClientReply writeRemotelyWithRetry (RatisClient client , Message message )
196+ throws IOException {
197+ return writeWithRetry (() -> client .getRaftClient ().io ().send (message ));
198+ }
199+
151200 /**
152201 * write will first send request to local server use method call if local server is not leader, it
153202 * will use RaftClient to send RPC to read leader
@@ -183,7 +232,7 @@ public ConsensusWriteResponse write(
183232 RaftPeer suggestedLeader = null ;
184233 if (isLeader (consensusGroupId ) && waitUntilLeaderReady (raftGroupId )) {
185234 try {
186- localServerReply = server . submitClientRequest (clientRequest );
235+ localServerReply = writeLocallyWithRetry (clientRequest );
187236 if (localServerReply .isSuccess ()) {
188237 ResponseMessage responseMessage = (ResponseMessage ) localServerReply .getMessage ();
189238 TSStatus writeStatus = (TSStatus ) responseMessage .getContentHolder ();
@@ -203,7 +252,7 @@ public ConsensusWriteResponse write(
203252 RatisClient client = null ;
204253 try {
205254 client = getRaftClient (raftGroup );
206- RaftClientReply reply = client . getRaftClient (). io (). send ( message );
255+ RaftClientReply reply = writeRemotelyWithRetry ( client , message );
207256 if (!reply .isSuccess ()) {
208257 return failedWrite (new RatisRequestFailedException (reply .getException ()));
209258 }
0 commit comments