Skip to content

Commit 3f054a0

Browse files
committed
cancel rpc
1 parent b7969b4 commit 3f054a0

File tree

8 files changed

+107
-118
lines changed

8 files changed

+107
-118
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import java.util.Map;
2525
import java.util.Optional;
2626
import java.util.Set;
27-
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.FutureTask;
2930
import java.util.concurrent.ThreadPoolExecutor;
3031
import java.util.concurrent.TimeUnit;
3132

@@ -80,11 +81,12 @@ public DataPusher(
8081
ThreadUtils.getThreadFactory(this.getClass().getName()));
8182
}
8283

83-
public CompletableFuture<Long> send(AddBlockEvent event) {
84+
public Future<Long> send(AddBlockEvent event) {
8485
if (rssAppId == null) {
8586
throw new RssException("RssAppId should be set.");
8687
}
87-
return CompletableFuture.supplyAsync(
88+
FutureTask<Long> future =
89+
new FutureTask<Long>(
8890
() -> {
8991
String taskId = event.getTaskId();
9092
List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList();
@@ -118,13 +120,9 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
118120
.map(x -> x.getFreeMemory())
119121
.reduce((a, b) -> a + b)
120122
.get();
121-
},
122-
executorService)
123-
.exceptionally(
124-
ex -> {
125-
LOGGER.error("Unexpected exceptions occurred while sending shuffle data", ex);
126-
return null;
127123
});
124+
executorService.submit(future);
125+
return future;
128126
}
129127

130128
private synchronized void putBlockId(

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Optional;
2626
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Future;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.TimeoutException;
2930
import java.util.concurrent.atomic.AtomicInteger;
@@ -96,7 +97,7 @@ public class WriteBufferManager extends MemoryConsumer {
9697
private long requireMemoryInterval;
9798
private int requireMemoryRetryMax;
9899
private Optional<Codec> codec;
99-
private Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc;
100+
private Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc;
100101
private long sendSizeLimit;
101102
private boolean memorySpillEnabled;
102103
private int memorySpillTimeoutSec;
@@ -138,7 +139,7 @@ public WriteBufferManager(
138139
TaskMemoryManager taskMemoryManager,
139140
ShuffleWriteMetrics shuffleWriteMetrics,
140141
RssConf rssConf,
141-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
142+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc,
142143
Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc) {
143144
this(
144145
shuffleId,
@@ -163,7 +164,7 @@ public WriteBufferManager(
163164
TaskMemoryManager taskMemoryManager,
164165
ShuffleWriteMetrics shuffleWriteMetrics,
165166
RssConf rssConf,
166-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
167+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc,
167168
Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc,
168169
int stageAttemptNumber) {
169170
super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
@@ -555,7 +556,7 @@ public long spill(long size, MemoryConsumer trigger) {
555556
return 0L;
556557
}
557558

558-
List<CompletableFuture<Long>> futures = spillFunc.apply(clear(bufferSpillRatio));
559+
List<Future<Long>> futures = spillFunc.apply(clear(bufferSpillRatio));
559560
CompletableFuture<Void> allOfFutures =
560561
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
561562
try {
@@ -671,8 +672,7 @@ public void setTaskId(String taskId) {
671672
}
672673

673674
@VisibleForTesting
674-
public void setSpillFunc(
675-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc) {
675+
public void setSpillFunc(Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc) {
676676
this.spillFunc = spillFunc;
677677
}
678678

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Optional;
3131
import java.util.Set;
32-
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.Future;
3333
import java.util.concurrent.ScheduledExecutorService;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1557,10 +1557,10 @@ public Map<String, FailedBlockSendTracker> getTaskToFailedBlockSendTracker() {
15571557
return taskToFailedBlockSendTracker;
15581558
}
15591559

1560-
public CompletableFuture<Long> sendData(AddBlockEvent event) {
1560+
public Future<Long> sendData(AddBlockEvent event) {
15611561
if (dataPusher != null && event != null) {
15621562
return dataPusher.send(event);
15631563
}
1564-
return new CompletableFuture<>();
1564+
return new Future<>();
15651565
}
15661566
}

client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Set;
25-
import java.util.concurrent.CompletableFuture;
2625
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
2727
import java.util.function.Supplier;
2828

2929
import com.google.common.collect.Maps;
@@ -119,7 +119,7 @@ public void testSendData() throws ExecutionException, InterruptedException {
119119
new ShuffleBlockInfo(1, 1, 1, 1, 1, new byte[1], null, 1, 100, 1);
120120
AddBlockEvent event = new AddBlockEvent("taskId", Arrays.asList(shuffleBlockInfo));
121121
// sync send
122-
CompletableFuture<Long> future = dataPusher.send(event);
122+
Future<Long> future = dataPusher.send(event);
123123
long memoryFree = future.get();
124124
assertEquals(100, memoryFree);
125125
assertTrue(taskToSuccessBlockIds.get("taskId").contains(1L));

client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424
import java.util.Optional;
25-
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.FutureTask;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.function.Function;
2829
import java.util.stream.Stream;
@@ -370,7 +371,7 @@ public void spillByOwnTest() {
370371
null,
371372
0);
372373

373-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc =
374+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc =
374375
blocks -> {
375376
long sum = 0L;
376377
List<AddBlockEvent> events = wbm.buildBlockEvents(blocks);
@@ -381,7 +382,8 @@ public void spillByOwnTest() {
381382
event.getProcessedCallbackChain().stream().forEach(x -> x.run());
382383
sum += event.getShuffleDataInfoList().stream().mapToLong(x -> x.getFreeMemory()).sum();
383384
}
384-
return Arrays.asList(CompletableFuture.completedFuture(sum));
385+
final long result = sum;
386+
return Arrays.asList(new FutureTask<Long>(() -> result));
385387
};
386388
wbm.setSpillFunc(spillFunc);
387389

@@ -407,7 +409,7 @@ public void spillByOwnTest() {
407409
spillFunc =
408410
shuffleBlockInfos ->
409411
Arrays.asList(
410-
CompletableFuture.supplyAsync(
412+
new FutureTask<Long>(
411413
() -> {
412414
List<AddBlockEvent> events = spyManager.buildBlockEvents(shuffleBlockInfos);
413415
long sum = 0L;
@@ -481,15 +483,16 @@ public void spillPartial() {
481483
null,
482484
0);
483485

484-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc =
486+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc =
485487
blocks -> {
486488
long sum = 0L;
487489
List<AddBlockEvent> events = wbm.buildBlockEvents(blocks);
488490
for (AddBlockEvent event : events) {
489491
event.getProcessedCallbackChain().stream().forEach(x -> x.run());
490492
sum += event.getShuffleDataInfoList().stream().mapToLong(x -> x.getFreeMemory()).sum();
491493
}
492-
return Arrays.asList(CompletableFuture.completedFuture(sum));
494+
final long result = sum;
495+
return Arrays.asList(new FutureTask<Long>(() -> result));
493496
};
494497
wbm.setSpillFunc(spillFunc);
495498

@@ -573,7 +576,7 @@ public void spillByOwnWithSparkTaskMemoryManagerTest() {
573576

574577
List<ShuffleBlockInfo> blockList = new ArrayList<>();
575578

576-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc =
579+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc =
577580
blocks -> {
578581
blockList.addAll(blocks);
579582
long sum = 0L;
@@ -582,7 +585,8 @@ public void spillByOwnWithSparkTaskMemoryManagerTest() {
582585
event.getProcessedCallbackChain().stream().forEach(x -> x.run());
583586
sum += event.getShuffleDataInfoList().stream().mapToLong(x -> x.getFreeMemory()).sum();
584587
}
585-
return Arrays.asList(CompletableFuture.completedFuture(sum));
588+
final long result = sum;
589+
return Arrays.asList(new FutureTask<Long>(() -> result));
586590
};
587591
wbm.setSpillFunc(spillFunc);
588592

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Map;
2828
import java.util.Set;
2929
import java.util.concurrent.BlockingQueue;
30-
import java.util.concurrent.CompletableFuture;
3130
import java.util.concurrent.ExecutorService;
3231
import java.util.concurrent.Executors;
3332
import java.util.concurrent.Future;
@@ -415,7 +414,7 @@ public long[] getPartitionLengths() {
415414
}
416415

417416
@VisibleForTesting
418-
protected List<CompletableFuture<Long>> processShuffleBlockInfos(
417+
protected List<Future<Long>> processShuffleBlockInfos(
419418
List<ShuffleBlockInfo> shuffleBlockInfoList) {
420419
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
421420
shuffleBlockInfoList.forEach(
@@ -440,9 +439,8 @@ protected List<CompletableFuture<Long>> processShuffleBlockInfos(
440439
return Collections.emptyList();
441440
}
442441

443-
protected List<CompletableFuture<Long>> postBlockEvent(
444-
List<ShuffleBlockInfo> shuffleBlockInfoList) {
445-
List<CompletableFuture<Long>> futures = new ArrayList<>();
442+
protected List<Future<Long>> postBlockEvent(List<ShuffleBlockInfo> shuffleBlockInfoList) {
443+
List<Future<Long>> futures = new ArrayList<>();
446444
for (AddBlockEvent event : bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
447445
if (blockFailSentRetryEnabled) {
448446
// do nothing if failed.

0 commit comments

Comments
 (0)