Commit d5d027e

[FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch sql connection usages to datasource (#929)
* [FLINK-36696] [flink-autoscaler-plugin-jdbc] Switch sql connection usages to datasource

* Use the dataSource instead of `getDataSource()` to avoid connection leak

---------

Co-authored-by: Rui Fan <[email protected]>
1 parent 4c2c90c commit d5d027e
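
In short, every interactor now holds a javax.sql.DataSource instead of a single long-lived java.sql.Connection, and each database operation borrows a connection in a try-with-resources block. A minimal before/after sketch of the pattern applied throughout this commit (query binding elided):

    // Before: one shared connection for the lifetime of the interactor.
    try (var pstmt = conn.prepareStatement(query)) {
        // bind parameters and execute
    }

    // After: borrow a connection per operation; both resources are released when the block exits.
    try (var conn = dataSource.getConnection();
            var pstmt = conn.prepareStatement(query)) {
        // bind parameters and execute
    }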

File tree

36 files changed: +190 -117 lines

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java

Lines changed: 2 additions & 1 deletion
@@ -135,11 +135,12 @@ public void handleScalingEvent(
     }
 
     @Override
-    public void close() {
+    public void close() throws Exception {
         if (Objects.nonNull(scheduledEventHandlerCleaner)
                 && !scheduledEventHandlerCleaner.isShutdown()) {
             scheduledEventHandlerCleaner.shutdownNow();
         }
+        jdbcEventInteractor.close();
     }
 
     @VisibleForTesting

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java

Lines changed: 24 additions & 11 deletions
@@ -23,8 +23,8 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.sql.DataSource;
 
-import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
@@ -37,13 +37,13 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Responsible for interacting with the database. */
-public class JdbcEventInteractor {
+public class JdbcEventInteractor implements AutoCloseable {
 
-    private final Connection conn;
+    private final DataSource dataSource;
     private Clock clock = Clock.systemDefaultZone();
 
-    public JdbcEventInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcEventInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
     }
 
     public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
@@ -52,7 +52,8 @@ public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason,
                 "select * from t_flink_autoscaler_event_handler "
                         + "where job_key = ? and reason = ? and event_key = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             pstmt.setString(2, reason);
             pstmt.setString(3, eventKey);
@@ -99,7 +100,8 @@ public void createEvent(
                         + " values (?, ?, ?, ?, ?, ?, ?, ?)";
 
         var createTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setTimestamp(1, createTime);
             pstmt.setTimestamp(2, createTime);
             pstmt.setString(3, jobKey);
@@ -117,7 +119,8 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio
                 "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
 
         var updateTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setTimestamp(1, updateTime);
             pstmt.setString(2, message);
             pstmt.setInt(3, eventCount);
@@ -136,7 +139,8 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
                 "select * from t_flink_autoscaler_event_handler "
                         + "where job_key = ? and reason = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             pstmt.setString(2, reason);
 
@@ -160,7 +164,8 @@ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
                 "SELECT id from t_flink_autoscaler_event_handler "
                         + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
                         + " and create_time < ?";
-        try (var pstmt = conn.prepareStatement(sql)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(sql)) {
             pstmt.setObject(1, timestamp);
             ResultSet resultSet = pstmt.executeQuery();
             return resultSet.next() ? resultSet.getLong(1) : null;
@@ -171,11 +176,19 @@ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp time
             throws Exception {
         var query =
                 "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setObject(1, startId);
             pstmt.setObject(2, endId);
             pstmt.setObject(3, timestamp);
             return pstmt.executeUpdate();
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
+        }
+    }
 }
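
The close() added here only has an effect when the injected DataSource is itself closable. A minimal wiring sketch, assuming a HikariCP pool is used as the DataSource (the pool choice, URL, and credentials are assumptions for illustration, not part of this commit):

    import org.apache.flink.autoscaler.jdbc.event.JdbcEventInteractor;

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    class EventInteractorWiringSketch {

        static JdbcEventInteractor create() {
            // Hypothetical pool settings; URL and credentials are placeholders.
            HikariConfig config = new HikariConfig();
            config.setJdbcUrl("jdbc:mysql://localhost:3306/flink_autoscaler");
            config.setUsername("flink");
            config.setPassword("secret");
            config.setMaximumPoolSize(2);

            // HikariDataSource implements Closeable, so JdbcEventInteractor.close()
            // shuts the pool down via its (dataSource instanceof AutoCloseable) branch,
            // while each query borrows and returns a pooled connection.
            return new JdbcEventInteractor(new HikariDataSource(config));
        }
    }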

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 5 additions & 0 deletions
@@ -333,4 +333,9 @@ private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleD
             throws JacksonException {
         return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
     }
+
+    @Override
+    public void close() throws Exception {
+        jdbcStateStore.close();
+    }
 }

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java

Lines changed: 21 additions & 9 deletions
@@ -20,7 +20,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.Collections;
@@ -31,21 +32,22 @@
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Responsible for interacting with the database. */
-public class JdbcStateInteractor {
+public class JdbcStateInteractor implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);
 
-    private final Connection conn;
+    private final DataSource dataSource;
 
-    public JdbcStateInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcStateInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
     }
 
     public Map<StateType, String> queryData(String jobKey) throws Exception {
         var query =
                 "select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
         var data = new HashMap<StateType, String>();
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             var rs = pstmt.executeQuery();
             while (rs.next()) {
@@ -63,7 +65,8 @@ public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws
                 String.format(
                         "DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
                         String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             pstmt.setString(1, jobKey);
             int i = 2;
             for (var stateType : deletedStateTypes) {
@@ -80,7 +83,8 @@ public void createData(
         var query =
                 "INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
         var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             for (var stateType : createdStateTypes) {
                 pstmt.setTimestamp(1, updateTime);
                 pstmt.setString(2, jobKey);
@@ -106,7 +110,8 @@ public void updateData(
                 "UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";
 
         var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
             for (var stateType : updatedStateTypes) {
                 pstmt.setTimestamp(1, updateTime);
 
@@ -123,4 +128,11 @@ public void updateData(
             pstmt.executeBatch();
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
+        }
+    }
 }

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java

Lines changed: 6 additions & 1 deletion
@@ -24,7 +24,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 /** The jdbc state store. */
-public class JdbcStateStore {
+public class JdbcStateStore implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class);
 
@@ -89,4 +89,9 @@ private JobStateView getJobStateView(String jobKey) {
     private JobStateView createJobStateView(String jobKey) throws Exception {
         return new JobStateView(jdbcStateInteractor, jobKey);
     }
+
+    @Override
+    public void close() throws Exception {
+        jdbcStateInteractor.close();
+    }
 }
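
With this change the state-store side forms a single close() chain: JdbcAutoScalerStateStore.close() delegates to JdbcStateStore.close(), which delegates to JdbcStateInteractor.close(), which closes the DataSource if it is closable. A small usage sketch of that lifecycle (the DataSource argument is assumed to come from whatever pool or driver the caller configures):

    import org.apache.flink.autoscaler.jdbc.state.JdbcStateInteractor;
    import org.apache.flink.autoscaler.jdbc.state.JdbcStateStore;

    import javax.sql.DataSource;

    class StateStoreLifecycleSketch {

        static void run(DataSource dataSource) throws Exception {
            // JdbcStateStore now implements AutoCloseable, so try-with-resources
            // releases the underlying DataSource (when closable) through the chain above.
            try (var stateStore = new JdbcStateStore(new JdbcStateInteractor(dataSource))) {
                // read and write autoscaler state through the store here
            }
        }
    }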

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java

Lines changed: 9 additions & 5 deletions
@@ -42,6 +42,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nonnull;
+import javax.sql.DataSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -73,20 +74,22 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest
             generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
     private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
 
+    private DataSource dataSource;
     private CountableJdbcEventInteractor jdbcEventInteractor;
     private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
     private JobAutoScalerContext<JobID> ctx;
 
     @BeforeEach
     void beforeEach() throws Exception {
-        jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
+        dataSource = getDataSource();
+        jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource);
         jdbcEventInteractor.setClock(defaultClock);
         eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
         ctx = createDefaultJobAutoScalerContext();
     }
 
     @AfterEach
-    void tearDown() {
+    void tearDown() throws Exception {
         eventHandler.close();
     }
 
@@ -459,7 +462,7 @@ void testDeleteCounterWhenIdNotConsecutive() throws Exception {
                         .max(Comparable::compareTo)
                         .orElseThrow();
 
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
@@ -505,6 +508,7 @@ void testCleanExpiredEvents(
             Duration eventHandlerTtl,
             int unexpiredRecordsNum)
             throws Exception {
+
         eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
 
         // Init the expired records.
@@ -525,7 +529,7 @@ void testCleanExpiredEvents(
 
         eventHandler.cleanExpiredEvents();
 
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "select count(1) from t_flink_autoscaler_event_handler");
@@ -542,7 +546,7 @@ private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
         if (minId == null) {
            return;
        }
-        try (Connection connection = getConnection();
+        try (Connection connection = dataSource.getConnection();
                 PreparedStatement ps =
                         connection.prepareStatement(
                                 "delete from t_flink_autoscaler_event_handler where id = ?")) {

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java

Lines changed: 1 addition & 2 deletions
@@ -46,8 +46,7 @@ void testAllOperations() throws Exception {
 
         // The datetime precision is seconds in MySQL by default.
         var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
-        try (var conn = getConnection()) {
-            var jdbcEventInteractor = new JdbcEventInteractor(conn);
+        try (var jdbcEventInteractor = new JdbcEventInteractor(getDataSource())) {
            jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));

            jdbcEventInteractor.createEvent(

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java

Lines changed: 4 additions & 3 deletions
@@ -19,7 +19,8 @@
 
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.Timestamp;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,8 +35,8 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor {
     private final AtomicLong updateCounter;
     private final AtomicLong deleteExpiredCounter;
 
-    public CountableJdbcEventInteractor(Connection conn) {
-        super(conn);
+    public CountableJdbcEventInteractor(DataSource dataSource) {
+        super(dataSource);
         queryCounter = new AtomicLong();
         createCounter = new AtomicLong();
         updateCounter = new AtomicLong();

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java

Lines changed: 1 addition & 2 deletions
@@ -42,8 +42,7 @@ void testAllOperations() throws Exception {
         var value1 = "value1";
         var value2 = "value2";
         var value3 = "value3";
-        try (var conn = getConnection()) {
-            var jdbcStateInteractor = new JdbcStateInteractor(conn);
+        try (var jdbcStateInteractor = new JdbcStateInteractor(getDataSource())) {
            assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();

            // Test for creating data.

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java

Lines changed: 11 additions & 10 deletions
@@ -28,7 +28,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -45,21 +46,21 @@
 abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {
 
     private static final String DEFAULT_JOB_KEY = "jobKey";
-    private Connection conn;
+    private DataSource dataSource;
     private CountableJdbcStateInteractor jdbcStateInteractor;
     private JdbcStateStore jdbcStateStore;
 
     @BeforeEach
     void beforeEach() throws Exception {
-        this.conn = getConnection();
-        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.dataSource = getDataSource();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
         this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
     }
 
     @AfterEach
-    void afterEach() throws SQLException {
-        if (conn != null) {
-            conn.close();
+    void afterEach() throws Exception {
+        if (dataSource instanceof AutoCloseable) {
+            ((AutoCloseable) dataSource).close();
         }
     }
 
@@ -178,7 +179,7 @@ void testErrorHandlingDuringFlush() throws Exception {
         assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
 
         // Modify the database directly.
-        var tmpJdbcInteractor = new JdbcStateInteractor(conn);
+        var tmpJdbcInteractor = new JdbcStateInteractor(dataSource);
         tmpJdbcInteractor.createData(
                 DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
         assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
@@ -205,7 +206,7 @@ void testErrorHandlingDuringQuery() throws Exception {
         final var expectedException = new RuntimeException("Database isn't stable.");
 
         var exceptionableJdbcStateInteractor =
-                new CountableJdbcStateInteractor(conn) {
+                new CountableJdbcStateInteractor(dataSource) {
                     private final AtomicBoolean isFirst = new AtomicBoolean(true);
 
                     @Override
@@ -282,7 +283,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp
 
     private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
             throws Exception {
-        var jdbcInteractor = new JdbcStateInteractor(conn);
+        var jdbcInteractor = new JdbcStateInteractor(dataSource);
         return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
     }
 }
