Skip to content

Commit 5e3f672

Browse files
committed
server response content compression
1 parent 0bb8db0 commit 5e3f672

File tree

5 files changed

+176
-4
lines changed

5 files changed

+176
-4
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import com.clickhouse.client.api.ClientException;
4+
import com.clickhouse.data.ClickHouseByteUtils;
5+
import com.clickhouse.data.ClickHouseCityHash;
6+
import net.jpountz.lz4.LZ4FastDecompressor;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import java.io.EOFException;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.nio.ByteBuffer;
14+
15+
public class ClickHouseLZ4InputStream extends InputStream {
16+
17+
private static Logger LOG = LoggerFactory.getLogger(ClickHouseLZ4InputStream.class);
18+
private final LZ4FastDecompressor decompressor;
19+
20+
private final InputStream in;
21+
22+
private ByteBuffer buffer;
23+
24+
private byte[] tmpBuffer = new byte[1];
25+
26+
27+
public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor) {
28+
super();
29+
this.decompressor = decompressor;
30+
this.in = in;
31+
this.buffer = ByteBuffer.allocate(8192);
32+
this.buffer.limit(0);
33+
}
34+
35+
@Override
36+
public int read() throws IOException {
37+
int n = read(tmpBuffer, 0, 1);
38+
return n == -1 ? -1 : tmpBuffer[0] & 0xFF;
39+
}
40+
41+
@Override
42+
public int read(byte[] b, int off, int len) throws IOException {
43+
if (b == null) {
44+
throw new NullPointerException("b is null");
45+
} else if (off < 0) {
46+
throw new IndexOutOfBoundsException("off is negative");
47+
} else if (len < 0) {
48+
throw new IndexOutOfBoundsException("len is negative");
49+
} else if (off + len > b.length) {
50+
throw new IndexOutOfBoundsException("off + len is greater than b.length");
51+
} else if (len == 0) {
52+
return 0;
53+
}
54+
55+
int readBytes = 0;
56+
do {
57+
int remaining = Math.min(len - readBytes, buffer.remaining());
58+
buffer.get(b, off + readBytes, remaining);
59+
readBytes += remaining;
60+
} while (readBytes < len && refill() != -1);
61+
62+
return readBytes == 0 ? -1 : readBytes;
63+
}
64+
65+
66+
static final byte MAGIC = (byte) 0x82;
67+
static final int HEADER_LENGTH = 25;
68+
69+
static final byte[] headerBuff = new byte[HEADER_LENGTH];
70+
71+
private int refill() throws IOException {
72+
73+
// read header
74+
int readBytes = in.read(headerBuff, 0, HEADER_LENGTH);
75+
if (readBytes == -1) {
76+
return -1;
77+
} else if (readBytes < HEADER_LENGTH) {
78+
throw new IOException("Unexpected end of stream");
79+
}
80+
81+
if (headerBuff[16] != MAGIC) {
82+
// 1 byte - 0x82 (shows this is LZ4)
83+
throw new ClientException("Invalid LZ4 magic byte: '" + headerBuff[16] + "'");
84+
}
85+
86+
// 4 bytes - size of the compressed data including 9 bytes of the header
87+
int compressedSizeWithHeader = getInt32(headerBuff, 17);
88+
// 4 bytes - size of uncompressed data
89+
int uncompressedSize = getInt32(headerBuff, 21);
90+
91+
int offset = 9;
92+
final byte[] block = new byte[compressedSizeWithHeader];
93+
block[0] = MAGIC;
94+
setInt32(block, 1, compressedSizeWithHeader);
95+
setInt32(block, 5, uncompressedSize);
96+
// compressed data: compressed_size - 9 bytes
97+
int remaining = compressedSizeWithHeader - offset;
98+
readBytes = in.read(block, offset, remaining);
99+
if (readBytes == -1) {
100+
throw new EOFException("Unexpected end of stream");
101+
} else if (readBytes < remaining) {
102+
throw new ClientException("Read bytes: " + readBytes + " but expected: " + remaining);
103+
}
104+
105+
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
106+
if (real[0] != getInt64(headerBuff, 0) || real[1] != ClickHouseByteUtils.getInt64(headerBuff, 8)) {
107+
throw new ClientException("Corrupted stream: checksum mismatch");
108+
}
109+
110+
if (buffer.capacity() < uncompressedSize) {
111+
buffer = ByteBuffer.allocate(uncompressedSize);
112+
LOG.warn("Buffer size is too small, reallocate buffer with size: " + uncompressedSize);
113+
}
114+
decompressor.decompress(ByteBuffer.wrap(block), offset, buffer, 0, uncompressedSize);
115+
buffer.position(0);
116+
buffer.limit(uncompressedSize);
117+
return uncompressedSize;
118+
}
119+
120+
/**
121+
* Read int32 Little Endian
122+
* @param bytes
123+
* @param offset
124+
* @return
125+
*/
126+
private int getInt32(byte[] bytes, int offset) {
127+
return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16)
128+
| ((0xFF & bytes[offset + 3]) << 24);
129+
}
130+
131+
/**
132+
* Read int64 Little Endian
133+
* @param bytes
134+
* @param offset
135+
* @return
136+
*/
137+
public long getInt64(byte[] bytes, int offset) {
138+
return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16)
139+
| ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32)
140+
| ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48)
141+
| ((0xFFL & bytes[offset + 7]) << 56);
142+
}
143+
144+
public void setInt32(byte[] bytes, int offset, int value) {
145+
bytes[offset] = (byte) (0xFF & value);
146+
bytes[offset + 1] = (byte) (0xFF & (value >> 8));
147+
bytes[offset + 2] = (byte) (0xFF & (value >> 16));
148+
bytes[offset + 3] = (byte) (0xFF & (value >> 24));
149+
}
150+
151+
@Override
152+
public void close() throws IOException {
153+
super.close();
154+
}
155+
}

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.clickhouse.client.api.internal;
22

3+
import net.jpountz.lz4.LZ4Factory;
34
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
45
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
56
import org.apache.hc.core5.function.Supplier;
@@ -46,6 +47,8 @@ public InputStream getContent() throws IOException, UnsupportedOperationExceptio
4647
// So we just return original content and if there is a real data in it we will get error later
4748
return content;
4849
}
50+
} else if (serverCompression && !useHttpCompression) {
51+
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor());
4952
} else {
5053
return httpEntity.getContent();
5154
}

client-v2/src/main/java/com/clickhouse/client/api/internal/TableSchemaParser.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.io.BufferedReader;
77
import java.io.IOException;
88
import java.io.InputStream;
9+
import java.io.InputStreamReader;
910
import java.io.StringReader;
1011
import java.util.Properties;
1112

@@ -37,12 +38,15 @@ public TableSchema readTSKV(InputStream content, String table, String database)
3738
schema.setTableName(table);
3839
schema.setDatabaseName(database);
3940
Properties p = new Properties();
40-
try (BufferedReader r = new BufferedReader(new java.io.InputStreamReader(content))) {
41+
try (BufferedReader r = new BufferedReader(new InputStreamReader(content))) {
4142
String line;
4243
while ((line = r.readLine()) != null) {
4344
p.clear();
44-
p.load(new StringReader(line.replaceAll("\t", "\n")));
45-
schema.addColumn(p.getProperty("name"), p.getProperty("type"), p.getProperty("default_type"));
45+
int lineLength = line.length();
46+
if (!line.trim().isEmpty()) {
47+
p.load(new StringReader(line.replaceAll("\t", "\n")));
48+
schema.addColumn(p.getProperty("name"), p.getProperty("type"), p.getProperty("default_type"));
49+
}
4650
}
4751
} catch (IOException e) {
4852
throw new RuntimeException("Failed to parse table schema", e);
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.clickhouse.client.query;
2+
3+
public class QueryServerContentCompressionTests extends QueryTests {
4+
static {
5+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
6+
}
7+
QueryServerContentCompressionTests() {
8+
super(true, false);
9+
}
10+
}

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void setUp() {
9999
.compressClientRequest(false)
100100
.compressServerResponse(useServerCompression)
101101
.useHttpCompression(useHttpCompression)
102-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
102+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
103103
.build();
104104

105105
delayForProfiler(0);

0 commit comments

Comments
 (0)