Commit 3dbc348

[FLINK-36696] Switch sql connection usages to datasource
1 parent 5d301d1 commit 3dbc348

22 files changed (+191 / -182 lines)
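
Every database operation in this commit now borrows a short-lived connection from a javax.sql.DataSource inside try-with-resources, instead of reusing one long-lived java.sql.Connection held by the interactor. A minimal sketch of the pattern applied throughout the diffs below, using a hypothetical DataSourceBackedDao class (the table and column names mirror the real queries; everything else is illustrative):

import javax.sql.DataSource;

import java.sql.SQLException;
import java.util.Optional;

/** Sketch only: borrow a connection per operation, never cache it in a field. */
class DataSourceBackedDao {

    private final DataSource dataSource;

    DataSourceBackedDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    Optional<String> queryStateValue(String jobKey) throws SQLException {
        var query = "select state_value from t_flink_autoscaler_state_store where job_key = ?";
        // Resources close in reverse order when the block exits: statement first,
        // then the connection, which is returned to the pool (or closed outright
        // for a non-pooled DataSource).
        try (var conn = dataSource.getConnection();
                var pstmt = conn.prepareStatement(query)) {
            pstmt.setString(1, jobKey);
            try (var rs = pstmt.executeQuery()) {
                return rs.next() ? Optional.ofNullable(rs.getString(1)) : Optional.empty();
            }
        }
    }
}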

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java

Lines changed: 16 additions & 10 deletions

@@ -23,8 +23,8 @@
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.sql.DataSource;
 
-import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
@@ -39,11 +39,11 @@
 /** Responsible for interacting with the database. */
 public class JdbcEventInteractor {
 
-    private final Connection conn;
+    private final DataSource dataSource;
     private Clock clock = Clock.systemDefaultZone();
 
-    public JdbcEventInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcEventInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
     }
 
     public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
@@ -52,7 +52,8 @@ public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason,
                 "select * from t_flink_autoscaler_event_handler "
                         + "where job_key = ? and reason = ? and event_key = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setString(1, jobKey);
            pstmt.setString(2, reason);
            pstmt.setString(3, eventKey);
@@ -99,7 +100,8 @@ public void createEvent(
                        + " values (?, ?, ?, ?, ?, ?, ?, ?)";
 
        var createTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setTimestamp(1, createTime);
            pstmt.setTimestamp(2, createTime);
            pstmt.setString(3, jobKey);
@@ -117,7 +119,8 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio
                "UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";
 
        var updateTime = Timestamp.from(clock.instant());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setTimestamp(1, updateTime);
            pstmt.setString(2, message);
            pstmt.setInt(3, eventCount);
@@ -136,7 +139,8 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
                "select * from t_flink_autoscaler_event_handler "
                        + "where job_key = ? and reason = ? ";
 
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setString(1, jobKey);
            pstmt.setString(2, reason);
 
@@ -160,7 +164,8 @@ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
                "SELECT id from t_flink_autoscaler_event_handler "
                        + " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
                        + " and create_time < ?";
-        try (var pstmt = conn.prepareStatement(sql)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(sql)) {
            pstmt.setObject(1, timestamp);
            ResultSet resultSet = pstmt.executeQuery();
            return resultSet.next() ? resultSet.getLong(1) : null;
@@ -171,7 +176,8 @@ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp time
            throws Exception {
        var query =
                "delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setObject(1, startId);
            pstmt.setObject(2, endId);
            pstmt.setObject(3, timestamp);
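
With the constructor now taking a DataSource, the caller decides how connections are created and pooled. A hypothetical wiring sketch using HikariCP, one common pooled DataSource implementation (the URL, credentials and pool size are illustrative; how the autoscaler actually builds its DataSource is outside the files shown here):

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class AutoscalerJdbcWiring {
    public static void main(String[] args) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/flink_autoscaler"); // illustrative URL
        config.setUsername("flink");
        config.setPassword("secret");
        config.setMaximumPoolSize(5); // each interactor call borrows a connection only briefly

        try (HikariDataSource dataSource = new HikariDataSource(config)) {
            var eventInteractor = new JdbcEventInteractor(dataSource);
            var stateInteractor = new JdbcStateInteractor(dataSource);
            // ... hand the interactors to the event handler / state store as before
        }
    }
}

Because every statement now opens and closes its own connection, a pooling DataSource keeps the per-operation cost low; a naive DataSource would pay for a new physical connection on every query.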

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcStateInteractor.java

Lines changed: 13 additions & 8 deletions

@@ -20,7 +20,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.Collections;
@@ -35,17 +36,18 @@ public class JdbcStateInteractor {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);
 
-    private final Connection conn;
+    private final DataSource dataSource;
 
-    public JdbcStateInteractor(Connection conn) {
-        this.conn = conn;
+    public JdbcStateInteractor(DataSource dataSource) {
+        this.dataSource = dataSource;
    }
 
    public Map<StateType, String> queryData(String jobKey) throws Exception {
        var query =
                "select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
        var data = new HashMap<StateType, String>();
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setString(1, jobKey);
            var rs = pstmt.executeQuery();
            while (rs.next()) {
@@ -63,7 +65,8 @@ public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws
                String.format(
                        "DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
                        String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            pstmt.setString(1, jobKey);
            int i = 2;
            for (var stateType : deletedStateTypes) {
@@ -80,7 +83,8 @@ public void createData(
        var query =
                "INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
        var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            for (var stateType : createdStateTypes) {
                pstmt.setTimestamp(1, updateTime);
                pstmt.setString(2, jobKey);
@@ -106,7 +110,8 @@ public void updateData(
                "UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";
 
        var updateTime = Timestamp.from(Instant.now());
-        try (var pstmt = conn.prepareStatement(query)) {
+        try (var conn = dataSource.getConnection();
+                var pstmt = conn.prepareStatement(query)) {
            for (var stateType : updatedStateTypes) {
                pstmt.setTimestamp(1, updateTime);

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcEventInteractorITCase.java

Lines changed: 55 additions & 58 deletions

@@ -46,64 +46,61 @@ void testAllOperations() throws Exception {
 
        // The datetime precision is seconds in MySQL by default.
        var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
-        try (var conn = getConnection()) {
-            var jdbcEventInteractor = new JdbcEventInteractor(conn);
-            jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
-
-            jdbcEventInteractor.createEvent(
-                    jobKey, reason, AutoScalerEventHandler.Type.Normal, message, eventKey);
-            var firstEventOptional = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
-            assertThat(firstEventOptional).isPresent();
-            assertEvent(
-                    firstEventOptional.get(),
-                    createTime,
-                    createTime,
-                    jobKey,
-                    reason,
-                    message,
-                    1,
-                    eventKey);
-
-            // The create time is changed for the second event.
-            var secondCreateTime = createTime.plusSeconds(5);
-            jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, ZoneId.systemDefault()));
-            jdbcEventInteractor.createEvent(
-                    jobKey, reason, AutoScalerEventHandler.Type.Normal, message + 2, eventKey);
-            // The latest event should be the second event.
-            var secondEventOptional =
-                    jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
-            assertThat(secondEventOptional).isPresent();
-            var secondEvent = secondEventOptional.get();
-            assertEvent(
-                    secondEvent,
-                    secondCreateTime,
-                    secondCreateTime,
-                    jobKey,
-                    reason,
-                    message + 2,
-                    1,
-                    eventKey);
-
-            // Update event
-            var updateTime = secondCreateTime.plusSeconds(3);
-            jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault()));
-            jdbcEventInteractor.updateEvent(
-                    secondEvent.getId(), secondEvent.getMessage() + 3, secondEvent.getCount() + 1);
-
-            var updatedEventOptional =
-                    jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
-            assertThat(updatedEventOptional).isPresent();
-            var updatedEvent = updatedEventOptional.get();
-            assertEvent(
-                    updatedEvent,
-                    secondCreateTime,
-                    updateTime,
-                    jobKey,
-                    reason,
-                    secondEvent.getMessage() + 3,
-                    2,
-                    eventKey);
-        }
+        var dataSource = getDataSource();
+        var jdbcEventInteractor = new JdbcEventInteractor(dataSource);
+        jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));
+
+        jdbcEventInteractor.createEvent(
+                jobKey, reason, AutoScalerEventHandler.Type.Normal, message, eventKey);
+        var firstEventOptional = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
+        assertThat(firstEventOptional).isPresent();
+        assertEvent(
+                firstEventOptional.get(),
+                createTime,
+                createTime,
+                jobKey,
+                reason,
+                message,
+                1,
+                eventKey);
+
+        // The create time is changed for the second event.
+        var secondCreateTime = createTime.plusSeconds(5);
+        jdbcEventInteractor.setClock(Clock.fixed(secondCreateTime, ZoneId.systemDefault()));
+        jdbcEventInteractor.createEvent(
+                jobKey, reason, AutoScalerEventHandler.Type.Normal, message + 2, eventKey);
+        // The latest event should be the second event.
+        var secondEventOptional = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
+        assertThat(secondEventOptional).isPresent();
+        var secondEvent = secondEventOptional.get();
+        assertEvent(
+                secondEvent,
+                secondCreateTime,
+                secondCreateTime,
+                jobKey,
+                reason,
+                message + 2,
+                1,
+                eventKey);
+
+        // Update event
+        var updateTime = secondCreateTime.plusSeconds(3);
+        jdbcEventInteractor.setClock(Clock.fixed(updateTime, ZoneId.systemDefault()));
+        jdbcEventInteractor.updateEvent(
+                secondEvent.getId(), secondEvent.getMessage() + 3, secondEvent.getCount() + 1);
+
+        var updatedEventOptional = jdbcEventInteractor.queryLatestEvent(jobKey, reason, eventKey);
+        assertThat(updatedEventOptional).isPresent();
+        var updatedEvent = updatedEventOptional.get();
+        assertEvent(
+                updatedEvent,
+                secondCreateTime,
+                updateTime,
+                jobKey,
+                reason,
+                secondEvent.getMessage() + 3,
+                2,
+                eventKey);
    }
 
    private void assertEvent(

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateInteractorITCase.java

Lines changed: 23 additions & 24 deletions

@@ -42,33 +42,32 @@ void testAllOperations() throws Exception {
        var value1 = "value1";
        var value2 = "value2";
        var value3 = "value3";
-        try (var conn = getConnection()) {
-            var jdbcStateInteractor = new JdbcStateInteractor(conn);
-            assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
+        var dataSource = getDataSource();
+        var jdbcStateInteractor = new JdbcStateInteractor(dataSource);
+        assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
 
-            // Test for creating data.
-            jdbcStateInteractor.createData(
-                    jobKey,
-                    List.of(COLLECTED_METRICS, SCALING_HISTORY),
-                    Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
-            assertThat(jdbcStateInteractor.queryData(jobKey))
-                    .isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
+        // Test for creating data.
+        jdbcStateInteractor.createData(
+                jobKey,
+                List.of(COLLECTED_METRICS, SCALING_HISTORY),
+                Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
+        assertThat(jdbcStateInteractor.queryData(jobKey))
+                .isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
 
-            // Test for updating data.
-            jdbcStateInteractor.updateData(
-                    jobKey,
-                    List.of(COLLECTED_METRICS),
-                    Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
-            assertThat(jdbcStateInteractor.queryData(jobKey))
-                    .isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
+        // Test for updating data.
+        jdbcStateInteractor.updateData(
+                jobKey,
+                List.of(COLLECTED_METRICS),
+                Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
+        assertThat(jdbcStateInteractor.queryData(jobKey))
+                .isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
 
-            // Test for deleting data.
-            jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
-            assertThat(jdbcStateInteractor.queryData(jobKey))
-                    .isEqualTo(Map.of(SCALING_HISTORY, value2));
-            jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
-            assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
-        }
+        // Test for deleting data.
+        jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
+        assertThat(jdbcStateInteractor.queryData(jobKey))
+                .isEqualTo(Map.of(SCALING_HISTORY, value2));
+        jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
+        assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
    }
 }

flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJdbcStateStoreITCase.java

Lines changed: 8 additions & 15 deletions

@@ -24,11 +24,11 @@
 import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
 import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.sql.Connection;
+import javax.sql.DataSource;
+
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -45,24 +45,17 @@
 abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {
 
    private static final String DEFAULT_JOB_KEY = "jobKey";
-    private Connection conn;
+    private DataSource dataSource;
    private CountableJdbcStateInteractor jdbcStateInteractor;
    private JdbcStateStore jdbcStateStore;
 
    @BeforeEach
    void beforeEach() throws Exception {
-        this.conn = getConnection();
-        this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
+        this.dataSource = getDataSource();
+        this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
        this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
    }
 
-    @AfterEach
-    void afterEach() throws SQLException {
-        if (conn != null) {
-            conn.close();
-        }
-    }
-
    @Test
    void testCaching() throws Exception {
        var value1 = "value1";
@@ -178,7 +171,7 @@ void testErrorHandlingDuringFlush() throws Exception {
        assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
 
        // Modify the database directly.
-        var tmpJdbcInteractor = new JdbcStateInteractor(conn);
+        var tmpJdbcInteractor = new JdbcStateInteractor(dataSource);
        tmpJdbcInteractor.createData(
                DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
        assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
@@ -205,7 +198,7 @@ void testErrorHandlingDuringQuery() throws Exception {
        final var expectedException = new RuntimeException("Database isn't stable.");
 
        var exceptionableJdbcStateInteractor =
-                new CountableJdbcStateInteractor(conn) {
+                new CountableJdbcStateInteractor(dataSource) {
                    private final AtomicBoolean isFirst = new AtomicBoolean(true);
 
                    @Override
@@ -282,7 +275,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp
 
    private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
            throws Exception {
-        var jdbcInteractor = new JdbcStateInteractor(conn);
+        var jdbcInteractor = new JdbcStateInteractor(dataSource);
        return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
    }
 }
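
The ITCases now obtain a DataSource from getDataSource(), presumably supplied by the DatabaseTest interface they implement, and the @AfterEach hook that closed a shared connection is gone: every operation closes the connection it borrowed. For illustration only, a test base could satisfy such a contract with a trivial DriverManager-backed DataSource like the sketch below (class name and usage are hypothetical, not part of this commit):

import javax.sql.DataSource;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;

/** Hypothetical test helper: opens a fresh connection per request, no pooling. */
class DriverManagerDataSource implements DataSource {

    private final String url;

    DriverManagerDataSource(String url) {
        this.url = url;
    }

    @Override
    public Connection getConnection() throws SQLException {
        // Each call opens a new physical connection; the interactors close it
        // again via try-with-resources, so nothing leaks between tests.
        return DriverManager.getConnection(url);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return DriverManager.getConnection(url, username, password);
    }

    @Override
    public PrintWriter getLogWriter() {
        return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) {}

    @Override
    public void setLoginTimeout(int seconds) {}

    @Override
    public int getLoginTimeout() {
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new SQLException("Not a wrapper for " + iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) {
        return false;
    }
}

A getDataSource() implementation could then return something like new DriverManagerDataSource("jdbc:derby:memory:autoscaler;create=true") (URL illustrative).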
