@@ -338,6 +338,9 @@ public Exception readError(ClassicHttpResponse httpResponse) {
338338
339339 public ClassicHttpResponse executeRequest (ClickHouseNode server , Map <String , Object > requestConfig ,
340340 IOCallback <OutputStream > writeCallback ) throws IOException {
341+ if (requestConfig == null ) {
342+ requestConfig = Collections .emptyMap ();
343+ }
341344 URI uri ;
342345 try {
343346 URIBuilder uriBuilder = new URIBuilder (server .getBaseUri ());
@@ -350,17 +353,21 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
350353// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
351354 addHeaders (req , chConfiguration , requestConfig );
352355
353- RequestConfig httpReqConfig = RequestConfig .copy (baseRequestConfig )
354- .build ();
356+ boolean clientCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey ());
357+ boolean useHttpCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .USE_HTTP_COMPRESSION .getKey ());
358+
359+ RequestConfig httpReqConfig = RequestConfig .copy (baseRequestConfig ).build ();
355360 req .setConfig (httpReqConfig );
356361 // setting entity. wrapping if compression is enabled
357- req .setEntity (wrapEntity (new EntityTemplate (-1 , CONTENT_TYPE , null , writeCallback ), HttpStatus .SC_OK , false ));
362+ req .setEntity (wrapRequestEntity (new EntityTemplate (-1 , CONTENT_TYPE , null , writeCallback ),
363+ clientCompression , useHttpCompression ));
358364
359365 HttpClientContext context = HttpClientContext .create ();
360366
361367 try {
362368 ClassicHttpResponse httpResponse = httpClient .executeOpen (null , req , context );
363- httpResponse .setEntity (wrapEntity (httpResponse .getEntity (), httpResponse .getCode (), true ));
369+ boolean serverCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey ());
370+ httpResponse .setEntity (wrapResponseEntity (httpResponse .getEntity (), httpResponse .getCode (), serverCompression , useHttpCompression ));
364371
365372 if (httpResponse .getCode () == HttpStatus .SC_PROXY_AUTHENTICATION_REQUIRED ) {
366373 throw new ClientMisconfigurationException ("Proxy authentication required. Please check your proxy settings." );
@@ -393,20 +400,18 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
393400
394401 private void addHeaders (HttpPost req , Map <String , String > chConfig , Map <String , Object > requestConfig ) {
395402 req .addHeader (HttpHeaders .CONTENT_TYPE , CONTENT_TYPE .getMimeType ());
396- if (requestConfig != null ) {
397- if (requestConfig .containsKey (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ())) {
398- req .addHeader (ClickHouseHttpProto .HEADER_FORMAT , requestConfig .get (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ()));
399- }
403+ if (requestConfig .containsKey (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ())) {
404+ req .addHeader (ClickHouseHttpProto .HEADER_FORMAT , requestConfig .get (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ()));
405+ }
400406
401- if (requestConfig .containsKey (ClientConfigProperties .QUERY_ID .getKey ())) {
402- req .addHeader (ClickHouseHttpProto .HEADER_QUERY_ID , requestConfig .get (ClientConfigProperties .QUERY_ID .getKey ()).toString ());
403- }
407+ if (requestConfig .containsKey (ClientConfigProperties .QUERY_ID .getKey ())) {
408+ req .addHeader (ClickHouseHttpProto .HEADER_QUERY_ID , requestConfig .get (ClientConfigProperties .QUERY_ID .getKey ()).toString ());
409+ }
404410
405- if (requestConfig .containsKey (ClientConfigProperties .DATABASE .getKey ())) {
406- req .addHeader (ClickHouseHttpProto .HEADER_DATABASE , requestConfig .get (ClientConfigProperties .DATABASE .getKey ()));
407- } else {
408- req .addHeader (ClickHouseHttpProto .HEADER_DATABASE , chConfig .get (ClientConfigProperties .DATABASE .getKey ()));
409- }
411+ if (requestConfig .containsKey (ClientConfigProperties .DATABASE .getKey ())) {
412+ req .addHeader (ClickHouseHttpProto .HEADER_DATABASE , requestConfig .get (ClientConfigProperties .DATABASE .getKey ()));
413+ } else {
414+ req .addHeader (ClickHouseHttpProto .HEADER_DATABASE , chConfig .get (ClientConfigProperties .DATABASE .getKey ()));
410415 }
411416
412417 if (MapUtils .getFlag (chConfig , "ssl_authentication" , false )) {
@@ -424,9 +429,9 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
424429 req .addHeader (HttpHeaders .PROXY_AUTHORIZATION , proxyAuthHeaderValue );
425430 }
426431
427- boolean serverCompression = chConfiguration . getOrDefault ( ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey (), "false" ). equalsIgnoreCase ( "true" );
428- boolean clientCompression = chConfiguration . getOrDefault ( ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey (), "false" ). equalsIgnoreCase ( "true" );
429- boolean useHttpCompression = chConfiguration . getOrDefault ( "client.use_http_compression" , "false" ). equalsIgnoreCase ( "true" );
432+ boolean clientCompression = MapUtils . getFlag ( requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey ());
433+ boolean serverCompression = MapUtils . getFlag ( requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey ());
434+ boolean useHttpCompression = MapUtils . getFlag ( requestConfig , chConfiguration , ClientConfigProperties . USE_HTTP_COMPRESSION . getKey () );
430435
431436 if (useHttpCompression ) {
432437 if (serverCompression ) {
@@ -461,10 +466,6 @@ private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String,
461466 req .setHeader (HttpHeaders .USER_AGENT , userAgent == null ? httpClientUserAgentPart : userAgent .getValue () + " " + httpClientUserAgentPart );
462467 }
463468 private void addQueryParams (URIBuilder req , Map <String , String > chConfig , Map <String , Object > requestConfig ) {
464- if (requestConfig == null ) {
465- requestConfig = Collections .emptyMap ();
466- }
467-
468469 for (Map .Entry <String , String > entry : chConfig .entrySet ()) {
469470 if (entry .getKey ().startsWith (ClientConfigProperties .SERVER_SETTING_PREFIX )) {
470471 req .addParameter (entry .getKey ().substring (ClientConfigProperties .SERVER_SETTING_PREFIX .length ()), entry .getValue ());
@@ -481,10 +482,9 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
481482 }
482483 }
483484
484- boolean serverCompression = chConfiguration .getOrDefault (ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey (), "false" ).equalsIgnoreCase ("true" );
485- boolean clientCompression = chConfiguration .getOrDefault (ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey (), "false" ).equalsIgnoreCase ("true" );
486- boolean useHttpCompression = chConfiguration .getOrDefault ("client.use_http_compression" , "false" ).equalsIgnoreCase ("true" );
487-
485+ boolean clientCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey ());
486+ boolean serverCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey ());
487+ boolean useHttpCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .USE_HTTP_COMPRESSION .getKey ());
488488
489489 if (useHttpCompression ) {
490490 // enable_http_compression make server react on http header
@@ -514,11 +514,21 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
514514 }
515515 }
516516
517- private HttpEntity wrapEntity (HttpEntity httpEntity , int httpStatus , boolean isResponse ) {
518- boolean serverCompression = MapUtils .getFlag (chConfiguration , ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey (), false );
519- boolean clientCompression = MapUtils .getFlag (chConfiguration , ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getKey (), false );
517+ private HttpEntity wrapRequestEntity (HttpEntity httpEntity , boolean clientCompression , boolean useHttpCompression ) {
518+ LOG .debug ("client compression: {}, http compression: {}" , clientCompression , useHttpCompression );
519+
520+ if (clientCompression ) {
521+ return new LZ4Entity (httpEntity , useHttpCompression , false , true ,
522+ MapUtils .getInt (chConfiguration , "compression.lz4.uncompressed_buffer_size" ), false );
523+ } else {
524+ return httpEntity ;
525+ }
526+ }
527+
528+ private HttpEntity wrapResponseEntity (HttpEntity httpEntity , int httpStatus , boolean serverCompression , boolean useHttpCompression ) {
529+ LOG .debug ("server compression: {}, http compression: {}" , serverCompression , useHttpCompression );
520530
521- if (serverCompression || clientCompression ) {
531+ if (serverCompression ) {
522532 // Server doesn't compress certain errors like 403
523533 switch (httpStatus ) {
524534 case HttpStatus .SC_OK :
@@ -531,9 +541,8 @@ private HttpEntity wrapEntity(HttpEntity httpEntity, int httpStatus, boolean isR
531541 case HttpStatus .SC_BAD_REQUEST :
532542 case HttpStatus .SC_INTERNAL_SERVER_ERROR :
533543 case HttpStatus .SC_NOT_FOUND :
534- boolean useHttpCompression = MapUtils .getFlag (chConfiguration , "client.use_http_compression" , false );
535- return new LZ4Entity (httpEntity , useHttpCompression , serverCompression , clientCompression ,
536- MapUtils .getInt (chConfiguration , "compression.lz4.uncompressed_buffer_size" ), isResponse );
544+ return new LZ4Entity (httpEntity , useHttpCompression , true , false ,
545+ MapUtils .getInt (chConfiguration , "compression.lz4.uncompressed_buffer_size" ), true );
537546 }
538547 }
539548
0 commit comments