Skip to content

Commit e3710c9

Browse files
committed
fixed LZ4OutputStream
1 parent bd05742 commit e3710c9

File tree

3 files changed

+36
-24
lines changed

3 files changed

+36
-24
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
public class ClickHouseLZ4OutputStream extends OutputStream {
1111

12-
public static final int UNCOMPRESSED_BUFF_SIZE = 8192;
12+
public static final int UNCOMPRESSED_BUFF_SIZE = 64 * 1024; // 64K is most optimal for LZ4 compression
1313

14-
private final ByteBuffer buffer;
14+
private final ByteBuffer inBuffer;
1515

1616
private final OutputStream out;
1717

@@ -26,16 +26,27 @@ public class ClickHouseLZ4OutputStream extends OutputStream {
2626

2727
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
2828
super();
29-
this.buffer = ByteBuffer.allocate(bufferSize);
29+
this.inBuffer = ByteBuffer.allocate(bufferSize);
3030
this.out = out;
3131
this.compressor = compressor;
32-
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);
32+
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(inBuffer.capacity()) + HEADER_LEN);
3333
}
3434

3535
@Override
3636
public void write(int b) throws IOException {
37-
tmpBuffer[0] = (byte) b;
38-
write(tmpBuffer, 0, 1);
37+
if (inBuffer.remaining() == 0) {
38+
flush();
39+
}
40+
inBuffer.put((byte) b);
41+
}
42+
43+
@Override
44+
public void write(byte[] b) throws IOException {
45+
if (b.length == 1) {
46+
write(b[0]);
47+
} else {
48+
write(b, 0, b.length);
49+
}
3950
}
4051

4152
@Override
@@ -54,23 +65,23 @@ public void write( byte[] b, int off, int len) throws IOException {
5465

5566
int writtenBytes = 0;
5667
do {
57-
int remaining = Math.min(len - writtenBytes, buffer.remaining());
58-
buffer.put(b, off + writtenBytes, remaining);
59-
writtenBytes += remaining;
60-
if (buffer.remaining() == 0) {
61-
flush();
68+
if (inBuffer.remaining() == 0) {
69+
flush(); // flush will make inBuffer clear
6270
}
71+
int remaining = Math.min(len - writtenBytes, inBuffer.remaining());
72+
inBuffer.put(b, off + writtenBytes, remaining);
73+
writtenBytes += remaining;
6374
} while (writtenBytes < len);
6475
}
6576

6677
@Override
6778
public void flush() throws IOException {
68-
if (buffer.position() > 0) {
79+
if (inBuffer.position() > 0) {
6980
compressedBuffer.clear();
7081
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
71-
int uncompressedLen = buffer.position();
72-
buffer.flip();
73-
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
82+
int uncompressedLen = inBuffer.position();
83+
inBuffer.flip();
84+
int compressed = compressor.compress(inBuffer, 0, uncompressedLen, compressedBuffer, 25,
7485
compressedBuffer.remaining() - 25);
7586
int compressedSizeWithHeader = compressed + 9;
7687
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
@@ -80,7 +91,7 @@ public void flush() throws IOException {
8091
setInt64(compressedBuffer.array(), 8, hash[1]);
8192
compressedBuffer.flip();
8293
out.write(compressedBuffer.array(), 0, compressed + 25);
83-
buffer.clear();
94+
inBuffer.clear();
8495
}
8596
}
8697

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,18 @@ public static void main(String[] args) throws Exception {
3232
Options opt = new OptionsBuilder()
3333
.include(QueryClient.class.getSimpleName())
3434
.include(InsertClient.class.getSimpleName())
35-
// .include(Components.class.getSimpleName())
35+
.include(Components.class.getSimpleName())
3636
.forks(1) // must be a fork. No fork only for debugging
3737
.mode(Mode.SampleTime)
3838
.timeUnit(TimeUnit.MILLISECONDS)
3939
.threads(1)
4040
.addProfiler(GCProfiler.class)
4141
.addProfiler(MemPoolProfiler.class)
42-
.warmupIterations(3)
42+
.warmupIterations(0)
4343
.warmupTime(TimeValue.seconds(10))
44-
.measurementIterations(10)
44+
.measurementIterations(5)
4545
.jvmArgs("-Xms8g", "-Xmx8g")
46-
.measurementTime(TimeValue.seconds(isCloud() ? 30 : 10))
46+
.measurementTime(TimeValue.seconds(30))
4747
.resultFormat(ResultFormatType.JSON)
4848
.output(String.format("jmh-results-%s-%s.out", isCloud() ? "cloud" : "local", System.currentTimeMillis()))
4949
.result(String.format("jmh-results-%s-%s.json", isCloud() ? "cloud" : "local", System.currentTimeMillis()))

performance/src/test/com/clickhouse/benchmark/clients/Components.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
public class Components extends BenchmarkBase {
2626
private static final Logger LOGGER = LoggerFactory.getLogger(Components.class);
2727

28+
static final int COMPRESS_BUFFER_SIZE = 64 * 1024; // 64K
2829
@Benchmark
2930
public void CompressingOutputStreamV1(DataState dataState) {
3031
DataSet dataSet = dataState.dataSet;
3132
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ClickHouseOutputStream out =
32-
new Lz4OutputStream(baos, 8196, null)) {
33+
new Lz4OutputStream(baos, COMPRESS_BUFFER_SIZE, null)) {
3334
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
3435
out.write(bytes);
3536
}
@@ -45,7 +46,7 @@ public void CompressingOutputStreamV2(DataState dataState) {
4546
DataSet dataSet = dataState.dataSet;
4647
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
4748
ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(baos,
48-
factory.fastCompressor(), 8196)) {
49+
factory.fastCompressor(), COMPRESS_BUFFER_SIZE)) {
4950
for (byte[] bytes : dataSet.getBytesList(dataSet.getFormat())) {
5051
out.write(bytes);
5152
}
@@ -83,7 +84,7 @@ public void close() {
8384
}
8485
};
8586
}
86-
@Benchmark
87+
// @Benchmark
8788
public void SerializerOutputStreamV1(DataState dataState) {
8889
OutputStream empty = createEmptyOutputStream();
8990
try {
@@ -101,7 +102,7 @@ public void SerializerOutputStreamV1(DataState dataState) {
101102
}
102103
}
103104

104-
@Benchmark
105+
// @Benchmark
105106
public void SerializerOutputStreamV2(DataState dataState) {
106107
OutputStream empty = createEmptyOutputStream();
107108
try {

0 commit comments

Comments
 (0)