@@ -115,7 +115,7 @@ public class Client implements AutoCloseable {
115
115
private HttpAPIClientHelper httpClientHelper = null ;
116
116
117
117
private final List <Endpoint > endpoints ;
118
- private final Map <String , String > configuration ;
118
+ private final Map <String , Object > configuration ;
119
119
120
120
private final Map <String , String > readOnlyConfig ;
121
121
@@ -145,15 +145,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
145
145
private Client (Set <String > endpoints , Map <String ,String > configuration ,
146
146
ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
147
147
// Simple initialization
148
- this .configuration = configuration ;
149
- this .readOnlyConfig = Collections .unmodifiableMap (this . configuration );
148
+ this .configuration = ClientConfigProperties . parseConfigMap ( configuration ) ;
149
+ this .readOnlyConfig = Collections .unmodifiableMap (configuration );
150
150
this .metricsRegistry = metricsRegistry ;
151
151
152
152
// Serialization
153
153
this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
154
154
155
155
// Operation Execution
156
- boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
156
+ boolean isAsyncEnabled = ClientConfigProperties .ASYNC_OPERATIONS .getOrDefault (this .configuration );
157
+
157
158
if (isAsyncEnabled && sharedOperationExecutor == null ) {
158
159
this .isSharedOpExecutorOwned = true ;
159
160
this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
@@ -179,7 +180,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
179
180
}
180
181
181
182
this .endpoints = tmpEndpoints .build ();
182
- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
183
+ this .httpClientHelper = new HttpAPIClientHelper (this . configuration , metricsRegistry , initSslContext );
183
184
184
185
String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
185
186
this .retries = retry == null ? 0 : Integer .parseInt (retry );
@@ -217,7 +218,7 @@ public void loadServerInfo() {
217
218
* @return String - actual default database name.
218
219
*/
219
220
public String getDefaultDatabase () {
220
- return this .configuration .get ("database" );
221
+ return ( String ) this .configuration .get (ClientConfigProperties . DATABASE . getKey () );
221
222
}
222
223
223
224
@@ -845,7 +846,7 @@ public Builder setMaxRetries(int maxRetries) {
845
846
* @return
846
847
*/
847
848
public Builder allowBinaryReaderToReuseBuffers (boolean reuse ) {
848
- this .configuration .put ("client_allow_binary_reader_to_reuse_buffers" , String .valueOf (reuse ));
849
+ this .configuration .put (ClientConfigProperties . BINARY_READER_USE_PREALLOCATED_BUFFERS . getKey () , String .valueOf (reuse ));
849
850
return this ;
850
851
}
851
852
@@ -1009,20 +1010,21 @@ public Client build() {
1009
1010
throw new IllegalArgumentException ("At least one endpoint is required" );
1010
1011
}
1011
1012
// check if username and password are empty. so can not initiate client?
1012
- if (!this .configuration .containsKey ("access_token" ) &&
1013
- (!this .configuration .containsKey ("user" ) || !this .configuration .containsKey ("password" )) &&
1014
- !MapUtils .getFlag (this .configuration , "ssl_authentication" , false ) &&
1015
- !this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ))) {
1013
+ boolean useSslAuth = MapUtils .getFlag (this .configuration , ClientConfigProperties .SSL_AUTH .getKey ());
1014
+ boolean hasAccessToken = this .configuration .containsKey (ClientConfigProperties .ACCESS_TOKEN .getKey ());
1015
+ boolean hasUser = this .configuration .containsKey (ClientConfigProperties .USER .getKey ());
1016
+ boolean hasPassword = this .configuration .containsKey (ClientConfigProperties .PASSWORD .getKey ());
1017
+ boolean customHttpHeaders = this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ));
1018
+
1019
+ if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders )) {
1016
1020
throw new IllegalArgumentException ("Username and password (or access token or SSL authentication or pre-define Authorization header) are required" );
1017
1021
}
1018
1022
1019
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1020
- (this .configuration .containsKey ("password" ) || this .configuration .containsKey ("access_token" ))) {
1023
+ if (useSslAuth && (hasAccessToken || hasPassword )) {
1021
1024
throw new IllegalArgumentException ("Only one of password, access token or SSL authentication can be used per client." );
1022
1025
}
1023
1026
1024
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1025
- !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1027
+ if (useSslAuth && !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1026
1028
throw new IllegalArgumentException ("SSL authentication requires a client certificate" );
1027
1029
}
1028
1030
@@ -1159,17 +1161,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1159
1161
if (data == null || data .isEmpty ()) {
1160
1162
throw new IllegalArgumentException ("Data cannot be empty" );
1161
1163
}
1162
-
1164
+ //Add format to the settings
1165
+ if (settings == null ) {
1166
+ settings = new InsertSettings ();
1167
+ }
1163
1168
1164
1169
String operationId = registerOperationMetrics ();
1165
1170
settings .setOperationId (operationId );
1166
1171
globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
1167
1172
globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
1168
1173
1169
- //Add format to the settings
1170
- if (settings == null ) {
1171
- settings = new InsertSettings ();
1172
- }
1173
1174
1174
1175
boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
1175
1176
ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
@@ -1193,11 +1194,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1193
1194
}
1194
1195
1195
1196
1196
- String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1197
- final int maxRetries = retry == null ? 0 : Integer . parseInt ( retry ) ;
1197
+ Integer retry = ( Integer ) configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1198
+ final int maxRetries = retry == null ? 0 : retry ;
1198
1199
1199
1200
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1200
- final InsertSettings finalSettings = settings ;
1201
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1201
1202
Supplier <InsertResponse > supplier = () -> {
1202
1203
long startTime = System .nanoTime ();
1203
1204
// Selecting some node
@@ -1319,8 +1320,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1319
1320
InsertSettings settings ) {
1320
1321
1321
1322
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1322
- Integer .parseInt (configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (),
1323
- ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getDefaultValue ())) :
1323
+ (int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1324
1324
settings .getInputStreamCopyBufferSize ();
1325
1325
1326
1326
if (writeBufferSize <= 0 ) {
@@ -1392,17 +1392,16 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1392
1392
1393
1393
Supplier <InsertResponse > responseSupplier ;
1394
1394
1395
-
1396
1395
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1397
- Integer . parseInt ( configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (), "8192" )) :
1396
+ ( int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1398
1397
settings .getInputStreamCopyBufferSize ();
1399
1398
1400
1399
if (writeBufferSize <= 0 ) {
1401
1400
throw new IllegalArgumentException ("Buffer size must be greater than 0" );
1402
1401
}
1403
1402
1404
1403
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1405
- final InsertSettings finalSettings = settings ;
1404
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1406
1405
1407
1406
StringBuilder sqlStmt = new StringBuilder ("INSERT INTO " ).append (tableName );
1408
1407
if (columnNames != null && !columnNames .isEmpty ()) {
@@ -1531,14 +1530,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1531
1530
}
1532
1531
ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
1533
1532
clientStats .start (ClientMetrics .OP_DURATION );
1534
- applyDefaults (settings );
1535
1533
1536
1534
Supplier <QueryResponse > responseSupplier ;
1537
1535
1538
1536
if (queryParams != null ) {
1539
1537
settings .setOption ("statement_params" , queryParams );
1540
1538
}
1541
- final QuerySettings finalSettings = settings ;
1539
+ final QuerySettings finalSettings = new QuerySettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1542
1540
responseSupplier = () -> {
1543
1541
long startTime = System .nanoTime ();
1544
1542
// Selecting some node
@@ -1916,7 +1914,7 @@ public CompletableFuture<CommandResponse> execute(String sql) {
1916
1914
public ClickHouseBinaryFormatReader newBinaryFormatReader (QueryResponse response , TableSchema schema ) {
1917
1915
ClickHouseBinaryFormatReader reader = null ;
1918
1916
// Using caching buffer allocator is risky so this parameter is not exposed to the user
1919
- boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" );
1917
+ boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" , false );
1920
1918
BinaryStreamReader .ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
1921
1919
new BinaryStreamReader .CachingByteBufferAllocator () :
1922
1920
new BinaryStreamReader .DefaultByteBufferAllocator ();
@@ -1954,25 +1952,6 @@ private String registerOperationMetrics() {
1954
1952
return operationId ;
1955
1953
}
1956
1954
1957
- private void applyDefaults (QuerySettings settings ) {
1958
- Map <String , Object > settingsMap = settings .getAllSettings ();
1959
-
1960
- String key = ClientConfigProperties .USE_SERVER_TIMEZONE .getKey ();
1961
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1962
- settings .setOption (key , MapUtils .getFlag (configuration , key ));
1963
- }
1964
-
1965
- key = ClientConfigProperties .USE_TIMEZONE .getKey ();
1966
- if ( !settings .getUseServerTimeZone () && !settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1967
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1968
- }
1969
-
1970
- key = ClientConfigProperties .SERVER_TIMEZONE .getKey ();
1971
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1972
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1973
- }
1974
- }
1975
-
1976
1955
private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
1977
1956
boolean isAsync = MapUtils .getFlag (requestSettings , configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey ());
1978
1957
if (isAsync ) {
@@ -2000,7 +1979,7 @@ public Map<String, String> getConfiguration() {
2000
1979
2001
1980
/** Returns operation timeout in seconds */
2002
1981
protected int getOperationTimeout () {
2003
- return Integer . parseInt ( configuration .get (ClientConfigProperties .MAX_EXECUTION_TIME .getKey () ));
1982
+ return ( int ) configuration .get (ClientConfigProperties .MAX_EXECUTION_TIME .getKey ());
2004
1983
}
2005
1984
2006
1985
/**
@@ -2013,15 +1992,16 @@ public Set<String> getEndpoints() {
2013
1992
}
2014
1993
2015
1994
public String getUser () {
2016
- return this .configuration .get (ClientConfigProperties .USER .getKey ());
1995
+ return ( String ) this .configuration .get (ClientConfigProperties .USER .getKey ());
2017
1996
}
2018
1997
2019
1998
public String getServerVersion () {
2020
1999
return this .serverVersion ;
2021
2000
}
2022
2001
2023
2002
public String getServerTimeZone () {
2024
- return this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2003
+ TimeZone tz = (TimeZone ) this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2004
+ return tz == null ? null : tz .getID ();
2025
2005
}
2026
2006
2027
2007
public String getClientVersion () {
@@ -2034,10 +2014,9 @@ public String getClientVersion() {
2034
2014
* @param dbRoles
2035
2015
*/
2036
2016
public void setDBRoles (Collection <String > dbRoles ) {
2037
- this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), ClientConfigProperties .commaSeparated (dbRoles ));
2038
- this .unmodifiableDbRolesView =
2039
- Collections .unmodifiableCollection (ClientConfigProperties .valuesFromCommaSeparated (
2040
- this .configuration .get (ClientConfigProperties .SESSION_DB_ROLES .getKey ())));
2017
+ List <String > tmp = new ArrayList <>(dbRoles );
2018
+ this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), tmp );
2019
+ this .unmodifiableDbRolesView = ImmutableList .copyOf (tmp );
2041
2020
}
2042
2021
2043
2022
public void updateClientName (String name ) {
@@ -2068,4 +2047,17 @@ private Endpoint getNextAliveNode() {
2068
2047
}
2069
2048
2070
2049
public static final String VALUES_LIST_DELIMITER = "," ;
2050
+
2051
+ /**
2052
+ * Produces a merge of operation and client settings.
2053
+ * Operation settings override client settings
2054
+ * @param opSettings - operation settings
2055
+ * @return request settings - merged client and operation settings
2056
+ */
2057
+ private Map <String , Object > buildRequestSettings (Map <String , Object > opSettings ) {
2058
+ Map <String , Object > requestSettings = new HashMap <>();
2059
+ requestSettings .putAll (configuration );
2060
+ requestSettings .putAll (opSettings );
2061
+ return requestSettings ;
2062
+ }
2071
2063
}
0 commit comments