Skip to content

Commit bfe9b76

Browse files
committed
Eagerly cancel rpc request
1 parent 01f6e25 commit bfe9b76

File tree

4 files changed

+14
-5
lines changed

4 files changed

+14
-5
lines changed

client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,8 @@ private boolean sendShuffleDataAsync(
200200
stageAttemptNumber,
201201
retryMax,
202202
retryIntervalMax,
203-
shuffleIdToBlocks);
203+
shuffleIdToBlocks,
204+
needCancelRequest);
204205
long s = System.currentTimeMillis();
205206
RssSendShuffleDataResponse response =
206207
getShuffleServerClient(ssi).sendShuffleData(request);

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
647647
null,
648648
request.getRetryIntervalMax(),
649649
maxRetryAttempts,
650-
t -> !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException));
650+
t -> !request.needCancel() && !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException));
651651
} catch (Throwable throwable) {
652652
LOG.warn("Failed to send shuffle data due to ", throwable);
653653
isSuccessful = false;

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
235235
null,
236236
request.getRetryIntervalMax(),
237237
maxRetryAttempts,
238-
t -> !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException));
238+
t -> !request.needCancel() && !(t instanceof OutOfMemoryError) && !(t instanceof NotRetryException));
239239
} catch (Throwable throwable) {
240240
LOG.warn("Failed to send shuffle data due to ", throwable);
241241
isSuccessful = false;

internal-client/src/main/java/org/apache/uniffle/client/request/RssSendShuffleDataRequest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.function.Supplier;
2223

2324
import org.apache.uniffle.common.ShuffleBlockInfo;
2425

@@ -29,26 +30,29 @@ public class RssSendShuffleDataRequest {
2930
private int retryMax;
3031
private long retryIntervalMax;
3132
private Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks;
33+
private Supplier<Boolean> needCancel;
3234

3335
public RssSendShuffleDataRequest(
3436
String appId,
3537
int retryMax,
3638
long retryIntervalMax,
3739
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks) {
38-
this(appId, 0, retryMax, retryIntervalMax, shuffleIdToBlocks);
40+
this(appId, 0, retryMax, retryIntervalMax, shuffleIdToBlocks, () -> false);
3941
}
4042

4143
public RssSendShuffleDataRequest(
4244
String appId,
4345
int stageAttemptNumber,
4446
int retryMax,
4547
long retryIntervalMax,
46-
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks) {
48+
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks,
49+
Supplier<Boolean> needCancel) {
4750
this.appId = appId;
4851
this.retryMax = retryMax;
4952
this.retryIntervalMax = retryIntervalMax;
5053
this.shuffleIdToBlocks = shuffleIdToBlocks;
5154
this.stageAttemptNumber = stageAttemptNumber;
55+
this.needCancel = needCancel;
5256
}
5357

5458
public String getAppId() {
@@ -70,4 +74,8 @@ public int getStageAttemptNumber() {
7074
public Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> getShuffleIdToBlocks() {
7175
return shuffleIdToBlocks;
7276
}
77+
78+
public Boolean needCancel() {
79+
return needCancel.get();
80+
}
7381
}

0 commit comments

Comments
 (0)