@@ -115,7 +115,7 @@ public class Client implements AutoCloseable {
115115 private HttpAPIClientHelper httpClientHelper = null ;
116116
117117 private final List <Endpoint > endpoints ;
118- private final Map <String , String > configuration ;
118+ private final Map <String , Object > configuration ;
119119
120120 private final Map <String , String > readOnlyConfig ;
121121
@@ -145,15 +145,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
145145 private Client (Set <String > endpoints , Map <String ,String > configuration ,
146146 ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
147147 // Simple initialization
148- this .configuration = configuration ;
149- this .readOnlyConfig = Collections .unmodifiableMap (this . configuration );
148+ this .configuration = ClientConfigProperties . parseConfigMap ( configuration ) ;
149+ this .readOnlyConfig = Collections .unmodifiableMap (configuration );
150150 this .metricsRegistry = metricsRegistry ;
151151
152152 // Serialization
153153 this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
154154
155155 // Operation Execution
156- boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
156+ boolean isAsyncEnabled = ClientConfigProperties .ASYNC_OPERATIONS .getOrDefault (this .configuration );
157+
157158 if (isAsyncEnabled && sharedOperationExecutor == null ) {
158159 this .isSharedOpExecutorOwned = true ;
159160 this .sharedOperationExecutor = Executors .newCachedThreadPool (new DefaultThreadFactory ("chc-operation" ));
@@ -179,7 +180,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration,
179180 }
180181
181182 this .endpoints = tmpEndpoints .build ();
182- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
183+ this .httpClientHelper = new HttpAPIClientHelper (this . configuration , metricsRegistry , initSslContext );
183184
184185 String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
185186 this .retries = retry == null ? 0 : Integer .parseInt (retry );
@@ -217,7 +218,7 @@ public void loadServerInfo() {
217218 * @return String - actual default database name.
218219 */
219220 public String getDefaultDatabase () {
220- return this .configuration .get ("database" );
221+ return ( String ) this .configuration .get (ClientConfigProperties . DATABASE . getKey () );
221222 }
222223
223224
@@ -845,7 +846,7 @@ public Builder setMaxRetries(int maxRetries) {
845846 * @return
846847 */
847848 public Builder allowBinaryReaderToReuseBuffers (boolean reuse ) {
848- this .configuration .put ("client_allow_binary_reader_to_reuse_buffers" , String .valueOf (reuse ));
849+ this .configuration .put (ClientConfigProperties . BINARY_READER_USE_PREALLOCATED_BUFFERS . getKey () , String .valueOf (reuse ));
849850 return this ;
850851 }
851852
@@ -1009,20 +1010,21 @@ public Client build() {
10091010 throw new IllegalArgumentException ("At least one endpoint is required" );
10101011 }
10111012 // check if username and password are empty. so can not initiate client?
1012- if (!this .configuration .containsKey ("access_token" ) &&
1013- (!this .configuration .containsKey ("user" ) || !this .configuration .containsKey ("password" )) &&
1014- !MapUtils .getFlag (this .configuration , "ssl_authentication" , false ) &&
1015- !this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ))) {
1013+ boolean useSslAuth = MapUtils .getFlag (this .configuration , ClientConfigProperties .SSL_AUTH .getKey ());
1014+ boolean hasAccessToken = this .configuration .containsKey (ClientConfigProperties .ACCESS_TOKEN .getKey ());
1015+ boolean hasUser = this .configuration .containsKey (ClientConfigProperties .USER .getKey ());
1016+ boolean hasPassword = this .configuration .containsKey (ClientConfigProperties .PASSWORD .getKey ());
1017+ boolean customHttpHeaders = this .configuration .containsKey (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ));
1018+
1019+ if (!(useSslAuth || hasAccessToken || hasUser || hasPassword || customHttpHeaders )) {
10161020 throw new IllegalArgumentException ("Username and password (or access token or SSL authentication or pre-define Authorization header) are required" );
10171021 }
10181022
1019- if (this .configuration .containsKey ("ssl_authentication" ) &&
1020- (this .configuration .containsKey ("password" ) || this .configuration .containsKey ("access_token" ))) {
1023+ if (useSslAuth && (hasAccessToken || hasPassword )) {
10211024 throw new IllegalArgumentException ("Only one of password, access token or SSL authentication can be used per client." );
10221025 }
10231026
1024- if (this .configuration .containsKey ("ssl_authentication" ) &&
1025- !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
1027+ if (useSslAuth && !this .configuration .containsKey (ClientConfigProperties .SSL_CERTIFICATE .getKey ())) {
10261028 throw new IllegalArgumentException ("SSL authentication requires a client certificate" );
10271029 }
10281030
@@ -1159,17 +1161,16 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
11591161 if (data == null || data .isEmpty ()) {
11601162 throw new IllegalArgumentException ("Data cannot be empty" );
11611163 }
1162-
1164+ //Add format to the settings
1165+ if (settings == null ) {
1166+ settings = new InsertSettings ();
1167+ }
11631168
11641169 String operationId = registerOperationMetrics ();
11651170 settings .setOperationId (operationId );
11661171 globalClientStats .get (operationId ).start (ClientMetrics .OP_DURATION );
11671172 globalClientStats .get (operationId ).start (ClientMetrics .OP_SERIALIZATION );
11681173
1169- //Add format to the settings
1170- if (settings == null ) {
1171- settings = new InsertSettings ();
1172- }
11731174
11741175 boolean hasDefaults = this .tableSchemaHasDefaults .get (tableName );
11751176 ClickHouseFormat format = hasDefaults ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
@@ -1193,11 +1194,11 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
11931194 }
11941195
11951196
1196- String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1197- final int maxRetries = retry == null ? 0 : Integer . parseInt ( retry ) ;
1197+ Integer retry = ( Integer ) configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
1198+ final int maxRetries = retry == null ? 0 : retry ;
11981199
11991200 settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1200- final InsertSettings finalSettings = settings ;
1201+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
12011202 Supplier <InsertResponse > supplier = () -> {
12021203 long startTime = System .nanoTime ();
12031204 // Selecting some node
@@ -1319,8 +1320,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
13191320 InsertSettings settings ) {
13201321
13211322 final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1322- Integer .parseInt (configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (),
1323- ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getDefaultValue ())) :
1323+ (int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
13241324 settings .getInputStreamCopyBufferSize ();
13251325
13261326 if (writeBufferSize <= 0 ) {
@@ -1392,17 +1392,16 @@ public CompletableFuture<InsertResponse> insert(String tableName,
13921392
13931393 Supplier <InsertResponse > responseSupplier ;
13941394
1395-
13961395 final int writeBufferSize = settings .getInputStreamCopyBufferSize () <= 0 ?
1397- Integer . parseInt ( configuration .getOrDefault (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey (), "8192" )) :
1396+ ( int ) configuration .get (ClientConfigProperties .CLIENT_NETWORK_BUFFER_SIZE .getKey ()) :
13981397 settings .getInputStreamCopyBufferSize ();
13991398
14001399 if (writeBufferSize <= 0 ) {
14011400 throw new IllegalArgumentException ("Buffer size must be greater than 0" );
14021401 }
14031402
14041403 settings .setOption (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey (), format .name ());
1405- final InsertSettings finalSettings = settings ;
1404+ final InsertSettings finalSettings = new InsertSettings ( buildRequestSettings ( settings . getAllSettings ())) ;
14061405
14071406 StringBuilder sqlStmt = new StringBuilder ("INSERT INTO " ).append (tableName );
14081407 if (columnNames != null && !columnNames .isEmpty ()) {
@@ -1532,14 +1531,13 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
15321531 }
15331532 ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
15341533 clientStats .start (ClientMetrics .OP_DURATION );
1535- applyDefaults (settings );
15361534
15371535 Supplier <QueryResponse > responseSupplier ;
15381536
15391537 if (queryParams != null ) {
15401538 settings .setOption ("statement_params" , queryParams );
15411539 }
1542- final QuerySettings finalSettings = settings ;
1540+ final QuerySettings finalSettings = new QuerySettings ( buildRequestSettings ( settings . getAllSettings ())) ;
15431541 responseSupplier = () -> {
15441542 long startTime = System .nanoTime ();
15451543 // Selecting some node
@@ -1917,7 +1915,7 @@ public CompletableFuture<CommandResponse> execute(String sql) {
19171915 public ClickHouseBinaryFormatReader newBinaryFormatReader (QueryResponse response , TableSchema schema ) {
19181916 ClickHouseBinaryFormatReader reader = null ;
19191917 // Using caching buffer allocator is risky so this parameter is not exposed to the user
1920- boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" );
1918+ boolean useCachingBufferAllocator = MapUtils .getFlag (configuration , "client_allow_binary_reader_to_reuse_buffers" , false );
19211919 BinaryStreamReader .ByteBufferAllocator byteBufferPool = useCachingBufferAllocator ?
19221920 new BinaryStreamReader .CachingByteBufferAllocator () :
19231921 new BinaryStreamReader .DefaultByteBufferAllocator ();
@@ -1955,25 +1953,6 @@ private String registerOperationMetrics() {
19551953 return operationId ;
19561954 }
19571955
1958- private void applyDefaults (QuerySettings settings ) {
1959- Map <String , Object > settingsMap = settings .getAllSettings ();
1960-
1961- String key = ClientConfigProperties .USE_SERVER_TIMEZONE .getKey ();
1962- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1963- settings .setOption (key , MapUtils .getFlag (configuration , key ));
1964- }
1965-
1966- key = ClientConfigProperties .USE_TIMEZONE .getKey ();
1967- if ( !settings .getUseServerTimeZone () && !settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1968- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1969- }
1970-
1971- key = ClientConfigProperties .SERVER_TIMEZONE .getKey ();
1972- if (!settingsMap .containsKey (key ) && configuration .containsKey (key )) {
1973- settings .setOption (key , TimeZone .getTimeZone (configuration .get (key )));
1974- }
1975- }
1976-
19771956 private <T > CompletableFuture <T > runAsyncOperation (Supplier <T > resultSupplier , Map <String , Object > requestSettings ) {
19781957 boolean isAsync = MapUtils .getFlag (requestSettings , configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey ());
19791958 if (isAsync ) {
@@ -2001,7 +1980,7 @@ public Map<String, String> getConfiguration() {
20011980
20021981 /** Returns operation timeout in seconds */
20031982 protected int getOperationTimeout () {
2004- return Integer . parseInt ( configuration . get ( ClientConfigProperties .MAX_EXECUTION_TIME .getKey ()) );
1983+ return ClientConfigProperties .MAX_EXECUTION_TIME .getOrDefault ( configuration );
20051984 }
20061985
20071986 /**
@@ -2014,15 +1993,16 @@ public Set<String> getEndpoints() {
20141993 }
20151994
20161995 public String getUser () {
2017- return this .configuration .get (ClientConfigProperties .USER .getKey ());
1996+ return ( String ) this .configuration .get (ClientConfigProperties .USER .getKey ());
20181997 }
20191998
20201999 public String getServerVersion () {
20212000 return this .serverVersion ;
20222001 }
20232002
20242003 public String getServerTimeZone () {
2025- return this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2004+ TimeZone tz = (TimeZone ) this .configuration .get (ClientConfigProperties .SERVER_TIMEZONE .getKey ());
2005+ return tz == null ? null : tz .getID ();
20262006 }
20272007
20282008 public String getClientVersion () {
@@ -2035,10 +2015,9 @@ public String getClientVersion() {
20352015 * @param dbRoles
20362016 */
20372017 public void setDBRoles (Collection <String > dbRoles ) {
2038- this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), ClientConfigProperties .commaSeparated (dbRoles ));
2039- this .unmodifiableDbRolesView =
2040- Collections .unmodifiableCollection (ClientConfigProperties .valuesFromCommaSeparated (
2041- this .configuration .get (ClientConfigProperties .SESSION_DB_ROLES .getKey ())));
2018+ List <String > tmp = new ArrayList <>(dbRoles );
2019+ this .configuration .put (ClientConfigProperties .SESSION_DB_ROLES .getKey (), tmp );
2020+ this .unmodifiableDbRolesView = ImmutableList .copyOf (tmp );
20422021 }
20432022
20442023 public void updateClientName (String name ) {
@@ -2069,4 +2048,17 @@ private Endpoint getNextAliveNode() {
20692048 }
20702049
20712050 public static final String VALUES_LIST_DELIMITER = "," ;
2051+
2052+ /**
2053+ * Produces a merge of operation and client settings.
2054+ * Operation settings override client settings
2055+ * @param opSettings - operation settings
2056+ * @return request settings - merged client and operation settings
2057+ */
2058+ private Map <String , Object > buildRequestSettings (Map <String , Object > opSettings ) {
2059+ Map <String , Object > requestSettings = new HashMap <>();
2060+ requestSettings .putAll (configuration );
2061+ requestSettings .putAll (opSettings );
2062+ return requestSettings ;
2063+ }
20722064}
0 commit comments