Skip to content

Commit ca1214b

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents 8a85162 + 8f4c0fe commit ca1214b

File tree

3 files changed

+52
-68
lines changed

3 files changed

+52
-68
lines changed

flinkx-postgresql/flinkx-postgresql-writer/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlOutputFormat.java

Lines changed: 15 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.dtstack.flinkx.enums.EWriteMode;
2121
import com.dtstack.flinkx.exception.WriteRecordException;
2222
import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.flink.types.Row;
2425
import org.postgresql.copy.CopyManager;
2526
import org.postgresql.core.BaseConnection;
@@ -46,6 +47,8 @@ public class PostgresqlOutputFormat extends JdbcOutputFormat {
4647

4748
private static final String LINE_DELIMITER = "\n";
4849

50+
private boolean isCopyMode = false;
51+
4952
/**
5053
* now just add ext insert mode:copy
5154
*/
@@ -63,7 +66,8 @@ protected PreparedStatement prepareTemplates() throws SQLException {
6366
}
6467

6568
//check is use copy mode for insert
66-
if (EWriteMode.INSERT.name().equalsIgnoreCase(mode) && checkIsCopyMode(insertSqlMode)) {
69+
isCopyMode = checkIsCopyMode(insertSqlMode);
70+
if (EWriteMode.INSERT.name().equalsIgnoreCase(mode) && isCopyMode) {
6771
copyManager = new CopyManager((BaseConnection) dbConn);
6872
copySql = String.format(COPY_SQL_TEMPL, table, String.join(",", column), DEFAULT_FIELD_DELIM, DEFAULT_NULL_DELIM);
6973
return null;
@@ -72,27 +76,10 @@ protected PreparedStatement prepareTemplates() throws SQLException {
7276
return super.prepareTemplates();
7377
}
7478

75-
@Override
76-
protected void openInternal(int taskNumber, int numTasks){
77-
super.openInternal(taskNumber, numTasks);
78-
try {
79-
if (batchInterval > 1) {
80-
dbConn.setAutoCommit(false);
81-
}
82-
} catch (Exception e) {
83-
LOG.warn("", e);
84-
}
85-
}
86-
8779
@Override
8880
protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
89-
if(!checkIsCopyMode(insertSqlMode)){
90-
if (batchInterval == 1) {
91-
super.writeSingleRecordInternal(row);
92-
} else {
93-
writeSingleRecordCommit(row);
94-
}
95-
81+
if(!isCopyMode){
82+
super.writeSingleRecordInternal(row);
9683
return;
9784
}
9885

@@ -130,31 +117,10 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
130117
}
131118
}
132119

133-
private void writeSingleRecordCommit(Row row) throws WriteRecordException {
134-
try {
135-
super.writeSingleRecordInternal(row);
136-
try {
137-
dbConn.commit();
138-
} catch (Exception e) {
139-
// 提交失败直接结束任务
140-
throw new RuntimeException(e);
141-
}
142-
} catch (WriteRecordException e) {
143-
try {
144-
dbConn.rollback();
145-
} catch (Exception e1) {
146-
// 回滚失败直接结束任务
147-
throw new RuntimeException(e);
148-
}
149-
150-
throw e;
151-
}
152-
}
153-
154120
@Override
155121
protected void writeMultipleRecordsInternal() throws Exception {
156-
if(!checkIsCopyMode(insertSqlMode)){
157-
writeMultipleRecordsCommit();
122+
if(!isCopyMode){
123+
super.writeMultipleRecordsInternal();
158124
return;
159125
}
160126

@@ -197,16 +163,6 @@ protected void writeMultipleRecordsInternal() throws Exception {
197163
}
198164
}
199165

200-
private void writeMultipleRecordsCommit() throws Exception {
201-
try {
202-
super.writeMultipleRecordsInternal();
203-
dbConn.commit();
204-
} catch (Exception e){
205-
dbConn.rollback();
206-
throw e;
207-
}
208-
}
209-
210166
@Override
211167
protected Object getField(Row row, int index) {
212168
Object field = super.getField(row, index);
@@ -216,8 +172,13 @@ protected Object getField(Row row, int index) {
216172
return field;
217173
}
218174

175+
/**
176+
* 判断是否为copy模式
177+
* @param insertMode
178+
* @return
179+
*/
219180
private boolean checkIsCopyMode(String insertMode){
220-
if(insertMode == null || insertMode.length() == 0){
181+
if(StringUtils.isBlank(insertMode)){
221182
return false;
222183
}
223184

@@ -227,6 +188,4 @@ private boolean checkIsCopyMode(String insertMode){
227188

228189
return true;
229190
}
230-
231-
232191
}

flinkx-rdb/flinkx-rdb-core/src/main/java/com/dtstack/flinkx/rdb/util/DbUtil.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ public static void closeDbResources(ResultSet rs, Statement stmt, Connection con
191191
try {
192192
if(commit){
193193
commit(conn);
194+
}else {
195+
rollBack(conn);
194196
}
195197

196198
conn.close();
@@ -214,6 +216,20 @@ public static void commit(Connection conn){
214216
}
215217
}
216218

219+
/**
220+
* 手动回滚事物
221+
* @param conn Connection
222+
*/
223+
public static void rollBack(Connection conn){
224+
try {
225+
if (!conn.isClosed() && !conn.getAutoCommit()){
226+
conn.rollback();
227+
}
228+
} catch (SQLException e){
229+
LOG.warn("rollBack error:{}", ExceptionUtil.getErrorMessage(e));
230+
}
231+
}
232+
217233
/**
218234
* 批量执行sql
219235
* @param dbConn Connection

flinkx-rdb/flinkx-rdb-writer/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.dtstack.flinkx.restore.FormatState;
2828
import com.dtstack.flinkx.util.ClassUtil;
2929
import com.dtstack.flinkx.util.DateUtil;
30+
import com.dtstack.flinkx.util.ExceptionUtil;
3031
import org.apache.commons.collections.CollectionUtils;
3132
import org.apache.commons.lang.ObjectUtils;
3233
import org.apache.commons.lang.StringUtils;
@@ -142,9 +143,8 @@ protected void openInternal(int taskNumber, int numTasks){
142143
ClassUtil.forName(driverName, getClass().getClassLoader());
143144
dbConn = DbUtil.getConnection(dbUrl, username, password);
144145

145-
if (restoreConfig.isRestore()){
146-
dbConn.setAutoCommit(false);
147-
}
146+
//默认关闭事务自动提交,手动控制事务
147+
dbConn.setAutoCommit(false);
148148

149149
if(CollectionUtils.isEmpty(fullColumn)) {
150150
fullColumn = probeFullColumns(table, dbConn);
@@ -175,6 +175,8 @@ protected void openInternal(int taskNumber, int numTasks){
175175
LOG.info("subTask[{}}] wait finished", taskNumber);
176176
} catch (SQLException sqe) {
177177
throw new IllegalArgumentException("open() failed.", sqe);
178+
}finally {
179+
DbUtil.commit(dbConn);
178180
}
179181
}
180182

@@ -213,7 +215,9 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
213215
}
214216

215217
preparedStatement.execute();
218+
DbUtil.commit(dbConn);
216219
} catch (Exception e) {
220+
DbUtil.rollBack(dbConn);
217221
processWriteException(e, index, row);
218222
}
219223
}
@@ -226,7 +230,9 @@ protected void processWriteException(Exception e, int index, Row row) throws Wri
226230
}
227231

228232
if(index < row.getArity()) {
229-
throw new WriteRecordException(recordConvertDetailErrorMessage(index, row), e, index, row);
233+
String message = recordConvertDetailErrorMessage(index, row);
234+
LOG.error(message, e);
235+
throw new WriteRecordException(message, e, index, row);
230236
}
231237
throw new WriteRecordException(e.getMessage(), e);
232238
}
@@ -259,14 +265,14 @@ protected void writeMultipleRecordsInternal() throws Exception {
259265

260266
if(restoreConfig.isRestore()){
261267
rowsOfCurrentTransaction += rows.size();
268+
}else{
269+
//手动提交事务
270+
DbUtil.commit(dbConn);
262271
}
272+
preparedStatement.clearBatch();
263273
} catch (Exception e){
264-
if (restoreConfig.isRestore()){
265-
LOG.warn("writeMultipleRecordsInternal:Start rollback");
266-
dbConn.rollback();
267-
LOG.warn("writeMultipleRecordsInternal:Rollback success");
268-
}
269-
274+
LOG.warn("error to writeMultipleRecords, start to rollback connection, e = {}", ExceptionUtil.getErrorMessage(e));
275+
DbUtil.rollBack(dbConn);
270276
throw e;
271277
}
272278
}
@@ -289,7 +295,9 @@ public FormatState getFormatState(){
289295
}else{
290296
preparedStatement.executeBatch();
291297
}
298+
//若事务提交失败,抛出异常
292299
dbConn.commit();
300+
preparedStatement.clearBatch();
293301
LOG.info("getFormatState:Commit connection success");
294302

295303
snapshotWriteCounter.add(rowsOfCurrentTransaction);
@@ -308,6 +316,7 @@ public FormatState getFormatState(){
308316
} catch (Exception e){
309317
try {
310318
LOG.warn("getFormatState:Start rollback");
319+
//若事务回滚失败,抛出异常
311320
dbConn.rollback();
312321
LOG.warn("getFormatState:Rollback success");
313322
} catch (SQLException sqlE){
@@ -404,6 +413,7 @@ protected boolean needWaitBeforeWriteRecords() {
404413

405414
@Override
406415
protected void beforeWriteRecords() {
416+
// preSql
407417
if(taskNumber == 0) {
408418
DbUtil.executeBatch(dbConn, preSql);
409419
}
@@ -416,10 +426,9 @@ protected boolean needWaitBeforeCloseInternal() {
416426

417427
@Override
418428
protected void beforeCloseInternal() {
419-
// 执行postsql
429+
// 执行postSql
420430
if(taskNumber == 0) {
421431
DbUtil.executeBatch(dbConn, postSql);
422432
}
423433
}
424-
425434
}

0 commit comments

Comments
 (0)