@@ -123,6 +123,7 @@ public class Client implements AutoCloseable {
123123
124124 private boolean useNewImplementation = false ;
125125
126+ private ClickHouseClient oldClient = null ;
126127
127128 private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ) {
128129 this .endpoints = endpoints ;
@@ -140,6 +141,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
140141 this .httpClientHelper = new HttpAPIClientHelper (configuration );
141142 LOG .info ("Using new http client implementation" );
142143 } else {
144+ this .oldClient = ClientV1AdaptorHelper .createClient (configuration );
143145 LOG .info ("Using old http client implementation" );
144146 }
145147 }
@@ -169,6 +171,10 @@ public void close() {
169171 } catch (Exception e ) {
170172 LOG .error ("Failed to close shared operation executor" , e );
171173 }
174+
175+ if (oldClient != null ) {
176+ oldClient .close ();
177+ }
172178 }
173179
174180 public static class Builder {
@@ -623,8 +629,14 @@ public boolean ping() {
623629 * @return true if the server is alive, false otherwise
624630 */
625631 public boolean ping (long timeout ) {
626- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
627- return client .ping (getServerNode (), Math .toIntExact (timeout ));
632+ if (useNewImplementation ) {
633+ try (QueryResponse response = query ("SELECT 1 FORMAT TabSeparated" ).get (timeout , TimeUnit .MILLISECONDS )) {
634+ return true ;
635+ } catch (Exception e ) {
636+ return false ;
637+ }
638+ } else {
639+ return oldClient .ping (getServerNode (), Math .toIntExact (timeout ));
628640 }
629641 }
630642
@@ -947,43 +959,41 @@ public CompletableFuture<InsertResponse> insert(String tableName,
947959 } else {
948960 CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
949961
950- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
951- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
952- .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
962+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
963+ .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
953964
954- CompletableFuture <ClickHouseResponse > future = null ;
955- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
956- future = request .data (stream .getInputStream ()).execute ();
965+ CompletableFuture <ClickHouseResponse > future = null ;
966+ try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
967+ future = request .data (stream .getInputStream ()).execute ();
957968
958- //Copy the data from the input stream to the output stream
959- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
960- int bytesRead ;
961- while ((bytesRead = data .read (buffer )) != -1 ) {
962- stream .write (buffer , 0 , bytesRead );
963- }
964- } catch (IOException e ) {
965- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
969+ //Copy the data from the input stream to the output stream
970+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
971+ int bytesRead ;
972+ while ((bytesRead = data .read (buffer )) != -1 ) {
973+ stream .write (buffer , 0 , bytesRead );
966974 }
975+ } catch (IOException e ) {
976+ responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
977+ }
967978
968- if (!responseFuture .isCompletedExceptionally ()) {
969- try {
970- int operationTimeout = getOperationTimeout ();
971- ClickHouseResponse clickHouseResponse ;
972- if (operationTimeout > 0 ) {
973- clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
974- } else {
975- clickHouseResponse = future .get ();
976- }
977- InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
978- responseFuture .complete (response );
979- } catch (ExecutionException e ) {
980- responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
981- } catch (InterruptedException | TimeoutException e ) {
982- responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
979+ if (!responseFuture .isCompletedExceptionally ()) {
980+ try {
981+ int operationTimeout = getOperationTimeout ();
982+ ClickHouseResponse clickHouseResponse ;
983+ if (operationTimeout > 0 ) {
984+ clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
985+ } else {
986+ clickHouseResponse = future .get ();
983987 }
988+ InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
989+ responseFuture .complete (response );
990+ } catch (ExecutionException e ) {
991+ responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
992+ } catch (InterruptedException | TimeoutException e ) {
993+ responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
984994 }
985- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
986995 }
996+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
987997
988998 return responseFuture ;
989999 }
@@ -1086,7 +1096,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10861096 metrics .setQueryId (queryId );
10871097 metrics .operationComplete ();
10881098
1089- return new QueryResponse (httpResponse , finalSettings , metrics );
1099+ return new QueryResponse (httpResponse , finalSettings . getFormat () , metrics );
10901100 } catch (ClientException e ) {
10911101 throw e ;
10921102 } catch (Exception e ) {
@@ -1097,8 +1107,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10971107 }, sharedOperationExecutor );
10981108 return future ;
10991109 } else {
1100- ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration );
1101- ClickHouseRequest <?> request = client .read (getServerNode ());
1110+ ClickHouseRequest <?> request = oldClient .read (getServerNode ());
11021111 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
11031112 request .settings (SettingsConverter .toRequestSettings (settings .getAllSettings (), queryParams ));
11041113 request .option (ClickHouseClientOption .ASYNC , false ); // we have own async handling
@@ -1119,7 +1128,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11191128 clickHouseResponse = request .execute ().get ();
11201129 }
11211130
1122- return new QueryResponse (client , clickHouseResponse , finalSettings , format , clientStats );
1131+ return new QueryResponse (clickHouseResponse , format , clientStats );
11231132 } catch (ClientException e ) {
11241133 throw e ;
11251134 } catch (Exception e ) {
0 commit comments