2929import com .clickhouse .client .api .query .QuerySettings ;
3030import com .clickhouse .client .api .query .Records ;
3131import com .clickhouse .client .config .ClickHouseClientOption ;
32+ import com .clickhouse .client .config .ClickHouseDefaults ;
3233import com .clickhouse .data .ClickHouseColumn ;
3334import com .clickhouse .data .ClickHouseDataStreamFactory ;
3435import com .clickhouse .data .ClickHouseFormat ;
3536import com .clickhouse .data .ClickHousePipedOutputStream ;
3637import com .clickhouse .data .format .BinaryStreamUtils ;
38+ import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
3739import org .slf4j .Logger ;
3840import org .slf4j .LoggerFactory ;
3941
6062import java .util .concurrent .ExecutorService ;
6163import java .util .concurrent .Executors ;
6264import java .util .concurrent .TimeUnit ;
65+ import java .util .concurrent .TimeoutException ;
6366
6467import static java .time .temporal .ChronoUnit .SECONDS ;
6568
9396 *
9497 */
9598public class Client {
96- private static final long TIMEOUT = TimeUnit .SECONDS .toMillis (30 );
97-
98- private static final String DEFAULT_DB_NAME = "default" ;
9999
100100 private Set <String > endpoints ;
101101 private Map <String , String > configuration ;
102102 private List <ClickHouseNode > serverNodes = new ArrayList <>();
103- private Map <Class <?>, List <POJOSerializer >> serializers ;//Order is important to preserve for RowBinary
103+ private Map <Class <?>, List <POJOSerializer >> serializers ; //Order is important to preserve for RowBinary
104104 private Map <Class <?>, Map <String , Method >> getterMethods ;
105- private Map <Class <?>, Boolean > hasDefaults ;
105+ private Map <Class <?>, Boolean > hasDefaults ; // Whether the POJO has defaults
106106 private static final Logger LOG = LoggerFactory .getLogger (Client .class );
107- private ExecutorService queryExecutor ;
107+ private ExecutorService sharedOperationExecutor ;
108108
109109 private Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
110110
@@ -118,17 +118,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration) {
118118 this .getterMethods = new HashMap <>();
119119 this .hasDefaults = new HashMap <>();
120120
121- final int numThreads = Integer .parseInt (configuration .getOrDefault (
122- ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (), "3" ));
123- this .queryExecutor = Executors .newFixedThreadPool (numThreads , r -> {
124- Thread t = new Thread (r );
125- t .setName ("ClickHouse-Query-Executor" );
126- t .setUncaughtExceptionHandler ((t1 , e ) -> {
127- LOG .error ("Uncaught exception in thread {}" , t1 .getName (), e );
128- });
129- return t ;
130- });
131- LOG .debug ("Query executor created with {} threads" , numThreads );
121+ this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
132122 }
133123
134124 /**
@@ -142,6 +132,8 @@ public String getDefaultDatabase() {
142132
143133 public static class Builder {
144134 private Set <String > endpoints ;
135+
136+ // Read-only configuration
145137 private Map <String , String > configuration ;
146138
147139 public Builder () {
@@ -397,6 +389,17 @@ public Builder addProxy(ProxyType type, String host, int port) {
397389 return this ;
398390 }
399391
392+ /**
393+ * Sets the maximum time for operation to complete. By default, it is set to 3 hours.
394+ * @param timeout
395+ * @param timeUnit
396+ * @return
397+ */
398+ public Builder setExecutionTimeout (long timeout , TimeUnit timeUnit ) {
399+ this .configuration .put (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey (), String .valueOf (timeUnit .toMillis (timeout )));
400+ return this ;
401+ }
402+
400403 public Client build () {
401404 // check if endpoint are empty. so can not initiate client
402405 if (this .endpoints .isEmpty ()) {
@@ -406,11 +409,30 @@ public Client build() {
406409 if (!this .configuration .containsKey ("access_token" ) && (!this .configuration .containsKey ("user" ) || !this .configuration .containsKey ("password" ))) {
407410 throw new IllegalArgumentException ("Username and password are required" );
408411 }
412+
413+ this .configuration = setDefaults (this .configuration );
414+
415+ return new Client (this .endpoints , this .configuration );
416+ }
417+
418+ private Map <String , String > setDefaults (Map <String , String > userConfig ) {
419+
409420 // set default database name if not specified
410- if (!this . configuration .containsKey ("database" )) {
411- this . configuration . put ("database" , DEFAULT_DB_NAME );
421+ if (!userConfig .containsKey ("database" )) {
422+ userConfig . put ("database" , ( String ) ClickHouseDefaults . DATABASE . getDefaultValue () );
412423 }
413- return new Client (this .endpoints , this .configuration );
424+
425+ if (!userConfig .containsKey (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ())) {
426+ userConfig .put (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey (),
427+ String .valueOf (ClickHouseClientOption .MAX_EXECUTION_TIME .getDefaultValue ()));
428+ }
429+
430+ if (!userConfig .containsKey (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey ())) {
431+ userConfig .put (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (),
432+ String .valueOf (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getDefaultValue ()));
433+ }
434+
435+ return userConfig ;
414436 }
415437 }
416438
@@ -424,7 +446,7 @@ private ClickHouseNode getServerNode() {
424446 * @return true if the server is alive, false otherwise
425447 */
426448 public boolean ping () {
427- return ping (Client . TIMEOUT );
449+ return ping (getOperationTimeout () );
428450 }
429451
430452 /**
@@ -434,9 +456,6 @@ public boolean ping() {
434456 * @return true if the server is alive, false otherwise
435457 */
436458 public boolean ping (long timeout ) {
437- ValidationUtils .checkRange (timeout , TimeUnit .SECONDS .toMillis (1 ), TimeUnit .MINUTES .toMillis (10 ),
438- "timeout" );
439-
440459 try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
441460 return client .ping (getServerNode (), Math .toIntExact (timeout ));
442461 }
@@ -625,6 +644,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
625644 try (ClickHouseClient client = ClientV1AdaptorHelper .createClient (configuration )) {
626645 ClickHouseRequest .Mutation request = ClientV1AdaptorHelper
627646 .createMutationRequest (client .write (getServerNode ()), tableName , settings , configuration ).format (format );
647+
628648 CompletableFuture <ClickHouseResponse > future = null ;
629649 try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
630650 future = request .data (stream .getInputStream ()).execute ();
@@ -641,9 +661,18 @@ public CompletableFuture<InsertResponse> insert(String tableName,
641661
642662 if (!responseFuture .isCompletedExceptionally ()) {
643663 try {
644- InsertResponse response = new InsertResponse (client , future .get (), clientStats );
664+ int operationTimeout = getOperationTimeout ();
665+ ClickHouseResponse clickHouseResponse ;
666+ if (operationTimeout > 0 ) {
667+ clickHouseResponse = future .get (operationTimeout , TimeUnit .MILLISECONDS );
668+ } else {
669+ clickHouseResponse = future .get ();
670+ }
671+ InsertResponse response = new InsertResponse (client , clickHouseResponse , clientStats );
645672 responseFuture .complete (response );
646- } catch (InterruptedException | ExecutionException e ) {
673+ } catch (ExecutionException e ) {
674+ responseFuture .completeExceptionally (new ClientException ("Failed to get insert response" , e .getCause ()));
675+ } catch (InterruptedException | TimeoutException e ) {
647676 responseFuture .completeExceptionally (new ClientException ("Operation has likely timed out." , e ));
648677 }
649678 }
@@ -718,6 +747,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
718747 ClickHouseRequest <?> request = client .read (getServerNode ());
719748 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
720749 request .settings (SettingsConverter .toRequestSettings (settings .getAllSettings (), queryParams ));
750+ request .option (ClickHouseClientOption .ASYNC , false ); // we have own async handling
721751 request .query (sqlQuery , settings .getQueryId ());
722752 final ClickHouseFormat format = settings .getFormat ();
723753 request .format (format );
@@ -726,15 +756,22 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
726756 CompletableFuture <QueryResponse > future = CompletableFuture .supplyAsync (() -> {
727757 LOG .trace ("Executing request: {}" , request );
728758 try {
729- QueryResponse queryResponse = new QueryResponse (client , request .execute (), finalSettings , format , clientStats );
730- queryResponse .ensureDone ();
731- return queryResponse ;
759+
760+ int operationTimeout = getOperationTimeout ();
761+ ClickHouseResponse clickHouseResponse ;
762+ if (operationTimeout > 0 ) {
763+ clickHouseResponse = request .execute ().get (operationTimeout , TimeUnit .MILLISECONDS );
764+ } else {
765+ clickHouseResponse = request .execute ().get ();
766+ }
767+
768+ return new QueryResponse (client , clickHouseResponse , finalSettings , format , clientStats );
732769 } catch (ClientException e ) {
733770 throw e ;
734771 } catch (Exception e ) {
735772 throw new ClientException ("Failed to get query response" , e );
736773 }
737- }, queryExecutor );
774+ }, sharedOperationExecutor );
738775 return future ;
739776 }
740777
@@ -770,6 +807,7 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
770807 ClickHouseRequest <?> request = client .read (getServerNode ());
771808 request .options (SettingsConverter .toRequestOptions (settings .getAllSettings ()));
772809 request .settings (SettingsConverter .toRequestSettings (settings .getAllSettings (), null ));
810+ request .option (ClickHouseClientOption .ASYNC , false ); // we have own async handling
773811 request .query (sqlQuery , settings .getQueryId ());
774812 final ClickHouseFormat format = settings .getFormat ();
775813 request .format (format );
@@ -778,15 +816,22 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
778816 CompletableFuture <Records > future = CompletableFuture .supplyAsync (() -> {
779817 LOG .trace ("Executing request: {}" , request );
780818 try {
781- QueryResponse queryResponse = new QueryResponse (client , request .execute (), finalSettings , format , clientStats );
782- queryResponse .ensureDone ();
819+ int operationTimeout = getOperationTimeout ();
820+ ClickHouseResponse clickHouseResponse ;
821+ if (operationTimeout > 0 ) {
822+ clickHouseResponse = request .execute ().get (operationTimeout , TimeUnit .MILLISECONDS );
823+ } else {
824+ clickHouseResponse = request .execute ().get ();
825+ }
826+
827+ QueryResponse queryResponse = new QueryResponse (client , clickHouseResponse , finalSettings , format , clientStats );
783828 return new Records (queryResponse , finalSettings );
784829 } catch (ClientException e ) {
785830 throw e ;
786831 } catch (Exception e ) {
787832 throw new ClientException ("Failed to get query response" , e );
788833 }
789- }, queryExecutor );
834+ }, sharedOperationExecutor );
790835
791836 return future ;
792837 }
@@ -800,7 +845,10 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
800845 */
801846 public List <GenericRecord > queryAll (String sqlQuery ) {
802847 try {
803- try (QueryResponse response = query (sqlQuery ).get (TIMEOUT , TimeUnit .MILLISECONDS )) {
848+ int operationTimeout = getOperationTimeout ();
849+
850+ try (QueryResponse response = operationTimeout == 0 ? query (sqlQuery ).get () :
851+ query (sqlQuery ).get (operationTimeout , TimeUnit .MILLISECONDS )) {
804852 List <GenericRecord > records = new ArrayList <>();
805853 if (response .getResultRows () > 0 ) {
806854 ClickHouseBinaryFormatReader reader = new RowBinaryWithNamesAndTypesFormatReader (response .getInputStream ());
@@ -810,6 +858,8 @@ public List<GenericRecord> queryAll(String sqlQuery) {
810858 }
811859 return records ;
812860 }
861+ } catch (ExecutionException e ) {
862+ throw new ClientException ("Failed to get query response" , e .getCause ());
813863 } catch (Exception e ) {
814864 throw new ClientException ("Failed to get query response" , e );
815865 }
@@ -908,6 +958,10 @@ public Map<String, String> getConfiguration() {
908958 return Collections .unmodifiableMap (configuration );
909959 }
910960
961+ protected int getOperationTimeout () {
962+ return Integer .parseInt (configuration .get (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ()));
963+ }
964+
911965 /**
912966 * Returns unmodifiable set of endpoints.
913967 * @return - set of endpoints
0 commit comments