Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ public void handleScalingEvent(
}

@Override
public void close() {
public void close() throws Exception {
if (Objects.nonNull(scheduledEventHandlerCleaner)
&& !scheduledEventHandlerCleaner.isShutdown()) {
scheduledEventHandlerCleaner.shutdownNow();
}
jdbcEventInteractor.close();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
Expand All @@ -37,13 +37,13 @@
import static org.apache.flink.util.Preconditions.checkState;

/** Responsible for interacting with the database. */
public class JdbcEventInteractor {
public class JdbcEventInteractor implements AutoCloseable {

private final Connection conn;
private final DataSource dataSource;
private Clock clock = Clock.systemDefaultZone();

public JdbcEventInteractor(Connection conn) {
this.conn = conn;
public JdbcEventInteractor(DataSource dataSource) {
this.dataSource = dataSource;
}

public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
Expand All @@ -52,7 +52,8 @@ public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason,
"select * from t_flink_autoscaler_event_handler "
+ "where job_key = ? and reason = ? and event_key = ? ";

try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
pstmt.setString(2, reason);
pstmt.setString(3, eventKey);
Expand Down Expand Up @@ -99,7 +100,8 @@ public void createEvent(
+ " values (?, ?, ?, ?, ?, ?, ?, ?)";

var createTime = Timestamp.from(clock.instant());
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setTimestamp(1, createTime);
pstmt.setTimestamp(2, createTime);
pstmt.setString(3, jobKey);
Expand All @@ -117,7 +119,8 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio
"UPDATE t_flink_autoscaler_event_handler set update_time = ?, message = ?, event_count = ? where id = ?";

var updateTime = Timestamp.from(clock.instant());
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setTimestamp(1, updateTime);
pstmt.setString(2, message);
pstmt.setInt(3, eventCount);
Expand All @@ -136,7 +139,8 @@ protected List<AutoScalerEvent> queryEvents(String jobKey, String reason) throws
"select * from t_flink_autoscaler_event_handler "
+ "where job_key = ? and reason = ? ";

try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
pstmt.setString(2, reason);

Expand All @@ -160,7 +164,8 @@ Long queryMinEventIdByCreateTime(Timestamp timestamp) throws Exception {
"SELECT id from t_flink_autoscaler_event_handler "
+ " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) "
+ " and create_time < ?";
try (var pstmt = conn.prepareStatement(sql)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(sql)) {
pstmt.setObject(1, timestamp);
ResultSet resultSet = pstmt.executeQuery();
return resultSet.next() ? resultSet.getLong(1) : null;
Expand All @@ -171,11 +176,19 @@ int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp time
throws Exception {
var query =
"delete from t_flink_autoscaler_event_handler where id >= ? and id < ? and create_time < ?";
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setObject(1, startId);
pstmt.setObject(2, endId);
pstmt.setObject(3, timestamp);
return pstmt.executeUpdate();
}
}

@Override
public void close() throws Exception {
if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,9 @@ private static DelayedScaleDown deserializeDelayedScaleDown(String delayedScaleD
throws JacksonException {
return YAML_MAPPER.readValue(delayedScaleDown, new TypeReference<>() {});
}

@Override
public void close() throws Exception {
jdbcStateStore.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import javax.sql.DataSource;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collections;
Expand All @@ -31,21 +32,22 @@
import static org.apache.flink.util.Preconditions.checkState;

/** Responsible for interacting with the database. */
public class JdbcStateInteractor {
public class JdbcStateInteractor implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(JdbcStateInteractor.class);

private final Connection conn;
private final DataSource dataSource;

public JdbcStateInteractor(Connection conn) {
this.conn = conn;
public JdbcStateInteractor(DataSource dataSource) {
this.dataSource = dataSource;
}

public Map<StateType, String> queryData(String jobKey) throws Exception {
var query =
"select state_type, state_value from t_flink_autoscaler_state_store where job_key = ?";
var data = new HashMap<StateType, String>();
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
var rs = pstmt.executeQuery();
while (rs.next()) {
Expand All @@ -63,7 +65,8 @@ public void deleteData(String jobKey, List<StateType> deletedStateTypes) throws
String.format(
"DELETE FROM t_flink_autoscaler_state_store where job_key = ? and state_type in (%s)",
String.join(",", Collections.nCopies(deletedStateTypes.size(), "?")));
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
int i = 2;
for (var stateType : deletedStateTypes) {
Expand All @@ -80,7 +83,8 @@ public void createData(
var query =
"INSERT INTO t_flink_autoscaler_state_store (update_time, job_key, state_type, state_value) values (?, ?, ?, ?)";
var updateTime = Timestamp.from(Instant.now());
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
for (var stateType : createdStateTypes) {
pstmt.setTimestamp(1, updateTime);
pstmt.setString(2, jobKey);
Expand All @@ -106,7 +110,8 @@ public void updateData(
"UPDATE t_flink_autoscaler_state_store set update_time = ?, state_value = ? where job_key = ? and state_type = ?";

var updateTime = Timestamp.from(Instant.now());
try (var pstmt = conn.prepareStatement(query)) {
try (var conn = dataSource.getConnection();
var pstmt = conn.prepareStatement(query)) {
for (var stateType : updatedStateTypes) {
pstmt.setTimestamp(1, updateTime);

Expand All @@ -123,4 +128,11 @@ public void updateData(
pstmt.executeBatch();
}
}

@Override
public void close() throws Exception {
if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;

/** The jdbc state store. */
public class JdbcStateStore {
public class JdbcStateStore implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(JdbcStateStore.class);

Expand Down Expand Up @@ -89,4 +89,9 @@ private JobStateView getJobStateView(String jobKey) {
private JobStateView createJobStateView(String jobKey) throws Exception {
return new JobStateView(jdbcStateInteractor, jobKey);
}

@Override
public void close() throws Exception {
jdbcStateInteractor.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;
import javax.sql.DataSource;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -73,20 +74,22 @@ abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest
generateScalingSummaries(currentParallelism, newParallelism, metricAvg, metricCurrent);
private final Clock defaultClock = Clock.fixed(createTime, ZoneId.systemDefault());

private DataSource dataSource;
private CountableJdbcEventInteractor jdbcEventInteractor;
private JdbcAutoScalerEventHandler<JobID, JobAutoScalerContext<JobID>> eventHandler;
private JobAutoScalerContext<JobID> ctx;

@BeforeEach
void beforeEach() throws Exception {
jdbcEventInteractor = new CountableJdbcEventInteractor(getConnection());
dataSource = getDataSource();
jdbcEventInteractor = new CountableJdbcEventInteractor(dataSource);
jdbcEventInteractor.setClock(defaultClock);
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, Duration.ZERO);
ctx = createDefaultJobAutoScalerContext();
}

@AfterEach
void tearDown() {
void tearDown() throws Exception {
eventHandler.close();
}

Expand Down Expand Up @@ -459,7 +462,7 @@ void testDeleteCounterWhenIdNotConsecutive() throws Exception {
.max(Comparable::compareTo)
.orElseThrow();

try (Connection connection = getConnection();
try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
Expand Down Expand Up @@ -505,6 +508,7 @@ void testCleanExpiredEvents(
Duration eventHandlerTtl,
int unexpiredRecordsNum)
throws Exception {

eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, eventHandlerTtl);

// Init the expired records.
Expand All @@ -525,7 +529,7 @@ void testCleanExpiredEvents(

eventHandler.cleanExpiredEvents();

try (Connection connection = getConnection();
try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"select count(1) from t_flink_autoscaler_event_handler");
Expand All @@ -542,7 +546,7 @@ private void tryDeleteOneRecord(int expiredRecordsNum) throws Exception {
if (minId == null) {
return;
}
try (Connection connection = getConnection();
try (Connection connection = dataSource.getConnection();
PreparedStatement ps =
connection.prepareStatement(
"delete from t_flink_autoscaler_event_handler where id = ?")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ void testAllOperations() throws Exception {

// The datetime precision is seconds in MySQL by default.
var createTime = Instant.now().truncatedTo(ChronoUnit.SECONDS);
try (var conn = getConnection()) {
var jdbcEventInteractor = new JdbcEventInteractor(conn);
try (var jdbcEventInteractor = new JdbcEventInteractor(getDataSource())) {
jdbcEventInteractor.setClock(Clock.fixed(createTime, ZoneId.systemDefault()));

jdbcEventInteractor.createEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import org.apache.flink.autoscaler.event.AutoScalerEventHandler;

import java.sql.Connection;
import javax.sql.DataSource;

import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -34,8 +35,8 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor {
private final AtomicLong updateCounter;
private final AtomicLong deleteExpiredCounter;

public CountableJdbcEventInteractor(Connection conn) {
super(conn);
public CountableJdbcEventInteractor(DataSource dataSource) {
super(dataSource);
queryCounter = new AtomicLong();
createCounter = new AtomicLong();
updateCounter = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ void testAllOperations() throws Exception {
var value1 = "value1";
var value2 = "value2";
var value3 = "value3";
try (var conn = getConnection()) {
var jdbcStateInteractor = new JdbcStateInteractor(conn);
try (var jdbcStateInteractor = new JdbcStateInteractor(getDataSource())) {
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();

// Test for creating data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import javax.sql.DataSource;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand All @@ -45,21 +46,21 @@
abstract class AbstractJdbcStateStoreITCase implements DatabaseTest {

private static final String DEFAULT_JOB_KEY = "jobKey";
private Connection conn;
private DataSource dataSource;
private CountableJdbcStateInteractor jdbcStateInteractor;
private JdbcStateStore jdbcStateStore;

@BeforeEach
void beforeEach() throws Exception {
this.conn = getConnection();
this.jdbcStateInteractor = new CountableJdbcStateInteractor(conn);
this.dataSource = getDataSource();
this.jdbcStateInteractor = new CountableJdbcStateInteractor(dataSource);
this.jdbcStateStore = new JdbcStateStore(jdbcStateInteractor);
}

@AfterEach
void afterEach() throws SQLException {
if (conn != null) {
conn.close();
void afterEach() throws Exception {
if (dataSource instanceof AutoCloseable) {
((AutoCloseable) dataSource).close();
}
}

Expand Down Expand Up @@ -178,7 +179,7 @@ void testErrorHandlingDuringFlush() throws Exception {
assertThat(jdbcStateStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();

// Modify the database directly.
var tmpJdbcInteractor = new JdbcStateInteractor(conn);
var tmpJdbcInteractor = new JdbcStateInteractor(dataSource);
tmpJdbcInteractor.createData(
DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
Expand All @@ -205,7 +206,7 @@ void testErrorHandlingDuringQuery() throws Exception {
final var expectedException = new RuntimeException("Database isn't stable.");

var exceptionableJdbcStateInteractor =
new CountableJdbcStateInteractor(conn) {
new CountableJdbcStateInteractor(dataSource) {
private final AtomicBoolean isFirst = new AtomicBoolean(true);

@Override
Expand Down Expand Up @@ -282,7 +283,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp

private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
throws Exception {
var jdbcInteractor = new JdbcStateInteractor(conn);
var jdbcInteractor = new JdbcStateInteractor(dataSource);
return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
}
}
Expand Down
Loading
Loading