Skip to content

Commit 63b9a78

Browse files
author
Junfan Zhang
committed
reuse the ctx
1 parent fce4cc5 commit 63b9a78

File tree

5 files changed

+41
-7
lines changed

5 files changed

+41
-7
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.shuffle.writer;
1919

20+
import java.io.Closeable;
21+
import java.io.IOException;
2022
import java.util.ArrayList;
2123
import java.util.Collections;
2224
import java.util.Comparator;
@@ -55,7 +57,7 @@
5557
import org.apache.uniffle.common.util.BlockIdLayout;
5658
import org.apache.uniffle.common.util.ChecksumUtils;
5759

58-
public class WriteBufferManager extends MemoryConsumer {
60+
public class WriteBufferManager extends MemoryConsumer implements Closeable {
5961

6062
private static final Logger LOG = LoggerFactory.getLogger(WriteBufferManager.class);
6163
private int bufferSize;
@@ -674,4 +676,11 @@ public void setPartitionAssignmentRetrieveFunc(
674676
Function<Integer, List<ShuffleServerInfo>> partitionAssignmentRetrieveFunc) {
675677
this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
676678
}
679+
680+
@Override
681+
public void close() throws IOException {
682+
if (codec.isPresent()) {
683+
codec.get().close();
684+
}
685+
}
677686
}

client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,11 @@ public Option<MapStatus> stop(boolean success) {
514514
// free all memory & metadata, or memory leak happen in executor
515515
if (bufferManager != null) {
516516
bufferManager.freeAllMemory();
517+
try {
518+
bufferManager.close();
519+
} catch (Exception e) {
520+
LOG.warn("Errors on closing the buffer manager", e);
521+
}
517522
}
518523
if (shuffleManager != null) {
519524
shuffleManager.clearTaskMeta(taskId);

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,11 @@ public Option<MapStatus> stop(boolean success) {
860860
// free all memory & metadata, or memory leak happen in executor
861861
if (bufferManager != null) {
862862
bufferManager.freeAllMemory();
863+
try {
864+
bufferManager.close();
865+
} catch (Exception e) {
866+
LOG.warn("Errors on closing the buffer manager", e);
867+
}
863868
}
864869
if (shuffleManager != null) {
865870
shuffleManager.clearTaskMeta(taskId);

common/src/main/java/org/apache/uniffle/common/compression/Codec.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
package org.apache.uniffle.common.compression;
1919

20+
import java.io.Closeable;
21+
import java.io.IOException;
2022
import java.nio.ByteBuffer;
2123
import java.util.Optional;
2224

2325
import org.apache.uniffle.common.config.RssConf;
2426

2527
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
2628

27-
public abstract class Codec {
29+
public abstract class Codec implements Closeable {
2830

2931
public static Optional<Codec> newInstance(RssConf rssConf) {
3032
Type type = rssConf.get(COMPRESSION_TYPE);
@@ -76,4 +78,9 @@ public enum Type {
7678
SNAPPY,
7779
NONE,
7880
}
81+
82+
@Override
83+
public void close() throws IOException {
84+
// ignore
85+
}
7986
}

common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.uniffle.common.compression;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122

2223
import com.github.luben.zstd.Zstd;
@@ -35,6 +36,7 @@ public class ZstdCodec extends Codec {
3536

3637
private int compressionLevel;
3738
private int workerNumber;
39+
private ZstdCompressCtx zstdCompressCtx;
3840

3941
private static class LazyHolder {
4042
static final ZstdCodec INSTANCE = new ZstdCodec();
@@ -76,16 +78,15 @@ public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int
7678

7779
@Override
7880
public byte[] compress(byte[] src) {
79-
ZstdCompressCtx ctx = new ZstdCompressCtx();
80-
try {
81+
if (this.zstdCompressCtx == null) {
82+
ZstdCompressCtx ctx = new ZstdCompressCtx();
8183
ctx.setLevel(compressionLevel);
8284
if (workerNumber > 0) {
8385
ctx.setWorkers(workerNumber);
8486
}
85-
return ctx.compress(src);
86-
} finally {
87-
ctx.close();
87+
this.zstdCompressCtx = ctx;
8888
}
89+
return this.zstdCompressCtx.compress(src);
8990
}
9091

9192
@Override
@@ -119,4 +120,11 @@ public int compress(ByteBuffer src, ByteBuffer dest) {
119120
public int maxCompressedLength(int sourceLength) {
120121
return (int) Zstd.compressBound(sourceLength);
121122
}
123+
124+
@Override
125+
public void close() throws IOException {
126+
if (this.zstdCompressCtx != null) {
127+
zstdCompressCtx.close();
128+
}
129+
}
122130
}

0 commit comments

Comments
 (0)