Skip to content

Commit 47e5d17

Browse files
zustonJunfan Zhang
andauthored
[#2362] feat(client): Add support of zstd parallel compression (#2363)
### What changes were proposed in this pull request? Add support of zstd parallel compression ### Why are the changes needed? for #2362 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests --------- Co-authored-by: Junfan Zhang <zhangjunfan@qiyi.com>
1 parent e5cfc4a commit 47e5d17

File tree

4 files changed

+28
-5
lines changed

4 files changed

+28
-5
lines changed

common/src/main/java/org/apache/uniffle/common/compression/Codec.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.uniffle.common.config.RssConf;
2424

2525
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
26-
import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
2726

2827
public abstract class Codec {
2928

@@ -33,7 +32,7 @@ public static Optional<Codec> newInstance(RssConf rssConf) {
3332
case NONE:
3433
return Optional.empty();
3534
case ZSTD:
36-
return Optional.of(ZstdCodec.getInstance(rssConf.get(ZSTD_COMPRESSION_LEVEL)));
35+
return Optional.of(ZstdCodec.getInstance(rssConf));
3736
case SNAPPY:
3837
return Optional.of(SnappyCodec.getInstance());
3938
case NOOP:

common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,29 @@
2020
import java.nio.ByteBuffer;
2121

2222
import com.github.luben.zstd.Zstd;
23+
import com.github.luben.zstd.ZstdCompressCtx;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

27+
import org.apache.uniffle.common.config.RssConf;
2628
import org.apache.uniffle.common.exception.RssException;
2729

30+
import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
31+
import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_WORKER_NUMBER;
32+
2833
public class ZstdCodec extends Codec {
2934
private static final Logger LOGGER = LoggerFactory.getLogger(ZstdCodec.class);
3035

3136
private int compressionLevel;
37+
private int workerNumber;
3238

3339
private static class LazyHolder {
3440
static final ZstdCodec INSTANCE = new ZstdCodec();
3541
}
3642

37-
public static ZstdCodec getInstance(int level) {
38-
LazyHolder.INSTANCE.compressionLevel = level;
43+
public static ZstdCodec getInstance(RssConf conf) {
44+
LazyHolder.INSTANCE.compressionLevel = conf.get(ZSTD_COMPRESSION_LEVEL);
45+
LazyHolder.INSTANCE.workerNumber = conf.get(ZSTD_COMPRESSION_WORKER_NUMBER);
3946
return LazyHolder.INSTANCE;
4047
}
4148

@@ -69,7 +76,16 @@ public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int
6976

7077
@Override
7178
public byte[] compress(byte[] src) {
72-
return Zstd.compress(src, compressionLevel);
79+
ZstdCompressCtx ctx = new ZstdCompressCtx();
80+
try {
81+
ctx.setLevel(compressionLevel);
82+
if (workerNumber > 0) {
83+
ctx.setWorkers(workerNumber);
84+
}
85+
return ctx.compress(src);
86+
} finally {
87+
ctx.close();
88+
}
7389
}
7490

7591
@Override

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public class RssClientConf {
5252
.defaultValue(3)
5353
.withDescription("The zstd compression level, the default level is 3");
5454

55+
public static final ConfigOption<Integer> ZSTD_COMPRESSION_WORKER_NUMBER =
56+
ConfigOptions.key("rss.client.io.compression.zstd.workerNumber")
57+
.intType()
58+
.defaultValue(-1)
59+
.withDescription(
60+
"Set the parallel compression worker number. This will not enabled by default");
61+
5562
public static final ConfigOption<ShuffleDataDistributionType> DATA_DISTRIBUTION_TYPE =
5663
ConfigOptions.key("rss.client.shuffle.data.distribution.type")
5764
.enumType(ShuffleDataDistributionType.class)

docs/client_guide/client_guide.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ The important configuration of client is listed as following. These configuratio
4949
| <client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default |
5050
| <client_type>.rss.client.io.compression.codec | lz4 | The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`. |
5151
| <client_type>.rss.client.io.compression.zstd.level | 3 | The zstd compression level, the default level is 3 |
52+
| <client_type>.rss.client.io.compression.zstd.workerNumber | -1 | Set zstd parallel compression worker number. This will not enabled by default |
5253
| <client_type>.rss.client.shuffle.data.distribution.type | NORMAL | The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x |
5354
| <client_type>.rss.estimate.task.concurrency.dynamic.factor | 1.0 | Between 0 and 1, used to estimate task concurrency, when the client is spark, it represents how likely is this part of the resource between spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it represents how likely the resources of map and reduce are satisfied. Effective when <client_type>.rss.estimate.server.assignment.enabled=true or Coordinator's rss.coordinator.select.partition.strategy is CONTINUOUS. |
5455
| <client_type>.rss.estimate.server.assignment.enabled | false | Support mr and spark, whether to enable estimation of the number of ShuffleServers that need to be allocated based on the number of concurrent tasks. |

0 commit comments

Comments
 (0)