@@ -118,7 +118,7 @@ public class Client implements AutoCloseable {
118
118
private HttpAPIClientHelper httpClientHelper = null ;
119
119
120
120
private final List <Endpoint > endpoints ;
121
- private final Map <String , String > configuration ;
121
+ private final Map <String , Object > configuration ;
122
122
123
123
private final Map <String , String > readOnlyConfig ;
124
124
@@ -150,15 +150,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
150
150
private Client (Set <String > endpoints , Map <String ,String > configuration ,
151
151
ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
152
152
// Simple initialization
153
- this .configuration = configuration ;
154
- this .readOnlyConfig = Collections .unmodifiableMap (this . configuration );
153
+ this .configuration = ClientConfigProperties . parseConfigMap ( configuration ) ;
154
+ this .readOnlyConfig = Collections .unmodifiableMap (configuration );
155
155
this .metricsRegistry = metricsRegistry ;
156
156
157
157
// Serialization
158
158
this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
159
159
160
160
// Operation Execution
161
- boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
161
+ boolean isAsyncEnabled = ClientConfigProperties .ASYNC_OPERATIONS .getOrDefault (this .configuration );
162
+
162
163
if (isAsyncEnabled && sharedOperationExecutor == null ) {
163
164
this .isSharedOpExecutorOwned = true ;
164
165
this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
@@ -184,7 +185,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
184
185
}
185
186
186
187
this .endpoints = tmpEndpoints .build ();
187
- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
188
+ this .httpClientHelper = new HttpAPIClientHelper (this . configuration , metricsRegistry , initSslContext );
188
189
189
190
String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
190
191
this .retries = retry == null ? 0 : Integer .parseInt (retry );
@@ -242,7 +243,7 @@ public void loadServerInfo() {
242
243
* @return String - actual default database name.
243
244
*/
244
245
public String getDefaultDatabase () {
245
- return this .configuration .get ("database" );
246
+ return ( String ) this .configuration .get (ClientConfigProperties . DATABASE . getKey () );
246
247
}
247
248
248
249
@@ -870,7 +871,7 @@ public Builder setMaxRetries(int maxRetries) {
870
871
* @return
871
872
*/
872
873
public Builder allowBinaryReaderToReuseBuffers (boolean reuse ) {
873
- this .configuration .put ("client_allow_binary_reader_to_reuse_buffers" , String .valueOf (reuse ));
874
+ this .configuration .put (ClientConfigProperties . BINARY_READER_USE_PREALLOCATED_BUFFERS . getKey () , String .valueOf (reuse ));
874
875
return this ;
875
876
}
876
877
@@ -1048,20 +1049,21 @@ public Client build() {
1048
1049
throw new IllegalArgumentException ("At least one endpoint is required" );
1049
1050
}
1050
1051
// check if username and password are empty. so can not initiate client?
1051
- if (!this .configuration .containsKey ("access_token" ) &&
1052
- (!this .configuration .containsKey ("user" ) || !this .configuration .containsKey ("password" )) &&
1053
- !MapUtils .getFlag (this .configuration , "ssl_authentication" , false ) &&
1054
- !this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ))) {
1052
+ boolean useSslAuth = MapUtils .getFlag (this .configuration , ClientConfigProperties .SSL_AUTH .getKey ());
1053
+ boolean hasAccessToken = this .configuration .containsKey (ClientConfigProperties .ACCESS_TOKEN .getKey ());
1054
+ boolean hasUser = this .configuration .containsKey (ClientConfigProperties .USER .getKey ());
1055
+ boolean hasPassword = this .configuration .containsKey (ClientConfigProperties .PASSWORD .getKey ());
1056
+ boolean customHttpHeaders = this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ));
1057
+
1058
+ if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders )) {
1055
1059
throw new IllegalArgumentException ("Username and password (or access token or SSL authentication or pre-define Authorization header) are required" );
1056
1060
}
1057
1061
1058
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1059
- (this .configuration .containsKey ("password" ) || this .configuration .containsKey ("access_token" ))) {
1062
+ if (useSslAuth && (hasAccessToken || hasPassword )) {
1060
1063
throw new IllegalArgumentException ("Only one of password, access token or SSL authentication can be used per client." );
1061
1064
}
1062
1065
1063
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1064
- !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1066
+ if (useSslAuth && !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1065
1067
throw new IllegalArgumentException ("SSL authentication requires a client certificate" );
1066
1068
}
1067
1069
@@ -1198,17 +1200,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1198
1200
if (data == null || data .isEmpty ()) {
1199
1201
throw new IllegalArgumentException ("Data cannot be empty" );
1200
1202
}
1201
-
1203
+ //Add format to the settings
1204
+ if (settings == null ) {
1205
+ settings = new InsertSettings ();
1206
+ }
1202
1207
1203
1208
String operationId = registerOperationMetrics ();
1204
1209
settings .setOperationId (operationId );
1205
1210
globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
1206
1211
globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
1207
1212
1208
- //Add format to the settings
1209
- if (settings == null ) {
1210
- settings = new InsertSettings ();
1211
- }
1212
1213
1213
1214
boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
1214
1215
ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
@@ -1232,11 +1233,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1232
1233
}
1233
1234
1234
1235
1235
- String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1236
- final int maxRetries = retry == null ? 0 : Integer . parseInt ( retry ) ;
1236
+ Integer retry = ( Integer ) configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1237
+ final int maxRetries = retry == null ? 0 : retry ;
1237
1238
1238
1239
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1239
- final InsertSettings finalSettings = settings ;
1240
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1240
1241
Supplier <InsertResponse > supplier = () -> {
1241
1242
long startTime = System .nanoTime ();
1242
1243
// Selecting some node
@@ -1358,8 +1359,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1358
1359
InsertSettings settings ) {
1359
1360
1360
1361
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1361
- Integer .parseInt (configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (),
1362
- ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getDefaultValue ())) :
1362
+ (int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1363
1363
settings .getInputStreamCopyBufferSize ();
1364
1364
1365
1365
if (writeBufferSize <= 0 ) {
@@ -1431,17 +1431,16 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1431
1431
1432
1432
Supplier <InsertResponse > responseSupplier ;
1433
1433
1434
-
1435
1434
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1436
- Integer . parseInt ( configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (), "8192" )) :
1435
+ ( int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1437
1436
settings .getInputStreamCopyBufferSize ();
1438
1437
1439
1438
if (writeBufferSize <= 0 ) {
1440
1439
throw new IllegalArgumentException ("Buffer size must be greater than 0" );
1441
1440
}
1442
1441
1443
1442
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1444
- final InsertSettings finalSettings = settings ;
1443
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1445
1444
1446
1445
StringBuilder sqlStmt = new StringBuilder ("INSERT INTO " ).append (tableName );
1447
1446
if (columnNames != null && !columnNames .isEmpty ()) {
@@ -1571,14 +1570,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1571
1570
}
1572
1571
ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
1573
1572
clientStats .start (ClientMetrics .OP_DURATION );
1574
- applyDefaults (settings );
1575
1573
1576
1574
Supplier <QueryResponse > responseSupplier ;
1577
1575
1578
1576
if (queryParams != null ) {
1579
1577
settings .setOption ("statement_params" , queryParams );
1580
1578
}
1581
- final QuerySettings finalSettings = settings ;
1579
+ final QuerySettings finalSettings = new QuerySettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1582
1580
responseSupplier = () -> {
1583
1581
long startTime = System .nanoTime ();
1584
1582
// Selecting some node
@@ -1956,7 +1954,7 @@ public CompletableFuture<CommandResponse> execute(String sql) {
1956
1954
public ClickHouseBinaryFormatReader newBinaryFormatReader (QueryResponse response , TableSchema schema ) {
1957
1955
ClickHouseBinaryFormatReader reader = null ;
1958
1956
// Using caching buffer allocator is risky so this parameter is not exposed to the user
1959
- boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" );
1957
+ boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" , false );
1960
1958
BinaryStreamReader .ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
1961
1959
new BinaryStreamReader .CachingByteBufferAllocator () :
1962
1960
new BinaryStreamReader .DefaultByteBufferAllocator ();
@@ -1991,25 +1989,6 @@ private String registerOperationMetrics() {
1991
1989
return operationId ;
1992
1990
}
1993
1991
1994
- private void applyDefaults (QuerySettings settings ) {
1995
- Map <String , Object > settingsMap = settings .getAllSettings ();
1996
-
1997
- String key = ClientConfigProperties .USE_SERVER_TIMEZONE .getKey ();
1998
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1999
- settings .setOption (key , MapUtils .getFlag (configuration , key ));
2000
- }
2001
-
2002
- key = ClientConfigProperties .USE_TIMEZONE .getKey ();
2003
- if ( !settings .getUseServerTimeZone () && !settingsMap .containsKey (key ) && configuration .containsKey (key )) {
2004
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
2005
- }
2006
-
2007
- key = ClientConfigProperties .SERVER_TIMEZONE .getKey ();
2008
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
2009
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
2010
- }
2011
- }
2012
-
2013
1992
private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
2014
1993
boolean isAsync = MapUtils .getFlag (requestSettings , configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey ());
2015
1994
if (isAsync ) {
@@ -2037,7 +2016,7 @@ public Map<String, String> getConfiguration() {
2037
2016
2038
2017
/** Returns operation timeout in seconds */
2039
2018
protected int getOperationTimeout () {
2040
- return Integer . parseInt ( configuration . get ( ClientConfigProperties .MAX_EXECUTION_TIME .getKey ()) );
2019
+ return ClientConfigProperties .MAX_EXECUTION_TIME .getOrDefault ( configuration );
2041
2020
}
2042
2021
2043
2022
/**
@@ -2050,15 +2029,16 @@ public Set<String> getEndpoints() {
2050
2029
}
2051
2030
2052
2031
public String getUser () {
2053
- return this .configuration .get (ClientConfigProperties .USER .getKey ());
2032
+ return ( String ) this .configuration .get (ClientConfigProperties .USER .getKey ());
2054
2033
}
2055
2034
2056
2035
public String getServerVersion () {
2057
2036
return this .serverVersion ;
2058
2037
}
2059
2038
2060
2039
public String getServerTimeZone () {
2061
- return this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2040
+ TimeZone tz = (TimeZone ) this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2041
+ return tz == null ? null : tz .getID ();
2062
2042
}
2063
2043
2064
2044
public String getClientVersion () {
@@ -2071,10 +2051,9 @@ public String getClientVersion() {
2071
2051
* @param dbRoles
2072
2052
*/
2073
2053
public void setDBRoles (Collection <String > dbRoles ) {
2074
- this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), ClientConfigProperties .commaSeparated (dbRoles ));
2075
- this .unmodifiableDbRolesView =
2076
- Collections .unmodifiableCollection (ClientConfigProperties .valuesFromCommaSeparated (
2077
- this .configuration .get (ClientConfigProperties .SESSION_DB_ROLES .getKey ())));
2054
+ List <String > tmp = new ArrayList <>(dbRoles );
2055
+ this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), tmp );
2056
+ this .unmodifiableDbRolesView = ImmutableList .copyOf (tmp );
2078
2057
}
2079
2058
2080
2059
public void updateClientName (String name ) {
@@ -2105,4 +2084,17 @@ private Endpoint getNextAliveNode() {
2105
2084
}
2106
2085
2107
2086
public static final String VALUES_LIST_DELIMITER = "," ;
2087
+
2088
+ /**
2089
+ * Produces a merge of operation and client settings.
2090
+ * Operation settings override client settings
2091
+ * @param opSettings - operation settings
2092
+ * @return request settings - merged client and operation settings
2093
+ */
2094
+ private Map <String , Object > buildRequestSettings (Map <String , Object > opSettings ) {
2095
+ Map <String , Object > requestSettings = new HashMap <>();
2096
+ requestSettings .putAll (configuration );
2097
+ requestSettings .putAll (opSettings );
2098
+ return requestSettings ;
2099
+ }
2108
2100
}
0 commit comments