Commit 3de90b0

Merge remote-tracking branch 'origin/v1.8.0_dev' into v1.9.0_dev

# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
#	kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
#	kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

2 parents: c9487c9 + ecc7b00


104 files changed: +4261 additions, -1547 deletions

README.md

Lines changed: 18 additions & 13 deletions

````diff
@@ -27,8 +27,8 @@
 
 # Supported
 * Source tables: kafka 0.9, 0.10, 0.11, 1.x
-* Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse
-* Result tables: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse
+* Side tables: mysql, SQlServer, oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
+* Result tables: mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver
 
 # Development roadmap
 * Side-table snapshots
@@ -56,9 +56,16 @@
 ```
 mvn clean package -Dmaven.test.skip
 
-After the build, a plugins directory is created under the project root; it holds the compiled data-sync plugin packages, while the lib directory holds the job submission package
 ```
 
+Package layout after the build:
+
+> * dt-center-flinkStreamSQL
+> > * bin: job startup scripts
+> > * lib: where the launcher jar is stored; the entry point for job submission
+> > * plugins: where the plugin packages are stored
+> > * ........ : core and plugin code
+
 ### 1.4 Startup
 
 #### 1.4.1 Startup command
@@ -128,6 +135,8 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * taskmanager.memory.mb: taskmanager memory size in per_job mode (unit: MB, default: 768)
 * taskmanager.num: number of taskmanager instances in per_job mode (default 1)
 * taskmanager.slots: number of slots per taskmanager in per_job mode (default 1)
+* savePointPath: path of the savepoint to restore the job from (default: none)
+* allowNonRestoredState: flag indicating whether non-restored state is allowed when restoring from the savepoint (default false)
 * [prometheus related parameters](docs/prometheus.md): in per_job mode, metrics can be written to an external monitoring component, e.g. prometheus pushgateway
 
 
@@ -141,16 +150,6 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
   * Required: no
   * Default: none
 
-* **savePointPath**
-  * Description: path of the savepoint to restore the job from
-  * Required: no
-  * Default: none
-
-* **allowNonRestoredState**
-  * Description: flag indicating whether non-restored state is allowed when restoring from the savepoint
-  * Required: no
-  * Default: false
-
 * **flinkJarPath**
   * Description: local flink jar directory, required for per_job submission
   * Required: no
@@ -186,6 +185,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [kudu result table plugin](docs/kuduSink.md)
 * [postgresql result table plugin](docs/postgresqlSink.md)
 * [clickhouse result table plugin](docs/clickhouseSink.md)
+* [impala result table plugin](docs/impalaSink.md)
+* [db2 result table plugin](docs/db2Sink.md)
+* [sqlserver result table plugin](docs/sqlserverSink.md)
 
 ### 2.3 Side table plugins
 * [hbase side table plugin](docs/hbaseSide.md)
@@ -197,6 +199,9 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
 * [kudu side table plugin](docs/kuduSide.md)
 * [postgresql side table plugin](docs/postgresqlSide.md)
 * [clickhouse side table plugin](docs/clickhouseSide.md)
+* [impala side table plugin](docs/impalaSide.md)
+* [db2 side table plugin](docs/db2Side.md)
+* [sqlserver side table plugin](docs/sqlserverSide.md)
 
 ## 3 Performance metrics (new)
````
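Note that savePointPath and allowNonRestoredState move out of the dedicated command-line options (removed from Options.java further down) and into the confProp parameter block documented above. A hypothetical submit call under that reading, modeled on the submit.sh examples in this README (the paths and savepoint id are illustrative only):

```
sh submit.sh -sql /opt/sideSql.txt -name xctest \
  -remoteSqlPluginPath /opt/dtstack/plugins \
  -confProp '{"savePointPath":"hdfs:///flink/savepoints/savepoint-xx","allowNonRestoredState":"true"}'
```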
cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 2 additions & 1 deletion

```diff
@@ -36,9 +36,10 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <excludes>
-
+                                    <exclude>org.slf4j</exclude>
                                 </excludes>
                             </artifactSet>
                             <filters>
```
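Both shade-plugin tweaks here (repeated in the sibling poms below) are housekeeping: setting createDependencyReducedPom to false stops Maven from writing a dependency-reduced-pom.xml into the module root during shading, and excluding org.slf4j keeps the slf4j artifacts out of the shaded plugin jar so the logging binding comes from the runtime classpath instead of being bundled twice.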

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 2 additions & 1 deletion

```diff
@@ -52,9 +52,10 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
                             <artifactSet>
                                 <excludes>
-
+                                    <exclude>org.slf4j</exclude>
                                 </excludes>
                             </artifactSet>
                             <filters>
```

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 19 additions & 12 deletions

```diff
@@ -174,9 +174,16 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
                 return;
             }
             inputParams.add(equalObj);
-            stringBuffer.append(sideInfo.getEqualFieldList().get(i))
-                    .append(" = ").append("'" + equalObj + "'")
-                    .append(" and ");
+            StringBuffer sqlTemp = stringBuffer.append(sideInfo.getEqualFieldList().get(i))
+                    .append(" = ");
+            if (equalObj instanceof String) {
+                sqlTemp.append("'" + equalObj + "'")
+                        .append(" and ");
+            } else {
+                sqlTemp.append(equalObj)
+                        .append(" and ");
+            }
+
         }
 
         String key = buildCacheKey(inputParams);
@@ -190,12 +197,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
             dealMissKey(input, resultFuture);
             return;
         } else if (ECacheContentType.MultiLine == val.getType()) {
-
-            for (Object rowArray : (List) val.getContent()) {
-                Row row = fillData(input, rowArray);
-                resultFuture.complete(Collections.singleton(row));
+            List<Row> rowList = Lists.newArrayList();
+            for (Object jsonArray : (List) val.getContent()) {
+                Row row = fillData(input, jsonArray);
+                rowList.add(row);
             }
-
+            resultFuture.complete(rowList);
         } else {
             throw new RuntimeException("not support cache obj type " + val.getType());
         }
@@ -206,7 +213,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
         //connect Cassandra
         connCassandraDB(cassandraSideTableInfo);
 
-        String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere;
+        String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
         System.out.println("sqlCondition:" + sqlCondition);
 
         ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
@@ -231,14 +238,15 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                         cluster.closeAsync();
                         if (rows.size() > 0) {
                             List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
+                            List<Row> rowList = Lists.newArrayList();
                             for (com.datastax.driver.core.Row line : rows) {
                                 Row row = fillData(input, line);
                                 if (openCache()) {
                                     cacheContent.add(line);
                                 }
-                                resultFuture.complete(Collections.singleton(row));
+                                rowList.add(row);
                             }
-
+                            resultFuture.complete(rowList);
                             if (openCache()) {
                                 putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                             }
@@ -280,7 +288,6 @@ public Row fillData(Row input, Object line) {
             }
         }
 
-        System.out.println("row:" + row.toString());
         return row;
     }
 
```
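Two of these hunks change behavior rather than style. Appending ALLOW FILTERING lets Cassandra execute the generated lookup even when the predicate is not served by an index, at the cost of a potential scan. More importantly, both the cache branch and the query callback now buffer rows and complete the ResultFuture once: Flink's async I/O contract allows a ResultFuture to be completed only a single time, so the old per-row resultFuture.complete(Collections.singleton(row)) silently dropped every match after the first. A minimal, self-contained sketch of the buffer-then-complete pattern (stand-in types, not the project's classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class CompleteOnceSketch {

    // Stand-in for Flink's ResultFuture: like the real interface,
    // it must be completed exactly once per async invocation.
    interface ResultFuture<T> {
        void complete(Collection<T> results);
    }

    // Buffer every joined row, then hand the whole batch over in one call.
    static void emitAll(List<String> matches, ResultFuture<String> future) {
        List<String> out = new ArrayList<>();
        for (String match : matches) {
            out.add("joined:" + match); // plays the role of fillData(input, line)
        }
        future.complete(out); // a single completion delivers all matches
    }

    public static void main(String[] args) {
        emitAll(Arrays.asList("a", "b"), System.out::println); // prints [joined:a, joined:b]
    }
}
```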

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 31 additions & 0 deletions

```diff
@@ -23,6 +23,9 @@
 import com.dtstack.flink.sql.table.TableInfo;
 import com.dtstack.flink.sql.util.MathUtil;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -96,4 +99,32 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
 
     private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
     }
+
+    public Class dbTypeConvertToJavaType(String fieldType) {
+        switch (fieldType.toLowerCase()) {
+            case "bigint":
+                return Long.class;
+            case "int":
+            case "counter":
+                return Integer.class;
+
+            case "text":
+            case "inet":
+            case "varchar":
+            case "ascii":
+            case "timeuuid":
+                return String.class;
+
+            case "decimal":
+            case "float":
+                return Float.class;
+            case "double":
+                return Double.class;
+            case "timestamp":
+                return Timestamp.class;
+        }
+
+        throw new RuntimeException("Unsupported type: " + fieldType);
+    }
 }
```
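The new dbTypeConvertToJavaType lower-cases the declared column type before matching, so type names are case-insensitive, and anything outside the listed CQL types is rejected. Two quirks worth noting: "decimal" maps to Float.class (a lossy choice) even though java.math.BigDecimal is now imported, and the java.sql.Date import is not used yet. A small usage sketch, assuming CassandraSideParser can be instantiated with a no-arg constructor:

```java
CassandraSideParser parser = new CassandraSideParser();

Class<?> k1 = parser.dbTypeConvertToJavaType("BIGINT");   // Long.class (matching is case-insensitive)
Class<?> k2 = parser.dbTypeConvertToJavaType("timeuuid"); // String.class
Class<?> k3 = parser.dbTypeConvertToJavaType("decimal");  // Float.class, not BigDecimal
parser.dbTypeConvertToJavaType("blob");                   // throws RuntimeException: unsupported type
```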

cassandra/cassandra-sink/pom.xml

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,7 +33,7 @@
                         <configuration>
                             <artifactSet>
                                 <excludes>
-
+                                    <exclude>org.slf4j</exclude>
                                 </excludes>
                             </artifactSet>
                             <filters>
```

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 10 additions & 21 deletions

```diff
@@ -48,15 +48,11 @@
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.metric.MetricConstant;
-import org.apache.flink.api.common.io.RichOutputFormat;
+import com.dtstack.flink.sql.sink.MetricOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +69,7 @@
  * @see Tuple
  * @see DriverManager
  */
-public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
+public class CassandraOutputFormat extends MetricOutputFormat {
     private static final long serialVersionUID = -7994311331389155692L;
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -94,17 +90,9 @@ public class CassandraOutputFormat extends RichOutputFormat<Tuple2> {
     protected String[] fieldNames;
     TypeInformation<?>[] fieldTypes;
 
-    private int batchInterval = 5000;
-
     private Cluster cluster;
     private Session session = null;
 
-    private int batchCount = 0;
-
-    private transient Counter outRecords;
-
-    private transient Meter outRecordsRate;
-
     public CassandraOutputFormat() {
     }
 
@@ -120,7 +108,8 @@ public void configure(Configuration parameters) {
      * I/O problem.
      */
     @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
+    public void open(int taskNumber, int numTasks) {
+        initMetric();
         try {
             if (session == null) {
                 QueryOptions queryOptions = new QueryOptions();
@@ -176,17 +165,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
                 // establish the connection to the existing keyspace
                 session = cluster.connect(database);
                 LOG.info("connect cassandra is successed!");
-                initMetric();
             }
         } catch (Exception e) {
             LOG.error("connect cassandra is error:" + e.getMessage());
         }
     }
 
-    private void initMetric() {
-        outRecords = getRuntimeContext().getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
-        outRecordsRate = getRuntimeContext().getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(outRecords, 20));
-    }
 
     /**
      * Adds a record to the prepared statement.
@@ -226,6 +210,7 @@ private void insertWrite(Row row) {
                 resultSet.wasApplied();
             }
         } catch (Exception e) {
+            outDirtyRecords.inc();
             LOG.error("[upsert] is error:" + e.getMessage());
         }
     }
@@ -237,7 +222,11 @@ private String buildSql(Row row) {
             if (row.getField(index) == null) {
             } else {
                 fields.append(fieldNames[index] + ",");
-                values.append("'" + row.getField(index) + "'" + ",");
+                if (row.getField(index) instanceof String) {
+                    values.append("'" + row.getField(index) + "'" + ",");
+                } else {
+                    values.append(row.getField(index) + ",");
+                }
             }
         }
         fields.deleteCharAt(fields.length() - 1);
```
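Moving initMetric() to the top of open() means the metric objects inherited from MetricOutputFormat exist even when the Cassandra connection fails, so the new outDirtyRecords.inc() in the write path cannot hit an uninitialized counter. The buildSql change applies the same quoting rule as the side-table lookup above: only String fields are wrapped in single quotes. A standalone sketch of that rule (illustrative names, not the project's API):

```java
// Render one CQL value the way the patched buildSql does:
// strings quoted, everything else emitted raw.
static String renderValue(Object field) {
    return (field instanceof String) ? "'" + field + "'" : String.valueOf(field);
}

// renderValue("bob") -> 'bob'   renderValue(42) -> 42
// so a generated statement looks like: INSERT INTO tbl (name, age) VALUES ('bob', 42)
```

Like the original, the sketch does not escape embedded single quotes, so a value such as O'Brien would still break the generated statement.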

core/pom.xml

Lines changed: 1 addition & 0 deletions

```diff
@@ -117,6 +117,7 @@
             <version>2.5</version>
         </dependency>
 
+
     </dependencies>
 
     <build>
```

core/src/main/java/com/dtstack/flink/sql/constrant/ConfigConstrant.java

Lines changed: 3 additions & 2 deletions

```diff
@@ -47,8 +47,9 @@ public class ConfigConstrant {
 
     public static final String SQL_MAX_ENV_PARALLELISM = "sql.max.env.parallelism";
 
-    public static final String MR_JOB_PARALLELISM = "mr.job.parallelism";
-
+    public static final String SAVE_POINT_PATH_KEY = "savePointPath";
+    public static final String ALLOW_NON_RESTORED_STATE_KEY = "allowNonRestoredState";
+
     public static final String SQL_BUFFER_TIMEOUT_MILLIS = "sql.buffer.timeout.millis";
 
     public static final String FLINK_TIME_CHARACTERISTIC_KEY = "time.characteristic";
```
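These keys replace the CLI options deleted from Options.java below: savepoint restore settings now travel inside the properties bag. A hedged sketch of how a submission path could translate them into Flink's SavepointRestoreSettings (the actual wiring lives in the launcher module, which is not part of this excerpt):

```java
import java.util.Properties;

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class SavepointSettingsSketch {

    // Map the two new config keys onto Flink's restore settings.
    static SavepointRestoreSettings fromConfProp(Properties confProp) {
        String path = confProp.getProperty("savePointPath"); // SAVE_POINT_PATH_KEY
        if (path == null) {
            return SavepointRestoreSettings.none(); // no savepoint requested
        }
        boolean allowNonRestored = Boolean.parseBoolean(
                confProp.getProperty("allowNonRestoredState", "false")); // ALLOW_NON_RESTORED_STATE_KEY
        return SavepointRestoreSettings.forPath(path, allowNonRestored);
    }
}
```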

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 0 additions & 22 deletions

```diff
@@ -57,12 +57,6 @@ public class Options {
     @OptionRequired(description = "sql ref prop,eg specify event time")
     private String confProp = "{}";
 
-    @OptionRequired(description = "Savepoint restore path")
-    private String savePointPath;
-
-    @OptionRequired(description = "Flag indicating whether non restored state is allowed if the savepoint")
-    private String allowNonRestoredState = "false";
-
     @OptionRequired(description = "flink jar path for submit of perjob mode")
     private String flinkJarPath;
 
@@ -147,22 +141,6 @@ public void setConfProp(String confProp) {
         this.confProp = confProp;
     }
 
-    public String getSavePointPath() {
-        return savePointPath;
-    }
-
-    public void setSavePointPath(String savePointPath) {
-        this.savePointPath = savePointPath;
-    }
-
-    public String getAllowNonRestoredState() {
-        return allowNonRestoredState;
-    }
-
-    public void setAllowNonRestoredState(String allowNonRestoredState) {
-        this.allowNonRestoredState = allowNonRestoredState;
-    }
-
     public String getFlinkJarPath() {
         return flinkJarPath;
     }
```
