Skip to content

Commit 4971f35

Browse files
committed
implemented http compression for server response
1 parent 79a6589 commit 4971f35

File tree

5 files changed

+160
-1
lines changed

5 files changed

+160
-1
lines changed

client-v2/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
<artifactId>lz4-pure-java</artifactId>
4848
<optional>true</optional>
4949
</dependency>
50+
<dependency>
51+
<groupId>${project.groupId}</groupId>
52+
<artifactId>org.apache.commons.compress</artifactId>
53+
<version>${repackaged.version}</version>
54+
</dependency>
5055
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
5156
<dependency>
5257
<groupId>com.fasterxml.jackson.core</groupId>

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.clickhouse.client.config.ClickHouseClientOption;
3535
import com.clickhouse.client.config.ClickHouseDefaults;
3636
import com.clickhouse.client.http.ClickHouseHttpProto;
37+
import com.clickhouse.client.http.config.ClickHouseHttpOption;
3738
import com.clickhouse.data.ClickHouseColumn;
3839
import com.clickhouse.data.ClickHouseDataStreamFactory;
3940
import com.clickhouse.data.ClickHouseFormat;
@@ -407,6 +408,11 @@ public Builder compressClientRequest(boolean enabled) {
407408
return this;
408409
}
409410

411+
public Builder useHttpCompression(boolean enabled) {
412+
this.configuration.put("client.use_http_compression", String.valueOf(enabled));
413+
return this;
414+
}
415+
410416
/**
411417
* Sets the default database name that will be used by operations if not specified.
412418
* @param database - actual default database name.

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hc.core5.http.ClassicHttpResponse;
2828
import org.apache.hc.core5.http.ContentType;
2929
import org.apache.hc.core5.http.Header;
30+
import org.apache.hc.core5.http.HttpEntity;
3031
import org.apache.hc.core5.http.HttpHeaders;
3132
import org.apache.hc.core5.http.HttpHost;
3233
import org.apache.hc.core5.http.HttpStatus;
@@ -207,7 +208,8 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
207208
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig)
208209
.build();
209210
req.setConfig(httpReqConfig);
210-
req.setEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback));
211+
// setting entity. wrapping if compression is enabled
212+
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback)));
211213

212214
HttpClientContext context = HttpClientContext.create();
213215

@@ -229,6 +231,8 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
229231
httpResponse.close();
230232
return httpResponse;
231233
}
234+
235+
httpResponse.setEntity(wrapEntity(httpResponse.getEntity()));
232236
return httpResponse;
233237

234238
} catch (UnknownHostException e) {
@@ -264,6 +268,12 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
264268
if (proxyAuthHeaderValue != null) {
265269
req.addHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthHeaderValue);
266270
}
271+
272+
if (chConfig.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true")) {
273+
if (chConfig.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true")) {
274+
req.addHeader(HttpHeaders.ACCEPT_ENCODING, "lz4");
275+
}
276+
}
267277
}
268278
private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
269279
if (requestConfig != null) {
@@ -281,6 +291,25 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
281291
}
282292
}
283293
}
294+
295+
if (chConfig.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true")) {
296+
if (chConfig.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true")) {
297+
req.addParameter("enable_http_compression", "1");
298+
} else {
299+
req.addParameter("compress", "1");
300+
}
301+
}
302+
}
303+
304+
private HttpEntity wrapEntity(HttpEntity httpEntity) {
305+
boolean serverCompression = chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
306+
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
307+
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
308+
if (serverCompression || clientCompression) {
309+
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression);
310+
} else {
311+
return httpEntity;
312+
}
284313
}
285314

286315
public static int getHeaderInt(Header header, int defaultValue) {
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.clickhouse.client.api.internal;
2+
3+
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
4+
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
5+
import org.apache.hc.core5.function.Supplier;
6+
import org.apache.hc.core5.http.Header;
7+
import org.apache.hc.core5.http.HttpEntity;
8+
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.OutputStream;
12+
import java.util.List;
13+
import java.util.Set;
14+
15+
class LZ4Entity implements HttpEntity {
16+
17+
private HttpEntity httpEntity;
18+
19+
private boolean useHttpCompression;
20+
private boolean serverCompression;
21+
private boolean clientCompression;
22+
23+
24+
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression) {
25+
this.httpEntity = httpEntity;
26+
this.useHttpCompression = useHttpCompression;
27+
this.serverCompression = serverCompression;
28+
this.clientCompression = clientCompression;
29+
}
30+
31+
@Override
32+
public boolean isRepeatable() {
33+
return httpEntity.isRepeatable();
34+
}
35+
36+
@Override
37+
public InputStream getContent() throws IOException, UnsupportedOperationException {
38+
if (serverCompression && useHttpCompression) {
39+
return new FramedLZ4CompressorInputStream(httpEntity.getContent());
40+
} else {
41+
return httpEntity.getContent();
42+
}
43+
}
44+
45+
@Override
46+
public void writeTo(OutputStream outStream) throws IOException {
47+
if (clientCompression && useHttpCompression) {
48+
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
49+
} else {
50+
httpEntity.writeTo(outStream);
51+
}
52+
}
53+
54+
@Override
55+
public boolean isStreaming() {
56+
return httpEntity.isStreaming();
57+
}
58+
59+
@Override
60+
public Supplier<List<? extends Header>> getTrailers() {
61+
return httpEntity.getTrailers();
62+
}
63+
64+
@Override
65+
public void close() throws IOException {
66+
httpEntity.close();
67+
}
68+
69+
@Override
70+
public long getContentLength() {
71+
return httpEntity.getContentLength();
72+
}
73+
74+
@Override
75+
public String getContentType() {
76+
return httpEntity.getContentType();
77+
}
78+
79+
@Override
80+
public String getContentEncoding() {
81+
return httpEntity.getContentEncoding();
82+
}
83+
84+
@Override
85+
public boolean isChunked() {
86+
return httpEntity.isChunked();
87+
}
88+
89+
@Override
90+
public Set<String> getTrailerNames() {
91+
return httpEntity.getTrailerNames();
92+
}
93+
}

client-v2/src/test/java/com/clickhouse/client/ClientTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
public class ClientTests extends BaseIntegrationTest {
2020

21+
static {
22+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
23+
}
2124

2225
@Test(dataProvider = "clientProvider")
2326
public void testAddSecureEndpoint(Client client) {
@@ -115,5 +118,28 @@ public void testRawSettings() {
115118
}
116119
}
117120

121+
@Test
122+
public void testServerCompression() {
123+
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
124+
Client client = new Client.Builder()
125+
.addEndpoint(node.toUri().toString())
126+
.setUsername("default")
127+
.setPassword("")
128+
.compressServerResponse(true)
129+
.useHttpCompression(true)
130+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
131+
.build();
132+
133+
try (Records response = client.queryRecords("SELECT number FROM system.numbers LIMIT 10").get()) {
134+
response.forEach(record -> {
135+
System.out.println(record);
136+
});
137+
} catch (Exception e) {
138+
e.printStackTrace();
139+
Assert.fail(e.getMessage());
140+
} finally {
141+
client.close();
142+
}
143+
}
118144

119145
}

0 commit comments

Comments
 (0)