7171import java .util .List ;
7272import java .util .Map ;
7373import java .util .Set ;
74+ import java .util .StringJoiner ;
7475import java .util .TimeZone ;
7576import java .util .UUID ;
7677import java .util .concurrent .CompletableFuture ;
8283import java .util .concurrent .TimeoutException ;
8384import java .util .function .Supplier ;
8485
86+ import static java .time .temporal .ChronoUnit .MILLIS ;
8587import static java .time .temporal .ChronoUnit .SECONDS ;
8688
8789/**
@@ -728,8 +730,31 @@ public Builder setClientNetworkBufferSize(int size) {
728730 return this ;
729731 }
730732
733+
734+ /**
735+ * Sets list of causes that should be retried on.
736+ * Default {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]}
737+ * Use {@link ClientFaultCause#None} to disable retries.
738+ *
739+ * @param causes - list of causes
740+ * @return
741+ */
742+ public Builder retryOnFailures (ClientFaultCause ...causes ) {
743+ StringJoiner joiner = new StringJoiner (VALUES_LIST_DELIMITER );
744+ for (ClientFaultCause cause : causes ) {
745+ joiner .add (cause .name ());
746+ }
747+ this .configuration .put ("client_retry_on_failures" , joiner .toString ());
748+ return this ;
749+ }
750+
751+ public Builder setMaxRetries (int maxRetries ) {
752+ this .configuration .put (ClickHouseClientOption .RETRY .getKey (), String .valueOf (maxRetries ));
753+ return this ;
754+ }
755+
731756 public Client build () {
732- this . configuration = setDefaults (this . configuration );
757+ setDefaults ();
733758
734759 // check if endpoint are empty. so can not initiate client
735760 if (this .endpoints .isEmpty ()) {
@@ -776,65 +801,71 @@ public Client build() {
776801 return new Client (this .endpoints , this .configuration , this .useNewImplementation , this .sharedOperationExecutor );
777802 }
778803
779- private Map <String , String > setDefaults (Map <String , String > userConfig ) {
804+ private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000 ;
805+
806+ private void setDefaults () {
780807
781808 // set default database name if not specified
782- if (!userConfig .containsKey ("database" )) {
783- userConfig . put ( "database" , (String ) ClickHouseDefaults .DATABASE .getDefaultValue ());
809+ if (!configuration .containsKey ("database" )) {
810+ setDefaultDatabase ( (String ) ClickHouseDefaults .DATABASE .getDefaultValue ());
784811 }
785812
786- if (!userConfig .containsKey (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ())) {
787- userConfig .put (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey (),
788- String .valueOf (ClickHouseClientOption .MAX_EXECUTION_TIME .getDefaultValue ()));
813+ if (!configuration .containsKey (ClickHouseClientOption .MAX_EXECUTION_TIME .getKey ())) {
814+ setExecutionTimeout (0 , MILLIS );
789815 }
790816
791- if (!userConfig .containsKey (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey ())) {
792- userConfig .put (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (),
817+ if (!configuration .containsKey (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey ())) {
818+ configuration .put (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getKey (),
793819 String .valueOf (ClickHouseClientOption .MAX_THREADS_PER_CLIENT .getDefaultValue ()));
794820 }
795821
796- if (!userConfig .containsKey ("compression.lz4.uncompressed_buffer_size" )) {
797- userConfig .put ("compression.lz4.uncompressed_buffer_size" ,
798- String .valueOf (ClickHouseLZ4OutputStream .UNCOMPRESSED_BUFF_SIZE ));
822+ if (!configuration .containsKey ("compression.lz4.uncompressed_buffer_size" )) {
823+ setLZ4UncompressedBufferSize (ClickHouseLZ4OutputStream .UNCOMPRESSED_BUFF_SIZE );
824+ }
825+
826+ if (!configuration .containsKey (ClickHouseClientOption .USE_SERVER_TIME_ZONE .getKey ())) {
827+ useServerTimeZone (true );
799828 }
800829
801- if (!userConfig .containsKey (ClickHouseClientOption .USE_SERVER_TIME_ZONE .getKey ())) {
802- userConfig . put ( ClickHouseClientOption . USE_SERVER_TIME_ZONE . getKey (), "true " );
830+ if (!configuration .containsKey (ClickHouseClientOption .SERVER_TIME_ZONE .getKey ())) {
831+ setServerTimeZone ( "UTC " );
803832 }
804833
805- if (!userConfig .containsKey (ClickHouseClientOption .SERVER_TIME_ZONE .getKey ())) {
806- userConfig . put ( ClickHouseClientOption . SERVER_TIME_ZONE . getKey (), "UTC" );
834+ if (!configuration .containsKey (ClickHouseClientOption .ASYNC .getKey ())) {
835+ useAsyncRequests ( false );
807836 }
808837
809- if (!userConfig .containsKey (ClickHouseClientOption . ASYNC .getKey ())) {
810- userConfig . put ( ClickHouseClientOption . ASYNC . getKey (), "false" );
838+ if (!configuration .containsKey (ClickHouseHttpOption . MAX_OPEN_CONNECTIONS .getKey ())) {
839+ setMaxConnections ( 10 );
811840 }
812841
813- if (!userConfig .containsKey (ClickHouseHttpOption . MAX_OPEN_CONNECTIONS . getKey () )) {
814- userConfig . put ( ClickHouseHttpOption . MAX_OPEN_CONNECTIONS . getKey (), "10" );
842+ if (!configuration .containsKey ("connection_request_timeout" )) {
843+ setConnectionRequestTimeout ( 10 , SECONDS );
815844 }
816845
817- if (!userConfig .containsKey ("connection_request_timeout " )) {
818- userConfig . put ( "connection_request_timeout" , "10000" );
846+ if (!configuration .containsKey ("connection_reuse_strategy " )) {
847+ setConnectionReuseStrategy ( ConnectionReuseStrategy . FIFO );
819848 }
820849
821- if (!userConfig .containsKey ("connection_reuse_strategy " )) {
822- userConfig . put ( "connection_reuse_strategy" , ConnectionReuseStrategy . FIFO . name () );
850+ if (!configuration .containsKey ("connection_pool_enabled " )) {
851+ enableConnectionPool ( true );
823852 }
824853
825- if (!userConfig .containsKey ("connection_pool_enabled " )) {
826- userConfig . put ( "connection_pool_enabled" , "true" );
854+ if (!configuration .containsKey ("connection_ttl " )) {
855+ setConnectionTTL (- 1 , MILLIS );
827856 }
828857
829- if (!userConfig .containsKey ("connection_ttl " )) {
830- userConfig . put ( "connection_ttl" , "-1" );
858+ if (!configuration .containsKey ("client_retry_on_failures " )) {
859+ retryOnFailures ( ClientFaultCause . NoHttpResponse , ClientFaultCause . ConnectTimeout , ClientFaultCause . ConnectionRequestTimeout );
831860 }
832861
833- if (!userConfig .containsKey ("client_network_buffer_size" )) {
834- setClientNetworkBufferSize (8192 );
862+ if (!configuration .containsKey ("client_network_buffer_size" )) {
863+ setClientNetworkBufferSize (DEFAULT_NETWORK_BUFFER_SIZE );
835864 }
836865
837- return userConfig ;
866+ if (!configuration .containsKey (ClickHouseClientOption .RETRY .getKey ())) {
867+ setMaxRetries (3 );
868+ }
838869 }
839870 }
840871
@@ -1008,6 +1039,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10081039 // Selecting some node
10091040 ClickHouseNode selectedNode = getNextAliveNode ();
10101041
1042+ ClientException lastException = null ;
10111043 for (int i = 0 ; i <= maxRetries ; i ++) {
10121044 // Execute request
10131045 try (ClassicHttpResponse httpResponse =
@@ -1046,16 +1078,19 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10461078 metrics .operationComplete ();
10471079 metrics .setQueryId (queryId );
10481080 return new InsertResponse (metrics );
1049- } catch (NoHttpResponseException e ) {
1050- LOG .warn ("Failed to get response. Retrying." , e );
1051- selectedNode = getNextAliveNode ();
1052- continue ;
1081+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1082+ lastException = httpClientHelper .wrapException ("Insert request initiation failed" , e );
1083+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1084+ LOG .warn ("Retrying" , e );
1085+ selectedNode = getNextAliveNode ();
1086+ } else {
1087+ throw lastException ;
1088+ }
10531089 } catch (IOException e ) {
1054- LOG .info ("Interrupted while waiting for response." );
1055- throw new ClientException ("Failed to get query response" , e );
1090+ throw new ClientException ("Insert request failed" , e );
10561091 }
10571092 }
1058- throw new ClientException ("Failed to get table schema: too many retries" );
1093+ throw new ClientException ("Insert request failed after retries" , lastException );
10591094 };
10601095
10611096 return runAsyncOperation (supplier , settings .getAllSettings ());
@@ -1075,7 +1110,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
10751110 }
10761111
10771112 globalClientStats .get (operationId ).stop (ClientMetrics .OP_SERIALIZATION );
1078- LOG .debug ("Total serialization time: {}" , globalClientStats .get (operationId ).getElapsedTime ("serialization" ));
10791113 return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), format , settings );
10801114 }
10811115 }
@@ -1132,6 +1166,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11321166 // Selecting some node
11331167 ClickHouseNode selectedNode = getNextAliveNode ();
11341168
1169+ ClientException lastException = null ;
11351170 for (int i = 0 ; i <= maxRetries ; i ++) {
11361171 // Execute request
11371172 try (ClassicHttpResponse httpResponse =
@@ -1166,25 +1201,27 @@ public CompletableFuture<InsertResponse> insert(String tableName,
11661201 metrics .operationComplete ();
11671202 metrics .setQueryId (queryId );
11681203 return new InsertResponse (metrics );
1169- } catch (NoHttpResponseException e ) {
1170- if (i < maxRetries ) {
1171- try {
1172- data .reset ();
1173- } catch (IOException ioe ) {
1174- throw new ClientException ("Failed to get response" , e );
1175- }
1176- LOG .warn ("Failed to get response. Retrying." , e );
1204+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1205+ lastException = httpClientHelper .wrapException ("Insert request initiation failed" , e );
1206+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1207+ LOG .warn ("Retrying" , e );
11771208 selectedNode = getNextAliveNode ();
11781209 } else {
1179- throw new ClientException ( "Server did not respond" , e ) ;
1210+ throw lastException ;
11801211 }
1181- continue ;
11821212 } catch (IOException e ) {
1183- LOG .info ("Interrupted while waiting for response." );
1184- throw new ClientException ("Failed to get query response" , e );
1213+ throw new ClientException ("Insert request failed" , e );
1214+ }
1215+
1216+ if (i < maxRetries ) {
1217+ try {
1218+ data .reset ();
1219+ } catch (IOException ioe ) {
1220+ throw new ClientException ("Failed to reset stream before next attempt" , ioe );
1221+ }
11851222 }
11861223 }
1187- throw new ClientException ("Failed to insert data: too many retries" );
1224+ throw new ClientException ("Insert request failed after retries" , lastException );
11881225 };
11891226 } else {
11901227 responseSupplier = () -> {
@@ -1211,7 +1248,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
12111248 clickHouseResponse = future .get ();
12121249 }
12131250 InsertResponse response = new InsertResponse (clickHouseResponse , clientStats );
1214- LOG .debug ("Total insert (InputStream) time: {}" , clientStats .getElapsedTime ("insert" ));
12151251 return response ;
12161252 } catch (ExecutionException e ) {
12171253 throw new ClientException ("Failed to get insert response" , e .getCause ());
@@ -1300,6 +1336,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
13001336 responseSupplier = () -> {
13011337 // Selecting some node
13021338 ClickHouseNode selectedNode = getNextAliveNode ();
1339+ ClientException lastException = null ;
13031340 for (int i = 0 ; i <= maxRetries ; i ++) {
13041341 try {
13051342 ClassicHttpResponse httpResponse =
@@ -1325,15 +1362,23 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
13251362 metrics .operationComplete ();
13261363
13271364 return new QueryResponse (httpResponse , finalSettings .getFormat (), finalSettings , metrics );
1365+
1366+ } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1367+ lastException = httpClientHelper .wrapException ("Query request initiation failed" , e );
1368+ if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1369+ LOG .warn ("Retrying." , e );
1370+ selectedNode = getNextAliveNode ();
1371+ } else {
1372+ throw lastException ;
1373+ }
13281374 } catch (ClientException e ) {
13291375 throw e ;
1330- } catch (ConnectionRequestTimeoutException | ConnectTimeoutException e ) {
1331- throw new ConnectionInitiationException ("Failed to get connection" , e );
13321376 } catch (Exception e ) {
1333- throw new ClientException ("Failed to execute query " , e );
1377+ throw new ClientException ("Query request failed " , e );
13341378 }
13351379 }
1336- throw new ClientException ("Failed to get table schema: too many retries" );
1380+
1381+ throw new ClientException ("Query request failed after retries" , lastException );
13371382 };
13381383 } else {
13391384 ClickHouseRequest <?> request = oldClient .read (getServerNode ());
@@ -1610,4 +1655,6 @@ public Set<String> getEndpoints() {
16101655 private ClickHouseNode getNextAliveNode () {
16111656 return serverNodes .get (0 );
16121657 }
1658+
1659+ public static final String VALUES_LIST_DELIMITER = "," ;
16131660}
0 commit comments