Skip to content

Commit 67990fa

Browse files
committed
print detail error log when async side load data
1 parent 62ee4f9 commit 67990fa

File tree

7 files changed

+24
-7
lines changed

7 files changed

+24
-7
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ plugins/
1111
lib/
1212
.vertx/
1313
bin/nohup.out
14-
14+
.DS_Store
1515
bin/sideSql.txt

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void onFailure(Throwable t) {
256256
t.getMessage());
257257
System.out.println("Failed to retrieve the data: " + t.getMessage());
258258
cluster.closeAsync();
259-
resultFuture.complete(null);
259+
resultFuture.completeExceptionally(t);
260260
}
261261
});
262262
}

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.side;
2222

@@ -28,9 +28,12 @@
2828
import org.apache.flink.configuration.Configuration;
2929
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3030
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
31+
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
3132
import org.apache.flink.types.Row;
3233

34+
import java.util.Collection;
3335
import java.util.Collections;
36+
import java.util.concurrent.TimeoutException;
3437

3538
/**
3639
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -50,6 +53,18 @@ public AsyncReqRow(SideInfo sideInfo){
5053
this.sideInfo = sideInfo;
5154
}
5255

56+
@Override
57+
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
58+
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
59+
try {
60+
if (null == future.get()) {
61+
new TimeoutException("Async function call has timed out.");
62+
}
63+
} catch (Exception e) {
64+
throw new Exception(e);
65+
}
66+
}
67+
5368
private void initCache(){
5469
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
5570
if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){

hbase/hbase-side/hbase-async-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<artifactSet>
5151
<excludes>
5252
<exclude>org.apache.hadoop:hadoop-common</exclude>
53+
<exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
5354
<exclude>org.apache.hadoop:hadoop-auth</exclude>
5455
<exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
5556
<exclude>org.slf4j:*</exclude>

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
106106
}
107107
}
108108
}catch (Exception e){
109-
resultFuture.complete(null);
109+
resultFuture.completeExceptionally(e);
110110
LOG.error("record:" + input);
111111
LOG.error("get side record exception:", e);
112112
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
9292
}
9393
resultFuture.complete(rowList);
9494
} else {
95-
throw new RuntimeException("not support cache obj type " + val.getType());
95+
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
9696
}
9797
return;
9898
}
@@ -110,7 +110,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
110110
connection.queryWithParams(sqlCondition, inputParams, rs -> {
111111
if (rs.failed()) {
112112
LOG.error("Cannot retrieve the data from the database", rs.cause());
113-
resultFuture.complete(null);
113+
resultFuture.completeExceptionally(rs.cause());
114114
return;
115115
}
116116

redis5/redis5-side/redis-async-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAsyncReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
148148
}
149149
resultFuture.complete(rowList);
150150
}else{
151-
throw new RuntimeException("not support cache obj type " + val.getType());
151+
RuntimeException exception = new RuntimeException("not support cache obj type " + val.getType());
152+
resultFuture.completeExceptionally(exception);
152153
}
153154
return;
154155
}

0 commit comments

Comments
 (0)