Skip to content

Commit 0d6fa7b

Browse files
committed
Merge branch 'main' into perf_deserializer_tests
2 parents 858876c + e363e40 commit 0d6fa7b

File tree

1 file changed

+27
-16
lines changed

1 file changed

+27
-16
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
public class ClickHouseLZ4OutputStream extends OutputStream {
1111

12-
public static final int UNCOMPRESSED_BUFF_SIZE = 8192;
12+
public static final int UNCOMPRESSED_BUFF_SIZE = 64 * 1024; // 64K is most optimal for LZ4 compression
1313

14-
private final ByteBuffer buffer;
14+
private final ByteBuffer inBuffer;
1515

1616
private final OutputStream out;
1717

@@ -26,16 +26,27 @@ public class ClickHouseLZ4OutputStream extends OutputStream {
2626

2727
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
2828
super();
29-
this.buffer = ByteBuffer.allocate(bufferSize);
29+
this.inBuffer = ByteBuffer.allocate(bufferSize);
3030
this.out = out;
3131
this.compressor = compressor;
32-
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);
32+
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(inBuffer.capacity()) + HEADER_LEN);
3333
}
3434

3535
@Override
3636
public void write(int b) throws IOException {
37-
tmpBuffer[0] = (byte) b;
38-
write(tmpBuffer, 0, 1);
37+
if (inBuffer.remaining() == 0) {
38+
flush();
39+
}
40+
inBuffer.put((byte) b);
41+
}
42+
43+
@Override
44+
public void write(byte[] b) throws IOException {
45+
if (b.length == 1) {
46+
write(b[0]);
47+
} else {
48+
write(b, 0, b.length);
49+
}
3950
}
4051

4152
@Override
@@ -54,23 +65,23 @@ public void write( byte[] b, int off, int len) throws IOException {
5465

5566
int writtenBytes = 0;
5667
do {
57-
int remaining = Math.min(len - writtenBytes, buffer.remaining());
58-
buffer.put(b, off + writtenBytes, remaining);
59-
writtenBytes += remaining;
60-
if (buffer.remaining() == 0) {
61-
flush();
68+
if (inBuffer.remaining() == 0) {
69+
flush(); // flush will make inBuffer clear
6270
}
71+
int remaining = Math.min(len - writtenBytes, inBuffer.remaining());
72+
inBuffer.put(b, off + writtenBytes, remaining);
73+
writtenBytes += remaining;
6374
} while (writtenBytes < len);
6475
}
6576

6677
@Override
6778
public void flush() throws IOException {
68-
if (buffer.position() > 0) {
79+
if (inBuffer.position() > 0) {
6980
compressedBuffer.clear();
7081
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
71-
int uncompressedLen = buffer.position();
72-
buffer.flip();
73-
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
82+
int uncompressedLen = inBuffer.position();
83+
inBuffer.flip();
84+
int compressed = compressor.compress(inBuffer, 0, uncompressedLen, compressedBuffer, 25,
7485
compressedBuffer.remaining() - 25);
7586
int compressedSizeWithHeader = compressed + 9;
7687
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
@@ -80,7 +91,7 @@ public void flush() throws IOException {
8091
setInt64(compressedBuffer.array(), 8, hash[1]);
8192
compressedBuffer.flip();
8293
out.write(compressedBuffer.array(), 0, compressed + 25);
83-
buffer.clear();
94+
inBuffer.clear();
8495
}
8596
}
8697

0 commit comments

Comments
 (0)