Skip to content

Commit 5c52edb

Browse files
committed
fixed flags for compression
1 parent 6e6b6b7 commit 5c52edb

File tree

5 files changed

+71
-36
lines changed

5 files changed

+71
-36
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2026,7 +2026,7 @@ private void applyDefaults(QuerySettings settings) {
20262026
}
20272027

20282028
private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, Map<String, Object> requestSettings) {
2029-
boolean isAsync = MapUtils.getFlag(configuration, requestSettings, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
2029+
boolean isAsync = MapUtils.getFlag(requestSettings, configuration, ClientConfigProperties.ASYNC_OPERATIONS.getKey());
20302030
return isAsync ? CompletableFuture.supplyAsync(resultSupplier, sharedOperationExecutor) : CompletableFuture.completedFuture(resultSupplier.get());
20312031
}
20322032

client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public enum ClientConfigProperties {
6565

6666
COMPRESS_CLIENT_REQUEST("decompress"), // actually a server setting, but has client effect too
6767

68+
6869
USE_HTTP_COMPRESSION("client.use_http_compression"),
6970

7071
COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE("compression.lz4.uncompressed_buffer_size"),

client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public String getDatabase() {
135135
* @param enabled - indicates if client request compression is enabled
136136
*/
137137
public InsertSettings compressClientRequest(boolean enabled) {
138-
this.rawSettings.put("decompress", enabled);
138+
this.rawSettings.put(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), enabled);
139139
return this;
140140
}
141141

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,9 @@ public Exception readError(ClassicHttpResponse httpResponse) {
338338

339339
public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Object> requestConfig,
340340
IOCallback<OutputStream> writeCallback) throws IOException {
341+
if (requestConfig == null) {
342+
requestConfig = Collections.emptyMap();
343+
}
341344
URI uri;
342345
try {
343346
URIBuilder uriBuilder = new URIBuilder(server.getBaseUri());
@@ -350,17 +353,21 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
350353
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
351354
addHeaders(req, chConfiguration, requestConfig);
352355

353-
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig)
354-
.build();
356+
boolean clientCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey());
357+
boolean useHttpCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.USE_HTTP_COMPRESSION.getKey());
358+
359+
RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig).build();
355360
req.setConfig(httpReqConfig);
356361
// setting entity. wrapping if compression is enabled
357-
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), HttpStatus.SC_OK, false));
362+
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
363+
clientCompression, useHttpCompression));
358364

359365
HttpClientContext context = HttpClientContext.create();
360366

361367
try {
362368
ClassicHttpResponse httpResponse = httpClient.executeOpen(null, req, context);
363-
httpResponse.setEntity(wrapEntity(httpResponse.getEntity(), httpResponse.getCode(), true));
369+
boolean serverCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey());
370+
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression));
364371

365372
if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
366373
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
@@ -393,20 +400,18 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
393400

394401
private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
395402
req.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
396-
if (requestConfig != null) {
397-
if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) {
398-
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey()));
399-
}
403+
if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) {
404+
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey()));
405+
}
400406

401-
if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) {
402-
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString());
403-
}
407+
if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) {
408+
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString());
409+
}
404410

405-
if(requestConfig.containsKey(ClientConfigProperties.DATABASE.getKey())) {
406-
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, requestConfig.get(ClientConfigProperties.DATABASE.getKey()));
407-
} else {
408-
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, chConfig.get(ClientConfigProperties.DATABASE.getKey()));
409-
}
411+
if(requestConfig.containsKey(ClientConfigProperties.DATABASE.getKey())) {
412+
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, requestConfig.get(ClientConfigProperties.DATABASE.getKey()));
413+
} else {
414+
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, chConfig.get(ClientConfigProperties.DATABASE.getKey()));
410415
}
411416

412417
if (MapUtils.getFlag(chConfig, "ssl_authentication", false)) {
@@ -424,9 +429,9 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
424429
req.addHeader(HttpHeaders.PROXY_AUTHORIZATION, proxyAuthHeaderValue);
425430
}
426431

427-
boolean serverCompression = chConfiguration.getOrDefault(ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), "false").equalsIgnoreCase("true");
428-
boolean clientCompression = chConfiguration.getOrDefault(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), "false").equalsIgnoreCase("true");
429-
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
432+
boolean clientCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey());
433+
boolean serverCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey());
434+
boolean useHttpCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.USE_HTTP_COMPRESSION.getKey());
430435

431436
if (useHttpCompression) {
432437
if (serverCompression) {
@@ -461,10 +466,6 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
461466
req.setHeader(HttpHeaders.USER_AGENT, userAgent == null ? httpClientUserAgentPart : userAgent.getValue() + " " + httpClientUserAgentPart);
462467
}
463468
private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
464-
if (requestConfig == null) {
465-
requestConfig = Collections.emptyMap();
466-
}
467-
468469
for (Map.Entry<String, String> entry : chConfig.entrySet()) {
469470
if (entry.getKey().startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) {
470471
req.addParameter(entry.getKey().substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), entry.getValue());
@@ -481,10 +482,9 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
481482
}
482483
}
483484

484-
boolean serverCompression = chConfiguration.getOrDefault(ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), "false").equalsIgnoreCase("true");
485-
boolean clientCompression = chConfiguration.getOrDefault(ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), "false").equalsIgnoreCase("true");
486-
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
487-
485+
boolean clientCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey());
486+
boolean serverCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey());
487+
boolean useHttpCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.USE_HTTP_COMPRESSION.getKey());
488488

489489
if (useHttpCompression) {
490490
// enable_http_compression make server react on http header
@@ -514,11 +514,21 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
514514
}
515515
}
516516

517-
private HttpEntity wrapEntity(HttpEntity httpEntity, int httpStatus, boolean isResponse) {
518-
boolean serverCompression = MapUtils.getFlag(chConfiguration, ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), false);
519-
boolean clientCompression = MapUtils.getFlag(chConfiguration, ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getKey(), false);
517+
private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression) {
518+
LOG.debug("client compression: {}, http compression: {}", clientCompression, useHttpCompression);
519+
520+
if (clientCompression) {
521+
return new LZ4Entity(httpEntity, useHttpCompression, false, true,
522+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), false);
523+
} else {
524+
return httpEntity;
525+
}
526+
}
527+
528+
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression) {
529+
LOG.debug("server compression: {}, http compression: {}", serverCompression, useHttpCompression);
520530

521-
if (serverCompression || clientCompression) {
531+
if (serverCompression) {
522532
// Server doesn't compress certain errors like 403
523533
switch (httpStatus) {
524534
case HttpStatus.SC_OK:
@@ -531,9 +541,8 @@ private HttpEntity wrapEntity(HttpEntity httpEntity, int httpStatus, boolean isR
531541
case HttpStatus.SC_BAD_REQUEST:
532542
case HttpStatus.SC_INTERNAL_SERVER_ERROR:
533543
case HttpStatus.SC_NOT_FOUND:
534-
boolean useHttpCompression = MapUtils.getFlag(chConfiguration, "client.use_http_compression", false);
535-
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression,
536-
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), isResponse);
544+
return new LZ4Entity(httpEntity, useHttpCompression, true, false,
545+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), true);
537546
}
538547
}
539548

client-v2/src/main/java/com/clickhouse/client/api/internal/MapUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ public static long getLong(Map<String, String> map, String key) {
5454
return 0;
5555
}
5656

57+
/**
58+
* Get a boolean value from a map.
59+
*
60+
* @param map map to get value from
61+
* @param key key to get value for
62+
* @return boolean value
63+
* @throws NullPointerException if the key is missing
64+
* @throws IllegalArgumentException if the value is not a boolean
65+
*/
5766
public static boolean getFlag(Map<String, String> map, String key) {
5867
String val = map.get(key);
5968
if (val == null) {
@@ -68,6 +77,13 @@ public static boolean getFlag(Map<String, String> map, String key) {
6877
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
6978
}
7079

80+
/**
81+
* Get a boolean value from a map.
82+
* @param map - configuration map
83+
* @param key - key of the property
84+
* @param defaultValue - value if not found
85+
* @return boolean value
86+
*/
7187
public static boolean getFlag(Map<String, ?> map, String key, boolean defaultValue) {
7288
Object val = map.get(key);
7389
if (val == null) {
@@ -86,6 +102,15 @@ public static boolean getFlag(Map<String, ?> map, String key, boolean defaultVal
86102
throw new IllegalArgumentException("Invalid non-boolean value for the key '" + key + "': '" + val + "'");
87103
}
88104

105+
/**
106+
* Get a boolean value from a p1, if not found, get from p2.
107+
*
108+
* @param p1 - first map
109+
* @param p2 - second map
110+
* @param key - key of the property
111+
* @return boolean value
112+
* @throws NullPointerException if the key is missing in both maps
113+
*/
89114
public static boolean getFlag(Map<String, ?> p1, Map<String, ?> p2, String key) {
90115
Object val = p1.get(key);
91116
if (val == null) {

0 commit comments

Comments
 (0)