|
50 | 50 | import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; |
51 | 51 | import org.apache.hc.core5.http.io.SocketConfig; |
52 | 52 | import org.apache.hc.core5.http.io.entity.EntityTemplate; |
| 53 | +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; |
| 54 | +import java.io.ByteArrayOutputStream; |
53 | 55 | import org.apache.hc.core5.http.protocol.HttpContext; |
54 | 56 | import org.apache.hc.core5.io.CloseMode; |
55 | 57 | import org.apache.hc.core5.io.IOCallback; |
@@ -432,12 +434,45 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r |
432 | 434 | // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding |
433 | 435 | addHeaders(req, requestConfig); |
434 | 436 |
|
435 | | - |
436 | | - // setting entity. wrapping if compression is enabled |
437 | | - String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; |
438 | | - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), |
439 | | - lz4Factory, |
440 | | - requestConfig)); |
| 437 | + // Check if statement params exist - if so, send as multipart |
| 438 | + boolean hasStatementParams = requestConfig.containsKey(KEY_STATEMENT_PARAMS); |
| 439 | + Map<?, ?> statementParams = hasStatementParams ? |
| 440 | + (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS) : null; |
| 441 | + |
| 442 | + HttpEntity requestEntity; |
| 443 | + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? |
| 444 | + req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; |
| 445 | + |
| 446 | + if (hasStatementParams && statementParams != null && !statementParams.isEmpty()) { |
| 447 | + // Create multipart entity with query body and statement params |
| 448 | + MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create(); |
| 449 | + |
| 450 | + // Add query/body data as binary body |
| 451 | + if (writeCallback != null) { |
| 452 | + // Write callback output to buffer, then add as binary body |
| 453 | + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); |
| 454 | + writeCallback.execute(buffer); |
| 455 | + byte[] queryData = buffer.toByteArray(); |
| 456 | + multipartBuilder.addBinaryBody("query", queryData, CONTENT_TYPE, null); |
| 457 | + } |
| 458 | + |
| 459 | + // Add statement params as multipart parts |
| 460 | + for (Map.Entry<?, ?> entry : statementParams.entrySet()) { |
| 461 | + String paramName = "param_" + entry.getKey().toString(); |
| 462 | + String paramValue = String.valueOf(entry.getValue()); |
| 463 | + multipartBuilder.addTextBody(paramName, paramValue); |
| 464 | + } |
| 465 | + |
| 466 | + requestEntity = multipartBuilder.build(); |
| 467 | + // Update content type header for multipart |
| 468 | + req.setHeader(HttpHeaders.CONTENT_TYPE, requestEntity.getContentType()); |
| 469 | + } else { |
| 470 | + // No statement params - use regular entity template |
| 471 | + requestEntity = new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback); |
| 472 | + } |
| 473 | + |
| 474 | + // Wrap entity with compression if enabled |
| 475 | + req.setEntity(wrapRequestEntity(requestEntity, lz4Factory, requestConfig)); |
441 | 476 |
|
442 | 477 | HttpClientContext context = HttpClientContext.create(); |
443 | 478 | Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); |
@@ -595,10 +630,7 @@ private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) { |
595 | 630 | if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { |
596 | 631 | req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); |
597 | 632 | } |
598 | | - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { |
599 | | - Map<?, ?> params = (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS); |
600 | | - params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); |
601 | | - } |
| 633 | + // Statement params are now sent as multipart, not query params |
602 | 634 |
|
603 | 635 | boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); |
604 | 636 | boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); |
|
0 commit comments