Skip to content

Commit 2d3852f

Browse files
committed
configurable uncompressed buffer
1 parent d4b12c7 commit 2d3852f

File tree

6 files changed

+58
-17
lines changed

6 files changed

+58
-17
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.clickhouse.client.api.insert.InsertSettings;
1818
import com.clickhouse.client.api.insert.POJOSerializer;
1919
import com.clickhouse.client.api.insert.SerializerNotFoundException;
20+
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
2021
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
2122
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
2223
import com.clickhouse.client.api.internal.HttpAPIClientHelper;
@@ -410,11 +411,32 @@ public Builder compressClientRequest(boolean enabled) {
410411
return this;
411412
}
412413

414+
/**
415+
* Configures the client to use HTTP compression. In this case compression is controlled by
416+
* http headers. Client compression will set {@code Content-Encoding: lz4} header and server
417+
* compression will set {@code Accept-Encoding: lz4} header.
418+
*
419+
* @param enabled - indicates if http compression is enabled
420+
* @return
421+
*/
413422
public Builder useHttpCompression(boolean enabled) {
414423
this.configuration.put("client.use_http_compression", String.valueOf(enabled));
415424
return this;
416425
}
417426

427+
/**
428+
* Sets buffer size for uncompressed data in LZ4 compression.
429+
* For outgoing data it is the size of a buffer that will be compressed.
430+
* For incoming data it is the size of a buffer that will be decompressed.
431+
*
432+
* @param size - size of the buffer in bytes
433+
* @return
434+
*/
435+
public Builder setLZ4UncompressedBufferSize(int size) {
436+
this.configuration.put("compression.lz4.uncompressed_buffer_size", String.valueOf(size));
437+
return this;
438+
}
439+
418440
/**
419441
* Sets the default database name that will be used by operations if not specified.
420442
* @param database - actual default database name.
@@ -573,6 +595,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
573595
String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue()));
574596
}
575597

598+
if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) {
599+
userConfig.put("compression.lz4.uncompressed_buffer_size",
600+
String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE));
601+
}
576602
return userConfig;
577603
}
578604
}

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.io.EOFException;
1212
import java.io.IOException;
1313
import java.io.InputStream;
14-
import java.io.StreamCorruptedException;
1514
import java.nio.ByteBuffer;
1615

1716
public class ClickHouseLZ4InputStream extends InputStream {
@@ -26,11 +25,11 @@ public class ClickHouseLZ4InputStream extends InputStream {
2625
private byte[] tmpBuffer = new byte[1];
2726

2827

29-
public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor) {
28+
public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor, int bufferSize) {
3029
super();
3130
this.decompressor = decompressor;
3231
this.in = in;
33-
this.buffer = ByteBuffer.allocate(8192);
32+
this.buffer = ByteBuffer.allocate(bufferSize);
3433
this.buffer.limit(0);
3534
}
3635

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
package com.clickhouse.client.api.internal;
22

3-
import com.clickhouse.data.ClickHouseByteUtils;
43
import com.clickhouse.data.ClickHouseCityHash;
5-
import com.clickhouse.data.stream.Lz4InputStream;
64
import net.jpountz.lz4.LZ4Compressor;
7-
import net.jpountz.lz4.LZ4FastDecompressor;
85

96
import java.io.IOException;
107
import java.io.OutputStream;
118
import java.nio.ByteBuffer;
12-
import java.nio.MappedByteBuffer;
139

1410
public class ClickHouseLZ4OutputStream extends OutputStream {
1511

12+
public static final int UNCOMPRESSED_BUFF_SIZE = 8192;
13+
1614
private final ByteBuffer buffer;
1715

1816
private final OutputStream out;
@@ -26,9 +24,9 @@ public class ClickHouseLZ4OutputStream extends OutputStream {
2624
private static int HEADER_LEN = 15; // 9 bytes for header, 6 bytes for checksum
2725

2826

29-
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor) {
27+
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
3028
super();
31-
this.buffer = ByteBuffer.allocate(8192);
29+
this.buffer = ByteBuffer.allocate(bufferSize);
3230
this.out = out;
3331
this.compressor = compressor;
3432
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ private HttpEntity wrapEntity(HttpEntity httpEntity) {
329329
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
330330
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
331331
if (serverCompression || clientCompression) {
332-
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression);
332+
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression,
333+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"));
333334
} else {
334335
return httpEntity;
335336
}

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@ class LZ4Entity implements HttpEntity {
1717

1818
private HttpEntity httpEntity;
1919

20-
private boolean useHttpCompression;
21-
private boolean serverCompression;
22-
private boolean clientCompression;
20+
private final boolean useHttpCompression;
21+
private final boolean serverCompression;
22+
private final boolean clientCompression;
2323

24-
25-
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression) {
24+
private final int bufferSize;
25+
26+
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression,
27+
int bufferSize) {
2628
this.httpEntity = httpEntity;
2729
this.useHttpCompression = useHttpCompression;
2830
this.serverCompression = serverCompression;
2931
this.clientCompression = clientCompression;
32+
this.bufferSize = bufferSize;
3033
}
3134

3235
@Override
@@ -48,7 +51,8 @@ public InputStream getContent() throws IOException, UnsupportedOperationExceptio
4851
return content;
4952
}
5053
} else if (serverCompression) {
51-
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor());
54+
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
55+
bufferSize);
5256
} else {
5357
return httpEntity.getContent();
5458
}
@@ -59,7 +63,8 @@ public void writeTo(OutputStream outStream) throws IOException {
5963
if (clientCompression && useHttpCompression) {
6064
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
6165
} else if (clientCompression) {
62-
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor()));
66+
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
67+
bufferSize));
6368
} else {
6469
httpEntity.writeTo(outStream);
6570
}

client-v2/src/main/java/com/clickhouse/client/api/internal/MapUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,16 @@ public static void applyInt(Map<String, String> map, String key, Consumer<Intege
2929
}
3030
}
3131
}
32+
33+
public static int getInt(Map<String, String> map, String key) {
34+
String val = map.get(key);
35+
if (val != null) {
36+
try {
37+
return Integer.parseInt(val);
38+
} catch (NumberFormatException e) {
39+
throw new RuntimeException("Invalid value for key " + key + ": " + val, e);
40+
}
41+
}
42+
return 0;
43+
}
3244
}

0 commit comments

Comments
 (0)