@@ -418,7 +418,7 @@ public Builder addProxy(ProxyType type, String host, int port) {
418418 ValidationUtils .checkNonBlank (host , "host" );
419419 ValidationUtils .checkRange (port , 1 , ValidationUtils .TCP_PORT_NUMBER_MAX , "port" );
420420
421- this .configuration .put (ClickHouseClientOption .PROXY_TYPE .getKey (), type .toString ());
421+ this .configuration .put (ClickHouseClientOption .PROXY_TYPE .getKey (), type .name ());
422422 this .configuration .put (ClickHouseClientOption .PROXY_HOST .getKey (), host );
423423 this .configuration .put (ClickHouseClientOption .PROXY_PORT .getKey (), String .valueOf (port ));
424424 return this ;
@@ -611,6 +611,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
611611
612612 String operationId = startOperation ();
613613 settings .setOperationId (operationId );
614+ if (useNewImplementation ) {
615+ globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
616+ }
614617 globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
615618
616619 if (data == null || data .isEmpty ()) {
@@ -625,29 +628,90 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
625628 boolean hasDefaults = this .hasDefaults .get (data .get (0 ).getClass ());
626629 ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
627630
628- //Create an output stream to write the data to
629- ByteArrayOutputStream stream = new ByteArrayOutputStream ();
630-
631631 //Lookup the Serializer for the POJO
632632 List <POJOSerializer > serializers = this .serializers .get (data .get (0 ).getClass ());
633633 if (serializers == null || serializers .isEmpty ()) {
634634 throw new SerializerNotFoundException (data .get (0 ).getClass ());
635635 }
636636
637- //Call the static .serialize method on the POJOSerializer for each object in the list
638- for (Object obj : data ) {
639- for (POJOSerializer serializer : serializers ) {
640- try {
641- serializer .serialize (obj , stream );
642- } catch (InvocationTargetException | IllegalAccessException | IOException e ) {
643- throw new DataSerializationException (obj , serializer , e );
637+ if (useNewImplementation ) {
638+ String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
639+ final int maxRetries = retry == null ? (int ) ClickHouseClientOption .RETRY .getDefaultValue () : Integer .parseInt (retry );
640+
641+ settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
642+ final InsertSettings finalSettings = settings ;
643+ CompletableFuture <InsertResponse > future = CompletableFuture .supplyAsync (() -> {
644+ // Selecting some node
645+ ClickHouseNode selectedNode = getNextAliveNode ();
646+
647+ for (int i = 0 ; i <= maxRetries ; i ++) {
648+ // Execute request
649+ try (ClassicHttpResponse httpResponse =
650+ httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (),
651+ out -> {
652+ out .write ("INSERT INTO " .getBytes ());
653+ out .write (tableName .getBytes ());
654+ out .write (" \n FORMAT " .getBytes ());
655+ out .write (format .name ().getBytes ());
656+ out .write (" \n " .getBytes ());
657+ for (Object obj : data ) {
658+ for (POJOSerializer serializer : serializers ) {
659+ try {
660+ serializer .serialize (obj , out );
661+ } catch (InvocationTargetException | IllegalAccessException | IOException e ) {
662+ throw new DataSerializationException (obj , serializer , e );
663+ }
664+ }
665+ }
666+ })) {
667+
668+
669+ // Check response
670+ if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
671+ LOG .warn ("Failed to get response. Server returned {}. Retrying." , httpResponse .getCode ());
672+ selectedNode = getNextAliveNode ();
673+ continue ;
674+ }
675+
676+ ClientStatisticsHolder clientStats = globalClientStats .remove (operationId );
677+ OperationMetrics metrics = new OperationMetrics (clientStats );
678+ String summary = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_SRV_SUMMARY ), "{}" );
679+ ProcessParser .parseSummary (summary , metrics );
680+ String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .QPARAM_QUERY_ID ), finalSettings .getQueryId (), String ::valueOf );
681+ metrics .operationComplete ();
682+ metrics .setQueryId (queryId );
683+ return new InsertResponse (metrics );
684+ } catch (NoHttpResponseException e ) {
685+ LOG .warn ("Failed to get response. Retrying." , e );
686+ selectedNode = getNextAliveNode ();
687+ continue ;
688+ } catch (IOException e ) {
689+ LOG .info ("Interrupted while waiting for response." );
690+ throw new ClientException ("Failed to get query response" , e );
691+ }
692+ }
693+ throw new ClientException ("Failed to get table schema: too many retries" );
694+ }, sharedOperationExecutor );
695+ return future ;
696+ } else {
697+ //Create an output stream to write the data to
698+ ByteArrayOutputStream stream = new ByteArrayOutputStream ();
699+
700+ //Call the static .serialize method on the POJOSerializer for each object in the list
701+ for (Object obj : data ) {
702+ for (POJOSerializer serializer : serializers ) {
703+ try {
704+ serializer .serialize (obj , stream );
705+ } catch (InvocationTargetException | IllegalAccessException | IOException e ) {
706+ throw new DataSerializationException (obj , serializer , e );
707+ }
644708 }
645709 }
646- }
647710
648- globalClientStats .get (operationId ).stop (ClientMetrics .OP_SERIALIZATION );
649- LOG .debug ("Total serialization time: {}" , globalClientStats .get (operationId ).getElapsedTime ("serialization" ));
650- return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), format , settings );
711+ globalClientStats .get (operationId ).stop (ClientMetrics .OP_SERIALIZATION );
712+ LOG .debug ("Total serialization time: {}" , globalClientStats .get (operationId ).getElapsedTime ("serialization" ));
713+ return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), format , settings );
714+ }
651715 }
652716
653717 /**
@@ -683,47 +747,122 @@ public CompletableFuture<InsertResponse> insert(String tableName,
683747 ClientStatisticsHolder clientStats = globalClientStats .remove (operationId );
684748 clientStats .start (ClientMetrics .OP_DURATION );
685749
686- CompletableFuture < InsertResponse > responseFuture = new CompletableFuture <>();
750+ if ( useNewImplementation ) {
687751
688- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
689- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
690- .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
691-
692- CompletableFuture <ClickHouseResponse > future = null ;
693- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
694- future = request .data (stream .getInputStream ()).execute ();
695-
696- //Copy the data from the input stream to the output stream
697- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
698- int bytesRead ;
699- while ((bytesRead = data .read (buffer )) != -1 ) {
700- stream .write (buffer , 0 , bytesRead );
701- }
702- } catch (IOException e ) {
703- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
752+ String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
753+ final int maxRetries = retry == null ? (int ) ClickHouseClientOption .RETRY .getDefaultValue () : Integer .parseInt (retry );
754+ final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
755+ Integer .parseInt (configuration .getOrDefault (ClickHouseClientOption .WRITE_BUFFER_SIZE .getKey (), "8192" )) :
756+ settings .getInputStreamCopyBufferSize ();
757+
758+ if (writeBufferSize <= 0 ) {
759+ throw new IllegalArgumentException ("Buffer size must be greater than 0" );
704760 }
705761
706- if (!responseFuture .isCompletedExceptionally ()) {
707- try {
708- int operationTimeout = getOperationTimeout ();
709- ClickHouseResponse clickHouseResponse ;
710- if (operationTimeout > 0 ) {
711- clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
712- } else {
713- clickHouseResponse = future .get ();
762+ settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
763+ final InsertSettings finalSettings = settings ;
764+ CompletableFuture <InsertResponse > future = CompletableFuture .supplyAsync (() -> {
765+ // Selecting some node
766+ ClickHouseNode selectedNode = getNextAliveNode ();
767+
768+ for (int i = 0 ; i <= maxRetries ; i ++) {
769+ // Execute request
770+ try (ClassicHttpResponse httpResponse =
771+ httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (),
772+ out -> {
773+ out .write ("INSERT INTO " .getBytes ());
774+ out .write (tableName .getBytes ());
775+ out .write (" FORMAT " .getBytes ());
776+ out .write (format .name ().getBytes ());
777+ out .write (" \n " .getBytes ());
778+
779+ byte [] buffer = new byte [writeBufferSize ];
780+ int bytesRead ;
781+ while ((bytesRead = data .read (buffer )) != -1 ) {
782+ out .write (buffer , 0 , bytesRead );
783+ }
784+ out .flush ();
785+ })) {
786+
787+
788+ // Check response
789+ if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
790+ LOG .warn ("Failed to get response. Server returned {}. Retrying." , httpResponse .getCode ());
791+ selectedNode = getNextAliveNode ();
792+ continue ;
793+ }
794+
795+ OperationMetrics metrics = new OperationMetrics (clientStats );
796+ String summary = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .HEADER_SRV_SUMMARY ), "{}" );
797+ ProcessParser .parseSummary (summary , metrics );
798+ String queryId = HttpAPIClientHelper .getHeaderVal (httpResponse .getFirstHeader (ClickHouseHttpProto .QPARAM_QUERY_ID ), finalSettings .getQueryId (), String ::valueOf );
799+ metrics .operationComplete ();
800+ metrics .setQueryId (queryId );
801+ return new InsertResponse (metrics );
802+ } catch (NoHttpResponseException e ) {
803+ if (i < maxRetries ) {
804+ try {
805+ data .reset ();
806+ } catch (IOException ioe ) {
807+ throw new ClientException ("Failed to get response" , e );
808+ }
809+ LOG .warn ("Failed to get response. Retrying." , e );
810+ selectedNode = getNextAliveNode ();
811+ } else {
812+ throw new ClientException ("Server did not respond" , e );
813+ }
814+ continue ;
815+ } catch (IOException e ) {
816+ LOG .info ("Interrupted while waiting for response." );
817+ throw new ClientException ("Failed to get query response" , e );
818+ }
819+ }
820+ throw new ClientException ("Failed to insert data: too many retries" );
821+ }, sharedOperationExecutor );
822+ return future ;
823+ } else {
824+ CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
825+
826+ try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
827+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
828+ .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
829+
830+ CompletableFuture <ClickHouseResponse > future = null ;
831+ try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
832+ future = request .data (stream .getInputStream ()).execute ();
833+
834+ //Copy the data from the input stream to the output stream
835+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
836+ int bytesRead ;
837+ while ((bytesRead = data .read (buffer )) != -1 ) {
838+ stream .write (buffer , 0 , bytesRead );
839+ }
840+ } catch (IOException e ) {
841+ responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
842+ }
843+
844+ if (!responseFuture .isCompletedExceptionally ()) {
845+ try {
846+ int operationTimeout = getOperationTimeout ();
847+ ClickHouseResponse clickHouseResponse ;
848+ if (operationTimeout > 0 ) {
849+ clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
850+ } else {
851+ clickHouseResponse = future .get ();
852+ }
853+ InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
854+ responseFuture .complete (response );
855+ } catch (ExecutionException e ) {
856+ responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
857+ } catch (InterruptedException | TimeoutException e ) {
858+ responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
714859 }
715- InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
716- responseFuture .complete (response );
717- } catch (ExecutionException e ) {
718- responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
719- } catch (InterruptedException | TimeoutException e ) {
720- responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
721860 }
861+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
722862 }
723- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
724- }
725863
726- return responseFuture ;
864+ return responseFuture ;
865+ }
727866 }
728867
729868 /**
0 commit comments