4040import org .apache .uniffle .client .response .RssReportShuffleResultResponse ;
4141import org .apache .uniffle .client .response .RssReportShuffleWriteFailureResponse ;
4242import org .apache .uniffle .client .response .RssReportShuffleWriteMetricResponse ;
43+ import org .apache .uniffle .common .config .RssClientConf ;
4344import org .apache .uniffle .common .exception .RssException ;
45+ import org .apache .uniffle .common .util .RetryUtils ;
4446import org .apache .uniffle .proto .RssProtos ;
4547import org .apache .uniffle .proto .RssProtos .ReportShuffleFetchFailureRequest ;
4648import org .apache .uniffle .proto .RssProtos .ReportShuffleFetchFailureResponse ;
@@ -57,12 +59,23 @@ public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout) {
5759 }
5860
/**
 * Creates a client with {@code usePlaintext} set to {@code true} and the retry backoff taken
 * from the default value of {@code RssClientConf.RPC_RETRY_BACKOFF_MS}.
 *
 * @param host host of the shuffle manager endpoint
 * @param port port of the shuffle manager endpoint
 * @param rpcTimeout RPC timeout kept by this client
 * @param maxRetryAttempts retry limit forwarded to the full constructor
 */
public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout, int maxRetryAttempts) {
  this(
      host,
      port,
      rpcTimeout,
      maxRetryAttempts,
      /* usePlaintext= */ true,
      RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue());
}
6270
/**
 * Creates a client for the given endpoint, handing the connection and retry settings to the
 * superclass and then building the blocking stub on the resulting channel.
 *
 * @param host host of the shuffle manager endpoint
 * @param port port of the shuffle manager endpoint
 * @param rpcTimeout RPC timeout kept by this client
 * @param maxRetryAttempts retry limit passed to the superclass
 * @param usePlaintext plaintext flag passed to the superclass
 * @param rpcRetryBackoffMs retry backoff, in milliseconds, passed to the superclass
 */
public ShuffleManagerGrpcClient(
    String host,
    int port,
    long rpcTimeout,
    int maxRetryAttempts,
    boolean usePlaintext,
    long rpcRetryBackoffMs) {
  super(host, port, maxRetryAttempts, usePlaintext, rpcRetryBackoffMs);
  this.rpcTimeout = rpcTimeout;
  // The stub is created from the channel that the superclass constructor set up.
  blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
}
@@ -81,9 +94,14 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
8194 ReportShuffleFetchFailureRequest protoRequest = request .toProto ();
8295 try {
8396 ReportShuffleFetchFailureResponse response =
84- getBlockingStub ().reportShuffleFetchFailure (protoRequest );
97+ RetryUtils .retryWithCondition (
98+ () -> getBlockingStub ().reportShuffleFetchFailure (protoRequest ),
99+ null ,
100+ rpcRetryBackoffMs ,
101+ maxRetryAttempts ,
102+ e -> e instanceof Exception );
85103 return RssReportShuffleFetchFailureResponse .fromProto (response );
86- } catch (Exception e ) {
104+ } catch (Throwable e ) {
87105 String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed" ;
88106 LOG .warn (msg , e );
89107 throw new RssException (msg , e );
@@ -94,22 +112,50 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
94112 public RssReassignOnStageRetryResponse getPartitionToShufflerServerWithStageRetry (
95113 RssPartitionToShuffleServerRequest req ) {
96114 RssProtos .PartitionToShuffleServerRequest protoRequest = req .toProto ();
97- RssProtos .ReassignOnStageRetryResponse partitionToShufflerServer =
98- getBlockingStub ().getPartitionToShufflerServerWithStageRetry (protoRequest );
99- RssReassignOnStageRetryResponse rssReassignOnStageRetryResponse =
100- RssReassignOnStageRetryResponse .fromProto (partitionToShufflerServer );
101- return rssReassignOnStageRetryResponse ;
115+ try {
116+ RssProtos .ReassignOnStageRetryResponse partitionToShufflerServer =
117+ RetryUtils .retryWithCondition (
118+ () -> getBlockingStub ().getPartitionToShufflerServerWithStageRetry (protoRequest ),
119+ null ,
120+ rpcRetryBackoffMs ,
121+ maxRetryAttempts ,
122+ e -> e instanceof Exception );
123+ return RssReassignOnStageRetryResponse .fromProto (partitionToShufflerServer );
124+ } catch (Throwable e ) {
125+ String msg =
126+ "Get partition to shuffle server with stage retry from host:port["
127+ + host
128+ + ":"
129+ + port
130+ + "] failed" ;
131+ LOG .warn (msg , e );
132+ throw new RssException (msg , e );
133+ }
102134 }
103135
104136 @ Override
105137 public RssReassignOnBlockSendFailureResponse getPartitionToShufflerServerWithBlockRetry (
106138 RssPartitionToShuffleServerRequest req ) {
107139 RssProtos .PartitionToShuffleServerRequest protoRequest = req .toProto ();
108- RssProtos .ReassignOnBlockSendFailureResponse partitionToShufflerServer =
109- getBlockingStub ().getPartitionToShufflerServerWithBlockRetry (protoRequest );
110- RssReassignOnBlockSendFailureResponse rssReassignOnBlockSendFailureResponse =
111- RssReassignOnBlockSendFailureResponse .fromProto (partitionToShufflerServer );
112- return rssReassignOnBlockSendFailureResponse ;
140+ try {
141+ RssProtos .ReassignOnBlockSendFailureResponse partitionToShufflerServer =
142+ RetryUtils .retryWithCondition (
143+ () -> getBlockingStub ().getPartitionToShufflerServerWithBlockRetry (protoRequest ),
144+ null ,
145+ rpcRetryBackoffMs ,
146+ maxRetryAttempts ,
147+ e -> e instanceof Exception );
148+ return RssReassignOnBlockSendFailureResponse .fromProto (partitionToShufflerServer );
149+ } catch (Throwable e ) {
150+ String msg =
151+ "Get partition to shuffle server with block retry from host:port["
152+ + host
153+ + ":"
154+ + port
155+ + "] failed" ;
156+ LOG .warn (msg , e );
157+ throw new RssException (msg , e );
158+ }
113159 }
114160
115161 @ Override
@@ -118,9 +164,14 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
118164 RssProtos .ReportShuffleWriteFailureRequest protoRequest = request .toProto ();
119165 try {
120166 RssProtos .ReportShuffleWriteFailureResponse response =
121- getBlockingStub ().reportShuffleWriteFailure (protoRequest );
167+ RetryUtils .retryWithCondition (
168+ () -> getBlockingStub ().reportShuffleWriteFailure (protoRequest ),
169+ null ,
170+ rpcRetryBackoffMs ,
171+ maxRetryAttempts ,
172+ e -> e instanceof Exception );
122173 return RssReportShuffleWriteFailureResponse .fromProto (response );
123- } catch (Exception e ) {
174+ } catch (Throwable e ) {
124175 String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed" ;
125176 LOG .warn (msg , e );
126177 throw new RssException (msg , e );
@@ -132,47 +183,115 @@ public RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
132183 RssReassignOnBlockSendFailureRequest request ) {
133184 RssProtos .RssReassignOnBlockSendFailureRequest protoReq =
134185 RssReassignOnBlockSendFailureRequest .toProto (request );
135- RssProtos .ReassignOnBlockSendFailureResponse response =
136- getBlockingStub ().reassignOnBlockSendFailure (protoReq );
137- return RssReassignOnBlockSendFailureResponse .fromProto (response );
186+ try {
187+ RssProtos .ReassignOnBlockSendFailureResponse response =
188+ RetryUtils .retryWithCondition (
189+ () -> getBlockingStub ().reassignOnBlockSendFailure (protoReq ),
190+ null ,
191+ rpcRetryBackoffMs ,
192+ maxRetryAttempts ,
193+ e -> e instanceof Exception );
194+ return RssReassignOnBlockSendFailureResponse .fromProto (response );
195+ } catch (Throwable e ) {
196+ String msg =
197+ "Reassign on block send failure from host:port[" + host + ":" + port + "] failed" ;
198+ LOG .warn (msg , e );
199+ throw new RssException (msg , e );
200+ }
138201 }
139202
140203 @ Override
141204 public RssGetShuffleResultResponse getShuffleResult (RssGetShuffleResultRequest request ) {
142- RssProtos .GetShuffleResultResponse response =
143- getBlockingStub ().getShuffleResult (request .toProto ());
144- return RssGetShuffleResultResponse .fromProto (response );
205+ try {
206+ RssProtos .GetShuffleResultResponse response =
207+ RetryUtils .retryWithCondition (
208+ () -> getBlockingStub ().getShuffleResult (request .toProto ()),
209+ null ,
210+ rpcRetryBackoffMs ,
211+ maxRetryAttempts ,
212+ e -> e instanceof Exception );
213+ return RssGetShuffleResultResponse .fromProto (response );
214+ } catch (Throwable e ) {
215+ String msg = "Get shuffle result from host:port[" + host + ":" + port + "] failed" ;
216+ LOG .warn (msg , e );
217+ throw new RuntimeException (msg , e );
218+ }
145219 }
146220
147221 @ Override
148222 public RssGetShuffleResultResponse getShuffleResultForMultiPart (
149223 RssGetShuffleResultForMultiPartRequest request ) {
150- RssProtos .GetShuffleResultForMultiPartResponse response =
151- getBlockingStub ().getShuffleResultForMultiPart (request .toProto ());
152- return RssGetShuffleResultResponse .fromProto (response );
224+ try {
225+ RssProtos .GetShuffleResultForMultiPartResponse response =
226+ RetryUtils .retryWithCondition (
227+ () -> getBlockingStub ().getShuffleResultForMultiPart (request .toProto ()),
228+ null ,
229+ rpcRetryBackoffMs ,
230+ maxRetryAttempts ,
231+ e -> e instanceof Exception );
232+ return RssGetShuffleResultResponse .fromProto (response );
233+ } catch (Throwable e ) {
234+ String msg =
235+ "Get shuffle result for multiport from host:port[" + host + ":" + port + "] failed" ;
236+ LOG .warn (msg , e );
237+ throw new RuntimeException (msg , e );
238+ }
153239 }
154240
155241 @ Override
156242 public RssReportShuffleResultResponse reportShuffleResult (RssReportShuffleResultRequest request ) {
157- RssProtos .ReportShuffleResultResponse response =
158- getBlockingStub ().reportShuffleResult (request .toProto ());
159- return RssReportShuffleResultResponse .fromProto (response );
243+ try {
244+ RssProtos .ReportShuffleResultResponse response =
245+ RetryUtils .retryWithCondition (
246+ () -> getBlockingStub ().reportShuffleResult (request .toProto ()),
247+ null ,
248+ rpcRetryBackoffMs ,
249+ maxRetryAttempts ,
250+ e -> e instanceof Exception );
251+ return RssReportShuffleResultResponse .fromProto (response );
252+ } catch (Throwable e ) {
253+ String msg = "Report shuffle result to host:port[" + host + ":" + port + "] failed" ;
254+ LOG .warn (msg , e );
255+ throw new RuntimeException (msg , e );
256+ }
160257 }
161258
162259 @ Override
163260 public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric (
164261 RssReportShuffleWriteMetricRequest request ) {
165- RssProtos .ReportShuffleWriteMetricResponse response =
166- getBlockingStub ().reportShuffleWriteMetric (request .toProto ());
167- return RssReportShuffleWriteMetricResponse .fromProto (response );
262+ try {
263+ RssProtos .ReportShuffleWriteMetricResponse response =
264+ RetryUtils .retryWithCondition (
265+ () -> getBlockingStub ().reportShuffleWriteMetric (request .toProto ()),
266+ null ,
267+ rpcRetryBackoffMs ,
268+ maxRetryAttempts ,
269+ e -> e instanceof Exception );
270+ return RssReportShuffleWriteMetricResponse .fromProto (response );
271+ } catch (Throwable e ) {
272+ String msg = "Report shuffle write metric to host:port[" + host + ":" + port + "] failed" ;
273+ LOG .warn (msg , e );
274+ throw new RuntimeException (msg , e );
275+ }
168276 }
169277
170278 @ Override
171279 public RssReportShuffleReadMetricResponse reportShuffleReadMetric (
172280 RssReportShuffleReadMetricRequest request ) {
173- RssProtos .ReportShuffleReadMetricResponse response =
174- getBlockingStub ().reportShuffleReadMetric (request .toProto ());
175- return RssReportShuffleReadMetricResponse .fromProto (response );
281+ try {
282+ RssProtos .ReportShuffleReadMetricResponse response =
283+ RetryUtils .retryWithCondition (
284+ () -> getBlockingStub ().reportShuffleReadMetric (request .toProto ()),
285+ null ,
286+ rpcRetryBackoffMs ,
287+ maxRetryAttempts ,
288+ e -> e instanceof Exception );
289+ return RssReportShuffleReadMetricResponse .fromProto (response );
290+ } catch (Throwable e ) {
291+ String msg = "Report shuffle read metric to host:port[" + host + ":" + port + "] failed" ;
292+ LOG .warn (msg , e );
293+ throw new RuntimeException (msg , e );
294+ }
176295 }
177296
178297 @ Override
0 commit comments