Skip to content

Commit d4b12c7

Browse files
committed
implemented client content compression
1 parent 5e3f672 commit d4b12c7

File tree

7 files changed

+165
-30
lines changed

7 files changed

+165
-30
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import com.clickhouse.client.api.ClientException;
44
import com.clickhouse.data.ClickHouseByteUtils;
55
import com.clickhouse.data.ClickHouseCityHash;
6+
import com.clickhouse.data.ClickHouseUtils;
67
import net.jpountz.lz4.LZ4FastDecompressor;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
910

1011
import java.io.EOFException;
1112
import java.io.IOException;
1213
import java.io.InputStream;
14+
import java.io.StreamCorruptedException;
1315
import java.nio.ByteBuffer;
1416

1517
public class ClickHouseLZ4InputStream extends InputStream {
@@ -68,14 +70,33 @@ public int read(byte[] b, int off, int len) throws IOException {
6870

6971
static final byte[] headerBuff = new byte[HEADER_LENGTH];
7072

73+
/**
74+
* Method ensures to read all bytes from the input stream.
75+
* In case of network connection it may be a case when not all bytes are read at once.
76+
* @throws IOException
77+
*/
78+
private boolean readFully(byte[] b, int off, int len) throws IOException {
79+
int n = 0;
80+
while (n < len) {
81+
int count = in.read(b, off + n, len - n);
82+
if (count < 0) {
83+
if (n == 0) {
84+
return false;
85+
}
86+
throw new IOException(ClickHouseUtils.format("Incomplete read: {0} of {1}", n, len));
87+
}
88+
n += count;
89+
}
90+
91+
return true;
92+
}
93+
7194
private int refill() throws IOException {
7295

7396
// read header
74-
int readBytes = in.read(headerBuff, 0, HEADER_LENGTH);
75-
if (readBytes == -1) {
97+
boolean readFully = readFully(headerBuff, 0, HEADER_LENGTH);
98+
if (!readFully) {
7699
return -1;
77-
} else if (readBytes < HEADER_LENGTH) {
78-
throw new IOException("Unexpected end of stream");
79100
}
80101

81102
if (headerBuff[16] != MAGIC) {
@@ -95,11 +116,10 @@ private int refill() throws IOException {
95116
setInt32(block, 5, uncompressedSize);
96117
// compressed data: compressed_size - 9 bytes
97118
int remaining = compressedSizeWithHeader - offset;
98-
readBytes = in.read(block, offset, remaining);
99-
if (readBytes == -1) {
119+
120+
readFully = readFully(block, offset, remaining);
121+
if (!readFully) {
100122
throw new EOFException("Unexpected end of stream");
101-
} else if (readBytes < remaining) {
102-
throw new ClientException("Read bytes: " + readBytes + " but expected: " + remaining);
103123
}
104124

105125
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
@@ -123,7 +143,7 @@ private int refill() throws IOException {
123143
* @param offset
124144
* @return
125145
*/
126-
private int getInt32(byte[] bytes, int offset) {
146+
static int getInt32(byte[] bytes, int offset) {
127147
return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16)
128148
| ((0xFF & bytes[offset + 3]) << 24);
129149
}
@@ -134,14 +154,14 @@ private int getInt32(byte[] bytes, int offset) {
134154
* @param offset
135155
* @return
136156
*/
137-
public long getInt64(byte[] bytes, int offset) {
157+
static long getInt64(byte[] bytes, int offset) {
138158
return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16)
139159
| ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32)
140160
| ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48)
141161
| ((0xFFL & bytes[offset + 7]) << 56);
142162
}
143163

144-
public void setInt32(byte[] bytes, int offset, int value) {
164+
static void setInt32(byte[] bytes, int offset, int value) {
145165
bytes[offset] = (byte) (0xFF & value);
146166
bytes[offset + 1] = (byte) (0xFF & (value >> 8));
147167
bytes[offset + 2] = (byte) (0xFF & (value >> 16));
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import com.clickhouse.data.ClickHouseByteUtils;
4+
import com.clickhouse.data.ClickHouseCityHash;
5+
import com.clickhouse.data.stream.Lz4InputStream;
6+
import net.jpountz.lz4.LZ4Compressor;
7+
import net.jpountz.lz4.LZ4FastDecompressor;
8+
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.nio.ByteBuffer;
12+
import java.nio.MappedByteBuffer;
13+
14+
public class ClickHouseLZ4OutputStream extends OutputStream {
15+
16+
private final ByteBuffer buffer;
17+
18+
private final OutputStream out;
19+
20+
private final LZ4Compressor compressor;
21+
22+
private byte tmpBuffer[] = new byte[1];
23+
24+
private final ByteBuffer compressedBuffer;
25+
26+
private static int HEADER_LEN = 15; // 9 bytes for header, 6 bytes for checksum
27+
28+
29+
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor) {
30+
super();
31+
this.buffer = ByteBuffer.allocate(8192);
32+
this.out = out;
33+
this.compressor = compressor;
34+
this.compressedBuffer = ByteBuffer.allocate(compressor.maxCompressedLength(buffer.capacity()) + HEADER_LEN);
35+
}
36+
37+
@Override
38+
public void write(int b) throws IOException {
39+
tmpBuffer[0] = (byte) b;
40+
write(tmpBuffer, 0, 1);
41+
}
42+
43+
@Override
44+
public void write( byte[] b, int off, int len) throws IOException {
45+
if (b == null) {
46+
throw new NullPointerException("b is null");
47+
} else if (off < 0) {
48+
throw new IndexOutOfBoundsException("off is negative");
49+
} else if (len < 0) {
50+
throw new IndexOutOfBoundsException("len is negative");
51+
} else if (off + len > b.length) {
52+
throw new IndexOutOfBoundsException("off + len is greater than b.length");
53+
} else if (len == 0) {
54+
return;
55+
}
56+
57+
int writtenBytes = 0;
58+
do {
59+
int remaining = Math.min(len - writtenBytes, buffer.remaining());
60+
buffer.put(b, off + writtenBytes, remaining);
61+
writtenBytes += remaining;
62+
if (buffer.remaining() == 0) {
63+
flush();
64+
}
65+
} while (writtenBytes < len);
66+
}
67+
68+
@Override
69+
public void flush() throws IOException {
70+
compressedBuffer.clear();
71+
compressedBuffer.put(16, ClickHouseLZ4InputStream.MAGIC);
72+
int uncompressedLen = buffer.position();
73+
buffer.flip();
74+
int compressed = compressor.compress(buffer, 0, uncompressedLen, compressedBuffer, 25,
75+
compressedBuffer.remaining() - 25);
76+
int compressedSizeWithHeader = compressed + 9;
77+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 17, compressedSizeWithHeader); // compressed size with header
78+
ClickHouseLZ4InputStream.setInt32(compressedBuffer.array(), 21, uncompressedLen); // uncompressed size
79+
long[] hash = ClickHouseCityHash.cityHash128(compressedBuffer.array(), 16, compressedSizeWithHeader);
80+
setInt64(compressedBuffer.array(), 0, hash[0]);
81+
setInt64(compressedBuffer.array(), 8, hash[1]);
82+
compressedBuffer.flip();
83+
out.write(compressedBuffer.array(), 0, compressed + 25);
84+
buffer.clear();
85+
}
86+
87+
88+
static void setInt64(byte[] bytes, int offset, long value) {
89+
bytes[offset] = (byte) (0xFF & value);
90+
bytes[offset + 1] = (byte) (0xFF & (value >> 8));
91+
bytes[offset + 2] = (byte) (0xFF & (value >> 16));
92+
bytes[offset + 3] = (byte) (0xFF & (value >> 24));
93+
bytes[offset + 4] = (byte) (0xFF & (value >> 32));
94+
bytes[offset + 5] = (byte) (0xFF & (value >> 40));
95+
bytes[offset + 6] = (byte) (0xFF & (value >> 48));
96+
bytes[offset + 7] = (byte) (0xFF & (value >> 56));
97+
}
98+
@Override
99+
public void close() throws IOException {
100+
flush();
101+
out.close();
102+
}
103+
}

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,11 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
278278
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
279279
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
280280

281-
if (serverCompression) {
282-
if (useHttpCompression) {
281+
if (useHttpCompression) {
282+
if (serverCompression) {
283283
req.addHeader(HttpHeaders.ACCEPT_ENCODING, "lz4");
284284
}
285-
}
286-
if (clientCompression) {
287-
if (useHttpCompression) {
285+
if (clientCompression) {
288286
req.addHeader(HttpHeaders.CONTENT_ENCODING, "lz4");
289287
}
290288
}
@@ -310,19 +308,20 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
310308
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
311309
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
312310

313-
if (serverCompression) {
314-
if (useHttpCompression) {
315-
req.addParameter("enable_http_compression", "1");
316-
} else {
311+
312+
if (useHttpCompression) {
313+
// enable_http_compression make server react on http header
314+
// for client side compression Content-Encoding should be set
315+
// for server side compression Accept-Encoding should be set
316+
req.addParameter("enable_http_compression", "1");
317+
} else {
318+
if (serverCompression) {
317319
req.addParameter("compress", "1");
318320
}
319-
}
320-
if (clientCompression) {
321-
if (useHttpCompression) {
322-
req.addParameter("enable_http_compression", "1");
323-
} else {
321+
if (clientCompression) {
324322
req.addParameter("decompress", "1");
325-
} }
323+
}
324+
}
326325
}
327326

328327
private HttpEntity wrapEntity(HttpEntity httpEntity) {

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public InputStream getContent() throws IOException, UnsupportedOperationExceptio
4747
// So we just return original content and if there is a real data in it we will get error later
4848
return content;
4949
}
50-
} else if (serverCompression && !useHttpCompression) {
50+
} else if (serverCompression) {
5151
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor());
5252
} else {
5353
return httpEntity.getContent();
@@ -58,6 +58,8 @@ public InputStream getContent() throws IOException, UnsupportedOperationExceptio
5858
public void writeTo(OutputStream outStream) throws IOException {
5959
if (clientCompression && useHttpCompression) {
6060
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
61+
} else if (clientCompression) {
62+
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor()));
6163
} else {
6264
httpEntity.writeTo(outStream);
6365
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.clickhouse.client.insert;
2+
3+
public class InsertClientContentCompressionTests extends InsertTests {
4+
5+
static {
6+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
7+
}
8+
9+
public InsertClientContentCompressionTests() {
10+
super(true, false);
11+
}
12+
}

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,8 @@ public void setUp() throws IOException {
7575
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
7676
.setUsername("default")
7777
.setPassword("")
78-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
78+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
7979
.compressClientRequest(useClientCompression)
80-
.compressServerResponse(useHttpCompression)
8180
.useHttpCompression(useHttpCompression)
8281
.build();
8382
settings = new InsertSettings()

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void setUp() {
9999
.compressClientRequest(false)
100100
.compressServerResponse(useServerCompression)
101101
.useHttpCompression(useHttpCompression)
102-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
102+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
103103
.build();
104104

105105
delayForProfiler(0);

0 commit comments

Comments
 (0)