@@ -122,6 +122,7 @@ public class Client implements AutoCloseable {
122122
123123 private boolean useNewImplementation = false ;
124124
125+ private ClickHouseClient oldClient = null ;
125126
126127 private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ) {
127128 this .endpoints = endpoints ;
@@ -139,6 +140,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
139140 this .httpClientHelper = new HttpAPIClientHelper (configuration );
140141 LOG .info ("Using new http client implementation" );
141142 } else {
143+ this .oldClient = ClientV1AdaptorHelper .createClient (configuration );
142144 LOG .info ("Using old http client implementation" );
143145 }
144146 }
@@ -168,6 +170,10 @@ public void close() {
168170 } catch (Exception e ) {
169171 LOG .error ("Failed to close shared operation executor" , e );
170172 }
173+
174+ if (oldClient != null ) {
175+ oldClient .close ();
176+ }
171177 }
172178
173179 public static class Builder {
@@ -663,8 +669,14 @@ public boolean ping() {
663669 * @return true if the server is alive, false otherwise
664670 */
665671 public boolean ping (long timeout ) {
666- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
667- return client .ping (getServerNode (), Math .toIntExact (timeout ));
672+ if (useNewImplementation ) {
673+ try (QueryResponse response = query ("SELECT 1 FORMAT TabSeparated" ).get (timeout , TimeUnit .MILLISECONDS )) {
674+ return true ;
675+ } catch (Exception e ) {
676+ return false ;
677+ }
678+ } else {
679+ return oldClient .ping (getServerNode (), Math .toIntExact (timeout ));
668680 }
669681 }
670682
@@ -986,43 +998,41 @@ public CompletableFuture<InsertResponse> insert(String tableName,
986998 } else {
987999 CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
9881000
989- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
990- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
991- .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
1001+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
1002+ .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
9921003
993- CompletableFuture <ClickHouseResponse > future = null ;
994- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
995- future = request .data (stream .getInputStream ()).execute ();
1004+ CompletableFuture <ClickHouseResponse > future = null ;
1005+ try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
1006+ future = request .data (stream .getInputStream ()).execute ();
9961007
997- //Copy the data from the input stream to the output stream
998- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
999- int bytesRead ;
1000- while ((bytesRead = data .read (buffer )) != -1 ) {
1001- stream .write (buffer , 0 , bytesRead );
1002- }
1003- } catch (IOException e ) {
1004- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
1008+ //Copy the data from the input stream to the output stream
1009+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1010+ int bytesRead ;
1011+ while ((bytesRead = data .read (buffer )) != -1 ) {
1012+ stream .write (buffer , 0 , bytesRead );
10051013 }
1014+ } catch (IOException e ) {
1015+ responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
1016+ }
10061017
1007- if (!responseFuture .isCompletedExceptionally ()) {
1008- try {
1009- int operationTimeout = getOperationTimeout ();
1010- ClickHouseResponse clickHouseResponse ;
1011- if (operationTimeout > 0 ) {
1012- clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
1013- } else {
1014- clickHouseResponse = future .get ();
1015- }
1016- InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
1017- responseFuture .complete (response );
1018- } catch (ExecutionException e ) {
1019- responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
1020- } catch (InterruptedException | TimeoutException e ) {
1021- responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
1018+ if (!responseFuture .isCompletedExceptionally ()) {
1019+ try {
1020+ int operationTimeout = getOperationTimeout ();
1021+ ClickHouseResponse clickHouseResponse ;
1022+ if (operationTimeout > 0 ) {
1023+ clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
1024+ } else {
1025+ clickHouseResponse = future .get ();
10221026 }
1027+ InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
1028+ responseFuture .complete (response );
1029+ } catch (ExecutionException e ) {
1030+ responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
1031+ } catch (InterruptedException | TimeoutException e ) {
1032+ responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
10231033 }
1024- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
10251034 }
1035+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
10261036
10271037 return responseFuture ;
10281038 }
@@ -1127,7 +1137,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11271137 metrics .setQueryId (queryId );
11281138 metrics .operationComplete ();
11291139
1130- return new QueryResponse (httpResponse , finalSettings , metrics );
1140+ return new QueryResponse (httpResponse , finalSettings . getFormat () , metrics );
11311141 } catch (ClientException e ) {
11321142 throw e ;
11331143 } catch (Exception e ) {
@@ -1138,8 +1148,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11381148 }, sharedOperationExecutor );
11391149 return future ;
11401150 } else {
1141- ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration );
1142- ClickHouseRequest <?> request = client .read (getServerNode ());
1151+ ClickHouseRequest <?> request = oldClient .read (getServerNode ());
11431152 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
11441153 request .settings (SettingsConverter .toRequestSettings (settings .getAllSettings (), queryParams ));
11451154 request .option (ClickHouseClientOption .ASYNC , false ); // we have own async handling
@@ -1160,7 +1169,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11601169 clickHouseResponse = request .execute ().get ();
11611170 }
11621171
1163- return new QueryResponse (client , clickHouseResponse , finalSettings , format , clientStats );
1172+ return new QueryResponse (clickHouseResponse , format , clientStats );
11641173 } catch (ClientException e ) {
11651174 throw e ;
11661175 } catch (Exception e ) {
0 commit comments