Skip to content

Commit 0db36b8

Browse files
authored
[BugFix][Jdbc][#1459] Remove the 'checkConnValid', because some connector doesn't support 'isValid'. (#1465)
1 parent 1d87b29 commit 0db36b8

File tree

2 files changed

+2
-26
lines changed
  • chunjun-connectors

2 files changed

+2
-26
lines changed

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcOutputFormat.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,13 @@
2929
import com.dtstack.chunjun.enums.EWriteMode;
3030
import com.dtstack.chunjun.enums.Semantic;
3131
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
32-
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
3332
import com.dtstack.chunjun.throwable.WriteRecordException;
3433
import com.dtstack.chunjun.util.ExceptionUtil;
3534
import com.dtstack.chunjun.util.GsonUtil;
3635
import com.dtstack.chunjun.util.JsonUtil;
3736

3837
import org.apache.flink.table.data.GenericRowData;
3938
import org.apache.flink.table.data.RowData;
40-
import org.apache.flink.util.FlinkRuntimeException;
4139

4240
import org.apache.commons.collections.CollectionUtils;
4341
import org.apache.commons.lang3.StringUtils;
@@ -181,29 +179,6 @@ protected void writeMultipleRecordsInternal() throws Exception {
181179
}
182180
}
183181

184-
@Override
185-
public synchronized void writeRecord(RowData rowData) {
186-
checkConnValid();
187-
super.writeRecord(rowData);
188-
}
189-
190-
public void checkConnValid() {
191-
try {
192-
LOG.debug("check db connection valid..");
193-
if (!dbConn.isValid(10)) {
194-
if (Semantic.EXACTLY_ONCE == semantic) {
195-
throw new FlinkRuntimeException(
196-
"jdbc connection is valid!work's semantic is ExactlyOnce.To prevent data loss,we don't try to reopen the connection");
197-
}
198-
LOG.info("db connection reconnect..");
199-
dbConn = getConnection();
200-
stmtProxy.reOpen(dbConn);
201-
}
202-
} catch (Exception e) {
203-
throw new ChunJunRuntimeException("failed to check jdbcConnection valid", e);
204-
}
205-
}
206-
207182
@Override
208183
public void preCommit() throws Exception {
209184
if (jdbcConf.getRestoreColumnIndex() > -1) {

chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ protected void openInternal(int taskNumber, int numTasks) {
9090
checkUpsert();
9191
if (rowConverter instanceof JdbcColumnConverter) {
9292
if (jdbcDialect.dialectName().equals("PostgresSQL")) {
93-
((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn);
93+
((PostgresqlColumnConverter) rowConverter)
94+
.setConnection((BaseConnection) dbConn);
9495
}
9596
((PostgresqlColumnConverter) rowConverter).setFieldTypeList(columnTypeList);
9697
}

0 commit comments

Comments
 (0)