Skip to content

Commit 0bb8db0

Browse files
committed
http compression client/server
1 parent 31cbcde commit 0bb8db0

File tree

8 files changed

+99
-39
lines changed

8 files changed

+99
-39
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import com.clickhouse.data.ClickHouseFormat;
4141
import com.clickhouse.data.ClickHousePipedOutputStream;
4242
import com.clickhouse.data.format.BinaryStreamUtils;
43+
import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorOutputStream;
44+
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
4345
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4446
import org.apache.hc.core5.http.ClassicHttpResponse;
4547
import org.apache.hc.core5.http.HttpStatus;
@@ -758,6 +760,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
758760
}
759761
}
760762
}
763+
out.close();
761764
})) {
762765

763766

@@ -876,7 +879,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
876879
while ((bytesRead = data.read(buffer)) != -1) {
877880
out.write(buffer, 0, bytesRead);
878881
}
879-
out.flush();
882+
out.close();
880883
})) {
881884

882885

@@ -1038,7 +1041,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10381041
ClassicHttpResponse httpResponse =
10391042
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), output -> {
10401043
output.write(sqlQuery.getBytes(StandardCharsets.UTF_8));
1041-
output.flush();
1044+
output.close();
10421045
});
10431046

10441047
// Check response

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
8888
(t) -> reqConfBuilder.setConnectionRequestTimeout(t, TimeUnit.MILLISECONDS));
8989

9090
this.baseRequestConfig = reqConfBuilder.build();
91+
92+
boolean usingClientCompression= chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
93+
boolean usingServerCompression= chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
94+
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
95+
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);
9196
}
9297

9398
public CloseableHttpClient createHttpClient() {
@@ -269,11 +274,20 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
269274
req.addHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthHeaderValue);
270275
}
271276

272-
if (chConfig.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true")) {
273-
if (chConfig.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true")) {
277+
boolean serverCompression = chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
278+
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
279+
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
280+
281+
if (serverCompression) {
282+
if (useHttpCompression) {
274283
req.addHeader(HttpHeaders.ACCEPT_ENCODING, "lz4");
275284
}
276285
}
286+
if (clientCompression) {
287+
if (useHttpCompression) {
288+
req.addHeader(HttpHeaders.CONTENT_ENCODING, "lz4");
289+
}
290+
}
277291
}
278292
private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
279293
if (requestConfig != null) {
@@ -292,13 +306,23 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
292306
}
293307
}
294308

295-
if (chConfig.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true")) {
296-
if (chConfig.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true")) {
309+
boolean serverCompression = chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
310+
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
311+
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
312+
313+
if (serverCompression) {
314+
if (useHttpCompression) {
297315
req.addParameter("enable_http_compression", "1");
298316
} else {
299317
req.addParameter("compress", "1");
300318
}
301319
}
320+
if (clientCompression) {
321+
if (useHttpCompression) {
322+
req.addParameter("enable_http_compression", "1");
323+
} else {
324+
req.addParameter("decompress", "1");
325+
} }
302326
}
303327

304328
private HttpEntity wrapEntity(HttpEntity httpEntity) {

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,16 @@ public boolean isRepeatable() {
3636
@Override
3737
public InputStream getContent() throws IOException, UnsupportedOperationException {
3838
if (serverCompression && useHttpCompression) {
39-
return new FramedLZ4CompressorInputStream(httpEntity.getContent());
39+
InputStream content = httpEntity.getContent();
40+
try {
41+
return new FramedLZ4CompressorInputStream(content);
42+
} catch (IOException e) {
43+
// This is the easiest way to handle empty content because
44+
// - streams at this point wrapped with something else and we can't check content length
45+
// - exception is thrown with no details
46+
// So we just return original content and if there is a real data in it we will get error later
47+
return content;
48+
}
4049
} else {
4150
return httpEntity.getContent();
4251
}

client-v2/src/test/java/com/clickhouse/client/ClientTests.java

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
public class ClientTests extends BaseIntegrationTest {
2020

21-
static {
22-
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
23-
}
21+
// static {
22+
// System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
23+
// }
2424

2525
@Test(dataProvider = "clientProvider")
2626
public void testAddSecureEndpoint(Client client) {
@@ -117,29 +117,4 @@ public void testRawSettings() {
117117
client.close();
118118
}
119119
}
120-
121-
@Test
122-
public void testServerCompression() {
123-
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
124-
Client client = new Client.Builder()
125-
.addEndpoint(node.toUri().toString())
126-
.setUsername("default")
127-
.setPassword("")
128-
.compressServerResponse(true)
129-
.useHttpCompression(true)
130-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
131-
.build();
132-
133-
try (Records response = client.queryRecords("SELECT number FROM system.numbers LIMIT 10").get()) {
134-
response.forEach(record -> {
135-
System.out.println(record);
136-
});
137-
} catch (Exception e) {
138-
e.printStackTrace();
139-
Assert.fail(e.getMessage());
140-
} finally {
141-
client.close();
142-
}
143-
}
144-
145120
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.clickhouse.client.insert;
2+
3+
public class InsertClientHttpCompressionTests extends InsertTests {
4+
5+
static {
6+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
7+
}
8+
9+
public InsertClientHttpCompressionTests() {
10+
super(true, true);
11+
}
12+
}

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,29 @@ public class InsertTests extends BaseIntegrationTest {
5656
private Client client;
5757
private InsertSettings settings;
5858

59+
private boolean useClientCompression = false;
60+
61+
private boolean useHttpCompression = false;
62+
63+
InsertTests() {
64+
}
65+
66+
InsertTests(boolean useClientCompression, boolean useHttpCompression) {
67+
this.useClientCompression = useClientCompression;
68+
this.useHttpCompression = useHttpCompression;
69+
}
70+
5971
@BeforeMethod(groups = { "integration" })
6072
public void setUp() throws IOException {
6173
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
6274
client = new Client.Builder()
6375
.addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false)
6476
.setUsername("default")
6577
.setPassword("")
66-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
67-
.compressClientRequest(false)
78+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
79+
.compressClientRequest(useClientCompression)
80+
.compressServerResponse(useHttpCompression)
81+
.useHttpCompression(useHttpCompression)
6882
.build();
6983
settings = new InsertSettings()
7084
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.clickhouse.client.query;
2+
3+
public class QueryServerHttpCompressionTests extends QueryTests {
4+
static {
5+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
6+
}
7+
QueryServerHttpCompressionTests() {
8+
super(true, true);
9+
}
10+
}

client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ public class QueryTests extends BaseIntegrationTest {
7777

7878
private Client client;
7979

80+
private boolean useServerCompression = false;
81+
82+
private boolean useHttpCompression = false;
83+
84+
QueryTests(){
85+
}
86+
87+
QueryTests(boolean useServerCompression, boolean useHttpCompression) {
88+
this.useServerCompression = useServerCompression;
89+
this.useHttpCompression = useHttpCompression;
90+
}
91+
8092
@BeforeMethod(groups = {"integration"})
8193
public void setUp() {
8294
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
@@ -85,8 +97,9 @@ public void setUp() {
8597
.setUsername("default")
8698
.setPassword("")
8799
.compressClientRequest(false)
88-
.compressServerResponse(false)
89-
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true"))
100+
.compressServerResponse(useServerCompression)
101+
.useHttpCompression(useHttpCompression)
102+
.useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
90103
.build();
91104

92105
delayForProfiler(0);

0 commit comments

Comments
 (0)