Skip to content

Commit 212a7f9

Browse files
committed
future true
1 parent 3f054a0 commit 212a7f9

File tree

5 files changed

+22
-22
lines changed

5 files changed

+22
-22
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public WriteBufferManager(
213213
TaskMemoryManager taskMemoryManager,
214214
ShuffleWriteMetrics shuffleWriteMetrics,
215215
RssConf rssConf,
216-
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc,
216+
Function<List<ShuffleBlockInfo>, List<Future<Long>>> spillFunc,
217217
int stageAttemptNumber) {
218218
this(
219219
shuffleId,

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Optional;
3131
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
3233
import java.util.concurrent.Future;
3334
import java.util.concurrent.ScheduledExecutorService;
3435
import java.util.concurrent.TimeUnit;
@@ -1561,6 +1562,6 @@ public Future<Long> sendData(AddBlockEvent event) {
15611562
if (dataPusher != null && event != null) {
15621563
return dataPusher.send(event);
15631564
}
1564-
return new Future<>();
1565+
return new CompletableFuture<>();
15651566
}
15661567
}

client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27-
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.Executors;
3029
import java.util.concurrent.Future;
@@ -361,8 +360,7 @@ private void checkSentBlockCount() {
361360
*
362361
* @param shuffleBlockInfoList
363362
*/
364-
private List<CompletableFuture<Long>> processShuffleBlockInfos(
365-
List<ShuffleBlockInfo> shuffleBlockInfoList) {
363+
private List<Future<Long>> processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
366364
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
367365
shuffleBlockInfoList.stream()
368366
.forEach(
@@ -390,9 +388,8 @@ private List<CompletableFuture<Long>> processShuffleBlockInfos(
390388

391389
// don't send huge block to shuffle server, or there will be OOM if shuffle sever receives data
392390
// more than expected
393-
protected List<CompletableFuture<Long>> postBlockEvent(
394-
List<ShuffleBlockInfo> shuffleBlockInfoList) {
395-
List<CompletableFuture<Long>> futures = new ArrayList<>();
391+
protected List<Future<Long>> postBlockEvent(List<ShuffleBlockInfo> shuffleBlockInfoList) {
392+
List<Future<Long>> futures = new ArrayList<>();
396393
for (AddBlockEvent event : bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
397394
futures.add(shuffleManager.sendData(event));
398395
}

client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.Set;
2424
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Future;
2526
import java.util.function.Function;
2627
import java.util.stream.Collectors;
2728

@@ -169,9 +170,9 @@ public void checkBlockSendResultTest() {
169170
}
170171

171172
static class FakedDataPusher extends DataPusher {
172-
private final Function<AddBlockEvent, CompletableFuture<Long>> sendFunc;
173+
private final Function<AddBlockEvent, Future<Long>> sendFunc;
173174

174-
FakedDataPusher(Function<AddBlockEvent, CompletableFuture<Long>> sendFunc) {
175+
FakedDataPusher(Function<AddBlockEvent, Future<Long>> sendFunc) {
175176
this(null, null, null, null, null, 1, 1, sendFunc);
176177
}
177178

@@ -183,7 +184,7 @@ private FakedDataPusher(
183184
Set<String> failedTaskIds,
184185
int threadPoolSize,
185186
int threadKeepAliveTime,
186-
Function<AddBlockEvent, CompletableFuture<Long>> sendFunc) {
187+
Function<AddBlockEvent, Future<Long>> sendFunc) {
187188
super(
188189
shuffleWriteClient,
189190
taskToSuccessBlockIds,
@@ -195,7 +196,7 @@ private FakedDataPusher(
195196
}
196197

197198
@Override
198-
public CompletableFuture<Long> send(AddBlockEvent event) {
199+
public Future<Long> send(AddBlockEvent event) {
199200
return sendFunc.apply(event);
200201
}
201202
}

client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.List;
22-
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.ExecutorService;
2423
import java.util.concurrent.Executors;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.FutureTask;
2526
import java.util.concurrent.TimeUnit;
2627

2728
import org.awaitility.Awaitility;
@@ -70,12 +71,12 @@ public void testGenerateTaskIdBitMap() {
7071
}
7172
}
7273

73-
private List<CompletableFuture<Boolean>> getFutures(boolean fail) {
74-
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
74+
private List<Future<Boolean>> getFutures(boolean fail) {
75+
List<Future<Boolean>> futures = new ArrayList<>();
7576
for (int i = 0; i < 3; i++) {
7677
final int index = i;
77-
CompletableFuture<Boolean> future =
78-
CompletableFuture.supplyAsync(
78+
FutureTask<Boolean> future =
79+
new FutureTask(
7980
() -> {
8081
if (index == 2) {
8182
try {
@@ -88,8 +89,8 @@ private List<CompletableFuture<Boolean>> getFutures(boolean fail) {
8889
return true;
8990
}
9091
return !fail || index != 1;
91-
},
92-
executorService);
92+
});
93+
executorService.submit(future);
9394
futures.add(future);
9495
}
9596
return futures;
@@ -98,13 +99,13 @@ private List<CompletableFuture<Boolean>> getFutures(boolean fail) {
9899
@Test
99100
public void testWaitUntilDoneOrFail() {
100101
// case1: enable fail fast
101-
List<CompletableFuture<Boolean>> futures1 = getFutures(true);
102+
List<Future<Boolean>> futures1 = getFutures(true);
102103
Awaitility.await()
103104
.timeout(2, TimeUnit.SECONDS)
104105
.until(() -> !waitUntilDoneOrFail(futures1, true));
105106

106107
// case2: disable fail fast
107-
List<CompletableFuture<Boolean>> futures2 = getFutures(true);
108+
List<Future<Boolean>> futures2 = getFutures(true);
108109
try {
109110
Awaitility.await()
110111
.timeout(2, TimeUnit.SECONDS)
@@ -115,7 +116,7 @@ public void testWaitUntilDoneOrFail() {
115116
}
116117

117118
// case3: all succeed
118-
List<CompletableFuture<Boolean>> futures3 = getFutures(false);
119+
List<Future<Boolean>> futures3 = getFutures(false);
119120
Awaitility.await()
120121
.timeout(4, TimeUnit.SECONDS)
121122
.until(() -> waitUntilDoneOrFail(futures3, true));

0 commit comments

Comments
 (0)