11package com .clickhouse .client .api ;
22
3- import com .clickhouse .client .ClickHouseNode ;
43import com .clickhouse .client .api .command .CommandResponse ;
54import com .clickhouse .client .api .command .CommandSettings ;
65import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
3635import com .clickhouse .client .api .query .QueryResponse ;
3736import com .clickhouse .client .api .query .QuerySettings ;
3837import com .clickhouse .client .api .query .Records ;
38+ import com .clickhouse .client .api .transport .Endpoint ;
39+ import com .clickhouse .client .api .transport .HttpEndpoint ;
3940import com .clickhouse .client .config .ClickHouseClientOption ;
4041import com .clickhouse .data .ClickHouseColumn ;
4142import com .clickhouse .data .ClickHouseFormat ;
43+ import com .google .common .collect .ImmutableList ;
4244import net .jpountz .lz4 .LZ4Factory ;
4345import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
4446import org .apache .hc .core5 .http .ClassicHttpResponse ;
7880import java .util .concurrent .TimeUnit ;
7981import java .util .concurrent .TimeoutException ;
8082import java .util .function .Supplier ;
83+ import java .util .stream .Collectors ;
8184
8285import static java .time .temporal .ChronoUnit .MILLIS ;
8386import static java .time .temporal .ChronoUnit .SECONDS ;
@@ -116,13 +119,12 @@ public class Client implements AutoCloseable {
116119
117120 private HttpAPIClientHelper httpClientHelper = null ;
118121
119- private final Set <String > endpoints ;
122+ private final List <Endpoint > endpoints ;
123+
120124 private final Map <String , String > configuration ;
121125
122126 private final Map <String , String > readOnlyConfig ;
123127
124- private final List <ClickHouseNode > serverNodes = new ArrayList <>();
125-
126128 // POJO serializer mapping (class -> (schema -> (format -> serializer)))
127129 private final Map <Class <?>, Map <String , Map <String , POJOSerializer >>> serializers ;
128130
@@ -154,16 +156,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
154156
155157 private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ,
156158 ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
157- this . endpoints = endpoints ;
159+ // Simple initialization
158160 this .configuration = configuration ;
159161 this .readOnlyConfig = Collections .unmodifiableMap (this .configuration );
160- this .endpoints .forEach (endpoint -> {
161- this .serverNodes .add (ClickHouseNode .of (endpoint , this .configuration ));
162- });
163162 this .metricsRegistry = metricsRegistry ;
163+
164+ // Serialization
164165 this .serializers = new ConcurrentHashMap <>();
165166 this .deserializers = new ConcurrentHashMap <>();
166167
168+ // Operation Execution
167169 boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
168170 if (isAsyncEnabled && sharedOperationExecutor == null ) {
169171 this .isSharedOpExecutorOwned = true ;
@@ -172,10 +174,29 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
172174 this .isSharedOpExecutorOwned = false ;
173175 this .sharedOperationExecutor = sharedOperationExecutor ;
174176 }
175- boolean initSslContext = getEndpoints ().stream ().anyMatch (s -> s .toLowerCase ().contains ("https://" ));
176- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
177+
177178 this .columnToMethodMatchingStrategy = columnToMethodMatchingStrategy ;
178179
180+
181+ // Transport
182+ ImmutableList .Builder <Endpoint > tmpEndpoints = ImmutableList .builder ();
183+ boolean initSslContext = false ;
184+ for (String ep : endpoints ) {
185+ try {
186+ HttpEndpoint endpoint = new HttpEndpoint (ep );
187+ if (endpoint .isSecure ()) {
188+ initSslContext = true ;
189+ }
190+ LOG .debug ("Adding endpoint: {}" , endpoint );
191+ tmpEndpoints .add (endpoint );
192+ } catch (Exception e ) {
193+ throw new ClientException ("Failed to add endpoint " + ep , e );
194+ }
195+ }
196+
197+ this .endpoints = tmpEndpoints .build ();
198+ this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
199+
179200 String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
180201 this .retries = retry == null ? 0 : Integer .parseInt (retry );
181202 boolean useNativeCompression = !MapUtils .getFlag (configuration , ClientConfigProperties .DISABLE_NATIVE_COMPRESSION .getKey (), false );
@@ -1185,11 +1206,6 @@ private void setDefaults() {
11851206 }
11861207 }
11871208
1188- private ClickHouseNode getServerNode () {
1189- // TODO: implement load balancing using existing logic
1190- return this .serverNodes .get (0 );
1191- }
1192-
11931209 /**
11941210 * Pings the server to check if it is alive
11951211 * @return true if the server is alive, false otherwise
@@ -1374,13 +1390,13 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
13741390 Supplier <InsertResponse > supplier = () -> {
13751391 long startTime = System .nanoTime ();
13761392 // Selecting some node
1377- ClickHouseNode selectedNode = getNextAliveNode ();
1393+ Endpoint selectedEndpoint = getNextAliveNode ();
13781394
13791395 RuntimeException lastException = null ;
13801396 for (int i = 0 ; i <= maxRetries ; i ++) {
13811397 // Execute request
13821398 try (ClassicHttpResponse httpResponse =
1383- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory ,
1399+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
13841400 out -> {
13851401 out .write ("INSERT INTO " .getBytes ());
13861402 out .write (tableName .getBytes ());
@@ -1404,7 +1420,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
14041420 // Check response
14051421 if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
14061422 LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , httpResponse .getCode (), System .nanoTime () - startTime );
1407- selectedNode = getNextAliveNode ();
1423+ selectedEndpoint = getNextAliveNode ();
14081424 continue ;
14091425 }
14101426
@@ -1421,7 +1437,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
14211437 (i + 1 ), (maxRetries + 1 ), System .nanoTime () - startTime ), e );
14221438 if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
14231439 LOG .warn ("Retrying." , e );
1424- selectedNode = getNextAliveNode ();
1440+ selectedEndpoint = getNextAliveNode ();
14251441 } else {
14261442 throw lastException ;
14271443 }
@@ -1591,13 +1607,13 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15911607 responseSupplier = () -> {
15921608 long startTime = System .nanoTime ();
15931609 // Selecting some node
1594- ClickHouseNode selectedNode = getNextAliveNode ();
1610+ Endpoint selectedEndpoint = getNextAliveNode ();
15951611
15961612 RuntimeException lastException = null ;
15971613 for (int i = 0 ; i <= retries ; i ++) {
15981614 // Execute request
15991615 try (ClassicHttpResponse httpResponse =
1600- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory ,
1616+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
16011617 out -> {
16021618 writer .onOutput (out );
16031619 out .close ();
@@ -1607,7 +1623,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
16071623 // Check response
16081624 if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
16091625 LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , System .nanoTime () - startTime , httpResponse .getCode ());
1610- selectedNode = getNextAliveNode ();
1626+ selectedEndpoint = getNextAliveNode ();
16111627 continue ;
16121628 }
16131629
@@ -1623,7 +1639,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
16231639 (i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
16241640 if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
16251641 LOG .warn ("Retrying." , e );
1626- selectedNode = getNextAliveNode ();
1642+ selectedEndpoint = getNextAliveNode ();
16271643 } else {
16281644 throw lastException ;
16291645 }
@@ -1715,20 +1731,20 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
17151731 responseSupplier = () -> {
17161732 long startTime = System .nanoTime ();
17171733 // Selecting some node
1718- ClickHouseNode selectedNode = getNextAliveNode ();
1734+ Endpoint selectedEndpoint = getNextAliveNode ();
17191735 RuntimeException lastException = null ;
17201736 for (int i = 0 ; i <= retries ; i ++) {
17211737 try {
17221738 ClassicHttpResponse httpResponse =
1723- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory , output -> {
1739+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory , output -> {
17241740 output .write (sqlQuery .getBytes (StandardCharsets .UTF_8 ));
17251741 output .close ();
17261742 });
17271743
17281744 // Check response
17291745 if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
17301746 LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , System .nanoTime () - startTime , httpResponse .getCode ());
1731- selectedNode = getNextAliveNode ();
1747+ selectedEndpoint = getNextAliveNode ();
17321748 continue ;
17331749 }
17341750
@@ -1753,7 +1769,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
17531769 (i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
17541770 if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
17551771 LOG .warn ("Retrying." , e );
1756- selectedNode = getNextAliveNode ();
1772+ selectedEndpoint = getNextAliveNode ();
17571773 } else {
17581774 throw lastException ;
17591775 }
@@ -1898,8 +1914,9 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz, TableSchema schema)
18981914 * @param allocator - optional supplier to create new instances of the DTO.
18991915 * @throws IllegalArgumentException when class is not registered or no setters found
19001916 * @return List of POJOs filled with data
1901- * @param <T>
1917+ * @param <T> type of POJO
19021918 */
1919+ @ SuppressWarnings ("unchecked" )
19031920 public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema , Supplier <T > allocator ) {
19041921 Map <String , POJOSetter > classDeserializers = deserializers .getOrDefault (clazz ,
19051922 Collections .emptyMap ()).getOrDefault (schema .getTableName () == null ?
@@ -2180,9 +2197,10 @@ protected int getOperationTimeout() {
21802197 /**
21812198 * Returns unmodifiable set of endpoints.
21822199 * @return - set of endpoints
2200+ * @deprecated
21832201 */
21842202 public Set <String > getEndpoints () {
2185- return Collections . unmodifiableSet ( endpoints );
2203+ return endpoints . stream (). map ( Endpoint :: getBaseURL ). collect ( Collectors . toSet () );
21862204 }
21872205
21882206 public String getUser () {
@@ -2236,8 +2254,8 @@ public void updateBearerToken(String bearer) {
22362254 this .configuration .put (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ), "Bearer " + bearer );
22372255 }
22382256
2239- private ClickHouseNode getNextAliveNode () {
2240- return serverNodes .get (0 );
2257+ private Endpoint getNextAliveNode () {
2258+ return endpoints .get (0 );
22412259 }
22422260
22432261 public static final String VALUES_LIST_DELIMITER = "," ;
0 commit comments