Skip to content

Commit e7b6a13

Browse files
committed
db reconnection
1 parent 637911e commit e7b6a13

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

pom.xml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@
1010
<url>http://maven.apache.org</url>
1111
<modules>
1212
<module>core</module>
13-
<module>kafka09</module>
14-
<module>kafka10</module>
13+
<!--<module>kafka09</module>-->
14+
<!--<module>kafka10</module>-->
1515
<module>kafka11</module>
1616
<module>mysql</module>
17-
<module>hbase</module>
18-
<module>elasticsearch5</module>
19-
<module>mongo</module>
20-
<module>redis5</module>
17+
<!--<module>hbase</module>-->
18+
<!--<module>elasticsearch5</module>-->
19+
<!--<module>mongo</module>-->
20+
<!--<module>redis5</module>-->
2121
<module>launcher</module>
2222
<module>rdb</module>
23-
<module>sqlserver</module>
24-
<module>oracle</module>
25-
<module>cassandra</module>
23+
<!--<module>sqlserver</module>-->
24+
<!--<module>oracle</module>-->
25+
<!--<module>cassandra</module>-->
2626
</modules>
2727

2828

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/RetractJDBCOutputFormat.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void configure(Configuration parameters) {
8585
@Override
8686
public void open(int taskNumber, int numTasks) throws IOException {
8787
try {
88-
establishConnection();
88+
dbConn = establishConnection();
8989
initMetric();
9090
if (dbConn.getMetaData().getTables(null, null, tableName, null).next()) {
9191
if (isReplaceInsertQuery()) {
@@ -104,13 +104,15 @@ public void open(int taskNumber, int numTasks) throws IOException {
104104
}
105105

106106

107-
private void establishConnection() throws SQLException, ClassNotFoundException {
107+
private Connection establishConnection() throws SQLException, ClassNotFoundException {
108+
Connection connection ;
108109
Class.forName(drivername);
109110
if (username == null) {
110-
dbConn = DriverManager.getConnection(dbURL);
111+
connection = DriverManager.getConnection(dbURL);
111112
} else {
112-
dbConn = DriverManager.getConnection(dbURL, username, password);
113+
connection = DriverManager.getConnection(dbURL, username, password);
113114
}
115+
return connection;
114116
}
115117

116118
/**
@@ -151,7 +153,7 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
151153

152154

153155
private void insertWrite(Row row) throws SQLException {
154-
156+
checkConnectionOpen(dbConn);
155157
updatePreparedStmt(row, upload);
156158
upload.addBatch();
157159
batchCount++;
@@ -161,6 +163,20 @@ private void insertWrite(Row row) throws SQLException {
161163
}
162164
}
163165

166+
private void checkConnectionOpen(Connection dbConn) {
167+
try {
168+
if (dbConn.isClosed()) {
169+
LOG.info("db connection reconnect..");
170+
dbConn= establishConnection();
171+
upload = dbConn.prepareStatement(insertQuery);
172+
}
173+
} catch (SQLException e) {
174+
LOG.error("check connection open failed..", e);
175+
} catch (ClassNotFoundException e) {
176+
LOG.error("load jdbc class error when reconnect db..", e);
177+
}
178+
}
179+
164180
private void updatePreparedStmt(Row row, PreparedStatement pstmt) throws SQLException {
165181
if (typesArray == null) {
166182
// no types provided

0 commit comments

Comments
 (0)