Skip to content

Commit 432e7fd

Browse files
committed
[#2595] Add configurable 'RPC_RETRY_BACKOFF_MS' option into ShuffleManagerGrpcClient
1 parent cb408f5 commit 432e7fd

File tree

1 file changed

+154
-35
lines changed

1 file changed

+154
-35
lines changed

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java

Lines changed: 154 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
4141
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
4242
import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
43+
import org.apache.uniffle.common.config.RssClientConf;
4344
import org.apache.uniffle.common.exception.RssException;
45+
import org.apache.uniffle.common.util.RetryUtils;
4446
import org.apache.uniffle.proto.RssProtos;
4547
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
4648
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
@@ -57,12 +59,23 @@ public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout) {
5759
}
5860

5961
public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout, int maxRetryAttempts) {
60-
this(host, port, rpcTimeout, maxRetryAttempts, true);
62+
this(
63+
host,
64+
port,
65+
rpcTimeout,
66+
maxRetryAttempts,
67+
true,
68+
RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue());
6169
}
6270

6371
public ShuffleManagerGrpcClient(
64-
String host, int port, long rpcTimeout, int maxRetryAttempts, boolean usePlaintext) {
65-
super(host, port, maxRetryAttempts, usePlaintext);
72+
String host,
73+
int port,
74+
long rpcTimeout,
75+
int maxRetryAttempts,
76+
boolean usePlaintext,
77+
long rpcRetryBackoffMs) {
78+
super(host, port, maxRetryAttempts, usePlaintext, rpcRetryBackoffMs);
6679
blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
6780
this.rpcTimeout = rpcTimeout;
6881
}
@@ -81,9 +94,14 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
8194
ReportShuffleFetchFailureRequest protoRequest = request.toProto();
8295
try {
8396
ReportShuffleFetchFailureResponse response =
84-
getBlockingStub().reportShuffleFetchFailure(protoRequest);
97+
RetryUtils.retryWithCondition(
98+
() -> getBlockingStub().reportShuffleFetchFailure(protoRequest),
99+
null,
100+
rpcRetryBackoffMs,
101+
maxRetryAttempts,
102+
e -> e instanceof Exception);
85103
return RssReportShuffleFetchFailureResponse.fromProto(response);
86-
} catch (Exception e) {
104+
} catch (Throwable e) {
87105
String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
88106
LOG.warn(msg, e);
89107
throw new RssException(msg, e);
@@ -94,22 +112,50 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
94112
public RssReassignOnStageRetryResponse getPartitionToShufflerServerWithStageRetry(
95113
RssPartitionToShuffleServerRequest req) {
96114
RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
97-
RssProtos.ReassignOnStageRetryResponse partitionToShufflerServer =
98-
getBlockingStub().getPartitionToShufflerServerWithStageRetry(protoRequest);
99-
RssReassignOnStageRetryResponse rssReassignOnStageRetryResponse =
100-
RssReassignOnStageRetryResponse.fromProto(partitionToShufflerServer);
101-
return rssReassignOnStageRetryResponse;
115+
try {
116+
RssProtos.ReassignOnStageRetryResponse partitionToShufflerServer =
117+
RetryUtils.retryWithCondition(
118+
() -> getBlockingStub().getPartitionToShufflerServerWithStageRetry(protoRequest),
119+
null,
120+
rpcRetryBackoffMs,
121+
maxRetryAttempts,
122+
e -> e instanceof Exception);
123+
return RssReassignOnStageRetryResponse.fromProto(partitionToShufflerServer);
124+
} catch (Throwable e) {
125+
String msg =
126+
"Get partition to shuffle server with stage retry from host:port["
127+
+ host
128+
+ ":"
129+
+ port
130+
+ "] failed";
131+
LOG.warn(msg, e);
132+
throw new RssException(msg, e);
133+
}
102134
}
103135

104136
@Override
105137
public RssReassignOnBlockSendFailureResponse getPartitionToShufflerServerWithBlockRetry(
106138
RssPartitionToShuffleServerRequest req) {
107139
RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
108-
RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer =
109-
getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest);
110-
RssReassignOnBlockSendFailureResponse rssReassignOnBlockSendFailureResponse =
111-
RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer);
112-
return rssReassignOnBlockSendFailureResponse;
140+
try {
141+
RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer =
142+
RetryUtils.retryWithCondition(
143+
() -> getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest),
144+
null,
145+
rpcRetryBackoffMs,
146+
maxRetryAttempts,
147+
e -> e instanceof Exception);
148+
return RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer);
149+
} catch (Throwable e) {
150+
String msg =
151+
"Get partition to shuffle server with block retry from host:port["
152+
+ host
153+
+ ":"
154+
+ port
155+
+ "] failed";
156+
LOG.warn(msg, e);
157+
throw new RssException(msg, e);
158+
}
113159
}
114160

115161
@Override
@@ -118,9 +164,14 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
118164
RssProtos.ReportShuffleWriteFailureRequest protoRequest = request.toProto();
119165
try {
120166
RssProtos.ReportShuffleWriteFailureResponse response =
121-
getBlockingStub().reportShuffleWriteFailure(protoRequest);
167+
RetryUtils.retryWithCondition(
168+
() -> getBlockingStub().reportShuffleWriteFailure(protoRequest),
169+
null,
170+
rpcRetryBackoffMs,
171+
maxRetryAttempts,
172+
e -> e instanceof Exception);
122173
return RssReportShuffleWriteFailureResponse.fromProto(response);
123-
} catch (Exception e) {
174+
} catch (Throwable e) {
124175
String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
125176
LOG.warn(msg, e);
126177
throw new RssException(msg, e);
@@ -132,47 +183,115 @@ public RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
132183
RssReassignOnBlockSendFailureRequest request) {
133184
RssProtos.RssReassignOnBlockSendFailureRequest protoReq =
134185
RssReassignOnBlockSendFailureRequest.toProto(request);
135-
RssProtos.ReassignOnBlockSendFailureResponse response =
136-
getBlockingStub().reassignOnBlockSendFailure(protoReq);
137-
return RssReassignOnBlockSendFailureResponse.fromProto(response);
186+
try {
187+
RssProtos.ReassignOnBlockSendFailureResponse response =
188+
RetryUtils.retryWithCondition(
189+
() -> getBlockingStub().reassignOnBlockSendFailure(protoReq),
190+
null,
191+
rpcRetryBackoffMs,
192+
maxRetryAttempts,
193+
e -> e instanceof Exception);
194+
return RssReassignOnBlockSendFailureResponse.fromProto(response);
195+
} catch (Throwable e) {
196+
String msg =
197+
"Reassign on block send failure from host:port[" + host + ":" + port + "] failed";
198+
LOG.warn(msg, e);
199+
throw new RssException(msg, e);
200+
}
138201
}
139202

140203
@Override
141204
public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest request) {
142-
RssProtos.GetShuffleResultResponse response =
143-
getBlockingStub().getShuffleResult(request.toProto());
144-
return RssGetShuffleResultResponse.fromProto(response);
205+
try {
206+
RssProtos.GetShuffleResultResponse response =
207+
RetryUtils.retryWithCondition(
208+
() -> getBlockingStub().getShuffleResult(request.toProto()),
209+
null,
210+
rpcRetryBackoffMs,
211+
maxRetryAttempts,
212+
e -> e instanceof Exception);
213+
return RssGetShuffleResultResponse.fromProto(response);
214+
} catch (Throwable e) {
215+
String msg = "Get shuffle result from host:port[" + host + ":" + port + "] failed";
216+
LOG.warn(msg, e);
217+
throw new RuntimeException(msg, e);
218+
}
145219
}
146220

147221
@Override
148222
public RssGetShuffleResultResponse getShuffleResultForMultiPart(
149223
RssGetShuffleResultForMultiPartRequest request) {
150-
RssProtos.GetShuffleResultForMultiPartResponse response =
151-
getBlockingStub().getShuffleResultForMultiPart(request.toProto());
152-
return RssGetShuffleResultResponse.fromProto(response);
224+
try {
225+
RssProtos.GetShuffleResultForMultiPartResponse response =
226+
RetryUtils.retryWithCondition(
227+
() -> getBlockingStub().getShuffleResultForMultiPart(request.toProto()),
228+
null,
229+
rpcRetryBackoffMs,
230+
maxRetryAttempts,
231+
e -> e instanceof Exception);
232+
return RssGetShuffleResultResponse.fromProto(response);
233+
} catch (Throwable e) {
234+
String msg =
235+
"Get shuffle result for multiport from host:port[" + host + ":" + port + "] failed";
236+
LOG.warn(msg, e);
237+
throw new RuntimeException(msg, e);
238+
}
153239
}
154240

155241
@Override
156242
public RssReportShuffleResultResponse reportShuffleResult(RssReportShuffleResultRequest request) {
157-
RssProtos.ReportShuffleResultResponse response =
158-
getBlockingStub().reportShuffleResult(request.toProto());
159-
return RssReportShuffleResultResponse.fromProto(response);
243+
try {
244+
RssProtos.ReportShuffleResultResponse response =
245+
RetryUtils.retryWithCondition(
246+
() -> getBlockingStub().reportShuffleResult(request.toProto()),
247+
null,
248+
rpcRetryBackoffMs,
249+
maxRetryAttempts,
250+
e -> e instanceof Exception);
251+
return RssReportShuffleResultResponse.fromProto(response);
252+
} catch (Throwable e) {
253+
String msg = "Report shuffle result to host:port[" + host + ":" + port + "] failed";
254+
LOG.warn(msg, e);
255+
throw new RuntimeException(msg, e);
256+
}
160257
}
161258

162259
@Override
163260
public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
164261
RssReportShuffleWriteMetricRequest request) {
165-
RssProtos.ReportShuffleWriteMetricResponse response =
166-
getBlockingStub().reportShuffleWriteMetric(request.toProto());
167-
return RssReportShuffleWriteMetricResponse.fromProto(response);
262+
try {
263+
RssProtos.ReportShuffleWriteMetricResponse response =
264+
RetryUtils.retryWithCondition(
265+
() -> getBlockingStub().reportShuffleWriteMetric(request.toProto()),
266+
null,
267+
rpcRetryBackoffMs,
268+
maxRetryAttempts,
269+
e -> e instanceof Exception);
270+
return RssReportShuffleWriteMetricResponse.fromProto(response);
271+
} catch (Throwable e) {
272+
String msg = "Report shuffle write metric to host:port[" + host + ":" + port + "] failed";
273+
LOG.warn(msg, e);
274+
throw new RuntimeException(msg, e);
275+
}
168276
}
169277

170278
@Override
171279
public RssReportShuffleReadMetricResponse reportShuffleReadMetric(
172280
RssReportShuffleReadMetricRequest request) {
173-
RssProtos.ReportShuffleReadMetricResponse response =
174-
getBlockingStub().reportShuffleReadMetric(request.toProto());
175-
return RssReportShuffleReadMetricResponse.fromProto(response);
281+
try {
282+
RssProtos.ReportShuffleReadMetricResponse response =
283+
RetryUtils.retryWithCondition(
284+
() -> getBlockingStub().reportShuffleReadMetric(request.toProto()),
285+
null,
286+
rpcRetryBackoffMs,
287+
maxRetryAttempts,
288+
e -> e instanceof Exception);
289+
return RssReportShuffleReadMetricResponse.fromProto(response);
290+
} catch (Throwable e) {
291+
String msg = "Report shuffle read metric to host:port[" + host + ":" + port + "] failed";
292+
LOG.warn(msg, e);
293+
throw new RuntimeException(msg, e);
294+
}
176295
}
177296

178297
@Override

0 commit comments

Comments
 (0)