Skip to content

Commit e363e40

Browse files
authored
Merge pull request #2225 from ClickHouse/fix_compress_stream
[perf] fixed LZ4OutputStream
2 parents 50bfa00 + 07c1787 commit e363e40

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 27 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -9,9 +9,9 @@
99

1010
public class ClickHouseLZ4OutputStream extends OutputStream {
1111

12-
public static final int UNCOMPRESSED_BUFF_SIZE = 8192;
12+
public static final int UNCOMPRESSED_BUFF_SIZE = 64 * 1024; // 64K is most optimal for LZ4 compression
1313

14-
private final ByteBuffer buffer;
14+
private final ByteBuffer inBuffer;
1515

1616
private final OutputStream out;
1717

@@ -26,16 +26,27 @@ public class ClickHouseLZ4OutputStream extends OutputStream {
2626

2727
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
2828
super();
29-
this.buffer = ByteBuffer.allocate(bufferSize);
29+
this.inBuffer = ByteBuffer.allocate(bufferSize);
3030
this.out = out;
3131
this.compressor = compressor;
32-
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);
32+
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(inBuffer.capacity()) + HEADER_LEN);
3333
}
3434

3535
@Override
3636
public void write(int b) throws IOException {
37-
tmpBuffer[0] = (byte) b;
38-
write(tmpBuffer, 0, 1);
37+
if (inBuffer.remaining() == 0) {
38+
flush();
39+
}
40+
inBuffer.put((byte) b);
41+
}
42+
43+
@Override
44+
public void write(byte[] b) throws IOException {
45+
if (b.length == 1) {
46+
write(b[0]);
47+
} else {
48+
write(b, 0, b.length);
49+
}
3950
}
4051

4152
@Override
@@ -54,23 +65,23 @@ public void write( byte[] b, int off, int len) throws IOException {
5465

5566
int writtenBytes = 0;
5667
do {
57-
int remaining = Math.min(len - writtenBytes, buffer.remaining());
58-
buffer.put(b, off + writtenBytes, remaining);
59-
writtenBytes += remaining;
60-
if (buffer.remaining() == 0) {
61-
flush();
68+
if (inBuffer.remaining() == 0) {
69+
flush(); // flush will make inBuffer clear
6270
}
71+
int remaining = Math.min(len - writtenBytes, inBuffer.remaining());
72+
inBuffer.put(b, off + writtenBytes, remaining);
73+
writtenBytes += remaining;
6374
} while (writtenBytes < len);
6475
}
6576

6677
@Override
6778
public void flush() throws IOException {
68-
if (buffer.position() > 0) {
79+
if (inBuffer.position() > 0) {
6980
compressedBuffer.clear();
7081
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
71-
int uncompressedLen = buffer.position();
72-
buffer.flip();
73-
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
82+
int uncompressedLen = inBuffer.position();
83+
inBuffer.flip();
84+
int compressed = compressor.compress(inBuffer, 0, uncompressedLen, compressedBuffer, 25,
7485
compressedBuffer.remaining() - 25);
7586
int compressedSizeWithHeader = compressed + 9;
7687
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
@@ -80,7 +91,7 @@ public void flush() throws IOException {
8091
setInt64(compressedBuffer.array(), 8, hash[1]);
8192
compressedBuffer.flip();
8293
out.write(compressedBuffer.array(), 0, compressed + 25);
83-
buffer.clear();
94+
inBuffer.clear();
8495
}
8596
}
8697

performance/src/test/com/clickhouse/benchmark/clients/Components.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,11 +25,12 @@
2525
public class Components extends BenchmarkBase {
2626
private static final Logger LOGGER = LoggerFactory.getLogger(Components.class);
2727

28+
static final int COMPRESS_BUFFER_SIZE = 64 * 1024; // 64K
2829
@Benchmark
2930
public void CompressingOutputStreamV1(DataState dataState) {
3031
DataSet dataSet = dataState.dataSet;
3132
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ClickHouseOutputStream out =
32-
new Lz4OutputStream(baos, 8196, null)) {
33+
new Lz4OutputStream(baos, COMPRESS_BUFFER_SIZE, null)) {
3334
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
3435
out.write(bytes);
3536
}
@@ -45,7 +46,7 @@ public void CompressingOutputStreamV2(DataState dataState) {
4546
DataSet dataSet = dataState.dataSet;
4647
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
4748
ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(baos,
48-
factory.fastCompressor(), 8196)) {
49+
factory.fastCompressor(), COMPRESS_BUFFER_SIZE)) {
4950
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
5051
out.write(bytes);
5152
}

0 commit comments

Comments
 (0)