Skip to content

Commit 3fcb5f7

Browse files
committed
[FLINK-36696] Switch sql connection usages to datasource
1 parent 5d301d1 commit 3fcb5f7

File tree

34 files changed

+275
-196
lines changed

34 files changed

+275
-196
lines changed

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcAutoScalerEventHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,12 @@ public void handleScalingEvent(
135135
}
136136

137137
@Override
138-
public void close() {
138+
public void close() throws Exception {
139139
if (Objects.nonNull(scheduledEventHandlerCleaner)
140140
&& !scheduledEventHandlerCleaner.isShutdown()) {
141141
scheduledEventHandlerCleaner.shutdownNow();
142142
}
143+
jdbcEventInteractor.close();
143144
}
144145

145146
@VisibleForTesting

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
import javax.annotation.Nonnull;
2525
import javax.annotation.Nullable;
26+
import javax.sql.DataSource;
2627

27-
import java.sql.Connection;
2828
import java.sql.ResultSet;
2929
import java.sql.SQLException;
3030
import java.sql.Timestamp;
@@ -37,13 +37,13 @@
3737
import static org.apache.flink.util.Preconditions.checkState;
3838

3939
/** Responsible for interacting with the database. */
40-
public class JdbcEventInteractor {
40+
public class JdbcEventInteractor implements AutoCloseable {
4141

42-
private final Connection conn;
42+
private final DataSource dataSource;
4343
private Clock clock = Clock.systemDefaultZone();
4444

45-
public JdbcEventInteractor(Connection conn) {
46-
this.conn = conn;
45+
public JdbcEventInteractor(DataSource dataSource) {
46+
this.dataSource = dataSource;
4747
}
4848

4949
public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
@@ -52,7 +52,8 @@ public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason,
5252
"select * from t_flink_autoscaler_event_handler "
5353
+ "where job_key = ? and reason = ? and event_key = ? ";
5454

55-
try (var pstmt = conn.prepareStatement(query)) {
55+
try (var conn = dataSource.getConnection();
56+
var pstmt = conn.prepareStatement(query)) {
5657
pstmt.setString(1, jobKey);
5758
pstmt.setString(2, reason);
5859
pstmt.setString(3, eventKey);
@@ -99,7 +100,8 @@ public void createEvent(
99100
+ " values (?, ?, ?, ?, ?, ?, ?, ?)";
100101

101102
var createTime = Timestamp.from(clock.instant());
102-
try (var pstmt = conn.prepareStatement(query)) {
103+
try (var conn = dataSource.getConnection();
104+
var pstmt = conn.prepareStatement(query)) {
103105
pstmt.setTimestamp(1, createTime);
104106
pstmt.setTimestamp(2, createTime);
105107
pstmt.setString(3, jobKey);
@@ -117,7 +119,8 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio
117119
"UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
118120

119121
var updateTime = Timestamp.from(clock.instant());
120-
try (var pstmt = conn.prepareStatement(query)) {
122+
try (var conn = dataSource.getConnection();
123+
var pstmt = conn.prepareStatement(query)) {
121124
pstmt.setTimestamp(1, updateTime);
122125
pstmt.setString(2, message);
123126
pstmt.setInt(3, eventCount);
@@ -136,7 +139,8 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
136139
"select * from t_flink_autoscaler_event_handler "
137140
+ "where job_key = ? and reason = ? ";
138141

139-
try (var pstmt = conn.prepareStatement(query)) {
142+
try (var conn = dataSource.getConnection();
143+
var pstmt = conn.prepareStatement(query)) {
140144
pstmt.setString(1, jobKey);
141145
pstmt.setString(2, reason);
142146

@@ -160,7 +164,8 @@ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
160164
"SELECT id from t_flink_autoscaler_event_handler "
161165
+ " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
162166
+ " and create_time < ?";
163-
try (var pstmt = conn.prepareStatement(sql)) {
167+
try (var conn = dataSource.getConnection();
168+
var pstmt = conn.prepareStatement(sql)) {
164169
pstmt.setObject(1, timestamp);
165170
ResultSet resultSet = pstmt.executeQuery();
166171
return resultSet.next() ? resultSet.getLong(1) : null;
@@ -171,11 +176,19 @@ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp time
171176
throws Exception {
172177
var query =
173178
"delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
174-
try (var pstmt = conn.prepareStatement(query)) {
179+
try (var conn = dataSource.getConnection();
180+
var pstmt = conn.prepareStatement(query)) {
175181
pstmt.setObject(1, startId);
176182
pstmt.setObject(2, endId);
177183
pstmt.setObject(3, timestamp);
178184
return pstmt.executeUpdate();
179185
}
180186
}
187+
188+
@Override
189+
public void close() throws Exception {
190+
if (dataSource instanceof AutoCloseable) {
191+
((AutoCloseable) dataSource).close();
192+
}
193+
}
181194
}

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,4 +337,9 @@ private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleD
337337
throws JacksonException {
338338
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
339339
}
340+
341+
@Override
342+
public void close() throws Exception {
343+
jdbcStateStore.close();
344+
}
340345
}

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23-
import java.sql.Connection;
23+
import javax.sql.DataSource;
24+
2425
import java.sql.Timestamp;
2526
import java.time.Instant;
2627
import java.util.Collections;
@@ -31,21 +32,22 @@
3132
import static org.apache.flink.util.Preconditions.checkState;
3233

3334
/** Responsible for interacting with the database. */
34-
public class JdbcStateInteractor {
35+
public class JdbcStateInteractor implements AutoCloseable {
3536

3637
private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);
3738

38-
private final Connection conn;
39+
private final DataSource dataSource;
3940

40-
public JdbcStateInteractor(Connection conn) {
41-
this.conn = conn;
41+
public JdbcStateInteractor(DataSource dataSource) {
42+
this.dataSource = dataSource;
4243
}
4344

4445
public Map<StateType, String> queryData(String jobKey) throws Exception {
4546
var query =
4647
"select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
4748
var data = new HashMap<StateType, String>();
48-
try (var pstmt = conn.prepareStatement(query)) {
49+
try (var conn = dataSource.getConnection();
50+
var pstmt = conn.prepareStatement(query)) {
4951
pstmt.setString(1, jobKey);
5052
var rs = pstmt.executeQuery();
5153
while (rs.next()) {
@@ -63,7 +65,8 @@ public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws
6365
String.format(
6466
"DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
6567
String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
66-
try (var pstmt = conn.prepareStatement(query)) {
68+
try (var conn = dataSource.getConnection();
69+
var pstmt = conn.prepareStatement(query)) {
6770
pstmt.setString(1, jobKey);
6871
int i = 2;
6972
for (var stateType : deletedStateTypes) {
@@ -80,7 +83,8 @@ public void createData(
8083
var query =
8184
"INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
8285
var updateTime = Timestamp.from(Instant.now());
83-
try (var pstmt = conn.prepareStatement(query)) {
86+
try (var conn = dataSource.getConnection();
87+
var pstmt = conn.prepareStatement(query)) {
8488
for (var stateType : createdStateTypes) {
8589
pstmt.setTimestamp(1, updateTime);
8690
pstmt.setString(2, jobKey);
@@ -106,7 +110,8 @@ public void updateData(
106110
"UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";
107111

108112
var updateTime = Timestamp.from(Instant.now());
109-
try (var pstmt = conn.prepareStatement(query)) {
113+
try (var conn = dataSource.getConnection();
114+
var pstmt = conn.prepareStatement(query)) {
110115
for (var stateType : updatedStateTypes) {
111116
pstmt.setTimestamp(1, updateTime);
112117

@@ -123,4 +128,11 @@ public void updateData(
123128
pstmt.executeBatch();
124129
}
125130
}
131+
132+
@Override
133+
public void close() throws Exception {
134+
if (dataSource instanceof AutoCloseable) {
135+
((AutoCloseable) dataSource).close();
136+
}
137+
}
126138
}

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525

2626
/** The jdbc state store. */
27-
public class JdbcStateStore {
27+
public class JdbcStateStore implements AutoCloseable {
2828

2929
private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class);
3030

@@ -89,4 +89,9 @@ private JobStateView getJobStateView(String jobKey) {
8989
private JobStateView createJobStateView(String jobKey) throws Exception {
9090
return new JobStateView(jdbcStateInteractor, jobKey);
9191
}
92+
93+
@Override
94+
public void close() throws Exception {
95+
jdbcStateInteractor.close();
96+
}
9297
}

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.jupiter.params.provider.MethodSource;
4343

4444
import javax.annotation.Nonnull;
45+
import javax.sql.DataSource;
4546

4647
import java.sql.Connection;
4748
import java.sql.PreparedStatement;
@@ -73,20 +74,22 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest
7374
generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
7475
private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());
7576

77+
private DataSource dataSource;
7678
private CountableJdbcEventInteractor jdbcEventInteractor;
7779
private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
7880
private JobAutoScalerContext<JobID> ctx;
7981

8082
@BeforeEach
8183
void beforeEach() throws Exception {
82-
jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
84+
dataSource = getDataSource();
85+
jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource);
8386
jdbcEventInteractor.setClock(defaultClock);
8487
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
8588
ctx = createDefaultJobAutoScalerContext();
8689
}
8790

8891
@AfterEach
89-
void tearDown() {
92+
void tearDown() throws Exception {
9093
eventHandler.close();
9194
}
9295

@@ -459,7 +462,8 @@ void testDeleteCounterWhenIdNotConsecutive() throws Exception {
459462
.max(Comparable::compareTo)
460463
.orElseThrow();
461464

462-
try (Connection connection = getConnection();
465+
var dataSource = getDataSource();
466+
try (Connection connection = dataSource.getConnection();
463467
PreparedStatement ps =
464468
connection.prepareStatement(
465469
"update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
@@ -505,6 +509,7 @@ void testCleanExpiredEvents(
505509
Duration eventHandlerTtl,
506510
int unexpiredRecordsNum)
507511
throws Exception {
512+
508513
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);
509514

510515
// Init the expired records.
@@ -525,7 +530,7 @@ void testCleanExpiredEvents(
525530

526531
eventHandler.cleanExpiredEvents();
527532

528-
try (Connection connection = getConnection();
533+
try (Connection connection = dataSource.getConnection();
529534
PreparedStatement ps =
530535
connection.prepareStatement(
531536
"select count(1) from t_flink_autoscaler_event_handler");
@@ -542,7 +547,7 @@ private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
542547
if (minId == null) {
543548
return;
544549
}
545-
try (Connection connection = getConnection();
550+
try (Connection connection = dataSource.getConnection();
546551
PreparedStatement ps =
547552
connection.prepareStatement(
548553
"delete from t_flink_autoscaler_event_handler where id = ?")) {

0 commit comments

Comments
 (0)