@@ -115,7 +115,7 @@ public class Client implements AutoCloseable {
115
115
private HttpAPIClientHelper httpClientHelper = null ;
116
116
117
117
private final List <Endpoint > endpoints ;
118
- private final Map <String , String > configuration ;
118
+ private final Map <String , Object > configuration ;
119
119
120
120
private final Map <String , String > readOnlyConfig ;
121
121
@@ -145,15 +145,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
145
145
private Client (Set <String > endpoints , Map <String ,String > configuration ,
146
146
ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
147
147
// Simple initialization
148
- this .configuration = configuration ;
149
- this .readOnlyConfig = Collections .unmodifiableMap (this . configuration );
148
+ this .configuration = ClientConfigProperties . parseConfigMap ( configuration ) ;
149
+ this .readOnlyConfig = Collections .unmodifiableMap (configuration );
150
150
this .metricsRegistry = metricsRegistry ;
151
151
152
152
// Serialization
153
153
this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
154
154
155
155
// Operation Execution
156
- boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
156
+ boolean isAsyncEnabled = ClientConfigProperties .ASYNC_OPERATIONS .getOrDefault (this .configuration );
157
+
157
158
if (isAsyncEnabled && sharedOperationExecutor == null ) {
158
159
this .isSharedOpExecutorOwned = true ;
159
160
this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
@@ -179,7 +180,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
179
180
}
180
181
181
182
this .endpoints = tmpEndpoints .build ();
182
- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
183
+ this .httpClientHelper = new HttpAPIClientHelper (this . configuration , metricsRegistry , initSslContext );
183
184
184
185
String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
185
186
this .retries = retry == null ? 0 : Integer .parseInt (retry );
@@ -217,7 +218,7 @@ public void loadServerInfo() {
217
218
* @return String - actual default database name.
218
219
*/
219
220
public String getDefaultDatabase () {
220
- return this .configuration .get ("database" );
221
+ return ( String ) this .configuration .get (ClientConfigProperties . DATABASE . getKey () );
221
222
}
222
223
223
224
@@ -845,7 +846,7 @@ public Builder setMaxRetries(int maxRetries) {
845
846
* @return
846
847
*/
847
848
public Builder allowBinaryReaderToReuseBuffers (boolean reuse ) {
848
- this .configuration .put ("client_allow_binary_reader_to_reuse_buffers" , String .valueOf (reuse ));
849
+ this .configuration .put (ClientConfigProperties . BINARY_READER_USE_PREALLOCATED_BUFFERS . getKey () , String .valueOf (reuse ));
849
850
return this ;
850
851
}
851
852
@@ -1009,20 +1010,21 @@ public Client build() {
1009
1010
throw new IllegalArgumentException ("At least one endpoint is required" );
1010
1011
}
1011
1012
// check if username and password are empty. so can not initiate client?
1012
- if (!this .configuration .containsKey ("access_token" ) &&
1013
- (!this .configuration .containsKey ("user" ) || !this .configuration .containsKey ("password" )) &&
1014
- !MapUtils .getFlag (this .configuration , "ssl_authentication" , false ) &&
1015
- !this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ))) {
1013
+ boolean useSslAuth = MapUtils .getFlag (this .configuration , ClientConfigProperties .SSL_AUTH .getKey ());
1014
+ boolean hasAccessToken = this .configuration .containsKey (ClientConfigProperties .ACCESS_TOKEN .getKey ());
1015
+ boolean hasUser = this .configuration .containsKey (ClientConfigProperties .USER .getKey ());
1016
+ boolean hasPassword = this .configuration .containsKey (ClientConfigProperties .PASSWORD .getKey ());
1017
+ boolean customHttpHeaders = this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ));
1018
+
1019
+ if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders )) {
1016
1020
throw new IllegalArgumentException ("Username and password (or access token or SSL authentication or pre-define Authorization header) are required" );
1017
1021
}
1018
1022
1019
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1020
- (this .configuration .containsKey ("password" ) || this .configuration .containsKey ("access_token" ))) {
1023
+ if (useSslAuth && (hasAccessToken || hasPassword )) {
1021
1024
throw new IllegalArgumentException ("Only one of password, access token or SSL authentication can be used per client." );
1022
1025
}
1023
1026
1024
- if (this .configuration .containsKey ("ssl_authentication" ) &&
1025
- !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1027
+ if (useSslAuth && !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1026
1028
throw new IllegalArgumentException ("SSL authentication requires a client certificate" );
1027
1029
}
1028
1030
@@ -1159,17 +1161,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1159
1161
if (data == null || data .isEmpty ()) {
1160
1162
throw new IllegalArgumentException ("Data cannot be empty" );
1161
1163
}
1162
-
1164
+ //Add format to the settings
1165
+ if (settings == null ) {
1166
+ settings = new InsertSettings ();
1167
+ }
1163
1168
1164
1169
String operationId = registerOperationMetrics ();
1165
1170
settings .setOperationId (operationId );
1166
1171
globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
1167
1172
globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
1168
1173
1169
- //Add format to the settings
1170
- if (settings == null ) {
1171
- settings = new InsertSettings ();
1172
- }
1173
1174
1174
1175
boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
1175
1176
ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
@@ -1193,11 +1194,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1193
1194
}
1194
1195
1195
1196
1196
- String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1197
- final int maxRetries = retry == null ? 0 : Integer . parseInt ( retry ) ;
1197
+ Integer retry = ( Integer ) configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1198
+ final int maxRetries = retry == null ? 0 : retry ;
1198
1199
1199
1200
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1200
- final InsertSettings finalSettings = settings ;
1201
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1201
1202
Supplier <InsertResponse > supplier = () -> {
1202
1203
long startTime = System .nanoTime ();
1203
1204
// Selecting some node
@@ -1319,8 +1320,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1319
1320
InsertSettings settings ) {
1320
1321
1321
1322
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1322
- Integer .parseInt (configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (),
1323
- ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getDefaultValue ())) :
1323
+ (int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1324
1324
settings .getInputStreamCopyBufferSize ();
1325
1325
1326
1326
if (writeBufferSize <= 0 ) {
@@ -1392,17 +1392,16 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1392
1392
1393
1393
Supplier <InsertResponse > responseSupplier ;
1394
1394
1395
-
1396
1395
final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1397
- Integer . parseInt ( configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (), "8192" )) :
1396
+ ( int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
1398
1397
settings .getInputStreamCopyBufferSize ();
1399
1398
1400
1399
if (writeBufferSize <= 0 ) {
1401
1400
throw new IllegalArgumentException ("Buffer size must be greater than 0" );
1402
1401
}
1403
1402
1404
1403
settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1405
- final InsertSettings finalSettings = settings ;
1404
+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1406
1405
1407
1406
StringBuilder sqlStmt = new StringBuilder ("INSERT INTO " ).append (tableName );
1408
1407
if (columnNames != null && !columnNames .isEmpty ()) {
@@ -1532,14 +1531,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1532
1531
}
1533
1532
ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
1534
1533
clientStats .start (ClientMetrics .OP_DURATION );
1535
- applyDefaults (settings );
1536
1534
1537
1535
Supplier <QueryResponse > responseSupplier ;
1538
1536
1539
1537
if (queryParams != null ) {
1540
1538
settings .setOption ("statement_params" , queryParams );
1541
1539
}
1542
- final QuerySettings finalSettings = settings ;
1540
+ final QuerySettings finalSettings = new QuerySettings ( buildRequestSettings ( settings . getAllSettings ())) ;
1543
1541
responseSupplier = () -> {
1544
1542
long startTime = System .nanoTime ();
1545
1543
// Selecting some node
@@ -1917,7 +1915,7 @@ public CompletableFuture<CommandResponse> execute(String sql) {
1917
1915
public ClickHouseBinaryFormatReader newBinaryFormatReader (QueryResponse response , TableSchema schema ) {
1918
1916
ClickHouseBinaryFormatReader reader = null ;
1919
1917
// Using caching buffer allocator is risky so this parameter is not exposed to the user
1920
- boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" );
1918
+ boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" , false );
1921
1919
BinaryStreamReader .ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
1922
1920
new BinaryStreamReader .CachingByteBufferAllocator () :
1923
1921
new BinaryStreamReader .DefaultByteBufferAllocator ();
@@ -1955,25 +1953,6 @@ private String registerOperationMetrics() {
1955
1953
return operationId ;
1956
1954
}
1957
1955
1958
- private void applyDefaults (QuerySettings settings ) {
1959
- Map <String , Object > settingsMap = settings .getAllSettings ();
1960
-
1961
- String key = ClientConfigProperties .USE_SERVER_TIMEZONE .getKey ();
1962
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1963
- settings .setOption (key , MapUtils .getFlag (configuration , key ));
1964
- }
1965
-
1966
- key = ClientConfigProperties .USE_TIMEZONE .getKey ();
1967
- if ( !settings .getUseServerTimeZone () && !settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1968
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1969
- }
1970
-
1971
- key = ClientConfigProperties .SERVER_TIMEZONE .getKey ();
1972
- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1973
- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1974
- }
1975
- }
1976
-
1977
1956
private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
1978
1957
boolean isAsync = MapUtils .getFlag (requestSettings , configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey ());
1979
1958
if (isAsync ) {
@@ -2001,7 +1980,7 @@ public Map<String, String> getConfiguration() {
2001
1980
2002
1981
/** Returns operation timeout in seconds */
2003
1982
protected int getOperationTimeout () {
2004
- return Integer . parseInt ( configuration . get ( ClientConfigProperties .MAX_EXECUTION_TIME .getKey ()) );
1983
+ return ClientConfigProperties .MAX_EXECUTION_TIME .getOrDefault ( configuration );
2005
1984
}
2006
1985
2007
1986
/**
@@ -2014,15 +1993,16 @@ public Set<String> getEndpoints() {
2014
1993
}
2015
1994
2016
1995
public String getUser () {
2017
- return this .configuration .get (ClientConfigProperties .USER .getKey ());
1996
+ return ( String ) this .configuration .get (ClientConfigProperties .USER .getKey ());
2018
1997
}
2019
1998
2020
1999
public String getServerVersion () {
2021
2000
return this .serverVersion ;
2022
2001
}
2023
2002
2024
2003
public String getServerTimeZone () {
2025
- return this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2004
+ TimeZone tz = (TimeZone ) this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2005
+ return tz == null ? null : tz .getID ();
2026
2006
}
2027
2007
2028
2008
public String getClientVersion () {
@@ -2035,10 +2015,9 @@ public String getClientVersion() {
2035
2015
* @param dbRoles
2036
2016
*/
2037
2017
public void setDBRoles (Collection <String > dbRoles ) {
2038
- this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), ClientConfigProperties .commaSeparated (dbRoles ));
2039
- this .unmodifiableDbRolesView =
2040
- Collections .unmodifiableCollection (ClientConfigProperties .valuesFromCommaSeparated (
2041
- this .configuration .get (ClientConfigProperties .SESSION_DB_ROLES .getKey ())));
2018
+ List <String > tmp = new ArrayList <>(dbRoles );
2019
+ this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), tmp );
2020
+ this .unmodifiableDbRolesView = ImmutableList .copyOf (tmp );
2042
2021
}
2043
2022
2044
2023
public void updateClientName (String name ) {
@@ -2069,4 +2048,17 @@ private Endpoint getNextAliveNode() {
2069
2048
}
2070
2049
2071
2050
public static final String VALUES_LIST_DELIMITER = "," ;
2051
+
2052
+ /**
2053
+ * Produces a merge of operation and client settings.
2054
+ * Operation settings override client settings
2055
+ * @param opSettings - operation settings
2056
+ * @return request settings - merged client and operation settings
2057
+ */
2058
+ private Map <String , Object > buildRequestSettings (Map <String , Object > opSettings ) {
2059
+ Map <String , Object > requestSettings = new HashMap <>();
2060
+ requestSettings .putAll (configuration );
2061
+ requestSettings .putAll (opSettings );
2062
+ return requestSettings ;
2063
+ }
2072
2064
}
0 commit comments