3939import com .clickhouse .client .config .ClickHouseClientOption ;
4040import com .clickhouse .client .config .ClickHouseDefaults ;
4141import com .clickhouse .client .http .ClickHouseHttpProto ;
42- import com .clickhouse .client .http .config .ClickHouseHttpOption ;
4342import com .clickhouse .data .ClickHouseColumn ;
44- import com .clickhouse .data .ClickHouseDataStreamFactory ;
4543import com .clickhouse .data .ClickHouseFormat ;
46- import com .clickhouse .data .ClickHousePipedOutputStream ;
4744import com .clickhouse .data .format .BinaryStreamUtils ;
48- import org .apache .commons .compress .compressors .lz4 .BlockLZ4CompressorOutputStream ;
49- import org .apache .commons .compress .compressors .lz4 .FramedLZ4CompressorOutputStream ;
5045import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
5146import org .apache .hc .core5 .http .ClassicHttpResponse ;
5247import org .apache .hc .core5 .http .HttpStatus ;
8176import java .util .concurrent .Executors ;
8277import java .util .concurrent .TimeUnit ;
8378import java .util .concurrent .TimeoutException ;
79+ import java .util .function .Supplier ;
8480
8581import static java .time .temporal .ChronoUnit .SECONDS ;
8682
116112public class Client implements AutoCloseable {
117113 private HttpAPIClientHelper httpClientHelper = null ;
118114
119- private Set <String > endpoints ;
120- private Map <String , String > configuration ;
121- private List <ClickHouseNode > serverNodes = new ArrayList <>();
122- private Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
123- private Map <Class <?>, Map <String , Method >> getterMethods ;
124- private Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
115+ private final Set <String > endpoints ;
116+ private final Map <String , String > configuration ;
117+ private final List <ClickHouseNode > serverNodes = new ArrayList <>();
118+ private final Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
119+ private final Map <Class <?>, Map <String , Method >> getterMethods ;
120+ private final Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
125121 private static final Logger LOG = LoggerFactory .getLogger (Client .class );
126- private ExecutorService sharedOperationExecutor ;
122+ private final ExecutorService sharedOperationExecutor ;
127123
128- private Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
124+ private final Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
129125
130126 private boolean useNewImplementation = false ;
131127
132128 private ClickHouseClient oldClient = null ;
133129
134- private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ) {
130+ private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ,
131+ ExecutorService sharedOperationExecutor ) {
135132 this .endpoints = endpoints ;
136133 this .configuration = configuration ;
137134 this .endpoints .forEach (endpoint -> {
@@ -141,7 +138,12 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
141138 this .getterMethods = new HashMap <>();
142139 this .hasDefaults = new HashMap <>();
143140
144- this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
141+ boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClickHouseClientOption .ASYNC .getKey ());
142+ if (isAsyncEnabled && sharedOperationExecutor == null ) {
143+ this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
144+ } else {
145+ this .sharedOperationExecutor = sharedOperationExecutor ;
146+ }
145147 this .useNewImplementation = useNewImplementation ;
146148 if (useNewImplementation ) {
147149 this .httpClientHelper = new HttpAPIClientHelper (configuration );
@@ -171,7 +173,7 @@ public String getDefaultDatabase() {
171173 @ Override
172174 public void close () {
173175 try {
174- if (!sharedOperationExecutor .isShutdown ()) {
176+ if (sharedOperationExecutor != null && !sharedOperationExecutor .isShutdown ()) {
175177 this .sharedOperationExecutor .shutdownNow ();
176178 }
177179 } catch (Exception e ) {
@@ -190,6 +192,8 @@ public static class Builder {
190192 private Map <String , String > configuration ;
191193 private boolean useNewImplementation = false ;
192194
195+ private ExecutorService sharedOperationExecutor = null ;
196+
193197 public Builder () {
194198 this .endpoints = new HashSet <>();
195199 this .configuration = new HashMap <String , String >();
@@ -605,6 +609,34 @@ public Builder setServerTimeZone(String timeZone) {
605609 return this ;
606610 }
607611
612+ /**
613+ * Configures client to execute requests in a separate thread. By default, operations (query, insert)
614+ * are executed in the same thread as the caller.
615+ * It is possible to set a shared executor for all operations. See {@link #setSharedOperationExecutor(ExecutorService)}
616+ *
617+ * Note: Async operations a using executor what expects having a queue of tasks for a pool of executors.
618+ * The queue size limit is small it may quickly become a problem for scheduling new tasks.
619+ *
620+ * @param async - if to use async requests
621+ * @return
622+ */
623+ public Builder useAsyncRequests (boolean async ) {
624+ this .configuration .put (ClickHouseClientOption .ASYNC .getKey (), String .valueOf (async ));
625+ return this ;
626+ }
627+
628+ /**
629+ * Sets an executor for running operations. If async operations are enabled and no executor is specified
630+ * client will create a default executor.
631+ *
632+ * @param executorService - executor service for async operations
633+ * @return
634+ */
635+ public Builder setSharedOperationExecutor (ExecutorService executorService ) {
636+ this .sharedOperationExecutor = executorService ;
637+ return this ;
638+ }
639+
608640 public Client build () {
609641 this .configuration = setDefaults (this .configuration );
610642
@@ -650,7 +682,7 @@ public Client build() {
650682 throw new IllegalArgumentException ("Nor server timezone nor specific timezone is set" );
651683 }
652684
653- return new Client (this .endpoints , this .configuration , this .useNewImplementation );
685+ return new Client (this .endpoints , this .configuration , this .useNewImplementation , this . sharedOperationExecutor );
654686 }
655687
656688 private Map <String , String > setDefaults (Map <String , String > userConfig ) {
@@ -683,6 +715,10 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
683715 userConfig .put (ClickHouseClientOption .SERVER_TIME_ZONE .getKey (), "UTC" );
684716 }
685717
718+ if (!userConfig .containsKey (ClickHouseClientOption .ASYNC .getKey ())) {
719+ userConfig .put (ClickHouseClientOption .ASYNC .getKey (), "false" );
720+ }
721+
686722 return userConfig ;
687723 }
688724 }
@@ -853,7 +889,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
853889
854890 settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
855891 final InsertSettings finalSettings = settings ;
856- CompletableFuture <InsertResponse > future = CompletableFuture . supplyAsync ( () -> {
892+ Supplier <InsertResponse > supplier = () -> {
857893 // Selecting some node
858894 ClickHouseNode selectedNode = getNextAliveNode ();
859895
@@ -905,8 +941,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
905941 }
906942 }
907943 throw new ClientException ("Failed to get table schema: too many retries" );
908- }, sharedOperationExecutor );
909- return future ;
944+ };
945+
946+ return runAsyncOperation (supplier , settings .getAllSettings ());
910947 } else {
911948 //Create an output stream to write the data to
912949 ByteArrayOutputStream stream = new ByteArrayOutputStream ();
@@ -961,6 +998,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
961998 ClientStatisticsHolder clientStats = globalClientStats .remove (operationId );
962999 clientStats .start (ClientMetrics .OP_DURATION );
9631000
1001+ Supplier <InsertResponse > responseSupplier ;
9641002 if (useNewImplementation ) {
9651003
9661004 String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
@@ -975,7 +1013,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
9751013
9761014 settings .setOption (ClickHouseClientOption .FORMAT .getKey (), format .name ());
9771015 final InsertSettings finalSettings = settings ;
978- CompletableFuture < InsertResponse > future = CompletableFuture . supplyAsync ( () -> {
1016+ responseSupplier = () -> {
9791017 // Selecting some node
9801018 ClickHouseNode selectedNode = getNextAliveNode ();
9811019
@@ -1032,29 +1070,23 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10321070 }
10331071 }
10341072 throw new ClientException ("Failed to insert data: too many retries" );
1035- }, sharedOperationExecutor );
1036- return future ;
1073+ };
10371074 } else {
1038- CompletableFuture <InsertResponse > responseFuture = new CompletableFuture <>();
1039-
1040- ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
1041- .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
1042-
1043- CompletableFuture <ClickHouseResponse > future = null ;
1044- try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
1045- future = request .data (stream .getInputStream ()).execute ();
1046-
1047- //Copy the data from the input stream to the output stream
1048- byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1049- int bytesRead ;
1050- while ((bytesRead = data .read (buffer )) != -1 ) {
1051- stream .write (buffer , 0 , bytesRead );
1052- }
1053- } catch (IOException e ) {
1054- responseFuture .completeExceptionally (new ClientException ("Failed to write data to the output stream" , e ));
1055- }
1075+ responseSupplier = () -> {
1076+ ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
1077+ .createMutationRequest (oldClient .write (getServerNode ()), tableName , settings , configuration ).format (format );
1078+
1079+ CompletableFuture <ClickHouseResponse > future = null ;
1080+ future = request .data (output -> {
1081+ //Copy the data from the input stream to the output stream
1082+ byte [] buffer = new byte [settings .getInputStreamCopyBufferSize ()];
1083+ int bytesRead ;
1084+ while ((bytesRead = data .read (buffer )) != -1 ) {
1085+ output .write (buffer , 0 , bytesRead );
1086+ }
1087+ output .close ();
1088+ }).option (ClickHouseClientOption .ASYNC , false ).execute ();
10561089
1057- if (!responseFuture .isCompletedExceptionally ()) {
10581090 try {
10591091 int operationTimeout = getOperationTimeout ();
10601092 ClickHouseResponse clickHouseResponse ;
@@ -1064,17 +1096,17 @@ public CompletableFuture<InsertResponse> insert(String tableName,
10641096 clickHouseResponse = future .get ();
10651097 }
10661098 InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
1067- responseFuture .complete (response );
1099+ LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
1100+ return response ;
10681101 } catch (ExecutionException e ) {
1069- responseFuture . completeExceptionally ( new ClientException ("Failed to get insert response" , e .getCause () ));
1102+ throw new ClientException ("Failed to get insert response" , e .getCause ());
10701103 } catch (InterruptedException | TimeoutException e ) {
1071- responseFuture . completeExceptionally ( new ClientException ("Operation has likely timed out." , e ) );
1104+ throw new ClientException ("Operation has likely timed out." , e );
10721105 }
1073- }
1074- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
1075-
1076- return responseFuture ;
1106+ };
10771107 }
1108+
1109+ return runAsyncOperation (responseSupplier , settings .getAllSettings ());
10781110 }
10791111
10801112 /**
@@ -1140,6 +1172,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11401172 clientStats .start (ClientMetrics .OP_DURATION );
11411173 applyDefaults (settings );
11421174
1175+ Supplier <QueryResponse > responseSupplier ;
1176+
11431177 if (useNewImplementation ) {
11441178 String retry = configuration .get (ClickHouseClientOption .RETRY .getKey ());
11451179 final int maxRetries = retry == null ? (int ) ClickHouseClientOption .RETRY .getDefaultValue () : Integer .parseInt (retry );
@@ -1148,7 +1182,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11481182 settings .setOption ("statement_params" , queryParams );
11491183 }
11501184 final QuerySettings finalSettings = settings ;
1151- CompletableFuture < QueryResponse > future = CompletableFuture . supplyAsync ( () -> {
1185+ responseSupplier = () -> {
11521186 // Selecting some node
11531187 ClickHouseNode selectedNode = getNextAliveNode ();
11541188 for (int i = 0 ; i <= maxRetries ; i ++) {
@@ -1183,8 +1217,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11831217 }
11841218 }
11851219 throw new ClientException ("Failed to get table schema: too many retries" );
1186- }, sharedOperationExecutor );
1187- return future ;
1220+ };
11881221 } else {
11891222 ClickHouseRequest <?> request = oldClient .read (getServerNode ());
11901223 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
@@ -1195,7 +1228,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
11951228 request .format (format );
11961229
11971230 final QuerySettings finalSettings = settings ;
1198- CompletableFuture < QueryResponse > future = CompletableFuture . supplyAsync ( () -> {
1231+ responseSupplier = () -> {
11991232 LOG .trace ("Executing request: {}" , request );
12001233 try {
12011234
@@ -1213,9 +1246,10 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12131246 } catch (Exception e ) {
12141247 throw new ClientException ("Failed to get query response" , e );
12151248 }
1216- }, sharedOperationExecutor );
1217- return future ;
1249+ };
12181250 }
1251+
1252+ return runAsyncOperation (responseSupplier , settings .getAllSettings ());
12191253 }
12201254
12211255 /**
@@ -1247,13 +1281,13 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
12471281 settings .waitEndOfQuery (true ); // we rely on the summery
12481282
12491283 final QuerySettings finalSettings = settings ;
1250- return query (sqlQuery , settings ).thenApplyAsync (response -> {
1284+ return query (sqlQuery , settings ).thenApply (response -> {
12511285 try {
12521286 return new Records (response , finalSettings );
12531287 } catch (Exception e ) {
12541288 throw new ClientException ("Failed to get query response" , e );
12551289 }
1256- }, sharedOperationExecutor );
1290+ });
12571291 }
12581292
12591293 /**
@@ -1355,13 +1389,13 @@ public CompletableFuture<CommandResponse> execute(String sql, CommandSettings se
13551389 */
13561390 public CompletableFuture <CommandResponse > execute (String sql ) {
13571391 return query (sql )
1358- .thenApplyAsync (response -> {
1392+ .thenApply (response -> {
13591393 try {
13601394 return new CommandResponse (response );
13611395 } catch (Exception e ) {
13621396 throw new ClientException ("Failed to get command response" , e );
13631397 }
1364- }, sharedOperationExecutor );
1398+ });
13651399 }
13661400
13671401 /**
@@ -1422,6 +1456,11 @@ private void applyDefaults(QuerySettings settings) {
14221456 }
14231457 }
14241458
1459+ private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
1460+ boolean isAsync = MapUtils .getFlag (configuration , requestSettings , ClickHouseClientOption .ASYNC .getKey ());
1461+ return isAsync ? CompletableFuture .supplyAsync (resultSupplier , sharedOperationExecutor ) : CompletableFuture .completedFuture (resultSupplier .get ());
1462+ }
1463+
14251464 public String toString () {
14261465 return "Client{" +
14271466 "endpoints=" + endpoints +
0 commit comments