3939import com .clickhouse .client .config .ClickHouseClientOption ;
4040import com .clickhouse .client .config .ClickHouseDefaults ;
4141import com .clickhouse .client .http .ClickHouseHttpProto ;
42- import com .clickhouse .client .http .config .ClickHouseHttpOption ;
4342import com .clickhouse .data .ClickHouseColumn ;
44- import com .clickhouse .data .ClickHouseDataStreamFactory ;
4543import com .clickhouse .data .ClickHouseFormat ;
46- import com .clickhouse .data .ClickHousePipedOutputStream ;
4744import com .clickhouse .data .format .BinaryStreamUtils ;
4845import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
4946import org .apache .hc .core5 .http .ClassicHttpResponse ;
7976import java .util .concurrent .Executors ;
8077import java .util .concurrent .TimeUnit ;
8178import java .util .concurrent .TimeoutException ;
79+ import java .util .function .Supplier ;
8280
8381import static java .time .temporal .ChronoUnit .SECONDS ;
8482
114112public class Client implements AutoCloseable {
115113 private HttpAPIClientHelper httpClientHelper = null ;
116114
117- private Set <String > endpoints ;
118- private Map <String , String > configuration ;
119- private List <ClickHouseNode > serverNodes = new ArrayList <>();
120- private Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
121- private Map <Class <?>, Map <String , Method >> getterMethods ;
122- private Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
115+ private final Set <String > endpoints ;
116+ private final Map <String , String > configuration ;
117+ private final List <ClickHouseNode > serverNodes = new ArrayList <>();
118+ private final Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
119+ private final Map <Class <?>, Map <String , Method >> getterMethods ;
120+ private final Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
123121 private static final Logger LOG = LoggerFactory .getLogger (Client .class );
124- private ExecutorService sharedOperationExecutor ;
122+ private final ExecutorService sharedOperationExecutor ;
125123
126- private Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
124+ private final Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
127125
128126 private boolean useNewImplementation = false ;
129127
130128 private ClickHouseClient oldClient = null ;
131129
132- private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ) {
130+ private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ,
131+ ExecutorService sharedOperationExecutor ) {
133132 this .endpoints = endpoints ;
134133 this .configuration = configuration ;
135134 this .endpoints .forEach (endpoint -> {
@@ -139,7 +138,12 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
139138 this .getterMethods = new HashMap <>();
140139 this .hasDefaults = new HashMap <>();
141140
142- this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
141+ boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClickHouseClientOption .ASYNC .getKey ());
142+ if (isAsyncEnabled && sharedOperationExecutor == null ) {
143+ this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
144+ } else {
145+ this .sharedOperationExecutor = sharedOperationExecutor ;
146+ }
143147 this .useNewImplementation = useNewImplementation ;
144148 if (useNewImplementation ) {
145149 this .httpClientHelper = new HttpAPIClientHelper (configuration );
@@ -169,7 +173,7 @@ public String getDefaultDatabase() {
169173 @ Override
170174 public void close () {
171175 try {
172- if (!sharedOperationExecutor .isShutdown ()) {
176+ if (sharedOperationExecutor != null && !sharedOperationExecutor .isShutdown ()) {
173177 this .sharedOperationExecutor .shutdownNow ();
174178 }
175179 } catch (Exception e ) {
@@ -188,6 +192,8 @@ public static class Builder {
188192 private Map <String , String > configuration ;
189193 private boolean useNewImplementation = false ;
190194
195+ private ExecutorService sharedOperationExecutor = null ;
196+
191197 public Builder () {
192198 this .endpoints = new HashSet <>();
193199 this .configuration = new HashMap <String , String >();
@@ -677,6 +683,34 @@ public Builder setServerTimeZone(String timeZone) {
677683 return this ;
678684 }
679685
686+ /**
687+ * Configures client to execute requests in a separate thread. By default, operations (query, insert)
688+ * are executed in the same thread as the caller.
689+ * It is possible to set a shared executor for all operations. See {@link #setSharedOperationExecutor(ExecutorService)}
690+ *
691+ * Note: Async operations a using executor what expects having a queue of tasks for a pool of executors.
692+ * The queue size limit is small it may quickly become a problem for scheduling new tasks.
693+ *
694+ * @param async - if to use async requests
695+ * @return
696+ */
697+ public Builder useAsyncRequests (boolean async ) {
698+ this .configuration .put (ClickHouseClientOption .ASYNC .getKey (), String .valueOf (async ));
699+ return this ;
700+ }
701+
702+ /**
703+ * Sets an executor for running operations. If async operations are enabled and no executor is specified
704+ * client will create a default executor.
705+ *
706+ * @param executorService - executor service for async operations
707+ * @return
708+ */
709+ public Builder setSharedOperationExecutor (ExecutorService executorService ) {
710+ this .sharedOperationExecutor = executorService ;
711+ return this ;
712+ }
713+
680714 public Client build () {
681715 this .configuration = setDefaults (this .configuration );
682716
@@ -722,7 +756,7 @@ public Client build() {
722756 throw new IllegalArgumentException ("Nor server timezone nor specific timezone is set" );
723757 }
724758
725- return new Client (this .endpoints , this .configuration , this .useNewImplementation );
759+ return new Client (this .endpoints , this .configuration , this .useNewImplementation , this . sharedOperationExecutor );
726760 }
727761
728762 private Map <String , String > setDefaults (Map <String , String > userConfig ) {
@@ -755,6 +789,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
755789 userConfig .put (ClickHouseClientOption .SERVER_TIME_ZONE .getKey (), "UTC" );
756790 }
757791
792+ if (!userConfig .containsKey (ClickHouseClientOption .ASYNC .getKey ())) {
793+ userConfig .put (ClickHouseClientOption .ASYNC .getKey (), "false" );
794+ }
795+
758796 if (!userConfig .containsKey (ClickHouseHttpOption .MAX_OPEN_CONNECTIONS .getKey ())) {
759797 userConfig .put (ClickHouseHttpOption .MAX_OPEN_CONNECTIONS .getKey (), "10" );
760798 }
@@ -945,7 +983,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
945983
946984 settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
947985 final InsertSettings finalSettings = settings ;
948- CompletableFuture <InsertResponse > future = CompletableFuture . supplyAsync ( () -> {
986+ Supplier <InsertResponse > supplier = () -> {
949987 // Selecting some node
950988 ClickHouseNode selectedNode = getNextAliveNode ();
951989
@@ -997,8 +1035,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
9971035 }
9981036 }
9991037 throw new ClientException ("Failed to get table schema: too many retries" );
1000- }, sharedOperationExecutor );
1001- return future ;
1038+ };
1039+
1040+ return runAsyncOperation (supplier , settings .getAllSettings ());
10021041 } else {
10031042 //Create an output stream to write the data to
10041043 ByteArrayOutputStream stream = new ByteArrayOutputStream ();
@@ -1053,6 +1092,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10531092 ClientStatisticsHolder clientStats = globalClientStats .remove (operationId );
10541093 clientStats .start (ClientMetrics .OP_DURATION );
10551094
1095+ Supplier <InsertResponse > responseSupplier ;
10561096 if (useNewImplementation ) {
10571097
10581098 String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
@@ -1067,7 +1107,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10671107
10681108 settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
10691109 final InsertSettings finalSettings = settings ;
1070- CompletableFuture < InsertResponse > future = CompletableFuture . supplyAsync ( () -> {
1110+ responseSupplier = () -> {
10711111 // Selecting some node
10721112 ClickHouseNode selectedNode = getNextAliveNode ();
10731113
@@ -1124,29 +1164,23 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11241164 }
11251165 }
11261166 throw new ClientException ("Failed to insert data: too many retries" );
1127- }, sharedOperationExecutor );
1128- return future ;
1167+ };
11291168 } else {
1130- CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
1131-
1132- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
1133- .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
1134-
1135- CompletableFuture <ClickHouseResponse > future = null ;
1136- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
1137- future = request .data (stream .getInputStream ()).execute ();
1138-
1139- //Copy the data from the input stream to the output stream
1140- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1141- int bytesRead ;
1142- while ((bytesRead = data .read (buffer )) != -1 ) {
1143- stream .write (buffer , 0 , bytesRead );
1144- }
1145- } catch (IOException e ) {
1146- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
1147- }
1169+ responseSupplier = () -> {
1170+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
1171+ .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
1172+
1173+ CompletableFuture <ClickHouseResponse > future = null ;
1174+ future = request .data (output -> {
1175+ //Copy the data from the input stream to the output stream
1176+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1177+ int bytesRead ;
1178+ while ((bytesRead = data .read (buffer )) != -1 ) {
1179+ output .write (buffer , 0 , bytesRead );
1180+ }
1181+ output .close ();
1182+ }).option (ClickHouseClientOption .ASYNC , false ).execute ();
11481183
1149- if (!responseFuture .isCompletedExceptionally ()) {
11501184 try {
11511185 int operationTimeout = getOperationTimeout ();
11521186 ClickHouseResponse clickHouseResponse ;
@@ -1156,17 +1190,17 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11561190 clickHouseResponse = future .get ();
11571191 }
11581192 InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
1159- responseFuture .complete (response );
1193+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
1194+ return response ;
11601195 } catch (ExecutionException e ) {
1161- responseFuture . completeExceptionally ( new ClientException ("Failed to get insert response" , e .getCause () ));
1196+ throw new ClientException ("Failed to get insert response" , e .getCause ());
11621197 } catch (InterruptedException | TimeoutException e ) {
1163- responseFuture . completeExceptionally ( new ClientException ("Operation has likely timed out." , e ) );
1198+ throw new ClientException ("Operation has likely timed out." , e );
11641199 }
1165- }
1166- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
1167-
1168- return responseFuture ;
1200+ };
11691201 }
1202+
1203+ return runAsyncOperation (responseSupplier , settings .getAllSettings ());
11701204 }
11711205
11721206 /**
@@ -1232,6 +1266,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12321266 clientStats .start (ClientMetrics .OP_DURATION );
12331267 applyDefaults (settings );
12341268
1269+ Supplier <QueryResponse > responseSupplier ;
1270+
12351271 if (useNewImplementation ) {
12361272 String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
12371273 final int maxRetries = retry == null ? (int ) ClickHouseClientOption .RETRY .getDefaultValue () : Integer .parseInt (retry );
@@ -1240,7 +1276,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12401276 settings .setOption ("statement_params" , queryParams );
12411277 }
12421278 final QuerySettings finalSettings = settings ;
1243- CompletableFuture < QueryResponse > future = CompletableFuture . supplyAsync ( () -> {
1279+ responseSupplier = () -> {
12441280 // Selecting some node
12451281 ClickHouseNode selectedNode = getNextAliveNode ();
12461282 for (int i = 0 ; i <= maxRetries ; i ++) {
@@ -1275,8 +1311,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12751311 }
12761312 }
12771313 throw new ClientException ("Failed to get table schema: too many retries" );
1278- }, sharedOperationExecutor );
1279- return future ;
1314+ };
12801315 } else {
12811316 ClickHouseRequest <?> request = oldClient .read (getServerNode ());
12821317 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
@@ -1287,7 +1322,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12871322 request .format (format );
12881323
12891324 final QuerySettings finalSettings = settings ;
1290- CompletableFuture < QueryResponse > future = CompletableFuture . supplyAsync ( () -> {
1325+ responseSupplier = () -> {
12911326 LOG .trace ("Executing request: {}" , request );
12921327 try {
12931328
@@ -1305,9 +1340,10 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
13051340 } catch (Exception e ) {
13061341 throw new ClientException ("Failed to get query response" , e );
13071342 }
1308- }, sharedOperationExecutor );
1309- return future ;
1343+ };
13101344 }
1345+
1346+ return runAsyncOperation (responseSupplier , settings .getAllSettings ());
13111347 }
13121348
13131349 /**
@@ -1339,13 +1375,13 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
13391375 settings .waitEndOfQuery (true ); // we rely on the summery
13401376
13411377 final QuerySettings finalSettings = settings ;
1342- return query (sqlQuery , settings ).thenApplyAsync (response -> {
1378+ return query (sqlQuery , settings ).thenApply (response -> {
13431379 try {
13441380 return new Records (response , finalSettings );
13451381 } catch (Exception e ) {
13461382 throw new ClientException ("Failed to get query response" , e );
13471383 }
1348- }, sharedOperationExecutor );
1384+ });
13491385 }
13501386
13511387 /**
@@ -1447,13 +1483,13 @@ public CompletableFuture<CommandResponse> execute(String sql, CommandSettings se
14471483 */
14481484 public CompletableFuture <CommandResponse > execute (String sql ) {
14491485 return query (sql )
1450- .thenApplyAsync (response -> {
1486+ .thenApply (response -> {
14511487 try {
14521488 return new CommandResponse (response );
14531489 } catch (Exception e ) {
14541490 throw new ClientException ("Failed to get command response" , e );
14551491 }
1456- }, sharedOperationExecutor );
1492+ });
14571493 }
14581494
14591495 /**
@@ -1514,6 +1550,11 @@ private void applyDefaults(QuerySettings settings) {
15141550 }
15151551 }
15161552
1553+ private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
1554+ boolean isAsync = MapUtils .getFlag (configuration , requestSettings , ClickHouseClientOption .ASYNC .getKey ());
1555+ return isAsync ? CompletableFuture .supplyAsync (resultSupplier , sharedOperationExecutor ) : CompletableFuture .completedFuture (resultSupplier .get ());
1556+ }
1557+
15171558 public String toString () {
15181559 return "Client{" +
15191560 "endpoints=" + endpoints +
0 commit comments