Skip to content

Commit eb19133

Browse files
czy0061996fanrui
andauthored
[FLINK-34537] Autoscaler JDBC Support HikariPool (#785)
* [FLINK-34537] Autoscaler JDBC Support HikariPool * fix pom with dependency * Remove unnecessary HikariCP dependency for flink-autoscaler-standalone module * Polish the error message * Address Yuepeng's comments --------- Co-authored-by: Rui Fan <[email protected]>
1 parent 84d9b74 commit eb19133

File tree

8 files changed

+89
-42
lines changed

8 files changed

+89
-42
lines changed

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,15 @@
2222
import org.apache.flink.autoscaler.event.LoggingEventHandler;
2323
import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
2424
import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor;
25+
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.configuration.DescribedEnum;
2728
import org.apache.flink.configuration.description.InlineElement;
2829

29-
import java.sql.DriverManager;
30-
3130
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
3231
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
3332
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
34-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
35-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
36-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
3733
import static org.apache.flink.configuration.description.TextElement.text;
38-
import static org.apache.flink.util.Preconditions.checkArgument;
3934

4035
/** The factory of {@link AutoScalerEventHandler}. */
4136
public class AutoscalerEventHandlerFactory {
@@ -77,12 +72,7 @@ AutoScalerEventHandler<KEY, Context> create(Configuration conf) throws Exception
7772
private static <KEY, Context extends JobAutoScalerContext<KEY>>
7873
AutoScalerEventHandler<KEY, Context> createJdbcEventHandler(Configuration conf)
7974
throws Exception {
80-
final var jdbcUrl = conf.get(JDBC_URL);
81-
checkArgument(jdbcUrl != null, "%s is required for jdbc event handler.", JDBC_URL.key());
82-
var user = conf.get(JDBC_USERNAME);
83-
var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));
84-
85-
var conn = DriverManager.getConnection(jdbcUrl, user, password);
75+
var conn = HikariJDBCUtil.getConnection(conf);
8676
return new JdbcAutoScalerEventHandler<>(new JdbcEventInteractor(conn));
8777
}
8878
}

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,17 @@
2121
import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
2222
import org.apache.flink.autoscaler.jdbc.state.JdbcStateInteractor;
2323
import org.apache.flink.autoscaler.jdbc.state.JdbcStateStore;
24+
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
2425
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
2526
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.configuration.DescribedEnum;
2829
import org.apache.flink.configuration.description.InlineElement;
2930

30-
import java.sql.DriverManager;
31-
3231
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
3332
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
34-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
35-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
36-
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
3733
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
3834
import static org.apache.flink.configuration.description.TextElement.text;
39-
import static org.apache.flink.util.Preconditions.checkArgument;
4035

4136
/** The factory of {@link AutoScalerStateStore}. */
4237
public class AutoscalerStateStoreFactory {
@@ -79,12 +74,7 @@ AutoScalerStateStore<KEY, Context> create(Configuration conf) throws Exception {
7974
private static <KEY, Context extends JobAutoScalerContext<KEY>>
8075
AutoScalerStateStore<KEY, Context> createJdbcStateStore(Configuration conf)
8176
throws Exception {
82-
final var jdbcUrl = conf.get(JDBC_URL);
83-
checkArgument(jdbcUrl != null, "%s is required for jdbc state store.", JDBC_URL.key());
84-
var user = conf.get(JDBC_USERNAME);
85-
var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));
86-
87-
var conn = DriverManager.getConnection(jdbcUrl, user, password);
77+
var conn = HikariJDBCUtil.getConnection(conf);
8878
return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn)));
8979
}
9080
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.standalone.utils;
19+
20+
import org.apache.flink.configuration.Configuration;
21+
import org.apache.flink.util.StringUtils;
22+
23+
import com.zaxxer.hikari.HikariConfig;
24+
import com.zaxxer.hikari.HikariDataSource;
25+
26+
import java.sql.Connection;
27+
import java.sql.SQLException;
28+
29+
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE;
30+
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
31+
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_USERNAME;
32+
import static org.apache.flink.util.Preconditions.checkArgument;
33+
34+
/** Hikari JDBC common util. */
35+
public class HikariJDBCUtil {
36+
37+
public static final String JDBC_URL_REQUIRED_HINT =
38+
String.format(
39+
"%s is required when jdbc state store or jdbc event handler is used.",
40+
JDBC_URL.key());
41+
42+
public static Connection getConnection(Configuration conf) throws SQLException {
43+
final var jdbcUrl = conf.get(JDBC_URL);
44+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(jdbcUrl), JDBC_URL_REQUIRED_HINT);
45+
var user = conf.get(JDBC_USERNAME);
46+
var password = System.getenv().get(conf.get(JDBC_PASSWORD_ENV_VARIABLE));
47+
HikariConfig hikariConfig = new HikariConfig();
48+
hikariConfig.setJdbcUrl(jdbcUrl);
49+
hikariConfig.setUsername(user);
50+
hikariConfig.setPassword(password);
51+
return new HikariDataSource(hikariConfig).getConnection();
52+
}
53+
}

flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ This project bundles the following dependencies under the Apache Software Licens
2020
- org.apache.logging.log4j:log4j-api:2.17.1
2121
- org.apache.logging.log4j:log4j-core:2.17.1
2222
- org.apache.logging.log4j:log4j-1.2-api:2.17.1
23+
- com.zaxxer:HikariCP:5.1.0
2324

2425
This project bundles the following dependencies under the BSD License.
2526
See bundled license files for details.

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@
1919

2020
import org.apache.flink.autoscaler.event.LoggingEventHandler;
2121
import org.apache.flink.autoscaler.jdbc.event.JdbcAutoScalerEventHandler;
22+
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
2223
import org.apache.flink.configuration.Configuration;
2324

2425
import org.junit.jupiter.api.Test;
2526

26-
import java.sql.DriverManager;
27-
import java.sql.SQLException;
28-
2927
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.JDBC;
3028
import static org.apache.flink.autoscaler.standalone.AutoscalerEventHandlerFactory.EventHandlerType.LOGGING;
3129
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.EVENT_HANDLER_TYPE;
3230
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
31+
import static org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil.JDBC_URL_REQUIRED_HINT;
3332
import static org.assertj.core.api.Assertions.assertThat;
3433
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3534

@@ -59,25 +58,26 @@ void testCreateJdbcEventHandlerWithoutURL() {
5958
conf.set(EVENT_HANDLER_TYPE, JDBC);
6059
assertThatThrownBy(() -> AutoscalerEventHandlerFactory.create(conf))
6160
.isInstanceOf(IllegalArgumentException.class)
62-
.hasMessage("%s is required for jdbc event handler.", JDBC_URL.key());
61+
.hasMessage(JDBC_URL_REQUIRED_HINT);
6362
}
6463

6564
@Test
6665
void testCreateJdbcEventHandler() throws Exception {
6766
final var jdbcUrl = "jdbc:derby:memory:test";
68-
DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close();
69-
7067
// Test for create JDBC Event Handler.
7168
final var conf = new Configuration();
7269
conf.set(EVENT_HANDLER_TYPE, JDBC);
73-
conf.set(JDBC_URL, jdbcUrl);
70+
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
71+
HikariJDBCUtil.getConnection(conf).close();
7472

7573
var eventHandler = AutoscalerEventHandlerFactory.create(conf);
7674
assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class);
7775

7876
try {
79-
DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close();
80-
} catch (SQLException ignored) {
77+
conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
78+
HikariJDBCUtil.getConnection(conf).close();
79+
} catch (RuntimeException ignored) {
80+
// database shutdown ignored exception
8181
}
8282
}
8383
}

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@
1818
package org.apache.flink.autoscaler.standalone;
1919

2020
import org.apache.flink.autoscaler.jdbc.state.JdbcAutoScalerStateStore;
21+
import org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil;
2122
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
2223
import org.apache.flink.configuration.Configuration;
2324

2425
import org.junit.jupiter.api.Test;
2526

26-
import java.sql.DriverManager;
27-
import java.sql.SQLException;
28-
2927
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.JDBC;
3028
import static org.apache.flink.autoscaler.standalone.AutoscalerStateStoreFactory.StateStoreType.MEMORY;
3129
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_URL;
3230
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.STATE_STORE_TYPE;
31+
import static org.apache.flink.autoscaler.standalone.utils.HikariJDBCUtil.JDBC_URL_REQUIRED_HINT;
3332
import static org.assertj.core.api.Assertions.assertThat;
3433
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3534

@@ -58,25 +57,26 @@ void testCreateJdbcStateStoreWithoutURL() {
5857
conf.set(STATE_STORE_TYPE, JDBC);
5958
assertThatThrownBy(() -> AutoscalerStateStoreFactory.create(conf))
6059
.isInstanceOf(IllegalArgumentException.class)
61-
.hasMessage("%s is required for jdbc state store.", JDBC_URL.key());
60+
.hasMessage(JDBC_URL_REQUIRED_HINT);
6261
}
6362

6463
@Test
6564
void testCreateJdbcStateStore() throws Exception {
6665
final var jdbcUrl = "jdbc:derby:memory:test";
67-
DriverManager.getConnection(String.format("%s;create=true", jdbcUrl)).close();
68-
6966
// Test for create JDBC State store.
7067
final var conf = new Configuration();
7168
conf.set(STATE_STORE_TYPE, JDBC);
72-
conf.set(JDBC_URL, jdbcUrl);
69+
conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl));
70+
HikariJDBCUtil.getConnection(conf).close();
7371

7472
var stateStore = AutoscalerStateStoreFactory.create(conf);
7573
assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class);
7674

7775
try {
78-
DriverManager.getConnection(String.format("%s;shutdown=true", jdbcUrl)).close();
79-
} catch (SQLException ignored) {
76+
conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl));
77+
HikariJDBCUtil.getConnection(conf).close();
78+
} catch (RuntimeException ignored) {
79+
// database shutdown ignored exception
8080
}
8181
}
8282
}

flink-autoscaler/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ under the License.
5757
<groupId>org.quartz-scheduler</groupId>
5858
<artifactId>quartz</artifactId>
5959
<version>${quartz.version}</version>
60+
<exclusions>
61+
<exclusion>
62+
<artifactId>HikariCP-java7</artifactId>
63+
<groupId>com.zaxxer</groupId>
64+
</exclusion>
65+
</exclusions>
66+
</dependency>
67+
68+
<dependency>
69+
<groupId>com.zaxxer</groupId>
70+
<artifactId>HikariCP</artifactId>
71+
<version>${hikari.version}</version>
6072
</dependency>
6173

6274
<dependency>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ under the License.
9999
<assertj.version>3.21.0</assertj.version>
100100

101101
<quartz.version>2.3.2</quartz.version>
102+
<hikari.version>5.1.0</hikari.version>
102103

103104
<flink-kubernetes-operator.surefire.baseArgLine>-XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config}</flink-kubernetes-operator.surefire.baseArgLine>
104105

0 commit comments

Comments
 (0)