Skip to content

Commit a7000cd

Browse files
committed
side async param
1 parent 987eea8 commit a7000cd

File tree

5 files changed

+47
-6
lines changed

5 files changed

+47
-6
lines changed

core/src/main/java/com/dtstack/flink/sql/side/SideTableInfo.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,20 @@ public abstract class SideTableInfo extends TableInfo implements Serializable {
4747

4848
public static final String CACHE_MODE_KEY = "cacheMode";
4949

50+
public static final String ASYNC_CAP_KEY = "asyncCapacity";
51+
52+
public static final String ASYNC_TIMEOUT_KEY = "asyncTimeout";
53+
5054
private String cacheType = "none";//None or LRU or ALL
5155

5256
private int cacheSize = 10000;
5357

5458
private long cacheTimeout = 60 * 1000;//
5559

60+
private int asyncCapacity=100;
61+
62+
private int asyncTimeout=10000;
63+
5664
private boolean partitionedJoin = false;
5765

5866
private String cacheMode="ordered";
@@ -107,4 +115,20 @@ public String getCacheMode() {
107115
public void setCacheMode(String cacheMode) {
108116
this.cacheMode = cacheMode;
109117
}
118+
119+
public int getAsyncCapacity() {
120+
return asyncCapacity;
121+
}
122+
123+
public void setAsyncCapacity(int asyncCapacity) {
124+
this.asyncCapacity = asyncCapacity;
125+
}
126+
127+
public int getAsyncTimeout() {
128+
return asyncTimeout;
129+
}
130+
131+
public void setAsyncTimeout(int asyncTimeout) {
132+
this.asyncTimeout = asyncTimeout;
133+
}
110134
}

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ public class SideAsyncOperator {
4949
private static final String ORDERED = "ordered";
5050

5151

52-
//TODO need to set by create table task
53-
private static int asyncCapacity = 100;
54-
5552
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
5653
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
5754
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -70,10 +67,10 @@ public static DataStream getSideJoinDataStream(DataStream inputStream, String si
7067

7168
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
7269
if (ORDERED.equals(sideTableInfo.getCacheMode())){
73-
return AsyncDataStream.orderedWait(inputStream, asyncDbReq, 10000, TimeUnit.MILLISECONDS, asyncCapacity)
70+
return AsyncDataStream.orderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
7471
.setParallelism(sideTableInfo.getParallelism());
7572
}else {
76-
return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, 10000, TimeUnit.MILLISECONDS, asyncCapacity)
73+
return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
7774
.setParallelism(sideTableInfo.getParallelism());
7875
}
7976

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,22 @@ protected void parseCacheProp(SideTableInfo sideTableInfo, Map<String, Object> p
9595
}
9696
sideTableInfo.setCacheMode(cachemode.toLowerCase());
9797
}
98+
99+
if(props.containsKey(SideTableInfo.ASYNC_CAP_KEY.toLowerCase())){
100+
Integer asyncCap = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_CAP_KEY.toLowerCase()));
101+
if(asyncCap < 0){
102+
throw new RuntimeException("asyncCapacity size need > 0.");
103+
}
104+
sideTableInfo.setAsyncCapacity(asyncCap);
105+
}
106+
107+
if(props.containsKey(SideTableInfo.ASYNC_TIMEOUT_KEY.toLowerCase())){
108+
Integer asyncTimeout = MathUtil.getIntegerVal(props.get(SideTableInfo.ASYNC_TIMEOUT_KEY.toLowerCase()));
109+
if (asyncTimeout<0){
110+
throw new RuntimeException("asyncTimeout size need > 0.");
111+
}
112+
sideTableInfo.setAsyncTimeout(asyncTimeout);
113+
}
98114
}
99115
}
100116
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public long extractTimestamp(Row row) {
6262
Long extractTime=eveTime;
6363

6464
lastTime = extractTime + timezone.getOffset(extractTime);
65+
6566
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - convertTimeZone(extractTime))/1000));
6667

6768
return lastTime;

docs/mysqlSide.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
* cacheSize: 缓存的条目数量
5454
* cacheTTLMs:缓存的过期时间(ms)
5555
* cacheMode: (unordered|ordered)异步加载是有序还是无序,默认有序。
56-
56+
* asyncCapacity:异步请求容量,默认1000
57+
* asyncTimeout:异步请求超时时间,默认10000毫秒
5758

5859
## 5.样例
5960
```
@@ -72,6 +73,8 @@ create table sideTable(
7273
cacheSize ='10000',
7374
cacheTTLMs ='60000',
7475
cacheMode='unordered',
76+
asyncCapacity='1000',
77+
asyncTimeout='10000'
7578
parallelism ='1',
7679
partitionedJoin='false'
7780
);

0 commit comments

Comments
 (0)