Skip to content

Commit 0c6d976

Browse files
committed
Merge branch '1.8_release_4.0.x' into temp_1.10_4.0.x_merge
# Conflicts: # flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java # flinkx-rdb/flinkx-rdb-writer/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java
2 parents b0746a5 + ca1214b commit 0c6d976

File tree

5 files changed

+71
-68
lines changed

5 files changed

+71
-68
lines changed

flinkx-phoenix5/flinkx-phoenix5-reader/src/main/java/com/dtstack/flinkx/phoenix5/format/Phoenix5InputFormat.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ public boolean reachedEnd() throws IOException{
239239
* 获取数据库连接,用于子类覆盖
240240
* @return connection
241241
*/
242+
@Override
242243
protected Connection getConnection() throws SQLException {
243244
Field declaredField = ReflectionUtils.getDeclaredField(getClass().getClassLoader(), "ucp");
244245
assert declaredField != null;

flinkx-postgresql/flinkx-postgresql-writer/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlOutputFormat.java

Lines changed: 15 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.dtstack.flinkx.enums.EWriteMode;
2121
import com.dtstack.flinkx.exception.WriteRecordException;
2222
import com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.flink.types.Row;
2425
import org.postgresql.copy.CopyManager;
2526
import org.postgresql.core.BaseConnection;
@@ -46,6 +47,8 @@ public class PostgresqlOutputFormat extends JdbcOutputFormat {
4647

4748
private static final String LINE_DELIMITER = "\n";
4849

50+
private boolean isCopyMode = false;
51+
4952
/**
5053
* now just add ext insert mode:copy
5154
*/
@@ -63,7 +66,8 @@ protected PreparedStatement prepareTemplates() throws SQLException {
6366
}
6467

6568
//check is use copy mode for insert
66-
if (EWriteMode.INSERT.name().equalsIgnoreCase(mode) && checkIsCopyMode(insertSqlMode)) {
69+
isCopyMode = checkIsCopyMode(insertSqlMode);
70+
if (EWriteMode.INSERT.name().equalsIgnoreCase(mode) && isCopyMode) {
6771
copyManager = new CopyManager((BaseConnection) dbConn);
6872
copySql = String.format(COPY_SQL_TEMPL, table, String.join(",", column), DEFAULT_FIELD_DELIM, DEFAULT_NULL_DELIM);
6973
return null;
@@ -72,27 +76,10 @@ protected PreparedStatement prepareTemplates() throws SQLException {
7276
return super.prepareTemplates();
7377
}
7478

75-
@Override
76-
protected void openInternal(int taskNumber, int numTasks){
77-
super.openInternal(taskNumber, numTasks);
78-
try {
79-
if (batchInterval > 1) {
80-
dbConn.setAutoCommit(false);
81-
}
82-
} catch (Exception e) {
83-
LOG.warn("", e);
84-
}
85-
}
86-
8779
@Override
8880
protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
89-
if(!checkIsCopyMode(insertSqlMode)){
90-
if (batchInterval == 1) {
91-
super.writeSingleRecordInternal(row);
92-
} else {
93-
writeSingleRecordCommit(row);
94-
}
95-
81+
if(!isCopyMode){
82+
super.writeSingleRecordInternal(row);
9683
return;
9784
}
9885

@@ -130,31 +117,10 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
130117
}
131118
}
132119

133-
private void writeSingleRecordCommit(Row row) throws WriteRecordException {
134-
try {
135-
super.writeSingleRecordInternal(row);
136-
try {
137-
dbConn.commit();
138-
} catch (Exception e) {
139-
// 提交失败直接结束任务
140-
throw new RuntimeException(e);
141-
}
142-
} catch (WriteRecordException e) {
143-
try {
144-
dbConn.rollback();
145-
} catch (Exception e1) {
146-
// 回滚失败直接结束任务
147-
throw new RuntimeException(e);
148-
}
149-
150-
throw e;
151-
}
152-
}
153-
154120
@Override
155121
protected void writeMultipleRecordsInternal() throws Exception {
156-
if(!checkIsCopyMode(insertSqlMode)){
157-
writeMultipleRecordsCommit();
122+
if(!isCopyMode){
123+
super.writeMultipleRecordsInternal();
158124
return;
159125
}
160126

@@ -197,16 +163,6 @@ protected void writeMultipleRecordsInternal() throws Exception {
197163
}
198164
}
199165

200-
private void writeMultipleRecordsCommit() throws Exception {
201-
try {
202-
super.writeMultipleRecordsInternal();
203-
dbConn.commit();
204-
} catch (Exception e){
205-
dbConn.rollback();
206-
throw e;
207-
}
208-
}
209-
210166
@Override
211167
protected Object getField(Row row, int index) {
212168
Object field = super.getField(row, index);
@@ -216,8 +172,13 @@ protected Object getField(Row row, int index) {
216172
return field;
217173
}
218174

175+
/**
176+
* 判断是否为copy模式
177+
* @param insertMode
178+
* @return
179+
*/
219180
private boolean checkIsCopyMode(String insertMode){
220-
if(insertMode == null || insertMode.length() == 0){
181+
if(StringUtils.isBlank(insertMode)){
221182
return false;
222183
}
223184

@@ -227,6 +188,4 @@ private boolean checkIsCopyMode(String insertMode){
227188

228189
return true;
229190
}
230-
231-
232191
}

flinkx-rdb/flinkx-rdb-core/src/main/java/com/dtstack/flinkx/rdb/util/DbUtil.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ public static void closeDbResources(ResultSet rs, Statement stmt, Connection con
192192
try {
193193
if(commit){
194194
commit(conn);
195+
}else {
196+
rollBack(conn);
195197
}
196198

197199
conn.close();
@@ -215,6 +217,20 @@ public static void commit(Connection conn){
215217
}
216218
}
217219

220+
/**
221+
* 手动回滚事物
222+
* @param conn Connection
223+
*/
224+
public static void rollBack(Connection conn){
225+
try {
226+
if (!conn.isClosed() && !conn.getAutoCommit()){
227+
conn.rollback();
228+
}
229+
} catch (SQLException e){
230+
LOG.warn("rollBack error:{}", ExceptionUtil.getErrorMessage(e));
231+
}
232+
}
233+
218234
/**
219235
* 批量执行sql
220236
* @param dbConn Connection

flinkx-rdb/flinkx-rdb-reader/src/main/java/com.dtstack.flinkx.rdb.inputformat/JdbcInputFormat.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,9 @@ public void openInternal(InputSplit inputSplit) throws IOException {
147147
querySql = buildQuerySql(inputSplit);
148148
try {
149149
executeQuery(((JdbcInputSplit) inputSplit).getStartLocation());
150-
columnCount = resultSet.getMetaData().getColumnCount();
150+
if(!resultSet.isClosed()){
151+
columnCount = resultSet.getMetaData().getColumnCount();
152+
}
151153
} catch (SQLException se) {
152154
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
153155
}
@@ -213,6 +215,7 @@ public boolean reachedEnd() throws IOException{
213215
@Override
214216
public Row nextRecordInternal(Row row) throws IOException {
215217
try {
218+
updateColumnCount();
216219
if (!ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
217220
for (int i = 0; i < columnCount; i++) {
218221
Object val = row.getField(i);
@@ -880,4 +883,19 @@ protected void checkSize(int columnCount, List<MetaColumn> metaColumns) {
880883
throw new RuntimeException(message);
881884
}
882885
}
886+
887+
/**
888+
* 兼容db2 在间隔轮训场景 且第一次读取时没有任何数据
889+
* 在openInternal方法调用时 由于数据库没有数据,db2会自动关闭resultSet,因此只有在间隔轮训中某次读取到数据之后,进行更新columnCount
890+
* @throws SQLException
891+
*/
892+
private void updateColumnCount() throws SQLException {
893+
if(columnCount == 0){
894+
columnCount =resultSet.getMetaData().getColumnCount();
895+
boolean splitWithRowCol = numPartitions > 1 && StringUtils.isNotEmpty(splitKey) && splitKey.contains("(");
896+
if (splitWithRowCol) {
897+
columnCount = columnCount - 1;
898+
}
899+
}
900+
}
883901
}

flinkx-rdb/flinkx-rdb-writer/src/main/java/com/dtstack/flinkx/rdb/outputformat/JdbcOutputFormat.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,8 @@ protected void openInternal(int taskNumber, int numTasks){
147147
ClassUtil.forName(driverName, getClass().getClassLoader());
148148
dbConn = DbUtil.getConnection(dbUrl, username, password);
149149

150-
if (restoreConfig.isRestore()){
151-
dbConn.setAutoCommit(false);
152-
}
150+
//默认关闭事务自动提交,手动控制事务
151+
dbConn.setAutoCommit(false);
153152

154153
if(CollectionUtils.isEmpty(fullColumn)) {
155154
fullColumn = probeFullColumns(table, dbConn);
@@ -180,6 +179,8 @@ protected void openInternal(int taskNumber, int numTasks){
180179
LOG.info("subTask[{}}] wait finished", taskNumber);
181180
} catch (SQLException sqe) {
182181
throw new IllegalArgumentException("open() failed.", sqe);
182+
}finally {
183+
DbUtil.commit(dbConn);
183184
}
184185
}
185186

@@ -218,7 +219,9 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
218219
}
219220

220221
preparedStatement.execute();
222+
DbUtil.commit(dbConn);
221223
} catch (Exception e) {
224+
DbUtil.rollBack(dbConn);
222225
processWriteException(e, index, row);
223226
}
224227
}
@@ -231,7 +234,9 @@ protected void processWriteException(Exception e, int index, Row row) throws Wri
231234
}
232235

233236
if(index < row.getArity()) {
234-
throw new WriteRecordException(recordConvertDetailErrorMessage(index, row), e, index, row);
237+
String message = recordConvertDetailErrorMessage(index, row);
238+
LOG.error(message, e);
239+
throw new WriteRecordException(message, e, index, row);
235240
}
236241
throw new WriteRecordException(e.getMessage(), e);
237242
}
@@ -264,17 +269,18 @@ protected void writeMultipleRecordsInternal() throws Exception {
264269

265270
if(restoreConfig.isRestore()){
266271
rowsOfCurrentTransaction += rows.size();
272+
}else{
273+
//手动提交事务
274+
DbUtil.commit(dbConn);
267275
}
276+
preparedStatement.clearBatch();
268277
} catch (Exception e){
269-
if (restoreConfig.isRestore()){
270-
LOG.warn("writeMultipleRecordsInternal:Start rollback");
271-
dbConn.rollback();
272-
LOG.warn("writeMultipleRecordsInternal:Rollback success");
273-
}
274278
LOG.warn("write Multiple Records error, row size = {}, first row = {}, e = {}",
275279
rows.size(),
276280
rows.size() > 0 ? GsonUtil.GSON.toJson(rows.get(0)) : "null",
277281
ExceptionUtil.getErrorMessage(e));
282+
LOG.warn("error to writeMultipleRecords, start to rollback connection, e = {}", ExceptionUtil.getErrorMessage(e));
283+
DbUtil.rollBack(dbConn);
278284
throw e;
279285
}finally {
280286
//执行完后清空batch
@@ -300,7 +306,9 @@ public FormatState getFormatState(){
300306
}else{
301307
preparedStatement.executeBatch();
302308
}
309+
//若事务提交失败,抛出异常
303310
dbConn.commit();
311+
preparedStatement.clearBatch();
304312
LOG.info("getFormatState:Commit connection success");
305313

306314
snapshotWriteCounter.add(rowsOfCurrentTransaction);
@@ -321,6 +329,7 @@ public FormatState getFormatState(){
321329
//执行完后清空batch
322330
preparedStatement.clearBatch();
323331
LOG.warn("getFormatState:Start rollback");
332+
//若事务回滚失败,抛出异常
324333
dbConn.rollback();
325334
LOG.warn("getFormatState:Rollback success");
326335
} catch (SQLException sqlE){
@@ -417,6 +426,7 @@ protected boolean needWaitBeforeWriteRecords() {
417426

418427
@Override
419428
protected void beforeWriteRecords() {
429+
// preSql
420430
if(taskNumber == 0) {
421431
DbUtil.executeBatch(dbConn, preSql);
422432
}
@@ -429,10 +439,9 @@ protected boolean needWaitBeforeCloseInternal() {
429439

430440
@Override
431441
protected void beforeCloseInternal() {
432-
// 执行postsql
442+
// 执行postSql
433443
if(taskNumber == 0) {
434444
DbUtil.executeBatch(dbConn, postSql);
435445
}
436446
}
437-
438447
}

0 commit comments

Comments
 (0)