Skip to content

Commit 6c3a81b

Browse files
committed
Merge branch 'main' into feat_clientv2_timezone
2 parents b7db65f + 3a82665 commit 6c3a81b

15 files changed

+578
-9
lines changed

client-v2/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
<artifactId>lz4-pure-java</artifactId>
4848
<optional>true</optional>
4949
</dependency>
50+
<dependency>
51+
<groupId>${project.groupId}</groupId>
52+
<artifactId>org.apache.commons.compress</artifactId>
53+
<version>${repackaged.version}</version>
54+
</dependency>
5055
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
5156
<dependency>
5257
<groupId>com.fasterxml.jackson.core</groupId>

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.clickhouse.client.api.insert.InsertSettings;
1818
import com.clickhouse.client.api.insert.POJOSerializer;
1919
import com.clickhouse.client.api.insert.SerializerNotFoundException;
20+
import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream;
2021
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
2122
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
2223
import com.clickhouse.client.api.internal.HttpAPIClientHelper;
@@ -35,11 +36,14 @@
3536
import com.clickhouse.client.config.ClickHouseClientOption;
3637
import com.clickhouse.client.config.ClickHouseDefaults;
3738
import com.clickhouse.client.http.ClickHouseHttpProto;
39+
import com.clickhouse.client.http.config.ClickHouseHttpOption;
3840
import com.clickhouse.data.ClickHouseColumn;
3941
import com.clickhouse.data.ClickHouseDataStreamFactory;
4042
import com.clickhouse.data.ClickHouseFormat;
4143
import com.clickhouse.data.ClickHousePipedOutputStream;
4244
import com.clickhouse.data.format.BinaryStreamUtils;
45+
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
46+
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
4347
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4448
import org.apache.hc.core5.http.ClassicHttpResponse;
4549
import org.apache.hc.core5.http.HttpStatus;
@@ -416,6 +420,32 @@ public Builder compressClientRequest(boolean enabled) {
416420
return this;
417421
}
418422

423+
/**
424+
* Configures the client to use HTTP compression. In this case compression is controlled by
425+
* http headers. Client compression will set {@code Content-Encoding: lz4} header and server
426+
* compression will set {@code Accept-Encoding: lz4} header.
427+
*
428+
* @param enabled - indicates if http compression is enabled
429+
* @return
430+
*/
431+
public Builder useHttpCompression(boolean enabled) {
432+
this.configuration.put("client.use_http_compression", String.valueOf(enabled));
433+
return this;
434+
}
435+
436+
/**
437+
* Sets buffer size for uncompressed data in LZ4 compression.
438+
* For outgoing data it is the size of a buffer that will be compressed.
439+
* For incoming data it is the size of a buffer that will be decompressed.
440+
*
441+
* @param size - size of the buffer in bytes
442+
* @return
443+
*/
444+
public Builder setLZ4UncompressedBufferSize(int size) {
445+
this.configuration.put("compression.lz4.uncompressed_buffer_size", String.valueOf(size));
446+
return this;
447+
}
448+
419449
/**
420450
* Sets the default database name that will be used by operations if not specified.
421451
* @param database - actual default database name.
@@ -637,6 +667,11 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
637667
String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue()));
638668
}
639669

670+
if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) {
671+
userConfig.put("compression.lz4.uncompressed_buffer_size",
672+
String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE));
673+
}
674+
640675
if (!userConfig.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) {
641676
userConfig.put(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "true");
642677
}
@@ -838,6 +873,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
838873
}
839874
}
840875
}
876+
out.close();
841877
})) {
842878

843879

@@ -956,7 +992,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
956992
while ((bytesRead = data.read(buffer)) != -1) {
957993
out.write(buffer, 0, bytesRead);
958994
}
959-
out.flush();
995+
out.close();
960996
})) {
961997

962998

@@ -1102,7 +1138,6 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11021138
applyDefaults(settings);
11031139

11041140
if (useNewImplementation) {
1105-
//
11061141
String retry = configuration.get(ClickHouseClientOption.RETRY.getKey());
11071142
final int maxRetries = retry == null ? (int) ClickHouseClientOption.RETRY.getDefaultValue() : Integer.parseInt(retry);
11081143

@@ -1118,7 +1153,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11181153
ClassicHttpResponse httpResponse =
11191154
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), output -> {
11201155
output.write(sqlQuery.getBytes(StandardCharsets.UTF_8));
1121-
output.flush();
1156+
output.close();
11221157
});
11231158

11241159
// Check response
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import com.clickhouse.client.api.ClientException;
4+
import com.clickhouse.data.ClickHouseByteUtils;
5+
import com.clickhouse.data.ClickHouseCityHash;
6+
import com.clickhouse.data.ClickHouseUtils;
7+
import net.jpountz.lz4.LZ4FastDecompressor;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.io.EOFException;
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.nio.ByteBuffer;
15+
16+
public class ClickHouseLZ4InputStream extends InputStream {
17+
18+
private static Logger LOG = LoggerFactory.getLogger(ClickHouseLZ4InputStream.class);
19+
private final LZ4FastDecompressor decompressor;
20+
21+
private final InputStream in;
22+
23+
private ByteBuffer buffer;
24+
25+
private byte[] tmpBuffer = new byte[1];
26+
27+
28+
public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor, int bufferSize) {
29+
super();
30+
this.decompressor = decompressor;
31+
this.in = in;
32+
this.buffer = ByteBuffer.allocate(bufferSize);
33+
this.buffer.limit(0);
34+
}
35+
36+
@Override
37+
public int read() throws IOException {
38+
int n = read(tmpBuffer, 0, 1);
39+
return n == -1 ? -1 : tmpBuffer[0] & 0xFF;
40+
}
41+
42+
@Override
43+
public int read(byte[] b, int off, int len) throws IOException {
44+
if (b == null) {
45+
throw new NullPointerException("b is null");
46+
} else if (off < 0) {
47+
throw new IndexOutOfBoundsException("off is negative");
48+
} else if (len < 0) {
49+
throw new IndexOutOfBoundsException("len is negative");
50+
} else if (off + len > b.length) {
51+
throw new IndexOutOfBoundsException("off + len is greater than b.length");
52+
} else if (len == 0) {
53+
return 0;
54+
}
55+
56+
int readBytes = 0;
57+
do {
58+
int remaining = Math.min(len - readBytes, buffer.remaining());
59+
buffer.get(b, off + readBytes, remaining);
60+
readBytes += remaining;
61+
} while (readBytes < len && refill() != -1);
62+
63+
return readBytes == 0 ? -1 : readBytes;
64+
}
65+
66+
67+
static final byte MAGIC = (byte) 0x82;
68+
static final int HEADER_LENGTH = 25;
69+
70+
static final byte[] headerBuff = new byte[HEADER_LENGTH];
71+
72+
/**
73+
* Method ensures to read all bytes from the input stream.
74+
* In case of network connection it may be a case when not all bytes are read at once.
75+
* @throws IOException
76+
*/
77+
private boolean readFully(byte[] b, int off, int len) throws IOException {
78+
int n = 0;
79+
while (n < len) {
80+
int count = in.read(b, off + n, len - n);
81+
if (count < 0) {
82+
if (n == 0) {
83+
return false;
84+
}
85+
throw new IOException(ClickHouseUtils.format("Incomplete read: {0} of {1}", n, len));
86+
}
87+
n += count;
88+
}
89+
90+
return true;
91+
}
92+
93+
private int refill() throws IOException {
94+
95+
// read header
96+
boolean readFully = readFully(headerBuff, 0, HEADER_LENGTH);
97+
if (!readFully) {
98+
return -1;
99+
}
100+
101+
if (headerBuff[16] != MAGIC) {
102+
// 1 byte - 0x82 (shows this is LZ4)
103+
throw new ClientException("Invalid LZ4 magic byte: '" + headerBuff[16] + "'");
104+
}
105+
106+
// 4 bytes - size of the compressed data including 9 bytes of the header
107+
int compressedSizeWithHeader = getInt32(headerBuff, 17);
108+
// 4 bytes - size of uncompressed data
109+
int uncompressedSize = getInt32(headerBuff, 21);
110+
111+
int offset = 9;
112+
final byte[] block = new byte[compressedSizeWithHeader];
113+
block[0] = MAGIC;
114+
setInt32(block, 1, compressedSizeWithHeader);
115+
setInt32(block, 5, uncompressedSize);
116+
// compressed data: compressed_size - 9 bytes
117+
int remaining = compressedSizeWithHeader - offset;
118+
119+
readFully = readFully(block, offset, remaining);
120+
if (!readFully) {
121+
throw new EOFException("Unexpected end of stream");
122+
}
123+
124+
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
125+
if (real[0] != getInt64(headerBuff, 0) || real[1] != ClickHouseByteUtils.getInt64(headerBuff, 8)) {
126+
throw new ClientException("Corrupted stream: checksum mismatch");
127+
}
128+
129+
if (buffer.capacity() < uncompressedSize) {
130+
buffer = ByteBuffer.allocate(uncompressedSize);
131+
LOG.warn("Buffer size is too small, reallocate buffer with size: " + uncompressedSize);
132+
}
133+
decompressor.decompress(ByteBuffer.wrap(block), offset, buffer, 0, uncompressedSize);
134+
buffer.position(0);
135+
buffer.limit(uncompressedSize);
136+
return uncompressedSize;
137+
}
138+
139+
/**
140+
* Read int32 Little Endian
141+
* @param bytes
142+
* @param offset
143+
* @return
144+
*/
145+
static int getInt32(byte[] bytes, int offset) {
146+
return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16)
147+
| ((0xFF & bytes[offset + 3]) << 24);
148+
}
149+
150+
/**
151+
* Read int64 Little Endian
152+
* @param bytes
153+
* @param offset
154+
* @return
155+
*/
156+
static long getInt64(byte[] bytes, int offset) {
157+
return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16)
158+
| ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32)
159+
| ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48)
160+
| ((0xFFL & bytes[offset + 7]) << 56);
161+
}
162+
163+
static void setInt32(byte[] bytes, int offset, int value) {
164+
bytes[offset] = (byte) (0xFF & value);
165+
bytes[offset + 1] = (byte) (0xFF & (value >> 8));
166+
bytes[offset + 2] = (byte) (0xFF & (value >> 16));
167+
bytes[offset + 3] = (byte) (0xFF & (value >> 24));
168+
}
169+
170+
@Override
171+
public void close() throws IOException {
172+
super.close();
173+
}
174+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import com.clickhouse.data.ClickHouseCityHash;
4+
import net.jpountz.lz4.LZ4Compressor;
5+
6+
import java.io.IOException;
7+
import java.io.OutputStream;
8+
import java.nio.ByteBuffer;
9+
10+
public class ClickHouseLZ4OutputStream extends OutputStream {
11+
12+
public static final int UNCOMPRESSED_BUFF_SIZE = 8192;
13+
14+
private final ByteBuffer buffer;
15+
16+
private final OutputStream out;
17+
18+
private final LZ4Compressor compressor;
19+
20+
private byte tmpBuffer[] = new byte[1];
21+
22+
private final ByteBuffer compressedBuffer;
23+
24+
private static int HEADER_LEN = 15; // 9 bytes for header, 6 bytes for checksum
25+
26+
27+
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
28+
super();
29+
this.buffer = ByteBuffer.allocate(bufferSize);
30+
this.out = out;
31+
this.compressor = compressor;
32+
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);
33+
}
34+
35+
@Override
36+
public void write(int b) throws IOException {
37+
tmpBuffer[0] = (byte) b;
38+
write(tmpBuffer, 0, 1);
39+
}
40+
41+
@Override
42+
public void write( byte[] b, int off, int len) throws IOException {
43+
if (b == null) {
44+
throw new NullPointerException("b is null");
45+
} else if (off < 0) {
46+
throw new IndexOutOfBoundsException("off is negative");
47+
} else if (len < 0) {
48+
throw new IndexOutOfBoundsException("len is negative");
49+
} else if (off + len > b.length) {
50+
throw new IndexOutOfBoundsException("off + len is greater than b.length");
51+
} else if (len == 0) {
52+
return;
53+
}
54+
55+
int writtenBytes = 0;
56+
do {
57+
int remaining = Math.min(len - writtenBytes, buffer.remaining());
58+
buffer.put(b, off + writtenBytes, remaining);
59+
writtenBytes += remaining;
60+
if (buffer.remaining() == 0) {
61+
flush();
62+
}
63+
} while (writtenBytes < len);
64+
}
65+
66+
@Override
67+
public void flush() throws IOException {
68+
compressedBuffer.clear();
69+
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
70+
int uncompressedLen = buffer.position();
71+
buffer.flip();
72+
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
73+
compressedBuffer.remaining() - 25);
74+
int compressedSizeWithHeader = compressed + 9;
75+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
76+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
77+
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
78+
setInt64(compressedBuffer.array(), 0, hash[0]);
79+
setInt64(compressedBuffer.array(), 8, hash[1]);
80+
compressedBuffer.flip();
81+
out.write(compressedBuffer.array(), 0, compressed + 25);
82+
buffer.clear();
83+
}
84+
85+
86+
static void setInt64(byte[] bytes, int offset, long value) {
87+
bytes[offset] = (byte) (0xFF & value);
88+
bytes[offset + 1] = (byte) (0xFF & (value >> 8));
89+
bytes[offset + 2] = (byte) (0xFF & (value >> 16));
90+
bytes[offset + 3] = (byte) (0xFF & (value >> 24));
91+
bytes[offset + 4] = (byte) (0xFF & (value >> 32));
92+
bytes[offset + 5] = (byte) (0xFF & (value >> 40));
93+
bytes[offset + 6] = (byte) (0xFF & (value >> 48));
94+
bytes[offset + 7] = (byte) (0xFF & (value >> 56));
95+
}
96+
@Override
97+
public void close() throws IOException {
98+
flush();
99+
out.close();
100+
}
101+
}

0 commit comments

Comments
 (0)