5151import org .apache .hc .core5 .http .io .SocketConfig ;
5252import org .apache .hc .core5 .http .io .entity .EntityTemplate ;
5353import org .apache .hc .client5 .http .entity .mime .MultipartEntityBuilder ;
54- import java . io . ByteArrayOutputStream ;
54+ import org . apache . hc . client5 . http . entity . mime . ContentBody ;
5555import org .apache .hc .core5 .http .protocol .HttpContext ;
5656import org .apache .hc .core5 .io .CloseMode ;
5757import org .apache .hc .core5 .io .IOCallback ;
@@ -447,13 +447,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
447447 // Create multipart entity with query body and statement params
448448 MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder .create ();
449449
450- // Add query/body data as binary body
450+ // Add query/body data as streaming binary body
451451 if (writeCallback != null ) {
452- // Write callback output to buffer, then add as binary body
453- ByteArrayOutputStream buffer = new ByteArrayOutputStream ();
454- writeCallback .execute (buffer );
455- byte [] queryData = buffer .toByteArray ();
456- multipartBuilder .addBinaryBody ("query" , queryData , CONTENT_TYPE , null );
452+ // Use streaming ContentBody to avoid buffering large queries in memory
453+ StreamingContentBody queryBody = new StreamingContentBody (writeCallback , CONTENT_TYPE );
454+ multipartBuilder .addPart ("query" , queryBody );
457455 }
458456
459457 // Add statement params as multipart parts
@@ -465,14 +463,25 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
465463
466464 requestEntity = multipartBuilder .build ();
467465 // Update content type header for multipart
466+ // Note: Content-Encoding is a separate header and will be preserved
467+ // The multipart entity's Content-Type includes the boundary which is required
468468 req .setHeader (HttpHeaders .CONTENT_TYPE , requestEntity .getContentType ());
469469 } else {
470470 // No statement params - use regular entity template
471471 requestEntity = new EntityTemplate (-1 , CONTENT_TYPE , contentEncoding , writeCallback );
472472 }
473473
474474 // Wrap entity with compression if enabled
475- req .setEntity (wrapRequestEntity (requestEntity , lz4Factory , requestConfig ));
475+ // This will set Content-Encoding header if compression is enabled
476+ HttpEntity wrappedEntity = wrapRequestEntity (requestEntity , lz4Factory , requestConfig );
477+
478+ // Ensure Content-Encoding header is set on request if entity has it
479+ // This preserves compression settings even after setting multipart Content-Type
480+ if (wrappedEntity .getContentEncoding () != null ) {
481+ req .setHeader (HttpHeaders .CONTENT_ENCODING , wrappedEntity .getContentEncoding ());
482+ }
483+
484+ req .setEntity (wrappedEntity );
476485
477486 HttpClientContext context = HttpClientContext .create ();
478487 Number responseTimeout = ClientConfigProperties .SOCKET_OPERATION_TIMEOUT .getOrDefault (requestConfig );
@@ -944,4 +953,92 @@ protected void prepareSocket(SSLSocket socket, HttpContext context) throws IOExc
944953 }
945954 }
946955 }
956+
957+ /**
958+ * Streaming ContentBody implementation that writes data from an IOCallback without buffering.
959+ * This avoids OutOfMemoryError for large queries by streaming data directly to the output stream.
960+ */
961+ private static class StreamingContentBody implements ContentBody {
962+ private final IOCallback <OutputStream > writeCallback ;
963+ private final ContentType contentType ;
964+
965+ StreamingContentBody (IOCallback <OutputStream > writeCallback , ContentType contentType ) {
966+ this .writeCallback = writeCallback ;
967+ this .contentType = contentType ;
968+ }
969+
970+ @ Override
971+ public String getFilename () {
972+ return null ;
973+ }
974+
975+ @ Override
976+ public String getMimeType () {
977+ return contentType != null ? contentType .getMimeType () : null ;
978+ }
979+
980+ @ Override
981+ public String getCharset () {
982+ return contentType != null && contentType .getCharset () != null ?
983+ contentType .getCharset ().name () : null ;
984+ }
985+
986+ @ Override
987+ public long getContentLength () {
988+ return -1 ; // Unknown length - streaming
989+ }
990+
991+ @ Override
992+ public void writeTo (OutputStream outStream ) throws IOException {
993+ // Wrap the stream to prevent premature closure
994+ // Multipart entities may call writeTo() multiple times, so we must not close the stream
995+ OutputStream nonClosingStream = new OutputStream () {
996+ @ Override
997+ public void write (int b ) throws IOException {
998+ outStream .write (b );
999+ }
1000+
1001+ @ Override
1002+ public void write (byte [] b ) throws IOException {
1003+ outStream .write (b );
1004+ }
1005+
1006+ @ Override
1007+ public void write (byte [] b , int off , int len ) throws IOException {
1008+ outStream .write (b , off , len );
1009+ }
1010+
1011+ @ Override
1012+ public void flush () throws IOException {
1013+ outStream .flush ();
1014+ }
1015+
1016+ @ Override
1017+ public void close () throws IOException {
1018+ // Do not close - let the caller (multipart entity) handle stream closure
1019+ // This prevents "Stream already closed" errors when writeTo() is called multiple times
1020+ flush ();
1021+ }
1022+ };
1023+ writeCallback .execute (nonClosingStream );
1024+ }
1025+
1026+ @ Override
1027+ public String getMediaType () {
1028+ String mimeType = getMimeType ();
1029+ if (mimeType != null && mimeType .contains ("/" )) {
1030+ return mimeType .substring (0 , mimeType .indexOf ('/' ));
1031+ }
1032+ return null ;
1033+ }
1034+
1035+ @ Override
1036+ public String getSubType () {
1037+ String mimeType = getMimeType ();
1038+ if (mimeType != null && mimeType .contains ("/" )) {
1039+ return mimeType .substring (mimeType .indexOf ('/' ) + 1 );
1040+ }
1041+ return null ;
1042+ }
1043+ }
9471044}
0 commit comments