Skip to content

Commit 73a3ae6

Browse files
authored
Merge pull request #6 from DTStack/v1.4.1
V1.4.1
2 parents 79c30cc + fc66f77 commit 73a3ae6

File tree

83 files changed

+2984
-657
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+2984
-657
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
target
22
.idea/
33
/.idea/*
4-
/target
5-
target
4+
target/
65
.class
76
.project
87
.classpath
98
*.eclipse.*
109
*.iml
1110
plugins/
11+
lib/

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,18 @@
3838

3939
```
4040
mvn clean package -Dmaven.test.skip
41+
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包,在lib目录下存放job提交的包
4142
```
4243

43-
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包
44+
### 1.4 启动
4445

46+
#### 1.4.1 启动命令
4547

46-
### 1.4 启动
48+
```
49+
sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp {\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000}
50+
```
4751

48-
#### 1.4.1 命令行参数选项
52+
#### 1.4.2 命令行参数选项
4953

5054
* **model**
5155
* 描述:执行模式,也就是flink集群的工作模式

bin/submit.sh

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Submits a flinkStreamSQL job: launches LauncherMain with every jar under
# $SQL_HOME/lib on the classpath, forwarding all CLI arguments unchanged.

set -e

# Project root = one level above this script's directory.
export SQL_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# Find the java binary: prefer $JAVA_HOME/bin/java, fall back to PATH.
if [ -n "${JAVA_HOME}" ]; then
    JAVA_RUN="${JAVA_HOME}/bin/java"
else
    if [ `command -v java` ]; then
        JAVA_RUN="java"
    else
        echo "JAVA_HOME is not set" >&2
        exit 1
    fi
fi

# NOTE: lib/* is a JVM classpath wildcard, not a shell glob — it must be
# quoted below so the JVM (not the shell) expands it; otherwise with two or
# more jars in lib/ the second jar would be parsed as the main class name.
JAR_DIR=$SQL_HOME/lib/*
CLASS_NAME=com.dtstack.flink.sql.launcher.LauncherMain

echo "sql submit ..."
# BUGFIX: quote "$@" so arguments containing spaces or JSON (e.g. the
# -confProp {...} option documented in the README) reach the launcher intact.
nohup "$JAVA_RUN" -cp "$JAR_DIR" $CLASS_NAME "$@" &

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
33
<parent>
44
<artifactId>flink.sql</artifactId>
5-
<groupId>com.dtstack.flinkx</groupId>
5+
<groupId>com.dtstack.flink</groupId>
66
<version>1.0-SNAPSHOT</version>
77
<relativePath>../pom.xml</relativePath>
88
</parent>

core/src/main/java/com/dtstack/flink/sql/enums/ECacheType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author xuchao
2828
*/
2929
public enum ECacheType {
30-
NONE, LRU;
30+
NONE, LRU, ALL;
3131

3232
public static boolean isValid(String type){
3333
for(ECacheType tmpType : ECacheType.values()){

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
package com.dtstack.flink.sql.parser;
2222

23+
import com.dtstack.flink.sql.util.DtStringUtil;
2324
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
2425

26+
import java.util.List;
2527
import java.util.Map;
2628
import java.util.regex.Matcher;
2729
import java.util.regex.Pattern;
@@ -70,9 +72,9 @@ private Map parseProp(String propsStr){
7072
String[] strs = propsStr.trim().split("'\\s*,");
7173
Map<String, Object> propMap = Maps.newHashMap();
7274
for(int i=0; i<strs.length; i++){
73-
String[] ss = strs[i].split("=");
74-
String key = ss[0].trim();
75-
String value = ss[1].trim().replaceAll("'", "").trim();
75+
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
76+
String key = ss.get(0).trim();
77+
String value = ss.get(1).trim().replaceAll("'", "").trim();
7678
propMap.put(key, value);
7779
}
7880

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */



package com.dtstack.flink.sql.side;

import com.dtstack.flink.sql.threadFactory.DTThreadFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Base class for "ALL" cache side-table joins: subclasses load the entire
 * side (dimension) table into memory via {@link #initCache()} and this class
 * schedules a periodic {@link #reloadCache()} at the interval configured by
 * {@code SideTableInfo#getCacheTimeout()}.
 *
 * Date: 2018/9/18
 * Company: www.dtstack.com
 * @author xuchao
 */

public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{

    /** Join metadata (side table info, join type, field mappings). */
    protected SideInfo sideInfo;

    /** Single-thread scheduler driving the periodic cache reload. */
    private ScheduledExecutorService es;

    public AllReqRow(SideInfo sideInfo){
        this.sideInfo = sideInfo;

    }

    /** Merges one input row with the matched side-table record (null on miss). */
    protected abstract Row fillData(Row input, Object sideInput);

    /** Performs the initial full load of the side table into the cache. */
    protected abstract void initCache() throws SQLException;

    /** Re-reads the side table, replacing the in-memory cache contents. */
    protected abstract void reloadCache();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        initCache();
        // NOTE(review): consider a logger instead of stdout; kept as-is since
        // this class declares no logging dependency.
        System.out.println("----- all cacheRef init end-----");

        //start reload cache thread
        SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
        es = Executors.newSingleThreadScheduledExecutor(new DTThreadFactory("cache-all-reload"));
        es.scheduleAtFixedRate(this::reloadCache, sideTableInfo.getCacheTimeout(), sideTableInfo.getCacheTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws Exception {
        // BUGFIX: shut down the reload scheduler so the task manager does not
        // leak a live reload thread after the job finishes or fails over.
        if(es != null){
            es.shutdownNow();
            es = null;
        }
        super.close();
    }

}

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 10 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,12 @@
2525
import com.dtstack.flink.sql.side.cache.CacheObj;
2626
import com.dtstack.flink.sql.side.cache.LRUSideCache;
2727
import org.apache.calcite.sql.JoinType;
28-
import org.apache.calcite.sql.SqlBasicCall;
29-
import org.apache.calcite.sql.SqlIdentifier;
30-
import org.apache.calcite.sql.SqlKind;
31-
import org.apache.calcite.sql.SqlNode;
32-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
33-
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
34-
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
3528
import org.apache.flink.configuration.Configuration;
3629
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3730
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
3831
import org.apache.flink.types.Row;
39-
import org.slf4j.Logger;
40-
import org.slf4j.LoggerFactory;
4132

4233
import java.util.Collections;
43-
import java.util.List;
44-
import java.util.Map;
4534

4635
/**
4736
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
@@ -53,50 +42,24 @@
5342

5443
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
5544

56-
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
57-
5845
private static final long serialVersionUID = 2098635244857937717L;
5946

60-
protected RowTypeInfo rowTypeInfo;
61-
62-
protected List<FieldInfo> outFieldInfoList;
63-
64-
protected List<String> equalFieldList = Lists.newArrayList();
65-
66-
protected List<Integer> equalValIndex = Lists.newArrayList();
67-
68-
protected String sqlCondition = "";
69-
70-
protected String sideSelectFields = "";
71-
72-
protected JoinType joinType;
47+
protected SideInfo sideInfo;
7348

74-
//key:Returns the value of the position, returns the index values ​​in the input data
75-
protected Map<Integer, Integer> inFieldIndex = Maps.newHashMap();
76-
77-
protected Map<Integer, Integer> sideFieldIndex = Maps.newHashMap();
78-
79-
protected SideTableInfo sideTableInfo;
80-
81-
protected AbsSideCache sideCache;
82-
83-
public AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
84-
SideTableInfo sideTableInfo){
85-
this.rowTypeInfo = rowTypeInfo;
86-
this.outFieldInfoList = outFieldInfoList;
87-
this.joinType = joinInfo.getJoinType();
88-
this.sideTableInfo = sideTableInfo;
89-
parseSelectFields(joinInfo);
90-
buildEqualInfo(joinInfo, sideTableInfo);
49+
public AsyncReqRow(SideInfo sideInfo){
50+
this.sideInfo = sideInfo;
9151
}
9252

9353
private void initCache(){
54+
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
9455
if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
9556
return;
9657
}
9758

59+
AbsSideCache sideCache;
9860
if(ECacheType.LRU.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
9961
sideCache = new LRUSideCache(sideTableInfo);
62+
sideInfo.setSideCache(sideCache);
10063
}else{
10164
throw new RuntimeException("not support side cache with type:" + sideTableInfo.getCacheType());
10265
}
@@ -105,101 +68,22 @@ private void initCache(){
10568
}
10669

10770
protected CacheObj getFromCache(String key){
108-
return sideCache.getFromCache(key);
71+
return sideInfo.getSideCache().getFromCache(key);
10972
}
11073

11174
protected void putCache(String key, CacheObj value){
112-
sideCache.putCache(key, value);
75+
sideInfo.getSideCache().putCache(key, value);
11376
}
11477

11578
protected boolean openCache(){
116-
return sideCache != null;
79+
return sideInfo.getSideCache() != null;
11780
}
11881

119-
public void parseSelectFields(JoinInfo joinInfo){
120-
String sideTableName = joinInfo.getSideTableName();
121-
String nonSideTableName = joinInfo.getNonSideTable();
122-
List<String> fields = Lists.newArrayList();
123-
124-
int sideIndex = 0;
125-
for( int i=0; i<outFieldInfoList.size(); i++){
126-
FieldInfo fieldInfo = outFieldInfoList.get(i);
127-
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
128-
fields.add(fieldInfo.getFieldName());
129-
sideFieldIndex.put(i, sideIndex);
130-
sideIndex++;
131-
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
132-
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
133-
inFieldIndex.put(i, nonSideIndex);
134-
}else{
135-
throw new RuntimeException("unknown table " + fieldInfo.getTable());
136-
}
137-
}
138-
139-
if(fields.size() == 0){
140-
throw new RuntimeException("select non field from table " + sideTableName);
141-
}
142-
143-
sideSelectFields = String.join(",", fields);
144-
}
145-
146-
public abstract void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo);
147-
148-
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
149-
if(sqlNode.getKind() != SqlKind.EQUALS){
150-
throw new RuntimeException("not equal operator.");
151-
}
152-
153-
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];
154-
SqlIdentifier right = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[1];
155-
156-
String leftTableName = left.getComponent(0).getSimple();
157-
String leftField = left.getComponent(1).getSimple();
158-
159-
String rightTableName = right.getComponent(0).getSimple();
160-
String rightField = right.getComponent(1).getSimple();
161-
162-
if(leftTableName.equalsIgnoreCase(sideTableName)){
163-
equalFieldList.add(leftField);
164-
int equalFieldIndex = -1;
165-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
166-
String fieldName = rowTypeInfo.getFieldNames()[i];
167-
if(fieldName.equalsIgnoreCase(rightField)){
168-
equalFieldIndex = i;
169-
}
170-
}
171-
if(equalFieldIndex == -1){
172-
throw new RuntimeException("can't find equal field " + rightField);
173-
}
174-
175-
equalValIndex.add(equalFieldIndex);
176-
177-
}else if(rightTableName.equalsIgnoreCase(sideTableName)){
178-
179-
equalFieldList.add(rightField);
180-
int equalFieldIndex = -1;
181-
for(int i=0; i<rowTypeInfo.getFieldNames().length; i++){
182-
String fieldName = rowTypeInfo.getFieldNames()[i];
183-
if(fieldName.equalsIgnoreCase(leftField)){
184-
equalFieldIndex = i;
185-
}
186-
}
187-
if(equalFieldIndex == -1){
188-
throw new RuntimeException("can't find equal field " + rightField);
189-
}
190-
191-
equalValIndex.add(equalFieldIndex);
192-
193-
}else{
194-
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
195-
}
196-
197-
}
19882

19983
protected abstract Row fillData(Row input, Object sideInput);
20084

20185
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
202-
if(joinType == JoinType.LEFT){
86+
if(sideInfo.getJoinType() == JoinType.LEFT){
20387
//Reserved left table data
20488
Row row = fillData(input, null);
20589
resultFuture.complete(Collections.singleton(row));

0 commit comments

Comments
 (0)