Skip to content

Commit 827ee04

Browse files
committed
Put sparkConf entries as extra properties when the client requests accessCluster
1 parent 3cf82d7 commit 827ee04

File tree

3 files changed

+16
-15
lines changed

3 files changed

+16
-15
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.shuffle;
1919

20+
import java.util.HashMap;
21+
import java.util.Map;
2022
import java.util.Set;
2123

2224
import scala.Tuple2;
@@ -512,4 +514,15 @@ public static RssConf toRssConf(SparkConf sparkConf) {
512514
}
513515
return rssConf;
514516
}
517+
518+
public static Map<String, String> sparkConfToMap(SparkConf sparkConf) {
519+
Map<String, String> map = new HashMap<>();
520+
521+
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
522+
String key = tuple._1;
523+
map.put(key, tuple._2);
524+
}
525+
526+
return map;
527+
}
515528
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import java.util.function.Supplier;
3535
import java.util.stream.Collectors;
3636

37-
import scala.Tuple2;
38-
3937
import com.google.common.annotations.VisibleForTesting;
4038
import com.google.common.collect.Maps;
4139
import com.google.common.collect.Sets;
@@ -1064,7 +1062,7 @@ protected void registerShuffleServers(
10641062
}
10651063
LOG.info("Start to register shuffleId {}", shuffleId);
10661064
long start = System.currentTimeMillis();
1067-
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
1065+
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
10681066
serverToPartitionRanges.entrySet().stream()
10691067
.forEach(
10701068
entry -> {
@@ -1095,7 +1093,7 @@ protected void registerShuffleServers(
10951093
}
10961094
LOG.info("Start to register shuffleId[{}]", shuffleId);
10971095
long start = System.currentTimeMillis();
1098-
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
1096+
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
10991097
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
11001098
serverToPartitionRanges.entrySet();
11011099
entries.stream()
@@ -1141,15 +1139,4 @@ public boolean isRssStageRetryForFetchFailureEnabled() {
11411139
public SparkConf getSparkConf() {
11421140
return sparkConf;
11431141
}
1144-
1145-
public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
1146-
Map<String, String> map = new HashMap<>();
1147-
1148-
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
1149-
String key = tuple._1;
1150-
map.put(key, tuple._2);
1151-
}
1152-
1153-
return map;
1154-
}
11551142
}

client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ private boolean tryAccessCluster() {
123123
Map<String, String> extraProperties = Maps.newHashMap();
124124
extraProperties.put(
125125
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
126+
extraProperties.putAll(RssSparkConfig.sparkConfToMap(sparkConf));
126127

127128
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
128129
try {

0 commit comments

Comments
 (0)