Skip to content

Commit 3cf82d7

Browse files
authored
[#2219] feat: Introduce ShuffleBlockIdManagerFactory and PartitionedShuffleBlockIdManager (#2227)
### What changes were proposed in this pull request? - Introduce a ShuffleBlockIdManagerFactory to create the configured implementation. - Introduce PartitionedShuffleBlockIdManager, which keeps an individual bitmap for each partition. ### Why are the changes needed? Fix: #2219 <img width="1596" alt="image" src="https://github.com/user-attachments/assets/c7752f94-941b-45d2-9a45-1da197e38984"> According to the node metrics of our cluster, the maximum number of partitions on a node is ~410K. Assuming the bitmap of one partition holds 10000 blocks on average (costing 50KiB of heap), this costs 410K * 50KiB = 2GiB of heap. I think it is worth choosing this new policy in our production cluster. ### Does this PR introduce _any_ user-facing change? - Config option: rss.server.blockIdManagerClass ### How was this patch tested? Tested locally, checking the result with `arthas` - Common case, test with default config ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @DefaultShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@3f211bae], partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_431031_1730297736664") ' @ConcurrentHashMapForJDK8[ @integer[0]:@Roaring64NavigableMap[][isEmpty=false;size=10], @integer[1]:@Roaring64NavigableMap[][isEmpty=false;size=10], @integer[2]:@Roaring64NavigableMap[][isEmpty=false;size=10], ] ``` - Config to PartitionedShuffleBlockIdManager ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @PartitionedShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@6aa6fea0], partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action 
getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds' @ConcurrentHashMap[ @string[application_1729845342052_432324_1730299397916]:@ConcurrentHashMapForJDK8[isEmpty=false;size=2], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager.partitionsToBlockIds.get("application_1729845342052_432324_1730299397916")' @ConcurrentHashMapForJDK8[ @integer[0]:@ConcurrentHashMap[isEmpty=false;size=2000], @integer[1]:@ConcurrentHashMap[isEmpty=false;size=2000], ] ``` - Use the client app-level config: submit two apps, one with `spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.DefaultShuffleBlockIdManager` and one with `spark.rss.client.blockIdManagerClass=org.apache.uniffle.server.block.PartitionedShuffleBlockIdManager` ``` [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[0].shuffleBlockIdManager' @DefaultShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@ed3b52a], partitionsToBlockIds=@ConcurrentHashMapForJDK8[isEmpty=false;size=1], ] [arthas@264]$ vmtool --action getInstances --className org.apache.uniffle.server.ShuffleTaskInfo --express 'instances[1].shuffleBlockIdManager' @PartitionedShuffleBlockIdManager[ LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@5fc14628], partitionsToBlockIds=@ConcurrentHashMap[isEmpty=false;size=1], ] ``` Log example ``` [2024-10-30 23:01:37.585] [Grpc-709] [INFO] ShuffleTaskInfo - application_1729845342052_432387_1730300440568 use app configured ShuffleBlockIdManager to org.apache.uniffle.server.block.DefaultShuffleBlockIdManager@3c6b75a8 ```
1 parent 4eebbd4 commit 3cf82d7

File tree

13 files changed

+590
-142
lines changed

13 files changed

+590
-142
lines changed

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,4 +295,12 @@ public class RssClientConf {
295295
.asList()
296296
.noDefaultValue()
297297
.withDescription("the extra java properties could be configured by this option");
298+
299+
public static final ConfigOption<String> RSS_CLIENT_BLOCK_ID_MANAGER_CLASS =
300+
ConfigOptions.key("rss.client.blockIdManagerClass")
301+
.stringType()
302+
.noDefaultValue()
303+
.withDescription(
304+
"The block id manager class of server for this application, "
305+
+ "the implementation of this interface to manage the shuffle block ids");
298306
}

docs/client_guide/client_guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ The important configuration of client is listed as following. These configuratio
5959
| <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
6060
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
6161
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
62+
| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class that the server uses for this application. It must be an implementation of the ShuffleBlockIdManager interface, which manages the shuffle block ids. |
6263

6364
Notice:
6465

docs/server_guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
121121
| rss.storage.localFileWriterClass | org.apache.uniffle.storage.handler.impl.LocalFileWriter | The writer class to write shuffle data for LOCALFILE. |
122122
| rss.storage.hdfs.write.dataBufferSize | 8K | The size of the buffer used to cache data written for HDFS. |
123123
| rss.storage.hdfs.write.indexBufferSize | 8K | The size of the buffer used to cache index written for HDFS. |
124+
| rss.server.blockIdManagerClass | org.apache.uniffle.server.block.DefaultShuffleBlockIdManager | The block id manager class, i.e. the implementation of the ShuffleBlockIdManager interface that is used to manage the shuffle block ids. |
124125

125126
### Advanced Configurations
126127
| Property Name | Default | Description |

integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,14 +352,13 @@ public void shuffleResultTest() throws Exception {
352352
request = new RssReportShuffleResultRequest("shuffleResultTest", 2, 1L, partitionToBlockIds, 3);
353353
grpcShuffleServerClient.reportShuffleResult(request);
354354
// validate bitmap in shuffleTaskManager
355-
Roaring64NavigableMap[] bitmaps =
355+
long bitmapNum =
356356
grpcShuffleServers
357357
.get(0)
358358
.getShuffleTaskManager()
359-
.getPartitionsToBlockIds()
360-
.get("shuffleResultTest")
361-
.get(2);
362-
assertEquals(3, bitmaps.length);
359+
.getShuffleBlockIdManager()
360+
.getBitmapNum("shuffleResultTest", 2);
361+
assertEquals(3, bitmapNum);
363362

364363
req = new RssGetShuffleResultRequest("shuffleResultTest", 2, 1, layout);
365364
result = grpcShuffleServerClient.getShuffleResult(req);

integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,18 @@ private void localWriteReadTest(boolean isNettyMode) throws Exception {
179179

180180
List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers;
181181
assertNotNull(
182-
shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
182+
shuffleServers
183+
.get(0)
184+
.getShuffleTaskManager()
185+
.getShuffleBlockIdManager()
186+
.contains(testAppId));
183187
Thread.sleep(8000);
184188
assertNull(
185-
shuffleServers.get(0).getShuffleTaskManager().getPartitionsToBlockIds().get(testAppId));
189+
shuffleServers
190+
.get(0)
191+
.getShuffleTaskManager()
192+
.getShuffleBlockIdManager()
193+
.contains(testAppId));
186194
}
187195

188196
protected void validateResult(

server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.uniffle.common.config.ConfigOptions;
2626
import org.apache.uniffle.common.config.ConfigUtils;
2727
import org.apache.uniffle.common.config.RssBaseConf;
28+
import org.apache.uniffle.server.block.DefaultShuffleBlockIdManager;
2829
import org.apache.uniffle.server.buffer.ShuffleBufferType;
2930

3031
public class ShuffleServerConf extends RssBaseConf {
@@ -743,6 +744,13 @@ public class ShuffleServerConf extends RssBaseConf {
743744
.booleanType()
744745
.defaultValue(false)
745746
.withDescription("Whether to enable app detail log");
747+
public static final ConfigOption<String> SERVER_BLOCK_ID_MANAGER_CLASS =
748+
ConfigOptions.key("rss.server.blockIdManagerClass")
749+
.stringType()
750+
.defaultValue(DefaultShuffleBlockIdManager.class.getName())
751+
.withDescription(
752+
"The block id manager class, the implementation of this interface "
753+
+ "to manage the shuffle block ids");
746754

747755
public ShuffleServerConf() {}
748756

server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@
2626
import java.util.stream.Collectors;
2727

2828
import com.google.common.collect.Sets;
29+
import org.apache.commons.lang3.StringUtils;
2930
import org.roaringbitmap.longlong.Roaring64NavigableMap;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

3334
import org.apache.uniffle.common.PartitionInfo;
3435
import org.apache.uniffle.common.ShuffleDataDistributionType;
36+
import org.apache.uniffle.common.config.RssClientConf;
37+
import org.apache.uniffle.common.util.Constants;
3538
import org.apache.uniffle.common.util.JavaUtils;
3639
import org.apache.uniffle.common.util.UnitConverter;
40+
import org.apache.uniffle.server.block.ShuffleBlockIdManager;
41+
import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;
3742

3843
/**
3944
* ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache
@@ -78,6 +83,7 @@ public class ShuffleTaskInfo {
7883

7984
private final Map<Integer, Integer> latestStageAttemptNumbers;
8085
private Map<String, String> properties;
86+
private ShuffleBlockIdManager shuffleBlockIdManager;
8187

8288
public ShuffleTaskInfo(String appId) {
8389
this.appId = appId;
@@ -324,6 +330,32 @@ public void setProperties(Map<String, String> properties) {
324330
.filter(entry -> entry.getKey().contains(".rss."))
325331
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
326332
this.properties = filteredProperties;
327-
LOGGER.info("{} set properties to {}", appId, properties);
333+
LOGGER.info("{} set properties to {}", appId, filteredProperties);
334+
String keyName = RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
335+
String className = properties.get(keyName);
336+
if (StringUtils.isEmpty(className)) {
337+
keyName =
338+
Constants.SPARK_RSS_CONFIG_PREFIX + RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key();
339+
className =
340+
properties.get(
341+
Constants.SPARK_RSS_CONFIG_PREFIX
342+
+ RssClientConf.RSS_CLIENT_BLOCK_ID_MANAGER_CLASS.key());
343+
}
344+
if (StringUtils.isNotEmpty(className)) {
345+
shuffleBlockIdManager =
346+
ShuffleBlockIdManagerFactory.createShuffleBlockIdManager(className, keyName);
347+
LOGGER.info(
348+
"{} use app configured ShuffleBlockIdManager to {}", appId, shuffleBlockIdManager);
349+
}
350+
}
351+
352+
public ShuffleBlockIdManager getShuffleBlockIdManager() {
353+
return shuffleBlockIdManager;
354+
}
355+
356+
public void setShuffleBlockIdManagerIfNeeded(ShuffleBlockIdManager shuffleBlockIdManager) {
357+
if (this.shuffleBlockIdManager == null) {
358+
this.shuffleBlockIdManager = shuffleBlockIdManager;
359+
}
328360
}
329361
}

0 commit comments

Comments
 (0)