Skip to content

Commit 267511a

Browse files
author
dapeng
committed
修复退出逻辑
1 parent bfce8c8 commit 267511a

File tree

1 file changed

+1
-3
lines changed

1 file changed

+1
-3
lines changed

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
9292
@Override
9393
public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {
9494
AtomicInteger failCounter = new AtomicInteger(0);
95-
AtomicReference<Throwable> connErrMsg = new AtomicReference<>();
9695
AtomicBoolean finishFlag = new AtomicBoolean(false);
9796
while(!finishFlag.get()){
9897
AtomicBoolean connectFinish = new AtomicBoolean(false);
@@ -103,10 +102,9 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, Resul
103102
logger.error("getConnection error", conn.cause());
104103
}
105104
if(failCounter.get() >= sideInfo.getSideTableInfo().getAsyncFailMaxNum(3)){
106-
resultFuture.completeExceptionally(connErrMsg.get());
105+
resultFuture.completeExceptionally(conn.cause());
107106
finishFlag.set(true);
108107
}
109-
connErrMsg.set(conn.cause());
110108
conn.result().close();
111109
return;
112110
}

0 commit comments

Comments
 (0)