From 36bdea7e55ed155e6c0f36a2beef2a9d3c0bb180 Mon Sep 17 00:00:00 2001 From: dsaisharath Date: Tue, 21 Jan 2025 16:40:27 -0800 Subject: [PATCH 1/2] [FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch sql connection usages to datasource --- .../event/JdbcAutoScalerEventHandler.java | 3 +- .../jdbc/event/JdbcEventInteractor.java | 35 +++++++++++++------ .../jdbc/state/JdbcAutoScalerStateStore.java | 5 +++ .../jdbc/state/JdbcStateInteractor.java | 30 +++++++++++----- .../autoscaler/jdbc/state/JdbcStateStore.java | 7 +++- ...tractJdbcAutoscalerEventHandlerITCase.java | 14 +++++--- .../AbstractJdbcEventInteractorITCase.java | 3 +- .../event/CountableJdbcEventInteractor.java | 7 ++-- .../AbstractJdbcStateInteractorITCase.java | 3 +- .../state/AbstractJdbcStateStoreITCase.java | 21 +++++------ .../state/CountableJdbcStateInteractor.java | 7 ++-- .../state/JdbcAutoScalerStateStoreTest.java | 4 +-- .../jdbc/state/JobStateViewTest.java | 18 +++++----- .../testutils/databases/DatabaseTest.java | 4 +-- .../databases/derby/DerbyExtension.java | 19 ++++++---- .../databases/derby/DerbyTestBase.java | 6 ++-- .../databases/mysql/MySQL56TestBase.java | 6 ++-- .../databases/mysql/MySQL57TestBase.java | 6 ++-- .../databases/mysql/MySQL8TestBase.java | 6 ++-- .../databases/mysql/MySQLExtension.java | 17 +++++---- .../postgres/PostgreSQLExtension.java | 17 +++++---- .../postgres/PostgreSQLTestBase.java | 6 ++-- .../AutoscalerEventHandlerFactory.java | 4 +-- .../AutoscalerStateStoreFactory.java | 5 +-- .../StandaloneAutoscalerExecutor.java | 5 ++- .../standalone/utils/HikariJDBCUtil.java | 5 ++- .../AutoscalerEventHandlerFactoryTest.java | 8 +++-- .../AutoscalerStateStoreFactoryTest.java | 8 +++-- .../StandaloneAutoscalerExecutorTest.java | 4 +-- .../event/AutoScalerEventHandler.java | 6 +--- .../autoscaler/event/LoggingEventHandler.java | 3 ++ .../state/AutoScalerStateStore.java | 3 +- .../state/InMemoryAutoScalerStateStore.java | 3 ++ .../event/TestingEventCollector.java | 3 ++ .../KubernetesAutoScalerEventHandler.java | 3 ++ .../state/KubernetesAutoScalerStateStore.java | 3 ++ 36 files changed, 190 insertions(+), 117 deletions(-) diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java index 7613717b7c..8faef51f48 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java @@ -135,11 +135,12 @@ public void handleScalingEvent( } @Override - public void close() { + public void close() throws Exception { if (Objects.nonNull(scheduledEventHandlerCleaner) && !scheduledEventHandlerCleaner.isShutdown()) { scheduledEventHandlerCleaner.shutdownNow(); } + jdbcEventInteractor.close(); } @VisibleForTesting diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java index ce3fc48db9..f40740ac8c 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java @@ -23,8 +23,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.sql.DataSource; -import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; @@ -37,13 +37,13 @@ import static org.apache.flink.util.Preconditions.checkState; /** Responsible for interacting with the database. */ -public class JdbcEventInteractor { +public class JdbcEventInteractor implements AutoCloseable { - private final Connection conn; + private final DataSource dataSource; private Clock clock = Clock.systemDefaultZone(); - public JdbcEventInteractor(Connection conn) { - this.conn = conn; + public JdbcEventInteractor(DataSource dataSource) { + this.dataSource = dataSource; } public Optional queryLatestEvent(String jobKey, String reason, String eventKey) @@ -52,7 +52,8 @@ public Optional queryLatestEvent(String jobKey, String reason, "select * from t_flink_autoscaler_event_handler " + "where job_key = ? and reason = ? and event_key = ? "; - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setString(1, jobKey); pstmt.setString(2, reason); pstmt.setString(3, eventKey); @@ -99,7 +100,8 @@ public void createEvent( + " values (?, ?, ?, ?, ?, ?, ?, ?)"; var createTime = Timestamp.from(clock.instant()); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setTimestamp(1, createTime); pstmt.setTimestamp(2, createTime); pstmt.setString(3, jobKey); @@ -117,7 +119,8 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?"; var updateTime = Timestamp.from(clock.instant()); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setTimestamp(1, updateTime); pstmt.setString(2, message); pstmt.setInt(3, eventCount); @@ -136,7 +139,8 @@ protected List queryEvents(String jobKey, String reason) throws "select * from t_flink_autoscaler_event_handler " + "where job_key = ? and reason = ? "; - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setString(1, jobKey); pstmt.setString(2, reason); @@ -160,7 +164,8 @@ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception { "SELECT id from t_flink_autoscaler_event_handler " + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) " + " and create_time < ?"; - try (var pstmt = conn.prepareStatement(sql)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(sql)) { pstmt.setObject(1, timestamp); ResultSet resultSet = pstmt.executeQuery(); return resultSet.next() ? resultSet.getLong(1) : null; @@ -171,11 +176,19 @@ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp time throws Exception { var query = "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?"; - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setObject(1, startId); pstmt.setObject(2, endId); pstmt.setObject(3, timestamp); return pstmt.executeUpdate(); } } + + @Override + public void close() throws Exception { + if (dataSource instanceof AutoCloseable) { + ((AutoCloseable) dataSource).close(); + } + } } diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java index 1ed3ad96e7..6bc488784f 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java @@ -333,4 +333,9 @@ private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleD throws JacksonException { return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {}); } + + @Override + public void close() throws Exception { + jdbcStateStore.close(); + } } diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java index 7c12bf8333..9ddc5c55d5 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java @@ -20,7 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; +import javax.sql.DataSource; + import java.sql.Timestamp; import java.time.Instant; import java.util.Collections; @@ -31,21 +32,22 @@ import static org.apache.flink.util.Preconditions.checkState; /** Responsible for interacting with the database. */ -public class JdbcStateInteractor { +public class JdbcStateInteractor implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class); - private final Connection conn; + private final DataSource dataSource; - public JdbcStateInteractor(Connection conn) { - this.conn = conn; + public JdbcStateInteractor(DataSource dataSource) { + this.dataSource = dataSource; } public Map queryData(String jobKey) throws Exception { var query = "select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?"; var data = new HashMap(); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setString(1, jobKey); var rs = pstmt.executeQuery(); while (rs.next()) { @@ -63,7 +65,8 @@ public void deleteData(String jobKey, List deletedStateTypes) throws String.format( "DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)", String.join(",", Collections.nCopies(deletedStateTypes.size(), "?"))); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { pstmt.setString(1, jobKey); int i = 2; for (var stateType : deletedStateTypes) { @@ -80,7 +83,8 @@ public void createData( var query = "INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)"; var updateTime = Timestamp.from(Instant.now()); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { for (var stateType : createdStateTypes) { pstmt.setTimestamp(1, updateTime); pstmt.setString(2, jobKey); @@ -106,7 +110,8 @@ public void updateData( "UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?"; var updateTime = Timestamp.from(Instant.now()); - try (var pstmt = conn.prepareStatement(query)) { + try (var conn = dataSource.getConnection(); + var pstmt = conn.prepareStatement(query)) { for (var stateType : updatedStateTypes) { pstmt.setTimestamp(1, updateTime); @@ -123,4 +128,11 @@ public void updateData( pstmt.executeBatch(); } } + + @Override + public void close() throws Exception { + if (dataSource instanceof AutoCloseable) { + ((AutoCloseable) dataSource).close(); + } + } } diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java index 1e237a47b2..e891516aeb 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java @@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; /** The jdbc state store. */ -public class JdbcStateStore { +public class JdbcStateStore implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class); @@ -89,4 +89,9 @@ private JobStateView getJobStateView(String jobKey) { private JobStateView createJobStateView(String jobKey) throws Exception { return new JobStateView(jdbcStateInteractor, jobKey); } + + @Override + public void close() throws Exception { + jdbcStateInteractor.close(); + } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java index f5278171c6..10c7486ed0 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java @@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; +import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; @@ -73,20 +74,22 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent); private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault()); + private DataSource dataSource; private CountableJdbcEventInteractor jdbcEventInteractor; private JdbcAutoScalerEventHandler> eventHandler; private JobAutoScalerContext ctx; @BeforeEach void beforeEach() throws Exception { - jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection()); + dataSource = getDataSource(); + jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource); jdbcEventInteractor.setClock(defaultClock); eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO); ctx = createDefaultJobAutoScalerContext(); } @AfterEach - void tearDown() { + void tearDown() throws Exception { eventHandler.close(); } @@ -459,7 +462,7 @@ void testDeleteCounterWhenIdNotConsecutive() throws Exception { .max(Comparable::compareTo) .orElseThrow(); - try (Connection connection = getConnection(); + try (Connection connection = dataSource.getConnection(); PreparedStatement ps = connection.prepareStatement( "update t_flink_autoscaler_event_handler set id = ? where id = ?")) { @@ -505,6 +508,7 @@ void testCleanExpiredEvents( Duration eventHandlerTtl, int unexpiredRecordsNum) throws Exception { + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl); // Init the expired records. @@ -525,7 +529,7 @@ void testCleanExpiredEvents( eventHandler.cleanExpiredEvents(); - try (Connection connection = getConnection(); + try (Connection connection = dataSource.getConnection(); PreparedStatement ps = connection.prepareStatement( "select count(1) from t_flink_autoscaler_event_handler"); @@ -542,7 +546,7 @@ private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception { if (minId == null) { return; } - try (Connection connection = getConnection(); + try (Connection connection = dataSource.getConnection(); PreparedStatement ps = connection.prepareStatement( "delete from t_flink_autoscaler_event_handler where id = ?")) { diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java index 2fe6920100..f5f7ca618d 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java @@ -46,8 +46,7 @@ void testAllOperations() throws Exception { // The datetime precision is seconds in MySQL by default. var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); - try (var conn = getConnection()) { - var jdbcEventInteractor = new JdbcEventInteractor(conn); + try (var jdbcEventInteractor = new JdbcEventInteractor(getDataSource())) { jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault())); jdbcEventInteractor.createEvent( diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java index 39ccd76cae..2ea04a71a5 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java @@ -19,7 +19,8 @@ import org.apache.flink.autoscaler.event.AutoScalerEventHandler; -import java.sql.Connection; +import javax.sql.DataSource; + import java.sql.Timestamp; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -34,8 +35,8 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor { private final AtomicLong updateCounter; private final AtomicLong deleteExpiredCounter; - public CountableJdbcEventInteractor(Connection conn) { - super(conn); + public CountableJdbcEventInteractor(DataSource dataSource) { + super(dataSource); queryCounter = new AtomicLong(); createCounter = new AtomicLong(); updateCounter = new AtomicLong(); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java index da737a54b9..159fc68b57 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java @@ -42,8 +42,7 @@ void testAllOperations() throws Exception { var value1 = "value1"; var value2 = "value2"; var value3 = "value3"; - try (var conn = getConnection()) { - var jdbcStateInteractor = new JdbcStateInteractor(conn); + try (var jdbcStateInteractor = new JdbcStateInteractor(getDataSource())) { assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty(); // Test for creating data. diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java index 5ab471373b..bb50ce62b8 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java @@ -28,7 +28,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.sql.Connection; +import javax.sql.DataSource; + import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -45,21 +46,21 @@ abstract class AbstractJdbcStateStoreITCase implements DatabaseTest { private static final String DEFAULT_JOB_KEY = "jobKey"; - private Connection conn; + private DataSource dataSource; private CountableJdbcStateInteractor jdbcStateInteractor; private JdbcStateStore jdbcStateStore; @BeforeEach void beforeEach() throws Exception { - this.conn = getConnection(); - this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn); + this.dataSource = getDataSource(); + this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource); this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor); } @AfterEach - void afterEach() throws SQLException { - if (conn != null) { - conn.close(); + void afterEach() throws Exception { + if (dataSource instanceof AutoCloseable) { + ((AutoCloseable) dataSource).close(); } } @@ -178,7 +179,7 @@ void testErrorHandlingDuringFlush() throws Exception { assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty(); // Modify the database directly. - var tmpJdbcInteractor = new JdbcStateInteractor(conn); + var tmpJdbcInteractor = new JdbcStateInteractor(dataSource); tmpJdbcInteractor.createData( DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1)); assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1); @@ -205,7 +206,7 @@ void testErrorHandlingDuringQuery() throws Exception { final var expectedException = new RuntimeException("Database isn't stable."); var exceptionableJdbcStateInteractor = - new CountableJdbcStateInteractor(conn) { + new CountableJdbcStateInteractor(dataSource) { private final AtomicBoolean isFirst = new AtomicBoolean(true); @Override @@ -282,7 +283,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp private Optional getValueFromDatabase(String jobKey, StateType stateType) throws Exception { - var jdbcInteractor = new JdbcStateInteractor(conn); + var jdbcInteractor = new JdbcStateInteractor(dataSource); return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType)); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java index 8fafc95d91..c1cb86e4dc 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJdbcStateInteractor.java @@ -17,7 +17,8 @@ package org.apache.flink.autoscaler.jdbc.state; -import java.sql.Connection; +import javax.sql.DataSource; + import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -32,8 +33,8 @@ public class CountableJdbcStateInteractor extends JdbcStateInteractor { private final AtomicLong createCounter; private final AtomicLong updateCounter; - public CountableJdbcStateInteractor(Connection conn) { - super(conn); + public CountableJdbcStateInteractor(DataSource dataSource) { + super(dataSource); queryCounter = new AtomicLong(); deleteCounter = new AtomicLong(); createCounter = new AtomicLong(); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java index a79c07bc8b..31dbd07be4 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStoreTest.java @@ -48,7 +48,7 @@ class JdbcAutoScalerStateStoreTest @Override protected void preSetup() throws Exception { - jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getConnection())); + jdbcStateStore = new JdbcStateStore(new JdbcStateInteractor(getDataSource())); cachedStateStore = new JdbcAutoScalerStateStore<>(jdbcStateStore); } @@ -56,7 +56,7 @@ protected void preSetup() throws Exception { protected AutoScalerStateStore> createPhysicalAutoScalerStateStore() throws Exception { return new JdbcAutoScalerStateStore<>( - new JdbcStateStore(new JdbcStateInteractor(getConnection()))); + new JdbcStateStore(new JdbcStateInteractor(getDataSource()))); } @Override diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java index 3989dbffae..9f6d7ba523 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java @@ -23,8 +23,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.sql.Connection; -import java.sql.SQLException; +import javax.sql.DataSource; + import java.util.Optional; import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS; @@ -36,22 +36,22 @@ class JobStateViewTest implements DerbyTestBase { private static final String DEFAULT_JOB_KEY = "jobKey"; - private Connection conn; + private DataSource dataSource; private CountableJdbcStateInteractor jdbcStateInteractor; private JobStateView jobStateView; @BeforeEach void beforeEach() throws Exception { - this.conn = getConnection(); - this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn); + this.dataSource = getDataSource(); + this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource); this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY); jdbcStateInteractor.assertCountableJdbcInteractor(1, 0, 0, 0); } @AfterEach - void afterEach() throws SQLException { - if (conn != null) { - conn.close(); + void afterEach() throws Exception { + if (jdbcStateInteractor != null) { + jdbcStateInteractor.close(); } } @@ -195,7 +195,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp } private Optional getValueFromDatabase(StateType stateType) throws Exception { - var jdbcInteractor = new JdbcStateInteractor(conn); + var jdbcInteractor = new JdbcStateInteractor(dataSource); return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType)); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java index 261578b32b..27fce55118 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java @@ -17,10 +17,10 @@ package org.apache.flink.autoscaler.jdbc.testutils.databases; -import java.sql.Connection; +import javax.sql.DataSource; /** Database testing. */ public interface DatabaseTest { - Connection getConnection() throws Exception; + DataSource getDataSource() throws Exception; } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java index 42566d9685..d02fee57e5 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java @@ -17,12 +17,13 @@ package org.apache.flink.autoscaler.jdbc.testutils.databases.derby; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; -import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.List; @@ -34,8 +35,11 @@ public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, Afte List.of("t_flink_autoscaler_state_store", "t_flink_autoscaler_event_handler"); private static final String JDBC_URL = "jdbc:derby:memory:test"; - public Connection getConnection() throws Exception { - return DriverManager.getConnection(JDBC_URL); + public HikariDataSource getDataSource() { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(JDBC_URL); + config.setValidationTimeout(1000); + return new HikariDataSource(config); } @Override @@ -76,7 +80,8 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { var jobKeyReasonCreateTimeIndex = "CREATE INDEX job_key_reason_create_time_idx ON t_flink_autoscaler_event_handler (job_key, reason, create_time)"; - try (var conn = getConnection(); + try (var dataSource = getDataSource(); + var conn = dataSource.getConnection(); var st = conn.createStatement()) { st.execute(stateStoreDDL); st.execute(createStateStoreIndex); @@ -88,7 +93,8 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception { @Override public void afterAll(ExtensionContext extensionContext) throws Exception { - try (var conn = getConnection(); + try (var dataSource = getDataSource(); + var conn = dataSource.getConnection(); var st = conn.createStatement()) { for (var tableName : TABLES) { st.executeUpdate(String.format("DROP TABLE %s", tableName)); @@ -103,7 +109,8 @@ public void afterAll(ExtensionContext extensionContext) throws Exception { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { // Clean up all data - try (var conn = getConnection(); + try (var dataSource = getDataSource(); + var conn = dataSource.getConnection(); var st = conn.createStatement()) { for (var tableName : TABLES) { st.executeUpdate(String.format("DELETE from %s", tableName)); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java index e0ac3f76f8..feebb91a35 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; -import java.sql.Connection; +import javax.sql.DataSource; /** Derby database for testing. */ public interface DerbyTestBase extends DatabaseTest { @@ -29,7 +29,7 @@ public interface DerbyTestBase extends DatabaseTest { @RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension(); @Override - default Connection getConnection() throws Exception { - return DERBY_EXTENSION.getConnection(); + default DataSource getDataSource() throws Exception { + return DERBY_EXTENSION.getDataSource(); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java index f2b1858cef..f76e92f1dd 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java @@ -21,14 +21,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; -import java.sql.Connection; +import javax.sql.DataSource; /** MySQL 5.6.x database for testing. */ public interface MySQL56TestBase extends DatabaseTest { @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.6.51"); - default Connection getConnection() throws Exception { - return MYSQL_EXTENSION.getConnection(); + default DataSource getDataSource() throws Exception { + return MYSQL_EXTENSION.getDataSource(); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java index 0b8a69685b..b1cc46dade 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java @@ -21,14 +21,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; -import java.sql.Connection; +import javax.sql.DataSource; /** MySQL 5.7.x database for testing. */ public interface MySQL57TestBase extends DatabaseTest { @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.7.41"); - default Connection getConnection() throws Exception { - return MYSQL_EXTENSION.getConnection(); + default DataSource getDataSource() throws Exception { + return MYSQL_EXTENSION.getDataSource(); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java index daf7788576..d1330116c9 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java @@ -21,14 +21,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; -import java.sql.Connection; +import javax.sql.DataSource; /** MySQL 8.x database for testing. */ public interface MySQL8TestBase extends DatabaseTest { @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("8.0.32"); - default Connection getConnection() throws Exception { - return MYSQL_EXTENSION.getConnection(); + default DataSource getDataSource() throws Exception { + return MYSQL_EXTENSION.getDataSource(); } } diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java index f0873a6f0b..2dfb1acb06 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java @@ -17,14 +17,14 @@ package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.containers.MySQLContainer; -import java.sql.Connection; -import java.sql.DriverManager; import java.util.List; /** The extension of MySQL. */ @@ -50,9 +50,13 @@ public MySQLExtension(String mysqlVersion) { .withEnv("MYSQL_ROOT_HOST", "%"); } - public Connection getConnection() throws Exception { - return DriverManager.getConnection( - container.getJdbcUrl(), container.getUsername(), container.getPassword()); + public HikariDataSource getDataSource() throws Exception { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(container.getJdbcUrl()); + config.setUsername(container.getUsername()); + config.setPassword(container.getPassword()); + config.setValidationTimeout(1000); + return new HikariDataSource(config); } @Override @@ -67,7 +71,8 @@ public void afterAll(ExtensionContext extensionContext) { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - try (var conn = getConnection(); + try (var dataSource = getDataSource(); + var conn = dataSource.getConnection(); var st = conn.createStatement()) { for (var tableName : TABLES) { st.executeUpdate(String.format("DELETE from %s", tableName)); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java index ba4fd2f249..f085d4ebf0 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java @@ -17,14 +17,14 @@ package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.containers.PostgreSQLContainer; -import java.sql.Connection; -import java.sql.DriverManager; import java.util.List; /** The extension of PostgreSQL. */ @@ -49,9 +49,13 @@ public PostgreSQLExtension(String postgresqlVersion) { .withEnv("POSTGRES_MAX_CONNECTIONS", "10"); } - public Connection getConnection() throws Exception { - return DriverManager.getConnection( - container.getJdbcUrl(), container.getUsername(), container.getPassword()); + public HikariDataSource getDataSource() throws Exception { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(container.getJdbcUrl()); + config.setUsername(container.getUsername()); + config.setPassword(container.getPassword()); + config.setValidationTimeout(1000); + return new HikariDataSource(config); } @Override @@ -66,7 +70,8 @@ public void afterAll(ExtensionContext extensionContext) { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { - try (var conn = getConnection(); + try (var dataSource = getDataSource(); + var conn = getDataSource().getConnection(); var st = conn.createStatement()) { for (var tableName : TABLES) { st.executeUpdate(String.format("DELETE from %s", tableName)); diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java index 1e27a6e129..4970cb957e 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java @@ -21,14 +21,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; -import java.sql.Connection; +import javax.sql.DataSource; /** PostgreSQL database for testing. */ public interface PostgreSQLTestBase extends DatabaseTest { @RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new PostgreSQLExtension("15.1"); - default Connection getConnection() throws Exception { - return POSTGRE_SQL_EXTENSION.getConnection(); + default DataSource getDataSource() throws Exception { + return POSTGRE_SQL_EXTENSION.getDataSource(); } } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java index 2f58ca6e37..f6eacb6283 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java @@ -73,8 +73,8 @@ AutoScalerEventHandler create(Configuration conf) throws Exception private static > AutoScalerEventHandler createJdbcEventHandler(Configuration conf) throws Exception { - var conn = HikariJDBCUtil.getConnection(conf); + var dataSource = HikariJDBCUtil.getDataSource(conf); return new JdbcAutoScalerEventHandler<>( - new JdbcEventInteractor(conn), conf.get(JDBC_EVENT_HANDLER_TTL)); + new JdbcEventInteractor(dataSource), conf.get(JDBC_EVENT_HANDLER_TTL)); } } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java index 0528362784..a21cea9801 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactory.java @@ -74,7 +74,8 @@ AutoScalerStateStore create(Configuration conf) throws Exception { private static > AutoScalerStateStore createJdbcStateStore(Configuration conf) throws Exception { - var conn = HikariJDBCUtil.getConnection(conf); - return new JdbcAutoScalerStateStore<>(new JdbcStateStore(new JdbcStateInteractor(conn))); + var dataSource = HikariJDBCUtil.getDataSource(conf); + return new JdbcAutoScalerStateStore<>( + new JdbcStateStore(new JdbcStateInteractor(dataSource))); } } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java index a87de96412..4a83743869 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java @@ -33,7 +33,6 @@ import javax.annotation.Nonnull; -import java.io.Closeable; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -56,7 +55,7 @@ /** The executor of the standalone autoscaler. */ public class StandaloneAutoscalerExecutor> - implements Closeable { + implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(StandaloneAutoscalerExecutor.class); @@ -113,7 +112,7 @@ public void start() { } @Override - public void close() { + public void close() throws Exception { scheduledExecutorService.shutdownNow(); scalingThreadPool.shutdownNow(); eventHandler.close(); diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java index 65fb964299..141b12bbe6 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/utils/HikariJDBCUtil.java @@ -23,7 +23,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; -import java.sql.Connection; import java.sql.SQLException; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.JDBC_PASSWORD_ENV_VARIABLE; @@ -39,7 +38,7 @@ public class HikariJDBCUtil { "%s is required when jdbc state store or jdbc event handler is used.", JDBC_URL.key()); - public static Connection getConnection(Configuration conf) throws SQLException { + public static HikariDataSource getDataSource(Configuration conf) throws SQLException { final var jdbcUrl = conf.get(JDBC_URL); checkArgument(!StringUtils.isNullOrWhitespaceOnly(jdbcUrl), JDBC_URL_REQUIRED_HINT); var user = conf.get(JDBC_USERNAME); @@ -48,6 +47,6 @@ public static Connection getConnection(Configuration conf) throws SQLException { hikariConfig.setJdbcUrl(jdbcUrl); hikariConfig.setUsername(user); hikariConfig.setPassword(password); - return new HikariDataSource(hikariConfig).getConnection(); + return new HikariDataSource(hikariConfig); } } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java index 2ab21aa11a..f8d0e33fcc 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactoryTest.java @@ -68,14 +68,16 @@ void testCreateJdbcEventHandler() throws Exception { final var conf = new Configuration(); conf.set(EVENT_HANDLER_TYPE, JDBC); conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl)); - HikariJDBCUtil.getConnection(conf).close(); + HikariJDBCUtil.getDataSource(conf).close(); var eventHandler = AutoscalerEventHandlerFactory.create(conf); assertThat(eventHandler).isInstanceOf(JdbcAutoScalerEventHandler.class); + conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl)); try { - conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl)); - HikariJDBCUtil.getConnection(conf).close(); + var datasource = HikariJDBCUtil.getDataSource(conf); + datasource.getConnection().close(); + datasource.close(); } catch (RuntimeException ignored) { // database shutdown ignored exception } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java index 7ef3c4ff1a..eae17f6f4d 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/AutoscalerStateStoreFactoryTest.java @@ -67,14 +67,16 @@ void testCreateJdbcStateStore() throws Exception { final var conf = new Configuration(); conf.set(STATE_STORE_TYPE, JDBC); conf.set(JDBC_URL, String.format("%s;create=true", jdbcUrl)); - HikariJDBCUtil.getConnection(conf).close(); + HikariJDBCUtil.getDataSource(conf).close(); var stateStore = AutoscalerStateStoreFactory.create(conf); assertThat(stateStore).isInstanceOf(JdbcAutoScalerStateStore.class); + conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl)); try { - conf.set(JDBC_URL, String.format("%s;shutdown=true", jdbcUrl)); - HikariJDBCUtil.getConnection(conf).close(); + var datasource = HikariJDBCUtil.getDataSource(conf); + datasource.getConnection().close(); + datasource.close(); } catch (RuntimeException ignored) { // database shutdown ignored exception } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java index b17d4ba220..082f84bfdb 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java @@ -110,7 +110,7 @@ protected void scalingSingleJob(JobAutoScalerContext jobContext) { } @Test - void testFetchException() { + void testFetchException() throws Exception { var eventCollector = new TestingEventCollector>(); try (var autoscalerExecutor = new StandaloneAutoscalerExecutor<>( @@ -137,7 +137,7 @@ public void cleanup(JobAutoScalerContext context) { } @Test - void testScalingParallelism() { + void testScalingParallelism() throws Exception { var parallelism = 10; var jobList = new ArrayList>(); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index 0fcf50b7a6..badc4694f6 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -27,7 +27,6 @@ import javax.annotation.Nullable; -import java.io.Closeable; import java.time.Duration; import java.util.Map; import java.util.stream.Collectors; @@ -44,7 +43,7 @@ */ @Experimental public interface AutoScalerEventHandler> - extends Closeable { + extends AutoCloseable { String SCALING_SUMMARY_ENTRY = "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}"; String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:"; @@ -101,9 +100,6 @@ default void handleScalingEvent( interval); } - /** Close the related resource. */ - default void close() {} - static String scalingReport(Map scalingSummaries, String message) { StringBuilder sb = new StringBuilder(message); scalingSummaries.forEach( diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java index 2724034653..05fc48d185 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java @@ -49,4 +49,7 @@ public void handleEvent( messageKey, interval); } + + @Override + public void close() throws Exception {} } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java index 64e4dd5bc2..08d3051177 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -39,7 +39,8 @@ * @param Instance of JobAutoScalerContext. */ @Experimental -public interface AutoScalerStateStore> { +public interface AutoScalerStateStore> + extends AutoCloseable { void storeScalingHistory( Context jobContext, Map> scalingHistory) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java index c660f73b64..e3cdd610a5 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java @@ -191,4 +191,7 @@ public boolean hasDataFor(Context jobContext) { delayedScaleDownStore) .anyMatch(m -> m.containsKey(k)); } + + @Override + public void close() throws Exception {} } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java index 7151c9c717..865dc69b99 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java @@ -77,6 +77,9 @@ private String generateEventKey(Context context, Type type, String reason, Strin return context.getJobID() + type.name() + reason + message; } + @Override + public void close() throws Exception {} + /** The collected event. */ public static class Event> { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java index 1a836a52c6..55bd3ea59f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java @@ -103,4 +103,7 @@ public boolean test(Map stringStringMap) { labels); } } + + @Override + public void close() throws Exception {} } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java index da445973d1..ed6549ad0a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java @@ -403,4 +403,7 @@ private static YAMLFactory yamlFactory() { loaderOptions.setCodePointLimit(20 * 1024 * 1024); return YAMLFactory.builder().loaderOptions(loaderOptions).build(); } + + @Override + public void close() throws Exception {} } From de09534a17c257281408cfc8b6db663e07210650 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Sun, 26 Jan 2025 12:02:33 +0800 Subject: [PATCH 2/2] Use the dataSource instead of `getDataSource()` to avoid connection leak --- .../jdbc/testutils/databases/postgres/PostgreSQLExtension.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java index f085d4ebf0..a54657fc58 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java @@ -71,7 +71,7 @@ public void afterAll(ExtensionContext extensionContext) { @Override public void afterEach(ExtensionContext extensionContext) throws Exception { try (var dataSource = getDataSource(); - var conn = getDataSource().getConnection(); + var conn = dataSource.getConnection(); var st = conn.createStatement()) { for (var tableName : TABLES) { st.executeUpdate(String.format("DELETE from %s", tableName));