@@ -119,6 +119,7 @@ public class Client implements AutoCloseable {
119119
120120 private boolean useNewImplementation = false ;
121121
122+ private ClickHouseClient oldClient = null ;
122123
123124 private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ) {
124125 this .endpoints = endpoints ;
@@ -136,6 +137,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
136137 this .httpClientHelper = new HttpAPIClientHelper (configuration );
137138 LOG .info ("Using new http client implementation" );
138139 } else {
140+ this .oldClient = ClientV1AdaptorHelper .createClient (configuration );
139141 LOG .info ("Using old http client implementation" );
140142 }
141143 }
@@ -165,6 +167,10 @@ public void close() {
165167 } catch (Exception e ) {
166168 LOG .error ("Failed to close shared operation executor" , e );
167169 }
170+
171+ if (oldClient != null ) {
172+ oldClient .close ();
173+ }
168174 }
169175
170176 public static class Builder {
@@ -589,8 +595,14 @@ public boolean ping() {
589595 * @return true if the server is alive, false otherwise
590596 */
591597 public boolean ping (long timeout ) {
592- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
593- return client .ping (getServerNode (), Math .toIntExact (timeout ));
598+ if (useNewImplementation ) {
599+ try (QueryResponse response = query ("SELECT 1 FORMAT TabSeparated" ).get (timeout , TimeUnit .MILLISECONDS )) {
600+ return true ;
601+ } catch (Exception e ) {
602+ return false ;
603+ }
604+ } else {
605+ return oldClient .ping (getServerNode (), Math .toIntExact (timeout ));
594606 }
595607 }
596608
@@ -912,43 +924,41 @@ public CompletableFuture<InsertResponse> insert(String tableName,
912924 } else {
913925 CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
914926
915- try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
916- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
917- .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
927+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
928+ .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
918929
919- CompletableFuture <ClickHouseResponse > future = null ;
920- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
921- future = request .data (stream .getInputStream ()).execute ();
930+ CompletableFuture <ClickHouseResponse > future = null ;
931+ try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
932+ future = request .data (stream .getInputStream ()).execute ();
922933
923- //Copy the data from the input stream to the output stream
924- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
925- int bytesRead ;
926- while ((bytesRead = data .read (buffer )) != -1 ) {
927- stream .write (buffer , 0 , bytesRead );
928- }
929- } catch (IOException e ) {
930- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
934+ //Copy the data from the input stream to the output stream
935+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
936+ int bytesRead ;
937+ while ((bytesRead = data .read (buffer )) != -1 ) {
938+ stream .write (buffer , 0 , bytesRead );
931939 }
940+ } catch (IOException e ) {
941+ responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
942+ }
932943
933- if (!responseFuture .isCompletedExceptionally ()) {
934- try {
935- int operationTimeout = getOperationTimeout ();
936- ClickHouseResponse clickHouseResponse ;
937- if (operationTimeout > 0 ) {
938- clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
939- } else {
940- clickHouseResponse = future .get ();
941- }
942- InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
943- responseFuture .complete (response );
944- } catch (ExecutionException e ) {
945- responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
946- } catch (InterruptedException | TimeoutException e ) {
947- responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
944+ if (!responseFuture .isCompletedExceptionally ()) {
945+ try {
946+ int operationTimeout = getOperationTimeout ();
947+ ClickHouseResponse clickHouseResponse ;
948+ if (operationTimeout > 0 ) {
949+ clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
950+ } else {
951+ clickHouseResponse = future .get ();
948952 }
953+ InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
954+ responseFuture .complete (response );
955+ } catch (ExecutionException e ) {
956+ responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
957+ } catch (InterruptedException | TimeoutException e ) {
958+ responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
949959 }
950- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
951960 }
961+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
952962
953963 return responseFuture ;
954964 }
@@ -1051,7 +1061,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10511061 metrics .setQueryId (queryId );
10521062 metrics .operationComplete ();
10531063
1054- return new QueryResponse (httpResponse , finalSettings , metrics );
1064+ return new QueryResponse (httpResponse , finalSettings . getFormat () , metrics );
10551065 } catch (ClientException e ) {
10561066 throw e ;
10571067 } catch (Exception e ) {
@@ -1062,8 +1072,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10621072 }, sharedOperationExecutor );
10631073 return future ;
10641074 } else {
1065- ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration );
1066- ClickHouseRequest <?> request = client .read (getServerNode ());
1075+ ClickHouseRequest <?> request = oldClient .read (getServerNode ());
10671076 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
10681077 request .settings (SettingsConverter .toRequestSettings (settings .getAllSettings (), queryParams ));
10691078 request .option (ClickHouseClientOption .ASYNC , false ); // we have own async handling
@@ -1084,7 +1093,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
10841093 clickHouseResponse = request .execute ().get ();
10851094 }
10861095
1087- return new QueryResponse (client , clickHouseResponse , finalSettings , format , clientStats );
1096+ return new QueryResponse (clickHouseResponse , format , clientStats );
10881097 } catch (ClientException e ) {
10891098 throw e ;
10901099 } catch (Exception e ) {
0 commit comments