Skip to content

Commit 4def3cc

Browse files
authored
[#2093][followup] feat: Add support of partition split for grpc (#2396)
### What changes were proposed in this pull request? Add support of partition split for grpc ### Why are the changes needed? followup for #2093 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests
1 parent 9ce55e3 commit 4def3cc

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
271271
throw new RssException(
272272
"The feature of task partition reassign is incompatible with multiple replicas mechanism.");
273273
}
274+
LOG.info("Partition reassign is enabled.");
274275
}
275276
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
276277
this.shuffleManagerRpcServiceEnabled =

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Random;
26+
import java.util.Set;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicReference;
2829

2930
import com.google.common.annotations.VisibleForTesting;
3031
import com.google.common.collect.Lists;
32+
import com.google.common.collect.Sets;
3133
import com.google.protobuf.ByteString;
3234
import com.google.protobuf.UnsafeByteOperations;
3335
import io.grpc.StatusRuntimeException;
@@ -540,7 +542,7 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
540542

541543
boolean isSuccessful = true;
542544
AtomicReference<StatusCode> failedStatusCode = new AtomicReference<>(StatusCode.INTERNAL_ERROR);
543-
545+
Set<Integer> needSplitPartitionIds = Sets.newHashSet();
544546
// prepare rpc request based on shuffleId -> partitionId -> blocks
545547
for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb :
546548
shuffleIdToBlocks.entrySet()) {
@@ -583,17 +585,18 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
583585
RetryUtils.retryWithCondition(
584586
() -> {
585587
// TODO(baoloongmao): support partition split follow netty client
586-
long requireId =
588+
Pair<Long, List<Integer>> allocationResult =
587589
requirePreAllocation(
588-
appId,
589-
shuffleId,
590-
partitionIds,
591-
partitionRequireSizes,
592-
allocateSize,
593-
request.getRetryMax() / maxRetryAttempts,
594-
request.getRetryIntervalMax(),
595-
failedStatusCode)
596-
.getLeft();
590+
appId,
591+
shuffleId,
592+
partitionIds,
593+
partitionRequireSizes,
594+
allocateSize,
595+
request.getRetryMax() / maxRetryAttempts,
596+
request.getRetryIntervalMax(),
597+
failedStatusCode);
598+
long requireId = allocationResult.getLeft();
599+
needSplitPartitionIds.addAll(allocationResult.getRight());
597600
if (requireId == FAILED_REQUIRE_ID) {
598601
throw new RssException(
599602
String.format(
@@ -661,6 +664,7 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
661664
} else {
662665
response = new RssSendShuffleDataResponse(failedStatusCode.get());
663666
}
667+
response.setNeedSplitPartitionIds(needSplitPartitionIds);
664668
return response;
665669
}
666670

0 commit comments

Comments
 (0)