7070import java .util .List ;
7171import java .util .Map ;
7272import java .util .Set ;
73+ import java .util .StringJoiner ;
7374import java .util .TimeZone ;
7475import java .util .UUID ;
7576import java .util .concurrent .CompletableFuture ;
8182import java .util .concurrent .TimeoutException ;
8283import java .util .function .Supplier ;
8384
85+ import static java .time .temporal .ChronoUnit .MILLIS ;
8486import static java .time .temporal .ChronoUnit .SECONDS ;
8587
8688/**
@@ -714,8 +716,31 @@ public Builder setSharedOperationExecutor(ExecutorService executorService) {
714716 return this ;
715717 }
716718
719+
720+ /**
721+ * Sets list of causes that should be retried on.
722+ * Default {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]}
723+ * Use {@link ClientFaultCause#None} to disable retries.
724+ *
725+ * @param causes - list of causes
726+ * @return
727+ */
728+ public Builder retryOnFailures (ClientFaultCause ...causes ) {
729+ StringJoiner joiner = new StringJoiner (VALUES_LIST_DELIMITER );
730+ for (ClientFaultCause cause : causes ) {
731+ joiner .add (cause .name ());
732+ }
733+ this .configuration .put ("client_retry_on_failures" , joiner .toString ());
734+ return this ;
735+ }
736+
737+ public Builder setMaxRetries (int maxRetries ) {
738+ this .configuration .put (ClickHouseClientOption .RETRY .getKey (), String .valueOf (maxRetries ));
739+ return this ;
740+ }
741+
717742 public Client build () {
718- this . configuration = setDefaults (this . configuration );
743+ setDefaults ();
719744
720745 // check if endpoint are empty. so can not initiate client
721746 if (this .endpoints .isEmpty ()) {
@@ -762,61 +787,65 @@ public Client build() {
762787 return new Client (this .endpoints , this .configuration , this .useNewImplementation , this .sharedOperationExecutor );
763788 }
764789
765- private Map < String , String > setDefaults (Map < String , String > userConfig ) {
790+ private void setDefaults () {
766791
767792 // set default database name if not specified
768- if (!userConfig .containsKey ("database" )) {
769- userConfig . put ( "database" , (String ) ClickHouseDefaults .DATABASE .getDefaultValue ());
793+ if (!configuration .containsKey ("database" )) {
794+ setDefaultDatabase ( (String ) ClickHouseDefaults .DATABASE .getDefaultValue ());
770795 }
771796
772- if (!userConfig .containsKey (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ())) {
773- userConfig .put (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey (),
774- String .valueOf (ClickHouseClientOption .MAX_EXECUTION_TIME .getDefaultValue ()));
797+ if (!configuration .containsKey (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ())) {
798+ setExecutionTimeout (0 , MILLIS );
775799 }
776800
777- if (!userConfig .containsKey (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey ())) {
778- userConfig .put (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (),
801+ if (!configuration .containsKey (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey ())) {
802+ configuration .put (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (),
779803 String .valueOf (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getDefaultValue ()));
780804 }
781805
782- if (!userConfig .containsKey ("compression.lz4.uncompressed_buffer_size" )) {
783- userConfig .put ("compression.lz4.uncompressed_buffer_size" ,
784- String .valueOf (ClickHouseLZ4OutputStream .UNCOMPRESSED_BUFF_SIZE ));
806+ if (!configuration .containsKey ("compression.lz4.uncompressed_buffer_size" )) {
807+ setLZ4UncompressedBufferSize (ClickHouseLZ4OutputStream .UNCOMPRESSED_BUFF_SIZE );
808+ }
809+
810+ if (!configuration .containsKey (ClickHouseClientOption .USE_SERVER_TIME_ZONE .getKey ())) {
811+ useServerTimeZone (true );
785812 }
786813
787- if (!userConfig .containsKey (ClickHouseClientOption .USE_SERVER_TIME_ZONE .getKey ())) {
788- userConfig . put ( ClickHouseClientOption . USE_SERVER_TIME_ZONE . getKey (), "true " );
814+ if (!configuration .containsKey (ClickHouseClientOption .SERVER_TIME_ZONE .getKey ())) {
815+ setServerTimeZone ( "UTC " );
789816 }
790817
791- if (!userConfig .containsKey (ClickHouseClientOption .SERVER_TIME_ZONE .getKey ())) {
792- userConfig . put ( ClickHouseClientOption . SERVER_TIME_ZONE . getKey (), "UTC" );
818+ if (!configuration .containsKey (ClickHouseClientOption .ASYNC .getKey ())) {
819+ useAsyncRequests ( false );
793820 }
794821
795- if (!userConfig .containsKey (ClickHouseClientOption . ASYNC .getKey ())) {
796- userConfig . put ( ClickHouseClientOption . ASYNC . getKey (), "false" );
822+ if (!configuration .containsKey (ClickHouseHttpOption . MAX_OPEN_CONNECTIONS .getKey ())) {
823+ setMaxConnections ( 10 );
797824 }
798825
799- if (!userConfig .containsKey (ClickHouseHttpOption . MAX_OPEN_CONNECTIONS . getKey () )) {
800- userConfig . put ( ClickHouseHttpOption . MAX_OPEN_CONNECTIONS . getKey (), "10" );
826+ if (!configuration .containsKey ("connection_request_timeout" )) {
827+ setConnectionRequestTimeout ( 10 , SECONDS );
801828 }
802829
803- if (!userConfig .containsKey ("connection_request_timeout " )) {
804- userConfig . put ( "connection_request_timeout" , "10000" );
830+ if (!configuration .containsKey ("connection_reuse_strategy " )) {
831+ setConnectionReuseStrategy ( ConnectionReuseStrategy . FIFO );
805832 }
806833
807- if (!userConfig .containsKey ("connection_reuse_strategy " )) {
808- userConfig . put ( "connection_reuse_strategy" , ConnectionReuseStrategy . FIFO . name () );
834+ if (!configuration .containsKey ("connection_pool_enabled " )) {
835+ enableConnectionPool ( true );
809836 }
810837
811- if (!userConfig .containsKey ("connection_pool_enabled " )) {
812- userConfig . put ( "connection_pool_enabled" , "true" );
838+ if (!configuration .containsKey ("connection_ttl " )) {
839+ setConnectionTTL (- 1 , MILLIS );
813840 }
814841
815- if (!userConfig .containsKey ("connection_ttl " )) {
816- userConfig . put ( "connection_ttl" , "-1" );
842+ if (!configuration .containsKey ("client_retry_on_failures " )) {
843+ retryOnFailures ( ClientFaultCause . NoHttpResponse , ClientFaultCause . ConnectTimeout , ClientFaultCause . ConnectionRequestTimeout );
817844 }
818845
819- return userConfig ;
846+ if (!configuration .containsKey (ClickHouseClientOption .RETRY .getKey ())) {
847+ setMaxRetries (3 );
848+ }
820849 }
821850 }
822851
@@ -990,6 +1019,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
9901019 // Selecting some node
9911020 ClickHouseNode selectedNode = getNextAliveNode ();
9921021
1022+ ClientException lastException = null ;
9931023 for (int i = 0 ; i <= maxRetries ; i ++) {
9941024 // Execute request
9951025 try (ClassicHttpResponse httpResponse =
@@ -1028,16 +1058,19 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10281058 metrics .operationComplete ();
10291059 metrics .setQueryId (queryId );
10301060 return new InsertResponse (metrics );
1031- } catch (NoHttpResponseException | ConnectionRequestTimeoutException e ) {
1032- LOG .warn ("Failed to get response. Retrying." , e );
1033- selectedNode = getNextAliveNode ();
1034- continue ;
1061+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1062+ lastException = httpClientHelper .wrapException ("Insert request initiation failed" , e );
1063+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1064+ LOG .warn ("Retrying" , e );
1065+ selectedNode = getNextAliveNode ();
1066+ } else {
1067+ throw lastException ;
1068+ }
10351069 } catch (IOException e ) {
1036- LOG .info ("Interrupted while waiting for response." );
1037- throw new ClientException ("Failed to get query response" , e );
1070+ throw new ClientException ("Insert request failed" , e );
10381071 }
10391072 }
1040- throw new ClientException ("Failed to get table schema: too many retries" );
1073+ throw new ClientException ("Insert request failed after retries" , lastException );
10411074 };
10421075
10431076 return runAsyncOperation (supplier , settings .getAllSettings ());
@@ -1057,7 +1090,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10571090 }
10581091
10591092 globalClientStats .get (operationId ).stop (ClientMetrics .OP_SERIALIZATION );
1060- LOG .debug ("Total serialization time: {}" , globalClientStats .get (operationId ).getElapsedTime ("serialization" ));
10611093 return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), format , settings );
10621094 }
10631095 }
@@ -1114,6 +1146,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11141146 // Selecting some node
11151147 ClickHouseNode selectedNode = getNextAliveNode ();
11161148
1149+ ClientException lastException = null ;
11171150 for (int i = 0 ; i <= maxRetries ; i ++) {
11181151 // Execute request
11191152 try (ClassicHttpResponse httpResponse =
@@ -1148,25 +1181,27 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11481181 metrics .operationComplete ();
11491182 metrics .setQueryId (queryId );
11501183 return new InsertResponse (metrics );
1151- } catch (NoHttpResponseException | ConnectionRequestTimeoutException e ) {
1152- if (i < maxRetries ) {
1153- try {
1154- data .reset ();
1155- } catch (IOException ioe ) {
1156- throw new ClientException ("Failed to reset stream for retry" , e );
1157- }
1158- LOG .warn ("Failed to get response. Retrying." , e );
1184+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1185+ lastException = httpClientHelper .wrapException ("Insert request initiation failed" , e );
1186+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1187+ LOG .warn ("Retrying" , e );
11591188 selectedNode = getNextAliveNode ();
11601189 } else {
1161- throw new ClientException ( "Server did not respond" , e ) ;
1190+ throw lastException ;
11621191 }
1163- continue ;
11641192 } catch (IOException e ) {
1165- LOG .info ("Interrupted while waiting for response." );
1166- throw new ClientException ("Failed to get query response" , e );
1193+ throw new ClientException ("Insert request failed" , e );
1194+ }
1195+
1196+ if (i < maxRetries ) {
1197+ try {
1198+ data .reset ();
1199+ } catch (IOException ioe ) {
1200+ throw new ClientException ("Failed to reset stream before next attempt" , ioe );
1201+ }
11671202 }
11681203 }
1169- throw new ClientException ("Failed to insert data: too many retries" );
1204+ throw new ClientException ("Insert request failed after retries" , lastException );
11701205 };
11711206 } else {
11721207 responseSupplier = () -> {
@@ -1193,7 +1228,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11931228 clickHouseResponse = future .get ();
11941229 }
11951230 InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
1196- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
11971231 return response ;
11981232 } catch (ExecutionException e ) {
11991233 throw new ClientException ("Failed to get insert response" , e .getCause ());
@@ -1282,6 +1316,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
12821316 responseSupplier = () -> {
12831317 // Selecting some node
12841318 ClickHouseNode selectedNode = getNextAliveNode ();
1319+ ClientException lastException = null ;
12851320 for (int i = 0 ; i <= maxRetries ; i ++) {
12861321 try {
12871322 ClassicHttpResponse httpResponse =
@@ -1307,19 +1342,23 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
13071342 metrics .operationComplete ();
13081343
13091344 return new QueryResponse (httpResponse , finalSettings .getFormat (), finalSettings , metrics );
1310- } catch (NoHttpResponseException | ConnectionRequestTimeoutException e ) {
1311- LOG .warn ("Failed to get response. Retrying." , e );
1312- selectedNode = getNextAliveNode ();
1313- continue ;
1345+
1346+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1347+ lastException = httpClientHelper .wrapException ("Query request initiation failed" , e );
1348+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1349+ LOG .warn ("Retrying." , e );
1350+ selectedNode = getNextAliveNode ();
1351+ } else {
1352+ throw lastException ;
1353+ }
13141354 } catch (ClientException e ) {
13151355 throw e ;
1316- } catch (ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1317- throw new ConnectionInitiationException ("Failed to get connection" , e );
13181356 } catch (Exception e ) {
1319- throw new ClientException ("Failed to execute query " , e );
1357+ throw new ClientException ("Query request failed " , e );
13201358 }
13211359 }
1322- throw new ClientException ("Failed to get table schema: too many retries" );
1360+
1361+ throw new ClientException ("Query request failed after retries" , lastException );
13231362 };
13241363 } else {
13251364 ClickHouseRequest <?> request = oldClient .read (getServerNode ());
@@ -1595,4 +1634,6 @@ public Set<String> getEndpoints() {
15951634 private ClickHouseNode getNextAliveNode () {
15961635 return serverNodes .get (0 );
15971636 }
1637+
1638+ public static final String VALUES_LIST_DELIMITER = "," ;
15981639}
0 commit comments